With RxJava I've become accustomed to my repositories returning Observables of data which automatically update whenever there's an underlying change. I achieve this by simply having a subject in my repository that gets notified with the relevant change info, and observables like getAll() go off of that.
As an example, take this pseudocode-like snippet:
    fun getAll(): Observable<List<Model>> {
        return subject
            .filter { isChangeRelevant(it) }
            .startWith(initialChangeEvent)
            .map { queryAll() }
    }
I've been curious about whether, and how, the same thing can be achieved using coroutines only.
You can use Kotlin coroutines Channels.
If you only want your values to be emitted like a stream (so you can for-each off of it), you can use produce to create them; it returns a ReceiveChannel.
Given a function test() that returns such a channel, you can use a for-each (or consumeEach) on the result of test() to receive its values.
If you want your channel to behave like an RxJava subject (closest to BehaviorSubject, since new subscribers immediately receive the latest value), you can use ConflatedBroadcastChannel and emit values to it.
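As a minimal sketch of the produce approach described above, assuming kotlinx-coroutines-core is on the classpath: the body of test() and the helper collectFromTest() are illustrative assumptions, not code from the original answer.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Hypothetical producer: sends three values, then the channel closes
// automatically when the produce block finishes.
fun CoroutineScope.test(): ReceiveChannel<Int> = produce {
    for (i in 1..3) send(i)
}

// Hypothetical helper: iterates the channel with a plain for loop and
// gathers everything it receives until the channel is closed.
fun collectFromTest(): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    for (value in test()) {
        received += value
    }
    received
}

fun main() {
    println(collectFromTest()) // prints [1, 2, 3]
}
```

The for loop suspends while waiting for the next value and ends once the producer closes the channel, which is what makes it read like iterating a stream. Note that produce is marked experimental in kotlinx.coroutines, so the compiler may emit an opt-in warning.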
You can use broadCastChannel.offer(value) to send values to the channel.
To receive values from the channel, you can use a simple for-each loop on a subscription obtained from openSubscription().
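The sending and receiving steps might look roughly like the sketch below. It assumes an older kotlinx.coroutines release (1.4.x) in which ConflatedBroadcastChannel and offer are still available; newer releases deprecate them in favour of StateFlow/SharedFlow and trySend. The function broadcastDemo() and the values sent are illustrative assumptions.

```kotlin
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Assumption: kotlinx.coroutines 1.4.x. On later versions, replace
// offer(...) with trySend(...), or move to StateFlow/SharedFlow.
fun broadcastDemo(): List<Int> = runBlocking {
    val broadCastChannel = ConflatedBroadcastChannel<Int>()
    val received = mutableListOf<Int>()

    // Sent before anyone subscribes; a conflated broadcast channel keeps
    // the latest value and hands it to new subscribers.
    broadCastChannel.offer(1)

    val subscription = broadCastChannel.openSubscription()
    val job = launch {
        // Simple for-each loop over the subscription.
        for (value in subscription) {
            received += value
        }
    }

    yield()                    // let the collector pick up the retained value
    broadCastChannel.offer(2)  // push an update to current subscribers
    yield()                    // let the collector pick up the update

    broadCastChannel.close()   // closing ends the collector's for loop
    job.join()
    received
}

fun main() {
    println(broadCastChannel())
}

// Small wrapper so main() reads naturally; hypothetical name.
fun broadCastChannel(): List<Int> = broadcastDemo()
```

Because the channel is conflated, a slow subscriber only sees the most recent value, so intermediate updates can be skipped. For a repository that just needs to re-run queryAll() on each relevant change, that is usually acceptable.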