Spring Integration delay along with thread MDC context

60 views Asked by At

I've a spring integration flow in which i have currently introduced a manual conditional delay by using Thread.sleep()

I understand that this is inefficient and would now like to refactor the whole flow using spring integrations native delay support

However my current application is very much single threaded and as such there are a lot of monitoring/tracing libraries adding their context in the threadcontext/mdc

 IntegrationFlow.from("input")
            .handle((message, h) -> {
                // initial logic which may set thread context
            })
            .handle((message, h) -> {
            // conditional delay logic invoking Thread.delay()
            })
            .handle((message, h) -> {
            // subsequent logic which may rely on the thread context
            })
            .channel("output")
            .get();

The channel is being defined in this way

 @Bean(name = "input")
public MessageChannel getChannel() {
    DirectChannel dc = new DirectChannel();
    dc.setComponentName("input");
    dc.setDatatypes(ConsumerRecord.class);
    return dc;
}

Even the thread which is sending the message to this channel can set something in the mdc; this works because i'm using DirectChannel and per my understanding for DirectChannel the execution of the flow happens on the calling thread only.

Now will it work if i refactor this to something similar to the below code

    IntegrationFlow.from("input")
            .handle((message, h) -> {
                // initial logic which may set thread context
            })
            .delay(d -> {// some delay config})
            .handle((message, h) -> {
            // subsequent logic which may rely on the thread context
            })
            .channel("output")
            .get();

My hunch is that the context will get lost the moment the message goes into the delay channel and the next time whichever thread gets assigned to this task will not remember the context and as such my subsequent logic would fail.

What is the right way to achieve this in the world of spring-integration ?? Thanks in advance.

1

There are 1 answers

0
Artem Bilan On

That's correct. The DelayHandler uses a TaskScheduler to delay those messages via scheduled tasks. And it is not a surprise that thread locals are lost in a scheduled thread.

One of the simplest way to propagate that thread local context (MDC in your case) is to set it into a message header and then restore it back to thread local in the next step after delayer.

Another way is to implement a TaskScheduler facade similar to the DelegatingSecurityContextTaskScheduler and inject it into that delay() function.

UPDATE

Here is some idea how MDC propagation can be implemented: https://moelholm.com/blog/2017/07/24/spring-43-using-a-taskdecorator-to-copy-mdc-data-to-async-threads

It is discussed here: How to use MDC with thread pools?