How to register a Celery class-based task with a custom name in 5.x?


Architecture: A Flask app runs in its own container, and Celery runs in its own container. The message broker is Redis. Everything is written in Python.

Problem: I can't figure out how to register a class-based task with a custom name.

Versions:

celery==5.2.7
redis==5.0.3

Details: I want to build class-based tasks so that I can use the on_success handler on every task. Note that all class-based tasks are defined in the Celery container, so the Flask app has no way to import them; it must be able to call them by name. I first tried relying on the default name, but I could not get that to work with class-based tasks because a name was required. I then tried to configure a custom name. Every time I do this, I get the following error:

[2024-03-12 22:53:01,292: CRITICAL/MainProcess] Unrecoverable error: AttributeError("'NoneType' object has no attribute 'push'")
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/celery/worker/worker.py", line 203, in start
    self.blueprint.start(self)
  File "/usr/local/lib/python3.9/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/usr/local/lib/python3.9/site-packages/celery/bootsteps.py", line 365, in start
    return self.obj.start()
  File "/usr/local/lib/python3.9/site-packages/celery/worker/consumer/consumer.py", line 332, in start
    blueprint.start(self)
  File "/usr/local/lib/python3.9/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/usr/local/lib/python3.9/site-packages/celery/worker/consumer/tasks.py", line 26, in start
    c.update_strategies()
  File "/usr/local/lib/python3.9/site-packages/celery/worker/consumer/consumer.py", line 562, in update_strategies
    task.__trace__ = build_tracer(name, task, loader, self.hostname,
  File "/usr/local/lib/python3.9/site-packages/celery/app/trace.py", line 361, in build_tracer
    push_request = request_stack.push
AttributeError: 'NoneType' object has no attribute 'push'

Really, I just want Celery tasks to perform an operation after they complete. on_success seems like the right tool, but I can't figure out how to use it outside of a class-based task, and I can't get class-based tasks working. Can anyone help?


There is 1 answer

Dave On

I gave up trying to use class-based tasks; I never found out how to get them to work. I ended up using the task_success signal to perform operations after a task completes.

import time

from celery.signals import task_success

# `celery` (the Celery app), `logger`, `db`, `send_async_response` and
# `customer_work` are assumed to be defined elsewhere in the project.


@task_success.connect
def task_success_handler(sender=None, result=None, **kwargs):
    # The signal fires for every task, so filter on the registered name.
    if sender.name == 'customer':
        # Extract fields from the result dictionary returned by the task
        notification_url = result['notification_url']
        job_id = result['job_id']
        job_status = result['status']
        task_id = sender.request.id
        task_state = celery.AsyncResult(task_id).state

        logger.info(f"Task {task_id}: {task_state}, job_id: {job_id}: {job_status}")

        # send async response
        send_async_response(db, notification_url, task_id, job_id, "200", job_status, task_state, "Task succeeded for customer.")


@celery.task(name='customer')
def backend(jobs, job_id, backend_config, method, auth_user, notification_url, sleep_time=0):
    logger.info(f"sleeping for {sleep_time} sec")
    time.sleep(int(sleep_time))
    # The returned dictionary is what the success handler receives as `result`.
    outcome = customer_work(jobs, job_id, backend_config, method, auth_user, notification_url)
    return outcome