Skip to content

SymmetricEncryptionClientSide

mike bayer edited this page Jul 26, 2022 · 10 revisions

Client Side Symmetric Encryption

These examples illustrate client-side symmetric encryption. For an example using server-side symmetric encryption, see SymmetricEncryptionServerSide.

Example One - Use hybrid_property()

This example performs value processing via a hybrid property, introduced at http://www.sqlalchemy.org/docs/orm/extensions/hybrid.html, to represent a model attribute in both an encrypted (or otherwise encoded) database-backed form and an ad-hoc decrypted form. A second example illustrates the same idea using a TypeDecorator, introduced at http://www.sqlalchemy.org/docs/core/types.html#augmenting-existing-types.

import binascii
import uuid

from Crypto.Cipher import AES

from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.hybrid import Comparator
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import Session

import secrets

key = secrets.token_bytes(16)
"""The encryption key (16 bytes, i.e. AES-128).   Random for this example.

Drawn from ``secrets`` rather than ``uuid4()``: a version-4 UUID fixes six of
its 128 bits, and the ``uuid`` module is not intended for generating key
material.
"""


nonce = uuid.uuid4().bytes
"""For WHERE criteria to work, the encrypted value must be the same each
time, so one fixed nonce is reused for every value.

This makes the encryption deterministic: equal plaintexts produce equal
ciphertexts.  Only reuse a nonce this way if comparing encrypted values in
SQL is actually required.
"""

def aes_encrypt(data):
    """Encrypt the string ``data`` and return the ciphertext as hex text.

    The text is right-padded with spaces to the next multiple of 16
    characters (a full 16 spaces are added when the length is already a
    multiple of 16), UTF-8 encoded, and encrypted with AES in EAX mode using
    the module-level ``key`` and ``nonce``.
    """
    cipher = AES.new(key, AES.MODE_EAX, nonce=nonce)
    pad_length = 16 - len(data) % 16
    padded = data + " " * pad_length
    ciphertext = cipher.encrypt(padded.encode("utf-8"))
    return ciphertext.hex()


def aes_decrypt(data):
    """Decrypt hex text produced by ``aes_encrypt`` and return the string.

    Trailing whitespace is stripped from the result.  That removes the space
    padding added on encryption, and also any trailing whitespace that was
    part of the original value.
    """
    cipher = AES.new(key, AES.MODE_EAX, nonce=nonce)
    ciphertext = binascii.unhexlify(data)
    padded = cipher.decrypt(ciphertext).decode("utf-8")
    return padded.rstrip()

# Declarative base class that the mapped ``User`` class below inherits from.
Base = declarative_base()

class User(Base):
    """Mapped class exposing an encrypted column through a hybrid attribute.

    ``encrypted_value`` holds the hex ciphertext stored in the database;
    ``value`` is the plaintext view of it, usable for attribute get/set on
    instances and for comparisons in query criteria.
    """

    __tablename__ = "user"

    id = Column(Integer, primary_key=True)
    encrypted_value = Column(String, nullable=False)

    class encrypt_comparator(Comparator):
        """Comparator that encrypts the right-hand operand before comparing."""

        def operate(self, op, other, **kw):
            # Compare the stored ciphertext column against the ciphertext of
            # ``other``.
            column = self.__clause_element__()
            encrypted_other = aes_encrypt(other)
            return op(column, encrypted_other, **kw)

    @hybrid_property
    def value(self):
        """Return the decrypted form of ``encrypted_value``."""
        return aes_decrypt(self.encrypted_value)

    @value.setter
    def value(self, value):
        # Store only the ciphertext on the instance.
        self.encrypted_value = aes_encrypt(value)

    @value.comparator
    def value(cls):
        # Class-level access (e.g. in filter criteria) goes through the
        # encrypting comparator wrapped around the ciphertext column.
        return cls.encrypt_comparator(cls.encrypted_value)

# In-memory SQLite engine for the demo; echo='debug' turns on verbose SQL
# logging.
e = create_engine('sqlite://', echo='debug')

# Create the "user" table.
Base.metadata.create_all(e)

s = Session(e)

# attribute set: the hybrid setter stores the encrypted form in
# ``encrypted_value``
u1 = User(value="some value")
s.add(u1)
s.commit()

# comparison: the criterion goes through ``encrypt_comparator``, so the
# plaintext is encrypted before being compared with the stored column; the
# row found must be the very object added above
u2 = s.query(User).filter_by(value="some value").first()
assert u1 is u2

