How to use sqlalchemy's on_conflict_do_update.returning to return updated values?

I am trying to write an upsert statement and have the query return the updated values. On inserts it works fine because there is no existing data, but on updates the query returns the old data, from before the update.

This is my upsert statement:

def upsert(model, data, constraints):
    insert_stmt: Insert = insert(model).values(data)
    do_update_stmt = insert_stmt.on_conflict_do_update(
        index_elements=constraints,
        set_=data,
    )
    return do_update_stmt

I execute it and get the values like this:

upsert_query = upsert(
    model,
    new_data,
    ["constraint"],
)
try:
    upsert_response = db.session.execute(
        upsert_query.returning(model)
    )
    updated_model = upsert_response.fetchone()[0]

The issue is that updated_model here holds the old data. If I commit, I get the updated data because the model is refreshed, but in my use case I don't want to commit until the code after this has run, and if that code fails I want to roll back. Unfortunately, I can't get the rollback to take effect once I have committed.

My question is: is there a way to get the updated data from the response on updates? If not, how can I roll back this commit? db is a SQLAlchemy() instance that is shared across the app.

There is 1 answer

Ian Wilson On BEST ANSWER

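When RETURNING yields ORM objects that are already loaded in the session, you have to set the populate_existing execution option. Otherwise the session hands back the existing object from its identity map without overwriting its already-loaded attributes, so you see the old values.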

There is an example in the SQLAlchemy documentation, in the section "Using RETURNING with upsert statements".

Here is another example I made. The key line is:

    res = session.execute(q, execution_options={"populate_existing": True}).fetchone()[0]

The full script:

import sys
from sqlalchemy import (
    create_engine,
    Integer,
    String,
)
from sqlalchemy.orm import Session, declarative_base, mapped_column
from sqlalchemy.sql import select
from sqlalchemy.dialects import postgresql

username, password, db = sys.argv[1:4]


engine = create_engine(f"postgresql+psycopg2://{username}:{password}@/{db}", echo=True)


Base = declarative_base()


class User(Base):
    __tablename__ = "users"

    id = mapped_column(Integer, primary_key=True)
    name = mapped_column(String, unique=True)
    beverage = mapped_column(String, nullable=False)


Base.metadata.create_all(engine)


with Session(engine) as session:
    u1 = User(id=1, name="user1", beverage="coffee")
    u2 = User(id=2, name="user2", beverage="tea")
    session.add_all([u1, u2])
    session.commit()

def upsert(model, insert_data, update_data, index_elements):
    insert_stmt = postgresql.insert(model).values(insert_data)
    do_update_stmt = insert_stmt.on_conflict_do_update(
        index_elements=index_elements,
        set_=update_data,
    )
    return do_update_stmt


with Session(engine) as session:
    u1 = session.scalars(select(User).where(User.id == 1)).first()
    assert u1.beverage != 'water', "This should be the old value."
    data = dict(id=1, name="user1", beverage="water")
    # On conflict, update every column except the primary key.
    update_data = {k: v for k, v in data.items() if k != "id"}
    q = upsert(User, data, update_data, ["id"]).returning(User)

    res = session.execute(q, execution_options={"populate_existing": True}).fetchone()[0]
    assert res.beverage == 'water', "This should be the new value."
    assert u1.beverage == 'water', "Our pre-existing object should be the same object but check anyways."

    session.commit()

    # Now read it back and check again after commit.
    u1 = session.scalars(select(User).where(User.id == 1)).first()
    print(u1.beverage == 'water')