-
-
Notifications
You must be signed in to change notification settings - Fork 1.5k
SessionModifiedSQL
This recipe allows modification of SQL keyed to a particular Session. Examples include application of SQL comments or alternate table names within statements at execution time.
The ORM caches the compiled form of many of the SQL statements used during a flush, so while many recipes that provide for on-the-fly alteration of SQL use the compiler extension, in this case we need to ensure that we always are given access to the SQL string on every execution, so we use the before_cursor_execute event to intercept the SQL right before it's passed to the DBAPI cursor.
The recipe illustrates the usage of both ORM and Core events, as well as usage
of the Connection.info
and Session.info
dictionaries. Other use cases that
involve coordination between Session
and Connection
might build off of
similar features.
from contextlib import contextmanager
from sqlalchemy import event
from sqlalchemy.orm import Session
from sqlalchemy.engine import Engine
@event.listens_for(Session, "after_begin")
def _connection_for_session(session, trans, connection):
"""Share the 'info' dictionary of Session with Connection
objects.
This occurs as new Connection objects are associated with the
Session. The .info dictionary on Connection is local to the
DBAPI connection.
"""
connection.info['session_info'] = session.info
@contextmanager
def session_comment(session, comment):
"""Apply the given comment to all SQL emitted by the given Session.
"""
session.info["comment"] = comment
yield
del session.info["comment"]
@contextmanager
def session_shardid(session, shardid):
"""Apply the "shard" id to all SQL emitted by the given Session.
"""
session.info["shardid"] = shardid
yield
del session.info["shardid"]
@event.listens_for(Engine, "before_cursor_execute", retval=True)
def _apply_comment(connection, cursor, statement, parameters,
context, executemany):
"""Apply comments to statements.
We intercept all statement executions at the cursor level, where the
before_cursor_execute() event gives us the final string SQL statement
in all cases and also gives us a change to modify the string.
"""
session_info = connection.info.get('session_info', {})
if "comment" in session_info:
statement = statement + " -- %s" % session_info["comment"]
return statement, parameters
@event.listens_for(Engine, "before_cursor_execute", retval=True)
def _apply_shard_id(connection, cursor, statement, parameters,
context, executemany):
"""Apply a "shard id" to statements.
Similar to the comment listener, we alter the statement on the
fly replacing occurrences of "_shard_" with the current "shard id".
"""
session_info = connection.info.get('session_info', {})
if "shardid" in session_info:
statement = statement.replace("_shardid_", session_info["shardid"])
return statement, parameters
if __name__ == '__main__':
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session
from sqlalchemy.ext.declarative import declarative_base
import logging
logging.basicConfig(format="%(message)s")
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)
Base = declarative_base()
class A(Base):
__tablename__ = "a"
id = Column(Integer, primary_key=True)
data = Column(String)
class B(Base):
__tablename__ = "b__shardid_"
# or, apply "_shardid_" to the "schema", so you get
# "select * from myshard.b"
# __tablename__ = "b"
# __table_args__ = {"schema": "_shardid_"}
id = Column(Integer, primary_key=True)
data = Column(String)
e = create_engine("sqlite://")
Base.metadata.create_all(e, tables=[A.__table__])
s = Session(e)
with session_shardid(s, "s1"):
Base.metadata.create_all(s.connection(), tables=[B.__table__])
with session_shardid(s, "s2"):
Base.metadata.create_all(s.connection(), tables=[B.__table__])
s.add(A(data='d1'))
s.commit()
with session_comment(s, "comment one"):
s.add(A(data='d2'))
d1 = s.query(A).filter_by(data='d1').one()
d1.data = 'd1modified'
s.commit()
with session_shardid(s, "s1"):
s.add(B(data='d2'))
s.commit()
with session_comment(s, "using shard s1"):
with session_shardid(s, "s1"):
s.query(A).join(B, A.data == B.data).all()
Output:
PRAGMA table_info("a")
()
CREATE TABLE a (
id INTEGER NOT NULL,
data VARCHAR,
PRIMARY KEY (id)
)
()
COMMIT
BEGIN (implicit)
PRAGMA table_info("b_s1")
()
CREATE TABLE b_s1 (
id INTEGER NOT NULL,
data VARCHAR,
PRIMARY KEY (id)
)
()
PRAGMA table_info("b_s2")
()
CREATE TABLE b_s2 (
id INTEGER NOT NULL,
data VARCHAR,
PRIMARY KEY (id)
)
()
INSERT INTO a (data) VALUES (?)
('d1',)
COMMIT
BEGIN (implicit)
INSERT INTO a (data) VALUES (?) -- comment one
('d2',)
SELECT a.id AS a_id, a.data AS a_data
FROM a
WHERE a.data = ? -- comment one
('d1',)
UPDATE a SET data=? WHERE a.id = ? -- comment one
('d1modified', 1)
COMMIT
BEGIN (implicit)
INSERT INTO b_s1 (data) VALUES (?)
('d2',)
COMMIT
BEGIN (implicit)
SELECT a.id AS a_id, a.data AS a_data
FROM a JOIN b_s1 ON a.data = b_s1.data -- using shard s1
()