I have this problem and have been stuck with it for a week now.
I'm querying a huge amount of data (millions of rows) from a DB, then breaking it into partitions using Multi, and processing each partition with a Uni.
My main function looks roughly like this:
public Uni<Void> processData(Request request) {
    AtomicReference<List<Item>> listItemRef = new AtomicReference<>();
    return getAllMatchingItem(request)
            .onItem().transformToMulti(result -> Multi.createFrom().iterable(ListUtils.partition(result, 10000)))
            .onItem().transformToUni(resultBatched -> Uni.createFrom().item(() -> resultBatched)
                    .chain(batch -> {
                        listItemRef.setRelease(batch);
                        return getOtherItemBasedOnItem(batch);
                    })
                    .chain(otherItems -> {
                        List<Item> items = listItemRef.getPlain();
                        return createProcessedItems(items, otherItems);
                    })
                    .chain(processedItems -> insertProcessedItems(processedItems))
                    .onTermination().invoke(() -> LOGGER.info("Data process success"))
                    .replaceWithVoid() // to make sure there is no remaining reference to upstream Unis
            )
            .merge(1)
            .collect().asList()
            .replaceWithVoid()
            .emitOn(MutinyHelper.executor(Vertx.currentContext()));
}
and the supporting functions are roughly like this:
public Uni<List<Item>> getAllMatchingItem(Request request) {
    // get data from the DB, basically PanacheRepository.find(...).list()
    return Uni.createFrom().item(() -> repo.find(" ... ").list())
            .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
            .emitOn(MutinyHelper.executor(Vertx.currentContext()));
}
public Uni<List<OtherItem>> getOtherItemBasedOnItem(List<Item> items) {
    // get data from the DB, basically PanacheRepository.find("id in ?1", items.getIdSet()).list()
    return Uni.createFrom().item(() -> repo.find("id in ?1", items.getIdSet()).list())
            .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
            .emitOn(MutinyHelper.executor(Vertx.currentContext()));
}
public Uni<List<ProcessedItem>> createProcessedItems(List<Item> items, List<OtherItem> otherItems) {
    return Uni.createFrom().item(() -> doCreateProcessedItems(items, otherItems))
            .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
            .emitOn(MutinyHelper.executor(Vertx.currentContext()));
}
public List<ProcessedItem> doCreateProcessedItems(List<Item> items, List<OtherItem> otherItems) {
    // process those items:
    // basically create a Map, build the result list, etc.
    // no external function calls, just plain code
}
public Uni<Void> insertProcessedItems(List<ProcessedItem> processedItems) {
    // basically just insert/update
    return Uni.createFrom().voidItem()
            .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
            .invoke(() -> repo.persist(processedItems))
            .emitOn(MutinyHelper.executor(Vertx.currentContext()));
}
I suspect there is a memory leak, because the log Data process success appears a few times before I get this exception:
java.lang.OutOfMemoryError: Java heap space
I'm also getting this warning:
2024-02-03 21:20:10,152 WARN [io.ver.cor.imp.BlockedThreadChecker] (vertx-blocked-thread-checker) Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 3203 ms, time limit is 2000 ms: io.vertx.core.VertxException: Thread blocked
I'd appreciate any clue on how to solve this issue.
Note: I'm using Quarkus version 2.16, if that matters.
A Vert.x blocked-thread-checker warning means that you are doing blocking I/O and/or long-running work on an event-loop thread.
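If your endpoint runs on RESTEasy Reactive, the simplest fix is often to let Quarkus dispatch the whole request on a worker thread with @Blocking. A minimal sketch, assuming a JAX-RS resource (the class and path below are my own illustration, not from your code; note that Quarkus 2.x still uses the javax namespace):

import io.smallrye.common.annotation.Blocking;
import javax.ws.rs.POST;
import javax.ws.rs.Path;

@Path("/process") // hypothetical path, for illustration only
public class ProcessResource {

    // @Blocking moves request handling off the event loop onto a worker
    // thread, so blocking Hibernate ORM calls are safe inside this method
    @POST
    @Blocking
    public void process(Request request) {
        // plain blocking, imperative processing goes here
    }
}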
From what I see in your code, I suspect that you are making calls to classic Hibernate (not Hibernate Reactive) and then feeding the in-memory data into reactive pipelines, eventually collecting everything back into a single list (.collect().asList() does just that). Reactive is great if you do it properly end-to-end with non-blocking I/O; if that's not the case, I'd recommend rewriting the code as plain imperative code.
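For example, here is a rough sketch of that imperative rewrite, assuming classic Hibernate ORM with Panache; the repository names (itemRepo, otherItemRepo, processedRepo) and the getIdSet helper are placeholders for your own code. Paging through the query keeps only one batch in memory at a time instead of collecting millions of rows up front:

import io.quarkus.panache.common.Page;
import javax.transaction.Transactional;
import java.util.List;

private static final int BATCH_SIZE = 10_000;

@Transactional
public void processData(Request request) {
    int pageIndex = 0;
    List<Item> items;
    do {
        // fetch one page of matching items instead of the whole result set
        items = itemRepo.find(" ... ").page(Page.of(pageIndex++, BATCH_SIZE)).list();
        if (items.isEmpty()) {
            break;
        }
        List<OtherItem> otherItems = otherItemRepo.find("id in ?1", getIdSet(items)).list();
        List<ProcessedItem> processed = doCreateProcessedItems(items, otherItems);
        processedRepo.persist(processed);
        // with one long transaction, consider flushing and clearing the
        // EntityManager here so the persistence context stays small
        LOGGER.info("Data process success");
    } while (items.size() == BATCH_SIZE);
}

Whether everything shares one transaction or each batch gets its own (e.g. @Transactional on a per-batch method) depends on your consistency requirements.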