Dagster

Open Source Data-Centric Type-Safe Asset-Based

Sistema de orquestração focado em dados com forte tipagem e observabilidade de assets

Visão Geral

Dagster é um sistema de orquestração de dados que coloca os dados no centro do desenvolvimento. Diferente de outras ferramentas que focam em tarefas, o Dagster organiza pipelines em torno de "assets de dados" - as tabelas, arquivos e modelos de ML que seus pipelines produzem e mantêm.

Principais Características

  • Software-Defined Assets: Foco em dados, não em tarefas
  • Type System: Sistema de tipos robusto para dados
  • Testing Framework: Testes integrados para pipelines
  • Observabilidade: Lineage e monitoramento de dados
  • Development Tools: IDE integrado e debugging
  • Multi-Environment: Dev, staging e produção

Conceitos Fundamentais

  • Assets: Objetos de dados persistentes (tabelas, arquivos)
  • Ops: Unidades de computação que transformam dados
  • Jobs: Seleção de assets para executar
  • Resources: Conexões externas (databases, APIs)
  • Sensors: Monitoram mudanças externas
  • Schedules: Execução baseada em tempo

Exemplo de Software-Defined Assets

import pandas as pd
from dagster import asset, AssetIn, Config
from typing import List

class DatabaseConfig(Config):
    connection_string: str
    table_name: str

@asset
def raw_sales_data(config: DatabaseConfig) -> pd.DataFrame:
    """Extrai dados brutos de vendas do banco de dados"""
    # Simulação de extração de dados
    return pd.DataFrame({
        'date': ['2024-01-01', '2024-01-02', '2024-01-03'],
        'product': ['A', 'B', 'A'],
        'sales': [100, 150, 200],
        'region': ['North', 'South', 'North']
    })

@asset(deps=[raw_sales_data])
def cleaned_sales_data(raw_sales_data: pd.DataFrame) -> pd.DataFrame:
    """Limpa e valida os dados de vendas"""
    # Limpeza de dados
    cleaned = raw_sales_data.copy()
    cleaned['date'] = pd.to_datetime(cleaned['date'])
    cleaned = cleaned.dropna()
    
    # Validação
    assert len(cleaned) > 0, "Dataset não pode estar vazio"
    assert cleaned['sales'].min() >= 0, "Vendas não podem ser negativas"
    
    return cleaned

@asset(deps=[cleaned_sales_data])
def sales_summary(cleaned_sales_data: pd.DataFrame) -> pd.DataFrame:
    """Cria resumo de vendas por região"""
    summary = cleaned_sales_data.groupby('region').agg({
        'sales': ['sum', 'mean', 'count']
    }).round(2)
    
    summary.columns = ['total_sales', 'avg_sales', 'transaction_count']
    return summary.reset_index()

@asset(deps=[sales_summary])
def sales_report(sales_summary: pd.DataFrame) -> str:
    """Gera relatório de vendas"""
    report = f"""
    Sales Report Generated: {pd.Timestamp.now()}
    
    Regional Performance:
    {sales_summary.to_string(index=False)}
    
    Total Revenue: ${sales_summary['total_sales'].sum():,.2f}
    """
    
    # Salvar relatório
    with open('/tmp/sales_report.txt', 'w') as f:
        f.write(report)
    
    return report

Configuração e Resources

from dagster import Definitions, resource
import psycopg2

@resource
def postgres_connection(config):
    """Resource para conexão PostgreSQL"""
    conn = psycopg2.connect(
        host=config["host"],
        database=config["database"],
        user=config["user"],
        password=config["password"]
    )
    try:
        yield conn
    finally:
        conn.close()

@resource  
def s3_client(config):
    """Resource para cliente S3"""
    import boto3
    return boto3.client(
        's3',
        aws_access_key_id=config["access_key"],
        aws_secret_access_key=config["secret_key"],
        region_name=config["region"]
    )

# Definições do projeto
defs = Definitions(
    assets=[raw_sales_data, cleaned_sales_data, sales_summary, sales_report],
    resources={
        "postgres": postgres_connection.configured({
            "host": "localhost",
            "database": "sales_db",
            "user": "user",
            "password": "password"
        }),
        "s3": s3_client.configured({
            "access_key": "your_access_key",
            "secret_key": "your_secret_key", 
            "region": "us-east-1"
        })
    }
)

Jobs e Schedules

from dagster import (
    define_asset_job, 
    ScheduleDefinition,
    AssetSelection,
    DefaultSensorContext,
    sensor,
    RunRequest
)

# Job para processar dados diários
daily_sales_job = define_asset_job(
    name="daily_sales_processing",
    selection=AssetSelection.assets(
        raw_sales_data, 
        cleaned_sales_data, 
        sales_summary
    ),
    tags={"team": "data", "env": "prod"}
)

