Celery chain with dynamically-defined chord

I am trying to generate a chord from a list of signatures. The length of this list is not known when the chain starts to execute. E.g., the output of task_c is a list of length n, and that list should generate a list of Celery signatures of the same length, to be executed in parallel. I can only access the result of the previous task through a partial signature (.s()), so how can I dynamically define this group of signatures inside my chain?

E.g., how do I go from this:

import random

@task
def task_d(kwargs):
    # Return one task_e signature per randomly sampled value.
    return [task_e.si(i) for i in random.sample(range(10, 30), 5)]

ctask = chain(
    task_a.si(**kwargs),
    task_b.si(**kwargs),
    task_c.si(**kwargs),
    chord(
        [
            task_e.si(**kwargs),
            task_e.si(**kwargs),
            task_e.si(**kwargs),
        ],
        my_callback.si(**kwargs),
    ),
)

...to this:

ctask = chain(
    task_a.si(**kwargs),
    task_b.si(**kwargs),
    task_c.si(**kwargs),
    chord(
        [task_d.s(**kwargs)],  # <-- ? (should expand to n signatures at runtime)
        my_callback.si(**kwargs),
    ),
)

There is 1 answer

Answer by 2ps (best answer)

I think you have to build the chord inside a task, once the list actually exists, and launch it from there:

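@task
def task_d(list_of_length_n, **kwargs):
    # Build one task_e signature per item, now that the list exists.
    tasks = [task_e.si(i) for i in list_of_length_n]
    # Launch the chord from inside the task; my_callback runs after every task_e finishes.
    chord(tasks, my_callback.si(**kwargs))()

ctask = chain(
    task_a.si(**kwargs),
    task_b.si(**kwargs),
    task_c.si(**kwargs),
    # .s() is partial, so task_c's return value arrives as the first argument.
    task_d.s(**kwargs),
)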