A unified model for batch and stream processing
Apache Beam is a unified programming model for defining and executing data-processing pipelines for both batch and streaming workloads. Beam provides language-specific SDKs for defining pipelines, which are then executed by one of Beam's supported distributed runners, including Apache Flink, Apache Spark, and Google Cloud Dataflow.
The same code for batch and stream processing.
Runs on Spark, Flink, Dataflow, and other runners (see the sketch after this list).
SDKs for Java, Python, and Go; a Scala API is available through the Scio library.
Custom transforms and I/O connectors.
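The runner is chosen through pipeline options rather than code changes. Here is a minimal sketch of runner selection in the Python SDK; the commented-out flags show the extra options each remote runner typically needs:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# The same pipeline definition runs anywhere; only the options differ.
options = PipelineOptions([
    '--runner=DirectRunner',      # local execution, useful for development
    # '--runner=FlinkRunner',     # plus --flink_master=<host:port>
    # '--runner=SparkRunner',
    # '--runner=DataflowRunner',  # plus --project, --region, --temp_location
])
pipeline = beam.Pipeline(options=options)

The classic WordCount below, in the Java SDK, shows the core model: a Pipeline, a chain of named PTransforms, and PCollections flowing between them.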
import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;

public class WordCount {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);

    pipeline
        // Read each line of the input file as one element.
        .apply("ReadLines", TextIO.read().from("input.txt"))
        // Split lines into words on runs of non-letter characters.
        .apply("ExtractWords", FlatMapElements
            .into(TypeDescriptors.strings())
            .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
        .apply("FilterEmptyWords", Filter.by((String word) -> !word.isEmpty()))
        // Count occurrences of each distinct word.
        .apply("CountWords", Count.perElement())
        // Format each (word, count) pair as a line of text.
        .apply("FormatResults", MapElements
            .into(TypeDescriptors.strings())
            .via((KV<String, Long> wordCount) ->
                wordCount.getKey() + ": " + wordCount.getValue()))
        .apply("WriteResults", TextIO.write().to("output"));

    pipeline.run().waitUntilFinish();
  }
}
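With the Direct Runner this class runs like any Java main program; PipelineOptionsFactory.fromArgs parses command-line flags such as --runner, so switching runners needs no code change. The same word count in the Python SDK, written with Beam's pipe-operator syntax: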
import re

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run_pipeline():
    options = PipelineOptions()
    with beam.Pipeline(options=options) as pipeline:
        (pipeline
         | 'ReadText' >> beam.io.ReadFromText('input.txt')
         # Split each line into words.
         | 'ExtractWords' >> beam.FlatMap(lambda line: re.findall(r"[A-Za-z']+", line))
         # Classic map/reduce word count: pair each word with 1, then sum per key.
         | 'PairWithOne' >> beam.Map(lambda word: (word, 1))
         | 'GroupAndSum' >> beam.CombinePerKey(sum)
         | 'FormatResult' >> beam.Map(lambda word_count: f'{word_count[0]}: {word_count[1]}')
         | 'WriteResult' >> beam.io.WriteToText('output'))


if __name__ == '__main__':
    run_pipeline()
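The FlatMap lambda above covers simple cases; for per-element logic that needs setup, multiple outputs, or access to windows and timestamps, Beam's unit of custom processing is a DoFn applied with ParDo. A minimal sketch, with a hypothetical class name and the same splitting rule as above:

import re

import apache_beam as beam


class ExtractWordsFn(beam.DoFn):
    # Illustrative DoFn equivalent of the FlatMap lambda above.
    def process(self, line):
        for word in re.findall(r"[A-Za-z']+", line):
            yield word

# Usage: lines | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())

Both examples so far read bounded files. The streaming variant below reads an unbounded Kafka source, so elements must be assigned to windows before any per-key grouping: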
import json

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka, WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window


class FormatWindowedCount(beam.DoFn):
    """Emits one result per user per window, tagged with the window bound."""
    def process(self, group, win=beam.DoFn.WindowParam):
        user_id, events = group
        yield {
            'user_id': user_id,
            'event_count': sum(1 for _ in events),
            'window_end': win.end.to_utc_datetime().isoformat(),
        }


def run_streaming_pipeline():
    options = PipelineOptions([
        '--streaming',
        '--runner=FlinkRunner',
        '--flink_master=localhost:8081'
    ])
    with beam.Pipeline(options=options) as pipeline:
        (pipeline
         | 'ReadFromKafka' >> ReadFromKafka(
             consumer_config={'bootstrap.servers': 'localhost:9092'},
             topics=['input-topic'])
         # Kafka records arrive as (key, value) pairs of bytes.
         | 'ExtractValue' >> beam.Map(lambda record: record[1].decode('utf-8'))
         | 'ParseJSON' >> beam.Map(json.loads)
         # Assign each event to a fixed 60-second window; the grouping
         # below then happens per window, not over the unbounded stream.
         | 'WindowInto' >> beam.WindowInto(window.FixedWindows(60))
         | 'GroupByUser' >> beam.GroupBy(lambda event: event['user_id'])
         | 'CountEvents' >> beam.ParDo(FormatWindowedCount())
         # WriteToKafka expects (key, value) pairs of bytes.
         | 'Serialize' >> beam.Map(lambda result: (
             str(result['user_id']).encode('utf-8'),
             json.dumps(result).encode('utf-8')))
         | 'WriteToKafka' >> WriteToKafka(
             producer_config={'bootstrap.servers': 'localhost:9092'},
             topic='output-topic'))


if __name__ == '__main__':
    run_streaming_pipeline()
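Note that ReadFromKafka and WriteToKafka in the Python SDK are cross-language transforms backed by Beam's Java Kafka connector, so a Java runtime must be available for the expansion service when the pipeline is submitted. The fixed 60-second windows are what make grouping an unbounded stream well defined: each user's event count is emitted once per window.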