Rx.NET Buffer emit all items on Cancellation


I am using the Rx.NET library and its Buffer method in the simplest possible way:

observable.Buffer(TimeSpan.FromSeconds(5), 10);

It works great except when a cancellation token is triggered. When that happens, I would like Buffer to emit all the events it is holding at that moment instead of waiting for the timer to tick. Is that possible?

Example: items 1, 2 and 3 are emitted within 2 seconds, so neither the 10-item limit nor the 5-second limit has been reached. Cancellation is then requested, and I would like to get all buffered items, to at least "see" them before the request/process ends, without waiting the remaining 3 seconds for the timer.


Answer by Oleg Dok (best answer)

If I understand the question correctly, one possible solution is:

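    using System;
    using System.Reactive;
    using System.Reactive.Linq;
    using System.Threading;

    public static class Program
    {
        // Emits a single Unit and completes when the token is cancelled.
        // Extension methods must be declared in a static class.
        public static IObservable<Unit> ToObservable(this CancellationToken ct) =>
            Observable.Create<Unit>(observer => ct.Register(() =>
                {
                    observer.OnNext(Unit.Default);
                    observer.OnCompleted();
                })
            );

        static void Main()
        {
            // The token is cancelled automatically after 6 seconds.
            var cts = new CancellationTokenSource(6000);

            Observable.Interval(TimeSpan.FromSeconds(0.5))
                .TakeUntil(cts.Token.ToObservable()) // complete the source on cancellation
                .Buffer(10)
                .Subscribe(b =>
                {
                    // ... processing
                });
            Console.ReadLine();
        }
    }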

The main idea is to complete Buffer's source sequence when cancellation is requested. Buffer then immediately releases the items it has collected and completes. In the Subscribe handler you can then check whether cancellation was requested and decide what to do with the data that arrived.
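To tie this back to the exact scenario in the question, here is a minimal sketch that uses the same Buffer(TimeSpan.FromSeconds(5), 10) overload. It assumes a Subject<int> as a stand-in for the real source, and the names BufferFlushDemo and Run are made up for illustration. Items 1, 2 and 3 are pushed, then the token is cancelled manually, which should deliver the pending batch right away rather than after the 5-second tick.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class BufferFlushDemo
{
    // Same helper as above: signals once when the token is cancelled.
    public static IObservable<Unit> ToObservable(this CancellationToken ct) =>
        Observable.Create<Unit>(observer => ct.Register(() =>
        {
            observer.OnNext(Unit.Default);
            observer.OnCompleted();
        }));

    // Hypothetical helper: returns every batch that Buffer emitted.
    public static List<IList<int>> Run()
    {
        using var cts = new CancellationTokenSource();
        var source = new Subject<int>(); // stand-in for the real observable
        var batches = new List<IList<int>>();

        using var subscription = source
            .TakeUntil(cts.Token.ToObservable())  // completes on cancellation
            .Buffer(TimeSpan.FromSeconds(5), 10)  // same limits as in the question
            .Subscribe(batch => batches.Add(batch));

        source.OnNext(1);
        source.OnNext(2);
        source.OnNext(3);

        // Neither limit has been reached yet. Cancelling completes the
        // sequence that Buffer observes, so it hands over what it holds.
        cts.Cancel();

        return batches;
    }

    public static void Main()
    {
        foreach (var batch in Run())
            Console.WriteLine(string.Join(", ", batch));
    }
}
```

Note that Buffer also emits its final batch when the source completes with nothing pending, so the handler may receive an empty list and should be prepared for that.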