Skip to content

SymmetricEncryptionServerSide

mike bayer edited this page Jun 2, 2020 · 8 revisions

Server Side Symmetric Encryption

These examples illustrate server-side symmetric encrpytion using MySQL-specific functions. For an example using client side encryption functions which will work on any database, see SymmetricEncryptionClientSide.

Example One - column_property() with persisted init_vector

This example illustrates a round trip using MySQL AES_ENCRYPT and AES_DECRYPT functions using the ORM, with the additional feature that an explicit "init vector" is stored in an extra column.

https://dev.mysql.com/doc/refman/8.0/en/encryption-functions.html#function_aes-encrypt

For the MariaDB version of the functions, remove the "init_vector" portion. A second example follows which illustrates a more succinct TypeDecorator approach if the init_vector is not explicit.

import os

from sqlalchemy import cast
from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy import event
from sqlalchemy import func
from sqlalchemy import Integer
from sqlalchemy import LargeBinary
from sqlalchemy.dialects.mysql import CHAR
from sqlalchemy.dialects.mysql import TINYBLOB
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import column_property
from sqlalchemy.orm import Session

Base = declarative_base()

my_key = "some key"


class A(Base):
    __tablename__ = "a"

    id = Column(Integer, primary_key=True)
    enc_data = Column(LargeBinary)
    init_vector = Column(TINYBLOB)

    data = column_property(
        cast(
            func.aes_decrypt(
                enc_data, func.unhex(func.sha2(my_key, 512)), init_vector
            ),
            CHAR(charset="utf8"),
        )
    )


@event.listens_for(A.data, "set")
def set_a_data(target, value, oldvalue, initiator):
    if target.init_vector is None:
        target.init_vector = os.urandom(16)

    target.enc_data = func.aes_encrypt(
        value, func.unhex(func.sha2(my_key, 512)), target.init_vector
    )


e = create_engine("mysql://scott:tiger@mysql57/test", echo="debug")
Base.metadata.drop_all(e)
Base.metadata.create_all(e)

s = Session(e)

a1 = A()

a1.data = "my data"

s.add(a1)
s.commit()

print(s.query(A.data).filter(A.data == "my data").all())

Example Two - Simple TypeDecorator if additional init_vector is not used

if the init_vector is not stored in the table and is instead implicit, we can use TypeDecorator functions in very much the same way as is seen at SymmetricEncryptionClientSide:

import os

from sqlalchemy import cast
from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy import event
from sqlalchemy import func
from sqlalchemy import Integer
from sqlalchemy import LargeBinary
from sqlalchemy.dialects.mysql import CHAR
from sqlalchemy.dialects.mysql import TINYBLOB
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import column_property
from sqlalchemy.orm import Session
from sqlalchemy.types import TypeDecorator
from sqlalchemy import type_coerce

Base = declarative_base()

my_key = "some key"


class EncType(TypeDecorator):
    impl = LargeBinary

    def bind_expression(self, bindvalue):
        return func.aes_encrypt(
            type_coerce(bindvalue, CHAR()), func.unhex(func.sha2(my_key, 512)),
        )

    def column_expression(self, col):
        return cast(
            func.aes_decrypt(col, func.unhex(func.sha2(my_key, 512)),),
            CHAR(charset="utf8"),
        )


class A(Base):
    __tablename__ = "a"

    id = Column(Integer, primary_key=True)
    data = Column(EncType)


e = create_engine("mysql://scott:tiger@mariadb104/test", echo="debug")
Base.metadata.drop_all(e)
Base.metadata.create_all(e)

s = Session(e)

a1 = A()

a1.data = "my data"

s.add(a1)
s.commit()

print(s.query(A.data).filter(A.data == "my data").all())
Clone this wiki locally