Quarkus Mutiny Multi and Uni memory leaks and io.vertx.core.VertxException: Thread blocked


I have been stuck on this problem for a week now.

I'm querying a huge amount of data (millions of items) from a DB, then breaking it into partitions using Multi and processing each partition using Uni.

My main function code looks roughly something like this:

public Uni<Void> processData(Request request) {
    AtomicReference<List<Item>> listItemRef = new AtomicReference<>();
    return getAllMatchingItem(request)
            .onItem().transformToMulti(result -> Multi.createFrom().iterable(ListUtils.partition(result, 10000)))
            .onItem().transformToUni(resultBatched -> Uni.createFrom().item(() -> resultBatched)
                    // the inner lambda parameter must not reuse the name resultBatched, or this does not compile
                    .chain(batch -> {
                        listItemRef.setRelease(batch);
                        return getOtherItemBasedOnItem(batch);
                    })
                    .chain(otherItems -> {
                        List<Item> items = listItemRef.getPlain();
                        return createProcessedItems(items, otherItems);
                    })
                    .chain(processedItems -> {
                        return insertProcessedItems(processedItems);
                    })
                    .onTermination().invoke(() -> {
                        LOGGER.info("Data process success");
                    })
                    .replaceWithVoid() // to make sure no remaining reference to upstream Unis
            )
            .merge(1)
            .collect().asList()
            .replaceWithVoid()
            .emitOn(MutinyHelper.executor(Vertx.currentContext()));
}

and the supporting functions are roughly like this:

public Uni<List<Item>> getAllMatchingItem(Request request) {
    // get data from the DB, basically PanacheRepository.find(...).list()
    return Uni.createFrom().item(() -> repo.find(" ... ").list())
            .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
            .emitOn(MutinyHelper.executor(Vertx.currentContext()));
}

public Uni<List<OtherItem>> getOtherItemBasedOnItem(List<Item> items) {
    // get data from the DB, basically PanacheRepository.find(id in items.getIDSet()).list()
    return Uni.createFrom().item(() -> repo.find("id in ?1", items.getIDSet()).list())
            .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
            .emitOn(MutinyHelper.executor(Vertx.currentContext()));
}

public Uni<List<ProcessedItem>> createProcessedItems(List<Item> items, List<OtherItem> otherItems) {
    return Uni.createFrom().item(() -> doCreateProcessedItems(items, otherItems))
            .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
            .emitOn(MutinyHelper.executor(Vertx.currentContext()));
}

public List<ProcessedItem> doCreateProcessedItems(List<Item> items, List<OtherItem> otherItems) {
    // process those items
    // basically create Map, create list, etc
    // no external function call, just plain code
}

public Uni<Void> insertProcessedItems(List<ProcessedItem> processedItems) {
    // basically just insert/update
    return Uni.createFrom().voidItem()
            .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
            .invoke(() -> repo.persist(processedItems))
            .emitOn(MutinyHelper.executor(Vertx.currentContext()));
}

I suspect there is a memory leak, because the log message "Data process success" appears a few times before I get this exception:

java.lang.OutOfMemoryError: Java heap space

I'm also getting this warning:

2024-02-03 21:20:10,152 WARN [io.ver.cor.imp.BlockedThreadChecker] (vertx-blocked-thread-checker) Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 3203 ms, time limit is 2000 ms: io.vertx.core.VertxException: Thread blocked

I'd appreciate any clue on how to solve this issue.

Note: I'm using Quarkus 2.16, if that matters.


There is 1 answer

jponge

A Vert.x blocked thread checker warning means that you are doing blocking I/O and/or long-running work on an event-loop thread.
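
To illustrate the idea with plain java.util.concurrent rather than Vert.x or Mutiny, here is a toy sketch. A single-threaded executor stands in for the event loop and a small pool stands in for the worker pool. The class name, the thread names, and blockingQuery are made up for the example. The point is only that the blocking call runs on a worker thread and just the short continuation comes back to the "event loop".

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class OffloadSketch {

    // Hypothetical stand-in for a blocking call such as a JDBC or Hibernate query.
    // It returns the name of the thread it ran on so the caller can inspect it.
    static String blockingQuery() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Thread.currentThread().getName();
    }

    // Runs the blocking call on the worker pool, then continues on the toy event loop.
    // Returns { thread that ran the blocking call, thread that ran the continuation }.
    static String[] runOffloaded() {
        ExecutorService eventLoop =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "toy-eventloop"));
        ExecutorService workers =
                Executors.newFixedThreadPool(2, r -> new Thread(r, "toy-worker"));
        try {
            return CompletableFuture
                    .supplyAsync(OffloadSketch::blockingQuery, workers)
                    .thenApplyAsync(
                            workerName -> new String[] {workerName, Thread.currentThread().getName()},
                            eventLoop)
                    .join();
        } finally {
            eventLoop.shutdown();
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = runOffloaded();
        System.out.println("blocking call ran on: " + threads[0]);
        System.out.println("continuation ran on:  " + threads[1]);
    }
}
```

If the continuation itself does heavy work (for example, iterating over millions of items), the "event loop" thread is still blocked, no matter where the query ran.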

From what I see in your code, I suspect that you are making calls to Hibernate (not Hibernate Reactive) and then putting data from memory into reactive pipelines, eventually collecting everything as a list (.collect().asList() does just that).

Reactive is great if you do it properly end-to-end with non-blocking I/O. If that's not the case I'd recommend rewriting the code as plain imperative code.
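
As a rough idea of what the imperative version could look like, here is a minimal, self-contained sketch that pulls one page at a time and processes it before fetching the next, so only one batch is referenced at any moment. Everything in it (ImperativeBatchSketch, processInPages, fakePage) is hypothetical and uses an in-memory stand-in for the database. In a real Quarkus application, the page fetch would presumably be a paged Panache query with a stable sort order, run on a worker thread rather than the event loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntFunction;

class ImperativeBatchSketch {

    // Fetches pages by index until an empty page comes back, handing each page
    // to the handler before the next one is fetched. Returns the item count.
    static <T> int processInPages(IntFunction<List<T>> fetchPage, Consumer<List<T>> handlePage) {
        int total = 0;
        for (int pageIndex = 0; ; pageIndex++) {
            List<T> page = fetchPage.apply(pageIndex);
            if (page.isEmpty()) {
                break; // no more data
            }
            handlePage.accept(page); // e.g. load related rows, transform, persist
            total += page.size();
        }
        return total;
    }

    // Hypothetical in-memory stand-in for a paged database query:
    // returns the integers belonging to the requested page of [0, totalRows).
    static List<Integer> fakePage(int pageIndex, int pageSize, int totalRows) {
        List<Integer> page = new ArrayList<>();
        int start = pageIndex * pageSize;
        int end = Math.min(start + pageSize, totalRows);
        for (int i = start; i < end; i++) {
            page.add(i);
        }
        return page;
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        int total = processInPages(
                pageIndex -> fakePage(pageIndex, 4, 10),
                page -> batchSizes.add(page.size()));
        System.out.println(total + " items processed in batches of " + batchSizes);
    }
}
```

The design choice here is that nothing holds on to previous pages: each list becomes garbage as soon as its handler returns, which is hard to guarantee when every batch is pushed through a pipeline that ends in a collected list.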