Plataforma para programaticamente criar, agendar e monitorar workflows de dados
Apache Airflow é uma plataforma open-source para desenvolver, agendar e monitorar workflows. Criado pelo Airbnb em 2014, tornou-se o padrão de facto para orquestração de pipelines de dados, oferecendo uma interface web rica, extensibilidade através de plugins e uma arquitetura robusta para ambientes de produção.
O Airflow é composto por vários componentes principais:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'etl_pipeline',
default_args=default_args,
description='Pipeline ETL diário',
schedule_interval='0 2 * * *', # Executa às 2h da manhã
catchup=False,
tags=['etl', 'daily']
)
def extract_data():
"""Extrai dados da fonte"""
print("Extraindo dados...")
# Lógica de extração
return "dados_extraidos"
def transform_data(**context):
"""Transforma os dados"""
data = context['task_instance'].xcom_pull(task_ids='extract')
print(f"Transformando {data}...")
# Lógica de transformação
return "dados_transformados"
# Definindo as tarefas
extract_task = PythonOperator(
task_id='extract',
python_callable=extract_data,
dag=dag
)
transform_task = PythonOperator(
task_id='transform',
python_callable=transform_data,
dag=dag
)
load_task = BashOperator(
task_id='load',
bash_command='echo "Carregando dados no data warehouse"',
dag=dag
)
# Definindo dependências
extract_task >> transform_task >> load_task
BashOperator
- Executa comandos bashPythonOperator
- Executa funções PythonEmailOperator
- Envia emailsHttpSensor
- Monitora endpoints HTTPSqlOperator
- Executa queries SQLS3ToRedshiftOperator
- Transfere dadosSparkSubmitOperator
- Submete jobs SparkKubernetesPodOperator
- Executa pods# docker-compose.yml para Airflow
version: '3.8'
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres_db_volume:/var/lib/postgresql/data
redis:
image: redis:7-alpine
airflow-webserver:
image: apache/airflow:2.8.0
depends_on:
- postgres
- redis
environment:
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
ports:
- "8080:8080"
command: webserver
airflow-scheduler:
image: apache/airflow:2.8.0
depends_on:
- postgres
- redis
environment:
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
command: scheduler
airflow-worker:
image: apache/airflow:2.8.0
depends_on:
- postgres
- redis
environment:
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
command: celery worker
volumes:
postgres_db_volume: