AWS Kinesis Consumer Library 2.X的优势

定义

工作者:每个客户应用程序都有一个可以处理一个或多个分片的工作者。 如果有多个消费者应用程序,则工作人员会在其中随机共享碎片。

分片使用者:分片使用者消耗一个分片。 (一对一)

增强的扇出功能此功能使使用者可以从流中接收记录,每个分片每秒的吞吐量高达2 MiB数据

消费者应用程序:使用AWS KCL(Kinesis Client Library)开发以使用流的应用程序。

测试条件

流名称: TEST_AB

碎片: 7(3开-4关)

AWS KCL版本: 2.0.5

  • 即使已关闭,所有碎片都由碎片使用者共享。
  • 尝试通过多线程(一个应用程序)与消费者应用程序相乘,并创建多个实例(两个应用程序)以从同一流并行使用。
  • 在多线程方案中,每个线程彼此共享分片,并分别消耗数据。
  • 在多实例方案中,它们的作用与方案1中相同。创建新线程或应用程序(无关紧要)时,将创建工作程序,该工作程序从监视URL中获取有关分片数量的信息并侦听它们。
  • 运行了“测试应用程序”,并且一个工作人员提取了7个碎片。 观察以下日志。

日志:

INFO: Worker 3843a767-9cd0-471a-8067-58da19136a32 saw 7 total leases, 6 available leases, 1 workers. Target is 7 workers. Target is 7 leases, I have 0 leases, I will take 7 leases

  • 当再创造一个工人时,它会从其他工人那里窃取一些碎片以保持平衡。

日志:

INFO: Worker dc0cb1e1-925a-4347-a788-e18aab5c900b needed 3 leases but none were expired, so it will steal lease shardId-000000000005 from 3843a767-9cd0-471a-8067-58da19136a32 (Worker 1) leases but none were expired, so it will steal lease shardId-000000000005

INFO: Worker dc0cb1e1-925a-4347-a788-e18aab5c900b saw 7 total leases, 0 available leases, 2 workers. Target is 4 workers. Target is 4 leases, I have 1 leases, I will take 1 leases

  • 一段时间后,他们共享了以下7个碎片:

日志:

INFO: Current stream shard assignments: shardId-000000000007, shardId-000000000003, shardId-000000000002, shardId-000000000001 (Worker 1)

INFO: Current stream shard assignments: shardId-000000000006, shardId-000000000005, shardId-000000000004 (Worker 2)

  • 来自数据流的数据由应用程序内部的多个线程处理,即使消费者应用程序有一个正在运行的Worker。 为了观察这一点,线程名称被写入日志中。

日志:

INFO: Thread Name:ShardRecordProcessor-0000 Processing 26 record(s)

INFO: Thread Name:ShardRecordProcessor-0003 Processing 2 record(s)

增强的扇出消费者比较

HTTP / 1.1和HTTP / 2的比较

  1. 第一链接
  2. 第二连结

关于增强型扇出

  • 使用者应用程序的主要逻辑是,每个应用程序都有自己的工作程序,并且该工作者具有自己的分片使用者。 每个碎片消费者仅选择一个碎片来收听。 在测试方案中,创建了3个线程并将其注册为具有不同应用程序名称的增强型扇出消费者。 线程充当独立的应用程序,它们从同一流中接收相同的数据。 这不是我想要的情况。 数据应仅由一个使用者使用一次,并从流中消失。

创建新的碎片消费者

运行应用程序时,将为每个分片创建新的分片使用者。 (例如,5个碎片的5个碎片使用者)。 根据文档,每个分片使用者以2Mib / sec的速率分别消耗其分片。

日志:

INFO: Created new shardConsumer for : ShardInfo(shardId=shardId-000000000006, concurrencyToken=0777401e-12ea-44b5-b94c-56ef1e93dd10, parentShardIds=[shardId-000000000002], checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}) INFO: Created new shardConsumer for : ShardInfo(shardId=shardId-000000000006, concurrencyToken=0777401e-12ea-44b5-b94c-56ef1e93dd10, parentShardIds=[shardId-000000000002], checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0})

INFO: Created new shardConsumer for : ShardInfo(shardId=shardId-000000000007, concurrencyToken=ebf00fa3-9d09-4bb4-9f9d-8c32a42c8728, parentShardIds=[shardId-000000000004, shardId-000000000005], checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}) INFO: Created new shardConsumer for : ShardInfo(shardId=shardId-000000000007, concurrencyToken=ebf00fa3-9d09-4bb4-9f9d-8c32a42c8728, parentShardIds=[shardId-000000000004, shardId-000000000005], checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0})

