I am trying to create a Flux by querying some API endpoint multiple times. The payload I receive has a variable that indicates if the data is exhausted, meaning in this JSON payload there is a done variable that is false while there is still data to be fetched and it is set to true when it's been all fetched. I would then complete this Flux.
I am trying to make this work and failing so not to complicate things I created a simpler code that just emits strings for this example.
The interface that I will mock, but this returns the Mono<String> but in real life this will be returning a Mono of this record I am fetching from API.
public interface RecordProvider {
Mono<String> fetchString();
}
Then in the service that should process these I have these methods:
private final AtomicInteger stringCounter = new AtomicInteger(0);
public Flux<String> assembleStringsFlux() {
return Flux.<String>create(this::generateStringMono);
}
private void generateStringMono(FluxSink<String> fluxSink) {
recordProvider.fetchString()
.subscribeOn(Schedulers.boundedElastic())
.subscribe(
stringResult -> {
if (stringCounter.get() == 5) {
fluxSink.complete();
} else {
fluxSink.next(stringResult);
}
},
fluxSink::error);
}
As you can see I will count the number of strings/items fetched and after 5 will complete. This is not the real life scenario but this is just for testing.
I try to make this work in my tests as follows:
@Test
void givenString() {
// given
// when
when(recordProvider.fetchString()).thenReturn((Mono<String>) Mono.just("bla"));
var result = recordProcessor.assembleStringsFlux()
.map(someString -> {
System.out.println("whaaaat = " + someString);
return someString;
}).subscribeOn(Schedulers.boundedElastic());
// then
StepVerifier.create(result)
.expectNextMatches(stringSignal -> stringSignal.equals("bla"))
.expectNextMatches(stringSignal -> stringSignal.equals("bla"))
.expectNextMatches(stringSignal -> stringSignal.equals("bla"))
.expectNextMatches(stringSignal -> stringSignal.equals("bla"))
.verifyComplete();
}
When I run this test, only one string is printed to log and then it seems like it just hangs until it finishes with error.
Any ideas what I am doing wrong?
UPDATE 1: Ok, as I feared, I guess a single subscribe will not be enough. So I need a loop where I get these elements until I hit the terms for completing the Flux. But What I am wondering, if I am accessing a method that returns a Mono should I really do something like the following:
return Flux.create(fluxSink -> {
do {
var myStr = recordProvider.fetchString().block();
fluxSink.next(myStr);
stringCounter.incrementAndGet();
} while (stringCounter.get() < 5);
fluxSink.complete();
Because I am trying not to call block() and I am wondering how do I subscribe over and over again?
Ok, so I figured it out, I made a mistake, a big overlook...
The generateStringMono method should look like this:
So basically I need to call the method again.
Also in the test,
thenReturnshould not be used, butthenAnswerand it should look like this:The
monoListabove is just a list of different strings that then I check in my step verifier so that it's not always the same.