1
+ from datetime import datetime , timedelta
2
+
3
+ from airflow import DAG
4
+ from airflow .operators .python import PythonOperator
5
+
6
+
7
+ default_args = {
8
+ 'owner' : 'coder2j' ,
9
+ 'retries' : 5 ,
10
+ 'retry_delay' : timedelta (minutes = 5 )
11
+ }
12
+
13
+
14
+ def greet (some_dict , ti ):
15
+ print ("some dict: " , some_dict )
16
+ first_name = ti .xcom_pull (task_ids = 'get_name' , key = 'first_name' )
17
+ last_name = ti .xcom_pull (task_ids = 'get_name' , key = 'last_name' )
18
+ age = ti .xcom_pull (task_ids = 'get_age' , key = 'age' )
19
+ print (f"Hello World! My name is { first_name } { last_name } , "
20
+ f"and I am { age } years old!" )
21
+
22
+
23
+ def get_name (ti ):
24
+ ti .xcom_push (key = 'first_name' , value = 'Jerry' )
25
+ ti .xcom_push (key = 'last_name' , value = 'Fridman' )
26
+
27
+
28
+ def get_age (ti ):
29
+ ti .xcom_push (key = 'age' , value = 19 )
30
+
31
+
32
+ with DAG (
33
+ default_args = default_args ,
34
+ dag_id = 'our_dag_with_python_operator_v07' ,
35
+ description = 'Our first dag using python operator' ,
36
+ start_date = datetime (2021 , 10 , 6 ),
37
+ schedule_interval = '@daily'
38
+ ) as dag :
39
+ task1 = PythonOperator (
40
+ task_id = 'greet' ,
41
+ python_callable = greet ,
42
+ op_kwargs = {'some_dict' : {'a' : 1 , 'b' : 2 }}
43
+ )
44
+
45
+ task2 = PythonOperator (
46
+ task_id = 'get_name' ,
47
+ python_callable = get_name
48
+ )
49
+
50
+ task3 = PythonOperator (
51
+ task_id = 'get_age' ,
52
+ python_callable = get_age
53
+ )
54
+
55
+ [task2 , task3 ] >> task1
0 commit comments