Python Dependencies for DataprocCreateBatchOperator


I cannot submit a Python job to Dataproc Serverless when third-party Python dependencies are needed; it works fine when no dependencies are required. I push the PySpark Python file to a Cloud Storage bucket, and the DataprocCreateBatchOperator reads in that file. I was hoping I could just pass in a list of pip packages, but that does not seem to be baked into the operator.

From the docs, Dataproc Serverless offers a metadata option, which I presumed is how we tell Dataproc to install additional Python dependencies, but it's not working, as seen below.

DAG

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import DataprocCreateBatchOperator
from datetime import datetime, timedelta

PROJECT_ID = "foobar"
REGION = "us-central1"
IMPERSONATION_CHAIN = "[email protected]"
BUCKET = "gs://my-bucket"
JOB="countingwords.py"

BATCH_CONFIG = {
    "pyspark_batch": {
        "main_python_file_uri": f"{BUCKET}/python/latest/{JOB}",
        "args": ["gs://pub/shakespeare/rose.txt", f"{BUCKET}/sample-output-data"]
    },
    "environment_config": {
        "execution_config": {
            "network_uri": f"projects/{PROJECT_ID}/global/networks/main-vpc-prd",
            "subnetwork_uri": f"https://www.googleapis.com/compute/v1/projects/{PROJECT_ID}/regions/{REGION}/subnetworks/data-prd",
            "service_account": IMPERSONATION_CHAIN,
        }
    }
}

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=1)
}

with DAG(
    dag_id="serverless_countwords_py",
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    default_args=default_args,
    catchup=False,
) as dag:
    submit_serverless = DataprocCreateBatchOperator(
        task_id="submit_batch",
        project_id=PROJECT_ID,
        region=REGION,
        batch=BATCH_CONFIG,
        gcp_conn_id="google_cloud_default",
        metadata={'PIP_PACKAGES':'requests'}, ## <--- ISSUE
        impersonation_chain=IMPERSONATION_CHAIN,
    )

    submit_serverless

Error

Failed to execute job 103 for task submit_batch (too many values to unpack (expected 2)
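My current guess about the error (not verified): the operator's metadata argument looks like gRPC call metadata, i.e. a sequence of (key, value) tuples forwarded along with the Dataproc API request, rather than batch configuration. Passing a dict would make the client iterate over its string keys and try to unpack each key as a pair, which would produce exactly "too many values to unpack (expected 2)". A minimal sketch of what the parameter seems to expect (the key/value below are placeholders, and this still wouldn't install pip packages):

# Sketch only: metadata as a sequence of (key, value) tuples attached to the
# API call, not a way to request pip packages.
submit_serverless = DataprocCreateBatchOperator(
    task_id="submit_batch",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    gcp_conn_id="google_cloud_default",
    metadata=[("placeholder-key", "placeholder-value")],  # placeholder pair, not PIP_PACKAGES
    impersonation_chain=IMPERSONATION_CHAIN,
)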

Other Thoughts

Maybe I can somehow zip my dependencies up, dump the zip into a bucket, and have Dataproc read that zip? BATCH_CONFIG.pyspark_batch has an additional field named python_file_uris where maybe I can pass in the location of a zipped file containing all the dependencies. I'm unsure of the list of commands to achieve this zipped approach; a rough sketch of what I mean is below.
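Something along these lines is what I have in mind (untested; deps.zip, the requests package, and the bucket paths are just examples, and I realize a zip like this would only cover pure-Python packages). The packaging commands would run locally before the DAG, so they are shown as comments:

# Untested sketch of the zip approach. Locally, before the DAG runs, roughly:
#   pip install requests -t deps/
#   (cd deps && zip -r ../deps.zip .)
#   gsutil cp deps.zip gs://my-bucket/python/latest/deps.zip
# Then reference the uploaded archive from the batch config via python_file_uris:
BATCH_CONFIG = {
    "pyspark_batch": {
        "main_python_file_uri": f"{BUCKET}/python/latest/{JOB}",
        # python_file_uris accepts .py, .egg, and .zip files per the Dataproc docs
        "python_file_uris": [f"{BUCKET}/python/latest/deps.zip"],  # example path to the uploaded zip
        "args": ["gs://pub/shakespeare/rose.txt", f"{BUCKET}/sample-output-data"],
    },
    "environment_config": {
        "execution_config": {
            "network_uri": f"projects/{PROJECT_ID}/global/networks/main-vpc-prd",
            "subnetwork_uri": f"https://www.googleapis.com/compute/v1/projects/{PROJECT_ID}/regions/{REGION}/subnetworks/data-prd",
            "service_account": IMPERSONATION_CHAIN,
        }
    },
}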
