🐍

Dask

Open Source Python Data Science Processamento

Computação paralela com task scheduling para Python

O que é Dask?

Dask é uma biblioteca Python flexível para computação paralela. Ela fornece estruturas de dados paralelas que mimetizam NumPy arrays, Pandas DataFrames e listas Python, mas podem operar em datasets que não cabem na memória e usar múltiplos cores. Dask é composto por duas partes: "Big Data" collections como Dask Array e Dask DataFrame que estendem interfaces comuns como NumPy e Pandas para ambientes distribuídos, e "Dynamic task scheduling" otimizado para computação interativa.

Componentes Principais

  • Dask Array: Paralleliza NumPy arrays
  • Dask DataFrame: Paralleliza Pandas DataFrames
  • Dask Bag: Paralleliza listas Python
  • Dask Delayed: Paralleliza código Python customizado
  • Dask Futures: Interface de baixo nível para task scheduling

Vantagens

  • Familiar: Usa APIs conhecidas (pandas, NumPy)
  • Flexível: Suporta workloads diversos
  • Nativo Python: Integração perfeita com ecossistema Python
  • Escalável: Do laptop ao cluster
  • Lazy Evaluation: Otimizações automáticas

Casos de Uso

  • Análise de datasets grandes que não cabem na memória
  • Paralelização de workloads de machine learning
  • Processamento de arrays NumPy grandes
  • ETL com Pandas em larga escala
  • Computação científica distribuída

Principais Recursos

📊 Dask DataFrame

Pandas paralelo para datasets maiores que a memória.

🔢 Dask Array

NumPy paralelo para arrays multi-dimensionais grandes.

🎒 Dask Bag

Processamento paralelo de dados semi-estruturados.

⏰ Dask Delayed

Paralelização de código Python arbitrário.

Exemplos Práticos

Instalação

# Instalação básica
pip install dask

# Com dependências completas
pip install "dask[complete]"

# Com suporte distribuído
pip install "dask[distributed]"

# Via conda
conda install dask

Dask DataFrame Example

import dask.dataframe as dd
import pandas as pd

# Ler múltiplos arquivos CSV
df = dd.read_csv('data/*.csv')

# Operações familiares do pandas
result = df.groupby('category').value.mean()

# Computar resultado (lazy evaluation)
computed_result = result.compute()

# Exemplo com dados grandes
# Criar DataFrame grande
large_df = dd.from_pandas(
    pd.DataFrame({
        'x': range(1000000),
        'y': range(1000000)
    }), 
    npartitions=10
)

# Operações paralelas
filtered = large_df[large_df.x > 500000]
grouped = filtered.groupby('y').x.sum()
result = grouped.compute()

print(f"Resultado: {result.head()}")

Dask Array Example

import dask.array as da
import numpy as np

# Criar array Dask grande
x = da.random.random((10000, 10000), chunks=(1000, 1000))

# Operações NumPy familiares
y = x + x.T
z = y[::2, 5000:].mean(axis=1)

# Computar resultado
result = z.compute()

# Exemplo com dados reais
# Carregar dados de múltiplos arquivos
arrays = [da.from_array(np.load(f'data_{i}.npy'), chunks=(1000, 1000)) 
          for i in range(10)]

# Concatenar arrays
big_array = da.concatenate(arrays, axis=0)

# Operações matemáticas
mean_values = big_array.mean(axis=1)
std_values = big_array.std(axis=1)

# Computar em paralelo
results = da.compute(mean_values, std_values)

Dask Delayed Example

import dask
from dask import delayed
import time

# Função que simula processamento pesado
@delayed
def process_data(filename):
    # Simular processamento
    time.sleep(1)
    return f"Processed {filename}"

@delayed
def aggregate_results(results):
    return f"Aggregated {len(results)} results"

# Criar grafo de computação
files = ['file1.txt', 'file2.txt', 'file3.txt', 'file4.txt']
processed = [process_data(f) for f in files]
final_result = aggregate_results(processed)

# Executar em paralelo
result = final_result.compute()
print(result)

# Exemplo com machine learning
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

@delayed
def train_model(X_train, y_train, n_estimators):
    model = RandomForestClassifier(n_estimators=n_estimators)
    model.fit(X_train, y_train)
    return model

@delayed
def evaluate_model(model, X_test, y_test):
    return model.score(X_test, y_test)

# Treinar múltiplos modelos em paralelo
models = [train_model(X_train, y_train, n) for n in [10, 50, 100]]
scores = [evaluate_model(model, X_test, y_test) for model in models]

# Computar todos os scores
final_scores = dask.compute(*scores)

Dask Distributed Example

from dask.distributed import Client, as_completed
import dask.dataframe as dd

# Conectar ao cluster Dask
client = Client('scheduler-address:8786')

# Ou iniciar cluster local
# client = Client(processes=False)  # threads
# client = Client()  # processes

# Usar Dask DataFrame distribuído
df = dd.read_parquet('s3://bucket/data/*.parquet')

# Operações distribuídas
result = df.groupby('category').agg({
    'value': ['mean', 'std', 'count'],
    'amount': 'sum'
}).compute()

# Submeter tarefas customizadas
def expensive_function(x):
    return x ** 2 + x ** 3

futures = client.map(expensive_function, range(1000))

# Coletar resultados conforme ficam prontos
results = []
for future in as_completed(futures):
    result = future.result()
    results.append(result)

client.close()
📊 Quick Facts
Versão Atual: 2024.1.0
Licença: BSD
Linguagem: Python
Primeira Release: 2015
🔄 Tecnologias Relacionadas
Ray Apache Spark Pandas NumPy
💼 Conteúdo Patrocinado

Espaço disponível para parceiros

Anuncie Aqui