Casting cassandra timestamp column as timeuuid


I'm consuming events from Kafka and storing them in Cassandra. I parse the JSON, which contains the fields eventID, sessionID, timestamp and userID, to build the columns of a Cassandra table that looks like this:

CREATE TABLE mydata.events (
    "event_date" date,
    "eventID" text,
    "userID" text,
    timestamp timeuuid,
    "sessionID" text,
    "fullJson" text,
    PRIMARY KEY ("event_date", timestamp, "sessionID")
);

and in code:

import java.util.UUID
import com.datastax.driver.core.LocalDate

case class cassFormat(
                       eventID: String,
                       sessionID: String,
                       timestamp: UUID, // maps to the "timestamp" timeuuid column
                       userID: String,
                       event_date: LocalDate, // maps to the "event_date" date column
                       fullJson: String // full JSON from Kafka
                     )

I need to store the timestamp column as a timeuuid. Since I'm parsing JSON, I extract all values from the header and build the columns like this:

 val allJson = rdd.
            map(x => {
              implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats
              //use serialization default to format a Map to JSON
              (x, Serialization.write(x))
            }).
            filter(x => x._1 isDefinedAt "header").
            map(x => (x._1("header"), x._2)).
            filter(x => (x._1 isDefinedAt "userID") &&
              (x._1 isDefinedAt "eventID") &&
              (x._1 isDefinedAt "sessionID") &&
              (x._1 isDefinedAt "timestamp")).
            map(x => cassFormat(x._1("eventID").toString,
              x._1("sessionID").toString,
              com.datastax.driver.core.utils.UUIDs.startOf(x._1("timestamp").toString.toLong),
              x._1("userID").toString,
              com.datastax.driver.core.LocalDate.fromMillisSinceEpoch(x._1("timestamp").toString.toLong),
              x._2))

This part:

com.datastax.driver.core.utils.UUIDs.startOf(x._1("timestamp").toString.toLong)

throws this error:

java.lang.NumberFormatException: For input string: "2019-05-09T09:00:52.553+0000" at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)

I also tried java.util.UUID.fromString(x._1("timestamp").toString), which fails as well. How do I properly convert the timestamp to a timeuuid and insert it into Cassandra from a Spark job?

dejanmarich (accepted answer)

I managed to do it by parsing the timestamp string into a ZonedDateTime, converting that to epoch milliseconds, and then generating the UUID:

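import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.util.UUID
import java.util.concurrent.ThreadLocalRandom
import com.datastax.driver.core.utils.UUIDs
import com.datastax.spark.connector._ // adds saveToCassandra to RDDs
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization // or org.json4s.native.Serialization, depending on the json4s backend

val dateTimePattern = "yyyy-MM-dd'T'HH:mm:ss.SSSZ" // matches e.g. 2019-05-09T09:00:52.553+0000

// Runs inside the enclosing block that provides `rdd` (not shown here).
val allJson = rdd.
  map(x => {
    implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats
    // use the default serialization to format the Map back to JSON
    (x, Serialization.write(x))
  }).
  filter(x => x._1 isDefinedAt "header").
  map(x => (x._1("header"), x._2)).
  filter(x => (x._1 isDefinedAt "userID") &&
    (x._1 isDefinedAt "eventID") &&
    (x._1 isDefinedAt "sessionID") &&
    (x._1 isDefinedAt "timestamp")).
  map(x => {
    // DateTimeFormatter is not Serializable, so build it inside the Spark closure
    val dateFormatter = DateTimeFormatter.ofPattern(dateTimePattern)
    var millis: Long = System.currentTimeMillis() // fallback if the timestamp cannot be parsed
    try {
      // timestamp string from the event JSON
      val dateStr: String = x._1("timestamp").asInstanceOf[String]
      // create a ZonedDateTime from the timestamp string
      val dateTime: ZonedDateTime = ZonedDateTime.parse(dateStr, dateFormatter)
      // convert it to epoch milliseconds
      millis = dateTime.toInstant.toEpochMilli
    } catch {
      case e: Exception =>
        e.printStackTrace()
    }
    // timeuuid: time bits from startOf(millis), random low bits so events in the same
    // millisecond stay distinct; the top two bits are forced to the IETF variant (binary 10)
    val randomBits = (ThreadLocalRandom.current().nextLong() & 0x3FFFFFFFFFFFFFFFL) | Long.MinValue
    val uuid = new UUID(UUIDs.startOf(millis).getMostSignificantBits, randomBits)
    // partition-key date derived from the same millis
    val eventDate = com.datastax.driver.core.LocalDate.fromMillisSinceEpoch(millis)
    cassFormat(x._1("eventID").toString,
      x._1("sessionID").toString,
      uuid,
      x._1("userID").toString,
      eventDate,
      x._2)
  })
allJson.saveToCassandra(CASSANDRA_KEYSPACE_NAME, CASSANDRA_EVENTS_TABLE)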

The timestamp column in Cassandra now holds values like 58976340-7313-11e9-910d-60dce7513b94.
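To show what combining the startOf high bits with random low bits amounts to, here is a dependency-free sketch using only java.util.UUID. It assumes the standard RFC 4122 version-1 layout, where the high 64 bits hold a 60-bit count of 100-ns intervals since 1582-10-15; the object and method names are hypothetical and not part of the DataStax driver.

```scala
import java.util.UUID
import java.util.concurrent.ThreadLocalRandom

// Hypothetical helper, not a driver API: builds version-1 (time-based) UUIDs by hand.
object TimeUuids {
  // 100-ns intervals between the UUID epoch (1582-10-15) and the Unix epoch (1970-01-01).
  val UuidEpochOffset: Long = 0x01B21DD213814000L

  // High 64 bits of a version-1 UUID for the given epoch milliseconds.
  def mostSignificantBits(millis: Long): Long = {
    val ts = millis * 10000L + UuidEpochOffset // 60-bit UUID timestamp
    (ts << 32) |                              // time_low  -> bits 63..32
      ((ts & 0x0000FFFF00000000L) >>> 16) |   // time_mid  -> bits 31..16
      0x1000L |                               // version 1 -> bits 15..12
      ((ts >>> 48) & 0x0FFFL)                 // time_hi   -> bits 11..0
  }

  // Random low 64 bits with the top two bits forced to the IETF variant (binary 10).
  def randomLeastSignificantBits(): Long =
    (ThreadLocalRandom.current().nextLong() & 0x3FFFFFFFFFFFFFFFL) | Long.MinValue

  // Time-based UUID for the given instant; the random low half keeps same-millisecond events distinct.
  def forMillis(millis: Long): UUID = new UUID(mostSignificantBits(millis), randomLeastSignificantBits())

  // Inverse: epoch milliseconds embedded in a version-1 UUID.
  def toMillis(u: UUID): Long = (u.timestamp() - UuidEpochOffset) / 10000L
}
```

UUIDs.startOf returns the same UUID every time it is called with the same millisecond, so two events from one session in the same millisecond would overwrite each other under this primary key. With random low bits the UUIDs differ while their embedded timestamps stay identical.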

Dionysis Nt.

You have a string that is not a number, and you are trying to convert it into one with toLong, hence the NumberFormatException.

It looks like you can get a time-based UUID for a given timestamp with a utility method like this one:

public static UUID getTimeUUID(long when)

You'll have to parse the string into a DateTime or an Instant first, then pass its epoch milliseconds to getTimeUUID.
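A minimal sketch of that parsing step, assuming the header timestamps always look like 2019-05-09T09:00:52.553+0000. It uses only java.time; the object name TimestampParsing is hypothetical. The resulting milliseconds are what a time-UUID helper such as getTimeUUID or the driver's UUIDs.startOf takes, and the UTC calendar date can feed the event_date column.

```scala
import java.time.{Instant, LocalDate, ZoneOffset, ZonedDateTime}
import java.time.format.DateTimeFormatter

// Hypothetical helper for header timestamps such as "2019-05-09T09:00:52.553+0000".
object TimestampParsing {
  // Pattern letter Z parses offsets written as "+0000" / "+0200".
  private val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ")

  // Parses the string into an Instant; throws DateTimeParseException on malformed input.
  def toInstant(s: String): Instant = ZonedDateTime.parse(s, formatter).toInstant

  // Epoch milliseconds, the input a time-UUID helper expects.
  def toEpochMillis(s: String): Long = toInstant(s).toEpochMilli

  // Calendar date in UTC, e.g. for the event_date partition key.
  def toUtcDate(s: String): LocalDate = toInstant(s).atZone(ZoneOffset.UTC).toLocalDate
}
```

Calling "2019-05-09T09:00:52.553+0000".toLong directly is what raises the NumberFormatException; going through the formatter first yields a plain Long.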

Maciej Szymczyk

I solved this with a pair of UDFs:

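import com.datastax.driver.core.utils.UUIDs
import org.apache.spark.sql.functions.udf

// java.sql.Timestamp -> timeuuid string (the smallest timeuuid for that millisecond)
val toTimeuuid: java.sql.Timestamp => String = x => UUIDs.startOf(x.getTime()).toString()
// timeuuid string -> java.sql.Timestamp (millisecond precision)
val fromTimeuuid: String => java.sql.Timestamp = x => new java.sql.Timestamp(UUIDs.unixTimestamp(java.util.UUID.fromString(x)))

// wrap both functions as Spark SQL UDFs for use on DataFrame columns
val toTimeuuidUDF = udf(toTimeuuid)
val fromTimeuuidUDF = udf(fromTimeuuid)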
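To see what these two functions compute without the driver or Spark on the classpath, here is a dependency-free sketch with the same signatures. It assumes the RFC 4122 version-1 layout and fills the low 64 bits with a fixed value that only sets the variant bits; that is a hypothetical choice, since UUIDs.startOf uses the driver's own constant, so the generated strings differ from the driver's in that half. It also makes one property of the round trip visible: java.sql.Timestamp can carry nanoseconds, but a timeuuid built from getTime() keeps only milliseconds.

```scala
import java.sql.Timestamp
import java.util.UUID

// Hypothetical stand-ins for the UDF bodies above, using only the JDK.
object TimeuuidUdfSketch {
  // 100-ns intervals from 1582-10-15 (UUID epoch) to 1970-01-01 (Unix epoch).
  private val UuidEpochOffset = 0x01B21DD213814000L

  // Timestamp => version-1 UUID string; the low 64 bits are fixed (variant bits only).
  val toTimeuuid: Timestamp => String = t => {
    val ts = t.getTime * 10000L + UuidEpochOffset // millisecond precision only
    val msb = (ts << 32) | ((ts & 0x0000FFFF00000000L) >>> 16) | 0x1000L | ((ts >>> 48) & 0x0FFFL)
    new UUID(msb, Long.MinValue).toString
  }

  // UUID string => Timestamp, via java.util.UUID.timestamp() (100-ns units since 1582-10-15).
  val fromTimeuuid: String => Timestamp = s =>
    new Timestamp((UUID.fromString(s).timestamp() - UuidEpochOffset) / 10000L)
}
```

A Timestamp such as 2019-05-09 09:00:52.553123456 comes back as 2019-05-09 09:00:52.553, so compare round-tripped values at millisecond granularity.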