Long delay when reading from Kafka in Spark


There is a Spark streaming application that reads messages from Kafka, processes them, and stores them in a DB. While optimizing its latency, a constant delay of ~2 s on reading from Kafka was detected.
The delay is measured as "time when a message was read" - "timestamp assigned by the Kafka broker" (there is no clock skew between the Kafka and Spark nodes).
No Spark/Kafka-connector configuration is intentionally set that would limit the minimum number of messages per batch. The delay is constant, i.e. it stays the same at 100 msg/s and at 10,000 msg/s.

Does anyone have any clue why the delay takes place and how to eliminate it?

I tried running the application for several hours to warm up the JVM, but that changed nothing. I also tried reading from topics with different partition counts (20 and 30): no change either.

Full message processing looks like the following:

val additionalOptions = Map(
    "startingOffsets" -> "latest",
    "kafka.security.protocol" -> "SASL_SSL",
    "kafka.sasl.mechanism" -> "SCRAM-SHA-512"
)

val df = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", ...)
    .option("subscribe", ...)
    .option("kafka.allow.auto.create.topics", "false")
    .options(additionalOptions)
    .load()

// df processing

// Note: "triggerType"/"triggerInterval" are not recognized writer options and
// would be silently ignored; the trigger is set via .trigger(...) instead.
// Requires: import org.apache.spark.sql.streaming.Trigger
df.writeStream
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(0))
    .option("checkpointLocation", ...)
    .foreachBatch { (dataset: Dataset[W], batchId: Long) => writeBatch(dataset, batchId) }
    .start()
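For reference, the delay metric described in the question can be observed per message from the Kafka source's broker-assigned `timestamp` column (a sketch; `df` is the raw source frame from the snippet above, before any processing):

```scala
import org.apache.spark.sql.functions.{col, current_timestamp}

// Latency in ms: time the row is seen by Spark minus the broker-assigned
// timestamp. Valid here because, as stated above, Kafka and Spark clocks
// have no skew between them.
val withLatency = df.withColumn(
  "read_latency_ms",
  (current_timestamp().cast("double") - col("timestamp").cast("double")) * 1000
)
```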

1 Answer

Answered by Rishabh Sharma

Short Answer

As per the docs, under the optional configurations there is a setting kafkaConsumer.pollTimeoutMs, which defaults to 2 minutes. So theoretically any message published to Kafka will be consumed with a latency of up to 2 minutes. Lower this value to reduce your delay.
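For example, the timeout can be lowered directly in the source options (a sketch based on the reader from the question; the broker address, topic name, and the 500 ms value are illustrative assumptions to tune for your throughput):

```scala
// Lower the executor-side poll timeout from the 2-minute default.
val df = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092") // placeholder address
    .option("subscribe", "some-topic")                // placeholder topic
    .option("kafkaConsumer.pollTimeoutMs", "500")     // assumed value; tune it
    .load()
```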

Theory

When your consumer reads from the stream, you don't want to poll too early or too late; the right cadence depends on the producer throughput and on your consumer's processing logic.

For example, if your producer produces 1 msg/min, polling with a 1-second timeout makes little sense, since only 1 out of every 60 polls would return anything. However, if your producer throughput is around 10 msg/s, you might want to reduce the poll timeout. Then your consumer's processing comes into the picture, with questions like what the ideal batch size for the consumer should be. Based on that, you can configure your poll timeout.
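That sizing reasoning can be sketched as simple arithmetic (the throughput and batch-size numbers are assumptions, not measurements from the question):

```scala
// Rough heuristic: pick a poll timeout that lets a single poll gather
// roughly one batch's worth of messages at the expected producer rate.
val expectedMsgsPerSec = 10.0 // assumed producer throughput
val targetBatchSize    = 5.0  // assumed desired messages per poll
val pollTimeoutMs      = (targetBatchSize / expectedMsgsPerSec * 1000).toLong
// With these assumed numbers, the timeout works out to 500 ms.
```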