在这里,我将解释如何配置Spark Streaming以从Kafka接收数据。 有两种解决方法:一种是使用Receivers和Kafka的高级API的旧方法,另一种是不使用Receiver的新方法(Spark 1.3中引入)。 它们具有不同的编程模型,性能特征和语义保证。 从当前版本的Spark开始,这两种方法都被认为是稳定的API。
Spark 1.3中引入了这种新的无接收器“直接”方法,以确保更强的端到端保证。 该方法不是使用接收器来接收数据,而是定期向Kafka查询每个主题+分区中的最新偏移量,并相应地定义要在每个批次中处理的偏移量范围。 启动用于处理数据的作业时,Kafka的简单使用者API用于读取Kafka定义的偏移范围(类似于从文件系统读取文件)。
- 混音器March Madness Streamer聚焦:BigHungry2x
- 流水般的战争:电影业会成为下一个伤亡者吗?
- 他们如何在1993年惨败
- 5个不再提供的著名Kodi插件
- 通过“动手实践”方法学习Java 8流超快速
与基于接收器的方法相比,此方法具有以下优点。
- 简化的并行性 :无需创建多个输入Kafka流并将它们合并。 使用
directStream
,Spark Streaming将创建与要使用的Kafka分区一样多的RDD分区,所有这些分区都将从Kafka并行读取数据。 因此,Kafka和RDD分区之间存在一对一的映射,这更易于理解和调整。 - 效率 :为了实现基于接收器的方法的零数据丢失,需要将数据存储在预写日志中,从而进一步复制数据。 这实际上是低效的,因为数据被有效地复制了两次-一次是通过Kafka复制,另一次是通过“预写日志”复制。
- 一次语义 :基于接收者的方法使用Kafka的高级API将消耗的偏移量存储在Zookeeper中。 传统上,这是从Kafka消费数据的方式。 尽管这种方法(与预写日志结合使用)可以确保零数据丢失(即至少一次语义),但在某些故障下,某些记录可能会被消耗两次,这是很小的机会。 发生这种情况是由于Spark Streaming可靠接收的数据与Zookeeper跟踪的偏移量之间存在不一致。
因此,在这种直接方法中,我们使用不使用Zookeeper的简单Kafka API。 Spark Streaming在其检查点内跟踪偏移。 这样可以消除Spark Streaming与Zookeeper / Kafka之间的不一致,因此即使出现故障,Spark Streaming也会有效地一次接收每条记录。 为了实现结果输出的一次语义,将数据保存到外部数据存储的输出操作必须是幂等的,或者是保存结果和偏移量的原子事务。
请注意,此方法的一个缺点是它不会更新Zookeeper中的偏移量,因此基于Zookeeper的Kafka监视工具将不会显示进度。 但是,您可以在每个批次中访问通过此方法处理的偏移量,并自己更新Zookeeper。
从kafka主题中读取数据并在控制台中本地打印
SBT使用与Maven相同的目录结构,并且出于简单的需要,我们可以使用Shell脚本生成兼容的结构。
#!/ bin / sh
mkdir -p src / main / java
mkdir -p src / main / resources
mkdir -p src / main / scala
mkdir -p src / test / java
mkdir -p src / test / resources
mkdir -p src / test / scala
mkdir lib项目目标
#创建一个初始build.sbt文件
echo'name:=“ MyProject”
版本:=“ 1.0”
val sparkVersion =“ 1.6.1”
scalaVersion:=“ 2.11.8”'> build.sbt
现在将这些依赖项添加到您的sbt中。
libraryDependencies ++ = Seq(
“ org.apache.spark” %%“ spark-streaming”%sparkVersion,
“ org.apache.spark” %%“ spark-streaming-kafka”%sparkVersion
)
libraryDependencies + =“ org.apache.kafka” %%“ kafka”%“ 2.0.0”
使用tree
命令,并从当前目录运行它。
。
|-build.sbt
|-lib
|-项目
|-src
| |-主要
| | |-Java
| | |-资源
| | |-斯卡拉
| |-测试
| |-Java
| |-资源
| |-斯卡拉
|-目标
我得到了与SBT相同的结构。
我在 src / main / scala目录中创建了一个名为streaming.scala的文件。
导入org.apache.spark.SparkConf
导入org.apache._
导入org.apache.spark.SparkContext
导入org.apache.spark.SparkContext._
导入org.apache.spark.streaming.StreamingContext
导入org.apache.spark.streaming.Seconds
导入org.apache.spark.streaming.kafka.KafkaUtils
导入org.apache.spark.streaming.kafka._
导入org.apache.spark.streaming.dstream.DStream
import _root_.kafka.serializer.StringDecoder //此定义很重要,因为sbt有时无法找到包,有时会进行对象标流{
def main(args:Array [String]):Unit = {
val conf = new SparkConf()。setMaster(“ spark:// localhost:7077”).setAppName(“ KafkaReceiver”).set(“ spark.rdd.compress”,“ true”).set(“ spark.streaming。不持久”,“真”)
val ssc = new StreamingContext(conf,Seconds(10))
val topicSet =“ stream” .split(“,”)。toSet //主题名称为“ stream”
val kafkaParams = Map [String,String](“ metadata.broker.list”->“ localhost:9092”)
val directKafkaStream = KafkaUtils.createDirectStream [String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicSet)
directKafkaStream.print()
ssc.start
ssc.awaitTermination
}
}
您还可以将messageHandler
传递给createDirectStream
以访问MessageAndMetadata
,该消息包含有关当前消息的元数据,并将其转换为任何所需的类型。
在Kafka参数中,您必须指定metadata.broker.list
或bootstrap.servers
。 默认情况下,它将从每个Kafka分区的最新偏移量开始消耗。 如果您将Kafka参数中的配置auto.offset.reset
设置为smallest
,那么它将从最小的偏移开始消耗。
您还可以使用KafkaUtils.createDirectStream
其他变体开始从任意偏移量开始使用。
与任何Spark应用程序一样, spark-submit
用于启动您的应用程序。 但是,Scala / Java应用程序和Python应用程序的详细信息略有不同。
对于Scala和Java应用程序,如果使用SBT或Maven进行项目管理,则将spark-streaming-kafka-0-8_2.11
及其依赖spark-streaming-kafka-0-8_2.11
到应用程序JAR中。 确保spark-core_2.11
和spark-streaming_2.11
被标记为provided
依赖项,因为它们已经存在于Spark安装中。 然后使用spark-submit
启动您的应用程序
提交Spark作业之前,首先要做的三件事是:
- 启动Zookeeper服务
- 启动Kafka服务器
- 在Kafka中创建主题
- 启动Kafka生产者以获取其中的数据
启动Zookeeper:
$ bin / zookeeper-server-start.sh config / zookeeper.properties
启动Kafka服务器:
$ bin / kafka-server-start.sh config / server.properties
创建Kafka主题:
$ bin / kafka-topics.sh –创建–zookeeper localhost:2181 –复制因子1 –分区1 –主题流
注意:这里的主题名称是流
启动Spark服务:
转到spark文件夹:
cd local-spark-1–2 / master
./sbin/start-master.sh
cd ../workers-0–1
./sbin/start-slave.sh spark:// localhost:7077

