Rx .Net doesn't continue sequence after an error is caught

309 views Asked by At

I'm sowewhat new to rx and currently I am using it to schedule a cron job which is continuously polling a server via grpc (found that solution a time ago here on Stackoverflow).

The polling function with rx:

Observable.Interval(Timespan)
    .Select(l => Observable.FromAsync(() => Job(m_CTS.Token)))
    .Concat()
    .Catch((Exception ex) => OnErrorFunc(ex))
    .Subscribe(m_CTS.Token);`

The error function which handles the occurring error:

private IObservable<System.Reactive.Unit> OnError(Exception aException)
{
    LOG.Error("CronJob failed " + Job.Method+ " " + aException.Message );
    return Observable.Empty<System.Reactive.Unit>();
}

When the task fails e.g. the server is not available, an exception is thrown. Rx catches the error with the error function, but doesn't continue to schedule/fire the task.

Apparently the empty observable is not sufficient for the sequence to continue.

I already tried to return an

Observable.FromAsync(() => Job(m_CTS.Token)) 

but this doesn't work either.

What is the correct return type to get the sequence going? Or maybe my approach is wrong? Is .Retry the better option?

To elimnate confusion here the whole class:

public class CronJob : ICronJob
{

    private CancellationTokenSource m_CTS;
    private static readonly ILog LOG = LogManager.GetLogger( System.Reflection.MethodBase.GetCurrentMethod().DeclaringType );
    public Func<CancellationToken,Task> Job{ get; private set; }
    public Func<Exception,IObservable< System.Reactive.Unit>> OnErrorFunc { get; set; }
    public TimeSpan Timespan;
    public bool Running { get; private set; } = false;
    
    public CronJob(Func<CancellationToken,Task> aJob, TimeSpan aTimeSpan, Func<Exception,IObservable<System.Reactive.Unit>> aOnErrorFunc = null)
    {
        Job= aJob;
        Timespan = aTimeSpan;
        OnErrorFunc = aOnErrorFunc ?? OnError;
        m_CTS = new CancellationTokenSource();
    }
    
    public void StartInstantly()
    {
        StartTask();
        Observable.Interval(Timespan)
            .StartWith(-1L)
            .Select(l => Observable.FromAsync(() => Job(m_CTS.Token)))
            .Concat()
            .Catch((Exception ex) => OnErrorFunc(ex))
            .Subscribe(m_CTS.Token);
    }

    private void StartTask()
    {
        if (Running)
            throw new Exception("Task allready started");

        m_CTS = new CancellationTokenSource();
        Running = true;
    }

    public void StartAfterTimeSpan()
    {
        StartTask();
        
        Observable.Interval(Timespan)
            .Select(l => Observable.FromAsync(() => Job(m_CTS.Token)))
            .Concat()
            .Catch((Exception ex) => OnErrorFunc(ex))
            .Subscribe(m_CTS.Token);
            
    }

    public void Stop()
    {
        m_CTS.Cancel();
        Running = false;
    }

    public void SetTask(Func<CancellationToken,Task> aJob)
    {
        if(Running)
            Stop();
        Job= aJob;
    }

    private IObservable<System.Reactive.Unit> OnError(Exception aException)
    {
        LOG.Error("CronJob failed " + Job.Method+ " " + aException.Message );
      // return Observable.Empty<System.Reactive.Unit>();
      return Observable.Empty<System.Reactive.Unit>();
    }

}
2

There are 2 answers

0
Shlomo On

Retry combined with Catch is closer to what you're looking for.

Normally, an exception terminates the observable pipeline, by sending an Error notification to the subscription. Catch catches the Error notification, and replaces it with the new observable in Catch parameter. In your case, that's an Empty: So the observable pipeline would terminate normally (with a Completed notification), but it would still terminate.

Retry suppresses the Error notification and tries to resubscribe the whole pipeline. Something like this is probably what you're after:

    var retryLimit = 3;
    Observable.Interval(Timespan)
        .StartWith(-1L)
        .Select(l => Observable.FromAsync(() => Task(m_CTS.Token)))
        .Concat()
        .Catch((Exception ex) => OnErrorFunc(ex))
        .Retry(retryLimit) // will permanently die after 3 errors
        .Subscribe(m_CTS.Token);

For further reading, I recommend this documentation: http://introtorx.com/Content/v1.0.10621.0/11_AdvancedErrorHandling.html#Catch

0
Bluescreenterror On

If found what I was missing in an older question here on SO in an answer from @Enigmativity https://stackoverflow.com/a/70695753/11957956

First note I renamed my public Func<CancellationToken,Task> Task to public Func<CancellationToken,Task> Job to impede confusion. (I also edited my original question)

To get it running I wrapped my Job into a Func:

Func<long, Task> handler = async (i) =>
            {
                await Job(m_CTS.Token);
            };

and pass it like that:

Observable
 .Interval(Timespan)
 .Select(i => Observable.FromAsync(() => handler(i))
 .Catch<System.Reactive.Unit, Exception>(ex => OnErrorFunc(ex)))
 .Concat()
 .LastOrDefaultAsync()
 .Subscribe(m_CTS.Token);

@Shlomo Answer will also work. Which is the better soulition,IDK maybe someone with more experience can chime in and I will mark the best answer.