I am using a PythonOperator to push values via XCom, and I need to use these values in a BigQueryInsertJobOperator as parameters and pass them to a SQL file.
def func1(**context):
    context['ti'].xcom_push(key='param1', value=param1)

func1 = PythonOperator(
    task_id="func1",
    python_callable=func1,
    dag=dag,
    provide_context=True)
bq_op_load = BigQueryInsertJobOperator(
    task_id="bq_op_load",
    configuration={
        "query": {
            "query": "{% include 'sql_file_path.sql' %}",
            "useLegacySql": False,
        }
    },
    params={
        "param1": '{{ task_instance.xcom_pull(task_ids="func1", key="param1") }}'
    },
)
The SQL file receives this parameter as shown below:
CALL `project.db.storedproc`('{{params.param1}}');
But instead of the value, I am getting the literal string '{{ task_instance.xcom_pull(task_ids="func1", key="param1") }}'.
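That's because you're nesting a Jinja template inside another Jinja template, but the operator renders its templates only once. When the SQL is rendered, {{params.param1}} is replaced by the raw string stored in params, and that string is not rendered a second time.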
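To make the single-pass behaviour concrete, here is a minimal sketch using a toy renderer built on the standard library. It is not Jinja and not Airflow code. The names render_once, xcom_value and proc are hypothetical and exist only for illustration. The point is that a placeholder arriving inside a substituted value is left as-is, while the same placeholder written directly in the template is resolved.

```python
import re

# Toy stand-in for a template engine (NOT Jinja, NOT Airflow).
# It only understands {{ name }} placeholders with simple identifiers.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_once(template: str, context: dict) -> str:
    """Replace each {{ name }} with context[name] in a single pass."""
    # re.sub does not rescan the replacement text, so a placeholder that
    # arrives inside a substituted value is left untouched.
    return _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), template)


context = {
    "xcom_value": "2024-01-01",    # hypothetical value pushed by func1
    "param1": "{{ xcom_value }}",  # a params-style entry holding template text
}

# Nested case: the template refers to param1, whose value is itself a template.
nested = render_once("CALL proc('{{ param1 }}');", context)

# Direct case: the expression sits in the template itself.
direct = render_once("CALL proc('{{ xcom_value }}');", context)

print(nested)
print(direct)
```

In the nested case the output still contains the inner placeholder text, which mirrors the symptom in the question. In the direct case the value is substituted.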
To fix it, you can provide the first template directly in your file without using params: