Framework distribuído para processamento de streams com estado
Apache Samza é um framework distribuído de processamento de streams que permite construir aplicações stateful para processar dados em tempo real. Desenvolvido originalmente no LinkedIn, Samza é projetado para trabalhar com Apache Kafka como sistema de mensageria e YARN como gerenciador de recursos, oferecendo processamento confiável e escalável de streams de dados.
Apache Samza foi desenvolvido no LinkedIn em 2013 para atender às necessidades de processamento de streams em larga escala da empresa. Foi open-sourced e doado para a Apache Software Foundation no mesmo ano. O projeto foi criado para superar limitações de outras soluções de stream processing da época, focando especialmente em processamento stateful e integração nativa com Kafka.
Samza utiliza uma arquitetura baseada em jobs que são executados como containers YARN ou processos standalone. Cada job processa uma ou mais partições de tópicos Kafka, mantendo estado local usando key-value stores. A arquitetura é pluggável, permitindo diferentes sistemas de mensageria, gerenciadores de recursos e sistemas de armazenamento de estado.
Gerenciamento nativo de estado local com key-value stores e checkpointing.
Arquitetura modular com suporte a diferentes sistemas de mensageria e storage.
Recuperação automática de falhas com reprocessamento e checkpointing.
Integração nativa e otimizada com Apache Kafka para mensageria.
Execução nativa em clusters YARN com isolamento de recursos.
Distribuição automática de carga entre instâncias de processamento.
Versão Atual | 1.9.0 |
Linguagens Suportadas | Java, Scala |
Sistemas de Mensageria | Apache Kafka (principal), pluggable |
Gerenciadores de Recursos | YARN, Standalone |
State Stores | RocksDB, In-memory, pluggable |
Requisitos Mínimos | Java 8+, Kafka cluster |
Transformação e enriquecimento de dados em tempo real entre diferentes sistemas com manutenção de estado.
Análise contínua de streams de dados com agregações complexas e janelas de tempo.
Processamento de eventos complexos com correlação entre múltiplos streams e manutenção de contexto.
Sistemas de personalização em tempo real baseados no comportamento do usuário e histórico.
Monitoramento contínuo de métricas e geração de alertas baseados em padrões complexos.
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-api</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-core_2.12</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-kafka_2.12</artifactId>
<version>1.9.0</version>
</dependency>
import org.apache.samza.application.StreamApplication;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
public class WordCountApplication implements StreamApplication {
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
// Configurar sistema Kafka
KafkaSystemDescriptor kafkaSystemDescriptor =
new KafkaSystemDescriptor("kafka")
.withConsumerZkConnect("localhost:2181")
.withProducerBootstrapServers("localhost:9092");
// Input stream
KafkaInputDescriptor<String> inputDescriptor =
kafkaSystemDescriptor.getInputDescriptor("input-topic",
new JsonSerdeV2<>(String.class));
// Output stream
KafkaOutputDescriptor<WordCount> outputDescriptor =
kafkaSystemDescriptor.getOutputDescriptor("word-count-output",
new JsonSerdeV2<>(WordCount.class));
// Definir streams
MessageStream<String> inputStream = appDescriptor.getInputStream(inputDescriptor);
OutputStream<WordCount> outputStream = appDescriptor.getOutputStream(outputDescriptor);
// Processamento
inputStream
.flatMap(text -> Arrays.asList(text.toLowerCase().split("\\s+")))
.filter(word -> !word.isEmpty())
.map(word -> new WordCount(word, 1))
.partitionBy(WordCount::getWord, new JsonSerdeV2<>(String.class))
.window(Windows.keyedSessionWindow(Duration.ofMinutes(5)),
new JsonSerdeV2<>(WordCount.class))
.aggregate(WordCount::new,
(message, aggregate) -> {
aggregate.setCount(aggregate.getCount() + message.getCount());
return aggregate;
})
.map(windowPane -> windowPane.getMessage())
.sendTo(outputStream);
}
public static class WordCount {
private String word;
private int count;
public WordCount() {}
public WordCount(String word, int count) {
this.word = word;
this.count = count;
}
// getters and setters
public String getWord() { return word; }
public void setWord(String word) { this.word = word; }
public int getCount() { return count; }
public void setCount(int count) { this.count = count; }
}
}
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.context.Context;
public class SessionTrackingApplication implements StreamApplication {
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
// Configurar state store
KVSerde<String, UserSession> sessionSerde =
KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserSession.class));
appDescriptor.withDefaultSystem(kafkaSystemDescriptor);
MessageStream<UserEvent> events = appDescriptor
.getInputStream(inputDescriptor)
.map(new JsonSerdeV2<>(UserEvent.class));
// Processar eventos com estado
events
.partitionBy(UserEvent::getUserId, new StringSerde())
.map(new SessionProcessor())
.sendTo(outputStream);
}
private static class SessionProcessor implements MapFunction<UserEvent, SessionUpdate> {
private KeyValueStore<String, UserSession> sessionStore;
@Override
public void init(Context context) {
this.sessionStore = (KeyValueStore<String, UserSession>)
context.getTaskContext().getStore("session-store");
}
@Override
public SessionUpdate apply(UserEvent event) {
String userId = event.getUserId();
UserSession session = sessionStore.get(userId);
if (session == null) {
session = new UserSession(userId, event.getTimestamp());
}
// Atualizar sessão
session.addEvent(event);
session.setLastActivity(event.getTimestamp());
// Verificar se sessão expirou
if (isSessionExpired(session, event.getTimestamp())) {
SessionUpdate update = new SessionUpdate(session, "EXPIRED");
sessionStore.delete(userId);
return update;
} else {
sessionStore.put(userId, session);
return new SessionUpdate(session, "ACTIVE");
}
}
private boolean isSessionExpired(UserSession session, long currentTime) {
return (currentTime - session.getLastActivity()) > Duration.ofMinutes(30).toMillis();
}
}
public static class UserSession {
private String userId;
private long startTime;
private long lastActivity;
private List<UserEvent> events;
public UserSession() {
this.events = new ArrayList<>();
}
public UserSession(String userId, long startTime) {
this.userId = userId;
this.startTime = startTime;
this.lastActivity = startTime;
this.events = new ArrayList<>();
}
public void addEvent(UserEvent event) {
this.events.add(event);
}
// getters and setters
}
}
# job.properties
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=word-count-job
# Kafka system
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=localhost:2181
systems.kafka.producer.bootstrap.servers=localhost:9092
# Input stream
task.inputs=kafka.input-topic
# Output stream
streams.word-count-output.samza.system=kafka
# Serializers
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
# State store
stores.session-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.session-store.key.serde=string
stores.session-store.msg.serde=json
# YARN configuration
yarn.package.path=file:///path/to/samza-job-package.tgz
yarn.container.memory.mb=1024
yarn.container.cpu.cores=1
# Task configuration
task.class=com.example.WordCountTask
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
#!/bin/bash
# Build do projeto
mvn clean package
# Criar package para deployment
mkdir -p deploy/lib
cp target/*.jar deploy/lib/
cp -r config deploy/
# Criar tarball
tar -czf samza-job-package.tgz -C deploy .
# Upload para HDFS (se usando YARN)
hdfs dfs -put samza-job-package.tgz /samza/
# Executar job
./deploy/bin/run-job.sh \
--config-factory=org.apache.samza.config.factories.PropertiesConfigFactory \
--config-path=file://$PWD/deploy/config/job.properties
# Monitorar job
yarn application -list -appStates RUNNING | grep word-count-job
Tecnologia | Complexidade | Estado | Comunidade | Integração Kafka |
---|---|---|---|---|
Apache Samza | Alta | Excelente | Pequena | Nativa |
Kafka Streams | Baixa | Excelente | Grande | Nativa |
Apache Flink | Média | Excelente | Grande | Boa |
Apache Storm | Média | Limitado | Média | Boa |
Spark Streaming | Média | Boa | Grande | Boa |
Escolha Samza se: Você precisa de isolamento de processos, deployment em YARN, ou controle granular sobre recursos.
Escolha Kafka Streams se: Você quer simplicidade, facilidade de deployment, ou está construindo microserviços.
Escolha Samza se: Você tem forte integração com Kafka e precisa de processamento stateful simples.
Escolha Flink se: Você precisa de baixa latência, processamento de eventos complexos, ou suporte a múltiplas fontes.
Escolha Samza se: Você precisa de processamento verdadeiramente contínuo e estado local otimizado.
Escolha Spark Streaming se: Você quer unificar batch e streaming, ou precisa de um ecossistema mais amplo.