Prefect

Open Source Moderno Python Developer-Friendly

Plataforma moderna de orquestração com foco em simplicidade e observabilidade

Visão Geral

Prefect é uma plataforma moderna de orquestração de workflows que prioriza a experiência do desenvolvedor. Criado pelos fundadores originais do Airflow, o Prefect 2.0 representa uma abordagem completamente nova para orquestração, com foco em simplicidade, observabilidade e facilidade de uso.

Principais Características

  • API-First: Arquitetura baseada em APIs REST
  • Hybrid Execution: Execução local, remota ou híbrida
  • Dynamic Workflows: Workflows dinâmicos e condicionais
  • Observabilidade: Monitoramento em tempo real
  • Simplicidade: Configuração mínima necessária
  • Cloud-Native: Projetado para ambientes modernos

Arquitetura Prefect 2.0

O Prefect 2.0 introduziu uma arquitetura simplificada:

  • Prefect Server: API central e interface web
  • Prefect Agent: Executa workflows em ambientes específicos
  • Work Pools: Gerenciam recursos de execução
  • Deployments: Configurações de execução
  • Flows: Definições de workflow

Exemplo de Flow

from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import httpx

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def extract_data(url: str):
    """Extrai dados de uma API"""
    response = httpx.get(url)
    response.raise_for_status()
    return response.json()

@task
def transform_data(raw_data: dict):
    """Transforma os dados extraídos"""
    # Lógica de transformação
    transformed = {
        'processed_at': '2024-01-01',
        'records': len(raw_data.get('items', [])),
        'data': raw_data
    }
    return transformed

@task
def load_data(data: dict, destination: str):
    """Carrega dados no destino"""
    print(f"Carregando {data['records']} registros em {destination}")
    # Lógica de carregamento
    return f"Loaded {data['records']} records successfully"

@flow(name="ETL Pipeline", description="Pipeline ETL com Prefect")
def etl_pipeline(api_url: str, destination: str = "warehouse"):
    """Flow principal do pipeline ETL"""
    
    # Extração
    raw_data = extract_data(api_url)
    
    # Transformação
    processed_data = transform_data(raw_data)
    
    # Carregamento
    result = load_data(processed_data, destination)
    
    return result

# Execução local
if __name__ == "__main__":
    result = etl_pipeline(
        api_url="https://api.example.com/data",
        destination="data_warehouse"
    )

Deployment e Agendamento

# deployment.py
from prefect import flow
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

@flow
def my_flow():
    print("Hello from Prefect!")

# Criar deployment
deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="daily-etl",
    schedule=CronSchedule(cron="0 2 * * *"),  # Diário às 2h
    work_pool_name="default-agent-pool",
    tags=["etl", "production"]
)

if __name__ == "__main__":
    deployment.apply()

Configuração com Docker

# docker-compose.yml
version: '3.8'

services:
  prefect-server:
    image: prefecthq/prefect:2.14-python3.11
    command: prefect server start --host 0.0.0.0
    environment:
      - PREFECT_UI_URL=http://localhost:4200/api
      - PREFECT_API_URL=http://localhost:4200/api
      - PREFECT_SERVER_API_HOST=0.0.0.0
    ports:
      - "4200:4200"
    volumes:
      - prefect_data:/root/.prefect

  prefect-agent:
    image: prefecthq/prefect:2.14-python3.11
    command: prefect agent start --pool default-agent-pool
    environment:
      - PREFECT_API_URL=http://prefect-server:4200/api
    depends_on:
      - prefect-server
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - ./flows:/opt/prefect/flows

volumes:
  prefect_data:

Recursos Avançados

Conditional Logic

from prefect import flow, task

@task
def check_data_quality(data):
    return len(data) > 100  # Retorna True se dados suficientes

@task
def process_full_dataset(data):
    print("Processando dataset completo")
    return "full_processing_complete"

@task
def process_sample(data):
    print("Processando amostra dos dados")
    return "sample_processing_complete"

@flow
def conditional_processing(data):
    quality_check = check_data_quality(data)
    
    if quality_check:
        result = process_full_dataset(data)
    else:
        result = process_sample(data)
    
    return result

Error Handling

from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(
    retries=3,
    retry_delay_seconds=60,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(minutes=30)
)
def unreliable_task():
    import random
    if random.random() < 0.7:  # 70% chance de falha
        raise Exception("Task failed!")
    return "Success!"

@flow
def resilient_flow():
    try:
        result = unreliable_task()
        return result
    except Exception as e:
        print(f"Flow failed: {e}")
        return "Flow completed with errors"

Integração com Kubernetes

from prefect import flow
from prefect_kubernetes.jobs import KubernetesJob

@flow
def kubernetes_flow():
    job = KubernetesJob(
        image="python:3.11-slim",
        command=["python", "-c", "print('Hello from Kubernetes!')"],
        namespace="prefect",
        labels={"app": "prefect-job"}
    )
    
    return job.submit()

Monitoramento e Observabilidade

  • Real-time Dashboard: Interface web moderna
  • Flow Run Tracking: Histórico detalhado de execuções
  • Logs Centralizados: Logs estruturados e pesquisáveis
  • Metrics: Métricas de performance e saúde
  • Notifications: Alertas customizáveis

Casos de Uso

  • Data Engineering: Pipelines ETL/ELT modernos
  • ML Operations: Treinamento e deploy de modelos
  • API Orchestration: Coordenação de microserviços
  • Business Automation: Automação de processos
  • Data Quality: Validação e monitoramento

Vantagens

  • Experiência de desenvolvedor superior
  • Configuração mínima necessária
  • Execução híbrida (local/remota)
  • Interface moderna e intuitiva
  • Forte observabilidade nativa
  • Suporte nativo ao Python moderno

Desvantagens

  • Ecossistema menor que Airflow
  • Menos operadores pré-construídos
  • Comunidade ainda em crescimento
  • Mudanças significativas entre versões
  • Documentação em evolução

Prefect vs Airflow

Aspecto Prefect Airflow
Configuração Mínima Complexa
Execução Híbrida Centralizada
Interface Moderna Funcional
Ecossistema Crescendo Maduro
Curva de Aprendizado Baixa Média-Alta