Avoid Broadcast Nested Loop Join in Pyspark when the joining condition has a OR clause

107 views Asked by At

I have have set auto-broadcast operation to off , when I am running my job , with the config

"spark.sql.autoBroadcastJoinThreshold", -1

Now , I have 2 dataframes when I use only 1 column in the joining clause , it does not go for a broadcast join

data1 = [("123","abc")]
schema =(StructType([ 
    StructField("id1",StringType(),True), 
    StructField("id2",StringType(),True)])) 
data_df1 = spark.createDataFrame(data=data1,schema=schema)
data2 = [("123","abc")]
schema =(StructType([ 
    StructField("id1",StringType(),True), 
    StructField("id2",StringType(),True)])) 
data_df2 = spark.createDataFrame(data=data1,schema=schema)
join_condition_1 = [(data_df1.id1 == data_df2.id1) ]
data_df1.join(data_df2,join_condition_1, "left").explain()

Query Plan1 :

== Physical Plan ==
SortMergeJoin [id1#308], [id1#312], LeftOuter
:- *(1) Sort [id1#308 ASC NULLS FIRST], false, 0
:  +- Exchange(coordinator id: 1272911761) hashpartitioning(id1#308, 2000), coordinator[target post-shuffle partition size: 67108864]
:     +- Scan ExistingRDD[id1#308,id2#309]
+- *(3) Sort [id1#312 ASC NULLS FIRST], false, 0
   +- Exchange(coordinator id: 1272911761) hashpartitioning(id1#312, 2000), coordinator[target post-shuffle partition size: 67108864]
      +- *(2) Filter isnotnull(id1#312)
         +- Scan ExistingRDD[id1#312,id2#313]

If I use a Or clause in the join condition , it goes for a broadcast nested loop join

join_condition_2 = [(data_df1.id1 == data_df2.id1) | (data_df1.id2 == data_df2.id2) ]
data_df1.join(data_df2,join_condition_2, "left").explain()

Query Plan 2:

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, LeftOuter, ((id1#308 = id1#312) || (id2#309 = id2#313))
:- Scan ExistingRDD[id1#308,id2#309]
+- BroadcastExchange IdentityBroadcastMode
   +- Scan ExistingRDD[id1#312,id2#313]

Why is this happening and how do I prevent the broadcast join to occur in the 2nd case? Spark version 2.4.4

0

There are 0 answers