Dagster error when using Config for a user-defined parameter and passing asset results to downstream assets


I am setting up a pipeline that uses Dagster's Config so that the user can supply a parameter when running the pipeline. My use case is running a SQL query to pull data, where the user defines the start and end dates for the pull. I run into a config error when passing the results from that asset to another asset.

Here is the Dagster documentation on run configuration: https://docs.dagster.io/concepts/configuration/config-schema

I have reproduced the error with a simple script below.

When I run this script in the Dagster UI, it tells me the config is missing and offers to generate the scaffold. After generating it, I can enter the parameter; in this case I enter "Bananas" for fruit_select. The parameter is used in the filter_data asset. I would then like to use that asset's result (df2) in the next asset, filter_again.

This is the error I keep running into:

dagster._core.errors.DagsterInvalidConfigError: Error in config for op
    Error 1: Missing required config entry "config" at the root. Sample config for missing entry: {'config': {'fruit_select': '...'}}

Any help would be greatly appreciated! My goal is for the user to define a parameter when launching the pipeline, have one asset use that parameter to pull or manipulate data, and then pass those results on to another asset for a further task.

import pandas as pd
import random
from datetime import datetime, timedelta
from dagster import asset, MaterializeResult, MetadataValue, Config, materialize

@asset 
def generate_dataset():
    # Function to generate random dates
    def random_dates(start_date, end_date, n=10):
        date_range = end_date - start_date
        random_dates = [start_date + timedelta(days=random.randint(0, date_range.days)) for _ in range(n)]
        return random_dates
    
    # Set seed for reproducibility
    random.seed(42)
    
    # Define the number of rows
    num_rows = 100
    
    # Generate random data
    fruits = ['Apple', 'Banana', 'Orange', 'Grapes', 'Kiwi']
    fruit_column = [random.choice(fruits) for _ in range(num_rows)]
    units_column = [random.randint(1, 10) for _ in range(num_rows)]
    start_date = datetime(2022, 1, 1)
    end_date = datetime(2022, 12, 31)
    date_column = random_dates(start_date, end_date, num_rows)
    
    # Create a DataFrame
    df = pd.DataFrame({
        'fruit': fruit_column,
        'units': units_column,
        'date': date_column
    })
    
    # Display the DataFrame
    print(df)
    return df

class fruit_config(Config):
    fruit_select: str 

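# deps= only orders the assets; it does not pass generate_dataset's value in
@asset(deps=[generate_dataset])
def filter_data(config: fruit_config):
    # Direct call: re-runs the generate_dataset function instead of loading its materialized result
    df = generate_dataset()
    df2 = df[df['fruit'] == config.fruit_select]
    print(df2)
    return df2

@asset(deps=[filter_data])
def filter_again():
    # Direct call made without any config: this is what raises the DagsterInvalidConfigError
    df2 = filter_data()
    df3 = df2[df2['units'] > 5]
    print(df3)
    return df3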

Answer by jdsimkin04:

I realized the code was simply wired up incorrectly. This Dagster docs page was somewhat helpful: https://docs.dagster.io/concepts/assets/software-defined-assets, but the demo video https://www.youtube.com/watch?v=lRwpcyd6w8k was even more helpful.

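Essentially, you have to pass the upstream assets into the downstream assets as function parameters. Name a parameter after the upstream asset, as in filter_data(generate_dataset: pd.DataFrame, config: fruit_config), and Dagster loads that asset's returned value and passes it in. deps=[...] only declares that one asset runs after another; it does not pass any data. Calling an asset function directly inside another asset (as filter_again did with filter_data()) runs it outside the pipeline without the run's config, which is why the missing-config error appeared. The other piece is annotating each asset's expected return type (-> pd.DataFrame). Corrected code below: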

import pandas as pd
import random
from datetime import datetime, timedelta
from dagster import asset, Config

@asset 
def generate_dataset() -> pd.DataFrame:
    # Function to generate random dates
    def random_dates(start_date, end_date, n=10):
        date_range = end_date - start_date
        random_dates = [start_date + timedelta(days=random.randint(0, date_range.days)) for _ in range(n)]
        return random_dates
    
    # Set seed for reproducibility
    random.seed(42)
    
    # Define the number of rows
    num_rows = 100
    
    # Generate random data
    fruits = ['Apple', 'Banana', 'Orange', 'Grapes', 'Kiwi']
    fruit_column = [random.choice(fruits) for _ in range(num_rows)]
    units_column = [random.randint(1, 10) for _ in range(num_rows)]
    start_date = datetime(2022, 1, 1)
    end_date = datetime(2022, 12, 31)
    date_column = random_dates(start_date, end_date, num_rows)
    
    # Create a DataFrame
    df = pd.DataFrame({
        'fruit': fruit_column,
        'units': units_column,
        'date': date_column
    })
    
    # Display the DataFrame
    print(df)
    return df

class fruit_config(Config):
    fruit_select: str 

@asset
def filter_data(generate_dataset: pd.DataFrame, config: fruit_config) -> pd.DataFrame:
    # The parameter name matches the upstream asset, so Dagster passes in its returned DataFrame
    filtered = generate_dataset[generate_dataset['fruit'] == config.fruit_select]
    print(filtered)
    return filtered

@asset
def filter_again(filter_data: pd.DataFrame) -> pd.DataFrame:
    # Receives the DataFrame returned by the filter_data asset
    filtered_again = filter_data[filter_data['units'] > 5]
    print(filtered_again)
    return filtered_again