使用Scala Kafka Client和Akka开始使用Kafka

介绍

在这篇文章中,我将对Kafka进行介绍,介绍它的用途以及可能的用途,然后我将解释如何使用它来分离智能电表数据提交系统的两侧。 目的是使介绍Kafka本身大体上有用,而所显示的特定方法将转换为Kafka的部分但不是全部用途。

Kafka简介适用于不具备相关知识或经验的读者(这是我开始撰写本指南之前的两个星期)。 我的目标是使其能够作为一个独立的简介来使用,因此,当您完成本文时,您将获得足够的概览,以了解所需的术语用什么,请查看更多示例,以及哪些示例可能需要了解更多信息,哪些是干净的抽象,您不需要了解更多。

我们将介绍:

什么是分布式日志

  • 什么时候可以使用

Kafka的关键概念:

  • 话题
  • —订阅
  • 按键
  • 密钥和消息(反序列化)
  • 隔断
  • 复写
  • 偏移量
  • —抵销管理
  • -提交策略及其权衡
  • 生产者
  • 消费者
  • —消费群体
  • ZooKeeper的使用
  • 日志压缩

之后,我们将看几个使用scala-kafka-client的示例-一组用于在Scala代码库中操作Kafka Java客户端驱动程序的帮助程序模块-以及一个显示如何使用它来作为一部分运行Kafka的示例。集成测试。

卡夫卡

什么是分布式日志

Kafka是一个持久的分布式日志。 日志是仅附加消息的序列,允许随机读取,并且通常具有一定的长度。

分布式日志是一种分布式日志,其元素分布在多个节点上。 可以设置分发以提供可伸缩性和/或增加的可用性和耐用性。

对于Kafka,默认情况下,该时间长度限制为7天-但这可以更改为空格限制,组合或完全没有限制。

通常,消息会从日志的另一端删除,以使其大小保持在配置的范围内(如果有的话),这通常是使用Kafka的方式,但是也可以将其配置为执行称为日志压缩的操作,我们将进行此操作以后再说。

Kafka还可以灵活地将消费者消息传递到其他用户,而不是其他消息传递系统[需要引用]。

为了尝试对Kafka进行分类,我们定义发布-订阅是指一种间接消息传递机制,该机制导致主题的所有订阅者都收到提交给该主题的任何给定消息的副本,并且还将消息队列定义为间接消息。传递机制,导致仅单个收件人接收任何给定的消息。

有了这些定义,Kafka实际上就不能仅仅描述为一个。 它可以以一种或两种方式使用,此处的相关概念是消费者群体的概念。 我们也将在后面介绍。

持久性部分的意义在于,由于消息直接写入磁盘,因此即使发生故障,即使是单节点部署也可以提供持久性。 Kafka不维护自己的任何内存缓冲区或高速缓存。 尽管如此,它仍支持高读写吞吐量。 简单的线性对数结构可实现高写入吞吐量,线性结构对旋转驱动器和固态驱动器均表现出机械同感。 尽管后者没有很大的寻道延迟,但有写放大要考虑并在可能的情况下使其最小化。 线性度,磁盘以及操作系统自身的缓冲区和高速缓存的组合可实现高读取吞吐量。

什么时候可以使用

分布式日志可用于解决许多问题,而这些问题在您试图最大化基于事件的系统中组件的去耦时可能会存在。 我不会在这里列举所有信息,但是我会注意到,许多信息与消息在时间上大量分配而不仅仅是空间上分配有关; 随着时间的分布,对耐用性的需求越来越大,对存储的需求也越来越大。 卡夫卡满足了这两个需求。

仅举一个例子,让我们考虑我说过要研究的智能电表数据提交系统的情况。

假设这样的系统中的终端设备(即智能电表)几乎没有存储容量,因此即使它们确实存储了功率样本或在一定程度上进行批量处理,它们也只能容纳少量。 因此,为了最大程度减少使用数据的丢失,提交端点需要高度可用。

在智能计量应用程序中,我们可以通过具有一个简单的提交终结点来提高可用性,该终结点可以接受来自智能仪表的JSON POST,并且在进行一些基本验证之后,只需将接收到的样本提交给Kafka主题即可。 这意味着可以更改和重新部署系统的其他部分,而无需使提交端点脱机。 如果没有任何理由实时处理提交的数据,则可以将其作为处理和长期存储的非日常日常批处理工作。

卡夫卡的关键概念

