Kafka 0.10的Spark Streaming集成在设计上类似于0.8 Direct Stream方法。 它提供简单的并行性,Kafka分区和Spark分区之间的1:1对应关系以及对偏移量和元数据的访问。 但是,由于较新的集成使用了新的Kafka使用者API而不是简单的API,因此用法上存在显着差异。 集成的此版本标记为实验性的,因此API可能会发生更改。
在此博客中 ,我将实现Spark结构化流和Kafka集成的基本示例。
- 我获得了50个NBA 2K联赛冠军。 已起草我。
- 流媒体使用费无关紧要
- HANDBALL→观看者Hongrie Egypte直播Le Mercredi 16 Janvier 2019
- 专栏:Binnenkort Eredivisie bij bol.com?
- 在我们的无服务器OTT专用电视频道播放中启用视频广告
在这里,我正在使用
- Apache Spark 2.2.0
- Apache Kafka 0.11.0.1
- 斯卡拉2.11.8
创建built.sbt
让我们创建一个sbt项目,并在build.sbt中添加以下依赖项。
libraryDependencies ++ = Seq(“ org.apache.spark”%“ spark-sql_2.11”%“ 2.2.0”,
“ org.apache.spark”%“ spark-sql-kafka-0-10_2.11”%“ 2.2.0”,
“ org.apache.kafka”%“ kafka-客户端”%“ 0.11.0.1”)
创建SparkSession
现在,我们必须导入必要的类并创建一个本地SparkSession,这是Spark中所有功能的起点。
val spark = SparkSession
.builder
.appName(“ Spark-Kafka-Integration”)
.master(“本地”)
.getOrCreate()
定义架构
我们必须为将要从csv中读取的数据定义模式。
val mySchema = StructType(Array(
StructField(“ id”,IntegerType),
StructField(“ name”,StringType),
StructField(“ year”,IntegerType),
StructField(“ rating”,DoubleType),
StructField(“ duration”,IntegerType)
))
我的csv文件示例在此处,数据集说明在此处给出
创建流数据框架
现在,我们必须创建一个流数据框架,其架构在名为“ mySchema”的变量中定义。 如果将任何csv文件拖放到dir中,该文件将自动更改为流数据帧。
val StreamingDataFrame = spark.readStream.schema(mySchema).csv(“目录的路径,例如home / Desktop / dir /”)
将流发布到Kafka
StreamingDataFrame.selectExpr(“ CAST(id AS STRING)AS key”,“ to_json(struct(*))AS value”)。
writeStream
.format(“ kafka”)
.option(“ topic”,“ topicName”)
.option(“ kafka.bootstrap.servers”,“ localhost:9092”)
.option(“ checkpointLocation”,“本地目录的路径”)
。开始()
为Kafka创建一个名为“ topicName”的主题,并将带有该主题的数据帧发送到Kafka。 这里的9092是运行Kafka的本地系统的端口号。 我们使用checkpointLocation创建有关流的偏移量。
从Kafka订阅流
导入spark.implicits._
val df =火花
.readStream
.format(“ kafka”)
.option(“ kafka.bootstrap.servers”,“ localhost:9092”)
.option(“ subscribe”,“ topicName”)
。加载()
至此,我们只订阅了来自kafka的流,并使用了与上面相同的主题名称。
根据我的架构以及时间戳转换流
val df1 = df.selectExpr(“ CAST(VALUE AS STRING)”,“ CAST(timestamp AS TIMESTAMP)”)。as [(String,Timestamp)]
.select(from_json($“ value”,mySchema).as(“ data”),$“ timestamp”)
.select(“ data。*”,“ timestamp”)
在这里,我们将来自kafka的流中的数据从kafka转换为Json,而从Json中,我们仅根据“ mySchema”中需要描述的架构来创建数据框。 我们还将时间戳列应用于此。
在控制台上打印数据框
在这里,我们只是将数据打印到控制台。
df1.writeStream
.format(“控制台”)
.option(“ truncate”,“ false”)
。开始()
.awaitTermination()
有关更多详细信息,您可以参考此内容。
