Commit bf78d39

Noremac201 authored and tswast committed
Composer: Add KubernetesPodOperator sample code (GoogleCloudPlatform#1577)
* Composer: Add KubernetesPodOperator sample documentation
* Address lint error.
* Rename KubernetesPodOperator to remove redundant _sample from filename
* Fixes for flake8 linter. Use 4 spaces. Wrap at 79 columns. Use periods after sentences in comments.
1 parent 5c63e3c commit bf78d39

File tree

2 files changed: +243 -0 lines changed

Lines changed: 220 additions & 0 deletions
@@ -0,0 +1,220 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""An example DAG demonstrating Kubernetes Pod Operator."""

# [START composer_kubernetespodoperator]
import datetime

from airflow import models
# [END composer_kubernetespodoperator]
from airflow.contrib.kubernetes import pod
from airflow.contrib.kubernetes import secret
# [START composer_kubernetespodoperator]
from airflow.contrib.operators import kubernetes_pod_operator

# [END composer_kubernetespodoperator]

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.
secret_file = secret.Secret(
    # Mounts the secret as a file in RAM-backed tmpfs.
    deploy_type='volume',
    # File path of where to deploy the target, since deploy_type is 'volume'
    # rather than 'env'.
    deploy_target='/etc/sql_conn',
    # Name of the secret in Kubernetes. If the secret is not already defined
    # in Kubernetes using kubectl, the Pod will fail to find the secret and,
    # in turn, fail to launch.
    secret='airflow-secrets',
    # Key of the secret within Kubernetes.
    key='sql_alchemy_conn')

secret_env = secret.Secret(
    # Expose the secret as an environment variable.
    deploy_type='env',
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target='SQL_CONN',
    secret='airflow-secrets',
    key='sql_alchemy_conn')

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# [START composer_kubernetespodoperator]
# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
        dag_id='kubernetes-pod-example',
        schedule_interval=datetime.timedelta(days=1),
        start_date=YESTERDAY) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, currently the operator defaults
    # to using the config file found at `/home/airflow/composer_kube_config`
    # if no `config_file` parameter is specified. By default it will contain
    # the credentials for Cloud Composer's Google Kubernetes Engine cluster
    # that is created upon environment creation.
    kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
        # The ID specified for the task.
        task_id='pod-ex-minimum',
        # Name of the task you want to run, used to generate the Pod ID.
        name='pod-ex-minimum',
        # The namespace to run within Kubernetes. The default namespace is
        # `default`. There is the potential for resource starvation of the
        # Airflow workers and scheduler within the Cloud Composer environment;
        # the recommended solution is to increase the number of nodes in order
        # to satisfy the computing requirements. Alternatively, launching pods
        # into a custom namespace avoids this competition for resources.
        namespace='default',
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images.
        image='gcr.io/gcp-runtimes/ubuntu_16_0_4')
    # [END composer_kubernetespodoperator]

    kubernetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-kube-templates',
        name='ex-kube-templates',
        namespace='default',
        image='bash',
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/code.html#default-variables

        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['echo'],
        # DS in jinja is the execution date as YYYY-MM-DD; this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=['{{ ds }}'],
        # The var template variable allows you to access variables defined in
        # the Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={'MY_VALUE': '{{ var.value.my_value }}'},
        # Sets the config file to the specified airflow.cfg airflow home. If
        # the configuration file does not exist or does not provide valid
        # credentials the pod will fail to launch.
        config_file="{{ conf.get('core', 'airflow_home') }}/config")

    kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-kube-secrets',
        name='ex-kube-secrets',
        namespace='default',
        image='ubuntu',
        # The secrets to pass to the Pod. The Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env, secret_file],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
        env_vars={'EXAMPLE_VAR': '/example/value'})

    # [START composer_kubernetespodaffinity]
    kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-pod-affinity',
        name='ex-pod-affinity',
        namespace='default',
        image='perl',
        cmds=['perl'],
        arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
        # affinity allows you to constrain which nodes your pod is eligible to
        # be scheduled on, based on labels on the node. In this case, if the
        # label 'cloud.google.com/gke-nodepool' with value
        # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
        # nodes, it will fail to schedule.
        affinity={
            'nodeAffinity': {
                # requiredDuringSchedulingIgnoredDuringExecution means in order
                # for a pod to be scheduled on a node, the node must have the
                # specified labels. However, if labels on a node change at
                # runtime such that the affinity rules on a pod are no longer
                # met, the pod will still continue to run on the node.
                'requiredDuringSchedulingIgnoredDuringExecution': {
                    'nodeSelectorTerms': [{
                        'matchExpressions': [{
                            # When nodepools are created in Google Kubernetes
                            # Engine, the nodes inside of that nodepool are
                            # automatically assigned the label
                            # 'cloud.google.com/gke-nodepool' with the value
                            # of the nodepool's name.
                            'key': 'cloud.google.com/gke-nodepool',
                            'operator': 'In',
                            # The label key's value that pods can be scheduled
                            # on.
                            'values': [
                                'nodepool-label-value',
                                'nodepool-label-value2',
                            ]
                        }]
                    }]
                }
            }
        })
    # [END composer_kubernetespodaffinity]

    kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-all-configs',
        name='pi',
        namespace='default',
        image='perl',
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['perl'],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
        # The secrets to pass to the Pod. The Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={'pod-label': 'label-name'},
        # Timeout to start up the Pod, default is 120.
        startup_timeout_seconds=120,
        # The environment variables to be initialized in the container.
        # env_vars are templated.
        env_vars={'EXAMPLE_VAR': '/example/value'},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image. 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy='Always',
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are
        # not permitted by labels.
        annotations={'key1': 'value1'},
        # Resource specifications for the Pod. This will allow you to set
        # both cpu and memory limits and requirements.
        resources=pod.Resources(),
        # Specifies path to kubernetes config. If no config is specified will
        # default to '~/.kube/config'. The config_file is templated.
        config_file='/home/airflow/composer_kube_config',
        # If true, the content of /airflow/xcom/return.json from the
        # container will also be pushed to an XCom when the container ends.
        xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        affinity={})
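
Note: as the comments in the sample above point out, the 'airflow-secrets' Secret must already exist in Kubernetes (for example, created with kubectl create secret generic) or the tasks that mount it will fail to launch. Below is an editor's sketch, not part of this commit, of pre-creating that Secret with the official Kubernetes Python client; the placeholder value and the kubeconfig assumptions are illustrative only.

# Editor's sketch (not part of this commit): pre-create the 'airflow-secrets'
# Secret that the sample DAG expects. Assumes the kubernetes Python client is
# installed and a kubeconfig with cluster credentials is available.
import base64

from kubernetes import client, config

# Load credentials from the active kubeconfig (on a Composer worker this
# could be the config at /home/airflow/composer_kube_config).
config.load_kube_config()

secret_body = client.V1Secret(
    metadata=client.V1ObjectMeta(name='airflow-secrets'),
    # Kubernetes expects Secret data values to be base64-encoded strings.
    # 'placeholder-connection-string' is an illustrative value only.
    data={'sql_alchemy_conn':
          base64.b64encode(b'placeholder-connection-string').decode('ascii')},
)
client.CoreV1Api().create_namespaced_secret(namespace='default',
                                            body=secret_body)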
Lines changed: 23 additions & 0 deletions
@@ -0,0 +1,23 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def test_dag_import():
    """Test that the DAG file can be successfully imported.

    This tests that the DAG can be parsed, but does not run it in an Airflow
    environment. This is a sanity check recommended by the official Airflow
    docs: https://airflow.incubator.apache.org/tutorial.html#testing
    """
    from . import kubernetes_pod_operator  # noqa
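
A broader local sanity check, not included in this commit, is to load the DAG folder through Airflow's DagBag and assert that nothing failed to parse. The sketch below assumes Airflow is installed locally and that the DAG file sits in the current working directory; the folder path and test name are illustrative.

# Editor's sketch (not part of this commit): a DagBag-based parse check.
from airflow import models


def test_dagbag_has_no_import_errors():
    # Parse every DAG file in the given folder (adjust the path as needed)
    # and fail if any module raised an exception while being imported.
    dag_bag = models.DagBag(dag_folder='.', include_examples=False)
    assert not dag_bag.import_errors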
