In cats-effect, what is the safer alternative to `IO.unsafeRunCancelable`?

In cats-effect 2, the safer alternative to this function would be IO.runCancelable.

In cats-effect 3, this API was replaced by its unsafe variant or a Dispatcher implementation, according to the following article:

https://typelevel.org/cats-effect/docs/migration-guide#io

I'm trying to figure out both replacements. The unsafe variant is fairly straightforward:

    
    import cats.effect.{IO, Outcome}
    import cats.effect.unsafe.IORuntime
    import scala.concurrent.duration._

    implicit val ioRuntime: IORuntime = IORuntime.builder().build()

    val base: IO[Unit] = IO {
      println("...")
    }.delayBy(1.second)

    val twenty: IO[Unit] = {

      val srcs = (1 to 20)
        .map { _ =>
          base
        }

      srcs.reduce(_ *> _)
    }

    val withCallback = twenty.guaranteeCase {
      case Outcome.Succeeded(fa) =>
        IO {
          println("SUCCESS!")
        }
      case Outcome.Errored(ee) =>
        IO {
          println(s"FAILURE: ${ee.toString}")
        }
      case _ => IO.unit // canceled: do nothing
    }

    def timeoutIn(milliS: Int): Unit = {
      val cancelling = withCallback.unsafeRunCancelable()

      Thread.sleep(milliS)

      cancelling()
      println("finished")
    }

    timeoutIn(30000)

When I try to convert it into the Dispatcher variant:

    
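    import cats.effect.{IO, Outcome}
    import cats.effect.std.Dispatcher
    import cats.effect.unsafe.IORuntime
    import scala.concurrent.Future
    import scala.concurrent.duration._

    implicit val ioRuntime: IORuntime = IORuntime.builder().build()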

    val base: IO[Unit] = IO {
      println("...")
    }.delayBy(1.second)

    val twenty: IO[Unit] = {

      val srcs = (1 to 20)
        .map { _ =>
          base
        }

      srcs.reduce(_ *> _)
    }

    val withCallback = twenty.guaranteeCase {
      case Outcome.Succeeded(fa) =>
        IO {
          println("SUCCESS!")
        }
      case Outcome.Errored(ee) =>
        IO {
          println(s"FAILURE: ${ee.toString}")
        }
      case _ => IO.unit // canceled: do nothing
    }

    def timeoutIn(milliS: Int): Unit = {
      // val cancelling = withCallback.unsafeRunCancelable()

      val cancelling = {

        val start: IO[() => Future[Unit]] = Dispatcher.sequential[IO](false).use { dispatcher =>
          def result = dispatcher.unsafeRunCancelable(
            withCallback
          )
          IO.delay(result)
        }

        start.unsafeRunSync()
      }

      Thread.sleep(milliS)

      cancelling()
      println("finished")
    }

    timeoutIn(30000)

I found that none of the effects defined in the `withCallback` chain are executed: the program just waits for 30 seconds and then terminates.

So I have 2 questions:

  • What's wrong with the second implementation, and how can I fix it so that it behaves the same as the first?

  • Once fixed, what are the possible advantages of using the Dispatcher?
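For reference, one likely explanation (offered as an assumption, not a confirmed answer) is that the `Dispatcher` resource is released as soon as the `use` block returns, and a dispatcher built with `await = false` cancels whatever it is still running on release. The sketch below keeps the start, the wait, and the cancel all inside `use`. The `ticks` counter, the `runFor` helper, the shortened delay, and the scala-cli dependency line are hypothetical additions for illustration only.

```scala
//> using dep "org.typelevel::cats-effect:3.5.4"
// Assumption: run with scala-cli against cats-effect 3.4 or later.

import cats.effect.IO
import cats.effect.std.Dispatcher
import cats.effect.unsafe.implicits.global
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._

object DispatcherSketch {
  // Hypothetical counter, added only so progress can be observed from outside.
  val ticks = new AtomicInteger(0)

  // Same shape as `base` and `twenty` in the question, with a shorter delay.
  val base: IO[Unit] =
    IO { ticks.incrementAndGet(); println("...") }.delayBy(50.millis)
  val twenty: IO[Unit] = List.fill(20)(base).reduce(_ *> _)

  // Hypothetical helper: start `task` on a dispatcher, wait, then invoke the
  // cancel token, all inside `use`, so the dispatcher stays open while the
  // task is running and is only released afterwards.
  def runFor(task: IO[Unit], wait: FiniteDuration): IO[Unit] =
    Dispatcher.sequential[IO](await = false).use { dispatcher =>
      for {
        cancel <- IO.delay(dispatcher.unsafeRunCancelable(task))
        _      <- IO.sleep(wait)
        _      <- IO.fromFuture(IO.delay(cancel()))
      } yield ()
    }
}
```

Calling `DispatcherSketch.runFor(DispatcherSketch.twenty, 400.millis).unsafeRunSync()` should print a handful of `...` lines before the task is stopped. The same idea would apply to the original `timeoutIn` if the `Thread.sleep` and `cancelling()` calls were moved inside the `use` block.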