# attribute get: the hybrid getter decrypts the stored value
assert u1.value == "some value"

Example Two - Use TypeDecorator

For comparison, here's the same recipe using TypeDecorator, documented at https://docs.sqlalchemy.org/en/13/core/custom_types.html?highlight=typedecorator#augmenting-existing-types

import binascii
import uuid

from Crypto.Cipher import AES

from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import TypeDecorator
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session

import secrets

key = secrets.token_bytes(16)
"""The encryption key (16 bytes, i.e. AES-128).   Random for this example.

Drawn from ``secrets`` rather than ``uuid4()``: a version-4 UUID fixes six of
its 128 bits, and the ``uuid`` module is not intended for generating key
material.
"""


nonce = uuid.uuid4().bytes
"""For WHERE criteria to work, the encrypted value must be the same each
time, so one fixed nonce is reused for every value.

This makes the encryption deterministic: equal plaintexts produce equal
ciphertexts.  Only reuse a nonce this way if comparing encrypted values in
SQL is actually required.
"""

def aes_encrypt(data):
    """Encrypt the string ``data`` and return the ciphertext as hex text.

    The text is right-padded with spaces to the next multiple of 16
    characters (a full 16 spaces are added when the length is already a
    multiple of 16), UTF-8 encoded, and encrypted with AES in EAX mode using
    the module-level ``key`` and ``nonce``.
    """
    cipher = AES.new(key, AES.MODE_EAX, nonce=nonce)
    pad_length = 16 - len(data) % 16
    padded = data + " " * pad_length
    ciphertext = cipher.encrypt(padded.encode("utf-8"))
    return ciphertext.hex()


def aes_decrypt(data):
    """Decrypt hex text produced by ``aes_encrypt`` and return the string.

    Trailing whitespace is stripped from the result.  That removes the space
    padding added on encryption, and also any trailing whitespace that was
    part of the original value.
    """
    cipher = AES.new(key, AES.MODE_EAX, nonce=nonce)
    ciphertext = binascii.unhexlify(data)
    padded = cipher.decrypt(ciphertext).decode("utf-8")
    return padded.rstrip()


class EncryptedValue(TypeDecorator):
    """String type that encrypts values bound to SQL and decrypts results.

    Bound parameters are passed through ``aes_encrypt`` and result values
    through ``aes_decrypt``; ``None`` is passed through unchanged in both
    directions so that NULLs do not reach the cipher helpers.
    """

    impl = String

    # The type carries no state of its own beyond the arguments given to
    # ``String``, so it can safely take part in SQLAlchemy's statement cache.
    # Without this flag SQLAlchemy 1.4+ emits a warning and skips caching.
    cache_ok = True

    def process_bind_param(self, value, dialect):
        """Return the encrypted form of ``value`` for use as a bound parameter."""
        if value is None:
            return None
        return aes_encrypt(value)

    def process_result_value(self, value, dialect):
        """Return the decrypted form of a ``value`` fetched from a result row."""
        if value is None:
            return None
        return aes_decrypt(value)


# Declarative base class that the mapped ``User`` class below inherits from.
Base = declarative_base()


class User(Base):
    """Mapped class whose ``value`` attribute is encrypted by its column type."""

    __tablename__ = "user"

    id = Column(Integer, primary_key=True)
    # The attribute is named ``value`` but the database column is
    # "encrypted_value"; EncryptedValue handles encryption and decryption.
    # NOTE(review): aes_encrypt pads to a multiple of 16 characters and
    # hex-encodes, so stored strings look to be at least 32 characters and to
    # grow in steps of 32; a length of 40 would then only fit plaintexts
    # shorter than 16 characters -- confirm the intended size.
    value = Column("encrypted_value", EncryptedValue(40), nullable=False)


# In-memory SQLite engine for the demo; echo="debug" turns on verbose SQL
# logging.
e = create_engine("sqlite://", echo="debug")

# Create the "user" table.
Base.metadata.create_all(e)

s = Session(e)

# attribute set: the plaintext is kept on the object and encrypted by
# EncryptedValue when it is bound into the INSERT
u1 = User(value="some value")
s.add(u1)
s.commit()

# comparison: the criterion value is bound through EncryptedValue as well,
# so it is encrypted before being compared with the stored column; the row
# found must be the very object added above
u2 = s.query(User).filter_by(value="some value").first()
assert u1 is u2

# attribute get: the value loaded from the database is decrypted by
# EncryptedValue
assert u1.value == "some value"
Clone this wiki locally