Faust Streaming retry topic mechanism

625 views Asked by At

I have two topics

  • main_topic
  • retry_topic

I want if the logic fails in main_topic it should call retry_topic and if that fails so there should be max 3 retries that it should do.

I've tried using sink in faust streaming what it does is that it yield result from my main topic to retry_topic but I'm still not able to limit it to 3 retries.

Is there a way to do that in faust/kafka streaming because I know that celery has this feature.

1

There are 1 answers

0
Artem Ilin On

One way to achieve that is to use headers.

@app.agent(topic)
async def topic_handler(stream: faust.Stream):
    async for event in stream:
        try:
            await process_event(event)
        except Exception as e:
            logger.error(f'Error processing')
            current_try = int(stream.current_event.headers.get('retries', b'0').decode()) + 1
            await asyncio.sleep(min(current_try * 10, MAX_RETRY_WAIT))
            await retry_topic.send(key=event['id'], value=event,
                                   headers={'retries': str(current_try).encode(),
                                            'error': repr(e).encode()})

Now if you want to use the same agent for two topics, before this agent you may define topic as one of main_topic or retry_topic, something like this:

use_retry_topic = os.environ['USE_RETRY_TOPIC']
topic = app.topic(retry_topic_name if use_retry_topic else main_topic_name)
retry_topic = app.topic(retry_topic_name)

This way you need two processes. One starts with USE_RETRY_TOPIC = False, it reads main topic and if something goes wrong it sends a message to retry_topic after a delay. The other process starts with USE_RETRY_TOPIC = True, it consumes the retry topic and if something goes wrong again - sends the message to the very same topic once again, but with incremented retries count.

You can add a condition to check the retries count if it is greater than 3 if you want.

Please note that this delay logics maybe not very safe, e.g. if the process fails unexpectedly while waiting to send the message into the retry_topic, this message might be lost.

Also this approach may break the message order, link