Plataforma de operações de dados para construção de pipelines de dados contínuos e confiáveis
StreamSets é uma plataforma empresarial de operações de dados (DataOps) que permite às organizações construir, executar e gerenciar pipelines de dados contínuos e confiáveis. A plataforma oferece uma abordagem visual para design de pipelines, monitoramento em tempo real e detecção automática de mudanças nos dados (data drift).
Fundada por ex-engenheiros do Cloudera, a StreamSets foi projetada para resolver os desafios de engenharia de dados moderna, oferecendo uma plataforma que combina facilidade de uso com recursos empresariais robustos para ambientes de produção críticos.
Fundada em 2014 por Girish Pancha e Arvind Prabhakar, ex-executivos da Cloudera, a StreamSets surgiu da necessidade de uma plataforma mais moderna para operações de dados. A empresa rapidamente ganhou tração no mercado, levantando mais de $50 milhões em financiamento e sendo adotada por empresas Fortune 500.
A arquitetura da StreamSets é baseada em componentes principais:
Interface drag-and-drop para criação de pipelines complexos
Detecção automática de mudanças na estrutura dos dados
Monitoramento contínuo com métricas e alertas
Deployment em AWS, Azure, GCP e ambientes híbridos
Segurança robusta com RBAC e auditoria completa
Escalabilidade automática baseada na carga de trabalho
Versão Atual | 5.8.0 |
Linguagem Principal | Java |
Conectores | 100+ sistemas suportados |
Deployment | On-premises, Cloud, Edge |
Processamento | Batch e Streaming |
Suporte | 24/7 Enterprise Support |
Integração de dados entre sistemas legados e modernos com transformações complexas.
Pipelines para análise em tempo real com detecção de anomalias e alertas.
Migração de dados para cloud com pipelines híbridos e sincronização contínua.
Captura de mudanças em tempo real de bancos de dados transacionais.
Construção de data hubs centralizados com governança e qualidade de dados.
# Download StreamSets Data Collector
wget https://archives.streamsets.com/datacollector/5.8.0/tarball/streamsets-datacollector-core-5.8.0.tgz
# Extrair e configurar
tar -xzf streamsets-datacollector-core-5.8.0.tgz
cd streamsets-datacollector-5.8.0
# Configurar Java
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk
# Iniciar Data Collector
bin/streamsets dc
# Acessar interface web
# http://localhost:18630
# HTTP configuration
http.port=18630
http.authentication=form
http.realm.file.permission.check=true
# HTTPS configuration (opcional)
https.port=18631
https.keystore.path=keystore.jks
https.keystore.password=password
# Pipeline configuration
pipeline.max.runners.count=50
runner.thread.pool.size=50
# Stage libraries
stage.libs.dir=${SDC_DIST}/streamsets-libs
# Data directories
sdc.data.dir=${SDC_DATA}/data
sdc.log.dir=${SDC_DATA}/log
sdc.resources.dir=${SDC_DATA}/resources
# Security
kerberos.client.enabled=false
ldap.authentication.enabled=false
# Monitoring
observer.queue.size=100
observer.sampled.records.cache.size=10000
{
"pipelineId": "kafka-to-hdfs-pipeline",
"title": "Kafka to HDFS Pipeline",
"description": "Stream data from Kafka to HDFS with transformations",
"stages": [
{
"instanceName": "KafkaConsumer_01",
"library": "streamsets-datacollector-apache-kafka_2_6-lib",
"stageName": "com_streamsets_pipeline_stage_origin_kafka_KafkaDSource",
"configuration": [
{
"name": "conf.brokerURI",
"value": "localhost:9092"
},
{
"name": "conf.topic",
"value": "user-events"
},
{
"name": "conf.consumerGroup",
"value": "streamsets-consumer"
}
]
},
{
"instanceName": "JSONParser_01",
"library": "streamsets-datacollector-basic-lib",
"stageName": "com_streamsets_pipeline_stage_processor_jsonparser_JsonParserDProcessor",
"configuration": [
{
"name": "fieldPathToParse",
"value": "/text"
},
{
"name": "parsedFieldPath",
"value": "/parsed"
}
]
},
{
"instanceName": "HadoopFS_01",
"library": "streamsets-datacollector-hdp_3_1-lib",
"stageName": "com_streamsets_pipeline_stage_destination_hdfs_HdfsDTarget",
"configuration": [
{
"name": "hdfsUri",
"value": "hdfs://localhost:9000"
},
{
"name": "hdfsUser",
"value": "hdfs"
},
{
"name": "dirPathTemplate",
"value": "/data/events/${YYYY()}-${MM()}-${DD()}"
},
{
"name": "fileNamePrefix",
"value": "events"
}
]
}
]
}
@StageDef(
version = 1,
label = "Custom Data Enricher",
description = "Enriches records with external data",
icon = "enricher.png",
onlineHelpRefUrl = ""
)
@ConfigGroups(Groups.class)
@GenerateResourceBundle
public class CustomEnricherProcessor extends SingleLaneProcessor {
@ConfigDef(
required = true,
type = ConfigDef.Type.STRING,
defaultValue = "http://api.example.com",
label = "API Endpoint",
description = "External API endpoint for enrichment",
displayPosition = 10,
group = "ENRICHMENT"
)
public String apiEndpoint;
@ConfigDef(
required = true,
type = ConfigDef.Type.STRING,
defaultValue = "/userId",
label = "Key Field",
description = "Field to use as lookup key",
displayPosition = 20,
group = "ENRICHMENT"
)
public String keyField;
private WebTarget webTarget;
@Override
protected List init() {
List issues = super.init();
try {
Client client = ClientBuilder.newClient();
webTarget = client.target(apiEndpoint);
} catch (Exception e) {
issues.add(getContext().createConfigIssue(
Groups.ENRICHMENT.name(),
"apiEndpoint",
Errors.CUSTOM_01,
e.getMessage()
));
}
return issues;
}
@Override
protected void process(Record record, SingleLaneBatchMaker batchMaker)
throws StageException {
try {
// Extract key value
String keyValue = record.get(keyField).getValueAsString();
// Call external API
Response response = webTarget
.path("lookup")
.queryParam("key", keyValue)
.request(MediaType.APPLICATION_JSON)
.get();
if (response.getStatus() == 200) {
String enrichmentData = response.readEntity(String.class);
// Add enrichment data to record
record.set("/enrichment", Field.create(enrichmentData));
batchMaker.addRecord(record);
} else {
// Handle error
getContext().reportError(Errors.CUSTOM_02, keyValue);
}
} catch (Exception e) {
throw new StageException(Errors.CUSTOM_03, e.getMessage(), e);
}
}
}
# Dockerfile
FROM streamsets/datacollector:5.8.0
# Install additional stage libraries
RUN "${SDC_DIST}/bin/streamsets" stagelibs -install=streamsets-datacollector-apache-kafka_2_6-lib
RUN "${SDC_DIST}/bin/streamsets" stagelibs -install=streamsets-datacollector-hdp_3_1-lib
# Copy custom configurations
COPY sdc.properties "${SDC_CONF}/sdc.properties"
COPY pipeline.json "${SDC_DATA}/pipelines/"
# Expose ports
EXPOSE 18630
# Docker Compose
version: '3.8'
services:
streamsets:
build: .
ports:
- "18630:18630"
environment:
- SDC_JAVA_OPTS=-Xmx2g -Xms2g
volumes:
- streamsets_data:/data
- streamsets_logs:/logs
networks:
- data_network
kafka:
image: confluentinc/cp-kafka:latest
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
networks:
- data_network
volumes:
streamsets_data:
streamsets_logs:
networks:
data_network:
driver: bridge
Aspecto | StreamSets | Apache NiFi | Talend |
---|---|---|---|
Licença | Comercial | Apache 2.0 | Comercial/Open |
Interface | Visual moderna | Visual tradicional | Visual/Código |
Data Drift Detection | Automática | Manual | Limitada |
Suporte Empresarial | 24/7 incluído | Terceiros | 24/7 incluído |
Multi-cloud | Nativo | Limitado | Bom |
Facilidade de Uso | Alta | Moderada | Moderada |