Spark MLlib é a biblioteca de machine learning do Apache Spark, projetada para tornar o machine learning prático e escalável. Oferece algoritmos e utilitários de alta qualidade que funcionam em clusters distribuídos.
MLlib aproveita a arquitetura in-memory do Spark para acelerar algoritmos de ML, oferecendo APIs em Scala, Java, Python e R. É uma das bibliotecas de ML distribuído mais maduras e amplamente utilizadas.
API moderna baseada em DataFrames (spark.ml)
API legada baseada em RDDs (spark.mllib)
Workflows de ML composáveis e reutilizáveis
Transformadores para preparação de dados
Exemplo de pipeline de ML com Spark MLlib (PySpark):
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col
# Criar SparkSession
spark = SparkSession.builder \
.appName("MLlib Example") \
.getOrCreate()
# Carregar dados (exemplo com dataset Iris)
data = spark.read.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.load("iris.csv")
# Preparar features
feature_cols = ["sepal_length", "sepal_width", "petal_length", "petal_width"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
# Indexar labels
label_indexer = StringIndexer(inputCol="species", outputCol="label")
# Normalizar features
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
# Criar classificador
rf = RandomForestClassifier(
featuresCol="scaled_features",
labelCol="label",
numTrees=100,
maxDepth=5,
seed=42
)
# Criar pipeline
pipeline = Pipeline(stages=[assembler, label_indexer, scaler, rf])
# Dividir dados
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)
# Treinar modelo
model = pipeline.fit(train_data)
# Fazer predições
predictions = model.transform(test_data)
# Avaliar modelo
evaluator = MulticlassClassificationEvaluator(
labelCol="label",
predictionCol="prediction",
metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy:.4f}")
# Mostrar algumas predições
predictions.select("features", "label", "prediction", "probability").show(10)
# Feature importance
rf_model = model.stages[-1] # último estágio é o RandomForest
feature_importance = rf_model.featureImportances.toArray()
print("\nFeature Importance:")
for i, importance in enumerate(feature_importance):
print(f"{feature_cols[i]}: {importance:.4f}")
# Salvar modelo
model.write().overwrite().save("iris_model")
# Exemplo de uso do modelo salvo
from pyspark.ml import PipelineModel
loaded_model = PipelineModel.load("iris_model")
# Fazer predições com modelo carregado
new_predictions = loaded_model.transform(test_data)
spark.stop()