Spring ReactiveMongoRepository function with nested reactive operations timing out


I'm working with MongoDB and Spring Reactor for a school project. I'm trying to write a function where creating or updating a Task record triggers updates of all the relevant records in a User-Task join table. In this case, the join records store each user's own sort order for their Tasks. The function works in unit tests and when running locally, but when I deploy it to my team's free-tier PaaS hosting environment it times out badly enough to force a hard restart of the whole container. I suspect that my function is written very inefficiently and would appreciate any suggestions on how to optimize it.

My initial implementation had a lot of race conditions between the Reactor doOnNext and doOnComplete callbacks, so my current code is an unholy mix of reactive and imperative styles meant to force the operations to run sequentially.
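To illustrate the kind of race I mean: a side-effect callback such as doOnNext does not wait for asynchronous work that is started inside it, so the outer pipeline can complete before that inner work does. The sketch below is only an analogy using CompletableFuture from the standard library, not Reactor. The class ComposedInserts and the method insertJoinRecord are hypothetical stand-ins, not part of my project. It shows the composed alternative, where the caller receives a future that completes only after every inner insert has completed. In Reactor, the analogous move would presumably be returning the inner publisher from flatMap instead of calling subscribe inside doOnNext.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class ComposedInserts {
    // Hypothetical stand-in for an asynchronous repository insert.
    static CompletableFuture<String> insertJoinRecord(String taskId) {
        return CompletableFuture.supplyAsync(() -> "join-" + taskId);
    }

    // Starts one insert per task ID and returns a future that completes
    // only after all of them have completed, preserving input order.
    static CompletableFuture<List<String>> insertAll(List<String> taskIds) {
        List<CompletableFuture<String>> inserts = taskIds.stream()
            .map(ComposedInserts::insertJoinRecord)
            .collect(Collectors.toList());
        return CompletableFuture
            .allOf(inserts.toArray(new CompletableFuture[0]))
            // Every insert is done here, so join() returns immediately.
            .thenApply(ignored -> inserts.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        // The follow-up step runs only once all inserts have finished.
        List<String> inserted = insertAll(List.of("a", "b", "c")).join();
        System.out.println(inserted);
    }
}
```

Because the follow-up step is chained onto the combined future, nothing has to be blocked or manually subscribed to get the ordering right.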

taskRepository and userTaskRepository both extend ReactiveMongoRepository. The userID logic isn't complete yet, so for now the join records all have blank userIDs.

return taskRepository.saveAll(tasks)
    .doOnNext(savedTask -> {
      userTaskRepository.findUserTaskByUserIdTaskId("", savedTask.getId())
          .hasElement().subscribe(joinRecordExists -> {
            // Create a new db entry linking the task to the user if none exists
            if (!joinRecordExists) {
              logger.info(
                  String.format("Inserting record for task: %s", savedTask.getTitle()));
              UserTask joinEntry = new UserTask();
              joinEntry.setUserId("");
              joinEntry.setTaskId(savedTask.getId());
              userTaskRepository.insert(joinEntry).subscribe(insertedTask -> {
                logger.debug(
                    String.format("Insertion result for usertask: %s",
                        insertedTask.toString()));
              });
            }
          });
    }).collectList().doAfterTerminate(() -> {
      List<Task> unsortedTasks = new ArrayList<>();
      List<UserTask> sortedJoinRecords = new ArrayList<>();
      logger.info("Insertion complete, starting sort operation...");
      // Select all tasks connected to the user
      userTaskRepository.findByUserId("")
          .collectList().doOnSuccess((joinList) -> {
            for (UserTask joinRecord : joinList) {
              Task taskRecord = taskRepository.findById(joinRecord.getTaskId()).block();
              if (taskRecord != null) {
                unsortedTasks.add(taskRecord);
              }
            }
            // Priority-Sort tasks then apply sorting values to UserTask records
            double currentPriorityValue = unsortedTasks.size();
            logger.info(String.format("Tasks to sort: %d", unsortedTasks.size()));
            for (Task sortedTask : prioritySortTasks(unsortedTasks)) {
              Optional<UserTask> userTaskOptional = joinList.stream()
                  .filter(ut -> ut.getTaskId().equals(sortedTask.getId())).findFirst();
              if (userTaskOptional.isPresent()) {
                UserTask userTask = userTaskOptional.get();
                userTask.setSortValue(currentPriorityValue--);
                sortedJoinRecords.add(userTask);
              }
            }
            // update UserTask records
            logger.debug(String.format("Saving %d records", sortedJoinRecords.size()));
            List<UserTask> output = userTaskRepository.saveAll(sortedJoinRecords).collectList()
                .block();
          }).subscribe();
    });
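One piece that can be looked at in isolation is the step that applies sort values to the join records. The code above scans joinList with a stream filter once per sorted task. A minimal sketch of the same step using a map lookup is below. It assumes the task IDs are already in priority order, and both SortValueAssigner and its simplified UserTask class are hypothetical stand-ins for illustration, not the real entity classes. This is unlikely to be the cause of the timeout by itself, but it keeps the step linear in the number of records.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SortValueAssigner {
    // Simplified stand-in for the UserTask join record (assumption).
    static class UserTask {
        final String taskId;
        double sortValue;
        UserTask(String taskId) { this.taskId = taskId; }
    }

    // sortedTaskIds: task IDs already in priority order, highest first.
    // Returns the matched join records with descending sort values applied.
    static List<UserTask> assignSortValues(List<String> sortedTaskIds,
                                           List<UserTask> joinRecords) {
        Map<String, UserTask> byTaskId = new HashMap<>();
        for (UserTask ut : joinRecords) {
            // Keep the first record per task ID, mirroring findFirst().
            byTaskId.putIfAbsent(ut.taskId, ut);
        }
        List<UserTask> updated = new ArrayList<>();
        double current = sortedTaskIds.size();
        for (String id : sortedTaskIds) {
            UserTask ut = byTaskId.get(id);
            if (ut != null) {
                // Decrement only when a record matched, as in the original loop.
                ut.sortValue = current--;
                updated.add(ut);
            }
        }
        return updated;
    }

    public static void main(String[] args) {
        List<UserTask> joins = List.of(
            new UserTask("t1"), new UserTask("t2"), new UserTask("t3"));
        for (UserTask ut : assignSortValues(List.of("t3", "t1", "t2"), joins)) {
            System.out.println(ut.taskId + " -> " + ut.sortValue);
        }
    }
}
```

The returned list is what would then be passed to a single saveAll call.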

The function gives the following output when run either in unit tests or locally:

> Inserting record for task: Test Task 1
> Insertion result for task: UserTask(....)
> ...(repeat for 4 additional Tasks & UserTasks)...
> Insertion complete, starting sort operation...
> Tasks to sort: 5
> Saving 5 records
> Saved join record: UserTask(sortValue=3.0, ...)
> ...(repeat for 4 additional UserTasks)...

But when deployed to production the whole thing times out:

> Inserting record for task: (test data)
> Health contributor org.springframework.boot.actuate.autoconfigure.health.HealthEndpointConfiguration$AdaptedReactiveHealthContributors$1 (mongo) took 65100ms to respond
> (repeat with all the other AdaptedReactiveHealthContributors)

Any suggestions? The rest of the code can be found here: https://github.com/TobyMarch/1TaaT/tree/main/task-services/src/main/java/com/taat/taskservices
