Serviço gerenciado da AWS para coleta, processamento e análise de dados de streaming em tempo real
Amazon Kinesis é uma plataforma totalmente gerenciada da AWS para coleta, processamento e análise de dados de streaming em tempo real. Kinesis facilita a ingestão e processamento de grandes volumes de dados de streaming, permitindo que você obtenha insights em tempo real e reaja rapidamente a novas informações.
A plataforma Kinesis consiste em quatro serviços principais: Kinesis Data Streams para ingestão de dados, Kinesis Data Firehose para entrega de dados, Kinesis Data Analytics para análise em tempo real, e Kinesis Video Streams para streaming de vídeo.
Lançado pela Amazon Web Services em 2013, o Kinesis foi desenvolvido para competir com soluções como Apache Kafka, oferecendo uma alternativa totalmente gerenciada. O serviço evoluiu significativamente, adicionando capacidades de analytics, video streaming e integração profunda com o ecossistema AWS.
A arquitetura do Kinesis é baseada em componentes especializados:
Captura e armazena dados de streaming com baixa latência
Entrega dados para destinos como S3, Redshift, e Elasticsearch
Análise de dados de streaming usando SQL padrão
Streaming de vídeo seguro para análise e playback
Escalabilidade automática baseada na demanda
Criptografia em trânsito e em repouso com IAM
Throughput por Shard | 1 MB/s ou 1,000 records/s (ingress) |
Throughput de Saída | 2 MB/s por shard |
Retenção de Dados | 24 horas a 365 dias |
Tamanho Máximo do Record | 1 MB |
Regiões Disponíveis | Todas as regiões AWS principais |
SLA | 99.9% de disponibilidade |
Análise de métricas de aplicações, logs de sistema e dados de IoT em tempo real para dashboards e alertas.
Processamento de eventos de clickstream para recomendações personalizadas em tempo real.
Detecção de fraudes em transações financeiras usando machine learning em tempo real.
Coleta e análise de eventos de aplicações móveis e web para insights de usuário.
Processamento de dados de sensores IoT para monitoramento e automação industrial.
# Criar stream
aws kinesis create-stream --stream-name my-stream --shard-count 2
# Verificar status
aws kinesis describe-stream --stream-name my-stream
# Listar streams
aws kinesis list-streams
import boto3
import json
import time
from datetime import datetime
# Criar cliente Kinesis
kinesis = boto3.client('kinesis', region_name='us-east-1')
def put_record(stream_name, data, partition_key):
try:
response = kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(data),
PartitionKey=partition_key
)
print(f"Record sent: {response['SequenceNumber']}")
return response
except Exception as e:
print(f"Error sending record: {e}")
# Exemplo de uso
for i in range(100):
data = {
'timestamp': datetime.now().isoformat(),
'user_id': f'user_{i}',
'event': 'page_view',
'page': f'/page_{i % 10}'
}
put_record('my-stream', data, f'user_{i}')
time.sleep(0.1)
import boto3
import json
import time
kinesis = boto3.client('kinesis', region_name='us-east-1')
def get_shard_iterator(stream_name, shard_id):
response = kinesis.get_shard_iterator(
StreamName=stream_name,
ShardId=shard_id,
ShardIteratorType='LATEST'
)
return response['ShardIterator']
def consume_records(stream_name):
# Obter informações do stream
stream_info = kinesis.describe_stream(StreamName=stream_name)
shards = stream_info['StreamDescription']['Shards']
# Processar cada shard
for shard in shards:
shard_id = shard['ShardId']
shard_iterator = get_shard_iterator(stream_name, shard_id)
while shard_iterator:
try:
response = kinesis.get_records(ShardIterator=shard_iterator)
records = response['Records']
for record in records:
data = json.loads(record['Data'])
print(f"Received: {data}")
shard_iterator = response.get('NextShardIterator')
time.sleep(1)
except Exception as e:
print(f"Error reading records: {e}")
break
# Consumir records
consume_records('my-stream')
-- Criar stream de entrada
CREATE STREAM "SOURCE_SQL_STREAM_001" (
"timestamp" VARCHAR(32),
"user_id" VARCHAR(32),
"event" VARCHAR(32),
"page" VARCHAR(64)
);
-- Análise em tempo real - contagem de eventos por página
CREATE STREAM "DESTINATION_SQL_STREAM" AS
SELECT
"page",
COUNT(*) as "event_count",
ROWTIME_TO_TIMESTAMP(ROWTIME) as "window_time"
FROM "SOURCE_SQL_STREAM_001"
GROUP BY "page",
ROWTIME RANGE INTERVAL '1' MINUTE;
-- Detecção de anomalias - usuários com muitos eventos
CREATE STREAM "ANOMALY_STREAM" AS
SELECT
"user_id",
COUNT(*) as "event_count",
ROWTIME_TO_TIMESTAMP(ROWTIME) as "window_time"
FROM "SOURCE_SQL_STREAM_001"
GROUP BY "user_id",
ROWTIME RANGE INTERVAL '5' MINUTE
HAVING COUNT(*) > 100;
# Kinesis Stream
resource "aws_kinesis_stream" "example" {
name = "my-stream"
shard_count = 2
retention_period = 48
shard_level_metrics = [
"IncomingRecords",
"OutgoingRecords",
]
tags = {
Environment = "production"
Application = "streaming-app"
}
}
# Kinesis Firehose
resource "aws_kinesis_firehose_delivery_stream" "example" {
name = "my-firehose"
destination = "s3"
s3_configuration {
role_arn = aws_iam_role.firehose_role.arn
bucket_arn = aws_s3_bucket.bucket.arn
prefix = "data/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/"
buffer_size = 5
buffer_interval = 300
compression_format = "GZIP"
}
}
Aspecto | Amazon Kinesis | Apache Kafka | Google Pub/Sub |
---|---|---|---|
Gerenciamento | Totalmente gerenciado | Self-managed | Totalmente gerenciado |
Escalabilidade | Manual (shards) | Manual | Automática |
Throughput | 1 MB/s por shard | Muito alto | Muito alto |
Latência | ~200ms | ~10ms | ~100ms |
Custo | Pay-per-shard | Infraestrutura | Pay-per-use |
Vendor Lock-in | Alto (AWS) | Baixo | Alto (GCP) |