|
默认的 shard assigner
public static final KinesisShardAssigner DEFAULT_SHARD_ASSIGNER =
(shard, subtasks) -> shard.hashCode();
如何shard 的数量 大于 并发度 很容易造成分布不均。
想着用这种方法,在主类使用
static ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
static AtomicInteger counter = new AtomicInteger(0);
public static final KinesisShardAssigner origin_shard =
(shard, subtasks) -> {
String shardId = shard.getShard().getShardId();
Integer index = map.get(shardId);
if (index != null){
return index;
}
else{
counter.getAndIncrement();
Integer new_index = counter.get();
map.put(shardId, new_index);
return new_index;
}
};
flinkKinesisConsumer.setShardAssigner(origin_shard);
虽然试验了一把没有问题。 但是感觉 这段代码 最终会运行在 task slot 里面。
这种方法的有效性 是不是*依赖 kinesis list shards api 返回的顺序固定*呢?
有没有不依赖 api 返回的。又能均匀分布的方法?
虽然问题是 kinesis 。但是感觉对 数据源 和 slot 并发读取多源的其他场景或许有相似之处。
初来社区。欢迎给出建议。
谢谢。
|