I have turned auto-broadcast joins off when running my job, with the config
"spark.sql.autoBroadcastJoinThreshold", -1
Now I have 2 dataframes. When I use only one column in the join condition, Spark does not choose a broadcast join:
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("id1", StringType(), True),
    StructField("id2", StringType(), True)])

data1 = [("123", "abc")]
data_df1 = spark.createDataFrame(data=data1, schema=schema)

data2 = [("123", "abc")]
data_df2 = spark.createDataFrame(data=data2, schema=schema)

join_condition_1 = [data_df1.id1 == data_df2.id1]
data_df1.join(data_df2, join_condition_1, "left").explain()
Query Plan 1:
== Physical Plan ==
SortMergeJoin [id1#308], [id1#312], LeftOuter
:- *(1) Sort [id1#308 ASC NULLS FIRST], false, 0
: +- Exchange(coordinator id: 1272911761) hashpartitioning(id1#308, 2000), coordinator[target post-shuffle partition size: 67108864]
: +- Scan ExistingRDD[id1#308,id2#309]
+- *(3) Sort [id1#312 ASC NULLS FIRST], false, 0
+- Exchange(coordinator id: 1272911761) hashpartitioning(id1#312, 2000), coordinator[target post-shuffle partition size: 67108864]
+- *(2) Filter isnotnull(id1#312)
+- Scan ExistingRDD[id1#312,id2#313]
If I use an OR clause in the join condition, it goes for a broadcast nested loop join:
join_condition_2 = [(data_df1.id1 == data_df2.id1) | (data_df1.id2 == data_df2.id2)]
data_df1.join(data_df2, join_condition_2, "left").explain()
Query Plan 2:
== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, LeftOuter, ((id1#308 = id1#312) || (id2#309 = id2#313))
:- Scan ExistingRDD[id1#308,id2#309]
+- BroadcastExchange IdentityBroadcastMode
+- Scan ExistingRDD[id1#312,id2#313]
Why is this happening, and how do I prevent the broadcast join in the second case? Spark version: 2.4.4.