大家好,今天我们将对使用Spark Streaming在Kafka主题之间转换和传输数据有一点了解。
流处理的需求每天都在增加。 原因是经常处理大量数据是不够的。 我们需要实时处理数据,尤其是当我们需要处理不断增长的数据量,还需要处理和维护数据时。
- David Fincher的Mindhunter的无缝写作
- 本周在#Scala(2018年11月19日)
- 本月在Netflix上播出100部必看纪录片
- 广播业将如何利用Netflix / Disney裂痕
- 苹果唱片。

最近,在我们的一个项目中,我们面临着这样的要求。 我自己是Apache Spark的新手,对执行操作只有一点想法。 因此,我认为最好的选择是Apache Spark文档。 它确实帮助我理解了Spark的基本概念,有关流以及如何使用流传输数据。
首先,Spark Streaming是核心Spark API的扩展,可实现实时数据流的可扩展,高吞吐量,容错流处理。
它提供了称为离散流或DStream的高级抽象,它表示连续的数据流。
可以根据来自Kafka,Flume和Kinesis等来源的输入数据流来创建DStream,也可以对其他DStream应用高级操作。 在内部,DStream表示为RDD(弹性分布式数据集)序列。

尽管在Spark 2.0中引入了结构化流,但是仍然有许多大公司使用D流( 离散流 )或我们所谓的原始Spark流。

我们对此也有些困惑,但是经过大量讨论,我们决定只选择D-Stream,因为它们似乎是更合理的选择。
因此,我们的下一步是将Kafka与Spark Streaming集成在一起。
这是我们挑战的起点。 在文档的帮助下,我开始了编码之旅。
- 我们需要的第一件事是一个SparkConf对象 ,该对象从系统属性和类路径加载默认值。
- val conf = new SparkConf()。setMaster(“ local [*]”)
.setAppName(“ SimpleDStreamExample”) - 注意: local [*]使它可以访问所有可用的内核。 可以根据要求进行更改。
- 下一个重要的事情是KafkaParams ,它不过是Map [String,Object]
- val kafkaParams = Map [String,Object](
“ bootstrap.servers”->“ localhost:9092”,
“ key.deserializer”-> classOf [StringDeserializer],
//就像我在输入kafka主题中使用字符串序列化程序一样
“ value.deserializer”-> classOf [StringDeserializer],
“ group.id”->“ spark-demo”,
“ kafka.consumer.id”->“ kafka-consumer-01”
) - 输入流需要这些信息才能知道输入数据的下落以及如何获取数据。
- 接下来,我们需要一个StreamingContext,它将把SparkConf作为参数之一:
- val ssc = new StreamingContext(conf,Seconds(1))
- 一旦完成了StreamingContext,我们现在需要一个输入流,该输入流实际上将流传输数据(需要KafkaParams)
- val inputStream = KafkaUtils.createDirectStream(ssc,
PreferConsistent,Subscribe [String,String](Array(inputTopic),kafkaParams)) - 现在,此流包含使用者记录,即来自输入主题的记录。
- 在这里,我不是在维护状态或检查点,因为这是一个非常不同的主题,在这里没有参考,您也可以单独集成它。
- 一旦获得了inputStream,我们就可以对该流执行所需的操作(如flatMap,map,filter等操作),以获取经过处理的流,即该流现在包含已修改/转换的数据。
- val createdStream = inputStream.map(record => record.value)//此处可以执行任何操作。
- //检查批次中的数据
processingStream.print() - 完成这些操作后,您将获得一个处理流,我们现在需要将其存储在输出Kafka主题中。 现在,这是我不确定的地方。 我浏览了很多博客,但找不到相同的解决方案。 但是最后,经过大量挖掘,我找到了一个简单的解决方案。
- rdd.foreach {
案例数据:字符串=> {
val message = new ProducerRecord [String,String](outputTopic,data)
producer.send(消息).get()。toString
}
}) - //生产者是新的生产者,outputTopic是Kafka主题,我们需要在其中存储处理后的数据
- 最后,在上述情况下,将使用生产者将已处理流中的消息添加到输出Kafka主题。
一块蛋糕。 对?

我们还添加了我们的代码以供参考,您可以在这里找到:StreamingKafkaDataWithSpark
要了解有关Spark Streaming及其与Kafka集成的更多信息,请参考Apache Kafka集成的Apache Spark Streaming Guide。
在此博客中,我们试图了解如何使用Spark Streaming在Kafka主题之间流式传输数据。
我希望您喜欢阅读本文。 敬请期待更多。 🙂