I'm new to Delta Lake but have used Parquet files in PySpark for quite a while. I'm generally familiar with the concept, but have a scenario where I'm not sure of the best approach.
My scenario is this. I have information about website usage (page views, clicks, etc.) called events. There can be hundreds of events per session, each with an event timestamp and each with the same session ID (when you sit down, open a browser, poke around, and close the browser, that's a session). Each day I get a file containing all records for that calendar day.

The case I need to account for is a session that spans multiple days, e.g. one that starts at 23:59 on 1/1 and ends at 00:01 on 1/2. In that case I get records with the same session_id in the files for both days. Each record also gets an etl_timestamp value indicating when the record was created (every record in a given load gets exactly the same value; they're not off by a few milliseconds from the first to the last record added).
events
- event_id
- session_id
- url
- event_timestamp
- etl_timestamp (current timestamp when the event record was loaded)
sessions
- session_id
- event_count
- first_event_timestamp
- last_event_timestamp
- etl_timestamp (current timestamp when the session record was created)
The source files are Parquet, and they are far wider than just these few fields, more like 80 fields or so.
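For context, here's a simplified sketch of how I picture building the per-day session rows (the path and names are placeholders, and the real job carries ~80 columns):

```python
from pyspark.sql import functions as F

# Placeholder path for one day's drop of event records (a few million rows).
daily_events = spark.read.parquet("/mnt/raw/events/2024-01-02/")

# Roll the day's events up to one row per session_id. Note that for a session
# spanning midnight this only sees that day's slice of events, which is exactly
# the situation I'm asking about; the real logic may need to rebuild from the
# full events table instead.
daily_sessions = (
    daily_events
    .groupBy("session_id")
    .agg(
        F.count("event_id").alias("event_count"),
        F.min("event_timestamp").alias("first_event_timestamp"),
        F.max("event_timestamp").alias("last_event_timestamp"),
    )
    .withColumn("etl_timestamp", F.current_timestamp())
)
```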
Traditionally in an RDBMS this is either an upsert or an insert-and-delete. I see that both are possible with Delta Lake. My initial thought was the insert-and-delete route (rough sketch below):
- Execute the normal logic for all session_ids in today's file (1/2).
- Get the maximum sessions.etl_timestamp value for each session_id
- Delete records whose etl_timestamp is NOT equal to the latest timestamp for that session_id
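Something along these lines, where I've expressed the delete step as a merge-with-delete clause because the condition involves another dataset (the table path is a placeholder, and daily_sessions is the per-day frame from the sketch above):

```python
from delta.tables import DeltaTable

# Placeholder path; assuming the sessions table already exists as a Delta table.
sessions_path = "/mnt/delta/sessions"
sessions_table = DeltaTable.forPath(spark, sessions_path)

# 1. Append today's (1/2) session rows alongside whatever is already there.
daily_sessions.write.format("delta").mode("append").save(sessions_path)

# 2. For any session_id that reappeared today, delete the older rows: today's
#    etl_timestamp is by definition the latest, so anything earlier goes.
(
    sessions_table.alias("s")
    .merge(
        daily_sessions.select("session_id", "etl_timestamp").alias("d"),
        "s.session_id = d.session_id AND s.etl_timestamp < d.etl_timestamp",
    )
    .whenMatchedDelete()
    .execute()
)
```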
The second option is an upsert (merge) based on the same timestamp logic. In my traditional RDBMS experience upserts have been super slow. I'm dealing with multiple billions of total records and generally a few million a day, so I'm not sure whether that workload affects the decision here. I have a pretty significant amount of available (and scalable) horsepower on Azure.
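For comparison, the merge version I have in mind is roughly this (same placeholder names; I haven't benchmarked either approach, which is really what I'm asking about):

```python
from delta.tables import DeltaTable

sessions_table = DeltaTable.forPath(spark, "/mnt/delta/sessions")  # placeholder path

# Upsert: overwrite an existing session row only if today's row is newer,
# otherwise insert the session as brand new.
(
    sessions_table.alias("s")
    .merge(daily_sessions.alias("d"), "s.session_id = d.session_id")
    .whenMatchedUpdateAll(condition="d.etl_timestamp > s.etl_timestamp")
    .whenNotMatchedInsertAll()
    .execute()
)
```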
I have no real use for point-in-time historical records; I'm only ever interested in the most recent record for each session_id.
So given all of this, what is the most efficient way to handle updating sessions that span multiple calendar days as described above?
Thanks!