Biblioteca cliente para construir aplicações de stream processing com Kafka
Kafka Streams é uma biblioteca cliente para construir aplicações e microserviços críticos em tempo real, onde os dados de entrada e saída são armazenados em clusters Kafka. Combina a simplicidade de escrever e deployar aplicações Java e Scala padrão no lado cliente com os benefícios da tecnologia de cluster server-side do Kafka.
Kafka Streams foi introduzido como parte do Apache Kafka 0.10.0 em maio de 2016. Foi desenvolvido pela Confluent e pela comunidade Apache Kafka para fornecer uma alternativa mais simples aos frameworks de stream processing como Apache Storm e Apache Spark Streaming, especificamente otimizada para trabalhar com Kafka.
Kafka Streams utiliza uma arquitetura de processamento distribuído onde cada aplicação é composta por uma ou mais stream threads. Cada thread processa uma ou mais partições de tópicos Kafka. A biblioteca fornece abstrações de alto nível como KStream (stream de registros) e KTable (changelog stream) para facilitar o desenvolvimento.
Processamento contínuo de streams de dados com baixa latência.
Garantias de processamento exactly-once para aplicações críticas.
Suporte a operações com estado usando state stores locais.
Operações em janelas de tempo para agregações temporais.
Consulta do estado local da aplicação em tempo real.
Processamento de milhões de mensagens por segundo.
Versão Atual | 3.6.0 |
Linguagens Suportadas | Java, Scala |
Dependências | Apache Kafka cluster |
Processamento | Stream processing, Stateful, Stateless |
Garantias | At-least-once, Exactly-once |
Requisitos Mínimos | Java 8+, Kafka 2.8+ |
Análise em tempo real de streams de dados para dashboards, métricas e alertas instantâneos.
Detecção de fraudes em tempo real analisando padrões de transações e comportamentos suspeitos.
Transformação e enriquecimento de dados em tempo real entre diferentes sistemas.
Implementação de arquiteturas event-sourced com processamento de eventos em tempo real.
Personalização em tempo real baseada no comportamento do usuário e histórico de interações.
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.0</version>
</dependency>
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Arrays;
import java.util.Properties;
public class WordCountExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
// Input stream
KStream<String, String> textLines = builder.stream("text-input");
// Processing pipeline
KTable<String, Long> wordCounts = textLines
.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word)
.count();
// Output stream
wordCounts.toStream().to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// Shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
public class FraudDetectionExample {
public static void main(String[] args) {
StreamsBuilder builder = new StreamsBuilder();
// Input stream de transações
KStream<String, Transaction> transactions = builder.stream("transactions");
// Detectar múltiplas transações do mesmo usuário em janela de tempo
KTable<Windowed<String>, Long> transactionCounts = transactions
.groupBy((key, transaction) -> transaction.getUserId())
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.count();
// Filtrar usuários com muitas transações (possível fraude)
KStream<Windowed<String>, Long> suspiciousActivity = transactionCounts
.toStream()
.filter((windowedUserId, count) -> count > 10);
// Enriquecer com dados do usuário
KTable<String, User> users = builder.table("users");
KStream<String, FraudAlert> fraudAlerts = transactions
.join(users,
(transaction, user) -> new TransactionWithUser(transaction, user),
Joined.with(Serdes.String(), transactionSerde, userSerde))
.filter((key, txWithUser) -> isSuspicious(txWithUser))
.mapValues(txWithUser -> new FraudAlert(
txWithUser.getTransaction(),
txWithUser.getUser(),
"High frequency transactions detected"
));
// Output para tópico de alertas
fraudAlerts.to("fraud-alerts");
KafkaStreams streams = new KafkaStreams(builder.build(), getProperties());
streams.start();
}
private static boolean isSuspicious(TransactionWithUser txWithUser) {
// Lógica de detecção de fraude
return txWithUser.getTransaction().getAmount() > 10000 ||
txWithUser.getUser().getRiskScore() > 0.8;
}
}
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import java.time.Duration;
public class RealTimeAnalyticsExample {
public static void main(String[] args) {
StreamsBuilder builder = new StreamsBuilder();
// Stream de eventos de cliques
KStream<String, ClickEvent> clicks = builder.stream("click-events");
// Agregações em janelas de tempo
KTable<Windowed<String>, Long> clicksPerPage = clicks
.groupBy((key, click) -> click.getPageId())
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
.count();
// Calcular taxa de cliques por sessão
KTable<Windowed<String>, Double> clickRateBySession = clicks
.groupBy((key, click) -> click.getSessionId())
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.aggregate(
() -> new ClickStats(0, 0),
(key, click, stats) -> {
stats.incrementClicks();
if (click.isConversion()) {
stats.incrementConversions();
}
return stats;
},
Materialized.with(Serdes.String(), clickStatsSerde)
)
.mapValues(stats -> stats.getConversionRate());
// Stream de alertas para páginas com baixa performance
KStream<String, Alert> performanceAlerts = clickRateBySession
.toStream()
.filter((windowedSession, rate) -> rate < 0.02)
.map((windowedSession, rate) -> KeyValue.pair(
windowedSession.key(),
new Alert("Low conversion rate", rate, System.currentTimeMillis())
));
// Output streams
clicksPerPage.toStream().to("page-analytics");
performanceAlerts.to("performance-alerts");
KafkaStreams streams = new KafkaStreams(builder.build(), getProperties());
streams.start();
}
}
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
public class InteractiveQueriesExample {
private KafkaStreams streams;
public void setupStreams() {
StreamsBuilder builder = new StreamsBuilder();
// Criar uma KTable materializada
KTable<String, Long> userClickCounts = builder
.stream("user-clicks")
.groupBy((key, click) -> click.getUserId())
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("user-click-counts"));
streams = new KafkaStreams(builder.build(), getProperties());
streams.start();
}
// Método para consultar o estado local
public Long getUserClickCount(String userId) {
ReadOnlyKeyValueStore<String, Long> store = streams.store(
"user-click-counts",
QueryableStoreTypes.keyValueStore()
);
return store.get(userId);
}
// REST endpoint para expor as consultas
@GetMapping("/user/{userId}/clicks")
public ResponseEntity<Long> getUserClicks(@PathVariable String userId) {
Long clickCount = getUserClickCount(userId);
return clickCount != null ?
ResponseEntity.ok(clickCount) :
ResponseEntity.notFound().build();
}
}
Tecnologia | Latência | Facilidade de Uso | Escalabilidade | Ecossistema |
---|---|---|---|---|
Kafka Streams | Baixa | Alta | Excelente | Kafka |
Apache Flink | Muito Baixa | Média | Excelente | Amplo |
Apache Storm | Baixa | Média | Boa | Limitado |
Spark Streaming | Média | Alta | Excelente | Amplo |
Apache Samza | Baixa | Média | Boa | Kafka |
Escolha Kafka Streams se: Você já usa Kafka, quer simplicidade de deployment, e precisa de exactly-once processing com baixa latência.
Escolha Flink se: Você precisa de latência ultra-baixa, suporte a múltiplas fontes de dados, ou processamento de eventos complexos.
Escolha Kafka Streams se: Você quer processamento verdadeiramente contínuo, baixa latência, e simplicidade operacional.
Escolha Spark Streaming se: Você precisa de processamento batch também, tem workloads complexos de ML, ou quer usar múltiplas linguagens.
Escolha Kafka Streams se: Você quer uma API mais simples, melhor documentação, e comunidade mais ativa.
Escolha Samza se: Você precisa de controle mais granular sobre o processamento ou tem requisitos específicos de deployment.