Spark结构化流和Kafka集成的基本示例

Kafka 0.10的Spark Streaming集成在设计上类似于0.8 Direct Stream方法。 它提供简单的并行性,Kafka分区和Spark分区之间的1:1对应关系以及对偏移量和元数据的访问。 但是,由于较新的集成使用了新的Kafka使用者API而不是简单的API,因此用法上存在显着差异。 集成的此版本标记为实验性的,因此API可能会发生更改。

在此博客中我将实现Spark结构化流和Kafka集成的基本示例。

在这里,我正在使用

  • Apache Spark 2.2.0
  • Apache Kafka 0.11.0.1
  • 斯卡拉2.11.8

创建built.sbt

让我们创建一个sbt项目,并在build.sbt中添加以下依赖项。

  libraryDependencies ++ = Seq(“ org.apache.spark”%“ spark-sql_2.11”%“ 2.2.0”, 
“ org.apache.spark”%“ spark-sql-kafka-0-10_2.11”%“ 2.2.0”,
“ org.apache.kafka”%“ kafka-客户端”%“ 0.11.0.1”)

创建SparkSession

现在,我们必须导入必要的类并创建一个本地SparkSession,这是Spark中所有功能的起点。

  val spark = SparkSession 
.builder
.appName(“ Spark-Kafka-Integration”)
.master(“本地”)
.getOrCreate()

定义架构

我们必须为将要从csv中读取的数据定义模式。

  val mySchema = StructType(Array( 
StructField(“ id”,IntegerType),
StructField(“ name”,StringType),
StructField(“ year”,IntegerType),
StructField(“ rating”,DoubleType),
StructField(“ duration”,IntegerType)
))

我的csv文件示例在此处,数据集说明在此处给出

创建流数据框架

现在,我们必须创建一个流数据框架,其架构在名为“ mySchema”的变量中定义。 如果将任何csv文件拖放到dir中,该文件将自动更改为流数据帧。

  val StreamingDataFrame = spark.readStream.schema(mySchema).csv(“目录的路径,例如home / Desktop / dir /”) 

将流发布到Kafka

  StreamingDataFrame.selectExpr(“ CAST(id AS STRING)AS key”,“ to_json(struct(*))AS value”)。 
writeStream
.format(“ kafka”)
.option(“ topic”,“ topicName”)
.option(“ kafka.bootstrap.servers”,“ localhost:9092”)
.option(“ checkpointLocation”,“本地目录的路径”)
。开始()

为Kafka创建一个名为“ topicName”的主题,并将带有该主题的数据帧发送到Kafka。 这里的9092是运行Kafka的本地系统的端口号。 我们使用checkpointLocation创建有关流的偏移量。

从Kafka订阅流

 导入spark.implicits._ 
  val df =火花 
.readStream
.format(“ kafka”)
.option(“ kafka.bootstrap.servers”,“ localhost:9092”)
.option(“ subscribe”,“ topicName”)
。加载()

至此,我们只订阅了来自kafka的流,并使用了与上面相同的主题名称。

根据我的架构以及时间戳转换流

  val df1 = df.selectExpr(“ CAST(VALUE AS STRING)”,“ CAST(timestamp AS TIMESTAMP)”)。as [(String,Timestamp)] 
.select(from_json($“ value”,mySchema).as(“ data”),$“ timestamp”)
.select(“ data。*”,“ timestamp”)

在这里,我们将来自kafka的流中的数据从kafka转换为Json,而从Json中,我们仅根据“ mySchema”中需要描述的架构来创建数据框。 我们还将时间戳列应用于此。

在控制台上打印数据框

在这里,我们只是将数据打印到控制台。

  df1.writeStream 
.format(“控制台”)
.option(“ truncate”,“ false”)
。开始()
.awaitTermination()

有关更多详细信息,您可以参考此内容。