IT이야기

SqlAlchemy로 upsert를 수행하는 방법

cyworld 2021. 10. 8. 21:08
반응형

SqlAlchemy로 upsert를 수행하는 방법?


데이터베이스에 존재하지 않으려는 레코드가 있고 이미 존재하는 경우(기본 키가 있음) 필드가 현재 상태로 업데이트되기를 원합니다. 이것은 종종 upsert 라고 합니다 .

다음의 불완전한 코드 조각은 작동하는 방식을 보여주지만 지나치게 투박해 보입니다(특히 열이 더 많은 경우). 더 나은/가장 좋은 방법은 무엇입니까?

Base = declarative_base()
class Template(Base):
    __tablename__ = 'templates'
    id = Column(Integer, primary_key = True)
    name = Column(String(80), unique = True, index = True)
    template = Column(String(80), unique = True)
    description = Column(String(200))
    def __init__(self, Name, Template, Desc):
        self.name = Name
        self.template = Template
        self.description = Desc

def UpsertDefaultTemplate():
    sess = Session()
    desired_default = Template("default", "AABBCC", "This is the default template")
    try:
        q = sess.query(Template).filter_by(name = desiredDefault.name)
        existing_default = q.one()
    except sqlalchemy.orm.exc.NoResultFound:
        #default does not exist yet, so add it...
        sess.add(desired_default)
    else:
        #default already exists.  Make sure the values are what we want...
        assert isinstance(existing_default, Template)
        existing_default.name = desired_default.name
        existing_default.template = desired_default.template
        existing_default.description = desired_default.description
    sess.flush()

이 작업을 수행하는 더 좋거나 덜 장황한 방법이 있습니까? 다음과 같은 것이 좋습니다.

sess.upsert_this(desired_default, unique_key = "name")

unique_keykwarg는 분명히 불필요 하지만 (ORM이 이를 쉽게 파악할 수 있어야 함) SQLAlchemy가 기본 키에서만 작동하는 경향이 있기 때문에 추가했습니다. 예: Session.merge 가 적용 가능한지 여부를 살펴 보았지만 이는 기본 키에서만 작동합니다. 이 경우에는 이 목적에 그다지 유용하지 않은 자동 증분 ID입니다.

이에 대한 샘플 사용 사례는 기본 예상 데이터를 업그레이드했을 수 있는 서버 응용 프로그램을 시작할 때입니다. 즉, 이 upsert에 대한 동시성 문제가 없습니다.


SQLAlchemy에는 최근 버전에서 내장 session.add되었지만 이전에는 별도의 session.saveorupdate호출 이었던 "저장 또는 업데이트" 동작이 있습니다. 이것은 "업서트"가 아니지만 귀하의 요구에 충분할 수 있습니다.

여러 개의 고유 키가 있는 클래스에 대해 묻는 것이 좋습니다. 이것이 바로 이 작업을 수행하는 올바른 단일 방법이 없는 이유라고 생각합니다. 기본 키는 고유 키이기도 합니다. 고유한 제약 조건이 없고 기본 키만 있다면 충분히 간단한 문제일 것입니다. 주어진 ID가 없는 경우 또는 ID가 없음인 경우 새 레코드를 생성합니다. 그렇지 않으면 해당 기본 키로 기존 레코드의 다른 모든 필드를 업데이트합니다.

그러나 추가적인 고유 제약 조건이 있는 경우 이러한 간단한 접근 방식에는 논리적 문제가 있습니다. 개체를 "업서트"하고 개체의 기본 키가 기존 레코드와 일치하지만 다른 고유 열이 다른 레코드 와 일치하는 경우 어떻게 해야 합니까? 마찬가지로 기본 키가 기존 레코드와 일치하지 않지만 다른 고유 열 기존 레코드와 일치하면 어떻게 될까요? 귀하의 특정 상황에 대한 정답이 있을 수 있지만 일반적으로 단일 정답은 없다고 주장합니다.

이것이 내장된 "upsert" 작업이 없는 이유입니다. 애플리케이션은 각각의 특정 경우에 이것이 의미하는 바를 정의해야 합니다.


SQLAlchemy는 ON CONFLICT두 가지 방법 on_conflict_do_update()on_conflict_do_nothing().

문서 에서 복사 :

