Commit 1154b3b

janczak10 authored and tswast committed

Fix Environment Migration script for encryption (GoogleCloudPlatform#1732)

* Fix Environment Migration script for encryption: add the ability for the Composer Environment Migration script to decrypt and re-encrypt variables and connections using the new fernet key.
* Fix lint errors. Add README.
1 parent aca87dd commit 1154b3b

File tree

4 files changed: +272 −8 lines changed


composer/tools/README.rst (new file, +120)

@@ -0,0 +1,120 @@
.. This file is automatically generated. Do not edit this file directly.

Google Cloud Composer Python Samples
===============================================================================

.. image:: https://gstatic.com/cloudssh/images/open-btn.png
   :target: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=composer/tools/README.rst


This directory contains samples for Google Cloud Composer. `Google Cloud Composer`_ is a managed Apache Airflow service that helps you create, schedule, monitor and manage workflows. Cloud Composer automation helps you create Airflow environments quickly and use Airflow-native tools, such as the powerful Airflow web interface and command line tools, so you can focus on your workflows and not your infrastructure.

.. _Google Cloud Composer: https://cloud.google.com/composer/docs

Setup
-------------------------------------------------------------------------------

Authentication
++++++++++++++

This sample requires you to have authentication setup. Refer to the
`Authentication Getting Started Guide`_ for instructions on setting up
credentials for applications.

.. _Authentication Getting Started Guide:
   https://cloud.google.com/docs/authentication/getting-started

Install Dependencies
++++++++++++++++++++

#. Clone python-docs-samples and change directory to the sample directory you want to use.

   .. code-block:: bash

       $ git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

#. Install `pip`_ and `virtualenv`_ if you do not already have them. You may want to refer to the `Python Development Environment Setup Guide`_ for Google Cloud Platform for instructions.

   .. _Python Development Environment Setup Guide:
      https://cloud.google.com/python/setup

#. Create a virtualenv. Samples are compatible with Python 2.7 and 3.4+.

   .. code-block:: bash

       $ virtualenv env
       $ source env/bin/activate

#. Install the dependencies needed to run the samples.

   .. code-block:: bash

       $ pip install -r requirements.txt

.. _pip: https://pip.pypa.io/
.. _virtualenv: https://virtualenv.pypa.io/

Samples
-------------------------------------------------------------------------------

Create a new Composer environment based on an existing environment
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

.. image:: https://gstatic.com/cloudssh/images/open-btn.png
   :target: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=composer/tools/copy_environment.py,composer/tools/README.rst


To run this sample:

.. code-block:: bash

    $ python copy_environment.py

    usage: copy_environment.py [-h] [--running_as_service_account]
                               [--override_machine_type OVERRIDE_MACHINE_TYPE]
                               [--override_disk_size_gb OVERRIDE_DISK_SIZE_GB]
                               [--override_network OVERRIDE_NETWORK]
                               [--override_subnetwork OVERRIDE_SUBNETWORK]
                               project location existing_env_name new_env_name

    Clone a composer environment.

    positional arguments:
      project               Google Cloud Project containing existing Composer
                            Environment.
      location              Google Cloud region containing existing Composer
                            Environment. For example `us-central1`.
      existing_env_name     The name of the existing Composer Environment.
      new_env_name          The name to use for the new Composer Environment.

    optional arguments:
      -h, --help            show this help message and exit
      --running_as_service_account
                            Set this flag if the script is running on a VM with
                            same service account as used in the Composer
                            Environment. This avoids creating extra credentials.
      --override_machine_type OVERRIDE_MACHINE_TYPE
                            Optional. Overrides machine type used for Cloud
                            Composer Environment. Must be a fully specified
                            machine type URI.
      --override_disk_size_gb OVERRIDE_DISK_SIZE_GB
                            Optional. Overrides the disk size in GB used for Cloud
                            Composer Environment.
      --override_network OVERRIDE_NETWORK
                            Optional. Overrides the network used for Cloud
                            Composer Environment.
      --override_subnetwork OVERRIDE_SUBNETWORK
                            Optional. Overrides the subnetwork used for Cloud
                            Composer Environment.


.. _Google Cloud SDK: https://cloud.google.com/sdk/

composer/tools/README.rst.in (new file, +26)

@@ -0,0 +1,26 @@
# This file is used to generate README.rst

product:
  name: Google Cloud Composer
  short_name: Cloud Composer
  url: https://cloud.google.com/composer/docs
  description: >
    `Google Cloud Composer`_ is a managed Apache Airflow service that helps
    you create, schedule, monitor and manage workflows. Cloud Composer
    automation helps you create Airflow environments quickly and use
    Airflow-native tools, such as the powerful Airflow web interface and
    command line tools, so you can focus on your workflows and not your
    infrastructure.

setup:
- auth
- install_deps

samples:
- name: Create a new Composer environment based on an existing environment
  file: copy_environment.py
  show_help: True

cloud_client_library: false

folder: composer/tools

composer/tools/copy_environment.py (+123 −8)

@@ -30,7 +30,9 @@
 import argparse
 import ast
 import base64
+import contextlib
 import json
+import os
 import re
 import shutil
 import subprocess
@@ -39,12 +41,15 @@
 import time
 import uuid
 
+from cryptography import fernet
 import google.auth
 from google.cloud import storage
 from google.oauth2 import service_account
-from googleapiclient import discovery
-from googleapiclient import errors
-
+from googleapiclient import discovery, errors
+from kubernetes import client, config
+from mysql import connector
+import six
+from six.moves import configparser
 
 DEFAULT_SCOPES = ["https://www.googleapis.com/auth/cloud-platform"]
 
@@ -370,8 +375,112 @@ def export_data(sql_client, project, instance, gcs_bucket_name, filename):
     wait_sql_operation(sql_client, project, operation.get("name"))
 
 
+def get_fernet_key(composer_env):
+    print("Retrieving fernet key for Composer Environment {}.".format(
+        composer_env.get('name', '')))
+    gke_cluster_resource = composer_env.get("config", {}).get("gkeCluster")
+    project_zone_cluster = re.match(
+        "projects/([^/]*)/zones/([^/]*)/clusters/([^/]*)", gke_cluster_resource
+    ).groups()
+    tmp_dir_name = None
+    try:
+        print("Getting cluster credentials {} to retrieve fernet key.".format(
+            gke_cluster_resource))
+        tmp_dir_name = tempfile.mkdtemp()
+        kubeconfig_file = tmp_dir_name + "/config"
+        os.environ["KUBECONFIG"] = kubeconfig_file
+        if subprocess.call(
+            [
+                "gcloud",
+                "container",
+                "clusters",
+                "get-credentials",
+                project_zone_cluster[2],
+                "--zone",
+                project_zone_cluster[1],
+                "--project",
+                project_zone_cluster[0]
+            ]
+        ):
+            print("Failed to retrieve cluster credentials: {}.".format(
+                gke_cluster_resource))
+            sys.exit(1)
+
+        kubernetes_client = client.CoreV1Api(
+            api_client=config.new_client_from_config(
+                config_file=kubeconfig_file))
+        airflow_configmap = kubernetes_client.read_namespaced_config_map(
+            "airflow-configmap", "default")
+        config_str = airflow_configmap.data['airflow.cfg']
+        with contextlib.closing(six.StringIO(config_str)) as config_buffer:
+            config_parser = configparser.ConfigParser()
+            config_parser.readfp(config_buffer)
+            return config_parser.get("core", "fernet_key")
+    except Exception as exc:
+        print("Failed to get fernet key for cluster: {}.".format(str(exc)))
+        sys.exit(1)
+    finally:
+        if tmp_dir_name:
+            shutil.rmtree(tmp_dir_name)
+
+
+def reencrypt_variables_connections(old_fernet_key_str, new_fernet_key_str):
+    old_fernet_key = fernet.Fernet(old_fernet_key_str.encode("utf-8"))
+    new_fernet_key = fernet.Fernet(new_fernet_key_str.encode("utf-8"))
+    db = connector.connect(
+        host="127.0.0.1",
+        user="root",
+        database="airflow-db",
+    )
+    variable_cursor = db.cursor()
+    variable_cursor.execute("SELECT id, val, is_encrypted FROM variable")
+    rows = variable_cursor.fetchall()
+    for row in rows:
+        id = row[0]
+        val = row[1]
+        is_encrypted = row[2]
+        if is_encrypted:
+            updated_val = new_fernet_key.encrypt(
+                old_fernet_key.decrypt(bytes(val))).decode()
+            variable_cursor.execute(
+                "UPDATE variable SET val=%s WHERE id=%s", (updated_val, id))
+    db.commit()
+
+    conn_cursor = db.cursor()
+    conn_cursor.execute(
+        "SELECT id, password, extra, is_encrypted, is_extra_encrypted FROM "
+        "connection")
+    rows = conn_cursor.fetchall()
+    for row in rows:
+        id = row[0]
+        password = row[1]
+        extra = row[2]
+        is_encrypted = row[3]
+        is_extra_encrypted = row[4]
+        if is_encrypted:
+            updated_password = new_fernet_key.encrypt(
+                old_fernet_key.decrypt(bytes(password))).decode()
+            conn_cursor.execute(
+                "UPDATE connection SET password=%s WHERE id=%s",
+                (updated_password, id))
+        if is_extra_encrypted:
+            updated_extra = new_fernet_key.encrypt(
+                old_fernet_key.decrypt(bytes(extra))).decode()
+            conn_cursor.execute(
+                "UPDATE connection SET extra=%s WHERE id=%s",
+                (updated_extra, id))
+    db.commit()
+
+
 def import_data(
-    sql_client, service_account_key, project, instance, gcs_bucket, filename
+    sql_client,
+    service_account_key,
+    project,
+    instance,
+    gcs_bucket,
+    filename,
+    old_fernet_key,
+    new_fernet_key
 ):
     tmp_dir_name = None
     fuse_dir = None
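The new `reencrypt_variables_connections` function boils down to a decrypt-with-the-old-key, encrypt-with-the-new-key round trip on each stored value. A minimal sketch of that rotation using the `cryptography` package this commit adds to requirements.txt (the `rotate_token` helper name is ours, not from the script):

```python
from cryptography.fernet import Fernet

def rotate_token(old_key, new_key, token):
    # Decrypt with the retiring key, then re-encrypt with the new one.
    return Fernet(new_key).encrypt(Fernet(old_key).decrypt(token))

old_key = Fernet.generate_key()
new_key = Fernet.generate_key()
token = Fernet(old_key).encrypt(b"my-airflow-variable")

rotated = rotate_token(old_key, new_key, token)
assert Fernet(new_key).decrypt(rotated) == b"my-airflow-variable"
```

A failed rotation surfaces immediately as an `InvalidToken` exception from `decrypt` rather than silently writing a corrupt value back to the row.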
@@ -383,7 +492,6 @@ def import_data(
         if subprocess.call(["mkdir", fuse_dir]):
             print("Failed to make temporary subdir {}.".format(fuse_dir))
             sys.exit(1)
-        print(str(["gcsfuse", gcs_bucket, fuse_dir]))
         if subprocess.call(["gcsfuse", gcs_bucket, fuse_dir]):
             print(
                 "Failed to fuse bucket {} with temp local directory {}".format(
@@ -424,9 +532,11 @@ def import_data(
         ):
             print("Failed to import database.")
             sys.exit(1)
+        print("Reencrypting variables and connections.")
+        reencrypt_variables_connections(old_fernet_key, new_fernet_key)
         print("Database import succeeded.")
-    except Exception:
-        print("Failed to copy database.")
+    except Exception as exc:
+        print("Failed to copy database: {}".format(str(exc)))
         sys.exit(1)
     finally:
         if proxy_subprocess:
@@ -522,6 +632,9 @@ def copy_database(project, existing_env, new_env, running_as_service_account):
             gcs_db_dump_bucket.name,
             "db_dump.sql",
         )
+        print("Obtaining fernet keys for Composer Environments.")
+        old_fernet_key = get_fernet_key(existing_env)
+        new_fernet_key = get_fernet_key(new_env)
         print("Preparing database import to new Environment.")
         import_data(
             sql_client,
@@ -530,6 +643,8 @@ def copy_database(project, existing_env, new_env, running_as_service_account):
             new_sql_instance,
             gcs_db_dump_bucket.name,
             "db_dump.sql",
+            old_fernet_key,
+            new_fernet_key,
         )
     finally:
         if gke_service_account_key:
@@ -542,7 +657,7 @@ def copy_database(project, existing_env, new_env, running_as_service_account):
         )
         if gcs_db_dump_bucket:
             print("Deleting temporary Cloud Storage bucket.")
-            # delete_bucket(gcs_db_dump_bucket)
+            delete_bucket(gcs_db_dump_bucket)
 
 
 def copy_gcs_bucket(existing_env, new_env):
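`get_fernet_key` above pulls the key out of the `airflow.cfg` text stored in the cluster's `airflow-configmap`; the script uses `six.StringIO` plus the deprecated `readfp` for Python 2/3 compatibility. On Python 3 alone the same parse can be done with stdlib `configparser` directly (the cfg snippet below is made up for illustration):

```python
import configparser

# Made-up airflow.cfg snippet; in the script this string comes from the
# Kubernetes ConfigMap's 'airflow.cfg' entry.
AIRFLOW_CFG = """\
[core]
airflow_home = /usr/local/airflow
fernet_key = 0000000000000000000000000000000000000000000=
"""

parser = configparser.ConfigParser()
parser.read_string(AIRFLOW_CFG)  # Python 3 replacement for readfp(StringIO(...))
fernet_key = parser.get("core", "fernet_key")
assert fernet_key == "0000000000000000000000000000000000000000000="
```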

composer/tools/requirements.txt (+3)

@@ -1,3 +1,6 @@
+cryptography==2.3.1
 google-api-python-client==1.6.4
 google-auth==1.5.1
 google-cloud-storage==1.11.0
+kubernetes==7.0.0
+mysql-connector-python==8.0.12
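The re-encryption pass in copy_environment.py is a select-transform-update loop that only touches rows flagged as encrypted, using parameterized queries against the Airflow metadata database via the newly pinned mysql-connector-python. A stdlib stand-in for that loop using sqlite3 (parameter style `?` instead of MySQL's `%s`, and string reversal standing in for the Fernet decrypt/encrypt round trip):

```python
import sqlite3

def reencrypt(val):
    # Stand-in transform; the real script decrypts with the old fernet
    # key and re-encrypts with the new one.
    return val[::-1]

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE variable (id INTEGER, val TEXT, is_encrypted INTEGER)")
db.executemany(
    "INSERT INTO variable VALUES (?, ?, ?)",
    [(1, "terces", 1), (2, "plain", 0)],
)

cur = db.cursor()
cur.execute("SELECT id, val, is_encrypted FROM variable")
for row_id, val, is_encrypted in cur.fetchall():
    if is_encrypted:  # leave plaintext rows untouched
        cur.execute(
            "UPDATE variable SET val=? WHERE id=?", (reencrypt(val), row_id))
db.commit()

assert db.execute("SELECT val FROM variable WHERE id=1").fetchone()[0] == "secret"
assert db.execute("SELECT val FROM variable WHERE id=2").fetchone()[0] == "plain"
```

Fetching all rows before issuing the updates, as the script also does, avoids mutating the table while the select cursor is still being consumed.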

0 commit comments
