Serviço de mensageria assíncrona do Google Cloud para integração de aplicações e análise de dados
Google Cloud Pub/Sub é um serviço de mensageria assíncrona totalmente gerenciado que permite desacoplar serviços que produzem e consomem mensagens. Baseado no padrão publish-subscribe, o Pub/Sub oferece entrega de mensagens durável e de alta disponibilidade em escala global.
O serviço é projetado para suportar aplicações distribuídas modernas, oferecendo garantias de entrega at-least-once, ordenação de mensagens e integração nativa com outros serviços do Google Cloud Platform.
Lançado pelo Google Cloud Platform em 2015, o Pub/Sub foi desenvolvido baseado na experiência interna do Google com sistemas de mensageria em larga escala. O serviço evoluiu para se tornar uma peça fundamental da arquitetura de microserviços e aplicações serverless no GCP.
A arquitetura do Pub/Sub é baseada em conceitos simples mas poderosos:
Mensageria global com baixa latência entre regiões
Escalabilidade automática baseada na demanda
Garantia de entrega de mensagens com acknowledgments
Suporte a entrega push e pull para subscribers
Tratamento automático de mensagens com falha
Criptografia em trânsito e em repouso com IAM
Throughput | Milhões de mensagens por segundo |
Tamanho Máximo da Mensagem | 10 MB |
Retenção de Mensagens | 7 dias (padrão), até 31 dias |
Latência | < 100ms (99th percentile) |
Regiões Disponíveis | Todas as regiões GCP |
SLA | 99.95% de disponibilidade |
Comunicação assíncrona entre microserviços com desacoplamento e resiliência.
Coleta e processamento de eventos para análise em tempo real com BigQuery e Dataflow.
Orquestração de pipelines de dados com Cloud Functions e Cloud Run.
Sistema de notificações push para aplicações móveis e web em tempo real.
Coleta de dados de dispositivos IoT com processamento em tempo real.
# Criar topic
gcloud pubsub topics create my-topic
# Criar subscription
gcloud pubsub subscriptions create my-subscription --topic=my-topic
# Listar topics
gcloud pubsub topics list
# Listar subscriptions
gcloud pubsub subscriptions list
from google.cloud import pubsub_v1
import json
import time
from datetime import datetime
# Configurar cliente
project_id = "my-project"
topic_id = "my-topic"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
def publish_message(data):
# Converter dados para bytes
message_data = json.dumps(data).encode('utf-8')
# Adicionar atributos
attributes = {
'timestamp': datetime.now().isoformat(),
'source': 'python-publisher'
}
# Publicar mensagem
future = publisher.publish(topic_path, message_data, **attributes)
message_id = future.result()
print(f"Published message ID: {message_id}")
return message_id
# Exemplo de uso
for i in range(100):
data = {
'user_id': f'user_{i}',
'event': 'page_view',
'page': f'/page_{i % 10}',
'timestamp': datetime.now().isoformat()
}
publish_message(data)
time.sleep(0.1)
from google.cloud import pubsub_v1
import json
from concurrent.futures import ThreadPoolExecutor
project_id = "my-project"
subscription_id = "my-subscription"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
def callback(message):
try:
# Processar mensagem
data = json.loads(message.data.decode('utf-8'))
attributes = dict(message.attributes)
print(f"Received message: {data}")
print(f"Attributes: {attributes}")
# Simular processamento
time.sleep(0.1)
# Acknowledge mensagem
message.ack()
print(f"Message acknowledged: {message.message_id}")
except Exception as e:
print(f"Error processing message: {e}")
message.nack()
# Configurar subscriber
flow_control = pubsub_v1.types.FlowControl(max_messages=100)
executor = ThreadPoolExecutor(max_workers=10)
print(f"Listening for messages on {subscription_path}...")
# Iniciar subscriber
streaming_pull_future = subscriber.subscribe(
subscription_path,
callback=callback,
flow_control=flow_control,
executor=executor
)
try:
streaming_pull_future.result()
except KeyboardInterrupt:
streaming_pull_future.cancel()
print("Subscriber stopped.")
import json
import base64
from google.cloud import bigquery
def pubsub_trigger(event, context):
"""Cloud Function triggered by Pub/Sub message"""
# Decodificar mensagem
if 'data' in event:
message_data = base64.b64decode(event['data']).decode('utf-8')
data = json.loads(message_data)
else:
print('No data in Pub/Sub message')
return
# Obter atributos
attributes = event.get('attributes', {})
print(f"Processing message: {data}")
print(f"Attributes: {attributes}")
# Exemplo: inserir no BigQuery
client = bigquery.Client()
table_id = "my-project.my_dataset.events"
rows_to_insert = [{
'user_id': data.get('user_id'),
'event': data.get('event'),
'page': data.get('page'),
'timestamp': data.get('timestamp'),
'processed_at': datetime.utcnow().isoformat()
}]
errors = client.insert_rows_json(table_id, rows_to_insert)
if errors:
print(f"BigQuery insert errors: {errors}")
raise Exception("Failed to insert into BigQuery")
else:
print("Data inserted successfully into BigQuery")
# Pub/Sub Topic
resource "google_pubsub_topic" "example" {
name = "my-topic"
labels = {
environment = "production"
team = "data-engineering"
}
message_retention_duration = "86400s"
}
# Pub/Sub Subscription
resource "google_pubsub_subscription" "example" {
name = "my-subscription"
topic = google_pubsub_topic.example.name
# Configurações de entrega
ack_deadline_seconds = 20
# Dead letter policy
dead_letter_policy {
dead_letter_topic = google_pubsub_topic.dead_letter.id
max_delivery_attempts = 5
}
# Retry policy
retry_policy {
minimum_backoff = "10s"
maximum_backoff = "600s"
}
# Push configuration (opcional)
push_config {
push_endpoint = "https://my-app.com/webhook"
attributes = {
x-goog-version = "v1"
}
}
}
# Dead Letter Topic
resource "google_pubsub_topic" "dead_letter" {
name = "my-topic-dead-letter"
}
Aspecto | Google Pub/Sub | Amazon Kinesis | Apache Kafka |
---|---|---|---|
Gerenciamento | Totalmente gerenciado | Totalmente gerenciado | Self-managed |
Escalabilidade | Automática | Manual (shards) | Manual |
Ordenação | Limitada | Por partition key | Por partition |
Latência | ~100ms | ~200ms | ~10ms |
Throughput | Muito alto | Alto | Muito alto |
Modelo de Preços | Pay-per-message | Pay-per-shard | Infraestrutura |