Trying to create a stream (Flux) of strings with the create method, but it doesn't work

I am trying to create a Flux by querying an API endpoint repeatedly. Each JSON payload I receive contains a done field: it is false while there is still data to fetch and is set to true once everything has been fetched. When done is true, I want to complete the Flux.

I could not get this to work, so to keep things simple I wrote a reduced example that just emits strings.

This is the interface that I will mock. Here it returns a Mono<String>; in real life it will return a Mono of the record I am fetching from the API.

public interface RecordProvider {
    Mono<String> fetchString();
}

Then, in the service that should process these, I have the following field and methods:

private final AtomicInteger stringCounter = new AtomicInteger(0);

public Flux<String> assembleStringsFlux() {
    return Flux.<String>create(this::generateStringMono);
}

private void generateStringMono(FluxSink<String> fluxSink) {
    recordProvider.fetchString()
            .subscribeOn(Schedulers.boundedElastic())
            .subscribe(
                    stringResult -> {
                        if (stringCounter.get() == 5) {
                            fluxSink.complete();
                        } else {
                            fluxSink.next(stringResult);
                        }
                    },
                    fluxSink::error);
}

As you can see, I count the number of strings fetched and complete after 5. This is not the real-life scenario; it is just for testing.

I try to exercise this in my test as follows:

@Test
void givenString() {
    // given
    // when
    when(recordProvider.fetchString()).thenReturn((Mono<String>) Mono.just("bla"));

    var result = recordProcessor.assembleStringsFlux()
            .map(someString -> {
                System.out.println("whaaaat = " + someString);
                return someString;
            }).subscribeOn(Schedulers.boundedElastic());

    // then
    StepVerifier.create(result)
            .expectNextMatches(stringSignal -> stringSignal.equals("bla"))
            .expectNextMatches(stringSignal -> stringSignal.equals("bla"))
            .expectNextMatches(stringSignal -> stringSignal.equals("bla"))
            .expectNextMatches(stringSignal -> stringSignal.equals("bla"))
            .verifyComplete();
}

When I run this test, only one string is printed to the log, and then the test seems to hang until it fails with an error.

Any ideas what I am doing wrong?

UPDATE 1: OK, as I feared, a single subscribe is not enough. I need a loop that keeps fetching elements until I hit the condition for completing the Flux. What I am wondering is: if I am calling a method that returns a Mono, should I really do something like the following?

return Flux.create(fluxSink -> {
    do {
        var myStr = recordProvider.fetchString().block();
        fluxSink.next(myStr);
        stringCounter.incrementAndGet();
    } while (stringCounter.get() < 5);
    fluxSink.complete();
});

I am trying not to call block(), so I am wondering how to subscribe over and over again.

There is 1 answer

Answer by Dusko:

OK, so I figured it out. I made a mistake, a big oversight...

The generateStringMono method should look like this:

private void generateStringMono(FluxSink<String> fluxSink) {
    recordProvider.fetchString()
            .subscribeOn(Schedulers.boundedElastic())
            .subscribe(
                    stringResult -> {
                        if (stringCounter.get() == 5) {
                            fluxSink.complete();
                        } else {
                            fluxSink.next(stringResult);
                            stringCounter.incrementAndGet(); // count the emitted element so the check above can reach 5
                            generateStringMono(fluxSink); // <-- forgot this
                        }
                    },
                    fluxSink::error);
}

So basically I need to call the method again after each emitted element, because a Mono emits at most one item per subscription.
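For comparison, the recursive resubscription above can probably also be expressed with Reactor's own operators, without a manual subscribe or block() inside the pipeline. This is a minimal sketch, not the code from the question: the class name RepeatSketch, the Supplier parameter (standing in for recordProvider::fetchString), and the limit parameter are assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class RepeatSketch {

    // Hypothetical helper: 'fetch' stands in for recordProvider::fetchString.
    static Flux<String> assembleStringsFlux(Supplier<Mono<String>> fetch, int limit) {
        return Mono.defer(fetch)   // invoke fetch again on every (re)subscription
                .repeat()          // resubscribe each time the Mono completes
                .take(limit);      // stop repeating after 'limit' elements
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        List<String> result = assembleStringsFlux(
                () -> Mono.just("bla" + calls.getAndIncrement()), 5)
                .collectList()
                .block(); // blocking here only to print the result in this demo
        System.out.println(result);
    }
}
```

For the real payload with a done field, takeUntil with a predicate on that field would likely replace take(limit); takeUntil also emits the element that matched before completing.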

Also, in the test, thenAnswer should be used instead of thenReturn, and it should look like this:

when(recordProvider.fetchString()).thenAnswer(new Answer<Mono<String>>() {
    @Override
    public Mono<String> answer(InvocationOnMock invocationOnMock) throws Throwable {
        // Hand out the next prepared Mono on each call
        return monoList.get(callCounter.getAndIncrement());
    }
});

The monoList above is just a list of Monos wrapping different strings, which I then check in my StepVerifier so that the value is not always the same.
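A single-argument thenReturn hands back the same stubbed object on every call, whereas thenAnswer runs its callback on each invocation, which is what makes per-call values possible. The same stub can be written more compactly with a lambda. The following is a sketch under assumptions: the class name ThenAnswerSketch, the stubbedProvider helper, and the values list are hypothetical, and the RecordProvider interface is repeated only to keep the snippet self-contained.

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;

public class ThenAnswerSketch {

    // Same shape as the interface from the question, repeated for self-containment.
    public interface RecordProvider {
        Mono<String> fetchString();
    }

    // Hypothetical helper: builds a mock whose n-th call returns the n-th value.
    static RecordProvider stubbedProvider(List<String> values) {
        AtomicInteger callCounter = new AtomicInteger(0);
        RecordProvider recordProvider = mock(RecordProvider.class);
        when(recordProvider.fetchString())
                // The lambda is evaluated on every fetchString() call
                .thenAnswer(invocation -> Mono.just(values.get(callCounter.getAndIncrement())));
        return recordProvider;
    }

    public static void main(String[] args) {
        RecordProvider provider = stubbedProvider(List.of("a", "b", "c"));
        System.out.println(provider.fetchString().block());
        System.out.println(provider.fetchString().block());
    }
}
```

Calling fetchString() more often than there are values would throw an IndexOutOfBoundsException in this sketch, so the list should be at least as long as the number of expected calls.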