|
| 1 | +# Copyright 2018 Google LLC |
| 2 | +# |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +# you may not use this file except in compliance with the License. |
| 5 | +# You may obtain a copy of the License at |
| 6 | +# |
| 7 | +# https://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +# |
| 9 | +# Unless required by applicable law or agreed to in writing, software |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +# See the License for the specific language governing permissions and |
| 13 | +# limitations under the License. |
| 14 | + |
| 15 | +"""An example DAG demonstrating Kubernetes Pod Operator.""" |
| 16 | + |
| 17 | +# [START composer_kubernetespodoperator] |
| 18 | +import datetime |
| 19 | + |
| 20 | +from airflow import models |
| 21 | +# [END composer_kubernetespodoperator] |
| 22 | +from airflow.contrib.kubernetes import pod |
| 23 | +from airflow.contrib.kubernetes import secret |
| 24 | +# [START composer_kubernetespodoperator] |
| 25 | +from airflow.contrib.operators import kubernetes_pod_operator |
| 26 | + |
| 27 | +# [END composer_kubernetespodoperator] |
| 28 | + |
| 29 | +# A Secret is an object that contains a small amount of sensitive data such as |
| 30 | +# a password, a token, or a key. Such information might otherwise be put in a |
| 31 | +# Pod specification or in an image; putting it in a Secret object allows for |
| 32 | +# more control over how it is used, and reduces the risk of accidental |
| 33 | +# exposure. |
secret_file = secret.Secret(
    # 'volume' mounts the secret as a file in RAM-backed tmpfs (as
    # opposed to 'env', which would expose it as an environment
    # variable).
    deploy_type='volume',
    # Since deploy_type is 'volume', deploy_target is the path at which
    # the secret file is mounted inside the container.
    deploy_target='/etc/sql_conn',
    # Name of the secret in Kubernetes. If it has not already been
    # created there (e.g. with kubectl), the Pod cannot resolve it and
    # in turn fails to launch.
    secret='airflow-secrets',
    # Key within the Kubernetes secret object.
    key='sql_alchemy_conn')
| 46 | + |
secret_env = secret.Secret(
    # 'env' exposes the secret to the container as an environment
    # variable.
    deploy_type='env',
    # Since deploy_type is 'env', deploy_target names the environment
    # variable that receives the secret's value.
    deploy_target='SQL_CONN',
    # Name of the secret in Kubernetes, and the key within it to read.
    secret='airflow-secrets',
    key='sql_alchemy_conn')
| 55 | + |
# A start_date anchored one day in the past so the scheduler picks the
# DAG up immediately. NOTE(review): dynamic start dates come from the
# upstream sample; Airflow docs generally recommend a static date.
YESTERDAY = datetime.datetime.now() - datetime.timedelta(1)
| 57 | + |
# [START composer_kubernetespodoperator]
# If a Pod fails to launch, or hits an error inside its container,
# Airflow marks the task as failed and captures all of the task logs
# needed to debug the failure.
with models.DAG(
        dag_id='kubernetes-pod-example',
        schedule_interval=datetime.timedelta(days=1),
        start_date=YESTERDAY) as dag:
    # Only task_id, name, namespace, and image are required to create a
    # KubernetesPodOperator. In Cloud Composer the operator currently
    # defaults to the config file at `/home/airflow/composer_kube_config`
    # when no `config_file` parameter is given; by default that file
    # holds the credentials for the Google Kubernetes Engine cluster
    # created along with the Composer environment.
    kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
        # The ID specified for the task.
        task_id='pod-ex-minimum',
        # Name of the task you want to run; used to generate the Pod ID.
        name='pod-ex-minimum',
        # The Kubernetes namespace to run in; `default` is the default
        # namespace. Pods launched there compete with Airflow's own
        # workers and scheduler for resources inside the Composer
        # environment — add nodes to satisfy the computing requirements,
        # or launch the pods into a custom namespace to stop the
        # contention.
        namespace='default',
        # Docker image to run. Bare names resolve against
        # hub.docker.com; fully qualified URLs point to a custom
        # repository. Private gcr.io images work when the Composer
        # environment lives in the same project as the images.
        image='gcr.io/gcp-runtimes/ubuntu_16_0_4')
    # [END composer_kubernetespodoperator]
