🌊

Kafka Streams

Open Source 2024 Trending Stream Processing Java

Biblioteca cliente para construir aplicações de stream processing com Kafka

O que é Kafka Streams?

Kafka Streams é uma biblioteca cliente para construir aplicações e microserviços críticos em tempo real, onde os dados de entrada e saída são armazenados em clusters Kafka. Combina a simplicidade de escrever e deployar aplicações Java e Scala padrão no lado cliente com os benefícios da tecnologia de cluster server-side do Kafka.

História

Kafka Streams foi introduzido como parte do Apache Kafka 0.10.0 em maio de 2016. Foi desenvolvido pela Confluent e pela comunidade Apache Kafka para fornecer uma alternativa mais simples aos frameworks de stream processing como Apache Storm e Apache Spark Streaming, especificamente otimizada para trabalhar com Kafka.

Arquitetura

Kafka Streams utiliza uma arquitetura de processamento distribuído onde cada aplicação é composta por uma ou mais stream threads. Cada thread processa uma ou mais partições de tópicos Kafka. A biblioteca fornece abstrações de alto nível como KStream (stream de registros) e KTable (changelog stream) para facilitar o desenvolvimento.

Vantagens

  • Simplicidade: Biblioteca Java/Scala simples, sem necessidade de cluster separado
  • Exactly-once semantics: Garantias de processamento exactly-once
  • Fault tolerance: Recuperação automática de falhas
  • Escalabilidade: Escala horizontalmente adicionando instâncias
  • Stateful processing: Suporte nativo a operações com estado
  • Interactive queries: Consulta do estado local da aplicação

Desvantagens

  • Limitado ao Kafka: Funciona apenas com Apache Kafka
  • Linguagens limitadas: Apenas Java e Scala nativamente
  • Complexidade de debugging: Debugging distribuído pode ser desafiador
  • Overhead de serialização: Performance pode ser impactada por serialização
  • Curva de aprendizado: Conceitos de stream processing podem ser complexos

Principais Recursos

🔄 Stream Processing

Processamento contínuo de streams de dados com baixa latência.

✅ Exactly-Once

Garantias de processamento exactly-once para aplicações críticas.

🗃️ Stateful Operations

Suporte a operações com estado usando state stores locais.

🪟 Windowing

Operações em janelas de tempo para agregações temporais.

🔍 Interactive Queries

Consulta do estado local da aplicação em tempo real.

⚡ High Performance

Processamento de milhões de mensagens por segundo.

Especificações Técnicas

Versão Atual 3.6.0
Linguagens Suportadas Java, Scala
Dependências Apache Kafka cluster
Processamento Stream processing, Stateful, Stateless
Garantias At-least-once, Exactly-once
Requisitos Mínimos Java 8+, Kafka 2.8+

Casos de Uso

🔍 Real-time Analytics

Análise em tempo real de streams de dados para dashboards, métricas e alertas instantâneos.

🛡️ Fraud Detection

Detecção de fraudes em tempo real analisando padrões de transações e comportamentos suspeitos.

🔄 Data Transformation

Transformação e enriquecimento de dados em tempo real entre diferentes sistemas.

📊 Event Sourcing

Implementação de arquiteturas event-sourced com processamento de eventos em tempo real.

🎯 Personalization

Personalização em tempo real baseada no comportamento do usuário e histórico de interações.

Quando Usar Kafka Streams

✅ Recomendado para:
  • Aplicações que já usam Apache Kafka
  • Microserviços que processam streams
  • Necessidade de exactly-once processing
  • Processamento com baixa latência
  • Aplicações Java/Scala
  • Operações stateful em streams
  • Integração simples com infraestrutura existente
⚠️ Considere alternativas se:
  • Você não usa Kafka como message broker
  • Precisa de suporte a outras linguagens além de Java/Scala
  • Requer processamento batch complexo
  • Necessita de conectores para muitas fontes de dados
  • Workloads muito complexos de machine learning

Exemplos Práticos

Configuração Maven

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.6.0</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.0</version>
</dependency>

Exemplo Básico - Word Count

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Properties;

public class WordCountExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
                  Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
                  Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        
        // Input stream
        KStream<String, String> textLines = builder.stream("text-input");
        
        // Processing pipeline
        KTable<String, Long> wordCounts = textLines
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word)
            .count();
        
        // Output stream
        wordCounts.toStream().to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        
        // Shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Exemplo Avançado - Fraud Detection

import org.apache.kafka.streams.kstream.*;
import java.time.Duration;