INFO: Created new shardConsumer for : ShardInfo(shardId=shardId-000000000005, concurrencyToken=2efdb0f4-8b8d-4ee8-986d-ebf0076a0c10, parentShardIds=[shardId-000000000002], checkpoint={SequenceNumber: SHARD_END,SubsequenceNumber: 0}) INFO: Created new shardConsumer for : ShardInfo(shardId=shardId-000000000005, concurrencyToken=2efdb0f4-8b8d-4ee8-986d-ebf0076a0c10, parentShardIds=[shardId-000000000002], checkpoint={SequenceNumber: SHARD_END,SubsequenceNumber: 0})

INFO: Created new shardConsumer for : ShardInfo(shardId=shardId-000000000004, concurrencyToken=44c9d364-1d60-4da9-b3cf-f7d5321cfcb7, parentShardIds=[shardId-000000000001], checkpoint={SequenceNumber: SHARD_END,SubsequenceNumber: 0}) INFO: Created new shardConsumer for : ShardInfo(shardId=shardId-000000000004, concurrencyToken=44c9d364-1d60-4da9-b3cf-f7d5321cfcb7, parentShardIds=[shardId-000000000001], checkpoint={SequenceNumber: SHARD_END,SubsequenceNumber: 0})

INFO: Created new shardConsumer for : ShardInfo(shardId=shardId-000000000003, concurrencyToken=ad7774dd-f29a-49a4-9110-a0ea5695bde8, parentShardIds=[shardId-000000000001], checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}) INFO: Created new shardConsumer for : ShardInfo(shardId=shardId-000000000003, concurrencyToken=ad7774dd-f29a-49a4-9110-a0ea5695bde8, parentShardIds=[shardId-000000000001], checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0})

INFO: Created new shardConsumer for : ShardInfo(shardId=shardId-000000000002, concurrencyToken=83e0c3b4-3999-4598-858d-e13015718cfc, parentShardIds=[shardId-000000000000], checkpoint={SequenceNumber: SHARD_END,SubsequenceNumber: 0}) INFO: Created new shardConsumer for : ShardInfo(shardId=shardId-000000000002, concurrencyToken=83e0c3b4-3999-4598-858d-e13015718cfc, parentShardIds=[shardId-000000000000], checkpoint={SequenceNumber: SHARD_END,SubsequenceNumber: 0})

INFO: Created new shardConsumer for : ShardInfo(shardId=shardId-000000000001, concurrencyToken=36ff857a-be4b-4b73-8e18-1cab0f03393b, parentShardIds=[shardId-000000000000], checkpoint={SequenceNumber: SHARD_END,SubsequenceNumber: 0}) INFO: Created new shardConsumer for : ShardInfo(shardId=shardId-000000000001, concurrencyToken=36ff857a-be4b-4b73-8e18-1cab0f03393b, parentShardIds=[shardId-000000000000], checkpoint={SequenceNumber: SHARD_END,SubsequenceNumber: 0})

测试申请代码

设定凭证

 系统。  setProperty (“ aws.accessKeyId”,“ XXX”); 
系统。
setProperty (“ aws.secretAccessKey”,“ YYY”);

主要

  ExecutorService执行程序=执行程序。  newFixedThreadPool (1); 
CallableKinesisConsumer callableKinesisConsumer = new CallableKinesisConsumer();
executor.submit(callableKinesisConsumer);

测试记录处理器

 包com.ahmetburak.aws.kinesis.consumer; 
