Plataforma moderna de orquestração com foco em simplicidade e observabilidade
Prefect é uma plataforma moderna de orquestração de workflows que prioriza a experiência do desenvolvedor. Criado por Jeremiah Lowin, que foi membro do PMC do Apache Airflow, o Prefect 2.0 representa uma abordagem completamente nova para orquestração, com foco em simplicidade, observabilidade e facilidade de uso.
O Prefect 2.0 introduziu uma arquitetura simplificada:
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import httpx
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def extract_data(url: str):
    """Fetch a JSON payload from an HTTP API.

    The decorator configures caching keyed on the task inputs with a
    one-hour expiration.

    Args:
        url: Endpoint to send the GET request to.

    Returns:
        The decoded JSON body of the response.
    """
    api_response = httpx.get(url)
    # Turn error status codes into exceptions before touching the body.
    api_response.raise_for_status()
    payload = api_response.json()
    return payload
@task
def transform_data(raw_data: dict):
    """Summarize the extracted payload.

    Args:
        raw_data: Mapping produced by the extraction step. Its optional
            ``'items'`` entry is counted; a missing or null entry counts
            as zero records.

    Returns:
        A dict with ``'processed_at'`` (current UTC date as ``YYYY-MM-DD``),
        ``'records'`` (number of items) and ``'data'`` (the input mapping,
        passed through unchanged).
    """
    # Function-scope import keeps this block self-contained.
    from datetime import datetime, timezone

    # Transformation logic
    # `.get('items', [])` only covers a missing key; a JSON null would reach
    # len() as None and raise TypeError, so fall back to an empty list.
    items = raw_data.get('items') or []
    return {
        # Stamp the real processing date instead of a fixed literal.
        'processed_at': datetime.now(timezone.utc).date().isoformat(),
        'records': len(items),
        'data': raw_data,
    }
@task
def load_data(data: dict, destination: str):
    """Load the transformed data into the destination.

    Args:
        data: Transformed payload; its ``'records'`` entry is reported.
        destination: Name of the target the data is loaded into.

    Returns:
        A human-readable confirmation string with the record count.
    """
    record_count = data['records']
    print(f"Carregando {record_count} registros em {destination}")
    # Loading logic
    return f"Loaded {record_count} records successfully"
@flow(name="ETL Pipeline", description="Pipeline ETL com Prefect")
def etl_pipeline(api_url: str, destination: str = "warehouse"):
    """Main flow of the ETL pipeline.

    Chains the three tasks in order: extraction, transformation, loading.

    Args:
        api_url: Endpoint handed to ``extract_data``.
        destination: Target handed to ``load_data``.

    Returns:
        Whatever ``load_data`` returns for the processed payload.
    """
    # Innermost call runs first: extract -> transform -> load.
    return load_data(transform_data(extract_data(api_url)), destination)
# Local execution
if __name__ == "__main__":
    # Arguments are positional here: (api_url, destination).
    result = etl_pipeline("https://api.example.com/data", "data_warehouse")
# deployment.py
from prefect import flow
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
@flow
def my_flow():
    """Minimal flow that prints a greeting."""
    greeting = "Hello from Prefect!"
    print(greeting)
# Create the deployment
deployment = Deployment.build_from_flow(
    name="daily-etl",
    flow=my_flow,
    tags=["etl", "production"],
    work_pool_name="default-agent-pool",
    # Cron expression: every day at 02:00.
    schedule=CronSchedule(cron="0 2 * * *"),
)

# Only register the deployment when this file is run as a script.
if __name__ == "__main__":
    deployment.apply()
# docker-compose.yml
# Two services: the Prefect server (API + UI on port 4200) and an agent that
# polls the "default-agent-pool" work pool through the server's API.
version: '3.8'

