ThreadLocal context lost with CompletableFuture in Java


I have a class, DatabaseContextHolder, that manages the data source based on the user's API key using ThreadLocal.

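import java.util.Optional;
import java.util.Stack;

// DataSourceType is an application-specific type that is not shown here.
public final class DatabaseContextHolder {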

    private static final ThreadLocal<Stack<DataSourceType>> ctx = new ThreadLocal<>();

    public static void setCtx(DataSourceType dataSourceType) {
        getCtx().push(dataSourceType);
    }

    public static void restoreCtx() {
        final Stack<DataSourceType> ctx = getCtx();
        if (!ctx.isEmpty()) {
            ctx.pop();
        }
    }

    public static Optional<DataSourceType> peekDataSource() {
        final Stack<DataSourceType> ctx = getCtx();

        if (ctx.isEmpty()) {
            return Optional.empty();
        }

        return Optional.of(ctx.peek());
    }

    private static Stack<DataSourceType> getCtx() {
        if (ctx.get() == null) {
            ctx.set(new Stack<>());
        }

        return ctx.get();
    }
}

This works fine in single-threaded code. However, when I use CompletableFuture, ctx is always empty inside the asynchronous tasks. Here is an example of a method that uses CompletableFuture:

private final ExecutorService executorService = Executors.newCachedThreadPool();

    public ResponseEntity<Map<String, List<ReportGroupScheduleExecutionBean>>> getScheduleExecutionByGroup(
            @RequestParam(value = "accountId") int accountId,
            @RequestParam("dateFrom") @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime dateFrom,
            @RequestParam("dateTo") @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime dateTo,
            @RequestParam(name = "groupIds") Set<Integer> groupIds,
            @AuthenticationPrincipal UserBean userBean) throws NoDataException {
        ZoneId userTimeZone = ZoneId.of(userBean.getUserTimeZone());
        DateTimeFormatter zoneTimeFormatter = DateTimeFormatter.ofPattern(ModelsConstants.PATTERN_TIME_ONLY_MINUTES).withZone(userTimeZone);
        AccountEntity account = accountService.findById(accountId).orElseThrow(() -> new NoDataException("Account not exist"));
        List<CompletableFuture<List<ReportGroupScheduleExecutionBean>>> futures = new ArrayList<>();
        groupIds.forEach(id -> {
            CompletableFuture<List<ReportGroupScheduleExecutionBean>> future = CompletableFuture.supplyAsync(
                    () -> {
                        try {
                            return reportService.getRouteScheduleExecutionReportByGroup(new ReportInfoBean(
                                    dateFrom, dateTo, account, userBean.getUserTimeZone(), zoneTimeFormatter), id);
                        } catch (NoDataException e) {
                            throw new RuntimeException(e);
                        }
                    }, executorService);
            futures.add(future);
        });
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        CompletableFuture<List<ReportGroupScheduleExecutionBean>> futuresAll = allFutures.thenApply
                (v -> futures.stream()
                        .map(CompletableFuture::join)
                        .flatMap(List::stream)
                        .collect(Collectors.toList())
                );
        try {
            return new ResponseEntity<>(futuresAll.get().stream().collect(Collectors.groupingBy(ReportGroupScheduleExecutionBean::getAccountName)), HttpStatus.OK);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

What I've tried:

  • Marking all methods as synchronized
  • Using different executors for CompletableFuture

How can I ensure that the ThreadLocal context is propagated correctly to threads spawned by CompletableFuture?

There is 1 answer

Answered by John Bollinger:

"How can I ensure that the ThreadLocal context is propagated correctly to threads spawned by CompletableFuture?"

You seem to have altogether the wrong idea about ThreadLocal. The whole point is that a given ThreadLocal object holds a different, independent value for each thread. Whatever value one thread set()s on it, that thread alone can get() back from it. The objective is to avoid sharing data between threads.
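To make that concrete, here is a minimal sketch of the per-thread behaviour. The class name and the String-valued CTX field are hypothetical stand-ins for the question's DatabaseContextHolder, chosen only to keep the example self-contained.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadLocalIsolationDemo {

    // Hypothetical stand-in for the question's ctx field, holding a plain String.
    private static final ThreadLocal<String> CTX = new ThreadLocal<>();

    /** Sets a value on the calling thread, then reads CTX from a pool thread. */
    static String readFromWorker(String valueForCaller) {
        CTX.set(valueForCaller);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // The supplier runs on the pool's thread, which never called CTX.set().
            return CompletableFuture.supplyAsync(CTX::get, pool).join();
        } finally {
            pool.shutdown();
            CTX.remove(); // leave the calling thread clean
        }
    }

    public static void main(String[] args) {
        // prints "worker sees: null"
        System.out.println("worker sees: " + readFromWorker("tenant-a"));
    }
}
```

The worker reads null because its own slot in CTX was never written, no matter what the calling thread stored.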

Thus ...

"when I use CompletableFuture, the ctx field is always empty inside the asynchronous tasks"

Yes! The asynchronous tasks run in different threads from the one in which you set a value in the ThreadLocal, so they are not supposed to see that value in the ThreadLocal. Nor a copy of it or any such thing, either. They get either its initial value or whatever value they last set themselves.
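If a task needs the value, one option consistent with the above is to hand it over explicitly: read it on the submitting thread, then have the task set it itself on whichever thread runs it. The following is only a sketch under assumptions. The class, the String-valued CTX field, and the withCallerContext helper are all hypothetical and not part of the question's code or of any library.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

final class ContextHandOffDemo {

    // Hypothetical stand-in for DatabaseContextHolder, reduced to one String value.
    private static final ThreadLocal<String> CTX = new ThreadLocal<>();

    /** Wraps a task so that it sets the caller's value on the thread that runs it. */
    static <T> Supplier<T> withCallerContext(Supplier<T> task) {
        String captured = CTX.get(); // read on the calling thread, before submission
        return () -> {
            String previous = CTX.get();
            CTX.set(captured);
            try {
                return task.get();
            } finally {
                // Pool threads are reused, so put back whatever was there before.
                if (previous == null) {
                    CTX.remove();
                } else {
                    CTX.set(previous);
                }
            }
        };
    }

    /** Sets a value on the caller, then reads CTX inside a wrapped async task. */
    static String runWithContext(String value) {
        CTX.set(value);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Supplier<String> task = withCallerContext(CTX::get);
            return CompletableFuture.supplyAsync(task, pool).join();
        } finally {
            pool.shutdown();
            CTX.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println("worker sees: " + runWithContext("tenant-a"));
    }
}
```

Applied to the question, this would presumably mean calling peekDataSource() on the request thread and wrapping the body of each supplier in setCtx() and restoreCtx(). Note that nothing is shared here: the task sets its own thread's value, which is exactly how ThreadLocal is meant to be used.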

You can partially break that by causing your ThreadLocal instance to provide the same object as its initial value to all threads, but that is inappropriate under most circumstances. If you want a shared object then dispense with the ThreadLocal and set up such an object directly. ThreadLocal does not do anything useful for you in that case. In particular, it does not provide any kind of synchronization for a shared object exposed as its initial value to multiple threads.
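For illustration only, this is roughly what that discouraged arrangement looks like. The class and field names are hypothetical. The sketch shows that every thread ends up with the very same list instance, which ThreadLocal does nothing to protect.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SharedInitialValueDemo {

    // One list for the whole JVM. ArrayList is not thread-safe.
    private static final List<String> SHARED = new ArrayList<>();

    // Every thread's initial value is the same SHARED instance.
    private static final ThreadLocal<List<String>> CTX =
            ThreadLocal.withInitial(() -> SHARED);

    /** Returns true if the caller and a pool thread get the identical object. */
    static boolean workerGetsSameInstance() {
        List<String> seenByCaller = CTX.get();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            List<String> seenByWorker =
                    CompletableFuture.supplyAsync(CTX::get, pool).join();
            return seenByCaller == seenByWorker;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("same instance: " + workerGetsSameInstance());
    }
}
```

Since both threads hold a reference to the same unsynchronized list, a plain static field would do the same job with less indirection.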


It seems unlikely that you want thread-specific Stacks that are not also task-specific, but if you did want that then you could avoid each thread having to set() its own by configuring a ThreadLocal with an initial value supplier, via withInitial():

    private static final ThreadLocal<Stack<DataSourceType>> ctx =
            ThreadLocal.withInitial(Stack::new);

Again, I don't think that's what you actually want, but I include it for completeness.
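As a quick check of what withInitial() does and does not do, here is a small sketch. The class name and the String element type are hypothetical, used in place of DataSourceType to keep it self-contained. Each thread gets its own fresh Stack on first access, so entries pushed by the caller are still invisible to the worker.

```java
import java.util.Stack;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class WithInitialDemo {

    // Each thread lazily receives its own new Stack on its first get().
    private static final ThreadLocal<Stack<String>> CTX =
            ThreadLocal.withInitial(Stack::new);

    /** Pushes onto the caller's stack, then reports the size of a worker's stack. */
    static int workerStackSize(String callerEntry) {
        CTX.get().push(callerEntry); // lands in the calling thread's stack only
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture.supplyAsync(() -> CTX.get().size(), pool).join();
        } finally {
            pool.shutdown();
            CTX.remove();
        }
    }

    public static void main(String[] args) {
        // prints "worker stack size: 0"
        System.out.println("worker stack size: " + workerStackSize("tenant-a"));
    }
}
```

So withInitial() removes the null check in getCtx(), but it does not carry the caller's entries into the asynchronous tasks.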