从IBM Message Hub接收消息

IBM Message Hub提供Kafka即服务。 它是可扩展的,分布式的,高度容错的和云部署的消息传递系统。

在本文中,您将学习如何使用Kafka REST API使用IBM Message Hub中的事件进行下游处理。 Kafka REST API提供了一种快速简便的方法来探索Kafka主题并在小规模用例中使用消息。

在我们的示例中,我们将通过将消息写入主题来演示这种用例,然后再使用REST API使用它们进行一些分析。 端到端的说明足以使您继续学习并获得可行的方案。

博客中使用的代码可以在此GitHub位置找到。

但是,对于任何更复杂的用例,您都希望使用具有更高吞吐率和稳定性保证的主Kafka API。 Kafka REST API仅用于管理和探索用途,而不用于处理生产数据。 对于生产数据,您应该利用本机Kafka连接器。

1.基础

消息中心的作用类似于用于所有传入和传出通信的实时消息传递系统或数据管道。 最重要的术语是:

  • 话题
  • 生产者
  • 消费者

生产者向Kafka集群中的特定主题发送消息,而消费者通过订阅该主题来接收此消息。

消费者可以加入所谓的消费者组。 在该组内,只有一个消费者可以处理该消息。 像这样对使用者进行分组对于在高吞吐量应用程序中并行处理消息很有用。

(图1:基本架构,Kafka文档)

REST API允许您定义使用者实例和使用者组。

2.设置和准备

您将需要在IBM Bluemix上创建自己的Message Hub实例或使用现有实例,并在DSX中创建一个新的Python笔记本。

创建一个笔记本

您将使用这个新笔记本将本教程中找到的代码复制并粘贴到笔记本中的代码单元中。

  1. 导航到Data Science Experience并登录或创建一个新帐户。
  2. 使用默认的Python设置从Blank创建一个新的笔记本。
  3. 我们的示例笔记本中的REST API调用通过Python请求包作为http请求提交。 使用以下命令来安装和导入软件包:
!pip install --upgrade requests 
 import requests 

创建一个消息中心实例

  1. 导航到Bluemix。
  2. 登录并单击目录。
  3. 在“应用程序服务”部分中搜索“消息中心”。
  4. 创建一个实例。
  5. 在“管理”部分中,创建一个带有名称的主题。 稍后将需要该名称。
  6. 复制服务凭证并将其粘贴到代码单元中。

您的代码单元应该看起来像这样(URL可能因您的Bluemix区域而异):

 credentials ={ 
"instance_id": "xxx",
"mqlight_lookup_url": "https://mqlight-lookup-prod01.messagehub.services.us-south.bluemix.net/Lookup?serviceId=xxx",
"api_key": "xxx",
"kafka_admin_url": "https://kafka-admin-prod01.messagehub.services.us-south.bluemix.net:443",
"kafka_rest_url": "https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443",
"kafka_brokers_sasl": [
"kafka03-prod01.messagehub.services.us-south.bluemix.net:9093",
"kafka04-prod01.messagehub.services.us-south.bluemix.net:9093",
"kafka05-prod01.messagehub.services.us-south.bluemix.net:9093",
"kafka01-prod01.messagehub.services.us-south.bluemix.net:9093",
"kafka02-prod01.messagehub.services.us-south.bluemix.net:9093"
],
"user": "xxx",
"password": "xxx"
}

请注意,消息具有定义的保留时间(默认为24小时),可以在创建主题时指定保留时间。

3.创建一个Kafka用户组和实例

定义Kafka使用者实例的名称及其应属于的组。 使用不带下划线的简单名称或任何其他特殊字符,以避免潜在的冲突。

 consumerInstance = 'instance1' 
consumerGroup = 'group1'

authToken = credentials['api_key']
kafkaRestUrl = credentials['kafka_rest_url']

标头定义一次,并且将用于所有即将到来的请求,因为API密钥和REST URL不会更改。

 import json 

headers = {
'X-Auth-Token': authToken,
'Content-Type': 'application/json',
'Accept': 'application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json'
}

消费者实例将在体内发送。

 body1 = json.dumps({ 
'name': consumerInstance,
'format': 'binary',
'auto.offset.reset': 'smallest'
})

以下方法用于向Kafka Rest API提交发布请求以创建使用者实例。 它需要一个有效的身份验证标头以及使用者组和实例名称。

 def setConsumerInstanceAndGroup(consumerGroup, body1, headers): 

response = requests.post(kafkaRestUrl + "/consumers/" + consumerGroup, data=body1, headers=headers)

print(response.status_code, response.reason, response.text)
result = response.json()
print(result)
consumerUrl = result['base_uri']
print(consumerUrl)

return response

定义方法后,只需执行一次即可:

 setConsumerInstanceAndGroup(consumerGroup, body1, headers) 

如果使用者组已成功配置,您将获得响应代码200,并且print命令的输出将类似于以下内容:

 200 OK {"instance_id":"instance1","base_uri":"https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net/consumers/group1/instances/instance1"} 

{'base_uri': 'https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net/consumers/group1/instances/instance1', 'instance_id': 'instance1'}

https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net/consumers/group1/instances/instance1

现有的消费者组/实例组合将返回409冲突响应代码。 在这种情况下,请更改组和/或实例名称,然后重复呼叫。 如果要多次运行笔记本单元,可以在setConsumerInstanceAndGroup调用之前添加deleteConsumerInstance (请参阅下面的第6节)。

如果收到504超时错误,则通常意味着您没有正确创建主题。 在这种情况下,请在Bluemix仪表板中验证Message Hub实例,并断言主题名称和连接详细信息都与笔记本中的设置匹配。

如果您收到的不是200、409或504,请查看响应代码。

4.向Kafka主题推送消息

在向主题推送一些消息之前,我们必须设置主题名称和URL。 该名称必须与您之前在Message Hub实例中手动创建的名称相同。

 kafkaTopic = '' 

通过Kafka REST API,您可以将消息发布到之前创建的主题。 展开基本URL以添加/topics/ ,如图所示。

 url = kafkaRestUrl + "/topics/" + kafkaTopic 

在以下请求正文中,您可以定义要提交的消息文本。 在records数组中提交的值必须针对我们的使用者实例进行二进制编码。 Message Hub REST API当前不支持JSON或Avro编码。

要将值编码为二进制,必须将它们保存在字节数组(不是字符串)中,并从接收器解码为String值。

 import binascii 

body2 = json.dumps({
'records': [
{'value': binascii.hexlify(b"Mercury ").decode('utf-8')},
{'value': binascii.hexlify(b"Venus ").decode('utf-8')},
{'value': binascii.hexlify(b"Earth ").decode('utf-8')},
{'value': binascii.hexlify(b"Mars ").decode('utf-8')},
{'value': binascii.hexlify(b"Jupiter ").decode('utf-8')},
{'value': binascii.hexlify(b"Saturn ").decode('utf-8')},
{'value': binascii.hexlify(b"Uranus ").decode('utf-8')},
{'value': binascii.hexlify(b"Neptune ").decode('utf-8')}
]
}, ensure_ascii=False).encode('utf8')

要将消息提交到主题,我们使用以下方法,再次向Kafka REST API发送请求。

 def pushMessageToKafka(kafkaTopic, url, body2, headers): 

response = requests.post(url, data=body2, headers=headers)
print(response.text)

return response

随时返回并更改记录值,重新执行该方法,或引入一个循环来模拟数据流。

 pushMessageToKafka(kafkaTopic, url, body2, headers) 

输出:

 200 OK {"offsets": 
[
{"partition":0,"offset":0,"error_code":null,"error":null},
{"partition":0,"offset":1,"error_code":null,"error":null},
{"partition":0,"offset":2,"error_code":null,"error":null},
{"partition":0,"offset":3,"error_code":null,"error":null},
{"partition":0,"offset":4,"error_code":null,"error":null},
{"partition":0,"offset":5,"error_code":null,"error":null},
{"partition":0,"offset":6,"error_code":null,"error":null},
{"partition":0,"offset":7,"error_code":null,"error":null}
],
"key_schema_id":null,
"value_schema_id":null
}

