Sistema de orquestração focado em dados com forte tipagem e observabilidade de assets
Dagster é um sistema de orquestração de dados que coloca os dados no centro do desenvolvimento. Diferente de outras ferramentas que focam em tarefas, o Dagster organiza pipelines em torno de "assets de dados" - as tabelas, arquivos e modelos de ML que seus pipelines produzem e mantêm.
import pandas as pd
from dagster import asset, AssetIn, Config
from typing import List
class DatabaseConfig(Config):
connection_string: str
table_name: str
@asset
def raw_sales_data(config: DatabaseConfig) -> pd.DataFrame:
"""Extrai dados brutos de vendas do banco de dados"""
# Simulação de extração de dados
return pd.DataFrame({
'date': ['2024-01-01', '2024-01-02', '2024-01-03'],
'product': ['A', 'B', 'A'],
'sales': [100, 150, 200],
'region': ['North', 'South', 'North']
})
@asset(deps=[raw_sales_data])
def cleaned_sales_data(raw_sales_data: pd.DataFrame) -> pd.DataFrame:
"""Limpa e valida os dados de vendas"""
# Limpeza de dados
cleaned = raw_sales_data.copy()
cleaned['date'] = pd.to_datetime(cleaned['date'])
cleaned = cleaned.dropna()
# Validação
assert len(cleaned) > 0, "Dataset não pode estar vazio"
assert cleaned['sales'].min() >= 0, "Vendas não podem ser negativas"
return cleaned
@asset(deps=[cleaned_sales_data])
def sales_summary(cleaned_sales_data: pd.DataFrame) -> pd.DataFrame:
"""Cria resumo de vendas por região"""
summary = cleaned_sales_data.groupby('region').agg({
'sales': ['sum', 'mean', 'count']
}).round(2)
summary.columns = ['total_sales', 'avg_sales', 'transaction_count']
return summary.reset_index()
@asset(deps=[sales_summary])
def sales_report(sales_summary: pd.DataFrame) -> str:
"""Gera relatório de vendas"""
report = f"""
Sales Report Generated: {pd.Timestamp.now()}
Regional Performance:
{sales_summary.to_string(index=False)}
Total Revenue: ${sales_summary['total_sales'].sum():,.2f}
"""
# Salvar relatório
with open('/tmp/sales_report.txt', 'w') as f:
f.write(report)
return report
from dagster import Definitions, resource
import psycopg2
@resource
def postgres_connection(config):
"""Resource para conexão PostgreSQL"""
conn = psycopg2.connect(
host=config["host"],
database=config["database"],
user=config["user"],
password=config["password"]
)
try:
yield conn
finally:
conn.close()
@resource
def s3_client(config):
"""Resource para cliente S3"""
import boto3
return boto3.client(
's3',
aws_access_key_id=config["access_key"],
aws_secret_access_key=config["secret_key"],
region_name=config["region"]
)
# Definições do projeto
defs = Definitions(
assets=[raw_sales_data, cleaned_sales_data, sales_summary, sales_report],
resources={
"postgres": postgres_connection.configured({
"host": "localhost",
"database": "sales_db",
"user": "user",
"password": "password"
}),
"s3": s3_client.configured({
"access_key": "your_access_key",
"secret_key": "your_secret_key",
"region": "us-east-1"
})
}
)
from dagster import (
define_asset_job,
ScheduleDefinition,
AssetSelection,
DefaultSensorContext,
sensor,
RunRequest
)
# Job para processar dados diários
daily_sales_job = define_asset_job(
name="daily_sales_processing",
selection=AssetSelection.assets(
raw_sales_data,
cleaned_sales_data,
sales_summary
),
tags={"team": "data", "env": "prod"}
)
# Schedule para execução diária
daily_schedule = ScheduleDefinition(
job=daily_sales_job,
cron_schedule="0 2 * * *", # 2h da manhã
tags={"schedule": "daily"}
)
# Sensor para detectar novos arquivos
@sensor(job=daily_sales_job)
def new_file_sensor(context: DefaultSensorContext):
"""Sensor que detecta novos arquivos de dados"""
import os
data_dir = "/data/incoming"
if os.path.exists(data_dir):
files = os.listdir(data_dir)
new_files = [f for f in files if f.endswith('.csv')]
if new_files:
return RunRequest(
run_key=f"new_files_{len(new_files)}",
tags={"trigger": "file_sensor", "files": str(len(new_files))}
)
return None
from dagster import materialize, AssetMaterialization
import pytest
def test_cleaned_sales_data():
"""Testa a limpeza de dados de vendas"""
# Dados de teste
test_raw_data = pd.DataFrame({
'date': ['2024-01-01', '2024-01-02', None],
'product': ['A', 'B', 'C'],
'sales': [100, 150, -50], # Valor negativo para testar validação
'region': ['North', 'South', 'East']
})
# Executar asset com dados de teste
with pytest.raises(AssertionError, match="Vendas não podem ser negativas"):
result = materialize(
[cleaned_sales_data],
resources={"raw_sales_data": test_raw_data}
)
def test_sales_pipeline_integration():
"""Teste de integração do pipeline completo"""
result = materialize([
raw_sales_data,
cleaned_sales_data,
sales_summary,
sales_report
])
assert result.success
# Verificar se todos os assets foram materializados
materialized_assets = [
event.asset_key for event in result.events_for_node("sales_report")
if event.event_type_value == "ASSET_MATERIALIZATION"
]
assert len(materialized_assets) > 0
O Dagster oferece uma interface web rica com recursos avançados:
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
# Expor porta do Dagster UI
EXPOSE 3000
# Comando para iniciar Dagster
CMD ["dagster", "dev", "--host", "0.0.0.0", "--port", "3000"]
# docker-compose.yml
version: '3.8'
services:
dagster:
build: .
ports:
- "3000:3000"
environment:
- DAGSTER_POSTGRES_USER=dagster
- DAGSTER_POSTGRES_PASSWORD=dagster
- DAGSTER_POSTGRES_DB=dagster
depends_on:
- postgres
volumes:
- ./dagster_home:/opt/dagster/dagster_home
postgres:
image: postgres:13
environment:
POSTGRES_USER: dagster
POSTGRES_PASSWORD: dagster
POSTGRES_DB: dagster
volumes:
- postgres_data:/var/lib/postgresql/data
volumes:
postgres_data:
from dagster_dbt import DbtCliResource, dbt_assets
from dagster import AssetExecutionContext
dbt_resource = DbtCliResource(project_dir="/path/to/dbt/project")
@dbt_assets(manifest="/path/to/dbt/target/manifest.json")
def dbt_analytics_assets(context: AssetExecutionContext, dbt: DbtCliResource):
"""Assets gerados a partir de modelos dbt"""
yield from dbt.cli(["build"], context=context).stream()
# Combinar com assets Python
@asset(deps=[dbt_analytics_assets])
def ml_features(dbt_analytics_assets) -> pd.DataFrame:
"""Criar features para ML a partir de dados dbt"""
# Lógica para criar features
pass
Aspecto | Dagster | Airflow | Prefect |
---|---|---|---|
Paradigma | Asset-based | Task-based | Flow-based |
Type System | Forte | Fraco | Médio |
Testing | Nativo | Manual | Manual |
Observabilidade | Excelente | Boa | Boa |
Desenvolvimento | IDE Integrado | Externo | Externo |