-
-
Notifications
You must be signed in to change notification settings - Fork 1.5k
UniqueObject
Various patterns for instantiating an object that may or may not correspond to an existing row, kept unique on some field or set of fields.
note: for a newer pattern that's more targeted towards the goal of a "lookup" object attached to a parent, without needing to pass Session objects or use thread local sessions, see the UniqueObjectValidatedOnPending recipe.
We'll define a function _unique()
that will provide the "guts" of the unique recipe. This function is given a Session
to work with, and stores a dictionary in that Session's info mapping,
which keeps track of the current "unique" keys.
def _unique(session, cls, hashfunc, queryfunc, constructor, arg, kw):
cache = session.info.get("_unique_cache", None)
if cache is None:
session.info['_unique_cache'] = cache = {}
key = (cls, hashfunc(*arg, **kw))
if key in cache:
return cache[key]
else:
with session.no_autoflush:
q = session.query(cls)
q = queryfunc(q, *arg, **kw)
obj = q.first()
if not obj:
obj = constructor(*arg, **kw)
session.add(obj)
cache[key] = obj
return obj
The above function is given everything we need to check for an existing instance, query for it, or create:
# Fetch the Widget named "some name", creating and adding it to the session
# if neither the per-session cache nor the database already has it.
widget = _unique(
    session,
    Widget,
    lambda name: name,  # hashfunc: the name itself is the cache key
    lambda query, name: query.filter(Widget.name == name),  # queryfunc
    Widget,  # constructor, invoked as Widget(name=...)
    (),  # no positional arguments
    {"name": "some name"},
)
Some ways of integrating this function follow.
This is the most straightforward way to go; a @classmethod
is used to indicate the desire for a "unique"
object. The Session
is passed explicitly. The target class is augmented using a mixin, and it defines additional methods which produce the correct functionality for the "hash" and "query filter" functions.
class UniqueMixin:
    """Mixin that adds "fetch or create" constructors to a class.

    Subclasses override :meth:`unique_hash` and :meth:`unique_filter`; both
    are handed to ``_unique()`` by :meth:`as_unique`.
    """

    @classmethod
    def unique_hash(cls, *arg, **kw):
        """Return the cache-key value for the given constructor arguments.

        Must be overridden; the base implementation always raises.
        """
        raise NotImplementedError

    @classmethod
    def unique_filter(cls, query, *arg, **kw):
        """Return ``query`` narrowed to the row matching the arguments.

        Must be overridden; the base implementation always raises.
        """
        raise NotImplementedError

    @classmethod
    def as_unique(cls, session, *arg, **kw):
        """Return the unique instance of ``cls`` for ``arg`` / ``kw``.

        The class serves both as the lookup target and as the constructor
        passed to ``_unique()``.
        """
        hash_fn = cls.unique_hash
        filter_fn = cls.unique_filter
        return _unique(session, cls, hash_fn, filter_fn, cls, arg, kw)

    # optional asyncio version as well
    @classmethod
    async def async_as_unique(cls, async_session, *arg, **kw):
        """Await ``async_session.run_sync()`` on :meth:`as_unique`."""
        found = await async_session.run_sync(cls.as_unique, *arg, **kw)
        return found
Usage:
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
# Declarative base class that the example model below inherits from.
Base = declarative_base()
# Engine for the 'sqlite://' URL; echo=True asks SQLAlchemy to log the SQL it emits.
engine = create_engine('sqlite://', echo=True)
# Session factory bound to the engine above.
Session = sessionmaker(bind=engine)
class Widget(UniqueMixin, Base):
    """Example mapped class kept unique on its ``name`` column."""

    __tablename__ = "widget"

    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True, nullable=False)

    @classmethod
    def unique_hash(cls, name):
        """Use the name itself as the cache-key value."""
        return name

    @classmethod
    def unique_filter(cls, query, name):
        """Restrict ``query`` to the widget whose name equals ``name``."""
        same_name = Widget.name == name
        return query.filter(same_name)
# Create the tables, then open a session for the demonstration.
Base.metadata.create_all(engine)
session = Session()

# Three distinct names yield three distinct objects ...
w1 = Widget.as_unique(session, name="w1")
w2 = Widget.as_unique(session, name="w2")
w3 = Widget.as_unique(session, name="w3")
# ... while asking for an already-seen name hands back the same object.
w1b = Widget.as_unique(session, name="w1")

assert w1 is w1b
assert w2 is not w3
assert w2 is not w1

session.commit()
This version overrides __new__()
to achieve a similar result. It also uses a global scoped_session. The target class is augmented using a class decorator, where lambdas are given to provide the functionality for the "hash" and "query filter" functions.
from functools import wraps
def unique_constructor(scoped_session, hashfunc, queryfunc):
    """Build a class decorator that makes ``cls(...)`` return unique objects.

    :param scoped_session: callable returning the session to use; it is
        invoked on every construction that passes arguments.
    :param hashfunc: forwarded to ``_unique()`` as its ``hashfunc``.
    :param queryfunc: forwarded to ``_unique()`` as its ``queryfunc``.
    :return: a decorator that patches ``__new__`` / ``__init__`` on the
        class it receives and returns that same class.
    """

    def decorate(cls):
        def _null_init(self, *arg, **kw):
            """Do nothing; the original initializer is kept as ``_init``."""

        @wraps(cls)
        def __new__(cls, bases, *arg, **kw):
            # ``bases`` only absorbs the first positional argument; it is
            # never used inside this function.
            if not (arg or kw):
                # no-op __new__(), called
                # by the loading procedure
                return object.__new__(cls)

            def _create(*ctor_arg, **ctor_kw):
                # Build a bare instance and run the original __init__ on it.
                instance = object.__new__(cls)
                instance._init(*ctor_arg, **ctor_kw)
                return instance

            return _unique(
                scoped_session(), cls, hashfunc, queryfunc, _create, arg, kw
            )

        # note: cls must be already mapped for this part to work
        cls._init = cls.__init__  # keep the real initializer reachable
        cls.__init__ = _null_init  # the initializer now installed does nothing
        cls.__new__ = classmethod(__new__)
        return cls

    return decorate
