Computação paralela com task scheduling para Python
Dask é uma biblioteca Python flexível para computação paralela. Ela fornece estruturas de dados paralelas que mimetizam NumPy arrays, Pandas DataFrames e listas Python, mas podem operar em datasets que não cabem na memória e usar múltiplos cores. Dask é composto por duas partes: "Big Data" collections como Dask Array e Dask DataFrame que estendem interfaces comuns como NumPy e Pandas para ambientes distribuídos, e "Dynamic task scheduling" otimizado para computação interativa.
Pandas paralelo para datasets maiores que a memória.
NumPy paralelo para arrays multi-dimensionais grandes.
Processamento paralelo de dados semi-estruturados.
Paralelização de código Python arbitrário.
# Instalação básica
pip install dask
# Com dependências completas
pip install "dask[complete]"
# Com suporte distribuído
pip install "dask[distributed]"
# Via conda
conda install dask
import dask.dataframe as dd
import pandas as pd
# Ler múltiplos arquivos CSV
df = dd.read_csv('data/*.csv')
# Operações familiares do pandas
result = df.groupby('category').value.mean()
# Computar resultado (lazy evaluation)
computed_result = result.compute()
# Exemplo com dados grandes
# Criar DataFrame grande
large_df = dd.from_pandas(
pd.DataFrame({
'x': range(1000000),
'y': range(1000000)
}),
npartitions=10
)
# Operações paralelas
filtered = large_df[large_df.x > 500000]
grouped = filtered.groupby('y').x.sum()
result = grouped.compute()
print(f"Resultado: {result.head()}")
import dask.array as da
import numpy as np
# Criar array Dask grande
x = da.random.random((10000, 10000), chunks=(1000, 1000))
# Operações NumPy familiares
y = x + x.T
z = y[::2, 5000:].mean(axis=1)
# Computar resultado
result = z.compute()
# Exemplo com dados reais
# Carregar dados de múltiplos arquivos
arrays = [da.from_array(np.load(f'data_{i}.npy'), chunks=(1000, 1000))
for i in range(10)]
# Concatenar arrays
big_array = da.concatenate(arrays, axis=0)
# Operações matemáticas
mean_values = big_array.mean(axis=1)
std_values = big_array.std(axis=1)
# Computar em paralelo
results = da.compute(mean_values, std_values)
import dask
from dask import delayed
import time
# Função que simula processamento pesado
@delayed
def process_data(filename):
# Simular processamento
time.sleep(1)
return f"Processed {filename}"
@delayed
def aggregate_results(results):
return f"Aggregated {len(results)} results"
# Criar grafo de computação
files = ['file1.txt', 'file2.txt', 'file3.txt', 'file4.txt']
processed = [process_data(f) for f in files]
final_result = aggregate_results(processed)
# Executar em paralelo
result = final_result.compute()
print(result)
# Exemplo com machine learning
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
@delayed
def train_model(X_train, y_train, n_estimators):
model = RandomForestClassifier(n_estimators=n_estimators)
model.fit(X_train, y_train)
return model
@delayed
def evaluate_model(model, X_test, y_test):
return model.score(X_test, y_test)
# Treinar múltiplos modelos em paralelo
models = [train_model(X_train, y_train, n) for n in [10, 50, 100]]
scores = [evaluate_model(model, X_test, y_test) for model in models]
# Computar todos os scores
final_scores = dask.compute(*scores)
from dask.distributed import Client, as_completed
import dask.dataframe as dd
# Conectar ao cluster Dask
client = Client('scheduler-address:8786')
# Ou iniciar cluster local
# client = Client(processes=False) # threads
# client = Client() # processes
# Usar Dask DataFrame distribuído
df = dd.read_parquet('s3://bucket/data/*.parquet')
# Operações distribuídas
result = df.groupby('category').agg({
'value': ['mean', 'std', 'count'],
'amount': 'sum'
}).compute()
# Submeter tarefas customizadas
def expensive_function(x):
return x ** 2 + x ** 3
futures = client.map(expensive_function, range(1000))
# Coletar resultados conforme ficam prontos
results = []
for future in as_completed(futures):
result = future.result()
results.append(result)
client.close()