How to create dynamic task mapping for postgres operator in airflow?

693 views Asked by At

I am completely new to AirFlow and I am trying to create 8 tasks which are pretty simillar.

I've read about expand() methond though I am not quite sure how to use it for PostgresOperator?

So I have this task:

t1 = PostgresOperator(
task_id='load_something_1',
postgres_conn_id="postgres_default",
sql = "SELECT somefunction_1()", 
dag=dag)

I need to create similar tasks only they gotta have load_something_2, load_something_3 etc. and SELECT somefucntion_2, SELECT somefucntion_3 etc.

How do I do this using dynamic task mapping ?

Thank you beforehand!

1

There are 1 answers

0
gbeaven On BEST ANSWER

It's hard to say whether you need expand() or not without knowing what your iterator looks like, and how the data is made available to the DAG, but here's how this could be accomplished with a simple iterator in a full-example DAG:

from datetime import datetime
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.decorators import dag, task

@dag(
    default_args={
        'owner': 'me'
    },
    dag_id=f'example-dag',
    start_date=datetime(2023,1,6),
    schedule_interval=None,
)
def workflow():

    @task
    def load_something(i):
        t1 = PostgresOperator(
            task_id=f'load_something',
            postgres_conn_id="postgres_default",
            sql = f"SELECT somefunction_{i}()",
        )
    my_tasks = [load_something(i) for i in range(1,9)]
    # my_tasks = [load_something.override(task_id=f'load_something_{i+1}')(i) for i in range(1,9)]
    my_tasks
workflow()

Note: just calling your task like my_tasks = [load_something(i) for i in range(1,9)] with the @task decorator will automatically enumerate your task names for you:enter image description here if you want to explicitly name the tasks, you can do so using the override() method. Uncomment out my_tasks = [load_something.override(task_id=f'load_something_{i}')(i) for i in range(1,9)]: enter image description here