Java: How to count and process elements in a Stream without auxiliary structures or peek()?


I'm working with Java streams and I need a method that both counts all elements in a stream and, in the same pass, processes a subset of them (one page) selected with skip() and limit(). I want to do this without any auxiliary data structures to keep heap usage low, because the input can be very large.

I'd also prefer to avoid peek(), because its action is not guaranteed to run in every scenario, as detailed in this 4Comprehension article.
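One scenario in which peek() is unreliable can be reproduced in a few lines. This is a minimal sketch assuming OpenJDK 9 or later; the class name PeekCountDemo and the sample list are illustrative. The Javadoc of count() allows an implementation to compute the count directly from the source without traversing the pipeline, and a List source reports its exact size, so the peek() action may never run.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class PeekCountDemo {
    // Returns how many times the peek() action ran in a pipeline that ends in count().
    static int peekInvocations(List<Integer> source) {
        AtomicInteger calls = new AtomicInteger();
        long n = source.stream()
                .peek(x -> calls.incrementAndGet()) // side effect we hope to observe
                .count();                           // the source's size is already known
        System.out.println("count() = " + n + ", peek() calls = " + calls.get());
        return calls.get();
    }

    public static void main(String[] args) {
        peekInvocations(List.of(1, 2, 3));
    }
}
```

On OpenJDK 9+ this prints `count() = 3, peek() calls = 0`. Because the Javadoc only says the implementation *may* skip the traversal, other implementations can behave differently, which is what makes counting via peek() fragile.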

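import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Stream;

public static <T> long extractAndProcess(Stream<T> streamData, int pageIndex, int itemsPerPage,
                                         Consumer<T> itemHandler) {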
    long startPosition = (long) pageIndex * itemsPerPage;
    AtomicLong totalCount = new AtomicLong();

    streamData.peek(item -> totalCount.incrementAndGet())
              .skip(startPosition)
              .limit(itemsPerPage)
              .forEachOrdered(itemHandler);

    return totalCount.get();
}

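I tried using peek() here, but combined with limit() it doesn't behave as I expected: limit() short-circuits, so once the page is full the remaining elements are never pulled through peek(), and totalCount comes out too small.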
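The undercount can be reproduced by running the question's method on a small input. This is a minimal sketch; the class name PeekLimitDemo and the sample input (10 elements, page 1 of size 3) are illustrative. Because limit() is a short-circuiting operation, the pipeline stops pulling from the source as soon as the page is full, so peek() never sees the elements after the page.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class PeekLimitDemo {
    // The question's method, unchanged apart from formatting.
    public static <T> long extractAndProcess(Stream<T> streamData, int pageIndex, int itemsPerPage,
                                             Consumer<T> itemHandler) {
        long startPosition = (long) pageIndex * itemsPerPage;
        AtomicLong totalCount = new AtomicLong();
        streamData.peek(item -> totalCount.incrementAndGet())
                  .skip(startPosition)
                  .limit(itemsPerPage)
                  .forEachOrdered(itemHandler);
        return totalCount.get();
    }

    public static void main(String[] args) {
        List<Integer> page = new ArrayList<>();
        // Elements 0..9, page 1 of size 3: the page is [3, 4, 5].
        long total = extractAndProcess(IntStream.range(0, 10).boxed(), 1, 3, page::add);
        // limit() cancels the traversal after element 5, so peek() ran only 6 times.
        System.out.println(page + " total=" + total);
    }
}
```

On OpenJDK this prints `[3, 4, 5] total=6`: the page is correct, but only 6 of the 10 elements were counted. The count is correct only when the requested page reaches (or passes) the end of the stream.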


There are 2 answers

Louis Wasserman (best answer)

The only viable solution for the case you've described looks like this:

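// totalCount is the AtomicLong from the question; its final value is the element count
streamData.forEachOrdered(elem -> {
  // getAndIncrement() returns the 0-based index of the current element
  long i = totalCount.getAndIncrement();
  if (i >= startPosition && i < startPosition + itemsPerPage) {
    itemHandler.accept(elem);
  }
});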

I don't believe a better solution exists; streams are really not designed to do what you're trying to do with them.
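For reference, the forEachOrdered approach can be wrapped in the question's method signature as a self-contained program. This is a sketch; the class name PagedCount and the sample input in main are illustrative. Since the pipeline contains no short-circuiting operation, every element passes through the lambda exactly once, and the only extra memory is a single counter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class PagedCount {
    // Counts every element and passes only those on the requested page to itemHandler.
    public static <T> long extractAndProcess(Stream<T> streamData, int pageIndex, int itemsPerPage,
                                             Consumer<T> itemHandler) {
        long startPosition = (long) pageIndex * itemsPerPage;
        long endPosition = startPosition + itemsPerPage; // exclusive
        AtomicLong totalCount = new AtomicLong();
        // No skip()/limit(): the whole stream is traversed in encounter order.
        streamData.forEachOrdered(elem -> {
            long i = totalCount.getAndIncrement(); // 0-based index of elem
            if (i >= startPosition && i < endPosition) {
                itemHandler.accept(elem);
            }
        });
        return totalCount.get();
    }

    public static void main(String[] args) {
        List<Integer> page = new ArrayList<>();
        long total = extractAndProcess(IntStream.range(0, 10).boxed(), 1, 3, page::add);
        System.out.println(page + " total=" + total); // [3, 4, 5] total=10
    }
}
```

The trade-off is that the stream is always traversed to the end, even after the page has been handled; that cost is unavoidable when the total count is required.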

M. Justin

Here's a solution using Stream Gatherers, a preview API in Java 22 (JEP 461: Stream Gatherers). As a preview feature of an upcoming Java release, it's not yet ready for production code.

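import java.util.function.Consumer;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public static <T> long extractAndProcess(
        Stream<T> streamData, int pageIndex, int itemsPerPage, Consumer<T> itemHandler) {
    // Mutable gatherer state: the number of elements seen so far.
    class Count {
        long count = 0;
    }

    long startPosition = (long) pageIndex * itemsPerPage;
    long endPosition = startPosition + itemsPerPage; // exclusive

    return streamData
            .gather(Gatherer.<T, Count, Long>ofSequential(
                    Count::new,
                    // Greedy: this integrator never stops consuming upstream elements itself.
                    Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                        long count = state.count++;
                        if (startPosition <= count && count < endPosition) {
                            itemHandler.accept(element);
                        }
                        return true;
                    }),
                    // Finisher: emit the total count as the only output element.
                    (state, downstream) -> downstream.push(state.count)))
            .findAny().orElseThrow();
}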

This uses the new Stream.gather method with a custom Gatherer that counts all the elements in the stream and applies the given itemHandler, in order, to the elements within the given page. The result of this gather call is a Stream<Long> with a single element: the number of items in the stream.

Internally, the count is kept in a single Count state object, and the gatherer's finisher pushes its count value downstream once the upstream is exhausted.

Javadocs

Gatherer:

An intermediate operation that transforms a stream of input elements into a stream of output elements, optionally applying a final action when the end of the upstream is reached. […]

[…]

There are many examples of gathering operations, including but not limited to: grouping elements into batches (windowing functions); de-duplicating consecutively similar elements; incremental accumulation functions (prefix scan); incremental reordering functions, etc. The class Gatherers provides implementations of common gathering operations.

API Note:

A Gatherer is specified by four functions that work together to process input elements, optionally using intermediate state, and optionally perform a final action at the end of input. They are: […]

Stream.gather(Gatherer<? super T,?,R> gatherer):

Returns a stream consisting of the results of applying the given gatherer to the elements of this stream.

Gatherer.ofSequential(initializer, integrator, finisher)

Returns a new, sequential, Gatherer described by the given initializer, integrator, and finisher.