TPL ActionBlock not handling messages after an exception

58 views Asked by At

I've setup an ActionBlock<T> like so:

 private readonly ActionBlock<PresentationListNotification> _actionBlock;
    
    ...
    
  _actionBlock = new ActionBlock<PresentationListNotification>(HandleNotification);

Each time i wish to invoke the actionblock i do so like this:

try
{
    _actionBlock.Post(notification);
}
catch (AggregateException aggException)
{
    foreach (Exception ex in aggException.InnerExceptions)
    {
        _logger.LogError(ex,"Exception {Message} occurred processing notification", ex.Message);
    }
}

This all works fine, but I've noticed that when an exception occurred in the HandleNotification delegate (the actual implementation isn't probably important but it occurred when I was trying the set the result on a TaskCompletionSource which was in a faulted state), the actionblock would fail to post subsequent messages to my handler. In the debugger I could see that the IsCompleted and IsDecliningPermanently properties were True. How do I recover when the actionblock is in this state?

2

There are 2 answers

0
Theodor Zoulias On BEST ANSWER

How do I recover when the actionblock is in this state?

You can't. When a dataflow block has faulted, it will remain in this state forever. There is no recovering after that. You could consider:

  1. Prevent the block from failing, by try-catching any exception thrown inside the action.
  2. After the failure, replace the faulted block with a new one.
0
mdisibio On

As a fan of TPL Dataflow, I've settled on the following design pattern, unless you are satisfied with the default behavior that the entire block stops processing messages when an unhandled exception occurs.

  • Handle all exceptions in the delegate with which the block is created.

  • Wrap your input and/or output in a state object that conveys not only the actual input message and/or output message but if the operation was successful and captures the exception message if not (much like Task<T> captures the result or an exception).

Either:

a) Each block in the pipeline checks if the previous operation was successful or not and either passes on the error or continues with it's happy path operation.

b) Or each block uses the propagation extensions to connect to the next, with a filter clause that sends failed messages to a standalone queue or block for error handling/logging.

If 'a', then at the end of the pipeline, have a propagation clause with a filter that passes the final state object to an error block or success block, and does whatever final step is necessary.

Don't forget to send a completion signal to any standalone blocks when your primary pipeline is complete.