Usage:
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.ext.declarative import declarative_base
# Declarative base class that the example model below inherits from.
Base = declarative_base()
# Engine for the 'sqlite://' URL; echo=True asks SQLAlchemy to log the SQL it emits.
engine = create_engine('sqlite://', echo=True)
# Global scoped_session wrapping the session factory; this is the object handed
# to unique_constructor() and the one commit() is later called on directly.
Session = scoped_session(sessionmaker(bind=engine))
@unique_constructor(
    Session,
    lambda name: name,  # hash: the name is the cache key
    lambda query, name: query.filter(Widget.name == name),  # query filter
)
class Widget(Base):
    """Example mapped class made unique on ``name`` by the decorator."""

    __tablename__ = "widget"

    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True, nullable=False)
# Create the tables before any Widget is constructed.
Base.metadata.create_all(engine)

# Plain constructor calls now go through the unique lookup.
w1 = Widget(name="w1")
w2 = Widget(name="w2")
w3 = Widget(name="w3")
# Re-using a name returns the object created above.
w1b = Widget(name="w1")

assert w1 is w1b
assert w2 is not w3
assert w2 is not w1

Session.commit()
The above approach can be implemented with any combination of: mixins or class decorators, additional methods on Widget or lambdas, scoped session or explicit session, @classmethod
or overriding __new__()
.
Which one is more Pythonic? Likely the patterns present in the first version :).
I once used this recipe in an answer to a discussion (https://github.com/sqlalchemy/sqlalchemy/issues/5374#issuecomment-2033453622). In that variant, each object's collection of items is automatically replaced with a uniquified list:
from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy import event
from sqlalchemy import ForeignKey
from sqlalchemy import Integer
from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import UniqueConstraint
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import registry
from sqlalchemy.orm import relationship
from sqlalchemy.orm import Session
from sqlalchemy.orm import sessionmaker
# NOTE(review): mapper_registry is not referenced anywhere else in the visible code.
mapper_registry = registry()
Base = declarative_base()
# Association table between videos and categories; both relationship()
# definitions in this file name it as their ``secondary`` table.
bridge_category = Table(
"bridge_category",
Base.metadata,
# The two foreign-key columns are both flagged primary_key=True.
Column("video_id", ForeignKey("video.id"), primary_key=True),
Column("category_id", ForeignKey("category.id"), primary_key=True),
# NOTE(review): looks redundant with the primary key over the same two columns - confirm.
UniqueConstraint("video_id", "category_id"),
)
class Video(Base):
    """A titled video linked to categories through ``bridge_category``."""

    __tablename__ = "video"

    id = Column(Integer, primary_key=True)
    title = Column(String)

    # Mirror image of Category.videos.
    categories = relationship(
        "Category",
        secondary=bridge_category,
        back_populates="videos",
    )
class Category(Base):
    """A category label; ``text`` is declared unique."""

    __tablename__ = "category"

    id = Column(Integer, primary_key=True)
    text = Column(String, unique=True)

    # Mirror image of Video.categories.
    videos = relationship(
        "Video",
        secondary=bridge_category,
        back_populates="categories",
    )
def unique_categories(session_or_factory):
    """Register a ``before_attach`` listener that uniquifies categories.

    Whenever a ``Video`` is attached to a session governed by
    ``session_or_factory``, its ``categories`` collection is replaced by a
    list in which every entry is the canonical ``Category`` for its text.

    :param session_or_factory: event target passed to
        ``event.listens_for()``.
    """

    def _unique_categories(session, categories):
        """Return ``categories`` with each entry swapped for its canonical row.

        Canonical objects are remembered per session in
        ``session.info["categories"]``, keyed by ``Category.text``.
        """
        with session.no_autoflush:
            known = session.info.get("categories")
            if known is None:
                session.info["categories"] = known = {}

            # Only texts this session has not seen yet need a database lookup.
            unseen_texts = [
                category.text
                for category in categories
                if category.text not in known
            ]
            # Skip the SELECT entirely when there is nothing to look up,
            # instead of emitting a query with an empty IN list.
            if unseen_texts:
                stmt = select(Category).where(Category.text.in_(unseen_texts))
                known.update({row.text: row for row in session.scalars(stmt)})

            result = []
            for category in categories:
                # setdefault() returns the known object for this text, or
                # records this brand-new category as the canonical one.
                result.append(known.setdefault(category.text, category))
            return result

    @event.listens_for(session_or_factory, "before_attach", retval=True)
    def before_attach(sess, obj):
        if isinstance(obj, Video):
            # uniquify the categories collection
            obj.categories = _unique_categories(sess, obj.categories)
if __name__ == "__main__":
    # Set up the database and create the tables.
    engine = create_engine("sqlite://", echo=True)
    Base.metadata.create_all(engine)

    # Install the uniquifying listener on the session factory.
    Session = sessionmaker(bind=engine)
    unique_categories(Session)

    # Every video carries its own Category(text="red") instance.
    v1 = Video(
        title="A",
        categories=[Category(text="blue"), Category(text="red")],
    )
    v2 = Video(
        title="B",
        categories=[Category(text="green"), Category(text="red")],
    )
    v3 = Video(
        title="C",
        categories=[Category(text="grey"), Category(text="red")],
    )
    videos = [v1, v2, v3]

    with Session() as s:
        s.add_all(videos)
        s.commit()