public class FraudDetectionExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        
        // Input stream de transações
        KStream<String, Transaction> transactions = builder.stream("transactions");
        
        // Detectar múltiplas transações do mesmo usuário em janela de tempo
        KTable<Windowed<String>, Long> transactionCounts = transactions
            .groupBy((key, transaction) -> transaction.getUserId())
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
            .count();
        
        // Filtrar usuários com muitas transações (possível fraude)
        KStream<Windowed<String>, Long> suspiciousActivity = transactionCounts
            .toStream()
            .filter((windowedUserId, count) -> count > 10);
        
        // Enriquecer com dados do usuário
        KTable<String, User> users = builder.table("users");
        
        KStream<String, FraudAlert> fraudAlerts = transactions
            .join(users, 
                  (transaction, user) -> new TransactionWithUser(transaction, user),
                  Joined.with(Serdes.String(), transactionSerde, userSerde))
            .filter((key, txWithUser) -> isSuspicious(txWithUser))
            .mapValues(txWithUser -> new FraudAlert(
                txWithUser.getTransaction(),
                txWithUser.getUser(),
                "High frequency transactions detected"
            ));
        
        // Output para tópico de alertas
        fraudAlerts.to("fraud-alerts");
        
        KafkaStreams streams = new KafkaStreams(builder.build(), getProperties());
        streams.start();
    }
    
    private static boolean isSuspicious(TransactionWithUser txWithUser) {
        // Lógica de detecção de fraude
        return txWithUser.getTransaction().getAmount() > 10000 ||
               txWithUser.getUser().getRiskScore() > 0.8;
    }
}

Exemplo com Windowing e Aggregations

import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import java.time.Duration;

public class RealTimeAnalyticsExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        
        // Stream de eventos de cliques
        KStream<String, ClickEvent> clicks = builder.stream("click-events");
        
        // Agregações em janelas de tempo
        KTable<Windowed<String>, Long> clicksPerPage = clicks
            .groupBy((key, click) -> click.getPageId())
            .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
            .count();
        
        // Calcular taxa de cliques por sessão
        KTable<Windowed<String>, Double> clickRateBySession = clicks
            .groupBy((key, click) -> click.getSessionId())
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
            .aggregate(
                () -> new ClickStats(0, 0),
                (key, click, stats) -> {
                    stats.incrementClicks();
                    if (click.isConversion()) {
                        stats.incrementConversions();
                    }
                    return stats;
                },
                Materialized.with(Serdes.String(), clickStatsSerde)
            )
            .mapValues(stats -> stats.getConversionRate());
        
        // Stream de alertas para páginas com baixa performance
        KStream<String, Alert> performanceAlerts = clickRateBySession
            .toStream()
            .filter((windowedSession, rate) -> rate < 0.02)
            .map((windowedSession, rate) -> KeyValue.pair(
                windowedSession.key(),
                new Alert("Low conversion rate", rate, System.currentTimeMillis())
            ));
        
        // Output streams
        clicksPerPage.toStream().to("page-analytics");
        performanceAlerts.to("performance-alerts");
        
        KafkaStreams streams = new KafkaStreams(builder.build(), getProperties());
        streams.start();
    }
}

Interactive Queries Example

import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class InteractiveQueriesExample {
    private KafkaStreams streams;
    
    public void setupStreams() {
        StreamsBuilder builder = new StreamsBuilder();
        
        // Criar uma KTable materializada
        KTable<String, Long> userClickCounts = builder
            .stream("user-clicks")
            .groupBy((key, click) -> click.getUserId())
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("user-click-counts"));
        
        streams = new KafkaStreams(builder.build(), getProperties());
        streams.start();
    }
    
    // Método para consultar o estado local
    public Long getUserClickCount(String userId) {
        ReadOnlyKeyValueStore<String, Long> store = streams.store(
            "user-click-counts",
            QueryableStoreTypes.keyValueStore()
        );
        
        return store.get(userId);
    }
    
    // REST endpoint para expor as consultas
    @GetMapping("/user/{userId}/clicks")
    public ResponseEntity<Long> getUserClicks(@PathVariable String userId) {
        Long clickCount = getUserClickCount(userId);
        return clickCount != null ? 
            ResponseEntity.ok(clickCount) : 
            ResponseEntity.notFound().build();
    }
}

Tutoriais Relacionados

Comparações

Alternativas Similares

Tecnologia Latência Facilidade de Uso Escalabilidade Ecossistema
Kafka Streams Baixa Alta Excelente Kafka
Apache Flink Muito Baixa Média Excelente Amplo
Apache Storm Baixa Média Boa Limitado
Spark Streaming Média Alta Excelente Amplo
Apache Samza Baixa Média Boa Kafka

Quando Escolher Kafka Streams vs Alternativas

Kafka Streams vs Apache Flink

Escolha Kafka Streams se: Você já usa Kafka, quer simplicidade de deployment, e precisa de exactly-once processing com baixa latência.

Escolha Flink se: Você precisa de latência ultra-baixa, suporte a múltiplas fontes de dados, ou processamento de eventos complexos.

Kafka Streams vs Spark Streaming

Escolha Kafka Streams se: Você quer processamento verdadeiramente contínuo, baixa latência, e simplicidade operacional.

Escolha Spark Streaming se: Você precisa de processamento batch também, tem workloads complexos de ML, ou quer usar múltiplas linguagens.

Kafka Streams vs Apache Samza

Escolha Kafka Streams se: Você quer uma API mais simples, melhor documentação, e comunidade mais ativa.

Escolha Samza se: Você precisa de controle mais granular sobre o processamento ou tem requisitos específicos de deployment.

Informações Rápidas
  • Tipo: Stream Processing Library
  • Licença: Apache 2.0
  • Primeira Versão: 2016
  • Linguagens: Java, Scala
  • Dependências: Apache Kafka
  • Categoria: Stream Processing