how to race multiple zio effects and get first 2 that completes

109 views Asked by At

zio has method race which returns the first winner, and interrupts all "losers".

How to get the first two or n "winners" that finishes first, and interrupts all others?

The function signature looks like below, where n is the number of "winners", (not the max parallelism). The output list has size n.

final def raceN[R, E, A](n: Int)(as: Iterable[ZIO[R, E, A]]): ZIO[R, Nothing, List[Either[E,A]]]

One use case i can think of is load the same data from different source in parallel, only get the first 2 that returns first, and compare their content to make sure they are the same. If different, fail.

1

There are 1 answers

0
Lachezar On BEST ANSWER

You can use the Structured Concurrency primitives that ZIO offers to build this, but keep in mind that you need to think of cases like "long running effects that do not complete within reasonable time" (hint: may be you need a timeout).

Here is a short implementation of the raceN

object MyApp extends ZIOAppDefault:

  final def raceN[R, E, A](n: Int)(as: Iterable[ZIO[R, E, A]]): ZIO[R, Nothing, List[Either[E, A]]] =
    for {
      results    <- Ref.make(List.empty[Either[E, A]])
      latch      <- CountdownLatch.make(n)
      fibers     <- ZIO.collectAll {
                      as.map { (eff: ZIO[R, E, A]) =>
                        (eff
                          .either
                          .flatMap(effRes => results.update(effRes :: _))
                          *> latch.countDown).fork
                      }
                    }
      _          <- latch.await
      _          <- ZIO.foreachDiscard(fibers)(_.interrupt)
      raceResult <- results.get
    } yield raceResult

  override val run: UIO[ExitCode] =
    val effects = (1 to 5).map(n => ZIO.sleep(n.seconds).as(n).flatMap(n => ZIO.cond(n % 2 == 1, n, s"error: $n")))
    raceN(2)(effects).flatMap(res => Console.printLine(res.toString).orDie) *> ZIO.succeed(ExitCode.success)