导入org.slf4j.Logger;
导入org.slf4j.LoggerFactory;
导入software.amazon.kinesis.exceptions.InvalidStateException;
导入software.amazon.kinesis.exceptions.ShutdownException;
导入软件.amazon.kinesis.lifecycle.events。*;
导入software.amazon.kinesis.processor.ShardRecordProcessor;
导入java.util.HashMap;
/ **
*由burakoz在2018年11月16日创建
* /
公共类TestRecordProcessor实现ShardRecordProcessor {
私有静态最终字符串
SHARD_KEY =“ ShardId”;
私有静态最终记录器
日志 = LoggerFactory。 getLogger (TestRecordProcessor.class);
私有HashMap shardMap;
private String shardId;
公共TestRecordProcessor(){
this.shardMap = new HashMap();
}
公共无效的initialize(InitializationInput initializeInput){
shardId =初始化输入.shardId();
shardMap.put(
SHARD_KEY ,shardId);
尝试{

log .info(“正在初始化@ Sequence:{}”,initializationInput.extendedSequenceNumber());
}最后{
shardMap.remove(
SHARD_KEY );
}
}
公共无效processRecords(ProcessRecordsInput processRecordsInput){
shardMap.put(
SHARD_KEY ,shardId);
尝试{

log .info(“线程名称:” +线程。currentThread().getName()+“处理{}条记录”,processRecordsInput.records()。size());
processRecordsInput.records()。forEach(r->
log .info(“处理记录pk:{}-序列:{}-数据:{}”,r.partitionKey(),r.sequenceNumber(),r.data()));
} catch(Throwable t){

log .error(“处理记录时被抛出。中止”);
运行。
getRuntime ().halt(1);
}最后{
shardMap.remove(
SHARD_KEY );
}
}
公共无效leaseLost(LeaseLostInput leaseLostInput){

log .error(“ leaseLostInput”,leaseLostInput.toString());
}
公共无效shardEnded(ShardEndedInput shardEndedInput){
shardMap.put(
SHARD_KEY ,shardId);
尝试{

log .info(“到达分片末端检查点。”);
shardEndedInput.checkpointer()。checkpoint();
} catch(ShutdownException
InvalidStateException e){

log .error(“分片检查点异常。放弃”,e);
}最后{
shardMap.remove(
SHARD_KEY );
}
}
公共无效shutdownRequested(ShutdownRequestedInput shutdownRequestedInput){
shardMap.put(
SHARD_KEY ,shardId);
尝试{

log .info(“ Scheduler正在关闭,检查点。”);
shutdownRequestedInput.checkpointer()。checkpoint();
} catch(ShutdownException
InvalidStateException e){

记录 .error(“在请求关闭时检查点异常。放弃”,e);
}最后{
shardMap.remove(
SHARD_KEY );
}
}
}

可调用的Kinesis消费者

 包com.ahmetburak.aws.kinesis.consumer; 
导入org.slf4j.Logger;
导入org.slf4j.LoggerFactory;
导入software.amazon.awssdk.regions.Region;
导入软件.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
导入软件.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
导入软件.amazon.awssdk.services.kinesis.KinesisAsyncClient;
导入software.amazon.kinesis.common.ConfigsBuilder;
导入software.amazon.kinesis.coordinator.Scheduler;
导入java.util.UUID;
导入java.util.concurrent.Callable;
/ **
*由burakoz在2018年11月21日创建
* /
公共类CallableKinesisConsumer实现Callable {
公共静态最终字符串
STREAM_NAME =“ TEST_AB”;
公共静态最终字符串
APP_NAME =“ CallableKinesisConsumer_V2”;
私有静态最终记录器
日志 = LoggerFactory。 getLogger (KinesisConsumer.class);
私有KinesisAsyncClient kinesisClient;
私有DynamoDbAsyncClient dynamoClient;
私有CloudWatchAsyncClient cloudWatchClient;
private Region地区=地区。
AWS_GLOBAL ;
私有ConfigsBuilder configsBuilder;
私有字符串threadName;
公共CallableKinesisConsumer(){

日志 .info(“ B。POINT
“ +主题。
currentThread ().getName());
threadName =线程。
currentThread ().getName();
this.kinesisClient = KinesisAsyncClient。
builder ().region(region).build();
this.dynamoClient = DynamoDbAsyncClient。
builder ().region(region).build();
this.cloudWatchClient = CloudWatchAsyncClient。
builder ().region(region).build();
configsBuilder =新的ConfigsBuilder(
STREAM_NAMEAPP_NAME ,kinesisClient,dynamoClient,cloudWatchClient,UUID。 randomUUID (). toString (),新的TestRecordProcessorFactory());
}
@Override
公共字符串call(){

日志 .info(“ C。POINT
“ +主题。
currentThread ().getName());
getSchedular()。run();
返回threadName;
}
私人调度程序getSchedular(){
返回新的Scheduler(configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig());
}
}

测试记录处理器工厂

 包com.ahmetburak.aws.kinesis.consumer; 
导入software.amazon.kinesis.processor.ShardRecordProcessor;
导入software.amazon.kinesis.processor.ShardRecordProcessorFactory;
/ **
*由burakoz在2018年11月16日创建
* /
公共类TestRecordProcessorFactory实现ShardRecordProcessorFactory {
@Override
公共ShardRecordProcessor shardRecordProcessor(){
返回新的TestRecordProcessor();
}
}

综上所述,无论是否使用增强的扇出,具有一个分片的流都以相同的速度消耗。 如果流中有多个分片,则其客户端需要使用KCL 2.x版编写,以使用增强的扇出功能。 无需实现多线程,KCL为我们处理了此问题,但应考虑运行多个Consumer Application来动态地进行负载平衡。 因为,当发生故障时,KCL应该共享分片并平衡活动工作程序之间的负载。 因此,流消耗过程可以继续进行。

资源:

https://docs.aws.amazon.com/streams/latest/dev/introduction-to-enhanced-consumers.html

https://docs.aws.amazon.com/streams/latest/dev/building-consumers.html