-
-
Notifications
You must be signed in to change notification settings - Fork 1.6k
DatabaseCrypt
Two methods illustrating how to use database-provided crypt() functions to produce transparent database-side password crypt and authentication. The example here uses Postgresql pgcrypto functions.
One example uses the ORM-level Hybrid Properties feature, and the other makes use of the Core TypeDecorator construct, in conjunction with the new Operator API and SQL bind processing features introduced in SQLAlchemy 0.8.
The key feature here is comparison of a database-encrypted password to a cleartext password, making use of the database-side crypt() function to encrypt the incoming cleartext. As this requires a function that uses both values simultaneously, we use a "Comparator" which provides for full customization of the __eq__()
operation. Both Hybrids and TypeDecorators make use of the same "Comparator" API, though the difference in usage results in slightly different implementations here.
The Hybrid Properties feature is ORM specific and allows us to define SQL comparison and assignment behavior at the mapped class level.
#!python
from sqlalchemy import Column, Integer, String, func
from sqlalchemy.orm import deferred
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.hybrid import hybrid_property, Comparator
Base = declarative_base()
class User(Base):
__tablename__ = 'user'
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False)
password_hashed = deferred(Column(String(40), nullable=False))
"""hashed password column.
deferred() here is optional. Using it prevents the
password_hashed column from being
fetched by default when a User object is loaded back from
the database.
"""
@hybrid_property
def password(self):
"""Relying upon database-side crypt() only, so in-Python usage
is notimplemented.
"""
raise NotImplementedError(
"Comparison only supported via the database")
class CryptComparator(Comparator):
"""A Comparator which provides an __eq__() method that will run
crypt() against both sides of the expression, to provide the
test password/salt pair.
"""
def __init__(self, password_hashed):
self.password_hashed = password_hashed
def __eq__(self, other):
return self.password_hashed == \
func.crypt(other, self.password_hashed)
@password.comparator
def password(cls):
"""Provide a Comparator object which calls crypt in the
appropriate fashion.
"""
return User.CryptComparator(cls.password_hashed)
@password.setter
def password(self, value):
"""assign the value of 'password',
using a UOW-evaluated SQL function.
See http://www.sqlalchemy.org/docs/orm/session.html#embedding-sql-insert-update-expressions-into-a-flush
for a description of SQL expression assignment.
"""
self.password_hashed = func.crypt(value, func.gen_salt('md5'))
if __name__ == '__main__':
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
engine = create_engine("postgresql://scott:tiger@localhost/test",
echo=True)
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
session = Session(engine)
session.add_all([
User(name='user1', password="farb%349"),
User(name='user2', password="vy6kia%345"),
User(name='user3', password="0sm3EF88s"),
])
session.commit()
assert session.query(User.name).\
filter_by(password="vy6kia%345").scalar() == "user2"
assert session.query(User).\
filter_by(name='user3').\
filter_by(password="0sm3EF88s").count() == 1
assert session.query(User).\
filter_by(name='user1').\
filter_by(password="wrong").count() == 0
(requires 0.8 or above)
Using the TypeDecorator API, we can build the same crypt() feature entirely into a single datatype. TypeDecorator allows us to augment the behavior of a built-in type, in this case the String type. We install the crypt()
function as a "bind expression", which has the effect of an incoming bound value being wrapped within the given SQL expression when a statement is constructed. We also define a Comparator to provide the crypt() function during a comparison. This Comparator is similar to that of the hybrid, but includes that we use the type_coerce()
function to remove the PasswordType
from the expression first, so that we don't go into an endless loop when we re-use the __eq__()
operation.
#!python
from sqlalchemy import Column, Integer, String, func, TypeDecorator, type_coerce
from sqlalchemy.orm import deferred
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class PasswordType(TypeDecorator):
impl = String(40)
def bind_expression(self, bindvalue):
"""Apply a SQL expression to an incoming cleartext value being
rendered as a bound parameter.
For this example, this handler is intended only for the
INSERT and UPDATE statements. Comparison operations
within a SELECT are handled below by the Comparator.
"""
return func.crypt(bindvalue, func.gen_salt('md5'))
class comparator_factory(String.comparator_factory):
def __eq__(self, other):
"""Compare the local password column to an incoming cleartext
password.
This handler is invoked when a PasswordType column
is used in conjunction with the == operator in a SQL
expression, replacing the usage of the "bind_expression()"
handler.
"""
# we coerce our own "expression" down to String,
# so that invoking == doesn't cause an endless loop
# back into __eq__() here
local_pw = type_coerce(self.expr, String)
return local_pw == \
func.crypt(other, local_pw)
class User(Base):
__tablename__ = 'user'
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False)
password = deferred(Column("password_hashed",
PasswordType, nullable=False))
"""password column.
The 'password' mapped attribute refers to a column
called 'password_hashed'. This illustrates a local
Python attribute named differently than the database column.
deferred() here is optional. Using it prevents the password
column from being fetched when a User object is loaded back
from the database.
"""
if __name__ == '__main__':
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
engine = create_engine("postgresql://scott:tiger@localhost/test",
echo=True)
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
session = Session(engine)
session.add_all([
User(name='user1', password="farb%349"),
User(name='user2', password="vy6kia%345"),
User(name='user3', password="0sm3EF88s"),
])
session.commit()
assert session.query(User.name).\
filter_by(password="vy6kia%345").scalar() == "user2"
assert session.query(User).\
filter_by(name='user3').\
filter_by(password="0sm3EF88s").count() == 1
assert session.query(User).\
filter_by(name='user1').\
filter_by(password="wrong").count() == 0