I have a source which emits `Either[String, MyClass]`.
I want to call an external service with batches of `MyClass` and continue downstream with `Either[String, ExternalServiceResponse]`, so I need to group the elements of the stream.
If the stream emitted only `MyClass` elements, it would be easy: just call `grouped`:
```scala
import akka.NotUsed
import akka.stream.scaladsl.Source

val source: Source[MyClass, NotUsed] = ??? // custom implementation
source
  .grouped(10)                 // Seq[MyClass]
  .map(callExternalService(_)) // ExternalServiceResponse
```
But how can I group only the elements on the `Right` side of the `Either` in my scenario?
```scala
val source: Source[Either[String, MyClass], NotUsed] = ??? // custom implementation
source
  .??? // the missing step: should produce Either[String, Seq[MyClass]]
  .map {
    case Right(myClasses) => Right(callExternalService(myClasses))
    case Left(string)     => Left(string)
  } // Either[String, ExternalServiceResponse]
```
The following works, but is there a more idiomatic way?
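```scala
val source: Source[Either[String, MyClass], NotUsed] = ??? // custom implementation
source
  .groupBy(2, _.isRight) // one substream for Lefts, one for Rights
  .grouped(10)
  .mapConcat { batch =>
    val out: List[Either[String, ExternalServiceResponse]] =
      batch.headOption match {
        case Some(Right(_)) =>
          // Every element of the Right substream is a Right
          List(Right(callExternalService(batch.collect { case Right(r) => r })))
        case _ =>
          // Lefts are passed through individually
          batch.collect { case Left(l) => Left(l) }.toList
      }
    out
  }
  .mergeSubstreams
```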
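The idea is a stage that transforms a source of `Either[L, R]` into a source of `Either[L, Seq[R]]`, with a configurable group size for the `Right`s. Beyond that, note that the downstream code that calls the external service can be simplified: since Scala 2.12, `Either` is right-biased, so `.map(_.map(callExternalService))` applies the call only to the `Right` side and passes each `Left` through unchanged.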
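One way to build such a stage is sketched below with `statefulMapConcat`, assuming Akka 2.6. `GroupRights` and `groupRights` are hypothetical names, not an Akka API, and this is not necessarily how the answer implements it. Consecutive `Right`s are collected until the batch is full; a `Left` first flushes any pending partial batch and is then emitted itself. A `None` marker appended to the stream lets the final partial batch be flushed when upstream completes, since `statefulMapConcat` has no completion hook of its own.

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}

object GroupRights {
  // Hypothetical helper: batches consecutive Rights into groups of at most `groupSize`.
  // A Left flushes the pending partial batch before being emitted, so the relative
  // order of Lefts and Rights is preserved (unlike with groupBy).
  def groupRights[L, R](groupSize: Int): Flow[Either[L, R], Either[L, Seq[R]], NotUsed] = {
    require(groupSize > 0, "groupSize must be positive")
    Flow[Either[L, R]]
      // Wrap each element in an Option and append a None end-of-stream marker
      .map(Option(_))
      .concat(Source.single(Option.empty[Either[L, R]]))
      .statefulMapConcat[Either[L, Seq[R]]] { () =>
        var pending = Vector.empty[R] // Rights collected for the current batch

        // Emits the pending batch (if any) and resets the accumulator
        def flush(): List[Either[L, Seq[R]]] =
          if (pending.isEmpty) Nil
          else {
            val batch = pending
            pending = Vector.empty[R]
            List(Right(batch))
          }

        val step: Option[Either[L, R]] => List[Either[L, Seq[R]]] = {
          case Some(Right(r)) =>
            pending :+= r
            if (pending.size == groupSize) flush() else Nil
          case Some(Left(l)) =>
            flush() :+ Left(l)
          case None =>
            flush() // upstream completed: emit whatever is left
        }
        step
      }
  }
}
```

In the question's scenario this would be used as `source.via(GroupRights.groupRights[String, MyClass](10))`. Note that there is no time-based flush: a partial batch waits for a `Left`, a full batch, or stream completion, so a slow source can hold `Right`s back for a while.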
If you can reliably call the external service with parallelism `n`, it may also be worth doing so with `mapAsync(n)`. If you want to maximize throughput at the cost of preserving order, you could even replace `mapAsync` with `mapAsyncUnordered`.
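A minimal sketch of the `mapAsync` variant, assuming Akka 2.6 and an asynchronous client returning a `Future`. `MyClass`, `ExternalServiceResponse`, `callExternalServiceAsync`, and `ParallelCalls.callService` are hypothetical stand-ins defined here only so the block compiles; `Left`s skip the service and complete immediately.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow
import scala.concurrent.{ExecutionContext, Future}

object ParallelCalls {
  // Hypothetical stand-ins for the question's types
  final case class MyClass(id: Int)
  final case class ExternalServiceResponse(count: Int)

  // Hypothetical async client; a real one would perform I/O here
  def callExternalServiceAsync(batch: Seq[MyClass])(implicit ec: ExecutionContext): Future[ExternalServiceResponse] =
    Future(ExternalServiceResponse(batch.size))

  // Up to `parallelism` service calls in flight; results are emitted in upstream order
  def callService(parallelism: Int)(implicit ec: ExecutionContext)
      : Flow[Either[String, Seq[MyClass]], Either[String, ExternalServiceResponse], NotUsed] =
    Flow[Either[String, Seq[MyClass]]]
      .mapAsync[Either[String, ExternalServiceResponse]](parallelism) {
        case Right(batch) => callExternalServiceAsync(batch).map(Right(_))
        case Left(err)    => Future.successful(Left(err)) // Lefts bypass the service
      }
}
```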