定义
工作者:每个客户应用程序都有一个可以处理一个或多个分片的工作者。 如果有多个消费者应用程序,则工作人员会在其中随机共享碎片。
- 您的乐队作为品牌
- 我从未在2017年跳过的歌曲
- eLVy神在他的最新专辑中超越了其他人
- 是否可以免费托管,编码和流式传输所有视频?
- HANDBALL→巡回赛法国Russie en流媒体le Jeudi 17 Janvier 2019
分片使用者:分片使用者消耗一个分片。 (一对一)
增强的扇出功能:此功能使使用者可以从流中接收记录,每个分片每秒的吞吐量高达2 MiB数据
消费者应用程序:使用AWS KCL(Kinesis Client Library)开发以使用流的应用程序。
测试条件
流名称: TEST_AB
碎片: 7(3开-4关)
AWS KCL版本: 2.0.5
- 即使已关闭,所有碎片都由碎片使用者共享。
- 尝试通过多线程(一个应用程序)与消费者应用程序相乘,并创建多个实例(两个应用程序)以从同一流并行使用。
- 在多线程方案中,每个线程彼此共享分片,并分别消耗数据。
- 在多实例方案中,它们的作用与方案1中相同。创建新线程或应用程序(无关紧要)时,将创建工作程序,该工作程序从监视URL中获取有关分片数量的信息并侦听它们。
- 运行了“测试应用程序”,并且一个工作人员提取了7个碎片。 观察以下日志。
日志:
INFO: Worker 3843a767-9cd0-471a-8067-58da19136a32 saw 7
total leases, 6
available leases, 1
workers. Target is 7
workers. Target is 7
leases, I have 0
leases, I will take 7
leases
- 当再创造一个工人时,它会从其他工人那里窃取一些碎片以保持平衡。
日志:
INFO: Worker dc0cb1e1-925a-4347-a788-e18aab5c900b needed 3
leases but none were expired, so it will steal lease shardId-000000000005
from 3843a767-9cd0-471a-8067-58da19136a32 (Worker 1)
leases but none were expired, so it will steal lease shardId-000000000005
INFO: Worker dc0cb1e1-925a-4347-a788-e18aab5c900b saw 7
total leases, 0
available leases, 2
workers. Target is 4
workers. Target is 4
leases, I have 1
leases, I will take 1
leases
- 一段时间后,他们共享了以下7个碎片:
日志:
INFO: Current stream shard assignments: shardId-000000000007, shardId-000000000003, shardId-000000000002, shardId-000000000001
(Worker 1)
INFO: Current stream shard assignments: shardId-000000000006, shardId-000000000005, shardId-000000000004
(Worker 2)
- 来自数据流的数据由应用程序内部的多个线程处理,即使消费者应用程序有一个正在运行的Worker。 为了观察这一点,线程名称被写入日志中。
日志:
INFO: Thread Name:ShardRecordProcessor-0000
Processing 26
record(s)
INFO: Thread Name:ShardRecordProcessor-0003
Processing 2
record(s)
增强的扇出消费者比较

HTTP / 1.1和HTTP / 2的比较
- 第一链接
- 第二连结
关于增强型扇出
- 使用者应用程序的主要逻辑是,每个应用程序都有自己的工作程序,并且该工作者具有自己的分片使用者。 每个碎片消费者仅选择一个碎片来收听。 在测试方案中,创建了3个线程并将其注册为具有不同应用程序名称的增强型扇出消费者。 线程充当独立的应用程序,它们从同一流中接收相同的数据。 这不是我想要的情况。 数据应仅由一个使用者使用一次,并从流中消失。

