-
-
Notifications
You must be signed in to change notification settings - Fork 1.5k
SymmetricEncryptionClientSide
mike bayer edited this page Jul 26, 2022
·
10 revisions
These examples illustrate client-side symmetric encryption. For an example using server-side symmetric encryption, see SymmetricEncryptionServerSide.
Value processing via a hybrid, introduced at http://www.sqlalchemy.org/docs/orm/extensions/hybrid.html, to represent a model attribute in both an encrypted (or encoded, or whatever), database-backed form as well as an ad-hoc decrypted form. A second example illustrates the same idea using a TypeDecorator, introduced at http://www.sqlalchemy.org/docs/core/types.html#augmenting-existing-types.
import binascii
import uuid
from Crypto.Cipher import AES
from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.hybrid import Comparator
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import Session
key = uuid.uuid4().bytes
"""The encryption key. Random for this example."""
nonce = uuid.uuid4().bytes
"""for WHERE criteria to work, we need the encrypted value to be the same
each time, so use a fixed nonce if we need that feature.
"""
def aes_encrypt(data):
cipher = AES.new(key, AES.MODE_EAX, nonce=nonce)
data = data + (" " * (16 - (len(data) % 16)))
return cipher.encrypt(data.encode("utf-8")).hex()
def aes_decrypt(data):
cipher = AES.new(key, AES.MODE_EAX, nonce=nonce)
return cipher.decrypt(binascii.unhexlify(data)).decode("utf-8").rstrip()
Base = declarative_base()
class User(Base):
__tablename__ = 'user'
id = Column(Integer, primary_key=True)
encrypted_value = Column(String, nullable=False)
@hybrid_property
def value(self):
return aes_decrypt(self.encrypted_value)
@value.setter
def value(self, value):
self.encrypted_value = aes_encrypt(value)
class encrypt_comparator(Comparator):
def operate(self, op, other, **kw):
return op(
self.__clause_element__(), aes_encrypt(other),
**kw
)
@value.comparator
def value(cls):
return cls.encrypt_comparator(
cls.encrypted_value
)
e = create_engine('sqlite://', echo='debug')
Base.metadata.create_all(e)
s = Session(e)
# attribute set
u1 = User(value="some value")
s.add(u1)
s.commit()
# comparison
u2 = s.query(User).filter_by(value="some value").first()
assert u1 is u2
# attribute get
assert u1.value == "some value"
For comparison, here's the same recipe using TypeDecorator, documented at https://docs.sqlalchemy.org/en/13/core/custom_types.html?highlight=typedecorator#augmenting-existing-types
import binascii
import uuid
from Crypto.Cipher import AES
from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import TypeDecorator
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session
key = uuid.uuid4().bytes
"""The encryption key. Random for this example."""
nonce = uuid.uuid4().bytes
"""for WHERE criteria to work, we need the encrypted value to be the same
each time, so use a fixed nonce if we need that feature.
"""
def aes_encrypt(data):
cipher = AES.new(key, AES.MODE_EAX, nonce=nonce)
data = data + (" " * (16 - (len(data) % 16)))
return cipher.encrypt(data.encode("utf-8")).hex()
def aes_decrypt(data):
cipher = AES.new(key, AES.MODE_EAX, nonce=nonce)
return cipher.decrypt(binascii.unhexlify(data)).decode("utf-8").rstrip()
class EncryptedValue(TypeDecorator):
impl = String
def process_bind_param(self, value, dialect):
return aes_encrypt(value)
def process_result_value(self, value, dialect):
return aes_decrypt(value)
Base = declarative_base()
class User(Base):
__tablename__ = "user"
id = Column(Integer, primary_key=True)
value = Column("encrypted_value", EncryptedValue(40), nullable=False)
e = create_engine("sqlite://", echo="debug")
Base.metadata.create_all(e)
s = Session(e)
# attribute set
u1 = User(value="some value")
s.add(u1)
s.commit()
# comparison
u2 = s.query(User).filter_by(value="some value").first()
assert u1 is u2
# attribute get
assert u1.value == "some value"