几天前,我遇到了一种情况,我想对流数据进行有状态操作。 因此,我开始寻找可能的解决方案。 我遇到了许多使用不同技术的解决方案,例如Spark结构化流,Apache Flink,Kafka Streams等。
所有解决方案都解决了我的问题,但我选择Kafka Streams是因为它满足了我的大多数要求。 之后,我开始阅读其文档并尝试运行其示例。 但是,一旦我开始学习它,我遇到了一个主要障碍,那就是“ Kafka Streams不提供Scala API! ”。 令我震惊的是。
- Guarda – Gomorroide电影在线完整高清版2017
- 苹果唱片。
- 像雷泽少校这样的表演正在使用Wavo来扩大他们的Spotify基地
- 从整体中流式传输数据:构建高度可靠的CDC堆栈
- Spotify,当心! JioSaavn在这里!
我之所以期望Kafka Streams具有Scala API,是因为我正在使用Scala构建我的应用程序,如果Kafka Streams提供了它的API,那么对我来说将其包含在我的应用程序中将很容易。 但是事实并非如此。 当我搜索它的Scala示例时,最上方的地方是我只能找到其中的几个。
因此,我决定自己学习,第一步是建立一个“ Hello World! 使用Kafka Streams和Scala的程序,如下所示:
在运行此示例之前,我们需要启动Kafka服务器。 为此,您可以阅读他们的快速入门指南 。 之后,向Kafka主题-“ SourceTopic ”发送一些消息,并为Kafka主题-“ SinkTopic ”启动Kafka使用者。
[代码语言=“重击”]
$ bin / kafka-console-producer.sh —代理列表本地主机:9092 —主题SourceTopic
你好,世界!
[/码]
现在,运行示例,您将看到Kafka消费者主题“ SinkTopic”将收到消息。
[代码语言=“重击”]
$ bin / kafka-console-consumer.sh —引导服务器本地主机:9092 — SinkTopic主题
你好,世界!
[/码]
这意味着现在我们可以通过Kafka Streams将消息从一个Kafka主题发送到另一个主题。
因此,这是我使用Scala学习Kafka Streams的第一步。 我知道这并不多,但是我仍然需要在Kafka Streams中探索更多内容,例如转换,联接,聚合等,我将在以后的文章中对此进行探讨。 所以,请继续关注🙂
完整的代码可以从Github下载。
请随时提出建议或评论!