この記事は🎄GMOペパボ エンジニア Advent Calendar 2024の11日目の記事です。
技術部データ基盤チームの@zaimyです。
ペパボの社内データ基盤では、ワークフローエンジンとしてApache Airflow(以降、Airflow)を利用しています。 本番環境にはGoogle CloudのマネージドサービスであるCloud Composerを採用していますが、バージョンアップに関して以下のような課題を抱えていました。
- 環境の切り替えが複雑で、バージョンアップ作業に時間がかかる
- Cloud Composerのイメージは1年でEOLを迎えるため、定期的なバージョンアップが必要
- Pythonのバージョンが変わる場合は、同一環境でのバージョンアップが不可能
- 同一環境をバージョンアップする場合はDAG1を停止する必要があり、データパイプラインが中断する
これらの課題に対して、以下のような解決策を採用しました。
- Blue-green deploymentによる無停止バージョンアップの実現
- 環境設定の外部化による柔軟な環境切り替えの実現
- GitHub Actionsを活用した継続的デプロイの改善
本記事では、先日実施したAirflow 2.6から2.9へのバージョンアップを例に、これらの解決策の詳細と実装方法をご紹介します。
変更前後のバージョンとバージョンアップの必要性
変更前後のバージョンは以下の通りです。
Stage | Cloud Composer | Airflow | Python |
---|---|---|---|
バージョンアップ前 | 2.4.6 | 2.6.3 | 3.8.12 |
バージョンアップ後 | 2.9.3 | 2.9.1 | 3.11.8 |
ご覧の通り、Cloud Composerのマイナーバージョンが5つ、Airflowのマイナーバージョンが3つ上がっており、Pythonのバージョンも変更される、そこそこ大きなバージョンアップになっています。
データ基盤のワークフローは、GitHub Enterpriseによるコード管理とGitHub Actionsを使った継続的デプロイを行っています。既存のデプロイ戦略は、シンプルにmainブランチのコードを単一のCloud Composer環境にデプロイするよう設計されていました。そのため、これまでのバージョンアップ作業では、一時的に新しいCloud Composer環境用のブランチにバージョンアップ用のコードを集める必要があり、該当のブランチを新環境にデプロイするGitHub Actionsのワークフローを作成するなどの対応を取っていました。このような作業の煩雑さから、頻繁なバージョンアップが困難という課題がありました。
なお、Cloud Composerのイメージはリリースから1年でEOLを迎えるのですが、今回のバージョンアップ前のイメージはこのEOLを迎える寸前であり、過去のバージョンアップ2もEOLに追われて実施していたため、継続的なバージョンアップを行える状態にする必要がありました。
同一のCloud Composer環境を使うバージョンアップの制約
Cloud Composerでは、同一環境のバージョンアップを行うこともできますが、バージョンアップ前後でPythonバージョンまたはCloud Composerのメジャーバージョンが変わらないことが条件になります。また、バージョンアップの際にはすべてのDAGを一時停止し、進行中のタスクが無い状態にしておく必要があり、DAGの停止期間が発生してしまうのがネックです。
今回はバージョンアップ前後でPythonのバージョンが変わり、同一環境を使うバージョンアップは選択できなかったため、複数環境を使って無停止でバージョンアップすることにしました。
今回作成した構成
基本的な方策としては、Airflowのstart_date/end_dateを用いたBlue-green deploymentを行います。設定ファイルにblueとgreenの環境名を定義して、DAGのコードとデプロイワークフローから利用できるようにしました。以下、コードを示しながら簡単に解説します。
Blue-green deploymentについて
Blue-green deploymentは、システムの無停止更新を実現するためのデプロイ戦略の一つです。この戦略では
- 同一の本番環境を2つ用意(BlueとGreen)
- 片方がアクティブな間にもう片方を更新
- 更新完了後にトラフィックを切り替え
という流れで、サービスを停止することなくシステムを更新できます。
Airflowの場合、実行環境の切り替えにロードバランサーは使用せず、DAGのstart_dateとend_dateを利用して制御します。例えば、
- Blue環境:2024年1月1日から2024年1月31日まで実行
- Green環境:2024年2月1日から実行
と設定することで、2月1日以降のワークフローを自動的に新環境で実行できます。
環境設定の実装
dags/composer_production_environments.jsonにblue、greenの環境名を定義します。また、Pythonで扱いやすいようにクラスを作成しました。
{
"blue": "composer-2-4-6-airflow-2-6-3",
"green": "composer-2-9-3-airflow-2-9-1"
}
class ComposerProductionEnvironment:
def __init__(self, config_path: str = f"{DAGS_FOLDER}/composer_production_environments.json"):
self.environments: Dict[str, str] = self._load_config(config_path)
def _load_config(self, config_path: str) -> Dict[str, str]:
with open(config_path, "r") as f:
return json.load(f)
def get_environment_type(self, environment_name: str) -> str:
for env_type, env in self.environments.items():
if env == environment_name:
return env_type
raise ValueError(f"Environment '{environment_name}' not found in configuration")
def is_blue(self, environment_name: str) -> bool:
return self.get_environment_type(environment_name) == "blue"
def is_green(self, environment_name: str) -> bool:
return self.get_environment_type(environment_name) == "green"
def get_all_environments(self) -> List[str]:
return list(self.environments.values())
環境によってDAGのstart_date/end_dateを変更する実装
ペパボの社内データ基盤では、DAGの初回実行日や実行間隔などを設定ファイルに切り出しています。
Airflowのstart_date/end_dateは実際にDAGが実行される時刻ではなくDAGが扱うData Intervalの始点を指し初見のユーザーには少し分かりづらいため、実際にDAGが実行される時刻を指定できるようにfirst_invoke_at/last_invoke_atというwrapperを用意しています。
このfirst_invoke_at/last_invoke_atに各環境用の値を定義します。以下は設定ファイルのyamlの一部です。
---
dbt_hourly:
first_invoke_at:
blue: 2024-12-04 18:00:00.000000000 +09:00
green: 2024-12-05 18:00:00.000000000 +09:00
last_invoke_at:
blue: 2024-12-05 17:00:00.000000000 +09:00
cron: 0 * * * *
...
また、DAGの設定を扱うDagAttributesクラスで先程の設定ファイルを読み出す際に、先ほど定義したComposerProductionEnvironmentクラスを使って環境名から適切な設定を割り当てます。テスト環境など、本番環境としての環境名が不明なケースにもここで対応しています。
class DagAttributes:
def __init__(self, owner: str, base_dag_id: str):
self._owner = owner
self._base_dag_id = base_dag_id
try:
dag_attributes: dict[str, Any] = DAG_ATTRIBUTES_BAG[self._owner][self._base_dag_id]
except KeyError as exception:
raise DagAttributesNotFoundException(self._owner, self._base_dag_id) from exception
# 予約済の変数 COMPOSER_ENVIRONMENT は予告なく変更されうるため別変数 COMPOSER_ENVIRONMENT_NAME に環境名を保存している
# https://cloud.google.com/composer/docs/composer-2/set-environment-variables?hl=ja
composer_env_name = os.environ.get("COMPOSER_ENVIRONMENT_NAME")
composer_env_type = self._determine_composer_env_type(composer_env_name, dag_attributes["first_invoke_at"])
self._first_invoke_at: datetime = dag_attributes["first_invoke_at"][composer_env_type]
self._last_invoke_at: Optional[datetime] = dag_attributes.get("last_invoke_at", {}).get(composer_env_type)
...
def _determine_composer_env_type(self, env: Optional[str], first_invoke_at: dict[Literal["blue", "green"], datetime]) -> Literal["blue", "green"]:
if env:
composer_env = ComposerProductionEnvironment()
if composer_env.is_blue(env):
return "blue"
elif composer_env.is_green(env):
return "green"
# 環境変数が設定されていない場合や、環境が不明な場合は、first_invoke_atが定義されている方を使用
if "blue" in first_invoke_at and "green" not in first_invoke_at:
return "blue"
elif "green" in first_invoke_at and "blue" not in first_invoke_at:
return "green"
elif "blue" in first_invoke_at and "green" in first_invoke_at:
# 両方定義されている場合は、より新しい方を使用
return "blue" if first_invoke_at["blue"] > first_invoke_at["green"] else "green"
else:
raise ValueError("Neither blue nor green environment is defined in first_invoke_at")
デプロイワークフローの実装
GitHub Actionsのデプロイワークフローでmatrix strategyを使って複数環境にそれぞれデプロイするjobを生成します。環境名が定義されたdags/composer_production_environments.jsonをextract_environments jobでパースして、結果をdeploy jobのstrategy属性に与えることで並列にデプロイしています。ワークフロー定義のyamlは以下のようになります。
name: Deploy
on:
push:
branches: [ main ]
jobs:
extract_environments:
runs-on: normal
container: ...
outputs:
environments: ${{ steps.env_names.outputs.environments }}
steps:
- uses: actions/checkout@v3
- name: Extract environment names
id: env_names
run: |
CLOUD_COMPOSER_ENVS=$(jq -r 'to_entries | map(select(.value != "")) | map(.value) | join(",")' dags/composer_production_environments.json)
echo "environments=[$(echo $CLOUD_COMPOSER_ENVS | sed 's/,/","/g' | sed 's/.*/"&"/')]" >> $GITHUB_OUTPUT
deploy:
needs: [extract_environments]
runs-on: normal
container: ...
strategy:
matrix:
cloud_composer_env: ${{ fromJson(needs.extract_environments.outputs.environments) }}
env:
CLOUD_COMPOSER_ENV: ${{ matrix.cloud_composer_env }}
steps:
- uses: actions/checkout@v3
...
バージョンアップの流れ
実際のバージョンアップの流れは以下の通りです。
- GitHub Actionsのワークフロー用と社内環境に構築しているAirflowテスト環境用のイメージを修正する
- トピックブランチで開発環境のパッケージとPythonのバージョンを新しいCloud Composer環境に準拠して変更し、Cloud Composerのテスト環境とテスト用のGitHub Actionsのワークフローを使って全てのDAGの動作確認を行う
- 以下の変更を行い、以降のコードがmainブランチにおいて新しい環境に準拠したバージョンでCIされるようにする
- mainブランチで開発環境のパッケージとPythonのバージョンを変更する
- 先に用意したイメージを使って、社内環境に構築しているAirflowテスト環境を新しいCloud Composer環境に準拠して変更する
- コードベースに新しいCloud Composer環境名と環境ごとのfirst_invoke_at/last_invoke_atを定義してデプロイする
構成変更のメリットと今後
今回、Blue-green deploymentをベースに、環境ごとにfirst_invoke_at/last_invoke_at(start_date/end_date)を設定する構成を採用したことで、以下のようなメリットを得ることができました。
- mainブランチだけをデプロイに用いるため、開発が活発なリポジトリでも最新のコードベースに追従しながらバージョンアップを行える
- 環境ごと、DAGごとにfirst_invoke_at/last_invoke_atを設定するため、一部のDAGから新環境に切り替えられる
- 複数のDAGに依存関係があるケースなどでは一連の依存関係がすべて完了したタイミング以降に、関係するDAGをまとめて新環境に切り替えられる
- バージョンアップの準備が設定値を書き換えてテストを通すだけなので迅速にバージョンアップが可能
- Airflow 2.6から2.9へのバージョンアップでは、構成自体を変更したことに加えてバージョンの乖離によって必要になった修正があったことから2週間程度の時間がかかりましたが、その後実施したAirflow 2.10へのバージョンアップは2日程度で完了できました。
迅速にバージョンアップが可能となったことを活かして、今後はAirflowのマイナーバージョンがリリースされたタイミングや、Cloud Composerに大きな機能追加があったタイミングでバージョンアップを行っていく予定です。
一方で、複数環境を切り替えてバージョンアップを行う場合、古い環境をそのまま削除するとDAGの実行履歴が失われる問題もあります。ペパボでは、実行履歴のエクスポートなどによって実行履歴が失われないようにすることを検討しています。
-
Directed Acyclic Graph:有向非循環グラフ、Airflowにおけるワークフローの基本単位で、タスクの依存関係を定義し、それらを順序立てて実行するための仕組み ↩
-
データ基盤のワークフローエンジンをGoogle Cloud Composer 2へバージョンアップしました - Pepabo Tech Portal / Google Cloud Composerをcomposer-1.17.6-airflow-2.1.4にバージョンアップしました - Pepabo Tech Portal ↩