I am trying to create file on 1st of every month. The taskgroup should check if todays date is 1 then run (copy_to_s3 >> download_file >> sftp_upload_file) tasks if not run dummy task
this aod should be retrieved from get_run_param task defined above.
I have commented out errors on each modification of code.
aod = datetime.strptime(ds, "%Y-%m-%d") pulls date from taskinstance {{ds}}
@dag(
"Out_File",
start_date=pendulum.datetime(2023, 11, 1, tz='US/Eastern'),
catchup=False,
schedule = [ds_DQ1_DAG, ds_DQ2_DAG],
#schedule_interval = '30 5 * * *', # DAG Time Zone set to EST
dagrun_timeout = timedelta(hours=4),
doc_md=__doc__,
default_args={
"retries": 2,
"retry_delay": timedelta(minutes=1),
"execution_timeout": timedelta(hours=3),
"on_failure_callback": ms_teams_callback_functions.failure_callback
})
@task(multiple_outputs=True)
def get_run_param(query_name,ds=None,ti=None) -> dict:
sf_hook = SnowflakeHook(snowflake_conn_id=snowflake)
query_name=query_name
aod = datetime.strptime(ds, "%Y-%m-%d")
current_date=aod.strftime("%Y%m%d")#today's date
previous_date=aod - timedelta(days=1)
previous_date_str = previous_date.strftime("%Y%m%d")#to point yesterday's date.
year=aod.strftime("%Y")
month=aod.strftime("%m")
day=aod.strftime("%d")
target_file = f"{query_name}_{current_date}.csv"
# Push only 'asOfDate' to XCom
#ti.xcom_push(key='as_Of_Date', value=aod)
return {
"asOfDate": aod, # Asofdate
"file_name":target_file, # FileName
"download_location": download_location, # Download Location from s3 to local
"Sftp_loc": SFTP_Directory_Path ,# SFTP Location to Download to
"query_name":query_name,
"sql_file_path":sql_file_path,# Query to run/generate file
"previous_date_str":previous_date_str,
"current_date":current_date,
"year":year,
"month":month,
"day":day
}
#dummy task to show that we skipped monthly file generation when its not 1st of the month
@task
def no_monthly_file(dummy:dict):
logging.info("No Monthly file generated.")
with TaskGroup(group_id=tg_id_3) as tg2:
get_var = get_run_param('Test_FIle1')
#aod_str = get_var['current_date'] #returns aod in str format
#aod = datetime.strptime(aod_str, "%Y%m%d")
##aod = datetime.strptime(get_var['asOfDate'], "%Y-%m-%d")-- TypeError: strptime() argument 1 must be str, not PlainXComArg using xcom generated value is the issue
##aod = get_var['asOfDate']#errors out
#if aod.day != 1:
# no_monthly_file_dummy=no_monthly_file(get_var)
# no_monthly_file_dummy
#else :
#pull asoddate value from xcom not dag /use task instance
#as_Of_Date = tg2.xcom_pull(key='as_Of_Date', task_ids='get_run_param')
as_of_date_str = get_var['current_date']
as_of_date = datetime.strptime(as_of_date_str, "%Y%m%d") #E as_of_date = datetime.strptime(as_of_date_str, "%Y%m%d") TypeError: strptime() argument 1 must be str, not PlainXComArg
as_of_date_str = get_var['current_date']
if as_of_date.day != 1:
no_monthly_file_dummy=no_monthly_file(get_var)
no_monthly_file_dummy
else :
copy_to_s3 = copy_sf_to_s3(get_var)
download_file = download_s3_file(copy_to_s3)
sftp_upload_file = sftp_upload(download_file, sftp_conn_id=SFTP_Conn1)
copy_to_s3 >> download_file >> sftp_upload_file```