Java Runnable Interface and Threadpool: EntityManager transactions that are run by multiple threads don't seem to close

I am trying to build a scheduler service using java.util.concurrent.ScheduledExecutorService. The scheduler runs a custom class that implements the Runnable interface, and that Runnable updates an entity in my database via a CRUD repository that uses EntityManager for its DB transactions.

SchedulerInit.java:

private ScheduledExecutorService executorService;
...
executorService = Executors.newScheduledThreadPool(5);
...
executorService.scheduleWithFixedDelay(executableRequestsTask, delay, 1, timeUnit);
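
For context, the full wiring looks roughly like the sketch below. The bean scope, lifecycle annotations, and placeholder delay values are illustrative assumptions (imports use the jakarta namespace that Helidon MP 3.x ships with), not the exact code I trimmed above:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

@ApplicationScoped
public class SchedulerInit {

  @Inject
  private ExecutableRequestsTask executableRequestsTask;

  private ScheduledExecutorService executorService;

  @PostConstruct
  void init() {
    // placeholder values; the real delay/timeUnit come from configuration
    long delay = 10;
    TimeUnit timeUnit = TimeUnit.SECONDS;

    executorService = Executors.newScheduledThreadPool(5);
    // re-runs the task 1 time unit after the previous run finishes
    executorService.scheduleWithFixedDelay(executableRequestsTask, delay, 1, timeUnit);
  }

  @PreDestroy
  void shutdown() {
    executorService.shutdown();
  }
}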

ExecutableRequestsTask.java:

public class ExecutableRequestsTask implements Runnable {

  @Inject
  private BatchRequestInterface batchRequestInterface;

  @Inject
  private BatchRequestExecutorImpl batchRequestExecutor;



  @Override
  public void run() {
    try {
      Iterable<BatchRequest> batchRequests = batchRequestInterface.displayCurrentBatchRequests(
              BatchRequestStatus.SUBMITTED, Instant.now()
      );
      if (batchRequests.iterator().hasNext()) {
        batchRequests.forEach(request -> {
          try {
            batchRequestExecutor.execute(request);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        });
      } else {
          log.info("Batch requests are null");
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}


BatchRequestExecutorImpl.java:

import org.eclipse.microprofile.faulttolerance.Asynchronous;

public class BatchRequestExecutorImpl  {

  @Inject
  private BatchRequestInterface batchRequestInterface;

  @Inject
  private KafkaMessageProducer producer;


  @Asynchronous
  public CompletionStage<Void> execute(BatchRequest request) throws IOException {
    try {
      return CompletableFuture.runAsync(() -> {
        BatchRequest lockedBatchRequest = null;
        try {
          lockedBatchRequest = batchRequestInterface.lockRequestByIdAndStatus(request.getId(),
                  BatchRequestStatus.SUBMITTED);
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
        if (lockedBatchRequest != null) {
          request.updateBatchRequestStatus(BatchRequestStatus.SCHEDULED);
          try {
            batchRequestInterface.update(request, request.getId());
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
          // todo: publish to kafka topic
          TaskScheduledEvent taskScheduledEvent = ...
          try {
            broadcast(taskScheduledEvent);
            log.info("broadcasted kafka event");
          } catch (IOException e) {
            log.error(e.getMessage());
            throw new RuntimeException(e);
          }
        } else {
          log.error(" Could not find Batch Request {} in Submitted Status", request.getId());
        }
      });
    } catch (Exception e) {
      log.error(e.getMessage());
      throw new RuntimeException(e);
    }
  }
}

BatchRequestInterfaceImpl.java:

public class BatchRequestInterfaceImpl extends CrudRepository<BatchRequest, Long> implements BatchRequestInterface {

    public Iterable<BatchRequest> displayCurrentBatchRequests(BatchRequestStatus status, Instant scheduledStartDate) {
        Query query = em.createQuery("select br from BatchRequest br where br.status = :status and br.scheduledStartDate < :scheduledStartDate order by br.scheduledStartDate, br.id");
        query.setParameter("status", status);
        query.setParameter("scheduledStartDate", scheduledStartDate);
        query.setLockMode(LockModeType.PESSIMISTIC_WRITE);
        return query.getResultList();
    }


    public BatchRequest lockRequestByIdAndStatus(Long requestId, BatchRequestStatus status) {
        Query query = em.createQuery("select br from BatchRequest br where br.status = :status and br.id = :requestId");
        query.setParameter("requestId", requestId);
        query.setParameter("status", status);
        query.setLockMode(LockModeType.PESSIMISTIC_WRITE);
        return (BatchRequest) query.getSingleResult();
    }
}

CrudRepository.java:

@Transactional
public class CrudRepository<T, ID> {

    @PersistenceContext
    protected EntityManager em;

    //other methods

    public <S extends T> S update(S entity, ID id) {
        try {
            log.info("About to merge this newly created entity");
            S updatedEntity = em.merge(entity);
            log.info("Just merged!");
            return updatedEntity;
        } catch (Exception e) {
            log.error("Could not merge entity, here is the error msg " + e);
            return null;
        }
    }
}


microprofile-config.properties:

javax.sql.DataSource.myDs.dataSourceClassName=org.h2.jdbcx.JdbcDataSource
javax.sql.DataSource.myDs.dataSource.url=jdbc:h2:mem:test;INIT=RUNSCRIPT FROM 'classpath:data-h2.sql'
javax.sql.DataSource.myDs.dataSource.user=sa
javax.sql.DataSource.myDs.dataSource.password=

Project configuration:

- Framework: Helidon MP version 3.2.2
- DB: H2 (local)
- Language: Java

When I run my application and post a new BatchRequest via a REST call, the expected behavior is that the following log messages appear in my console:

"About to merge this newly created entity"
"Just merged!"
"broadcasted kafka event"

The above should appear ONLY once for every new BatchRequest I post to my endpoint. In other words, when I post a new batch request, the task should call displayCurrentBatchRequests(), which returns that batch request exactly once, and then write the updated batch request to the DB via the update() method.

However, the actual behavior is the following:

org.eclipse.persistence.exceptions.OptimisticLockException
Exception Description: The object cannot be updated because it has changed or been deleted since it was last read. 
...
org.eclipse.persistence.exceptions.OptimisticLockException
Exception Description: The object cannot be updated because it has changed or been deleted since it was last read. 
...(infinite loop)

The exception is thrown on the line "em.merge(entity);" in CrudRepository.update().

So there are a few issues going on here: (1) Calling update() for the first time seems to trigger an OptimisticLockException, when the expected behavior is that it should simply save the newly created entity in the DB. (2) Multiple threads seem to be calling the update() method at the same time, triggering OptimisticLockExceptions over and over.
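
For what it's worth, my understanding is that EclipseLink raises this exception from its optimistic locking check, which is normally driven by a version column on the entity. Below is a minimal sketch of the relevant part of my entity; I have not pasted the real BatchRequest, the @Version field is an assumption based on the exception, and the other fields are trimmed:

import jakarta.persistence.Entity;
import jakarta.persistence.EnumType;
import jakarta.persistence.Enumerated;
import jakarta.persistence.Id;
import jakarta.persistence.Version;

@Entity
public class BatchRequest {

    @Id
    private Long id;

    @Enumerated(EnumType.STRING)
    private BatchRequestStatus status;

    // EclipseLink compares this value against the row at flush/commit time;
    // merging a copy whose version is older than the current row's version
    // fails with OptimisticLockException.
    @Version
    private Long version;

    // getters, setters, and the remaining fields omitted
}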

I am unsure why these are happening, since in BatchRequestInterfaceImpl.lockRequestByIdAndStatus and displayCurrentBatchRequests I am explicitly applying pessimistic write locks to those queries, so multiple threads should not be able to read the same rows at the same time. I also have CrudRepository annotated with @Transactional, so each CRUD operation (including update) should run in its own transaction, roughly as sketched below.
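
My mental model is that the class-level @Transactional behaves as if every business method were annotated individually with the default TxType.REQUIRED, along these lines (a sketch, not my actual code; jakarta imports as in Helidon MP 3.x):

import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import jakarta.transaction.Transactional;

public class CrudRepository<T, ID> {

    @PersistenceContext
    protected EntityManager em;

    // Each call joins the caller's JTA transaction if one is active,
    // otherwise it starts its own; any pessimistic locks acquired inside
    // are held until that transaction commits or rolls back.
    @Transactional(Transactional.TxType.REQUIRED)
    public <S extends T> S update(S entity, ID id) {
        return em.merge(entity);
    }
}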

Any suggestions on how to fix this issue?

Update 1: I tried locking the entity through the EntityManager in my update() method before calling merge:

em.lock(entity, LockModeType.PESSIMISTIC_WRITE);

But I got the error: "Entity must be managed to call lock: try merging the detached and try the lock again."

I then tried to make the entity managed first by doing the following in the update() method:

Object managed = em.find(entity.getClass(), id);
em.lock(managed, LockModeType.PESSIMISTIC_WRITE);
em.merge(entity);

This gave the exact same error.

There are 0 answers