Apache Airflow

Open Source Popular Python Workflow Engine

Plataforma para programaticamente criar, agendar e monitorar workflows de dados

Visão Geral

Apache Airflow é uma plataforma open-source para desenvolver, agendar e monitorar workflows. Criado pelo Airbnb em 2014, tornou-se o padrão de facto para orquestração de pipelines de dados, oferecendo uma interface web rica, extensibilidade através de plugins e uma arquitetura robusta para ambientes de produção.

Principais Características

  • DAGs (Directed Acyclic Graphs): Workflows definidos como código Python
  • Interface Web Rica: Dashboard para monitoramento e gerenciamento
  • Extensibilidade: Centenas de operadores e hooks pré-construídos
  • Escalabilidade: Suporte a executores distribuídos
  • Observabilidade: Logs detalhados e métricas
  • Recuperação de Falhas: Retry automático e alertas

Arquitetura

O Airflow é composto por vários componentes principais:

  • Scheduler: Agenda e dispara tarefas
  • Executor: Executa tarefas (Local, Celery, Kubernetes)
  • Webserver: Interface web para usuários
  • Metadata Database: Armazena estado dos workflows
  • Workers: Executam as tarefas individuais

Exemplo de DAG

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'etl_pipeline',
    default_args=default_args,
    description='Pipeline ETL diário',
    schedule_interval='0 2 * * *',  # Executa às 2h da manhã
    catchup=False,
    tags=['etl', 'daily']
)

def extract_data():
    """Extrai dados da fonte"""
    print("Extraindo dados...")
    # Lógica de extração
    return "dados_extraidos"

def transform_data(**context):
    """Transforma os dados"""
    data = context['task_instance'].xcom_pull(task_ids='extract')
    print(f"Transformando {data}...")
    # Lógica de transformação
    return "dados_transformados"

# Definindo as tarefas
extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract_data,
    dag=dag
)

transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform_data,
    dag=dag
)

load_task = BashOperator(
    task_id='load',
    bash_command='echo "Carregando dados no data warehouse"',
    dag=dag
)

# Definindo dependências
extract_task >> transform_task >> load_task

Operadores Principais

Operadores Básicos

  • BashOperator - Executa comandos bash
  • PythonOperator - Executa funções Python
  • EmailOperator - Envia emails
  • HttpSensor - Monitora endpoints HTTP

Operadores de Dados

  • SqlOperator - Executa queries SQL
  • S3ToRedshiftOperator - Transfere dados
  • SparkSubmitOperator - Submete jobs Spark
  • KubernetesPodOperator - Executa pods

Configuração de Produção

# docker-compose.yml para Airflow
version: '3.8'
services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres_db_volume:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine

  airflow-webserver:
    image: apache/airflow:2.8.0
    depends_on:
      - postgres
      - redis
    environment:
      AIRFLOW__CORE__EXECUTOR: CeleryExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
      AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
      - ./plugins:/opt/airflow/plugins
    ports:
      - "8080:8080"
    command: webserver

  airflow-scheduler:
    image: apache/airflow:2.8.0
    depends_on:
      - postgres
      - redis
    environment:
      AIRFLOW__CORE__EXECUTOR: CeleryExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
      AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
      - ./plugins:/opt/airflow/plugins
    command: scheduler

  airflow-worker:
    image: apache/airflow:2.8.0
    depends_on:
      - postgres
      - redis
    environment:
      AIRFLOW__CORE__EXECUTOR: CeleryExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
      AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
      - ./plugins:/opt/airflow/plugins
    command: celery worker

volumes:
  postgres_db_volume:

Melhores Práticas

Desenvolvimento de DAGs

  • Mantenha DAGs simples e focados
  • Use XCom com moderação
  • Implemente idempotência
  • Configure timeouts apropriados
  • Use pools para controlar concorrência

Operações

  • Monitore métricas de performance
  • Configure alertas adequados
  • Faça backup do metadata database
  • Use conexões e variáveis
  • Implemente testes para DAGs

Casos de Uso

  • ETL/ELT Pipelines: Processamento batch de dados
  • ML Pipelines: Treinamento e deploy de modelos
  • Data Quality: Validação e monitoramento
  • Reporting: Geração automática de relatórios
  • Data Migration: Migração entre sistemas

Vantagens

  • Interface web rica e intuitiva
  • Ecossistema extenso de operadores
  • Comunidade ativa e documentação completa
  • Suporte a múltiplos executores
  • Integração com principais ferramentas de dados

Desvantagens

  • Curva de aprendizado inicial
  • Overhead de infraestrutura
  • Não ideal para streaming em tempo real
  • Dependência de Python
  • Complexidade em ambientes distribuídos