What is the function of `microBatchDF._jdf.sparkSession().sql` in this code?

What is the function of `microBatchDF._jdf.sparkSession().sql` in this code?

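```python
def upsert_to_delta(microBatchDF, batchId):
    # Register this micro-batch as a temp view so the SQL below can refer to it.
    microBatchDF.createOrReplaceTempView("updates")
    # _jdf is PySpark's internal Py4J handle to the JVM DataFrame, and
    # _jdf.sparkSession() is the JVM SparkSession that owns this micro-batch,
    # i.e. the session in which the "updates" view was just registered.
    microBatchDF._jdf.sparkSession().sql("""
        MERGE INTO silver s
        USING updates u
        ON s.mrn = u.mrn
        WHEN MATCHED AND (s.dob <> u.dob OR
                          s.sex <> u.sex OR
                          s.gender <> u.gender OR
                          s.first_name <> u.first_name OR
                          s.last_name <> u.last_name OR
                          s.street_address <> u.street_address OR
                          s.zip <> u.zip OR
                          s.city <> u.city OR
                          s.state <> u.state OR
                          s.updated <> u.updated)
            THEN UPDATE SET *
        WHEN NOT MATCHED
            THEN INSERT *
    """)

# Stream from the bronze table and run upsert_to_delta on each micro-batch.
query = (spark.readStream
              .table("bronze")
              .writeStream
              .foreachBatch(upsert_to_delta)
              # Process all data available now, then stop.
              .trigger(availableNow=True)
            #   .trigger(processingTime='5 seconds')
              .start())

# DA is a helper supplied by the course environment, not part of PySpark.
DA.block_until_stream_is_ready(query)
```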

The code above upserts into a silver table using a streaming read against the bronze table, matching on the unique identifier `mrn`.
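For readers less familiar with `MERGE`, the following plain-Python sketch (no Spark involved) models what the statement does to `silver` for a single micro-batch. `merge_batch`, `sql_neq`, and `TRACKED` are hypothetical names; rows are modeled as dicts keyed by `mrn`, and each `mrn` is assumed to appear at most once per batch. `sql_neq` mimics SQL `<>`, which yields NULL (treated as false in the `WHEN MATCHED` condition) when either side is NULL, so a change to or from NULL does not trigger an update.

```python
from typing import Any, Dict, List

# Columns compared in the WHEN MATCHED condition of the MERGE above.
TRACKED = ["dob", "sex", "gender", "first_name", "last_name",
           "street_address", "zip", "city", "state", "updated"]


def sql_neq(a: Any, b: Any) -> bool:
    """Mimic SQL `a <> b` inside a WHEN condition: NULL on either side counts as false."""
    return a is not None and b is not None and a != b


def merge_batch(silver: Dict[str, dict], updates: List[dict]) -> Dict[str, dict]:
    """Hypothetical model of the MERGE on mrn; returns a new table, input untouched.

    Assumes each mrn appears at most once in `updates`.
    """
    result = dict(silver)
    for u in updates:
        s = result.get(u["mrn"])
        if s is None:
            # WHEN NOT MATCHED THEN INSERT *
            result[u["mrn"]] = dict(u)
        elif any(sql_neq(s.get(c), u.get(c)) for c in TRACKED):
            # WHEN MATCHED AND (<some tracked column differs>) THEN UPDATE SET *
            result[u["mrn"]] = dict(u)
        # Matched but no tracked column differs: the row is left unchanged.
    return result


silver = {"A": {"mrn": "A", "city": "Boston", "zip": "02101"},
          "B": {"mrn": "B", "city": "Austin", "zip": None}}
updates = [{"mrn": "A", "city": "Cambridge", "zip": "02101"},  # changed -> update
           {"mrn": "B", "city": "Austin", "zip": "73301"},     # NULL -> value only: no update
           {"mrn": "C", "city": "Denver", "zip": "80201"}]     # new mrn -> insert
merged = merge_batch(silver, updates)
```

Here `merged["A"]["city"]` becomes `"Cambridge"`, mrn `"C"` is inserted, and mrn `"B"` keeps `zip` as `None`, because its only difference involves a NULL on the `silver` side.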
