Can't use ODM beanie API inside celery/arq task

When the task code performs operations on Beanie documents, a beanie.exceptions.CollectionWasNotInitialized error is raised. Everything works fine when the same functions are called as ordinary functions rather than as tasks.

Beanie probably has some kind of session that needs to be passed along in situations like this. Can you tell me how to do it, please?

There are 3 answers

mpkn

I've found another conversation on Stack Overflow that I think can help with using Beanie inside a Celery task.

How to combine Celery with asyncio?, in particular the following answer:

https://stackoverflow.com/a/57286909/1995585

I did a quick test with the suggested solution and managed to run the sample code from the Beanie documentation inside a Celery task... but it's certainly just a workaround until Celery properly supports asyncio.
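
For reference, here is a minimal sketch of what I mean (the model, connection string, and broker URL are made up for illustration). The key point is that Beanie has to be initialized inside the same event loop that runs the query, so the worker process can't rely on the init done in the web process:

```python
import asyncio

from beanie import Document, init_beanie
from celery import Celery
from motor.motor_asyncio import AsyncIOMotorClient


class Product(Document):  # hypothetical document model
    name: str
    price: float


app = Celery("tasks", broker="redis://localhost:6379/0")


async def _create_product(name: str, price: float) -> str:
    # The init done in the web process does not carry over to the worker
    # process, so initialize Beanie inside the loop that runs the query.
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    await init_beanie(database=client.db_name, document_models=[Product])
    product = await Product(name=name, price=price).insert()
    return str(product.id)


@app.task
def create_product(name: str, price: float) -> str:
    # asyncio.run gives the coroutine a fresh event loop per task invocation
    return asyncio.run(_create_product(name, price))
```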

parakeetdev

Celery doesn't support asyncio, and therefore doesn't support Beanie.

Beanie has a non-async counterpart called Bunnet where everything is essentially the same; all you'd have to do is remove the async and await keywords.
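
To make the comparison concrete, here is a rough sketch of the Beanie sample rewritten with Bunnet (going from Bunnet's README, so treat the details as approximate; the model and connection string are placeholders):

```python
from bunnet import Document, init_bunnet
from pymongo import MongoClient


class Product(Document):  # same model as the Beanie version, minus async
    name: str
    price: float


client = MongoClient("mongodb://localhost:27017")
init_bunnet(database=client.db_name, document_models=[Product])

# Plain synchronous calls, safe inside a Celery task
Product(name="Tony's", price=5.95).insert()
cheap = Product.find_one(Product.price < 10).run()
```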


I've been wrestling with this same problem myself, though, because I don't want to sacrifice FastAPI's async/await, which essentially my entire codebase is built on.


How I've been managing:

Propan - An extremely simplified wrapper around aio-pika, built primarily for FastAPI with simple integration; I use it with RabbitMQ. It also uses Pydantic for serializing messages. I essentially didn't have to change a single thing to get complete pub/sub messaging.
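
Roughly what that looks like (from memory of Propan's docs, so take the exact names as approximate; the queue name and model are made up):

```python
from propan import PropanApp, RabbitBroker
from pydantic import BaseModel

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = PropanApp(broker)


class Order(BaseModel):  # Pydantic model serialized on the wire
    id: int
    total: float


@broker.handle("orders")  # subscribe to the "orders" queue
async def handle_order(order: Order):
    print(order.id, order.total)


# Elsewhere, e.g. in a FastAPI endpoint:
#     await broker.publish(Order(id=1, total=9.99), "orders")
```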

Tenacity - I've also been using Tenacity for retries. Its decorator makes retries easy, much like Celery's built-in retry support.
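
For example, something along these lines (the retry policy and the HTTP call are just illustrations):

```python
import httpx  # hypothetical flaky operation, used only for illustration
from tenacity import retry, stop_after_attempt, wait_exponential


@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=30))
async def fetch_order(order_id: str) -> dict:
    # Tenacity re-awaits the coroutine on failure, so the decorator
    # works with async functions out of the box.
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://example.com/orders/{order_id}")
        response.raise_for_status()
        return response.json()
```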

TaskIQ - They took a lot of inspiration from Celery, but built it for asyncio. It's not a 1-to-1 replacement, and I don't think it has a RabbitMQ result backend. But if you're using Redis for both messaging and results, it can give you the multiprocessing that Celery provides, without sacrificing asyncio.
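
A rough sketch of the Redis setup (based on the TaskIQ and taskiq-redis READMEs; the URLs and task are illustrative):

```python
import asyncio

from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend

broker = ListQueueBroker("redis://localhost:6379").with_result_backend(
    RedisAsyncResultBackend("redis://localhost:6379")
)


@broker.task
async def add_one(value: int) -> int:
    return value + 1


async def main():
    await broker.startup()
    task = await add_one.kiq(41)       # enqueue
    result = await task.wait_result()  # poll the result backend
    print(result.return_value)         # 42
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```

The worker side is then started with something like taskiq worker your_module:broker.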

Patio - Patio is pretty new, but it looks super promising. I think they're partnering with aio-pika. They have no docs yet, though; I'm not sure whether that's because it isn't stable/production-ready yet, or whether the README.md is literally all there is.

Live2Code

In the case of arq (I had this exact issue): if you are spawning the arq workers in a different process than the one enqueuing the jobs (e.g. the web server process), then you may not have initialized Beanie in the arq worker process(es) as well.

In the arq worker's on_startup hook, make sure to call init_beanie there too. (And don't forget to pass in all the document models; I have made that mistake before: updating the list of models in my web server's init_beanie() but not in the init_beanie() call in the arq worker's startup function.)
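
Concretely, something like this in the worker module (the document model and connection string are placeholders; the important part is that on_startup calls init_beanie with the same document_models list the web server uses):

```python
from arq.connections import RedisSettings
from beanie import Document, init_beanie
from motor.motor_asyncio import AsyncIOMotorClient


class Product(Document):  # placeholder; list every model your tasks touch
    name: str
    price: float


async def startup(ctx):
    # Runs once per worker process, so each worker process gets its own
    # initialized Beanie instance.
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    await init_beanie(database=client.db_name, document_models=[Product])


async def count_products(ctx) -> int:
    # Safe now: Beanie was initialized in this process by on_startup
    return await Product.count()


class WorkerSettings:
    functions = [count_products]
    on_startup = startup
    redis_settings = RedisSettings()
```

Then run the worker with arq your_module.WorkerSettings.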