Ensuring that aio_pika consumer runs forever alongside FastAPI

I wrote an aio_pika consumer task that is supposed to run forever in a FastAPI app. This task is part of a manager object that implements a pub/sub pattern:

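import asyncio
import json
import logging

from aio_pika import connect_robust
from aio_pika.abc import AbstractIncomingMessage

logger = logging.getLogger(__name__)
# `settings` is the app's own configuration object, defined elsewhere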

class MyManager(object):
    def __init__(self):
        self.waiter = asyncio.Future()
    
    def publish(self, value):
        waiter, self.waiter = self.waiter, asyncio.Future()
        waiter.set_result((value, self.waiter))

    async def subscribe(self):
        waiter = self.waiter
        while True:
            value, waiter = await waiter
            yield value

    __aiter__ = subscribe

    async def on_message(self, message: AbstractIncomingMessage) -> None:
        try:
            async with message.process():
                # do whatever deserialization of the received item
                item = json.loads(message.body)
                # share the item with subscribers
                self.publish(item)

        except Exception as e:
            logger.error(e, exc_info=True)

    async def run(self):
        connection = await connect_robust(
            settings.amqp_url,
            loop=asyncio.get_running_loop()
        )
        channel = await connection.channel()
        my_queue = await channel.get_queue('my-queue')

        await my_queue.consume(self.on_message)

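        # Waits forever; this only ends when the task is cancelled (CancelledError)
        await asyncio.Future()

        # Never reached: the future above is never resolved
        await connection.close()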
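The pub/sub part of MyManager works as a chain of futures: every subscriber awaits the same "current" future, and `publish()` resolves it with the value plus the next future in the chain. The sketch below isolates that mechanism without AMQP so it can be run on its own. The class name `Broadcaster` is hypothetical, and it uses `loop.create_future()` (the form the asyncio docs recommend) instead of `asyncio.Future()`; otherwise the logic mirrors `publish()`/`subscribe()` above. Note that a subscriber only sees values published after it has started awaiting.

```python
import asyncio


class Broadcaster:
    """Hypothetical stand-in for MyManager's pub/sub part, without AMQP."""

    def __init__(self) -> None:
        # Must be created while an event loop is running
        self.waiter = asyncio.get_running_loop().create_future()

    def publish(self, value) -> None:
        # Swap in a fresh future, then resolve the old one with (value, next future)
        waiter, self.waiter = self.waiter, asyncio.get_running_loop().create_future()
        waiter.set_result((value, self.waiter))

    async def subscribe(self):
        waiter = self.waiter
        while True:
            # All subscribers await the same future, so each one sees every value
            value, waiter = await waiter
            yield value


async def collect(source: Broadcaster, n: int) -> list:
    received = []
    async for value in source.subscribe():
        received.append(value)
        if len(received) == n:
            break
    return received


async def main() -> tuple:
    hub = Broadcaster()
    a = asyncio.create_task(collect(hub, 3))
    b = asyncio.create_task(collect(hub, 3))
    await asyncio.sleep(0)  # let both subscribers start waiting on the first future
    for item in ("x", "y", "z"):
        hub.publish(item)
    return await a, await b


received = asyncio.run(main())
print(received)  # (['x', 'y', 'z'], ['x', 'y', 'z'])
```

Because each resolved future already holds a reference to the next one, a subscriber that falls behind can still walk the chain and receive every value in order.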

This consumer task is created during startup of the FastAPI app:

my_manager = asyncio.Future()

@app.on_event("startup")
async def on_startup():
    my_manager.set_result(MyManager())
    task = asyncio.create_task((await my_manager).run())

Note that the manager is only instantiated during on_startup to ensure that there is an existing asyncio loop.
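One detail of the startup code worth checking: the asyncio documentation for `create_task` says to save a reference to the returned task, because the event loop only keeps weak references to tasks. Here `task` is a local variable that is dropped when `on_startup` returns, and any exception raised inside `run()` is never retrieved or logged. Whether this explains the failure after weeks is not confirmed, but the sketch below (helper names `start_background`, `background_tasks` and `_report_task_end` are hypothetical) keeps a strong reference and attaches a done callback, which also gives one place to log why the task ended.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

# Strong references to running background tasks (hypothetical module-level set)
background_tasks: set[asyncio.Task] = set()


def _report_task_end(task: asyncio.Task) -> None:
    # Called once when the task finishes, whatever the reason
    if task.cancelled():
        logger.info("Background task %s was cancelled", task.get_name())
    elif task.exception() is not None:
        logger.error("Background task %s crashed", task.get_name(),
                     exc_info=task.exception())
    else:
        logger.warning("Background task %s returned unexpectedly", task.get_name())


def start_background(coro, name: str) -> asyncio.Task:
    task = asyncio.create_task(coro, name=name)
    background_tasks.add(task)                        # keep the task alive
    task.add_done_callback(background_tasks.discard)  # drop the reference when done
    task.add_done_callback(_report_task_end)          # log how it ended
    return task


# Usage: a consumer that crashes shortly after starting
async def crashing_consumer() -> None:
    await asyncio.sleep(0.01)
    raise RuntimeError("connection lost")


async def main() -> asyncio.Task:
    task = start_background(crashing_consumer(), name="amqp-consumer")
    await asyncio.sleep(0.05)  # give the task time to fail and the callbacks to run
    return task


logging.basicConfig(level=logging.INFO)
finished = asyncio.run(main())
```

In the FastAPI app, `on_startup` would then call something like `start_background((await my_manager).run(), name="amqp-consumer")` instead of a bare `asyncio.create_task(...)`.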

Unfortunately, the task stops working after a few weeks or months. I was unable to log what event caused this. I am not sure whether the task crashes or the connection to the AMQP server drops without ever reconnecting. I am not even sure how or where to catch and log the issue.

What could be causing this issue, and how can I fix it?

For additional context, the manager is used in a Server-Sent Events (SSE) route:

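import asyncio

from fastapi import APIRouter, Request
from sse_starlette.sse import EventSourceResponse

router = APIRouter()

@router.get('/items')
async def items_stream(request: Request):
    async def event_publisher():
        try:
            # my_manager is the module-level Future resolved in on_startup
            aiter = (await my_manager).__aiter__()
            while True:
                task = asyncio.create_task(aiter.__anext__())
                # Shield so that cancelling this handler does not cancel the task above
                event = await asyncio.shield(task)
                yield dict(data=event)
        except asyncio.CancelledError:
            print(f'Disconnected from client (via refresh/close) {request.client}')
            raise

    return EventSourceResponse(event_publisher())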

The async iterator is shielded to prevent the issue described here.
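The linked issue is not reproduced here, but one likely reason for the shield (an assumption) is that all subscribers await the same shared waiter future. When a task is cancelled while awaiting a plain future, asyncio cancels that future too, so one client disconnecting could cancel the waiter for every other subscriber. The sketch below (function names are hypothetical) shows the difference with and without `asyncio.shield`.

```python
import asyncio


async def wait_on(fut: asyncio.Future) -> None:
    # Awaits the shared future directly, with no shield
    await fut


async def wait_on_shielded(fut: asyncio.Future) -> None:
    # Cancelling this task cancels only the shield's outer future, not fut
    await asyncio.shield(fut)


async def shared_survives(waiter_fn) -> bool:
    """Return True if the shared future is still usable after cancelling one waiter."""
    shared = asyncio.get_running_loop().create_future()
    task = asyncio.create_task(waiter_fn(shared))
    await asyncio.sleep(0)  # let the task start awaiting `shared`
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return not shared.cancelled()


direct_survives = asyncio.run(shared_survives(wait_on))
shielded_survives = asyncio.run(shared_survives(wait_on_shielded))
print(direct_survives, shielded_survives)  # False True
```

In the SSE route the chain is one step longer (handler task, then the `__anext__()` task, then the shared waiter), but shielding the first link is enough to stop the cancellation from propagating.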

Answer by DurandA:

While I did not clearly identify which exception was raised, I hardened the connection handling so that the task reconnects in case of an error:

async def run(self):
    while True:  # Reconnect if the connection is lost
        connection = None  # reset each attempt so a stale connection is not closed twice
        try:
            connection = await connect_robust(
                settings.amqp_url,
                loop=asyncio.get_running_loop()
            )
            channel = await connection.channel()
            my_queue = await channel.get_queue('my-queue')

            await my_queue.consume(self.on_message)

            # Blocks forever; CancelledError is raised here when the task is cancelled
            await asyncio.Future()

        except asyncio.CancelledError:
            logger.info("Consumer cancelled, exiting...")
            raise  # propagate cancellation instead of swallowing it
        except Exception as e:
            logger.error(f"Unexpected error: {e}", exc_info=True)
            await asyncio.sleep(10)  # Wait before reconnecting
        finally:
            if connection is not None:
                try:
                    await connection.close()
                except Exception:
                    # A failing close() must not escape and end the retry loop
                    logger.warning("Error while closing connection", exc_info=True)

This has been running continuously for a month without any permanent disconnection.
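The retry logic can be exercised without a RabbitMQ broker by factoring the connect/consume/wait steps into a single coroutine and substituting a stand-in that fails a few times. The sketch below is a hypothetical refactor, not the code above: `run_with_reconnect` and `flaky_consume` are illustrative names, and `flaky_consume` merely simulates a broker that is unreachable twice before a connection succeeds.

```python
import asyncio
import logging
from typing import Awaitable, Callable

logger = logging.getLogger(__name__)


async def run_with_reconnect(
    consume_once: Callable[[], Awaitable[None]],
    retry_delay: float = 10.0,
) -> None:
    """Re-run consume_once() after any failure until this task is cancelled."""
    while True:
        try:
            await consume_once()
            logger.warning("Consumer returned unexpectedly, restarting")
        except asyncio.CancelledError:
            logger.info("Consumer cancelled, exiting...")
            raise  # let cancellation reach whoever cancelled the task
        except Exception:
            logger.exception("Unexpected error, reconnecting in %ss", retry_delay)
        await asyncio.sleep(retry_delay)  # wait before reconnecting


# Hypothetical stand-in: fails twice (broker down), then "consumes" until cancelled
attempts = 0


async def flaky_consume() -> None:
    global attempts
    attempts += 1
    if attempts < 3:
        raise ConnectionError("broker unreachable")
    await asyncio.Future()  # stays connected until the task is cancelled


async def main() -> None:
    task = asyncio.create_task(run_with_reconnect(flaky_consume, retry_delay=0.01))
    await asyncio.sleep(0.2)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass


asyncio.run(main())
print(attempts)  # 3
```

Passing the consume step in as a callable keeps the retry policy separate from the aio_pika calls, so the policy can be tested with short delays while the real consumer keeps its 10-second pause.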