Producer-consumer pattern and priority queue asyncio.PriorityQueue in Python

In the previous article, you learned what a queue is in general and how the FIFO queue asyncio.Queue works. Let's continue and see how the asyncio.PriorityQueue priority queue works, using an example built with the aiohttp library.

Preface

The author of the previous article on asyncio.Queue covered the FIFO queue in sufficient detail but treated the other queue types rather formulaically, and I did not like the examples either, so I decided to continue the topic with a partial translation of another article.

The producer-consumer pattern

Let's recap what the producer-consumer pattern means. Imagine two kinds of tasks sharing a queue: task A produces data and puts it in the queue, while task B takes data from the queue and processes it. This is the producer-consumer model, where task A is the producer and task B is the consumer. By analogy with a supermarket: shoppers are the producers, cashiers are the consumers, and the checkout line is the queue.

Why use the producer-consumer pattern?

In highly concurrent applications, producers often generate data faster than consumers can process it, so producers have to wait for consumers to finish before they can continue generating data.

Sometimes the opposite happens: consumers process data quickly while producers generate it slowly, and consumers end up waiting for producers before they can proceed. To balance producers and consumers, we need a queue that stores the data produced by the producer. The queue acts as a buffer and decouples producers from consumers.

Image by Peng Qian
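
To make the pattern concrete, here is a minimal sketch (not part of this article's order service) of producers and consumers sharing the FIFO asyncio.Queue from the previous article. The item count, the number of consumers, and the sleep durations are arbitrary values chosen purely for illustration:

import asyncio
import random


async def producer(queue: asyncio.Queue, n_items: int) -> None:
    for i in range(n_items):
        await asyncio.sleep(random.random())  # simulate producing the data
        await queue.put(i)                    # blocks if the queue is full
        print(f"produced item {i}")


async def consumer(queue: asyncio.Queue) -> None:
    while True:
        item = await queue.get()              # blocks if the queue is empty
        await asyncio.sleep(random.random())  # simulate processing the data
        print(f"consumed item {item}")
        queue.task_done()


async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue(maxsize=5)  # the buffer between the two sides
    consumers = [asyncio.create_task(consumer(queue)) for _ in range(2)]
    await producer(queue, 10)
    await queue.join()                        # wait until every item has been processed
    for task in consumers:
        task.cancel()


asyncio.run(main())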

Priority queue asyncio.PriorityQueue

In the previous article, we discussed how the FIFO queue in asyncio works. Now let's see how the asyncio.PriorityQueue priority queue works.

Definition from Wikipedia:

A priority queue is an abstract data type that supports two mandatory operations: adding an element and retrieving the maximum (or minimum) element. It is assumed that a priority can be computed for each element, either a real number or, more generally, an element of a linearly ordered set.

Why use asyncio.PriorityQueue?

Suppose there is a queue of tasks, each of which takes a long time to process. Then a high-priority task arrives, say an error log entry or a request from a VIP user, that needs immediate attention. What do we do? This is where asyncio.PriorityQueue comes in handy.

Let's briefly describe how asyncio.PriorityQueue is implemented.

Unlike asyncio.Queue, which simply stores its items in a FIFO buffer, asyncio.PriorityQueue is heap-based: internally it relies on the heapq module, which keeps a binary tree laid out in a flat list.

You may be familiar with binary search trees, which ensure that the smallest value is always the leftmost node. The heap used by asyncio.PriorityQueue, by contrast, ensures that the smallest value is always at the top, so the item with the highest priority (the lowest value) is always removed first.

On the left is the binary heap used by PriorityQueue, on the right is a binary search tree. Image by Peng Qian.
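
asyncio.PriorityQueue builds this heap with the standard heapq module. A minimal sketch with arbitrary example values shows that the smallest value always sits at the top and is popped first:

import heapq

heap: list[int] = []
for value in (5, 1, 4, 2, 3):
    heapq.heappush(heap, value)  # sift the value up so the smallest stays on top

print(heap[0])  # 1 -- the smallest element is always at the root
print([heapq.heappop(heap) for _ in range(len(heap))])  # [1, 2, 3, 4, 5]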

The methods of asyncio.PriorityQueue are the same as those of asyncio.Queue; you can read more about them in the previous article.

Methods of the asyncio.PriorityQueue class
  • await queue.put(item) – to place the element in the queue. If the queue is full, this method will wait until a free slot becomes available.

  • await queue.get() – to get an element from the queue. If the queue is empty, this method will wait until an item becomes available.

  • queue.task_done() – to indicate that the previously received element has been processed. This method should be called by consumer coroutines after they finish processing an element.

  • await queue.join() – to block until all elements in the queue have been received and processed. This method should be called by producer coroutines after they have finished enqueuing items.

You can also use some non-coroutine queue methods, for example:

  • queue.put_nowait(item) – to put an element in the queue without blocking. If the queue is full, this method raises the QueueFull exception.

  • queue.get_nowait() – to get an element from the queue without blocking. If the queue is empty, this method raises the QueueEmpty exception.

  • queue.qsize() – to get the number of elements in the queue.

  • queue.empty() – to check if the queue is empty.

  • queue.full() – to check if the queue is full.
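
Here is a minimal sketch of these methods in action; the priorities and payload strings are arbitrary. Items are usually enqueued as (priority, payload) tuples, and the item with the lowest priority number comes out first:

import asyncio


async def main() -> None:
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()

    # Items are enqueued as (priority, payload) tuples; the lowest number wins.
    queue.put_nowait((3, "low priority"))
    queue.put_nowait((1, "high priority"))
    queue.put_nowait((2, "medium priority"))

    while not queue.empty():
        priority, payload = await queue.get()
        print(priority, payload)
        queue.task_done()


asyncio.run(main())

# Output:
# 1 high priority
# 2 medium priority
# 3 low priority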

An example of using asyncio.PriorityQueue in the real world

Let's illustrate the use of asyncio.PriorityQueue with a realistic scenario. Imagine that we have an order-taking API. Processing each order takes time, but we can't make users wait too long. So when a user places an order, the API first puts it in a queue, lets a background task process it asynchronously, and immediately returns a message to the user.

This API accepts orders from two types of users: normal and VIP. We need to make sure that orders from VIP users are processed with the highest priority.

VIP orders are processed with the highest priority. Image by Peng Qian.

Let's look at an implementation of a program with a priority queue, using aiohttp:

import asyncio
from asyncio import PriorityQueue, Task
from dataclasses import dataclass, field
from enum import IntEnum
from random import randrange

from aiohttp import web
from aiohttp.web_app import Application
from aiohttp.web_request import Request
from aiohttp.web_response import Response

app = Application()
routers = web.RouteTableDef()
QUEUE_KEY = "QUEUE_KEY"
TASK_KEY = "TASK_KEY"


class UserType(IntEnum):
    POWER_USER = 1
    NORMAL_USER = 2


@dataclass(order=True)
class WorkItem:
    user_type: UserType
    order_delay: int = field(compare=False)

First, we define an Enum that describes two categories of users: normal users and VIP (power) users. The VIP value is smaller, so VIP orders get the higher priority.

Then, using a dataclass, we define the user's order, which contains the user's type and how long the order takes to process. Because order_delay is declared with field(compare=False), the processing time is not taken into account when orders are compared by priority; only user_type is.
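
Assuming the UserType and WorkItem definitions above are in scope, a quick check shows how the comparison behaves:

vip_order = WorkItem(UserType.POWER_USER, order_delay=4)
normal_order = WorkItem(UserType.NORMAL_USER, order_delay=1)

# Only user_type takes part in the comparison, so the VIP order sorts first
# even though it takes longer to process.
print(vip_order < normal_order)  # True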

Next, we define the process_order_worker consumer coroutine, which takes orders from the queue and simulates their processing. Don't forget to call queue.task_done() to tell the queue that we have finished processing the order.

async def process_order_worker(worker_id: int, queue: PriorityQueue):
    while True:
        work_item: WorkItem = await queue.get()
        print(f"process_order_worker: Worker_{worker_id} begin to process worker {work_item}")
        await asyncio.sleep(work_item.order_delay)
        print(f"process_order_worker: Worker_{worker_id} finished to process worker {work_item}")
        queue.task_done()

After that, we implement the order API using aiohttp. This handler responds to a user request, creates an order object, and places it in asyncio.PriorityQueue, then immediately returns a response so the user does not have to wait.

@routers.post("/order")
async def order(request: Request) -> Response:
    queue: PriorityQueue = app[QUEUE_KEY]
    body = await request.json()
    user_type = UserType.POWER_USER if body['power_user'] == 'True' else UserType.NORMAL_USER
    work_item = WorkItem(user_type, randrange(5))
    await queue.put(work_item)

    return Response(body="order placed!")

At application startup, we use the create_order_queue function to initialize the queue and create the consumer tasks.

async def create_order_queue(app: Application):
    print("create_order_queue: Begin to initialize queue and tasks.")
    queue: PriorityQueue = PriorityQueue(10)
    tasks = [asyncio.create_task(process_order_worker(i, queue)) for i in range(3)]
    app[QUEUE_KEY] = queue
    app[TASK_KEY] = tasks
    print("create_order_queue: Initialize queue and tasks success..")

When the application shuts down, we use the destroy_order_queue function to make sure that all orders in the queue have been processed and that the background tasks are shut down cleanly.

queue.join() waits until all items in the queue have been processed, and asyncio.wait_for puts a 20-second limit on that wait; once the timeout expires, we stop waiting for queue.join() and cancel the worker tasks.

async def destroy_order_queue(app: Application):
    queue: PriorityQueue = app[QUEUE_KEY]
    tasks: list[Task] = app[TASK_KEY]

    try:
        print("destroy_order_queue: Wait for 20 sec to let all work done.")
        await asyncio.wait_for(queue.join(), timeout=20.0)
    except asyncio.TimeoutError:
        print("destroy_order_queue: Cancel all tasks.")
        for task in tasks:
            task.cancel()


app.add_routes(routers)
app.on_startup.append(create_order_queue)
app.on_shutdown.append(destroy_order_queue)
web.run_app(app)
Full listing of program code
import asyncio
from asyncio import PriorityQueue, Task
from dataclasses import dataclass, field
from enum import IntEnum
from random import randrange

from aiohttp import web
from aiohttp.web_app import Application
from aiohttp.web_request import Request
from aiohttp.web_response import Response

app = Application()
routers = web.RouteTableDef()
QUEUE_KEY = "QUEUE_KEY"
TASK_KEY = "TASK_KEY"


class UserType(IntEnum):
    POWER_USER = 1
    NORMAL_USER = 2


@dataclass(order=True)
class WorkItem:
    user_type: UserType
    order_delay: int = field(compare=False)


async def process_order_worker(worker_id: int, queue: PriorityQueue):
    while True:
        work_item: WorkItem = await queue.get()
        print(f"process_order_worker: Worker_{worker_id} begin to process worker {work_item}")
        await asyncio.sleep(work_item.order_delay)
        print(f"process_order_worker: Worker_{worker_id} finished to process worker {work_item}")
        queue.task_done()


@routers.post("/order")
async def order(request: Request) -> Response:
    queue: PriorityQueue = app[QUEUE_KEY]
    body = await request.json()
    user_type = UserType.POWER_USER if body['power_user'] == 'True' else UserType.NORMAL_USER
    work_item = WorkItem(user_type, randrange(5))
    await queue.put(work_item)

    return Response(body="order placed!")


async def create_order_queue(app: Application):
    print("create_order_queue: Begin to initialize queue and tasks.")
    queue: PriorityQueue = PriorityQueue(10)
    tasks = [asyncio.create_task(process_order_worker(i, queue)) for i in range(3)]
    app[QUEUE_KEY] = queue
    app[TASK_KEY] = tasks
    print("create_order_queue: Initialize queue and tasks success..")


async def destroy_order_queue(app: Application):
    queue: PriorityQueue = app[QUEUE_KEY]
    tasks: list[Task] = app[TASK_KEY]

    try:
        print("destroy_order_queue: Wait for 20 sec to let all work done.")
        await asyncio.wait_for(queue.join(), timeout=20.0)
    except asyncio.TimeoutError:
        print("destroy_order_queue: Cancel all tasks.")
        for task in tasks:
            task.cancel()


app.add_routes(routers)
app.on_startup.append(create_order_queue)
app.on_shutdown.append(destroy_order_queue)
web.run_app(app)

We can test this implementation with PyCharm's HTTP Client (available in the Professional edition only):

HTTP requests
POST http://localhost:8080/order
Content-Type: application/json

{"power_user": "True"}

###
POST http://localhost:8080/order
Content-Type: application/json

{"power_user": "False"}

###
POST http://localhost:8080/order
Content-Type: application/json

{"power_user": "False"}

###
POST http://localhost:8080/order
Content-Type: application/json

{"power_user": "True"}

Or, if you only have the Community edition, with a small script using the requests library:

import requests
from random import randint

url="http://localhost:8080/order"
myobj = [{"power_user": "True"}, {"power_user": "False"}]

for i in range(5):
    requests.post(url, json=myobj[randint(0,1)])

The result of the program

As you can see, the two high-priority orders are processed as expected. Great!

Conclusions

In this article, we recapped why the producer-consumer pattern is needed:

  • It balances producers and consumers and maximizes the use of resources.

  • It decouples the system, allowing producers and consumers to scale independently of each other.

Using a realistic example, we also saw how asyncio.PriorityQueue helps when tasks need to be prioritized.

Asynchronous programming in Python is a powerful tool, and the producer-consumer pattern with asyncio.Queue is a versatile approach to handling concurrency and prioritization in your applications.

Contact the author

Please subscribe if you found these articles useful and get new stories delivered to your inbox. If you have any questions, you can find me on LinkedIn or on Twitter (X).
