Sistema de mensageria distribuído com separação de armazenamento e computação para alta escalabilidade
Apache Pulsar é uma plataforma de mensageria distribuída cloud-native desenvolvida originalmente pelo Yahoo e doada para a Apache Software Foundation. Pulsar foi projetado desde o início para ser uma solução de mensageria multi-tenant, geograficamente distribuída e altamente escalável.
A principal inovação do Pulsar é sua arquitetura de separação entre computação e armazenamento, utilizando Apache BookKeeper para persistência de dados. Isso permite escalabilidade independente e melhor utilização de recursos comparado a sistemas tradicionais.
Desenvolvido pelo Yahoo em 2013 para substituir sistemas de mensageria existentes, o Pulsar foi open-sourced em 2016 e se tornou um projeto Apache de nível superior em 2018. Empresas como Verizon Media, Tencent, Comcast e Splunk adotaram Pulsar para casos de uso críticos de missão.
A arquitetura do Pulsar é baseada em componentes especializados:
Isolamento nativo entre tenants com quotas e políticas independentes
Replicação automática entre datacenters com resolução de conflitos
Armazenamento em camadas com offload automático para cloud storage
Gerenciamento de schemas integrado com evolução automática
Framework serverless para processamento de streams leve
Suporte a pub-sub, queuing, e padrões híbridos
Versão Atual | 3.1.0 |
Linguagem Principal | Java |
Clientes Suportados | Java, Python, C++, Go, Node.js, C# |
Protocolos | Pulsar Binary, HTTP, WebSocket |
Serialização | Avro, JSON, Protobuf, Schema Registry |
Armazenamento | Apache BookKeeper |
Aplicações SaaS que precisam de isolamento completo entre clientes com diferentes SLAs e políticas.
Sistemas distribuídos globalmente que requerem replicação automática entre regiões.
Casos onde dados antigos precisam ser mantidos por compliance mas com custos reduzidos.
Processamento de eventos leve sem necessidade de frameworks complexos de streaming.
Aplicações que precisam combinar pub-sub e queuing no mesmo sistema.
# Pulsar standalone
docker run -it -p 6650:6650 -p 8080:8080 \
--mount source=pulsardata,target=/pulsar/data \
--mount source=pulsarconf,target=/pulsar/conf \
apachepulsar/pulsar:3.1.0 \
bin/pulsar standalone
# Verificar status
curl http://localhost:8080/admin/v2/clusters
# broker.conf
clusterName=pulsar-cluster
zookeeperServers=localhost:2181
configurationStoreServers=localhost:2181
brokerServicePort=6650
webServicePort=8080
managedLedgerDefaultEnsembleSize=1
managedLedgerDefaultWriteQuorum=1
managedLedgerDefaultAckQuorum=1
defaultRetentionTimeInMinutes=10080
defaultRetentionSizeInMB=1000
backlogQuotaDefaultLimitGB=10
import org.apache.pulsar.client.api.*;
public class PulsarProducer {
public static void main(String[] args) throws Exception {
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Producer producer = client.newProducer(Schema.STRING)
.topic("my-topic")
.compressionType(CompressionType.LZ4)
.sendTimeout(10, TimeUnit.SECONDS)
.create();
for (int i = 0; i < 100; i++) {
MessageId msgId = producer.newMessage()
.key("key-" + i)
.value("message-" + i)
.property("custom-prop", "value-" + i)
.send();
System.out.println("Published message " + i + " with ID: " + msgId);
}
producer.close();
client.close();
}
}
import org.apache.pulsar.client.api.*;
public class PulsarConsumer {
public static void main(String[] args) throws Exception {
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Consumer consumer = client.newConsumer(Schema.STRING)
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
while (true) {
Message message = consumer.receive();
try {
System.out.printf("Received message: key=%s, value=%s, properties=%s%n",
message.getKey(), message.getValue(), message.getProperties());
// Acknowledge message
consumer.acknowledge(message);
} catch (Exception e) {
// Negative acknowledge on failure
consumer.negativeAcknowledge(message);
}
}
}
}
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public class WordCountFunction implements Function {
@Override
public Void process(String input, Context context) throws Exception {
String[] words = input.split(" ");
for (String word : words) {
String key = word.toLowerCase();
context.incrCounter(key, 1);
}
context.getLogger().info("Processed " + words.length + " words");
return null;
}
}
// Deploy function
pulsar-admin functions create \
--jar word-count-function.jar \
--classname WordCountFunction \
--inputs input-topic \
--name word-count
# Criar namespace
pulsar-admin namespaces create public/my-namespace
# Criar tópico
pulsar-admin topics create persistent://public/my-namespace/my-topic
# Listar tópicos
pulsar-admin topics list public/my-namespace
# Produzir mensagem via CLI
pulsar-client produce my-topic --messages "Hello Pulsar"
# Consumir mensagens via CLI
pulsar-client consume my-topic -s "my-subscription" -n 0
# Verificar estatísticas do tópico
pulsar-admin topics stats persistent://public/my-namespace/my-topic
# Configurar retenção
pulsar-admin namespaces set-retention public/my-namespace --size 100M --time 7d
Aspecto | Apache Pulsar | Apache Kafka |
---|---|---|
Arquitetura | Separação compute/storage | Acoplada |
Multi-tenancy | Nativo | Limitado |
Geo-replicação | Automática | Manual (MirrorMaker) |
Latência | Consistentemente baixa | Variável |
Ecossistema | Crescendo | Maduro e rico |
Operação | Mais complexa | Complexa |