Convert registration-based service to Flow

I have an external service with a simple API, based on registration:

  • API.listen(listener) will register the listener to receive updates whenever they come in
  • API.stopListening(listener) will remove the listener, which will no longer receive updates

I believe that a Kotlin SharedFlow would be a good representation and more convenient to work with, so I'm trying to bridge the two APIs.

However, I can't figure out when / how to remove the listener when the flow should be stopped.

fun API.toFlow(): SharedFlow<Int> {
  val flow = MutableSharedFlow<Int>(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
  val listener = Listener { newValue -> flow.tryEmit(newValue) }
  this.listen(listener)
  // TODO: remove listener at some point
  return flow
}

There is 1 answer

Leviathan (best answer)

There is a dedicated flow builder for exactly this kind of scenario, the callbackFlow:

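import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow

fun API.toFlow(): Flow<Int> = callbackFlow {
    // Forward every update from the listener into the flow.
    val listener = Listener { newValue -> trySendBlocking(newValue) }
    this@toFlow.listen(listener)

    // Runs when the flow is closed or its collector is cancelled.
    awaitClose { stopListening(listener) }
}
//.shareIn(...)
//.stateIn(...)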

Inside the lambda you define and register the listener. For each value that should be emitted to the flow, you need to call one of the send functions (send, trySend, or trySendBlocking, depending on your needs for error handling).

When the flow is done (after calling cancel or close inside the builder block, or the collecting coroutine is cancelled), the block of awaitClose is called. This can be used to clean up after you are done listening, like unregistering the listener.

This flow builder creates a cold flow, meaning that every time the flow is collected, the builder block is executed again (even if API.toFlow() was called only once). If you want to share this flow for multiple collectors, you can convert it to a SharedFlow by calling shareIn. The configuration of the flow from your example looks a lot like a StateFlow, though, so you can directly call stateIn instead of shareIn.
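As a rough sketch of that conversion, the cold flow can be shared with shareIn as shown here. The Listener and API declarations are hypothetical stand-ins for the external service (the question does not show them), and toSharedFlow and its scope parameter are names made up for this example.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.shareIn

// Hypothetical stand-ins for the external service; the real types may differ.
fun interface Listener { fun onUpdate(newValue: Int) }

interface API {
    fun listen(listener: Listener)
    fun stopListening(listener: Listener)
}

// Cold flow: the builder block runs once for every collection.
fun API.toFlow(): Flow<Int> = callbackFlow {
    val listener = Listener { newValue -> trySendBlocking(newValue) }
    this@toFlow.listen(listener)
    awaitClose { this@toFlow.stopListening(listener) }
}

// Hot flow: one upstream collection is shared by all collectors.
fun API.toSharedFlow(scope: CoroutineScope): SharedFlow<Int> =
    toFlow().shareIn(
        scope = scope,
        started = SharingStarted.WhileSubscribed(),
        replay = 1,
    )
```

Here replay = 1 mirrors the replay setting from the question; the overflow policy from the question is not configured in this sketch.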

Regarding the comment about stopping a flow made hot by calling shareIn or stateIn, let's take a look at what actually goes on when a flow generated by a callbackFlow is collected.

Let's assume we created a StateFlow (compared to a generic SharedFlow it will implicitly have replay = 1 and onBufferOverflow = BufferOverflow.DROP_OLDEST with an additional distinctUntilChanged()) with this:

.stateIn(
    scope = flowScope,
    started = SharingStarted.WhileSubscribed(),
    initialValue = initial,
)

Now, we need a value for flowScope, which is the CoroutineScope used to run the flow. It is needed because the flow is now hot and can run even when there is no one to collect it (cold flows don't need it, since the collecting coroutine's scope is used for that).

In addition, we need an initial value that instantly provides a value for the flow. A StateFlow always has a value, even when the underlying flow (here the callbackFlow) hasn't yet emitted a new value.

These two values can be provided as a parameter to the API.toFlow() function, for example.
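A minimal sketch of what that could look like, assuming a separate function named toStateFlow (a made-up name) that takes both values as parameters. The Listener and API declarations are again hypothetical stand-ins for the external service.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.stateIn

// Hypothetical stand-ins for the external service; the real types may differ.
fun interface Listener { fun onUpdate(newValue: Int) }

interface API {
    fun listen(listener: Listener)
    fun stopListening(listener: Listener)
}

// flowScope and initial are supplied by the caller instead of being hard-coded.
fun API.toStateFlow(flowScope: CoroutineScope, initial: Int): StateFlow<Int> =
    callbackFlow<Int> {
        val listener = Listener { newValue -> trySendBlocking(newValue) }
        this@toStateFlow.listen(listener)
        awaitClose { this@toStateFlow.stopListening(listener) }
    }.stateIn(
        scope = flowScope,
        started = SharingStarted.WhileSubscribed(),
        initialValue = initial,
    )
```

The returned StateFlow exposes initial through its value property right away, before any listener has been registered.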

Regarding your question, though, the started parameter is the most interesting: it defines how the underlying flow is started. With SharingStarted.WhileSubscribed(), the underlying flow starts when at least one collector is subscribed to the StateFlow, and it is stopped when the last collector unsubscribes.

For the callbackFlow, that means the builder block is executed whenever a collector subscribes to the StateFlow while no other collector is subscribed. Every subsequent subscriber receives the same values as the first one. This way, collectors can subscribe to and unsubscribe from the StateFlow, and the callbackFlow only notices the first one.

Only when the last collector unsubscribes is the callbackFlow notified that it should be closed. In that case, the awaitClose lambda is executed and the flow is completed.

The StateFlow, however, is still active (StateFlows never complete). Should a new collector subscribe to the flow, the underlying flow is restarted. For our callbackFlow that means that the builder block is executed again and a new flow (with a new listener) is created.

As I understood, this is your desired behavior, and SharingStarted.WhileSubscribed() provides this out-of-the-box. There are other SharingStarted behaviors, though.

One final note: if you frequently add and remove subscribers to your StateFlow, the underlying callbackFlow may be repeatedly stopped and restarted, which can be costly depending on how your Listener works. In that case, you can pass a timeout in milliseconds as the optional stopTimeoutMillis parameter of SharingStarted.WhileSubscribed(); the underlying flow is then kept alive for that long after the last collector unsubscribes. If a new collector subscribes within that period, the StateFlow continues as though nothing happened, and the underlying callbackFlow is never stopped and restarted. A common value is 5000 milliseconds, but you can use whatever fits your use case.
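The grace period could be wired in roughly like this. The helper name stateInWithGracePeriod and its parameters are invented for this sketch; only stateIn and SharingStarted.WhileSubscribed(stopTimeoutMillis = ...) come from kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.stateIn

// Hypothetical helper: shares any cold flow as a StateFlow and keeps the
// upstream running for stopTimeoutMillis after the last collector leaves.
fun <T> Flow<T>.stateInWithGracePeriod(
    scope: CoroutineScope,
    initial: T,
    stopTimeoutMillis: Long = 5_000,
): StateFlow<T> = stateIn(
    scope = scope,
    started = SharingStarted.WhileSubscribed(stopTimeoutMillis = stopTimeoutMillis),
    initialValue = initial,
)
```

With the toFlow() extension from above, a call might look like api.toFlow().stateInWithGracePeriod(flowScope, initial).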