Reactor API returning Task capacity of bounded elastic scheduler Exception

672 views Asked by At

I am new to ProjectReactor(using reactor-core:3.4.18), I am trying to parallelize the flux consumer subscription , I am creating a Scheduler with max threads as 2, but its failing with the following exception , whereas when i give threadscount as 4 , its working fine .

Scheduler schedulers = Schedulers.newBoundedElastic(2, 2, "PublishedThread");

        Flux.range(1, 10)
                .parallel()
                .runOn(schedulers)
                .doOnNext(e -> printName(e))
                .subscribe();

[ERROR] (main) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$ReactorRejectedExecutionException: Task capacity of bounded elastic scheduler reached while scheduling 1 tasks (3/2) reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$ReactorRejectedExecutionException: Task capacity of bounded elastic scheduler reached while scheduling 1 tasks (3/2) Caused by: reactor.core.Exceptions$ReactorRejectedExecutionException: Task capacity of bounded elastic scheduler reached while scheduling 1 tasks (3/2) at reactor.core.Exceptions.failWithRejected(Exceptions.java:277)

Can someone help me to understand why giving less number of threads is throwing this exception ?

1

There are 1 answers

0
amanin On BEST ANSWER

This behaviour is documented in the scheduler creation method you use. From Schedulers.newBoundedElastic(int, int, String):

The maximum number of task submissions that can be enqueued and deferred on each of these backing threads is bounded by the provided queuedTaskCap. Past that point, a RejectedExecutionException is thrown.

So, with the scheduler you created, you cannot dispatch more than 4 tasks at the same time. Now, as you have not specified the parallelism/number of rails to use when calling Flux.parallel, I would say that the Flux tries to create more than 4 parallel tasks.

Workarounds:

  • You can use Flux.parallel(int) instead of Flux.parallel(), to try to reduce the parallelism level. For example, you could set it to 3:
    Flux.range(1, 10).parallel(3)
    
  • Or you can change the type of scheduler you create. For example, you could use a new parallel scheduler, that will create a fixed number of threads, and enqueue dispatched tasks, even when there is many:
    Scheduler scheduler = Schedulers.newParallel("PublishedThread", 2);
    Flux.range(0, 10).parallel().runOn(scheduler);
    

Here is a full example creating a ParallelFlux running 8 rails on a new parallel scheduler of 3 threads, and properly waiting for it to finish:

import java.util.concurrent.CountDownLatch;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class TestParallelLimitations {

    public static void main(String[] args) throws Exception {
        Scheduler scheduler = Schedulers.newParallel("PublishedThread", 3);
        try (AutoCloseable disposeScheduler = scheduler::dispose) {
            var flow = Flux.range(1, 12)
                    .parallel(8)
                    .runOn(scheduler)
                    .map(i -> "["+Thread.currentThread().getName()+"] -> "+i);

            var barrier = new CountDownLatch(flow.parallelism());

            flow.subscribe(System.out::println,
                    err -> {
                        System.err.println("ERROR: " + err.getMessage());
                        long remaining = barrier.getCount();
                        while (remaining > 0) {
                            barrier.countDown();
                            remaining = barrier.getCount();
                        }
                    },
                    barrier::countDown);

            barrier.await();
        }
    }
}

This example program outptut is:

[PublishedThread-3] -> 6
[PublishedThread-2] -> 5
[PublishedThread-2] -> 8
[PublishedThread-3] -> 3
[PublishedThread-3] -> 11
[PublishedThread-2] -> 2
[PublishedThread-2] -> 10
[PublishedThread-1] -> 4
[PublishedThread-1] -> 12
[PublishedThread-1] -> 7
[PublishedThread-1] -> 1
[PublishedThread-1] -> 9

We can see that all 12 elements have been processed/:dispatched on the 3 threads from the created scheduler.