I'm sowewhat new to rx and currently I am using it to schedule a cron job which is continuously polling a server via grpc (found that solution a time ago here on Stackoverflow).
The polling function with rx:
Observable.Interval(Timespan)
.Select(l => Observable.FromAsync(() => Job(m_CTS.Token)))
.Concat()
.Catch((Exception ex) => OnErrorFunc(ex))
.Subscribe(m_CTS.Token);`
The error function which handles the occurring error:
private IObservable<System.Reactive.Unit> OnError(Exception aException)
{
LOG.Error("CronJob failed " + Job.Method+ " " + aException.Message );
return Observable.Empty<System.Reactive.Unit>();
}
When the task fails e.g. the server is not available, an exception is thrown. Rx catches the error with the error function, but doesn't continue to schedule/fire the task.
Apparently the empty observable is not sufficient for the sequence to continue.
I already tried to return an
Observable.FromAsync(() => Job(m_CTS.Token))
but this doesn't work either.
What is the correct return type to get the sequence going? Or maybe my approach is wrong? Is .Retry the better option?
To elimnate confusion here the whole class:
public class CronJob : ICronJob
{
private CancellationTokenSource m_CTS;
private static readonly ILog LOG = LogManager.GetLogger( System.Reflection.MethodBase.GetCurrentMethod().DeclaringType );
public Func<CancellationToken,Task> Job{ get; private set; }
public Func<Exception,IObservable< System.Reactive.Unit>> OnErrorFunc { get; set; }
public TimeSpan Timespan;
public bool Running { get; private set; } = false;
public CronJob(Func<CancellationToken,Task> aJob, TimeSpan aTimeSpan, Func<Exception,IObservable<System.Reactive.Unit>> aOnErrorFunc = null)
{
Job= aJob;
Timespan = aTimeSpan;
OnErrorFunc = aOnErrorFunc ?? OnError;
m_CTS = new CancellationTokenSource();
}
public void StartInstantly()
{
StartTask();
Observable.Interval(Timespan)
.StartWith(-1L)
.Select(l => Observable.FromAsync(() => Job(m_CTS.Token)))
.Concat()
.Catch((Exception ex) => OnErrorFunc(ex))
.Subscribe(m_CTS.Token);
}
private void StartTask()
{
if (Running)
throw new Exception("Task allready started");
m_CTS = new CancellationTokenSource();
Running = true;
}
public void StartAfterTimeSpan()
{
StartTask();
Observable.Interval(Timespan)
.Select(l => Observable.FromAsync(() => Job(m_CTS.Token)))
.Concat()
.Catch((Exception ex) => OnErrorFunc(ex))
.Subscribe(m_CTS.Token);
}
public void Stop()
{
m_CTS.Cancel();
Running = false;
}
public void SetTask(Func<CancellationToken,Task> aJob)
{
if(Running)
Stop();
Job= aJob;
}
private IObservable<System.Reactive.Unit> OnError(Exception aException)
{
LOG.Error("CronJob failed " + Job.Method+ " " + aException.Message );
// return Observable.Empty<System.Reactive.Unit>();
return Observable.Empty<System.Reactive.Unit>();
}
}
Retrycombined withCatchis closer to what you're looking for.Normally, an exception terminates the observable pipeline, by sending an Error notification to the subscription.
Catchcatches the Error notification, and replaces it with the new observable inCatchparameter. In your case, that's anEmpty: So the observable pipeline would terminate normally (with a Completed notification), but it would still terminate.Retrysuppresses the Error notification and tries to resubscribe the whole pipeline. Something like this is probably what you're after:For further reading, I recommend this documentation: http://introtorx.com/Content/v1.0.10621.0/11_AdvancedErrorHandling.html#Catch