Given a data frame of IPs with updated statuses, I first need to delete those IPs from the SQL table, and then insert the data frame with the updated statuses using Spark SQL. The whole operation should roll back if either the deletion or the insertion fails. I have tried various approaches without success; a suggestion for an optimal way of doing this would be a great help.
import java.sql.DriverManager

val connection = DriverManager.getConnection(jdbcUrl, "user", "Password")
val statement = connection.createStatement()
try {
  connection.setAutoCommit(false)

  // Build the DELETE SQL statement
  val deleteStatement =
    s"""
       |DELETE FROM test_logs1
       |WHERE IPAddress IN (${ipList.map(ip => s"'$ip'").mkString(",")})
     """.stripMargin

  // Execute the DELETE statement
  statement.execute(deleteStatement)

  println("df_to_insert after deletion:")
  df_to_insert.show()

  df_to_insert.write
    .mode("append")
    .jdbc(jdbcUrl, tableName, connectionProperties)

  statement.execute("COMMIT")
  connection.commit()
} catch {
  case e: Exception =>
    // Roll back the transaction in case of an exception
    println("Error occurred: " + e.getMessage)
    connection.rollback()
} finally {
  // Close the statement and connection
  if (statement != null) statement.close()
  if (connection != null) connection.close()
}
This gives me a lock-timeout / retry-transaction error.
Your `DELETE` statement and your `INSERT`s via `df_to_insert.write` are not using the same connection (and transaction). This likely explains the error you get: the `DELETE`s are not committed yet when you try to insert, because `df_to_insert.write` opens its own connection/transaction, which then blocks on the locks held by your uncommitted `DELETE`.
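One way around this is to run both statements through a single JDBC connection, so they share one transaction. Here is a minimal sketch, assuming `df_to_insert` is small enough to `collect()` to the driver and that the table has the two columns `IPAddress` and `Status` (adjust the column list to your real schema):

```scala
import java.sql.DriverManager

val connection = DriverManager.getConnection(jdbcUrl, "user", "Password")
try {
  connection.setAutoCommit(false)

  // DELETE via a prepared statement (this also avoids building the IN
  // list by string concatenation)
  val placeholders = ipList.map(_ => "?").mkString(",")
  val deleteStmt = connection.prepareStatement(
    s"DELETE FROM test_logs1 WHERE IPAddress IN ($placeholders)")
  ipList.zipWithIndex.foreach { case (ip, i) => deleteStmt.setString(i + 1, ip) }
  deleteStmt.executeUpdate()
  deleteStmt.close()

  // INSERT the updated rows through the SAME connection, batched.
  // Column names here are assumptions -- adapt to your schema.
  val insertStmt = connection.prepareStatement(
    "INSERT INTO test_logs1 (IPAddress, Status) VALUES (?, ?)")
  df_to_insert.collect().foreach { row =>
    insertStmt.setString(1, row.getAs[String]("IPAddress"))
    insertStmt.setString(2, row.getAs[String]("Status"))
    insertStmt.addBatch()
  }
  insertStmt.executeBatch()
  insertStmt.close()

  connection.commit() // both statements succeed, or neither is applied
} catch {
  case e: Exception =>
    connection.rollback()
    throw e
} finally {
  connection.close()
}
```

The trade-off is that you lose Spark's distributed write: `collect()` pulls everything to the driver, which is fine for a batch of IPs but not for very large data frames. If the data is too big to collect, the usual alternative is the staging-table pattern: `df_to_insert.write` into a temporary table, then run `DELETE` + `INSERT ... SELECT` as one JDBC transaction.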