Akka-stream group only Right elements of Either


I have a source which emits Either[String, MyClass].

I want to call an external service with batches of MyClass and continue downstream with Either[String, ExternalServiceResponse]; that's why I need to group the elements of the stream.

If the stream emitted only MyClass elements, it would be easy: just call grouped:

val source: Source[MyClass, NotUsed] = <custom implementation>
source
  .grouped(10)                 // Seq[MyClass]
  .map(callExternalService(_)) // ExternalServiceResponse
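Akka Streams' `grouped(n)` emits batches of at most `n` elements, with a smaller final batch when the stream completes partway through a group. The standard-library `grouped` on a plain Scala collection batches the same way, so the arithmetic can be checked without running a stream. The 25 integers below are a hypothetical stand-in for MyClass values:

```scala
object GroupedBatches {
  // 25 hypothetical elements standing in for MyClass values
  val elements: List[Int] = (1 to 25).toList

  // Batches of at most 10; the last batch holds the remainder
  val batchSizes: List[Int] = elements.grouped(10).map(_.size).toList

  def main(args: Array[String]): Unit =
    println(batchSizes) // List(10, 10, 5)
}
```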

But how can I group only the elements on the Right side of the Either in my scenario?

val source: Source[Either[String, MyClass], NotUsed] = <custom implementation>
source
  .???                                                      // Either[String, Seq[MyClass]]
  .map {
    case Right(myClasses) => Right(callExternalService(myClasses))
    case Left(string) => Left(string)
  }                                                         // Either[String, ExternalServiceResponse]

The following works, but is there a more idiomatic way?

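val source: Source[Either[String, MyClass], NotUsed] = <custom implementation>
source
  .groupBy(2, either => either.isRight)   // one substream for Lefts, one for Rights
  .grouped(10)
  .mapConcat { input =>
    input.headOption match {
      case Some(Right(_)) =>
        // every element of this group is a Right: unwrap them and make one call
        List(Right(callExternalService(input.collect { case Right(myClass) => myClass })))
      case _ =>
        // every element of this group is a Left: pass them through unchanged
        input.collect { case Left(string) => Left(string) }.toList
    }
  }
  .mergeSubstreams                         // order across the two substreams is not preserved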

There are 2 answers

Levi Ramsey

This should transform a source of Either[L, R] into a source of Either[L, Seq[R]], grouping the Rights into batches of a configurable size.

import akka.NotUsed
import akka.stream.scaladsl.Source

def groupRights[L, R](groupSize: Int)(in: Source[Either[L, R], NotUsed]): Source[Either[L, Seq[R]], NotUsed] =
  in.map(Option(_))               // Yep, an Option[Either[L, R]]
    .concat(Source.single(None))  // a final None signals that `in` has completed
    .statefulMapConcat { () =>
      val buffer = new scala.collection.mutable.ArrayBuffer[R](groupSize)

      def dumpBuffer(): List[Either[L, Seq[R]]] =
        if (buffer.isEmpty) Nil   // nothing pending: don't emit an empty group
        else {
          val out = List(Right(buffer.toList))
          buffer.clear()
          out
        }

      (incoming: Option[Either[L, R]]) => {
        incoming.map { _.fold(
            l => List(Left(l)),  // unfortunate that we have to re-wrap
            r => {
              buffer += r
              if (buffer.size == groupSize) {
                dumpBuffer()
              } else {
                Nil
              }
            }
          )
        }.getOrElse(dumpBuffer()) // End of stream: flush any partial group
      }
    }
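The emission order of `groupRights` can be checked without running a stream. Below is a minimal pure-Scala model of the same buffer-and-flush logic over a `List`; the object name `GroupRightsModel` and the sample input are illustrative assumptions, not part of the answer. The model skips the final flush when the buffer is empty, so it never emits an empty group:

```scala
import scala.collection.mutable.ArrayBuffer

object GroupRightsModel {
  // Hypothetical pure-collection model of groupRights (no Akka): Lefts pass
  // straight through, Rights are buffered and emitted in groups of groupSize,
  // and any partial group is flushed once the input is exhausted.
  def groupRights[L, R](groupSize: Int)(in: List[Either[L, R]]): List[Either[L, Seq[R]]] = {
    val buffer = ArrayBuffer.empty[R]

    def dumpBuffer(): List[Either[L, Seq[R]]] =
      if (buffer.isEmpty) Nil // never emit an empty group
      else {
        val out = List(Right(buffer.toList))
        buffer.clear()
        out
      }

    val body: List[Either[L, Seq[R]]] = in.flatMap {
      case Left(l) => List[Either[L, Seq[R]]](Left(l))
      case Right(r) =>
        buffer += r
        if (buffer.size == groupSize) dumpBuffer() else Nil
    }

    body ++ dumpBuffer() // end of input: flush the remainder
  }

  def main(args: Array[String]): Unit = {
    val input: List[Either[String, Int]] = List(Right(1), Left("a"), Right(2), Right(3))
    println(groupRights[String, Int](2)(input)) // List(Left(a), Right(List(1, 2)), Right(List(3)))
  }
}
```

In this run `Left("a")` comes out before `Right(List(1, 2))` even though `Right(1)` arrived first: Lefts pass straight through while Rights wait in the buffer, and a partial group is only flushed at the end of the input. If the relative order matters, one option is to also flush the buffer whenever a Left arrives.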

Beyond that, I'll note that the downstream code to call the external service could be rewritten as

.map(_.map(callExternalService)) // Either is right-biased since Scala 2.12, so .right is not needed
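Because `Either` is right-biased from Scala 2.12 onward, `map` transforms only a `Right` value and returns a `Left` unchanged. A small stand-alone sketch of that behaviour, with a hypothetical `callExternalService` that simply sums its batch:

```scala
object RightBiasedMap {
  // Hypothetical stand-in for the real external call
  def callExternalService(batch: Seq[Int]): Int = batch.sum

  val batches: List[Either[String, Seq[Int]]] =
    List(Right(Seq(1, 2)), Left("bad record"), Right(Seq(3, 4)))

  // Either.map only touches Right values; Lefts pass through unchanged
  val responses: List[Either[String, Int]] = batches.map(_.map(callExternalService))

  def main(args: Array[String]): Unit =
    println(responses) // List(Right(3), Left(bad record), Right(7))
}
```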

If you can reliably call the external service with a parallelism of n, it may also be worth doing so with:

.mapAsync(n) { e =>
  e.fold(
    l => Future.successful(Left(l)),
    r => Future { Right(callExternalService(r)) } // needs an implicit ExecutionContext in scope
  )
}

If you want to maximize throughput at the cost of preserving order, you could even replace mapAsync with mapAsyncUnordered.
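The function handed to `mapAsync` can be exercised on its own with plain `scala.concurrent` futures. This sketch assumes a hypothetical `callExternalService` and uses the global execution context for simplicity; for a slow or blocking remote call you would probably want a dedicated execution context. `Future.traverse` is used here only to run the function over a small list and wait for the results:

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object AsyncFold {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for the real (possibly slow) external call
  def callExternalService(batch: Seq[Int]): Int = batch.sum

  // Lefts become already-completed futures; Rights run the call asynchronously
  def callAsync(e: Either[String, Seq[Int]]): Future[Either[String, Int]] =
    e.fold(
      l => Future.successful(Left(l)),
      r => Future(Right(callExternalService(r)))
    )

  // Runs callAsync over every input and blocks (demo only) for the ordered results
  def processAll(inputs: List[Either[String, Seq[Int]]]): List[Either[String, Int]] =
    Await.result(Future.traverse(inputs)(callAsync), 5.seconds)

  def main(args: Array[String]): Unit =
    println(processAll(List(Right(Seq(1, 2)), Left("bad record"), Right(Seq(3, 4)))))
}
```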

Xavier Guihot

You could split your source of Eithers into two branches, process the Rights their own way, and then merge the two sub-flows back together:

// case class MyClass(x: Int)
// case class ExternalServiceResponse(xs: Seq[MyClass])
// def callExternalService(xs: Seq[MyClass]): ExternalServiceResponse =
//    ExternalServiceResponse(xs)
// val source: Source[Either[String, MyClass], _] =
//   Source(List(Right(MyClass(1)), Left("2"), Right(MyClass(3)), Left("4"), Right(MyClass(5))))

val lefts: Source[Either[String, Nothing], _] =
  source
    .collect { case Left(l) => Left(l) }

val rights: Source[Either[Nothing, ExternalServiceResponse], _] =
  source
    .collect { case Right(x: MyClass) => x }
    .grouped(2)
    .map(callExternalService)
    .map(Right(_))

val out: Source[Either[String, ExternalServiceResponse], _] = rights.merge(lefts)

// out.runForeach(println)
// One possible output; merge emits from whichever branch is ready, so the interleaving may vary:
// Left(2)
// Right(ExternalServiceResponse(Vector(MyClass(1), MyClass(3))))
// Left(4)
// Right(ExternalServiceResponse(Vector(MyClass(5))))
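Each branch keeps its own order, but `merge` interleaves the two branches by arrival, so the position of a `Left` relative to the grouped `Right`s is not fixed. Also note that `source` appears in both branches, so running `out` materializes it twice; that is harmless for `Source(List(...))`, but for a side-effecting source you would likely want to split a single materialization instead, for example with a `Partition` stage. The per-branch results can be modeled with plain collections, reusing the hypothetical `MyClass`, `ExternalServiceResponse`, and `callExternalService` from the commented setup:

```scala
object TwoBranchModel {
  // Hypothetical types and service, copied from the commented setup
  case class MyClass(x: Int)
  case class ExternalServiceResponse(xs: Seq[MyClass])
  def callExternalService(xs: Seq[MyClass]): ExternalServiceResponse =
    ExternalServiceResponse(xs)

  val input: List[Either[String, MyClass]] =
    List(Right(MyClass(1)), Left("2"), Right(MyClass(3)), Left("4"), Right(MyClass(5)))

  // Left branch: the Lefts, in their original order
  val lefts: List[Either[String, Nothing]] =
    input.collect { case Left(l) => Left(l) }

  // Right branch: MyClass values batched in twos, then sent to the service
  val rights: List[Either[Nothing, ExternalServiceResponse]] =
    input
      .collect { case Right(x) => x }
      .grouped(2)
      .map(batch => Right(callExternalService(batch)))
      .toList

  def main(args: Array[String]): Unit = {
    println(lefts) // List(Left(2), Left(4))
    println(rights)
  }
}
```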