from sqlalchemy.dialects.postgresql import insert

stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
stmt = stmt.on_conflict_do_update(
    index_elements=[my_table.c.user_email],
    index_where=my_table.c.user_email.like('%@gmail.com'),
    set_=dict(data=stmt.excluded.data)
)
conn.execute(stmt)

나는 "뛰기 전에 살펴보기" 접근 방식을 사용합니다.

# first get the object from the database if it exists
# we're guaranteed to only get one or zero results
# because we're filtering by primary key
switch_command = session.query(Switch_Command).\
    filter(Switch_Command.switch_id == switch.id).\
    filter(Switch_Command.command_id == command.id).first()

# If we didn't get anything, make one
if not switch_command:
    switch_command = Switch_Command(switch_id=switch.id, command_id=command.id)

# update the stuff we care about
switch_command.output = 'Hooray!'
switch_command.lastseen = datetime.datetime.utcnow()

session.add(switch_command)
# This will generate either an INSERT or UPDATE
# depending on whether we have a new object or not
session.commit()

장점은 이것이 db-neutral이고 읽기에 명확하다고 생각합니다. 단점은 다음과 같은 시나리오에서 잠재적인 경쟁 조건 이 있다는 것입니다 .

  • 우리는 db에 대해 쿼리하고 switch_command하나를 찾지 못합니다.
  • 우리는 생성 switch_command
  • 다른 프로세스 또는 스레드가 switch_command우리와 동일한 기본 키로 생성
  • 우리는 우리의 switch_command

요즘 SQLAlchemy는 두 가지 유용한 기능 on_conflict_do_nothingon_conflict_do_update. 이러한 기능은 유용하지만 ORM 인터페이스에서 더 낮은 수준의 SQLAlchemy Core로 전환해야 합니다.

이 두 함수가 SQLAlchemy의 구문을 사용하여 upserting을 구성하는 것은 그리 어렵지 않지만 이러한 함수는 upserting에 대한 완전한 즉시 사용 가능한 솔루션을 제공하는 것과는 거리가 멉니다.

나의 일반적인 사용 사례는 단일 SQL 쿼리/세션 실행에서 행의 큰 청크를 upsert하는 것입니다. 나는 일반적으로 upserting과 관련하여 두 가지 문제가 발생합니다.

예를 들어, 우리가 익숙했던 더 높은 수준의 ORM 기능이 누락되었습니다. ORM 개체를 사용할 수 없지만 대신 ForeignKey삽입 시 s 를 제공해야 합니다 .

두 가지 문제를 처리하기 위해 작성한 다음 함수를 사용 하고 있습니다.

def upsert(session, model, rows):
    table = model.__table__
    stmt = postgresql.insert(table)
    primary_keys = [key.name for key in inspect(table).primary_key]
    update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}

    if not update_dict:
        raise ValueError("insert_or_update resulted in an empty update_dict")

    stmt = stmt.on_conflict_do_update(index_elements=primary_keys,
                                      set_=update_dict)

    seen = set()
    foreign_keys = {col.name: list(col.foreign_keys)[0].column for col in table.columns if col.foreign_keys}
    unique_constraints = [c for c in table.constraints if isinstance(c, UniqueConstraint)]
    def handle_foreignkeys_constraints(row):
        for c_name, c_value in foreign_keys.items():
            foreign_obj = row.pop(c_value.table.name, None)
            row[c_name] = getattr(foreign_obj, c_value.name) if foreign_obj else None

        for const in unique_constraints:
            unique = tuple([const,] + [row[col.name] for col in const.columns])
            if unique in seen:
                return None
            seen.add(unique)

        return row

    rows = list(filter(None, (handle_foreignkeys_constraints(row) for row in rows)))
    session.execute(stmt, rows)

이것은 sqlite3 및 postgres에서 저에게 효과적입니다. 결합된 기본 키 제약 조건으로 실패할 수 있지만 추가 고유 제약 조건으로 인해 실패할 가능성이 큽니다.

    try:
        t = self._meta.tables[data['table']]
    except KeyError:
        self._log.error('table "%s" unknown', data['table'])
        return

    try:
        q = insert(t, values=data['values'])
        self._log.debug(q)
        self._db.execute(q)
    except IntegrityError:
        self._log.warning('integrity error')
        where_clause = [c.__eq__(data['values'][c.name]) for c in t.c if c.primary_key]
        update_dict = {c.name: data['values'][c.name] for c in t.c if not c.primary_key}
        q = update(t, values=update_dict).where(*where_clause)
        self._log.debug(q)
        self._db.execute(q)
    except Exception as e:
        self._log.error('%s: %s', t.name, e)