对于某些读者来说,以下内容看起来像是教学大纲,而不是参考资料,因为您需要更多细节。 其他人将已经知道所有提及的内容以及更多。 牢记简洁的目标,我会少花一面。

话题

诚然,该主题是消息传递中的一个非常普遍的术语,在Kafka中并不奇怪。 它标识一类消息。 Kafka支持主题自动创建,但是也可以将其关闭。 可以在每个主题的基础上控制各种Kafka行为。

按键

Kafka中的消息可以选择键入。 如果消息中包含密钥,则该密钥将用于确定将其写入哪个分区。 关键也是日志压缩的关键(稍后介绍)。

隔断

每个主题都有固定数量的分区。 分区是分配单位; 它们是Kafka在集群中的各个节点之间分配给定主题中的消息的方式的核心。 它们也是允许多个使用者从同一个主题阅读的方式的核心,同时还确保每个使用者按总顺序阅读消息以及使用者组的实现。

对于使用密钥提交的消息,Kafka使用密钥的哈希值确定将消息写入的主题分区(尽管可以配置自定义分区程序)。 对于无密钥消息,Kafka选择一个随机分区(尽管这样做有些粘性,如Kafka FAQ中所述)。

对于原型,您可以使用每个主题一个分区的默认值,然后使用此Confluent帖子作为指导来调整数字。

密钥和消息(反序列化)

Kafka不会对您的邮件强加任何格式/结构规则。 就Kafka而言,它们只是不透明的二进制Blob。 这意味着您可以自由选择所需的任何序列化格式。 Kafka客户端确实提供了一些内置(反)序列化器,您可以将其用作构建块,包括用于字节数组和字符串的构建器。

在本文使用的智能计量功率数据提交示例中,我们将事件序列化为JSON字符串。 他们快速入门,使示例代码保持相对简单,并且易于阅读,可最大程度地减少调试所需的工具量。

除了原型之外,在提交消息格式之前,您还需要仔细考虑。 特定于语言的序列化程序可能比JSON更紧凑和更快,但是当您需要引入使用其他语言编写的系统时该怎么办? 考虑到这种情况创建的选项包括:

  • 阿帕奇(Apache Avro)
  • 协议缓冲区
  • Cap’n Proto
  • Apache Thrift

我不会在这里直接主张任何单一选择,但我将向您介绍这个Confluent帖子,该帖子在以下各部分中为i)选择单一数据格式和ii)我们[ing] ] Avro作为您的数据格式。

经纪人

经纪人只是Kafka服务的一个实例。 在本文中,我已经将术语“节点”与它同义使用。

复写

复制是在分区级别完成的,而要维护的副本数是在主题级别设置的。 default.replication.factor为1。

缺省值为1,主题内的每个分区都存储在单个节点上。 复制因子为N时,主题中的每个分区将存储在N个节点上,这意味着N -1个节点可能会失败,并且分区(和主题)仍然可用(尽管这忽略了其余节点过载的可能性)的点击量)。

如果您不使用复制,那么您不需要了解更多。 如果这样做,您将需要阅读更多内容,以了解故障期间事情的表现。 这篇LinkedIn文章和Kafka文档的复制部分提供了两个很好的资源。

生产者

生产者是一个客户端,它打开并维护与Kafka群集的连接,然后将消息推送到主题。 指示其如何查找群集的配置参数是bootstrap.servers。

Java客户端的KafkaProducer相对简单,这反映在其相对简单的文档中。

如果您对复制有更多的了解,我应该解释一下,“保持与Kafka群集的连接”是指生产者与群集中每个节点的连接,该节点是生产者所在主题分区的领导者。将消息推送到。

消费者

他们是生产者的对立面。 消费者从主题中提取消息。 告诉它如何找到集群的配置参数再次是bootstrap.servers。

Java客户端的KafkaConsumer并不是那么简单,这在更全面的文档中得到了反映。

抵销

如果在阅读消息后立即将其从分布式日志中删除,则足以最多提供一次传递。 但这不是日志。

偏移量是用于控制消费者在给定分区中读取位置的控件。 给定主题分区的偏移量是提交给该分区以从中读取下一条消息的第一条消息中的消息数。

卡夫卡将跟踪补偿的责任推给了消费者。 这是当Kafka被描述为向其使用者公开分布式日志的本质时的部分意思,并且通过这样做,它可以提供它所具有的灵活性和性能。

偏移量跟踪和提交策略

我只是说卡夫卡让消费者负责跟踪他们的抵销额。 现在,我要告诉您,Kafka可以负责跟踪偏移量。 有点。

Java使用者的用户有以下三种选择:

>让Kafka每隔auto.commit.interval.ms毫秒自动向其提交偏移量。

>>这是最简单的选项,如果使用它,则可能永远不会与代码中的偏移量进行交互。 它的缺点是提供最弱的加工保证; 如果Kafka 您实际完成所有早于新偏移量的消息的处理之前提交了更新的偏移量,则这些结果将在崩溃时丢失,并且不会在重新启动时重新生成(因为您使用的代码不会看到消息第二次)。 如果Kafka在处理完一条或多条消息提交了更新的偏移量,那么最终将在崩溃后再次处理相同的消息。 因此,您不会获得最多一次的处理,但也不会获得最少的一次处理。

>当您的使用方代码告诉Kafka时,使Kafka提交偏移量。

>>如果开始处理消息之前提交更新的偏移量,则最多可以进行一次处理。 如果您开始处理消息提交更新的偏移量,则可以至少一次处理。 但是,由于偏移量与生成的结果分开存储,因此没有原子性,因此您无法通过这种方式进行一次精确的处理。

>让Kafka根本不提交偏移量,用户完全负责将偏移量存储在其他位置,并负责在重启后告诉客户端要去哪里。

>>如果将偏移量与消息处理结果一起自动提交到同一外部存储,则可以进行一次精确处理。

对于所有三个选项,都有一个配置参数应引起注意:auto.offset.reset。 当使用者第一次从某个主题开始阅读时,它没有任何先前的偏移可恢复,因此存在auto.offset.reset来控制此类使用者从何处开始阅读。 默认情况下,它设置为latest ,这意味着消费者只有在第一次阅读该主题后,才会看到推送到该主题的消息。 也可以将其设置为earliest以使新使用者浏览该主题中的所有消息,也可以设置为none以防止出现任何默认情况-换句话说,如果使用者在以下情况下不手动寻求初始偏移量,则会引发异常他们首先阅读一个主题。

对于前两个选项,我说过Kafka可以向自己提交偏移量,但是我没有完全解释它在哪里存储偏移量。 实际上有两个可能的位置:ZooKeeper Kafka本身。 在较早的版本中,ZooKeeper是唯一的选择,但是发现它在这项工作中表现不佳,因此在0.8.2中增加了Kafka选项。 ZooKeeper支持仍然存在,以支持升级较旧的客户端。 Kafka是较新版本中的默认设置,您无需设置任何配置。 相当整洁的是,偏移量实际上只是存储在压缩的主题中(我们将很快介绍日志压缩)。 0.8.2公告的“ 偏移管理”部分的最后一段对此进行了更多介绍。 这不是必不可少的阅读,但很有趣。

消费者群体

在我对分布式日志的描述中,我说过Kafka不能归类为提供发布-订阅XOR典型的消息队列语义(其中发布-订阅)意味着每个订阅者都会收到给定消息的副本,而“通常消息队列语义”表示每条消息仅传递给一个收件人。

消费者群体是Kafka灵活性的关键。 消费者订阅主题时,它指定要订阅的消费者组。 在订阅过程中使用其名称隐含了一个消费组。 不需要显式创建。

消费群体在发布—订阅术语中也可以被视为“逻辑订户”。 这意味着每个订阅主题的消费者组都会收到发布到该主题的每条消息的副本(在该组内每个消费者所读取的偏移量范围内)。 但是在给定的消费者组中,行为就像一个消息队列:每条传入的消息都准确地传递给该组的一个消费者。 此行为可用于在多个使用者节点之间分配处理消息的工作。

Kafka使用分区实现此使用者组功能。 根据您使用它们的方式,重要的是,您必须了解了解这对消息在消费者组中的分发方式以及用户的订购保证所产生的后果的重要性。 如有疑问,这可能值得一读; Kafka文档的消费者部分有更多内容。

ZooKeeper的使用

从撰写Kafka生产者和消费者的角度来看,关于ZooKeeper的内容并不需要多说,这是本文的写作目的和针对的观点。 您应该知道的是,Kafka确实需要ZooKeeper。 这不是一个选择。 从其他角度使用Kafka,您无疑需要了解更多,但这不是可以告诉您这些事情的文章。 TL; DR是这样的:Kafka将ZooKeeper基本上用于所有THE METADATA,主题偏移是一个值得注意的例外。

日志压缩

日志压缩是一项整洁的功能,可以与键控消息一起使用,但是它不是常用的操作模式[需要引用]。 再加上以上其他概念均不依赖它的事实,这就是为什么我最后要描述它。

