I have a Kafka topic with 100 partitions. I need to map each message into another type (this is long-running processing logic, about 500 ms per message), then collect the results into batches and send them to an external service. Here is my KStream bean:
@Bean
public KStream<String, List<Request>> kStream(StreamsBuilder streamsBuilder) {
    var stream = streamsBuilder
            .stream(topic, Consumed.with(Serdes.String(), getIncomingMessageSerde()));
    var storeBuilder = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(storeName),
            Serdes.String(),
            getBatchSerde());
    streamsBuilder.addStateStore(storeBuilder);
    stream
            .mapValues(asyncService::processBatch) // process each message first (about 500 ms each)
            .process(this::getAggregationProcessor, storeName) // then the processor collects messages into a batch
            .foreach(asyncService::saveBatches); // then save the batch somewhere (approx. 30 messages per batch)
    return stream;
}
I created this processor for batching messages:
public class AggregationProcessor implements Processor<String, BatchInfo, String, AggregatedBatchesInfo> {

    private final String stateStoreName;
    private final String storeKey;
    private final Integer batchSize;
    private final Integer flushInterval;
    private ProcessorContext<String, AggregatedBatchesInfo> context;
    private KeyValueStore<String, AggregatedBatchesInfo> stateStore;

    public AggregationProcessor(String stateStoreName, String storeKey, Integer batchSize, Integer flushInterval) {
        this.stateStoreName = stateStoreName;
        this.storeKey = storeKey;
        this.batchSize = batchSize;
        this.flushInterval = flushInterval;
    }

    @Override
    public void init(ProcessorContext<String, AggregatedBatchesInfo> context) {
        this.context = context;
        this.stateStore = context.getStateStore(stateStoreName);
        context.schedule(Duration.ofSeconds(flushInterval),
                PunctuationType.WALL_CLOCK_TIME,
                timestamp -> flushBatch());
    }

    @Override
    public void process(Record<String, BatchInfo> batchRecord) {
        var aggregatedBatches = getAggregatedBatches();
        aggregatedBatches.add(batchRecord.value());
        updateState(aggregatedBatches);
        if (aggregatedBatches.size() >= batchSize) {
            flushBatch();
        }
    }

    private void flushBatch() {
        var aggregatedBatches = getAggregatedBatches();
        if (!aggregatedBatches.isEmpty()) {
            context.forward(new Record<>(randomUUID().toString(), aggregatedBatches, System.currentTimeMillis()));
            updateState(new AggregatedBatchesInfo());
        }
    }

    private AggregatedBatchesInfo getAggregatedBatches() {
        final var storeValue = stateStore.get(storeKey);
        return isNull(storeValue) ? new AggregatedBatchesInfo() : storeValue;
    }

    private void updateState(AggregatedBatchesInfo aggregatedBatches) {
        stateStore.put(storeKey, aggregatedBatches);
    }

    @Override
    public void close() {
        flushBatch();
        stateStore.flush();
    }
}
The state store that holds the collected batches is backed by a topic with 100 partitions (one per Kafka Streams task). As a result, each store instance only sees messages from a single partition, so it takes a long time to fill a batch before it can be saved. Is it possible for different tasks to share one store so that batches fill faster? Or have I chosen the wrong approach to this problem?
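
To make the slowdown concrete, here is a minimal plain-Java simulation (not Kafka Streams code). It assumes messages arrive round-robin across partitions, which is a simplification of real traffic, and the class and method names (`BatchFillSimulation`, `recordsUntilFirstFlushPerPartition`, `recordsUntilFirstFlushShared`) are hypothetical. It counts how many messages must be consumed before the first batch reaches the batch size, with one buffer per partition versus a single shared buffer:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BatchFillSimulation {

    /**
     * One buffer per partition (similar to one state store per task).
     * Returns how many messages are consumed before any buffer reaches batchSize.
     */
    static int recordsUntilFirstFlushPerPartition(int partitions, int batchSize) {
        Map<Integer, List<Integer>> buffers = new HashMap<>();
        int consumed = 0;
        while (true) {
            // Assumption: messages are spread round-robin over the partitions.
            int partition = consumed % partitions;
            consumed++;
            List<Integer> buffer = buffers.computeIfAbsent(partition, p -> new ArrayList<>());
            buffer.add(consumed);
            if (buffer.size() >= batchSize) {
                return consumed;
            }
        }
    }

    /**
     * A single buffer shared by all partitions.
     * Returns how many messages are consumed before the buffer reaches batchSize.
     */
    static int recordsUntilFirstFlushShared(int batchSize) {
        List<Integer> buffer = new ArrayList<>();
        int consumed = 0;
        while (true) {
            consumed++;
            buffer.add(consumed);
            if (buffer.size() >= batchSize) {
                return consumed;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("per-partition buffers: "
                + recordsUntilFirstFlushPerPartition(100, 30));
        System.out.println("shared buffer: "
                + recordsUntilFirstFlushShared(30));
    }
}
```

Under these assumptions, with 100 partitions and a batch size of 30, the per-partition variant needs 2901 messages before the first batch is full, while the shared variant needs only 30. In my setup the wall-clock punctuator flushes partial batches earlier, so the real effect is many small batches rather than a long wait, but the imbalance is the same.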