| 90 | + |
| 91 | + kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator( |
| 92 | + task_id='ex-kube-templates', |
| 93 | + name='ex-kube-templates', |
| 94 | + namespace='default', |
| 95 | + image='bash', |
| 96 | + # All parameters below are able to be templated with jinja -- cmds, |
| 97 | + # arguments, env_vars, and config_file. For more information visit: |
| 98 | + # https://airflow.apache.org/code.html#default-variables |
| 99 | + |
| 100 | + # Entrypoint of the container, if not specified the Docker container's |
| 101 | + # entrypoint is used. The cmds parameter is templated. |
| 102 | + cmds=['echo'], |
| 103 | + # DS in jinja is the execution date as YYYY-MM-DD, this docker image |
| 104 | + # will echo the execution date. Arguments to the entrypoint. The docker |
| 105 | + # image's CMD is used if this is not provided. The arguments parameter |
| 106 | + # is templated. |
| 107 | + arguments=['{{ ds }}'], |
| 108 | + # The var template variable allows you to access variables defined in |
| 109 | + # Airflow UI. In this case we are getting the value of my_value and |
| 110 | + # setting the environment variable `MY_VALUE`. The pod will fail if |
| 111 | + # `my_value` is not set in the Airflow UI. |
| 112 | + env_vars={'MY_VALUE': '{{ var.value.my_value }}'}, |
| 113 | + # Sets the config file to the specified airflow.cfg airflow home. If |
| 114 | + # the configuration file does not exist or does not provide valid |
| 115 | + # credentials the pod will fail to launch. |
| 116 | + config_file="{{ conf.get('core', 'airflow_home') }}/config") |
| 117 | + |
| 118 | + kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator( |
| 119 | + task_id='ex-kube-secrets', |
| 120 | + name='ex-kube-secrets', |
| 121 | + namespace='default', |
| 122 | + image='ubuntu', |
| 123 | + # The secrets to pass to Pod, the Pod will fail to create if the |
| 124 | + # secrets you specify in a Secret object do not exist in Kubernetes. |
| 125 | + secrets=[secret_env, secret_file], |
| 126 | + # env_vars allows you to specify environment variables for your |
| 127 | + # container to use. env_vars is templated. |
| 128 | + env_vars={'EXAMPLE_VAR': '/example/value'}) |
| 129 | + |
| 130 | + # [START composer_kubernetespodaffinity] |
| 131 | + kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator( |
| 132 | + task_id='ex-pod-affinity', |
| 133 | + name='ex-pod-affinity', |
| 134 | + namespace='default', |
| 135 | + image='perl', |
| 136 | + cmds=['perl'], |
| 137 | + arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'], |
| 138 | + # affinity allows you to constrain which nodes your pod is eligible to |
| 139 | + # be scheduled on, based on labels on the node. In this case, if the |
| 140 | + # label 'cloud.google.com/gke-nodepool' with value |
| 141 | + # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any |
| 142 | + # nodes, it will fail to schedule. |
| 143 | + affinity={ |
| 144 | + 'nodeAffinity': { |
| 145 | + # requiredDuringSchedulingIgnoredDuringExecution means in order |
| 146 | + # for a pod to be scheduled on a node, the node must have the |
| 147 | + # specified labels. However, if labels on a node change at |
| 148 | + # runtime such that the affinity rules on a pod are no longer |
| 149 | + # met, the pod will still continue to run on the node. |
| 150 | + 'requiredDuringSchedulingIgnoredDuringExecution': { |
| 151 | + 'nodeSelectorTerms': [{ |
| 152 | + 'matchExpressions': [{ |
| 153 | + # When nodepools are created in Google Kubernetes |
| 154 | + # Engine, the nodes inside of that nodepool are |
| 155 | + # automatically assigned the label |
| 156 | + # 'cloud.google.com/gke-nodepool' with the value of |
| 157 | + # the nodepool's name. |
| 158 | + 'key': 'cloud.google.com/gke-nodepool', |
| 159 | + 'operator': 'In', |
| 160 | + # The label key's value that pods can be scheduled |
| 161 | + # on. |
| 162 | + 'values': [ |
| 163 | + 'nodepool-label-value', |
| 164 | + 'nodepool-label-value2', |
| 165 | + ] |
| 166 | + }] |
| 167 | + }] |
| 168 | + } |
| 169 | + } |
| 170 | + }) |
| 171 | + # [END composer_kubernetespodaffinity] |
| 172 | + |
| 173 | + kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator( |
| 174 | + task_id='ex-all-configs', |
| 175 | + name='pi', |
| 176 | + namespace='default', |
| 177 | + image='perl', |
| 178 | + # Entrypoint of the container, if not specified the Docker container's |
| 179 | + # entrypoint is used. The cmds parameter is templated. |
| 180 | + cmds=['perl'], |
| 181 | + # Arguments to the entrypoint. The docker image's CMD is used if this |
| 182 | + # is not provided. The arguments parameter is templated. |
| 183 | + arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'], |
| 184 | + # The secrets to pass to Pod, the Pod will fail to create if the |
| 185 | + # secrets you specify in a Secret object do not exist in Kubernetes. |
| 186 | + secrets=[], |
| 187 | + # Labels to apply to the Pod. |
| 188 | + labels={'pod-label': 'label-name'}, |
| 189 | + # Timeout to start up the Pod, default is 120. |
| 190 | + startup_timeout_seconds=120, |
| 191 | + # The environment variables to be initialized in the container |
| 192 | + # env_vars are templated. |
| 193 | + env_vars={'EXAMPLE_VAR': '/example/value'}, |
| 194 | + # If true, logs stdout output of container. Defaults to True. |
| 195 | + get_logs=True, |
| 196 | + # Determines when to pull a fresh image, if 'IfNotPresent' will cause |
| 197 | + # the Kubelet to skip pulling an image if it already exists. If you |
| 198 | + # want to always pull a new image, set it to 'Always'. |
| 199 | + image_pull_policy='Always', |
| 200 | + # Annotations are non-identifying metadata you can attach to the Pod. |
| 201 | + # Can be a large range of data, and can include characters that are not |
| 202 | + # permitted by labels. |
| 203 | + annotations={'key1': 'value1'}, |
| 204 | + # Resource specifications for Pod, this will allow you to set both cpu |
| 205 | + # and memory limits and requirements. |
| 206 | + resources=pod.Resources(), |
| 207 | + # Specifies path to kubernetes config. If no config is specified will |
| 208 | + # default to '~/.kube/config'. The config_file is templated. |
| 209 | + config_file='/home/airflow/composer_kube_config', |
| 210 | + # If true, the content of /airflow/xcom/return.json from container will |
| 211 | + # also be pushed to an XCom when the container ends. |
| 212 | + xcom_push=False, |
| 213 | + # List of Volume objects to pass to the Pod. |
| 214 | + volumes=[], |
| 215 | + # List of VolumeMount objects to pass to the Pod. |
| 216 | + volume_mounts=[], |
| 217 | + # Affinity determines which nodes the Pod can run on based on the |
| 218 | + # config. For more information see: |
| 219 | + # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/ |
| 220 | + affinity={}) |
0 commit comments