1
+ from datetime import datetime , timedelta
2
+
3
+ from airflow import DAG
4
+ from airflow .providers .postgres .operators .postgres import PostgresOperator
5
+
6
+
7
+ default_args = {
8
+ 'owner' : 'coder2j' ,
9
+ 'retries' : 5 ,
10
+ 'retry_delay' : timedelta (minutes = 5 )
11
+ }
12
+
13
+
14
+ with DAG (
15
+ dag_id = 'dag_with_postgres_operator_v03' ,
16
+ default_args = default_args ,
17
+ start_date = datetime (2021 , 12 , 19 ),
18
+ schedule_interval = '0 0 * * *'
19
+ ) as dag :
20
+ task1 = PostgresOperator (
21
+ task_id = 'create_postgres_table' ,
22
+ postgres_conn_id = 'postgres_localhost' ,
23
+ sql = """
24
+ create table if not exists dag_runs (
25
+ dt date,
26
+ dag_id character varying,
27
+ primary key (dt, dag_id)
28
+ )
29
+ """
30
+ )
31
+
32
+ task2 = PostgresOperator (
33
+ task_id = 'insert_into_table' ,
34
+ postgres_conn_id = 'postgres_localhost' ,
35
+ sql = """
36
+ insert into dag_runs (dt, dag_id) values ('{{ ds }}', '{{ dag.dag_id }}')
37
+ """
38
+ )
39
+
40
+ task3 = PostgresOperator (
41
+ task_id = 'delete_data_from_table' ,
42
+ postgres_conn_id = 'postgres_localhost' ,
43
+ sql = """
44
+ delete from dag_runs where dt = '{{ ds }}' and dag_id = '{{ dag.dag_id }}';
45
+ """
46
+ )
47
+ task1 >> task3 >> task2
0 commit comments