Sistema distribuído para computação em tempo real
Apache Storm é um sistema de computação distribuída, livre e open source para processamento de streams de dados em tempo real. Storm torna fácil processar streams unbounded de dados de forma confiável, processando streams tão facilmente quanto o Hadoop processa batches. Storm tem muitos casos de uso: analytics em tempo real, online machine learning, computação contínua, ETL distribuído, e mais.
Storm foi criado originalmente por Nathan Marz na BackType em 2010. Foi open-sourced em 2011 e posteriormente adquirido pelo Twitter. Em 2013, Storm foi doado para a Apache Software Foundation e se tornou um projeto Apache top-level em 2014. Storm foi um dos primeiros frameworks dedicados ao processamento de streams em tempo real.
Storm utiliza uma arquitetura master-worker com Nimbus (master) e Supervisor nodes (workers). As aplicações são organizadas como topologies - grafos direcionados de spouts (fontes de dados) e bolts (unidades de processamento). Storm garante que cada tuple seja processado pelo menos uma vez e fornece primitivas para exactly-once processing.
Processamento de streams em tempo real com baixa latência.
Recuperação automática de falhas com reprocessamento de dados.
Escala facilmente adicionando mais nós ao cluster.
Suporte a múltiplas linguagens de programação.
Versão Atual | 2.6.0 |
Linguagens | Java, Clojure, Python, Ruby, JavaScript |
Deployment | Standalone, YARN |
Messaging | ZeroMQ, Netty |
Coordination | Apache ZooKeeper |
Processamento de métricas e KPIs em tempo real para dashboards e alertas.
Transformação e carregamento de dados em tempo real entre sistemas.
Detecção de anomalias e geração de alertas baseados em thresholds.
Aplicação de modelos ML em streams de dados para predições em tempo real.
# Download e instalação
wget https://archive.apache.org/dist/storm/apache-storm-2.6.0/apache-storm-2.6.0.tar.gz
tar -xzf apache-storm-2.6.0.tar.gz
cd apache-storm-2.6.0
# Configurar storm.yaml
# storm.zookeeper.servers:
# - "localhost"
# nimbus.seeds: ["localhost"]
# Iniciar ZooKeeper
bin/storm dev-zookeeper &
# Iniciar Nimbus
bin/storm nimbus &
# Iniciar Supervisor
bin/storm supervisor &
# Iniciar UI
bin/storm ui &
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
public class WordCountTopology {
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
// Adicionar spout
builder.setSpout("sentence-spout", new SentenceSpout(), 1);
// Adicionar bolt para split
builder.setBolt("split-bolt", new SplitSentenceBolt(), 2)
.shuffleGrouping("sentence-spout");
// Adicionar bolt para count
builder.setBolt("count-bolt", new WordCountBolt(), 2)
.fieldsGrouping("split-bolt", new Fields("word"));
Config config = new Config();
config.setDebug(true);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", config, builder.createTopology());
// Executar por 10 segundos
Thread.sleep(10000);
cluster.shutdown();
}
}
// Spout que emite sentenças
class SentenceSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private String[] sentences = {
"the cow jumped over the moon",
"an apple a day keeps the doctor away",
"four score and seven years ago"
};
private int index = 0;
@Override
public void nextTuple() {
collector.emit(new Values(sentences[index]));
index = (index + 1) % sentences.length;
Thread.sleep(1000);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence"));
}
}
// Bolt que divide sentenças em palavras
class SplitSentenceBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String sentence = tuple.getString(0);
String[] words = sentence.split(" ");
for (String word : words) {
collector.emit(new Values(word));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
import storm
class SplitSentenceBolt(storm.BasicBolt):
def process(self, tup):
words = tup.values[0].split(" ")
for word in words:
storm.emit([word])
class WordCountBolt(storm.BasicBolt):
def initialize(self, conf, context):
self.counts = {}
def process(self, tup):
word = tup.values[0]
self.counts[word] = self.counts.get(word, 0) + 1
storm.emit([word, self.counts[word]])
# Executar bolt
if __name__ == '__main__':
if len(sys.argv) == 2:
if sys.argv[1] == "split":
SplitSentenceBolt().run()
elif sys.argv[1] == "count":
WordCountBolt().run()