아래는 redshift 데이터베이스에서 잘 작동하며 결합된 기본 키 제약 조건에서도 작동합니다.

출처 :

def start_engine() 함수에서 SQLAlchemy 엔진을 생성하는 데 필요한 몇 가지 수정 사항

from sqlalchemy import Column, Integer, Date ,Metadata
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects import postgresql

Base = declarative_base()

def start_engine():
    engine = create_engine(os.getenv('SQLALCHEMY_URI', 
    'postgresql://localhost:5432/upsert'))
     connect = engine.connect()
    meta = MetaData(bind=engine)
    meta.reflect(bind=engine)
    return engine


class DigitalSpend(Base):
    __tablename__ = 'digital_spend'
    report_date = Column(Date, nullable=False)
    day = Column(Date, nullable=False, primary_key=True)
    impressions = Column(Integer)
    conversions = Column(Integer)

    def __repr__(self):
        return str([getattr(self, c.name, None) for c in self.__table__.c])


def compile_query(query):
    compiler = query.compile if not hasattr(query, 'statement') else 
  query.statement.compile
    return compiler(dialect=postgresql.dialect())


def upsert(session, model, rows, as_of_date_col='report_date', no_update_cols=[]):
    table = model.__table__

    stmt = insert(table).values(rows)

    update_cols = [c.name for c in table.c
                   if c not in list(table.primary_key.columns)
                   and c.name not in no_update_cols]

    on_conflict_stmt = stmt.on_conflict_do_update(
        index_elements=table.primary_key.columns,
        set_={k: getattr(stmt.excluded, k) for k in update_cols},
        index_where=(getattr(model, as_of_date_col) < getattr(stmt.excluded, as_of_date_col))
        )

    print(compile_query(on_conflict_stmt))
    session.execute(on_conflict_stmt)


session = start_engine()
upsert(session, DigitalSpend, initial_rows, no_update_cols=['conversions'])

이를 통해 문자열 이름을 기반으로 기본 모델에 액세스할 수 있습니다.

def get_class_by_tablename(tablename):
  """Return class reference mapped to table.
  https://stackoverflow.com/questions/11668355/sqlalchemy-get-model-from-table-name-this-may-imply-appending-some-function-to
  :param tablename: String with name of table.
  :return: Class reference or None.
  """
  for c in Base._decl_class_registry.values():
    if hasattr(c, '__tablename__') and c.__tablename__ == tablename:
      return c


sqla_tbl = get_class_by_tablename(table_name)

def handle_upsert(record_dict, table):
    """
    handles updates when there are primary key conflicts

    """
    try:
        self.active_session().add(table(**record_dict))
    except:
        # Here we'll assume the error is caused by an integrity error
        # We do this because the error classes are passed from the
        # underlying package (pyodbc / sqllite) SQLAlchemy doesn't mask
        # them with it's own code - this should be updated to have
        # explicit error handling for each new db engine

        # <update>add explicit error handling for each db engine</update> 
        active_session.rollback()
        # Query for conflic class, use update method to change values based on dict
        c_tbl_primary_keys = [i.name for i in table.__table__.primary_key] # List of primary key col names
        c_tbl_cols = dict(sqla_tbl.__table__.columns) # String:Col Object crosswalk

        c_query_dict = {k:record_dict[k] for k in c_tbl_primary_keys if k in record_dict} # sub-dict from data of primary key:values
        c_oo_query_dict = {c_tbl_cols[k]:v for (k,v) in c_query_dict.items()} # col-object:query value for primary key cols

        c_target_record = session.query(sqla_tbl).filter(*[k==v for (k,v) in oo_query_dict.items()]).first()

        # apply new data values to the existing record
        for k, v in record_dict.items()
            setattr(c_target_record, k, v)

참조URL : https://stackoverflow.com/questions/7165998/how-to-do-an-upsert-with-sqlalchemy

반응형