当以正常操作模式(该模式称为“删除”)运行的主题达到其配置的长度限制(无论是时间和/或空间限制)时,将从日志尾部删除消息。

在日志压缩模式下(该模式称为“压缩”模式),Kafka会构建新版本的日志,而不是在达到长度限制时从日志尾部删除消息,仅逐个键保留写入旧的未压缩日志的最新消息。

如果有帮助的话,日志压缩可以(以一种相当容易的方式)被认为类似于压缩一系列Git提交。 最终状态是相同的,您只是失去了历史。

这个Kafka Wiki页面还有更多。

scala-kafka客户端

scala-kafka-client是一组三个模块,旨在帮助使用Scala和Akka中的Kafka:

  • scala-kafka-client。 Java客户端API的最小Scala包装器,为方便配置客户端和使用Scala提供了一些帮助。
  • scala-kafka-client-akka。 提供异步和非阻塞的Kafka使用者,在使用Akka开发应用程序时可以很方便。 KafkaConsumerActor具有缓冲功能以增加吞吐量,并提供一些帮助程序以简化配置。
  • scala-kafka-client-tesktkit。 通过提供可以启动进程内Kafka和ZooKeeper服务器的帮助程序,支持Kafka客户端代码的集成测试。

我不会重复自述文件和Wiki中的文档,而是为每个模块提供一个更详细的用法示例。 每个都取自前面提到的智能电表数据提交系统。 所有这三个示例都是针对0.7.0版本构建的。

scala-kafka-client的KafkaProducer

KafkaProducer是围绕Java Java KafkaProducer的(非常)轻量级的Scala包装器。 它提供的主要两个改进是:

  • 与Java KafkaProducer只是将配置作为Map参数不同,它是通过具有Conf.apply功能的Conf对象Conf.apply
 def apply[K, V](keySerializer: Serializer[K], 
valueSerializer: Serializer[V],
bootstrapServers: String = "localhost:9092",
acks: String = "all",
retries: Int = 0,
batchSize: Int = 16384,
lingerMs: Int = 1,
bufferMemory: Int = 33554432): Conf[K, V]

还有一个选项可以从Typesafe配置中创建Conf对象:

 def apply[K, V](config: Config, keySerializer: Serializer[K], valueSerializer: Serializer[V]): Conf[K, V] 

在这种情况下,配置名称和值必须与Kafka的ProducerConfig样式匹配。 下面的示例使用此选项。

  • 您将Scala Futures作为发送的结果,而不是Java Futures。

这是我们的智能计量功率数据提交系统中Scala KafkaProducer的用法:

 import cakesolutions.kafka.{KafkaProducer, KafkaProducerRecord} 
import com.typesafe.config.Config
import org.apache.kafka.common.serialization.StringSerializer

class SampleSubmitter(config: Config) {

private val producer = KafkaProducer(
KafkaProducer.Conf(
config,
keySerializer = new StringSerializer,
valueSerializer = new JsonSerializer[SubmitSampleCommand])
)

private val topic = config.getString("topic")

def submitSample(meterId: MeterId, submitSampleCommand: SubmitSampleCommand) = producer.send(
KafkaProducerRecord(topic, meterId.id.toString, submitSampleCommand)
)

def close() = producer.close()

}

Akka HTTP提交前端的每个实例都实例化这些SampleSubmitter中的一个,并将所有接受的客户端POST(未编组为SubmitSampleCommands)转发给它。

作为参考,传入的config块定义为:

 kafka { 
bootstrap.servers = "kafka:9092"
topic = "samples"
}

MeterId为:

 import java.util.UUID 
 case class MeterId(id: UUID) 
 object MeterId { 
  def generate = MeterId(UUID.randomUUID) 
 } 

SubmitSampleCommand为:

 import play.api.libs.functional.syntax._ 
import play.api.libs.json.Reads._
import play.api.libs.json.Json
 case class SubmitSampleCommand(timestamp: Long, power: Double) { 
require(timestamp > 0)
}
 object SubmitSampleCommand { 
  implicit val SubmitSampleCommandFormat = ( 
(JsPath \ "timestamp").format[Long](min(0L)) and
(JsPath \ "power").format[Double]
) (SubmitSampleCommand.apply, unlift(SubmitSampleCommand.unapply))
 } 

为了使本示例简洁明了,我们只是将消息序列化为JSON字符串。 但是对于生产代码,您将需要选择一些东西来给您提供一种处理模式更改的方法-如前面有关序列化的部分所述。

