Listener for objects that are dropped by throttle


Currently, I am trying to use throttle with my observable:

observable
         .throttleLatest(1000, TimeUnit.MILLISECONDS)
         .subscribe { myObject ->
             myObject.doA()
         }

The problem is that I want to destroy every object emitted by the Observable: if an object is dropped, destroy it immediately; otherwise, destroy it after the onNext callback. Is there any kind of callback that reports the dropped objects? Something like:

observable
         .throttleLatest(1000, TimeUnit.MILLISECONDS)
         .onDrop { objectThatGotDropped ->
            objectThatGotDropped.destroyed()
         }
         .subscribe { myObject ->
             myObject.afterOneSecond()
             myObject.destroyed()
         }

There is 1 answer

akarnokd On

This is currently not possible in RxJava directly. I'll think about adding support for this.

For now, you could queue up items before the throttle, then destroy every queued item that precedes the current one:

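import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

// T is the item type flowing through the observable.
Queue<T> queue = new ConcurrentLinkedQueue<>();
observable
         .doOnNext(queue::offer) // record every item before it reaches the throttle
         .throttleLatest(1000, TimeUnit.MILLISECONDS)
         .subscribe(myObject -> {
             // Everything queued ahead of the emitted item was dropped.
             while (!queue.isEmpty()) {
                 var entry = queue.poll();
                 if (entry == myObject) {
                     break;
                 }
                 entry.destroyed();
             }
             myObject.afterOneSecond();
         });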

Two caveats though:

  • Items must be unique references, i.e., sending the very same object through the observable more than once won't work, because the loop compares by reference and stops at the first match.
  • Items are referenced for the duration of the throttling instead of released immediately when dropped.
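
To see the drain step on its own, here is a minimal sketch that runs without RxJava. The class `DropDrain` and the method `drainDropped` are hypothetical names made up for this illustration, not part of any library, and the "emitted" item is simply passed in by hand instead of coming from `throttleLatest`. It uses `poll() != null` instead of the `isEmpty()` check, which should behave the same when there is a single consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical helper (not an RxJava API): isolates the drain loop from the answer.
final class DropDrain {

    // Polls the queue until it reaches `current` (compared by reference),
    // handing every item found before it to `onDropped`.
    static <T> void drainDropped(Queue<T> queue, T current, Consumer<? super T> onDropped) {
        T entry;
        while ((entry = queue.poll()) != null) {
            if (entry == current) {
                return; // reached the emitted item; later items stay queued
            }
            onDropped.accept(entry);
        }
    }

    public static void main(String[] args) {
        Queue<StringBuilder> queue = new ConcurrentLinkedQueue<>();
        StringBuilder a = new StringBuilder("a");
        StringBuilder b = new StringBuilder("b");
        StringBuilder c = new StringBuilder("c");
        queue.offer(a);
        queue.offer(b);
        queue.offer(c);

        List<StringBuilder> dropped = new ArrayList<>();
        // Pretend the throttle dropped `a` and `b`, then emitted `c`.
        drainDropped(queue, c, dropped::add);
        System.out.println(dropped); // prints [a, b]
    }
}
```

In the real pipeline, `drainDropped(queue, myObject, item -> item.destroyed())` would replace the inline loop inside `subscribe`. If the same reference is queued twice, the loop returns at its first occurrence, which is the reason for the first caveat.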