使用Scala和DStream集成Kafka + Spark

在这里,我将解释如何配置Spark Streaming以从Kafka接收数据。 有两种解决方法:一种是使用Receivers和Kafka的高级API的旧方法,另一种是不使用Receiver的新方法(Spark 1.3中引入)。 它们具有不同的编程模型,性能特征和语义保证。 从当前版本的Spark开始,这两种方法都被认为是稳定的API。

Spark 1.3中引入了这种新的无接收器“直接”方法,以确保更强的端到端保证。 该方法不是使用接收器来接收数据,而是定期向Kafka查询每个主题+分区中的最新偏移量,并相应地定义要在每个批次中处理的偏移量范围。 启动用于处理数据的作业时,Kafka的简单使用者API用于读取Kafka定义的偏移范围(类似于从文件系统读取文件)。

与基于接收器的方法相比,此方法具有以下优点。

  • 简化的并行性 无需创建多个输入Kafka流并将它们合并。 使用directStream ,Spark Streaming将创建与要使用的Kafka分区一样多的RDD分区,所有这些分区都将从Kafka并行读取数据。 因此,Kafka和RDD分区之间存在一对一的映射,这更易于理解和调整。
  • 效率 为了实现基于接收器的方法的零数据丢失,需要将数据存储在预写日志中,从而进一步复制数据。 这实际上是低效的,因为数据被有效地复制了两次-一次是通过Kafka复制,另一次是通过“预写日志”复制。
  • 一次语义 基于接收者的方法使用Kafka的高级API将消耗的偏移量存储在Zookeeper中。 传统上,这是从Kafka消费数据的方式。 尽管这种方法(与预写日志结合使用)可以确保零数据丢失(即至少一次语义),但在某些故障下,某些记录可能会被消耗两次,这是很小的机会。 发生这种情况是由于Spark Streaming可靠接收的数据与Zookeeper跟踪的偏移量之间存在不一致。

因此,在这种直接方法中,我们使用不使用Zookeeper的简单Kafka API。 Spark Streaming在其检查点内跟踪偏移。 这样可以消除Spark Streaming与Zookeeper / Kafka之间的不一致,因此即使出现故障,Spark Streaming也会有效地一次接收每条记录。 为了实现结果输出的一次语义,将数据保存到外部数据存储的输出操作必须是幂等的,或者是保存结果和偏移量的原子事务。

请注意,此方法的一个缺点是它不会更新Zookeeper中的偏移量,因此基于Zookeeper的Kafka监视工具将不会显示进度。 但是,您可以在每个批次中访问通过此方法处理的偏移量,并自己更新Zookeeper。

从kafka主题中读取数据并在控制台中本地打印

SBT使用与Maven相同的目录结构,并且出于简单的需要,我们可以使用Shell脚本生成兼容的结构。

  #!/ bin / sh 
mkdir -p src / main / java
mkdir -p src / main / resources
mkdir -p src / main / scala
mkdir -p src / test / java
mkdir -p src / test / resources
mkdir -p src / test / scala

mkdir lib项目目标

#创建一个初始build.sbt文件
echo'name:=“ MyProject”
版本:=“ 1.0”
val sparkVersion =“ 1.6.1”
scalaVersion:=“ 2.11.8”'> build.sbt

现在将这些依赖项添加到您的sbt中。