就是说,这里是JsonSerializer

 import java.util 
 import org.apache.kafka.common.serialization.{Serializer, StringSerializer} 
import play.api.libs.json.{Json, Writes}
 public class JsonSerializer[A: Writes] extends Serializer[A] { 
 private val stringSerializer = new StringSerializer 
 override def configure(configs: util.Map[String, _], isKey: Boolean) = 
stringSerializer.configure(configs, isKey)
 override def serialize(topic: String, data: A) = 
stringSerializer.serialize(topic, Json.stringify(Json.toJson(data)))
 override def close() = 
stringSerializer.close()
 } 

scala-kafka-client-akka的KafkaConsumerActor

官方Java KafkaConsumer只是提供了一个阻塞poll方法。 预期的使用模式是调用方在阻塞的客户端轮询线程中循环。 不幸的是,没有直接在actor系统中直接使用此类接口的便捷方法。 那就是KafkaConsumerActor出现的地方。

创建KafkaConsumerActor时,将为它提供一个下游actor。 然后,消费者actor从Kafka中读取消息,并将其转发给您的下游actor。

在您确认并接收并处理了已发送的消息之前,使用者actor不会再发送任何下游消息。 为了确保传递,消费者参与者可以选择在可配置的超时后重新发送未确认的消息。

为了向下游Actor提供平稳的消息流,KafkaConsumerActor维护自己的消息缓冲区,该缓冲区已从Kafka读取并准备向下游发送。 这样,在确认收到一组消息与接收下一组消息之间的延迟最小(前提是Kafka本身会使用户感到饱和)。

KafkaConsumerActor支持我们前面确定的官方Java KafkaConsumer支持的所有三个偏移提交选择。

在我们的示例中,权力样本被转发给了集群化的持久参与者,并且我们使用手动偏移提交策略,由Kafka存储它们。

这是我们的智能计量功率数据提交系统中KafkaConsumerActor的用法:

 import java.util.UUID 

import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import cakesolutions.kafka.KafkaConsumer
import cakesolutions.kafka.akka.KafkaConsumerActor.{Confirm, Subscribe, Unsubscribe}
import cakesolutions.kafka.akka.{ConsumerRecords, KafkaConsumerActor}
import com.typesafe.config.Config
import org.apache.kafka.common.serialization.StringDeserializer

object SampleAcceptorActor {

def props(config: Config, meterShardRegion: ActorRef) = Props(new SampleAcceptorActor(config, meterShardRegion))

private val extractor = ConsumerRecords.extractor[String, SubmitSampleCommand]

}

class SampleAcceptorActor(config: Config, meterShardRegion: ActorRef) extends Actor with ActorLogging {

private val kafkaConsumerActor = context.actorOf(
KafkaConsumerActor.props(
consumerConf = KafkaConsumer.Conf(
config,
keyDeserializer = new StringDeserializer,
valueDeserializer = new JsonDeserializer[SubmitSampleCommand]
),
actorConf = KafkaConsumerActor.Conf(config),
self
),
"KafkaConsumer"
)

override def preStart() = {
super.preStart()
kafkaConsumerActor ! Subscribe()
}

override def postStop() = {
kafkaConsumerActor ! Unsubscribe
super.postStop()
}

override def receive = {

// extractor recovers the type parameters of ConsumerRecords, so pairs is of type Seq[(Option[String], SubmitSampleCommand)]
case extractor(consumerRecords) =>

consumerRecords.pairs.foreach {
case (None, submitSampleCommand) => log.error(s"Received unkeyed submit sample command: $submitSampleCommand")
case (Some(meterIdUuidString), submitSampleCommand) =>
meterShardRegion ! EnvelopedMessage(
MeterId(UUID.fromString(meterIdUuidString)),
MeterActor.AddSampleCommand(
MeterActor.Sample(
submitSampleCommand.timestamp,
submitSampleCommand.power
)
)
)
}

// By committing *after* processing we get at-least-once-processing, but that's OK here because we can identify duplicates by their timestamps
kafkaConsumerActor ! Confirm(consumerRecords.offsets, commit = true)

}

}

在这种情况下, config块定义为:

 kafka { 
bootstrap.servers = "kafka:9092"
topics = ["samples"]
group .id = "metering"
auto.offset.reset = "earliest"
}

这是JsonDeserializer

 import java.util 
 import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer} 