如图所示,消息被发送到特定分区(此处为0)并分配了偏移量(此处为0至7)。 消息也将保留一定的时间,无论是否有消费者收到该消息。 要更改分区数或保留期,请使用Message Hub服务控制台。

(图2:主题剖析,Apache Kafka文档)

由于并行处理,更多的分区通常意味着更高的吞吐率。 为了容错,分区也可以跨群集复制。

5.接收来自Kafka的消息事件

现在是时候接收我们的消息了。

下面的方法定义了一个我们要运行一段时间的循环。 我们最终必须停止它才能处理结果。 我们这里没有的是Spark Streaming所需的实际流式接收器。 我们仅通过将消息中心中的消息读入数据帧来演示Spark核心功能。

 def getMessageFromKafka(maxArrayLength, maxIterations, consumerUrl, headers): 
results = []
length = 0
iteration = 0
while (length < maxArrayLength):
if (iteration > maxIterations): break

response = requests.get(kafkaRestUrl + "/consumers/"+consumerGroup+"/instances/"+consumerInstance+"/topics/"+kafkaTopic, headers=headers)

print (response, response.reason, response.text)
data = response.text

x = json.loads(data)
length = length + len(x)
iteration = iteration + 1

print ('===============================')
print ('Number of incoming messages: ', len(x))
print ('===============================')

for obj in x:
value = binascii.unhexlify(obj['value']).decode('utf-8')

print(value)
results.append(value)

return results

请注意,该方法会调用以解码我们之前使用生产者编码的每个二进制值。

执行该方法以接收当前有关该主题的所有消息。 设置其他参数以限制消息数组的长度和迭代次数。 如果遇到404错误,请返回并检查使用者实例是否设置正确。 如果没有输出,则推送更多消息并再次执行此方法。

 maxArrayLength = 2000 
maxIterations = 10

results = getMessageFromKafka (maxArrayLength, maxIterations, url, headers)

您的输出应类似于以下内容:

 =============================== 
Number of incoming messages: 8
===============================
Mercury
Venus
Earth
Mars
Jupiter
Saturn
Uranus
Neptune
OK []

6.删除Kafka使用者实例

为避免下次您要创建具有相同名称的实例时发生409冲突,让我们删除使用者实例。

 def deleteConsumerInstance(consumerGroup, consumerInstance, headers): 

response = requests.delete(kafkaRestUrl + "/consumers/" + consumerGroup + "/instances/" + consumerInstance, headers=headers)

print(response.status_code, response.reason, response.text)
return response

此呼叫的预期响应是204(无内容)消息。

 deleteConsumerInstance(consumerGroup, consumerInstance, headers) 
 204 No Content 

7.执行分析以获取见解

收到数据后,我们可以自由选择要处理的数据。 在这种情况下,我们将在Brunel中创建一个简单的标签云。 此时,收到的消息已经存储在results列表中。 您可以通过运行以下命令查看它们:

 [u'Mercury ', 
u'Venus ',
u'Earth ',
u'Mars ',
u'Jupiter ',
u'Saturn ',
u'Uranus ',
u'Neptune ']

接下来,我们将结果转换为Pandas数据框。 我们想要的只是构造一个通用数据框,以便我们可以检查从主题获得的数据。

 import pandas as pd 

results_pd = pd.DataFrame(results, columns=["Message"])

pd.set_option('display.max_columns', 500)
results_pd.head(20)
 Message 
0 Mercury
1 Venus
2 Earth
3 Mars
4 Jupiter
5 Saturn
6 Uranus
7 Neptune

收到数据后,我们可以自由选择要使用的数据。 例如,我们可以建立一个机器学习模型,可视化数据,将其映射到图形等。

让我们将数据存储到CSV文件中,并使用Brunel进行读取,以生成简单的标签云并可视化我们的消息文本。

 import brunel 

%brunel cloud data('results_pd') label(Message) :: width=700, height=400

有关用于Bluemix的IBM Message Hub的信息,请访问:http://ibm.co/1LOCQr6。

从此GitHub位置下载此iPython笔记本的完整代码。