PySpark very slow in a loop that updates the same DataFrame again and again


I want to implement logic in Databricks PySpark where the next day's value is updated based on the updated values of the last 14 days. I am using a loop to do it. Below is the code, but it is very slow and after a certain point it won't move forward.

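import time
from datetime import timedelta

from pyspark.sql import functions as F
# abs here is Spark's column function; it shadows the Python builtin
from pyspark.sql.functions import abs, col, when

start_time = time.time()

# Distinct dates in ascending order, collected to the driver
DateList=score_data.select("AsOfDate").distinct().orderBy('AsOfDate').rdd.flatMap(lambda x: x).collect()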

score_data=score_data.withColumn('score_original',col('score')).withColumn('Amount_original',col('Amount'))

for date in DateList:
  rolling_window=date+timedelta(-14)
  print(date)
  df_current=score_data.filter(col("AsOfDate").isin(date))

  if 'rolling_median' in df_current.columns:
    df_current=df_current.drop('rolling_median')

  df_rolling = score_data.filter((col("AsOfDate")<=date) & (col("AsOfDate")>rolling_window))

  rolling_median=df_rolling.filter(col('score').isNotNull()).groupBy('id').agg(F.expr('(percentile_approx(score,0.5)+percentile_approx(score,0.50001))/2').alias('rolling_median'))

  df_current=df_current.join(rolling_median,'id','left')
  df_current=df_current.withColumn('rolling_gap',abs(col('score')-col('rolling_median')))
  df_current=df_current.withColumn('score',when(col('rolling_gap')>3,None).otherwise(col('score'))).withColumn('Amount',when(col('rolling_gap')>3,None).otherwise(col('Amount')))

  score_data=score_data.filter(~(col("AsOfDate").isin(date)))
  score_data=score_data.unionByName(df_current,allowMissingColumns=True)
print(f"Execution time: {time.time() - start_time}")  

Is there a way to improve this code? In the attached data, the green part shows the updated score and the orange part shows the original score. So for the next day, 31-Aug-2023, I want to use the median based on the updated score (null).



There is 1 answer

greenie

Running DataFrame operations such as dropping a column, joins, and unions inside a for loop is the reason the code is not performant: score_data is reassigned on every iteration, so its query plan keeps growing with each date. The loop is also not very easy to understand. Instead, see whether you can do the following:

  1. Collect the date and score values for just the past 14 days into a dictionary.
  2. Pass this dictionary to a UDF to create your new score values.
  3. Put the for loop over the dates inside the UDF.
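
The steps above could be sketched roughly as follows. This is only a minimal illustration of the per-id logic that would live inside the UDF, written in plain Python so it can be tried without a cluster. The function name clean_scores, its parameters, and the sample rows are hypothetical. It also assumes one row per id per date, and it uses an exact median from the statistics module rather than the percentile_approx expression in the question, so results may differ slightly from the original code.

```python
from datetime import date, timedelta
from statistics import median


def clean_scores(rows, window_days=14, max_gap=3):
    """Sequentially null out outlier scores for ONE id.

    rows: list of (as_of_date, score, amount) tuples (hypothetical layout).
    Each day's median is taken over the already-cleaned scores of the
    previous days in the window plus the current day's own score.
    """
    cleaned = []
    for as_of_date, score, amount in sorted(rows, key=lambda r: r[0]):
        window_start = as_of_date - timedelta(days=window_days)
        # Cleaned, non-null scores from earlier days still inside the window
        window_scores = [
            s for d, s, _ in cleaned if d > window_start and s is not None
        ]
        if score is not None:
            # The current day is part of its own window (AsOfDate <= date)
            window_scores.append(score)
            if abs(score - median(window_scores)) > max_gap:
                score, amount = None, None
        cleaned.append((as_of_date, score, amount))
    return cleaned


# Made-up sample data for a single id
rows = [
    (date(2023, 8, 28), 10.0, 100.0),
    (date(2023, 8, 29), 11.0, 110.0),
    (date(2023, 8, 30), 20.0, 200.0),  # far from the rolling median
    (date(2023, 8, 31), 12.0, 120.0),
]
result = clean_scores(rows)
```

In this sample the 30-Aug score is nulled, and the 31-Aug median is then computed from the updated values only. One possible way to run such a function once per id in Spark is a grouped pandas function, e.g. score_data.groupBy("id").applyInPandas(...) with a small wrapper that converts each group to rows and back; that wrapper and its output schema are not shown here and would need to match the real data.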