I am running a Spark job and for the most part it goes fast, but it gets stuck on the last task of one of the stages. I can see a lot more shuffle read and many more rows being processed for that task, and I have tried a number of repartitioning strategies to ensure an even distribution, but I still can't get past it. Could you please help? I am attaching screenshots as well.
The join I am doing is a reverse lookup of some private data that lives in a Delta Lake table (all of this runs on Databricks).
Table 1, with all the desired event logs/rows, is sizeInBytes=218.2 TiB, but it is filtered on a partition key (date) to just the last 4 days. I assume it is still huge, as there are a lot of events.
Table 2, the lookup table for the personal fields that are hashed in the table above, is sizeInBytes=1793.9 GiB. It has just 4 columns: key, hash, timestamp and type. It is a simple lookup table.
Essentially, there are 4 hashed fields that I need to reverse look up, and that requires 4 separate joins against this lookup table. This is quite expensive, but at this point there is no way around it. The join happens on that hashed key, which I tried to use in the repartitioning scheme for the DataFrames. My thinking was that this would bring the same hashed keys into the same partition so they could be picked up by the same executor. That was the hypothesis, but I still see one task running for a very long time, doing an exorbitant amount of shuffle reads and going through far more rows than the others.
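For reference, this is roughly what I mean by repartitioning on the join key (a simplified sketch; eventsDf, numPartitions and the column names are placeholders, not my actual code):

import org.apache.spark.sql.functions.col

// Hash-partition both sides on the join key so that equal keys land in the same partition.
val eventsByKey = eventsDf.repartition(numPartitions, col("adId"))
val lookupByKey = reverseLookupTableDf.repartition(numPartitions, col("hashed"))

val joined = eventsByKey.join(
  lookupByKey,
  eventsByKey("adId") === lookupByKey("hashed"),
  "leftouter"
)

As far as I understand, hash partitioning puts every row with the same key into a single partition, so if one key is very hot that partition (and the task that reads it) stays huge, which seems to match what I am seeing.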
What could I be doing wrong? Is repartitioning not a good approach here? I read somewhere that I could try iterative broadcasting, which involves breaking the smaller table (the lookup table in my case) into chunks (under 8 GB each, I think), broadcasting each chunk in turn, and merging all the results at the end.
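If it matters, this is how I understood the iterative broadcasting idea (just a sketch, not something I have running: numChunks, eventsDf, adId_plain and the chunk column names are made up, and it assumes each hashed key appears at most once in the lookup table):

import org.apache.spark.sql.functions.{broadcast, coalesce, col, lit, pmod, xxhash64}

// Split the lookup table into disjoint buckets by hashing the join key,
// broadcast one bucket at a time, and keep the first non-null match via coalesce.
val numChunks = 256 // placeholder; picked so each chunk stays under the ~8 GB broadcast limit
var resolved = eventsDf.withColumn("adId_plain", lit(null).cast("string"))

for (i <- 0 until numChunks) {
  val chunk = reverseLookupTableDf
    .filter(pmod(xxhash64(col("hashed")), lit(numChunks)) === i)
    .select(col("hashed").as("chunk_hashed"), col("key").as("chunk_key"))

  resolved = resolved
    .join(broadcast(chunk), col("adId") === col("chunk_hashed"), "leftouter")
    .withColumn("adId_plain", coalesce(col("adId_plain"), col("chunk_key")))
    .drop("chunk_hashed", "chunk_key")
}

Since each hashed key falls into exactly one bucket, coalescing across iterations should pick up the single match, while rows with no match keep the null, which is what the left outer join gives me today.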
Any help would be appreciated, as I keep getting stuck at the same place with these strategies.
Thank you!
I do a union over a few event types to build the first DataFrame, then join it with the lookup table:
allIncrementalEvents.as("e")
.filter(col("e.type") === "authentication")
.filter(lower(col("e.payload.type")).isin(eventConf.eventTypes:_*))
.filter(lower(col("e.payload.os.name")).isin(eventConf.osNames:_*))
.filter(lower(col("e.payload.device.manufacturer")).isin(eventConf.manufacturers:_*))
.repartition(partitions)
UNION
allIncrementalEvents.as("e")
.filter(col("e.type") === "session")
.filter(lower(col("e.payload.type")).isin(eventConf.eventTypes:_*))
.filter(lower(col("e.payload.os.name")).isin(eventConf.osNames:_*))
.filter(lower(col("e.payload.device.manufacturer")).isin(eventConf.manufacturers:_*))
.repartition(partitions)
UNION
allIncrementalEvents.as("e")
.filter(col("e.type") === "other")
.filter(lower(col("e.payload.type")).isin(eventConf.eventTypes:_*))
.filter(lower(col("e.payload.os.name")).isin(eventConf.osNames:_*))
.filter(lower(col("e.payload.device.manufacturer")).isin(eventConf.manufacturers:_*))
.repartition(partitions)
Then the join with the lookup table:
extractAuthEvents
  .union(extractSubEvents)
  .union(extractOpenEvents)
  .union(extractSessionEvents)
  .as("ae")
  .join(reverseLookupTableDf.as("adId"),
    col("ae.adId") === col("adId.hashed"),
    "leftouter"
  )
  .join(reverseLookupTableDf.as("ip"),
    col("ae.ip") === col("ip.hashed"),
    "leftouter"
  )
  .join(reverseLookupTableDf.as("ua"),
    col("ae.ua") === col("ua.hashed"),
    "leftouter"
  )
  .join(reverseLookupTableDf.as("uid"),
    col("ae.uuid") === col("uid.hashed"),
    "leftouter"
  )