services:
  prefect-server:
    image: prefecthq/prefect:2.14-python3.11
    command: prefect server start --host 0.0.0.0
    environment:
      # PREFECT_UI_URL is the address of the UI itself, not the API endpoint,
      # so it must not carry the /api suffix.
      - PREFECT_UI_URL=http://localhost:4200
      - PREFECT_API_URL=http://localhost:4200/api
      - PREFECT_SERVER_API_HOST=0.0.0.0
    ports:
      - "4200:4200"
    volumes:
      # Named volume mounted at the server's /root/.prefect directory.
      - prefect_data:/root/.prefect

  prefect-agent:
    image: prefecthq/prefect:2.14-python3.11
    command: prefect agent start --pool default-agent-pool
    environment:
      # Inside the compose network the server is reached by service name.
      - PREFECT_API_URL=http://prefect-server:4200/api
    depends_on:
      - prefect-server
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - ./flows:/opt/prefect/flows

volumes:
  prefect_data:
from prefect import flow, task
@task
def check_data_quality(data):
    """Tell whether ``data`` holds enough elements.

    Returns:
        True when ``len(data)`` is strictly greater than 100.
    """
    minimum_exclusive = 100
    return len(data) > minimum_exclusive
@task
def process_full_dataset(data):
    """Announce full-dataset processing and return its status marker.

    ``data`` is accepted but not read by this placeholder.
    """
    status = "full_processing_complete"
    print("Processando dataset completo")
    return status
@task
def process_sample(data):
    """Announce sample processing and return its status marker.

    ``data`` is accepted but not read by this placeholder.
    """
    status = "sample_processing_complete"
    print("Processando amostra dos dados")
    return status
@flow
def conditional_processing(data):
    """Route ``data`` to full or sample processing based on the quality check.

    Returns:
        The result of ``process_full_dataset`` when ``check_data_quality``
        is truthy, otherwise the result of ``process_sample``.
    """
    # Pick the task first, then invoke it once with the same input.
    chosen_task = process_full_dataset if check_data_quality(data) else process_sample
    return chosen_task(data)
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
@task(
    cache_expiration=timedelta(minutes=30),
    cache_key_fn=task_input_hash,
    retry_delay_seconds=60,
    retries=3,
)
def unreliable_task():
    """Succeed roughly 30% of the time and raise otherwise.

    The decorator configures 3 retries spaced 60 seconds apart and
    input-hash caching with a 30-minute expiration.

    Returns:
        The string ``"Success!"`` when the random draw is at least 0.7.

    Raises:
        Exception: When the random draw falls below 0.7.
    """
    import random

    failure_probability = 0.7  # 70% chance of failure
    if random.random() >= failure_probability:
        return "Success!"
    raise Exception("Task failed!")
@flow
def resilient_flow():
    """Run ``unreliable_task`` and degrade gracefully when it raises.

    Returns:
        The task's result on success; otherwise the fixed string
        ``"Flow completed with errors"`` after printing the failure.
    """
    try:
        return unreliable_task()
    except Exception as e:
        # Report the failure and fall through to the fallback value below.
        print(f"Flow failed: {e}")
    return "Flow completed with errors"
from prefect import flow
from prefect_kubernetes.jobs import KubernetesJob
@flow
def kubernetes_flow():
    """Build a ``KubernetesJob`` for a one-line Python command and submit it.

    Returns:
        Whatever ``KubernetesJob.submit()`` returns.
    """
    # NOTE(review): confirm that the KubernetesJob imported from
    # prefect_kubernetes.jobs accepts image/command/labels and exposes
    # submit() in the installed version — TODO verify against its API docs.
    job_settings = {
        "image": "python:3.11-slim",
        "command": ["python", "-c", "print('Hello from Kubernetes!')"],
        "namespace": "prefect",
        "labels": {"app": "prefect-job"},
    }
    return KubernetesJob(**job_settings).submit()
| Aspecto | Prefect | Airflow |
|---|---|---|
| Configuração | Mínima | Complexa |
| Execução | Híbrida | Centralizada |
| Interface | Moderna | Funcional |
| Ecossistema | Crescendo | Maduro |
| Curva de Aprendizado | Baixa | Média-Alta |