Skip to content

Commit a843fc7

Browse files
coder2jcoder2j
authored andcommitted
MINOR: Add tutorial of airflow dag with postgres connection and operator
1 parent d80b187 commit a843fc7

File tree

2 files changed

+49
-0
lines changed

2 files changed

+49
-0
lines changed

dags/dag_with_postgres_operator.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
from datetime import datetime, timedelta
2+
3+
from airflow import DAG
4+
from airflow.providers.postgres.operators.postgres import PostgresOperator
5+
6+
7+
default_args = {
8+
'owner': 'coder2j',
9+
'retries': 5,
10+
'retry_delay': timedelta(minutes=5)
11+
}
12+
13+
14+
with DAG(
15+
dag_id='dag_with_postgres_operator_v03',
16+
default_args=default_args,
17+
start_date=datetime(2021, 12, 19),
18+
schedule_interval='0 0 * * *'
19+
) as dag:
20+
task1 = PostgresOperator(
21+
task_id='create_postgres_table',
22+
postgres_conn_id='postgres_localhost',
23+
sql="""
24+
create table if not exists dag_runs (
25+
dt date,
26+
dag_id character varying,
27+
primary key (dt, dag_id)
28+
)
29+
"""
30+
)
31+
32+
task2 = PostgresOperator(
33+
task_id='insert_into_table',
34+
postgres_conn_id='postgres_localhost',
35+
sql="""
36+
insert into dag_runs (dt, dag_id) values ('{{ ds }}', '{{ dag.dag_id }}')
37+
"""
38+
)
39+
40+
task3 = PostgresOperator(
41+
task_id='delete_data_from_table',
42+
postgres_conn_id='postgres_localhost',
43+
sql="""
44+
delete from dag_runs where dt = '{{ ds }}' and dag_id = '{{ dag.dag_id }}';
45+
"""
46+
)
47+
task1 >> task3 >> task2

docker-compose.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ services:
6666
POSTGRES_DB: airflow
6767
volumes:
6868
- postgres-db-volume:/var/lib/postgresql/data
69+
ports:
70+
- 5432:5432
6971
healthcheck:
7072
test: ["CMD", "pg_isready", "-U", "airflow"]
7173
interval: 5s

0 commit comments

Comments
 (0)