Mutiny.SessionFactory: use existing session for saving entities

1k views Asked by At

I have simple task: get data by artemis connection -> save data to DB. If we have open session - use it. Function for saving:

fun <T> merge(entity: T): Uni<T> {
        return sessionFactory.withTransaction { session -> session.merge(entity) }
}

If we have a single or rare event, then it works. But if we have several events in a short period of time (example two), then not all data is saved. Data from first event was saved, from second wasn't. Using logging and tests I reproduced this behavior:

sessionFactory.withTransaction { session1 ->
    session1.merge(entity1).invoke { _ ->
        sessionFactory.withTransaction { session2 ->
            session2.merge(entity2)
        }.subscribe().with(
            { println("success save entity2: $entity2") },
            { error -> println("error save entity2: $error") }
        )
    }
}.subscribe().with(
    { println("success save entity1: $entity1") },
    { error -> println("error save entity1: $error") }
)

That is, if data recording from the second event started after the merge function was called, but before the session was closed, the data will not be saved.

Next1. If I use an persist instead of a merge (session2.merge -> session2.persist), the data will be saved.

Next2. If I add a find before persist - behavior is the same as merge (doesn't save):

sessionFactory.withTransaction { session2 ->
    session2.find(Entry::class.java, entity2.id)
        .flatMap { session2.persist(entity2) }
//    session2.merge(entity2)
}

Next3. If I add flush before merge - I get error:

sessionFactory.withTransaction { session2 ->
    session2.flush()
        .flatMap { session2.merge(entity2) }
//    session2.merge(entity2)
}
E 13:24:23 23 [vert.x-eventloop-thread-0] errors.logSqlException - HR000057: Failed to execute statement [$1/* load Entity */ select entity0_.id as id1_3_0_, ... from Entity entity0_ where entity0_.id=$1]: $2could not load an entity: [Entity#b3c699b4-d490-4e56-bb76-904a1cb508a1]
error save entity2: javax.persistence.PersistenceException: org.hibernate.HibernateException: java.util.concurrent.CompletionException: io.vertx.pgclient.PgException: ERROR: current transaction is aborted, commands ignored until end of transaction block (25P02)
error save entity1: javax.persistence.PersistenceException: org.hibernate.HibernateException: io.vertx.pgclient.PgException: ERROR: duplicate key value violates unique constraint "entity_pkey" (23505)

Where b3c699b4-d490-4e56-bb76-904a1cb508a1 is entity2.id.

In this case, the entity1 is saved twice: first time after session2.flush() and second before session1 closing.

Question: how do save data from both events using an existing session? If I open a new session every time, there are no problems.

1

There are 1 answers

5
Davide D'Alto On

In the initial example, you are using .invoke instead of .call and there's no need to subscribe twice. But you also need to make sure the same session is not shared in parallel between pipelines.

What I mean is you shouldn't rely on code like this:

List<Uni<Void>> list = ...
for (Entity entity : entities) {
    list.add( session.persist(entity) );
}
Uni.join().all( list ).chain(session::flush);

It might look like it works in some cases, but it can lead to errors that are hard to debug. That's because the session is not thread-safe.

In your case, if you want to merge two entities using the same session, you need to make sure that the operations happen in the right order.

This should work for example:

sessionFactory.withTransaction { session ->
    session.merge(entity1)
        .invoke{ entity -> println("success save entity1: $entity") }
        .call { _ ->
            session.merge(entity2)
                    .invoke{ entity -> println("success save entity2: $entity") }
         }
}.subscribe().with(
    { println("Finished!") },
    { error -> println("Error: $error") }
)