libraryDependencies ++ = Seq(
“ org.apache.spark” %%“ spark-streaming”%sparkVersion,
“ org.apache.spark” %%“ spark-streaming-kafka”%sparkVersion


libraryDependencies + =“ org.apache.kafka” %%“ kafka”%“ 2.0.0”

使用tree命令,并从当前目录运行它。


|-build.sbt
|-lib
|-项目
|-src
| |-主要
| | |-Java
| | |-资源
| | |-斯卡拉
| |-测试
| |-Java
| |-资源
| |-斯卡拉
|-目标

我得到了与SBT相同的结构。

src / main / scala目录中创建了一个名为streaming.scala的文件。

 导入org.apache.spark.SparkConf 
导入org.apache._
导入org.apache.spark.SparkContext
导入org.apache.spark.SparkContext._
导入org.apache.spark.streaming.StreamingContext
导入org.apache.spark.streaming.Seconds
导入org.apache.spark.streaming.kafka.KafkaUtils
导入org.apache.spark.streaming.kafka._
导入org.apache.spark.streaming.dstream.DStream
import _root_.kafka.serializer.StringDecoder //此定义很重要,因为sbt有时无法找到包,有时会进行对象标流{
def main(args:Array [String]):Unit = {
val conf = new SparkConf()。setMaster(“ spark:// localhost:7077”).setAppName(“ KafkaReceiver”).set(“ spark.rdd.compress”,“ true”).set(“ spark.streaming。不持久”,“真”)
val ssc = new StreamingContext(conf,Seconds(10))
val topicSet =“ stream” .split(“,”)。toSet //主题名称为“ stream”
val kafkaParams = Map [String,String](“ metadata.broker.list”->“ localhost:9092”)
val directKafkaStream = KafkaUtils.createDirectStream [String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicSet)
directKafkaStream.print()
ssc.start
ssc.awaitTermination
}
}

您还可以将messageHandler传递给createDirectStream以访问MessageAndMetadata ,该消息包含有关当前消息的元数据,并将其转换为任何所需的类型。

在Kafka参数中,您必须指定metadata.broker.listbootstrap.servers 。 默认情况下,它将从每个Kafka分区的最新偏移量开始消耗。 如果您将Kafka参数中的配置auto.offset.reset设置为smallest ,那么它将从最小的偏移开始消耗。

您还可以使用KafkaUtils.createDirectStream其他变体开始从任意偏移量开始使用。

与任何Spark应用程序一样, spark-submit用于启动您的应用程序。 但是,Scala / Java应用程序和Python应用程序的详细信息略有不同。

对于Scala和Java应用程序,如果使用SBT或Maven进行项目管理,则将spark-streaming-kafka-0-8_2.11及其依赖spark-streaming-kafka-0-8_2.11到应用程序JAR中。 确保spark-core_2.11spark-streaming_2.11被标记为provided依赖项,因为它们已经存在于Spark安装中。 然后使用spark-submit启动您的应用程序

提交Spark作业之前,首先要做的三件事是:

  • 启动Zookeeper服务
  • 启动Kafka服务器
  • 在Kafka中创建主题
  • 启动Kafka生产者以获取其中的数据

启动Zookeeper:

$ bin / zookeeper-server-start.sh config / zookeeper.properties

启动Kafka服务器:

$ bin / kafka-server-start.sh config / server.properties

创建Kafka主题:

$ bin / kafka-topics.sh –创建–zookeeper localhost:2181 –复制因子1 –分区1 –主题流

注意:这里的主题名称是流

启动Spark服务:

转到spark文件夹:

cd local-spark-1–2 / master
./sbin/start-master.sh

cd ../workers-0–1
./sbin/start-slave.sh spark:// localhost:7077

编译代码:

sbt编译
[info]从/ home / thirdeye / sparkstreaming / project加载项目定义
[info]正在从build.sbt加载项目sparkstreaming的设置…
[信息]将当前项目设置为MyProject(在构建文件中:/ home / thirdeye / sparkstreaming /)
[info]以批处理模式执行。 为了获得更好的性能,请使用sbt的shell
[成功]总时间:5秒,已完成2018年8月30日下午3:26:06

sbt运行
[info]从/ home / thirdeye / sparkstreaming / project加载项目定义
[info]正在从build.sbt加载项目sparkstreaming的设置…
[信息]将当前项目设置为MyProject(在构建文件中:/ home / thirdeye / sparkstreaming /)
[info]包装/home/thirdeye/sparkstreaming/target/scala-2.11/myproject_2.11–1.0.jar…
[info]完成包装。
[info]运行scalastreaming

sbt包
[info]从/ home / thirdeye / sparkstreaming / project加载项目定义
[info]正在从build.sbt加载项目sparkstreaming的设置…
[信息]将当前项目设置为MyProject(在构建文件中:/ home / thirdeye / sparkstreaming /)
[成功]总时间:1秒,已完成2018年8月30日下午3:28:34

现在我们要提交火花:

./bin/spark-submit --class scalastreaming --master spark://localhost:6066 /home/thirdeye/sparkstreaming/target/scala-2.11/myproject_2.11-1.0.jar 10