I have an external service with a simple API, based on registration :
API.listen(listener)will register the listener to receive updates whenever they come inAPI.stopListening(listener)will remove the listener, which will no longer receive updates
I believe that a Kotlin Shared-Flow would be a good representation, and more convenient to work with, so I'm trying to bridge the two apis.
However, I can't figure out when / how to remove the listener when the flow should be stopped.
fun API.toFlow(): SharedFlow<Int> {
val flow = MutableSharedFlow(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
val listener = Listener { newValue -> flow.tryEmit(newValue) }
this.listen(listener)
// TODO: remove listener at some point
return flow
}
There is a dedicated flow builder for exactly this kind of scenario, the
callbackFlow:Inside the lambda you define and register the listener. For each value that should be emitted to the flow, you need to call one of the
sendfunctions (ortrySend, ortrySendBlocking, depending on your needs for error handling).When the flow is done (after calling
cancelorcloseinside the builder block, or the collecting coroutine is cancelled), the block ofawaitCloseis called. This can be used to clean up after you are done listening, like unregistering the listener.This flow builder creates a cold flow, meaning that every time the flow is collected, the builder block is executed again (even if
API.toFlow()was called only once). If you want to share this flow for multiple collectors, you can convert it to aSharedFlowby callingshareIn. The configuration of the flow from your example looks a lot like aStateFlow, though, so you can directly callstateIninstead ofshareIn.Regarding the comment about stopping a flow made hot by calling
shareInorstateIn, let's take a look at what actually goes on when a flow generated by acallbackFlowis collected.Let's assume we created a
StateFlow(compared to a genericSharedFlowit will implicitly havereplay = 1andonBufferOverflow = BufferOverflow.DROP_OLDESTwith an additionaldistinctUntilChanged()) with this:Now, we need a value for
flowScopewhich will be theCoroutinesScopethat should be used to run the flow. It is needed because the flow is now hot and it will run even when there is no one to collect it (not needed for cold flows since the collecting coroutine scope is used for that).In addition, we need an
initialvalue that instantly provides a value for the flow. AStateFlowalways has a value, even when the underlying flow (here thecallbackFlow) hasn't yet emitted a new value.These two values can be provided as a parameter to the
API.toFlow()function, for example.Regarding your question, the parameter
startedis the most interesting, though: It defines how the underlying flow is started. WithSharingStarted.WhileSubscribed()the underlying flow starts when there is at least one collector subscribed to theStateFlow. The flow is being stopped when the last collector unsubscribed.For the
callbackFlowthat means that the builder block is executed every time when a collector subscribes to theStateFlowand is the only one doing so. Every subsequent subscribers will receive the same values the first subscriber receives. This way collectors can subscribe and unscubscribe from theStateFlow, thecallbackFlowwill only notice the first one.Only when the last collector unsubscribed, the
callbackFlowis notices that it should be closed. In that case theawaitCloseclose lambda is executed und the flow is completed.The
StateFlow, however, is still active (StateFlows never complete). Should a new collector subscribe to the flow, the underlying flow is restarted. For ourcallbackFlowthat means that the builder block is executed again and a new flow (with a new listener) is created.As I understood, this is your desired behavior, and
SharingStarted.WhileSubscribed()provides this out-of-the-box. There are otherSharingStartedbehaviors, though.One final note: If you frequently add and remove subscribers to your
StateFlowyou may run into the situation where the underlyingcallbackFlowis repeatedly stopped and restarted, which may be costly depending on how yourListenerworks. In that case you can specify a time period as an optional parameter toSharingStarted.WhileSubscribed()for that the underlying flow is kept alive even after the last collector unsubscribed. If in that time period a new collector subscribes, theStateFlowresumes as though nothing happened and the underlyingcallbackFlownever gets stopped and restarted. A common value would be 5000 milliseconds, but you can put there anything that fits your use case.