Cross join two Spark dataframes without using crossJoin


Let's assume I have two Spark data frames:

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Example data for DataFrame 1
data1 = [
    ("Pool_A", "A", "X", 10),
    ("Pool_A", "A", "Y", 20),
    ("Pool_A", "B", "X", 15),
    ("Pool_B", "A", "X", 5),
    ("Pool_B", "B", "Y", 25),
]

# Define the schema for DataFrame 1
df1_schema = ["pool", "col1", "col2", "value"]

# Create DataFrame 1
df1 = spark.createDataFrame(data1, df1_schema)

# Example data for DataFrame 2
data2 = [
    ("A", "X", 100),
    ("A", "Y", 200),
    ("B", "X", 150),
    ("B", "Y", 250),
    ("C", "X", 300),
]

# Define the schema for DataFrame 2
df2_schema = ["col1", "col2", "default_value"]

# Create DataFrame 2
df2 = spark.createDataFrame(data2, df2_schema)

I want to join the two dataframes so that, for each "pool", I get every possible combination of "col1" and "col2" from df2, using the default "value" wherever df1 has no matching row. I have a solution using a crossJoin (roughly like the sketch below), but I wanted to see whether there are more elegant alternatives, and what the performance cost of using the crossJoin is.
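
A rough sketch of the kind of crossJoin-based solution I mean (simplified; variable names are just for illustration):

from pyspark.sql import functions as F

# Pair every pool with every (col1, col2, default_value) row of df2,
# then prefer the real value from df1 over the default
all_pairs = df1.select("pool").distinct().crossJoin(df2)
result = (
    all_pairs.join(df1, ["pool", "col1", "col2"], "left")
    .withColumn("value", F.coalesce("value", "default_value"))
    .select("pool", "col1", "col2", "value")
)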

This is the desired output:

+-------+----+----+-----+
|   pool|col1|col2|value|
+-------+----+----+-----+
| Pool_B|   A|   X|    5|
| Pool_B|   B|   Y|   25|
| Pool_B|   C|   X|  300|
| Pool_B|   B|   X|  150|
| Pool_B|   A|   Y|  200|
| Pool_A|   A|   X|   10|
| Pool_A|   B|   X|   15|
| Pool_A|   A|   Y|   20|
| Pool_A|   B|   Y|  250|
| Pool_A|   C|   X|  300|
+-------+----+----+-----+

There is 1 answer

ZygD (best answer)

In distributed big-data computing, there is really no way other than crossJoin to get all the combinations of two different dataframes. But before doing it, you will want to build a small dataframe containing only the distinct "pool" values.

After the crossJoin, we can left-join the values from df1 and fill the gaps (nulls) with the default values using coalesce.

from pyspark.sql import functions as F

# Small dataframe containing only the distinct pools
df_pools = df1.select('pool').distinct()
# Every (pool, col1, col2, default_value) combination
df_comb = df_pools.crossJoin(df2)
# Attach the actual values from df1 where they exist
df_joined = df_comb.join(df1, ['pool', 'col1', 'col2'], 'left')
# Fall back to the default value where df1 has no matching row
df_coalesced = df_joined.select(
    'pool', 'col1', 'col2',
    F.coalesce('value', 'default_value').alias('value')
)
df_coalesced.show()
# +------+----+----+-----+
# |  pool|col1|col2|value|
# +------+----+----+-----+
# |Pool_B|   A|   Y|  200|
# |Pool_A|   A|   X|   10|
# |Pool_B|   A|   X|    5|
# |Pool_A|   A|   Y|   20|
# |Pool_A|   B|   Y|  250|
# |Pool_B|   B|   X|  150|
# |Pool_A|   B|   X|   15|
# |Pool_A|   C|   X|  300|
# |Pool_B|   B|   Y|   25|
# |Pool_B|   C|   X|  300|
# +------+----+----+-----+
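
As a side note on the cost of the crossJoin itself: df_pools is tiny here, so you could also hint Spark to broadcast it, which typically lets the planner use a broadcast nested loop join instead of a CartesianProduct (an assumption worth verifying with explain() on your data):

# Same combination step, but hinting that the pools dataframe is small enough to broadcast
df_comb = F.broadcast(df_pools).crossJoin(df2)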

That being said, if you are sure that the number of distinct values in the "pool" column is not too big, you can extract them from the dataframe as a Python list (collecting to the driver) and send that list to the executors.

# Collect the distinct pools to the driver as a Python list
pools = [x[0] for x in df1.select('pool').distinct().collect()]
# One row per pool for every row of df2: explode the list of pools into a "pool" column
df_comb = df2.withColumn('pool', F.explode(F.array(*[F.lit(x) for x in pools])))
df_joined = df_comb.join(df1, ['pool', 'col1', 'col2'], 'left')
df_coalesced = df_joined.select(
    'pool', 'col1', 'col2',
    F.coalesce('value', 'default_value').alias('value')
)
df_coalesced.show()
# +------+----+----+-----+
# |  pool|col1|col2|value|
# +------+----+----+-----+
# |Pool_B|   A|   Y|  200|
# |Pool_A|   A|   X|   10|
# |Pool_B|   A|   X|    5|
# |Pool_A|   A|   Y|   20|
# |Pool_A|   B|   Y|  250|
# |Pool_B|   B|   X|  150|
# |Pool_A|   B|   X|   15|
# |Pool_A|   C|   X|  300|
# |Pool_B|   B|   Y|   25|
# |Pool_B|   C|   X|  300|
# +------+----+----+-----+

Note: in Spark 3.4+, instead of F.array(*[F.lit(x) for x in pools]) you can use F.lit(pools).
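
For example (Spark 3.4+ only, where F.lit accepts a Python list):

# Spark 3.4+: pass the whole list to F.lit instead of building an array of literals
df_comb = df2.withColumn('pool', F.explode(F.lit(pools)))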

This approach avoids the crossJoin altogether.
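
To compare the two variants, the physical plans below can be reproduced with something along these lines (on whichever lineage df_coalesced was built from):

# Print the physical plan Spark chose for the current df_coalesced
df_coalesced.explain()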

Query plan using crossJoin:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [pool#899, col1#907, col2#908, coalesce(value#921L, default_value#909L) AS value#927L]
   +- SortMergeJoin [pool#899, col1#907, col2#908], [pool#918, col1#919, col2#920], LeftOuter
      :- Sort [pool#899 ASC NULLS FIRST, col1#907 ASC NULLS FIRST, col2#908 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(pool#899, col1#907, col2#908, 200), ENSURE_REQUIREMENTS, [plan_id=4569]
      :     +- CartesianProduct
      :        :- HashAggregate(keys=[pool#899], functions=[], output=[pool#899])
      :        :  +- Exchange hashpartitioning(pool#899, 200), ENSURE_REQUIREMENTS, [plan_id=4564]
      :        :     +- HashAggregate(keys=[pool#899], functions=[], output=[pool#899])
      :        :        +- Project [pool#899]
      :        :           +- Scan ExistingRDD[pool#899,col1#900,col2#901,value#902L]
      :        +- Scan ExistingRDD[col1#907,col2#908,default_value#909L]
      +- Sort [pool#918 ASC NULLS FIRST, col1#919 ASC NULLS FIRST, col2#920 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(pool#918, col1#919, col2#920, 200), ENSURE_REQUIREMENTS, [plan_id=4570]
            +- Filter ((isnotnull(pool#918) AND isnotnull(col1#919)) AND isnotnull(col2#920))
               +- Scan ExistingRDD[pool#918,col1#919,col2#920,value#921L]

Query plan without crossJoin (i.e. sending the list to executors):

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [pool#1379, col1#1371, col2#1372, coalesce(value#1366L, default_value#1373L) AS value#1389L]
   +- SortMergeJoin [pool#1379, col1#1371, col2#1372], [pool#1363, col1#1364, col2#1365], LeftOuter
      :- Sort [pool#1379 ASC NULLS FIRST, col1#1371 ASC NULLS FIRST, col2#1372 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(pool#1379, col1#1371, col2#1372, 200), ENSURE_REQUIREMENTS, [plan_id=6619]
      :     +- Generate explode([Pool_A,Pool_B]), [col1#1371, col2#1372, default_value#1373L], false, [pool#1379]
      :        +- Scan ExistingRDD[col1#1371,col2#1372,default_value#1373L]
      +- Sort [pool#1363 ASC NULLS FIRST, col1#1364 ASC NULLS FIRST, col2#1365 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(pool#1363, col1#1364, col2#1365, 200), ENSURE_REQUIREMENTS, [plan_id=6620]
            +- Filter ((isnotnull(pool#1363) AND isnotnull(col1#1364)) AND isnotnull(col2#1365))
               +- Scan ExistingRDD[pool#1363,col1#1364,col2#1365,value#1366L]
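
The main difference is the CartesianProduct node in the first plan, which the second plan replaces with a simple Generate explode(...) over df2; both variants then finish with the same sort-merge left join against df1.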