import play.api.libs.json.{Json, Reads}
 public class JsonDeserializer[A: Reads] extends Deserializer[A] { 
 private val stringDeserializer = new StringDeserializer 
 override def configure(configs: util.Map[String, _], isKey: Boolean) = 
stringDeserializer.configure(configs, isKey)
 override def deserialize(topic: String, data: Array[Byte]) = 
Json.parse(stringDeserializer.deserialize(topic, data)). as [A]
 override def close() = 
stringDeserializer.close()
 } 

关于故障处理的注意事项:当KafkaConsumerActor在与Kafka通讯失败时透明地重试(并且它具有高度可配置和可扩展的重试策略来控制它),如果KafkaConsumerActor本身的缺陷导致Actor重新启动,则新启动的实例将不记得您使用旧版本打开的订阅。 因此,您需要通过某种形式的监督来检测到此情况,然后向新启动的实例重新发送预订命令。 为了简洁起见,我在此示例中省略了此类代码,但是您应该在使用KafkaConsumerActor的任何长时间运行的生产系统中都包含此代码。

scala-kafka-client-testkit的KafkaServer

KafkaServer很简单,也非常有用。 它为您提供了可以对其进行集成测试的进程内Kafka服务器(还为Kafka启动了进程内ZooKeeper服务器)。 使用null构造函数创建一个,调用startup ,就是这样。 完成后只需调用close

这是一个使用它的示例:

 import cakesolutions.kafka.KafkaConsumer 
import cakesolutions.kafka.testkit.{KafkaServer, TestUtils}
import com.typesafe.config.ConfigFactory
import org.apache.kafka.common.serialization.StringDeserializer
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}
import play.api.libs.json.Json

import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.util.Random

class SampleSubmitterSpec extends WordSpec with Matchers with BeforeAndAfterAll {

private val kafkaServer = new KafkaServer

private val baseConfig = ConfigFactory.parseString(
s"""
|{
| bootstrap.servers = "localhost:${kafkaServer.kafkaPort}"
|}
""".stripMargin)

private val topic = "samples"

private val consumer = KafkaConsumer(KafkaConsumer.Conf(
ConfigFactory.parseString(
s"""
|{
| topics = ["$topic"]
| group.id = "testing-${TestUtils.randomString(5)}"
| auto.offset.reset = "earliest"
|}
""".stripMargin).withFallback(baseConfig),
keyDeserializer = new StringDeserializer(),
valueDeserializer = new StringDeserializer()
))

override def beforeAll() = kafkaServer.startup()

override def afterAll() = kafkaServer.close()

"A Sample Submitter" must {
"forward commands to Kafka" in {
consumer.subscribe(List(topic).asJava)

val sampleSubmitter = new SampleSubmitter(
ConfigFactory.parseString(
s"""
|{
| topic = "$topic"
|}
""".stripMargin).withFallback(baseConfig)
)

val meterId = MeterId.generate
val submitSampleCommand = SubmitSampleCommand(
timestamp = System.currentTimeMillis,
power = {
val bedrooms = Random.nextInt(4) + 1
Math.random * bedrooms * 1000
}
)
sampleSubmitter.submitSample(
meterId,
submitSampleCommand
)

val records = consumer.poll(30.seconds.toMillis)
records.count shouldBe 1
val record = records.asScala.toList.head
record.key shouldBe meterId.id.toString
record.value shouldBe Json.stringify(Json.toJson(submitSampleCommand))

sampleSubmitter.close()

consumer.close()
}
}

}

结论

希望您对Kafka有了一个很好的了解,并了解以下内容:

  • 分布式日志是其提供的功能及其所具有的属性的核心。
  • 什么是主题和分区。
  • 什么是键控消息。
  • 卡夫卡不在乎您如何序列化消息,而应该在乎。
  • 生产者,消费者,消费者群体,订购者和补偿对象是什么,不同补偿提交策略的权衡,以及何时何地可以了解更多有关分区如何影响消费者群体内工作分配的信息。
  • 什么是日志压缩。
  • 关于复制的一点(即通过分区启用复制)以及何时何地了解更多信息。

我还解释了为什么官方Java客户端不适合从Scala和actor系统中使用,以及这如何激励了scala-kafka-client的开发。 然后,我们看到了一些示例,这些示例表明,这使从Kafka主题产生和消费变得非常简单。

感谢您的阅读,并祝您愉快!

仓库

这是我们的开源项目的链接:

https://github.com/cakesolutions/scala-kafka-client

进一步阅读

  • http://kafka.apache.org/documentation.html
  • http://www.confluent.io/blog/stream-data-platform-1/、http://www.confluent.io/blog/stream-data-platform-2/
  • https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying