I am setting up a pipeline that uses Config from dagster so that the user can supply a user-defined parameter when running the pipeline. My use case is running a SQL query to pull in data, where the user defines the start and end dates for the data pull. I am running into a config error when passing the result of that asset to another asset.
Here is the dagster documentation on running configurations: https://docs.dagster.io/concepts/configuration/config-schema
I have reproduced the error with a simple script below.
When I run this script in the Dagster UI, it tells me that the config scaffold is missing. It then generates the scaffold and I can enter the parameter; in this case I add "Bananas" to the fruit_select parameter. The parameter is used in the filter_data asset. I would then like to use the result of that asset (which is df2) in the following asset, called filter_again.
This is the error I keep running into:
dagster._core.errors.DagsterInvalidConfigError: Error in config for op
    Error 1: Missing required config entry "config" at the root. Sample config for missing entry: {'config': {'fruit_select': '...'}}
Any help would be greatly appreciated! My goal is to have a user define a parameter when running the pipeline, the parameter is used in one of the assets to pull data or manipulate data, and then those results get passed on to another asset for another task.
import pandas as pd
import random
from datetime import datetime, timedelta
from dagster import asset, MaterializeResult, MetadataValue, Config, materialize


@asset
def generate_dataset():
    # Function to generate random dates
    def random_dates(start_date, end_date, n=10):
        date_range = end_date - start_date
        random_dates = [start_date + timedelta(days=random.randint(0, date_range.days)) for _ in range(n)]
        return random_dates

    # Set seed for reproducibility
    random.seed(42)

    # Define the number of rows
    num_rows = 100

    # Generate random data
    fruits = ['Apple', 'Banana', 'Orange', 'Grapes', 'Kiwi']
    fruit_column = [random.choice(fruits) for _ in range(num_rows)]
    units_column = [random.randint(1, 10) for _ in range(num_rows)]
    start_date = datetime(2022, 1, 1)
    end_date = datetime(2022, 12, 31)
    date_column = random_dates(start_date, end_date, num_rows)

    # Create a DataFrame
    df = pd.DataFrame({
        'fruit': fruit_column,
        'units': units_column,
        'date': date_column
    })

    # Display the DataFrame
    print(df)
    return df


class fruit_config(Config):
    fruit_select: str


@asset(deps=[generate_dataset])
def filter_data(config: fruit_config):
    df = generate_dataset()
    df2 = df[df['fruit'] == config.fruit_select]
    print(df2)
    return df2


@asset(deps=[filter_data])
def filter_again():
    df2 = filter_data()
    df3 = df2[df2['units'] > 5]
    print(df3)
    return df3
I realized that the code was simply implemented incorrectly. This dagster doc page was somewhat helpful https://docs.dagster.io/concepts/assets/software-defined-assets but the demo video https://www.youtube.com/watch?v=lRwpcyd6w8k was even more helpful.
Essentially, you have to pass the upstream assets into the downstream assets as function arguments, rather than calling the upstream asset functions inside the downstream ones. It is also important to declare the output each asset is expected to return. Corrected code below: