🔄

Apache Samza

Open Source Stream Processing Java Stateful

Framework distribuído para processamento de streams com estado

O que é Apache Samza?

Apache Samza é um framework distribuído de processamento de streams que permite construir aplicações stateful para processar dados em tempo real. Desenvolvido originalmente no LinkedIn, Samza é projetado para trabalhar com Apache Kafka como sistema de mensageria e YARN como gerenciador de recursos, oferecendo processamento confiável e escalável de streams de dados.

História

Apache Samza foi desenvolvido no LinkedIn em 2013 para atender às necessidades de processamento de streams em larga escala da empresa. Foi open-sourced e doado para a Apache Software Foundation no mesmo ano. O projeto foi criado para superar limitações de outras soluções de stream processing da época, focando especialmente em processamento stateful e integração nativa com Kafka.

Arquitetura

Samza utiliza uma arquitetura baseada em jobs que são executados como containers YARN ou processos standalone. Cada job processa uma ou mais partições de tópicos Kafka, mantendo estado local usando key-value stores. A arquitetura é pluggável, permitindo diferentes sistemas de mensageria, gerenciadores de recursos e sistemas de armazenamento de estado.

Vantagens

  • Processamento stateful: Gerenciamento nativo de estado local com checkpointing
  • Fault tolerance: Recuperação automática de falhas com reprocessamento
  • Arquitetura pluggável: Suporte a diferentes sistemas de mensageria e storage
  • Integração com Kafka: Otimizado para trabalhar com Apache Kafka
  • Isolamento de processos: Jobs executam em containers isolados
  • Flexibilidade: Suporte a diferentes padrões de deployment

Desvantagens

  • Complexidade operacional: Requer conhecimento de YARN e Kafka
  • Comunidade menor: Menos ativa comparada a Spark ou Flink
  • Documentação limitada: Menos recursos de aprendizado disponíveis
  • Curva de aprendizado: Conceitos específicos do framework
  • Dependências: Forte acoplamento com ecossistema Hadoop/Kafka

Principais Recursos

🗃️ Stateful Processing

Gerenciamento nativo de estado local com key-value stores e checkpointing.

🔌 Pluggable Architecture

Arquitetura modular com suporte a diferentes sistemas de mensageria e storage.

🛡️ Fault Tolerance

Recuperação automática de falhas com reprocessamento e checkpointing.

📊 Kafka Integration

Integração nativa e otimizada com Apache Kafka para mensageria.

🏗️ YARN Support

Execução nativa em clusters YARN com isolamento de recursos.

⚖️ Load Balancing

Distribuição automática de carga entre instâncias de processamento.

Especificações Técnicas

Versão Atual 1.9.0
Linguagens Suportadas Java, Scala
Sistemas de Mensageria Apache Kafka (principal), pluggable
Gerenciadores de Recursos YARN, Standalone
State Stores RocksDB, In-memory, pluggable
Requisitos Mínimos Java 8+, Kafka cluster

Casos de Uso

🔄 ETL em Tempo Real

Transformação e enriquecimento de dados em tempo real entre diferentes sistemas com manutenção de estado.

📊 Stream Analytics

Análise contínua de streams de dados com agregações complexas e janelas de tempo.

🔍 Event Processing

Processamento de eventos complexos com correlação entre múltiplos streams e manutenção de contexto.

🎯 Personalization

Sistemas de personalização em tempo real baseados no comportamento do usuário e histórico.

🚨 Monitoring e Alertas

Monitoramento contínuo de métricas e geração de alertas baseados em padrões complexos.

Quando Usar Apache Samza

✅ Recomendado para:
  • Processamento stateful complexo
  • Integração forte com ecossistema Kafka
  • Necessidade de isolamento de processos
  • Workloads que requerem checkpointing frequente
  • Ambientes com YARN já estabelecido
  • Aplicações que precisam de controle granular sobre estado
  • Processamento de eventos com correlação temporal
⚠️ Considere alternativas se:
  • Você precisa de uma comunidade mais ativa
  • Quer simplicidade operacional (considere Kafka Streams)
  • Necessita de suporte a múltiplas linguagens
  • Requer processamento de baixíssima latência
  • Não usa Kafka como sistema principal de mensageria
  • Prefere soluções com mais recursos de aprendizado

Exemplos Práticos

Configuração Maven

<dependency>
    <groupId>org.apache.samza</groupId>
    <artifactId>samza-api</artifactId>
    <version>1.9.0</version>
</dependency>

<dependency>
    <groupId>org.apache.samza</groupId>
    <artifactId>samza-core_2.12</artifactId>
    <version>1.9.0</version>
</dependency>

<dependency>
    <groupId>org.apache.samza</groupId>
    <artifactId>samza-kafka_2.12</artifactId>
    <version>1.9.0</version>
</dependency>

Exemplo Básico - Stream Task

import org.apache.samza.application.StreamApplication;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;

public class WordCountApplication implements StreamApplication {
    
    @Override
    public void describe(StreamApplicationDescriptor appDescriptor) {
        // Configurar sistema Kafka
        KafkaSystemDescriptor kafkaSystemDescriptor = 
            new KafkaSystemDescriptor("kafka")
                .withConsumerZkConnect("localhost:2181")
                .withProducerBootstrapServers("localhost:9092");
        
        // Input stream
        KafkaInputDescriptor<String> inputDescriptor = 
            kafkaSystemDescriptor.getInputDescriptor("input-topic", 
                new JsonSerdeV2<>(String.class));
        
        // Output stream
        KafkaOutputDescriptor<WordCount> outputDescriptor = 
            kafkaSystemDescriptor.getOutputDescriptor("word-count-output", 
                new JsonSerdeV2<>(WordCount.class));
        
        // Definir streams
        MessageStream<String> inputStream = appDescriptor.getInputStream(inputDescriptor);
        OutputStream<WordCount> outputStream = appDescriptor.getOutputStream(outputDescriptor);
        
        // Processamento
        inputStream
            .flatMap(text -> Arrays.asList(text.toLowerCase().split("\\s+")))
            .filter(word -> !word.isEmpty())
            .map(word -> new WordCount(word, 1))
            .partitionBy(WordCount::getWord, new JsonSerdeV2<>(String.class))
            .window(Windows.keyedSessionWindow(Duration.ofMinutes(5)), 
                    new JsonSerdeV2<>(WordCount.class))
            .aggregate(WordCount::new, 
                      (message, aggregate) -> {
                          aggregate.setCount(aggregate.getCount() + message.getCount());
                          return aggregate;
                      })
            .map(windowPane -> windowPane.getMessage())
            .sendTo(outputStream);
    }
    
    public static class WordCount {
        private String word;
        private int count;
        
        public WordCount() {}
        
        public WordCount(String word, int count) {
            this.word = word;
            this.count = count;
        }
        
        // getters and setters
        public String getWord() { return word; }
        public void setWord(String word) { this.word = word; }
        public int getCount() { return count; }
        public void setCount(int count) { this.count = count; }
    }
}

Exemplo com Estado - User Session Tracking

import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.context.Context;

public class SessionTrackingApplication implements StreamApplication {
    
    @Override
    public void describe(StreamApplicationDescriptor appDescriptor) {
        // Configurar state store
        KVSerde<String, UserSession> sessionSerde = 
            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserSession.class));
        
        appDescriptor.withDefaultSystem(kafkaSystemDescriptor);
        
        MessageStream<UserEvent> events = appDescriptor
            .getInputStream(inputDescriptor)
            .map(new JsonSerdeV2<>(UserEvent.class));
        
        // Processar eventos com estado
        events
            .partitionBy(UserEvent::getUserId, new StringSerde())
            .map(new SessionProcessor())
            .sendTo(outputStream);
    }
    
    private static class SessionProcessor implements MapFunction<UserEvent, SessionUpdate> {
        private KeyValueStore<String, UserSession> sessionStore;
        
        @Override
        public void init(Context context) {
            this.sessionStore = (KeyValueStore<String, UserSession>) 
                context.getTaskContext().getStore("session-store");
        }
        
        @Override
        public SessionUpdate apply(UserEvent event) {
            String userId = event.getUserId();
            UserSession session = sessionStore.get(userId);
            
            if (session == null) {
                session = new UserSession(userId, event.getTimestamp());
            }
            
            // Atualizar sessão
            session.addEvent(event);
            session.setLastActivity(event.getTimestamp());
            
            // Verificar se sessão expirou
            if (isSessionExpired(session, event.getTimestamp())) {
                SessionUpdate update = new SessionUpdate(session, "EXPIRED");
                sessionStore.delete(userId);
                return update;
            } else {
                sessionStore.put(userId, session);
                return new SessionUpdate(session, "ACTIVE");
            }
        }
        
        private boolean isSessionExpired(UserSession session, long currentTime) {
            return (currentTime - session.getLastActivity()) > Duration.ofMinutes(30).toMillis();
        }
    }
    
    public static class UserSession {
        private String userId;
        private long startTime;
        private long lastActivity;
        private List<UserEvent> events;
        
        public UserSession() {
            this.events = new ArrayList<>();
        }
        
        public UserSession(String userId, long startTime) {
            this.userId = userId;
            this.startTime = startTime;
            this.lastActivity = startTime;
            this.events = new ArrayList<>();
        }
        
        public void addEvent(UserEvent event) {
            this.events.add(event);
        }
        
        // getters and setters
    }
}

Configuração de Job

# job.properties
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=word-count-job

# Kafka system
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=localhost:2181
systems.kafka.producer.bootstrap.servers=localhost:9092

# Input stream
task.inputs=kafka.input-topic

# Output stream
streams.word-count-output.samza.system=kafka

# Serializers
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory

# State store
stores.session-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.session-store.key.serde=string
stores.session-store.msg.serde=json

# YARN configuration
yarn.package.path=file:///path/to/samza-job-package.tgz
yarn.container.memory.mb=1024
yarn.container.cpu.cores=1

# Task configuration
task.class=com.example.WordCountTask
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory

Deployment Script

#!/bin/bash

# Build do projeto
mvn clean package

# Criar package para deployment
mkdir -p deploy/lib
cp target/*.jar deploy/lib/
cp -r config deploy/

# Criar tarball
tar -czf samza-job-package.tgz -C deploy .

# Upload para HDFS (se usando YARN)
hdfs dfs -put samza-job-package.tgz /samza/

# Executar job
./deploy/bin/run-job.sh \
    --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory \
    --config-path=file://$PWD/deploy/config/job.properties

# Monitorar job
yarn application -list -appStates RUNNING | grep word-count-job

Tutoriais Relacionados

Comparações

Alternativas Similares

Tecnologia Complexidade Estado Comunidade Integração Kafka
Apache Samza Alta Excelente Pequena Nativa
Kafka Streams Baixa Excelente Grande Nativa
Apache Flink Média Excelente Grande Boa
Apache Storm Média Limitado Média Boa
Spark Streaming Média Boa Grande Boa

Quando Escolher Samza vs Alternativas

Samza vs Kafka Streams

Escolha Samza se: Você precisa de isolamento de processos, deployment em YARN, ou controle granular sobre recursos.

Escolha Kafka Streams se: Você quer simplicidade, facilidade de deployment, ou está construindo microserviços.

Samza vs Apache Flink

Escolha Samza se: Você tem forte integração com Kafka e precisa de processamento stateful simples.

Escolha Flink se: Você precisa de baixa latência, processamento de eventos complexos, ou suporte a múltiplas fontes.

Samza vs Spark Streaming

Escolha Samza se: Você precisa de processamento verdadeiramente contínuo e estado local otimizado.

Escolha Spark Streaming se: Você quer unificar batch e streaming, ou precisa de um ecossistema mais amplo.

Informações Rápidas
  • Tipo: Stream Processing Framework
  • Licença: Apache 2.0
  • Primeira Versão: 2013
  • Linguagens: Java, Scala
  • Dependências: Kafka, YARN
  • Categoria: Stateful Stream Processing