Kafka和Spark Streams:从此幸福快乐!!

大家好,今天我们将对使用Spark Streaming在Kafka主题之间转换和传输数据有一点了解。

流处理的需求每天都在增加。 原因是经常处理大量数据是不够的。 我们需要实时处理数据,尤其是当我们需要处理不断增长的数据量,还需要处理和维护数据时。

最近,在我们的一个项目中,我们面临着这样的要求。 我自己是Apache Spark的新手,对执行操作只有一点想法。 因此,我认为最好的选择是Apache Spark文档。 它确实帮助我理解了Spark的基本概念,有关流以及如何使用流传输数据。

首先,Spark Streaming是核心Spark API的扩展,可实现实时数据流的可扩展,高吞吐量,容错流处理。
它提供了称为离散流DStream的高级抽象,它表示连续的数据流。
可以根据来自Kafka,Flume和Kinesis等来源的输入数据流来创建DStream,也可以对其他DStream应用高级操作。 在内部,DStream表示为RDD(弹性分布式数据集)序列。

尽管在Spark 2.0中引入了结构化流,但是仍然有许多大公司使用D流( 离散流 )或我们所谓的原始Spark流。

我们对此也有些困惑,但是经过大量讨论,我们决定只选择D-Stream,因为它们似乎是更合理的选择。

因此,我们的下一步是将Kafka与Spark Streaming集成在一起。

这是我们挑战的起点。 在文档的帮助下,我开始了编码之旅。

  • 我们需要的第一件事是一个SparkConf对象 ,该对象从系统属性和类路径加载默认值。
  • val conf = new SparkConf()。setMaster(“ local [*]”)
    .setAppName(“ SimpleDStreamExample”)
  • 注意: local [*]使它可以访问所有可用的内核。 可以根据要求进行更改。
  • 下一个重要的事情是KafkaParams ,它不过是Map [String,Object]
  • val kafkaParams = Map [String,Object](
    “ bootstrap.servers”->“ localhost:9092”,
    “ key.deserializer”-> classOf [StringDeserializer],
    //就像我在输入kafka主题中使用字符串序列化程序一样
    “ value.deserializer”-> classOf [StringDeserializer],
    “ group.id”->“ spark-demo”,
    “ kafka.consumer.id”->“ kafka-consumer-01”
  • 输入流需要这些信息才能知道输入数据的下落以及如何获取数据。
  • 接下来,我们需要一个StreamingContext,它将把SparkConf作为参数之一:
  • val ssc = new StreamingContext(conf,Seconds(1))
  • 一旦完成了StreamingContext,我们现在需要一个输入流,该输入流实际上将流传输数据(需要KafkaParams)
  • val inputStream = KafkaUtils.createDirectStream(ssc,
    PreferConsistent,Subscribe [String,String](Array(inputTopic),kafkaParams))
  • 现在,此流包含使用者记录,即来自输入主题的记录。
  • 在这里,我不是在维护状态或检查点,因为这是一个非常不同的主题,在这里没有参考,您也可以单独集成它。
  • 一旦获得了inputStream,我们就可以对该流执行所需的操作(如flatMap,map,filter等操作),以获取经过处理的流,即该流现在包含已修改/转换的数据。
  • val createdStream = inputStream.map(record => record.value)//此处可以执行任何操作。
  • //检查批次中的数据
    processingStream.print()
  • 完成这些操作后,您将获得一个处理流,我们现在需要将其存储在输出Kafka主题中。 现在,这是我不确定的地方。 我浏览了很多博客,但找不到相同的解决方案。 但是最后,经过大量挖掘,我找到了一个简单的解决方案。
  • rdd.foreach {
    案例数据:字符串=> {
    val message = new ProducerRecord [String,String](outputTopic,data)
    producer.send(消息).get()。toString
    }
    })
  • //生产者是新的生产者,outputTopic是Kafka主题,我们需要在其中存储处理后的数据
  • 最后,在上述情况下,将使用生产者将已处理流中的消息添加到输出Kafka主题。

一块蛋糕。 对?

我们还添加了我们的代码以供参考,您可以在这里找到:StreamingKafkaDataWithSpark

要了解有关Spark Streaming及其与Kafka集成的更多信息,请参考Apache Kafka集成的Apache Spark Streaming Guide。

在此博客中,我们试图了解如何使用Spark Streaming在Kafka主题之间流式传输数据。
我希望您喜欢阅读本文。 敬请期待更多。 🙂