So basically what I am trying to do is the following:
- Load a batch of data from the database
- Map that data (the Object[] query result) to a class representing it in a readable format
- Write to file
- Repeat until the query gets no more results
I have listed the structures I am familiar with that seem to fit the need, and why they don't fit my needs:
- Iterator → Has no option to map and filter without calling next(). I need to define the map function in a subclass, though, without actually having the data (similar to a stream), so that I can pass the "stream" all the way up to a calling class and only there call next(), which then applies all the map functions.
- Stream → All data needs to be available before mapping and filtering is possible
- Observable → Sends data as soon as it becomes available. I need to process it synchronously, though.
To get more of a feeling for what I am trying to do, I made a small example:
// Disclaimer: "Something" is the structure I am not sure of now.
// Could be an Iterator or something else that fits (that's the question)
public class Orchestrator {
    @Inject
    private DataGetter dataGetter;

    public void doWork() {
        FileWriter writer = new FileWriter("filename");

        // Write the formatted data to the file
        dataGetter.getData()
                  .forEach(data -> writer.writeToFile(data));
    }
}

public class FileWriter {
    public void writeToFile(List<Thing> data) {
        // Write to file
    }
}

public class DataGetter {
    @Inject
    private ThingDao thingDao;

    public Something<List<Thing>> getData() {
        // Map data to the correct format and return that
        return thingDao.getThings()
                       .map(partialResult -> /* map to object */);
    }
}

public class ThingDao {
    public Something<List<Object[]>> getThings() {
        Query q = ...;
        // Don't know what to return
    }
}
What I have got so far:
I tried to start from an Iterator, because it's the only one that really fulfills my memory requirements. Then I added some methods to map and loop over the data. It's not really a robust design, though, and it's going to be harder than I thought, so I wanted to know if there is anything out there that already does what I need.
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;

import javax.persistence.Query;

public class QIterator<E> implements Iterator<List<E>> {
    public static final String QUERY_OFFSET = "queryOffset";
    public static final String QUERY_LIMIT = "queryLimit";

    private Query query;
    private long lastResultIndex = 0;
    private long batchSize;

    private Function<List<Object>, List<E>> mapper;

    public QIterator(Query query, long batchSize) {
        this.query = query;
        this.batchSize = batchSize;
    }

    public QIterator(Query query, long batchSize, Function<List<Object>, List<E>> mapper) {
        this(query, batchSize);
        this.mapper = mapper;
    }

    @Override
    public boolean hasNext() {
        // True while every batch so far came back full; a batch smaller
        // than batchSize ends the iteration
        return lastResultIndex % batchSize == 0;
    }

    @Override
    public List<E> next() {
        query.setParameter(QIterator.QUERY_OFFSET, lastResultIndex);
        query.setParameter(QIterator.QUERY_LIMIT, batchSize);

        List<Object> result = (List<Object>) query.getResultList(); // unchecked
        lastResultIndex += result.size();

        List<E> mappedResult;
        if (mapper != null) {
            mappedResult = mapper.apply(result);
        } else {
            mappedResult = (List<E>) result; // unchecked
        }
        return mappedResult;
    }

    public <R> QIterator<R> map(Function<List<E>, List<R>> appendingMapper) {
        return new QIterator<>(query, batchSize, (data) -> {
            if (this.mapper != null) {
                return appendingMapper.apply(this.mapper.apply(data));
            } else {
                return appendingMapper.apply((List<E>) data); // unchecked
            }
        });
    }

    public void forEach(BiConsumer<List<E>, Integer> consumer) {
        for (int i = 0; this.hasNext(); i++) {
            consumer.accept(this.next(), i);
        }
    }
}
This works so far, but it has some unchecked assignments which I do not really like. I would also like the ability to "append" one QIterator to another, which is not hard by itself, but the result should also take the maps that follow after the append; a rough sketch of what I mean is below.
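This is hypothetical code, not something I have working: the anonymous subclass has no query of its own, so calling map() on the appended result would rebuild a QIterator from a null query, which is exactly where it gets hard.

    // Hypothetical sketch only: chain this iterator with another one
    public QIterator<E> append(QIterator<E> other) {
        QIterator<E> self = this;
        return new QIterator<E>(null, batchSize) { // no query of its own
            @Override
            public boolean hasNext() {
                return self.hasNext() || other.hasNext();
            }

            @Override
            public List<E> next() {
                // Drain this iterator first, then continue with the other one
                return self.hasNext() ? self.next() : other.next();
            }
        };
    }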
Assume you have a DAO that provides data in a paginated manner, e.g. by applying the LIMIT and OFFSET clauses to the underlying SQL. Such a DAO class would have a method that takes those values as arguments, i.e. the method would conform to the following functional interface:
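A minimal sketch of that contract; the interface name PagedDao is illustrative, while getData matches the calls described below:

import java.util.List;

// Assumed name for the paginated-DAO contract;
// getData(offset, limit) returns one page of rows
@FunctionalInterface
public interface PagedDao<T> {
    List<T> getData(int offset, int limit);
}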
E.g. calling getData(0, 20) would return the first 20 rows (page 1), and calling getData(60, 20) would return the 20 rows on page 4. If the method returns fewer than 20 rows, it means we got the last page. Asking for data after the last row will return an empty list.

For the demo below, we can mock such a DAO class:
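One way to do that, assuming 13 rows in total so the demo matches the output described at the end, with a DEBUG line printed on every fetch:

import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Mock DAO with 13 rows; the DEBUG line shows when the "database" is hit
PagedDao<String> mockDao = (offset, limit) -> {
    System.out.println("DEBUG: getData(" + offset + ", " + limit + ")");
    return IntStream.range(offset, Math.min(13, offset + limit))
                    .mapToObj(i -> "Row #" + (i + 1))
                    .collect(Collectors.toList());
};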
If you then want to generate a Stream of rows from that method, streaming all rows in blocks of a certain size, we need a Spliterator, so we can use StreamSupport.stream(Spliterator<T> spliterator, boolean parallel) to create the stream.

Here is an implementation of such a Spliterator:
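The sketch below builds on the PagedDao interface assumed above; the class and field names are illustrative. It fetches the next block lazily in tryAdvance() and treats a block that comes back smaller than the block size as the last one:

import java.util.List;
import java.util.Objects;
import java.util.Spliterator;
import java.util.function.Consumer;

public class PagedDaoSpliterator<T> implements Spliterator<T> {
    private final PagedDao<T> dao;
    private final int blockSize;
    private int nextOffset;  // offset of the next block; -1 once end-of-data is detected
    private List<T> block;   // current block, or null if a new one must be fetched
    private int blockIdx;    // next element to emit from the current block

    public PagedDaoSpliterator(PagedDao<T> dao, int blockSize) {
        this.dao = Objects.requireNonNull(dao, "dao");
        if (blockSize <= 0)
            throw new IllegalArgumentException("Invalid blockSize: " + blockSize);
        this.blockSize = blockSize;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (block == null) {
            if (nextOffset == -1)
                return false; // end of data already reached
            block = dao.getData(nextOffset, blockSize); // lazy fetch of the next block
            if (block.size() < blockSize)
                nextOffset = -1; // partial block: this is the last one
            else
                nextOffset += block.size();
            if (block.isEmpty()) {
                block = null;
                return false;
            }
            blockIdx = 0;
        }
        action.accept(block.get(blockIdx++));
        if (blockIdx == block.size())
            block = null; // block exhausted; fetch the next one on demand
        return true;
    }

    @Override
    public Spliterator<T> trySplit() {
        return null; // no parallel support
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE; // unknown number of rows
    }

    @Override
    public int characteristics() {
        return Spliterator.ORDERED | Spliterator.NONNULL;
    }
}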
We can now test that using the mock DAO above:
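For example, with a block size of 5 (chosen to match the output below):

import java.util.stream.StreamSupport;

StreamSupport.stream(new PagedDaoSpliterator<>(mockDao, 5), /*parallel*/ false)
             .forEach(System.out::println);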
Output
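With the mock and block size sketched above, the run would print:

DEBUG: getData(0, 5)
Row #1
Row #2
Row #3
Row #4
Row #5
DEBUG: getData(5, 5)
Row #6
Row #7
Row #8
Row #9
Row #10
DEBUG: getData(10, 5)
Row #11
Row #12
Row #13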
As can be seen, we get 13 rows of data, retrieved from the database in blocks of 5 rows.
The data is not retrieved from the database until it is needed, keeping the memory footprint low, depending on the block size and on the stream operations not caching the data.