编译代码:
sbt编译
[info]从/ home / thirdeye / sparkstreaming / project加载项目定义
[info]正在从build.sbt加载项目sparkstreaming的设置…
[信息]将当前项目设置为MyProject(在构建文件中:/ home / thirdeye / sparkstreaming /)
[info]以批处理模式执行。 为了获得更好的性能,请使用sbt的shell
[成功]总时间:5秒,已完成2018年8月30日下午3:26:06sbt运行
[info]从/ home / thirdeye / sparkstreaming / project加载项目定义
[info]正在从build.sbt加载项目sparkstreaming的设置…
[信息]将当前项目设置为MyProject(在构建文件中:/ home / thirdeye / sparkstreaming /)
[info]包装/home/thirdeye/sparkstreaming/target/scala-2.11/myproject_2.11–1.0.jar…
[info]完成包装。
[info]运行scalastreamingsbt包
[info]从/ home / thirdeye / sparkstreaming / project加载项目定义
[info]正在从build.sbt加载项目sparkstreaming的设置…
[信息]将当前项目设置为MyProject(在构建文件中:/ home / thirdeye / sparkstreaming /)
[成功]总时间:1秒,已完成2018年8月30日下午3:28:34
现在我们要提交火花:
./bin/spark-submit --class scalastreaming --master spark://localhost:6066 /home/thirdeye/sparkstreaming/target/scala-2.11/myproject_2.11-1.0.jar 10