Este projeto implementa um pipeline de Extração, Transformação e Carga (ETL) que migra dados de um banco de dados Oracle para um banco de dados PostgreSQL, usando Apache Airflow para orquestração.
O pipeline lê dados de uma tabela chamada SALES_TRANSACTIONS
no Oracle, aplica transformações específicas e carrega os dados transformados em uma tabela correspondente chamada ANALYTICS_TRANSACTIONS
no PostgreSQL.
- Extração de dados do Oracle.
- Transformações:
- Exclusão de registros com
AMOUNT <= 0
. - Normalização da coluna
CUSTOMER_ID
para letras maiúsculas. - Criação de uma nova coluna
CATEGORY
com base no valor deAMOUNT
(LOW, MEDIUM, HIGH).
- Exclusão de registros com
- Carregamento de dados no PostgreSQL.
- Orquestração com Apache Airflow.
- Logs detalhados das etapas do pipeline.
- Mecanismo de reexecução para casos de falha.
- Python 3.7+: Certifique-se de ter o Python instalado.
- Apache Airflow: Instale e configure o Airflow. Consulte a documentação oficial: https://airflow.apache.org/docs/
- cx_Oracle: Instale o driver Oracle para Python:
pip install cx_Oracle
- Você também precisará instalar o Instant Client da Oracle. Consulte a documentação do
cx_Oracle
para instruções detalhadas.
- Você também precisará instalar o Instant Client da Oracle. Consulte a documentação do
- psycopg2: Instale o driver PostgreSQL para Python:
pip install psycopg2
- Pandas: Instale a biblioteca Pandas:
pip install pandas
- Pendulum: Instale a biblioteca pendulum para lidar com datas e horários:
pip install pendulum
- Banco de dados Oracle: Um banco de dados Oracle em execução com a tabela
SALES_TRANSACTIONS
. - Banco de dados PostgreSQL: Um banco de dados PostgreSQL em execução.
- Variáveis de Ambiente: Configure as seguintes variáveis de ambiente:
ORACLE_CONN_STRING
: String de conexão para o Oracle (ex:usuario/senha@host:porta/serviço
).POSTGRES_CONN_STRING
: String de conexão para o PostgreSQL (ex:dbname=nome_do_banco user=usuario password=senha host=host port=porta
).
-
Clone o repositório (se aplicável):
git clone <seu_repositorio> cd <nome_do_projeto>
-
Crie a tabela no PostgreSQL: Execute o script SQL
create_table.sql
no seu banco de dados PostgreSQL:psql -d <nome_do_banco> -f create_table.sql
-
Copie os arquivos para o Airflow: Copie os arquivos
etl_script.py
eetl_dag.py
para o diretóriodags
do seu Airflow (geralmente$AIRFLOW_HOME/dags
). -
Instale as dependências Python: Crie um arquivo
requirements.txt
na pastadags
com o conteúdo:apache-airflow cx-Oracle psycopg2 pandas pendulum
E execute:
pip install -r $AIRFLOW_HOME/dags/requirements.txt
-
Variáveis de Ambiente no Airflow: Na interface web do Airflow (Admin -> Variables), crie as variáveis
ORACLE_CONN_STRING
ePOSTGRES_CONN_STRING
com os valores das suas strings de conexão.
-
Inicie o Airflow:
airflow webserver -p 8080 # Ou outra porta airflow scheduler
-
Acesse a interface web do Airflow: Abra o navegador e acesse a interface web do Airflow (geralmente em
http://localhost:8080
). -
Ative a DAG: Encontre a DAG chamada
etl_oracle_postgres
na lista de DAGs e ative-a. -
Execute a DAG: Clique no botão "Trigger DAG" para executar a DAG manualmente. A DAG também será executada automaticamente de acordo com o agendamento definido (
@daily
neste exemplo). -
Verifique os logs: Monitore a execução da DAG na interface web do Airflow e verifique os logs das tarefas para identificar possíveis erros. Os logs também serão salvos no arquivo
etl.log
localmente. -
Verifique os dados no PostgreSQL: Consulte a tabela
ANALYTICS_TRANSACTIONS
no seu banco de dados PostgreSQL para verificar se os dados foram carregados corretamente.
create_table.sql
: Script SQL para criar a tabelaANALYTICS_TRANSACTIONS
no PostgreSQL.etl_script.py
: Script Python com as funções de ETL (extração, transformação e carregamento).etl_dag.py
: DAG do Airflow que orquestra o pipeline.
├── dags/ │ ├── etl_dag.py # DAG do Airflow │ ├── etl_script.py # Script Python com as funções de ETL │ └── requirements.txt # Dependências do projeto ├── create_table.sql # Script SQL para criar a tabela no PostgreSQL └── README.md # Este arquivo
- Certifique-se de que as strings de conexão dos bancos de dados estejam corretas e que os bancos estejam acessíveis.
- Adapte o agendamento da DAG (
schedule
) conforme necessário. - Este exemplo usa Pandas para as transformações. Para volumes de dados extremamente grandes, considere usar o PySpark para melhor escalabilidade.
- A configuração das variáveis de ambiente no Airflow (Admin -> Variables) é a forma mais segura e recomendada.
Contribuições são bem-vindas!
MIT