DatabricksRunNowOperator fails to get spark_submit_params from XCom

I have an Airflow DAG that is meant to be triggered manually. Its logic is straightforward: the DAG receives start_date and end_date as params and then performs the following steps:

  1. Lists prefixes in the S3 bucket for each target date.
  2. Builds spark-submit arguments from the date and the partitions extracted from the S3 prefixes.
  3. Triggers the Databricks job with the prepared arguments.

The complete code is below:

from datetime import date, datetime

from pandas import date_range

from airflow.decorators import dag, task, task_group
from airflow.models.param import Param
from airflow.providers.amazon.aws.operators.s3 import S3ListPrefixesOperator
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator

# build_args and extract_partitions are project-specific helpers (definitions omitted)


@dag(
    dag_id="test_backfill_dag_v1",
    start_date=datetime(2024, 2, 27),
    catchup=False,
    render_template_as_native_obj=True,
    schedule=None,
    params={
        "start_date": Param(default=f"{date.today()}", format="date", type="string"),
        "end_date": Param(default=f"{date.today()}", format="date", type="string"),
    },
)
def test_backfill_dag():

    @task
    def build_spark_submit_args(target_date: str, partitions: list[str]) -> list[str]:
        return build_args(start_date=target_date, end_date=target_date, partitions=partitions)

    @task
    def list_target_dates(**context):
        params = context["params"]
        start_end_range = date_range(params["start_date"], params["end_date"], freq="D")
        return start_end_range.strftime("%Y-%m-%d").tolist()

    @task
    def build_root_s3_prefix(target_date: str) -> str:
        return f"foo/date={target_date}/"

    @task
    def extract_target_partitions(prefixes: list[str]) -> list[str]:
        return extract_partitions(prefixes)

    @task_group
    def backfill_group(target_date: str) -> None:
        root_prefix = build_root_s3_prefix(target_date)

        s3_list_prefixes_operator = S3ListPrefixesOperator(
            task_id="s3_list_prefixes_operator",
            prefix=root_prefix,
            bucket="test-bucket",
            delimiter="/",
        )

        target_partitions = extract_target_partitions(s3_list_prefixes_operator.output)
        spark_submit_args = build_spark_submit_args(target_date, target_partitions)

        databricks_run_now = DatabricksRunNowOperator(
            retries=2,
            job_id="my-test-job",
            task_id="databricks_run_now",
            spark_submit_params="{{ ti.xcom_pull(task_ids='backfill_group.build_spark_submit_args') }}",
            max_active_tis_per_dagrun=1,
        )

        root_prefix >> s3_list_prefixes_operator >> target_partitions >> spark_submit_args >> databricks_run_now

    dates = list_target_dates()
    backfill_group.expand(target_date=dates)


test_backfill_dag()

Everything works as expected except for the DatabricksRunNowOperator: its task instances fail with TypeError: Object of type LazyXComAccess is not JSON serializable:

[2024-02-27, 11:23:23 UTC] {taskinstance.py:1935} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/databricks/operators/databricks.py", line 827, in execute
    self.run_id = hook.run_now(self.json)
                  ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/databricks/hooks/databricks.py", line 223, in run_now
    response = self._do_api_call(RUN_NOW_ENDPOINT, json)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 596, in _do_api_call
    for attempt in self._get_retry_object():
  File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 347, in __iter__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 314, in iter
    return fut.result()
           ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 598, in _do_api_call
    response = request_func(
               ^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/requests/api.py", line 115, in post
    return request("post", url, data=data, json=json, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/requests/api.py", line 59, in request
    return session.request(method=method, url=url, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/requests/sessions.py", line 575, in request
    prep = self.prepare_request(req)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/requests/sessions.py", line 486, in prepare_request
    p.prepare(
  File "/home/airflow/.local/lib/python3.11/site-packages/requests/models.py", line 371, in prepare
    self.prepare_body(data, files, json)
  File "/home/airflow/.local/lib/python3.11/site-packages/requests/models.py", line 511, in prepare_body
    body = complexjson.dumps(json, allow_nan=False)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
          ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/json/encoder.py", line 200, in encode
    chunks = self.iterencode(o, _one_shot=True)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/json/encoder.py", line 258, in iterencode
    return _iterencode(o, 0)
           ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/json/encoder.py", line 180, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type LazyXComAccess is not JSON serializable
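
Since the DAG sets render_template_as_native_obj=True, I suspect the Jinja expression in spark_submit_params resolves to Airflow's LazyXComAccess proxy instead of a plain list, and json.dumps inside the Databricks hook then fails on it. One idea I haven't verified yet is to force the rendered value into a plain Python list with Jinja's built-in list filter, so the operator receives something JSON-serializable:

        databricks_run_now = DatabricksRunNowOperator(
            retries=2,
            job_id="my-test-job",
            task_id="databricks_run_now",
            # unverified idea: Jinja's built-in `list` filter should coerce the lazy
            # XCom proxy into a plain Python list before it goes into the request body
            spark_submit_params=(
                "{{ ti.xcom_pull(task_ids='backfill_group.build_spark_submit_args') | list }}"
            ),
            max_active_tis_per_dagrun=1,
        )

I'm not sure this is the right fix, though, since I haven't checked whether the pull returns the arguments for the current mapped group instance or a lazy sequence over all mapped instances.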

I've also tried replacing the DatabricksRunNowOperator with a simple PythonOperator that just prints the rendered value:

def print_callable(**context):
    print(context["templates_dict"]["spark_submit_args"])


databricks_run_now = PythonOperator(
    task_id="databricks_run_now",
    python_callable=print_callable,
    provide_context=True,  # no longer needed in Airflow 2; context is always passed
    templates_dict={
        "spark_submit_args": "{{ ti.xcom_pull(task_ids='backfill_group.build_spark_submit_args') }}"
    },
)

With this replacement, the DAG completes successfully, so I assume the problem is specific to the DatabricksRunNowOperator.
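
To narrow this down, I plan to extend the callable to also log the type of the rendered value; if it reports LazyXComAccess rather than list, that would confirm the proxy is reaching the operator un-materialized (a small diagnostic sketch I haven't run yet):

def print_callable(**context):
    rendered = context["templates_dict"]["spark_submit_args"]
    # if this prints LazyXComAccess instead of list, the templated value is the
    # lazy proxy that the Databricks hook later fails to JSON-serialize
    print(type(rendered), rendered)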

I would be glad if anyone could advise me on how I can fix my implementation.

There are 0 answers