创建新的碎片消费者
运行应用程序时,将为每个分片创建新的分片使用者。 (例如,5个碎片的5个碎片使用者)。 根据文档,每个分片使用者以2Mib / sec的速率分别消耗其分片。
日志:
INFO: Created new
shardConsumer for
: ShardInfo(shardId=shardId-000000000006, concurrencyToken=0777401e-12ea-44b5-b94c-56ef1e93dd10, parentShardIds=[shardId-000000000002], checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0})
INFO: Created new
shardConsumer for
: ShardInfo(shardId=shardId-000000000006, concurrencyToken=0777401e-12ea-44b5-b94c-56ef1e93dd10, parentShardIds=[shardId-000000000002], checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0})
INFO: Created new
shardConsumer for
: ShardInfo(shardId=shardId-000000000007, concurrencyToken=ebf00fa3-9d09-4bb4-9f9d-8c32a42c8728, parentShardIds=[shardId-000000000004, shardId-000000000005], checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0})
INFO: Created new
shardConsumer for
: ShardInfo(shardId=shardId-000000000007, concurrencyToken=ebf00fa3-9d09-4bb4-9f9d-8c32a42c8728, parentShardIds=[shardId-000000000004, shardId-000000000005], checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0})
INFO: Created new
shardConsumer for
: ShardInfo(shardId=shardId-000000000005, concurrencyToken=2efdb0f4-8b8d-4ee8-986d-ebf0076a0c10, parentShardIds=[shardId-000000000002], checkpoint={SequenceNumber: SHARD_END,SubsequenceNumber: 0})
INFO: Created new
shardConsumer for
: ShardInfo(shardId=shardId-000000000005, concurrencyToken=2efdb0f4-8b8d-4ee8-986d-ebf0076a0c10, parentShardIds=[shardId-000000000002], checkpoint={SequenceNumber: SHARD_END,SubsequenceNumber: 0})
INFO: Created new
shardConsumer for
: ShardInfo(shardId=shardId-000000000004, concurrencyToken=44c9d364-1d60-4da9-b3cf-f7d5321cfcb7, parentShardIds=[shardId-000000000001], checkpoint={SequenceNumber: SHARD_END,SubsequenceNumber: 0})
INFO: Created new
shardConsumer for
: ShardInfo(shardId=shardId-000000000004, concurrencyToken=44c9d364-1d60-4da9-b3cf-f7d5321cfcb7, parentShardIds=[shardId-000000000001], checkpoint={SequenceNumber: SHARD_END,SubsequenceNumber: 0})
INFO: Created new
shardConsumer for
: ShardInfo(shardId=shardId-000000000003, concurrencyToken=ad7774dd-f29a-49a4-9110-a0ea5695bde8, parentShardIds=[shardId-000000000001], checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0})
INFO: Created new
shardConsumer for
: ShardInfo(shardId=shardId-000000000003, concurrencyToken=ad7774dd-f29a-49a4-9110-a0ea5695bde8, parentShardIds=[shardId-000000000001], checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0})
INFO: Created new
shardConsumer for
: ShardInfo(shardId=shardId-000000000002, concurrencyToken=83e0c3b4-3999-4598-858d-e13015718cfc, parentShardIds=[shardId-000000000000], checkpoint={SequenceNumber: SHARD_END,SubsequenceNumber: 0})
INFO: Created new
shardConsumer for
: ShardInfo(shardId=shardId-000000000002, concurrencyToken=83e0c3b4-3999-4598-858d-e13015718cfc, parentShardIds=[shardId-000000000000], checkpoint={SequenceNumber: SHARD_END,SubsequenceNumber: 0})
INFO: Created new
shardConsumer for
: ShardInfo(shardId=shardId-000000000001, concurrencyToken=36ff857a-be4b-4b73-8e18-1cab0f03393b, parentShardIds=[shardId-000000000000], checkpoint={SequenceNumber: SHARD_END,SubsequenceNumber: 0})
INFO: Created new
shardConsumer for
: ShardInfo(shardId=shardId-000000000001, concurrencyToken=36ff857a-be4b-4b73-8e18-1cab0f03393b, parentShardIds=[shardId-000000000000], checkpoint={SequenceNumber: SHARD_END,SubsequenceNumber: 0})
测试申请代码
设定凭证
系统。 setProperty (“ aws.accessKeyId”,“ XXX”);
系统。
setProperty (“ aws.secretAccessKey”,“ YYY”);
主要
ExecutorService执行程序=执行程序。 newFixedThreadPool (1);
CallableKinesisConsumer callableKinesisConsumer = new CallableKinesisConsumer();
executor.submit(callableKinesisConsumer);
测试记录处理器
包com.ahmetburak.aws.kinesis.consumer;
导入org.slf4j.Logger;
导入org.slf4j.LoggerFactory;
导入software.amazon.kinesis.exceptions.InvalidStateException;
导入software.amazon.kinesis.exceptions.ShutdownException;
导入软件.amazon.kinesis.lifecycle.events。*;
导入software.amazon.kinesis.processor.ShardRecordProcessor;
导入java.util.HashMap;
/ **
*由burakoz在2018年11月16日创建
* /
公共类TestRecordProcessor实现ShardRecordProcessor {
私有静态最终字符串
SHARD_KEY =“ ShardId”;
私有静态最终记录器
日志 = LoggerFactory。 getLogger (TestRecordProcessor.class);
私有HashMap shardMap;
private String shardId;
公共TestRecordProcessor(){
this.shardMap = new HashMap();
}
公共无效的initialize(InitializationInput initializeInput){
shardId =初始化输入.shardId();
shardMap.put(
SHARD_KEY ,shardId);
尝试{
log .info(“正在初始化@ Sequence:{}”,initializationInput.extendedSequenceNumber());
}最后{
shardMap.remove(
SHARD_KEY );
}
}
公共无效processRecords(ProcessRecordsInput processRecordsInput){
shardMap.put(
SHARD_KEY ,shardId);
尝试{
log .info(“线程名称:” +线程。currentThread().getName()+“处理{}条记录”,processRecordsInput.records()。size());
processRecordsInput.records()。forEach(r->
log .info(“处理记录pk:{}-序列:{}-数据:{}”,r.partitionKey(),r.sequenceNumber(),r.data()));
} catch(Throwable t){
log .error(“处理记录时被抛出。中止”);
运行。
getRuntime ().halt(1);
}最后{
shardMap.remove(
SHARD_KEY );
}
}
公共无效leaseLost(LeaseLostInput leaseLostInput){
log .error(“ leaseLostInput”,leaseLostInput.toString());
}
公共无效shardEnded(ShardEndedInput shardEndedInput){
shardMap.put(
SHARD_KEY ,shardId);
尝试{
log .info(“到达分片末端检查点。”);
shardEndedInput.checkpointer()。checkpoint();
} catch(ShutdownException
InvalidStateException e){
log .error(“分片检查点异常。放弃”,e);
}最后{
shardMap.remove(
SHARD_KEY );
}
}
公共无效shutdownRequested(ShutdownRequestedInput shutdownRequestedInput){
shardMap.put(
SHARD_KEY ,shardId);
尝试{
log .info(“ Scheduler正在关闭,检查点。”);
shutdownRequestedInput.checkpointer()。checkpoint();
} catch(ShutdownException
InvalidStateException e){
记录 .error(“在请求关闭时检查点异常。放弃”,e);
}最后{
shardMap.remove(
SHARD_KEY );
}
}
}
可调用的Kinesis消费者
包com.ahmetburak.aws.kinesis.consumer;
导入org.slf4j.Logger;
导入org.slf4j.LoggerFactory;
导入software.amazon.awssdk.regions.Region;
导入软件.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
导入软件.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
导入软件.amazon.awssdk.services.kinesis.KinesisAsyncClient;
导入software.amazon.kinesis.common.ConfigsBuilder;
导入software.amazon.kinesis.coordinator.Scheduler;
导入java.util.UUID;
导入java.util.concurrent.Callable;
/ **
*由burakoz在2018年11月21日创建
* /
公共类CallableKinesisConsumer实现Callable {
公共静态最终字符串
STREAM_NAME =“ TEST_AB”;
公共静态最终字符串
APP_NAME =“ CallableKinesisConsumer_V2”;
私有静态最终记录器
日志 = LoggerFactory。 getLogger (KinesisConsumer.class);
私有KinesisAsyncClient kinesisClient;
私有DynamoDbAsyncClient dynamoClient;
私有CloudWatchAsyncClient cloudWatchClient;
private Region地区=地区。
AWS_GLOBAL ;
私有ConfigsBuilder configsBuilder;
私有字符串threadName;
公共CallableKinesisConsumer(){
日志 .info(“ B。POINT
“ +主题。
currentThread ().getName());
threadName =线程。
currentThread ().getName();
this.kinesisClient = KinesisAsyncClient。
builder ().region(region).build();
this.dynamoClient = DynamoDbAsyncClient。
builder ().region(region).build();
this.cloudWatchClient = CloudWatchAsyncClient。
builder ().region(region).build();
configsBuilder =新的ConfigsBuilder(
STREAM_NAME , APP_NAME ,kinesisClient,dynamoClient,cloudWatchClient,UUID。 randomUUID (). toString (),新的TestRecordProcessorFactory());
}
@Override
公共字符串call(){
日志 .info(“ C。POINT
“ +主题。
currentThread ().getName());
getSchedular()。run();
返回threadName;
}
私人调度程序getSchedular(){
返回新的Scheduler(configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig());
}
}
测试记录处理器工厂
包com.ahmetburak.aws.kinesis.consumer;
导入software.amazon.kinesis.processor.ShardRecordProcessor;
导入software.amazon.kinesis.processor.ShardRecordProcessorFactory;
/ **
*由burakoz在2018年11月16日创建
* /
公共类TestRecordProcessorFactory实现ShardRecordProcessorFactory {
@Override
公共ShardRecordProcessor shardRecordProcessor(){
返回新的TestRecordProcessor();
}
}
综上所述,无论是否使用增强的扇出,具有一个分片的流都以相同的速度消耗。 如果流中有多个分片,则其客户端需要使用KCL 2.x版编写,以使用增强的扇出功能。 无需实现多线程,KCL为我们处理了此问题,但应考虑运行多个Consumer Application来动态地进行负载平衡。 因为,当发生故障时,KCL应该共享分片并平衡活动工作程序之间的负载。 因此,流消耗过程可以继续进行。
资源:
https://docs.aws.amazon.com/streams/latest/dev/introduction-to-enhanced-consumers.html
https://docs.aws.amazon.com/streams/latest/dev/building-consumers.html