# Schedule para execução diária
daily_schedule = ScheduleDefinition(
    job=daily_sales_job,
    cron_schedule="0 2 * * *",  # 2h da manhã
    tags={"schedule": "daily"}
)

# Sensor para detectar novos arquivos
@sensor(job=daily_sales_job)
def new_file_sensor(context: DefaultSensorContext):
    """Sensor que detecta novos arquivos de dados"""
    import os
    
    data_dir = "/data/incoming"
    if os.path.exists(data_dir):
        files = os.listdir(data_dir)
        new_files = [f for f in files if f.endswith('.csv')]
        
        if new_files:
            return RunRequest(
                run_key=f"new_files_{len(new_files)}",
                tags={"trigger": "file_sensor", "files": str(len(new_files))}
            )
    
    return None

Testing Framework

from dagster import materialize, AssetMaterialization
import pytest

def test_cleaned_sales_data():
    """Testa a limpeza de dados de vendas"""
    # Dados de teste
    test_raw_data = pd.DataFrame({
        'date': ['2024-01-01', '2024-01-02', None],
        'product': ['A', 'B', 'C'],
        'sales': [100, 150, -50],  # Valor negativo para testar validação
        'region': ['North', 'South', 'East']
    })
    
    # Executar asset com dados de teste
    with pytest.raises(AssertionError, match="Vendas não podem ser negativas"):
        result = materialize(
            [cleaned_sales_data],
            resources={"raw_sales_data": test_raw_data}
        )

def test_sales_pipeline_integration():
    """Teste de integração do pipeline completo"""
    result = materialize([
        raw_sales_data,
        cleaned_sales_data, 
        sales_summary,
        sales_report
    ])
    
    assert result.success
    
    # Verificar se todos os assets foram materializados
    materialized_assets = [
        event.asset_key for event in result.events_for_node("sales_report")
        if event.event_type_value == "ASSET_MATERIALIZATION"
    ]
    
    assert len(materialized_assets) > 0

Dagster UI e Observabilidade

O Dagster oferece uma interface web rica com recursos avançados:

  • Asset Lineage: Visualização de dependências entre dados
  • Run History: Histórico detalhado de execuções
  • Asset Catalog: Catálogo de todos os assets de dados
  • Performance Metrics: Métricas de tempo e recursos
  • Data Quality: Monitoramento de qualidade dos dados

Deployment com Docker

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

# Expor porta do Dagster UI
EXPOSE 3000

# Comando para iniciar Dagster
CMD ["dagster", "dev", "--host", "0.0.0.0", "--port", "3000"]
# docker-compose.yml
version: '3.8'

services:
  dagster:
    build: .
    ports:
      - "3000:3000"
    environment:
      - DAGSTER_POSTGRES_USER=dagster
      - DAGSTER_POSTGRES_PASSWORD=dagster
      - DAGSTER_POSTGRES_DB=dagster
    depends_on:
      - postgres
    volumes:
      - ./dagster_home:/opt/dagster/dagster_home

  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: dagster
      POSTGRES_PASSWORD: dagster
      POSTGRES_DB: dagster
    volumes:
      - postgres_data:/var/lib/postgresql/data

volumes:
  postgres_data:

Integração com dbt

from dagster_dbt import DbtCliResource, dbt_assets
from dagster import AssetExecutionContext

dbt_resource = DbtCliResource(project_dir="/path/to/dbt/project")

@dbt_assets(manifest="/path/to/dbt/target/manifest.json")
def dbt_analytics_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    """Assets gerados a partir de modelos dbt"""
    yield from dbt.cli(["build"], context=context).stream()

# Combinar com assets Python
@asset(deps=[dbt_analytics_assets])
def ml_features(dbt_analytics_assets) -> pd.DataFrame:
    """Criar features para ML a partir de dados dbt"""
    # Lógica para criar features
    pass

Casos de Uso

  • Data Engineering: Pipelines ETL/ELT robustos
  • Analytics Engineering: Integração com dbt
  • ML Operations: Pipelines de ML com lineage
  • Data Quality: Monitoramento e validação
  • Data Governance: Catalogação e lineage

Vantagens

  • Foco em dados e assets
  • Sistema de tipos robusto
  • Framework de testes integrado
  • Observabilidade superior
  • Desenvolvimento local facilitado
  • Integração nativa com ferramentas modernas

Desvantagens

  • Curva de aprendizado inicial
  • Conceitos únicos (assets vs tasks)
  • Ecossistema menor
  • Overhead para casos simples
  • Documentação ainda em evolução

Dagster vs Outras Ferramentas

Aspecto Dagster Airflow Prefect
Paradigma Asset-based Task-based Flow-based
Type System Forte Fraco Médio
Testing Nativo Manual Manual
Observabilidade Excelente Boa Boa
Desenvolvimento IDE Integrado Externo Externo