Flink1.2对 key 进行分区,和 hash 分区有什么区别?

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink1.2对 key 进行分区,和 hash 分区有什么区别?

刘文


Flink1.2对 key 进行分区,和 hash 分区有什么区别?
如: 分区数值 = key 的 hash值 % 并行度?

为什么不直接使用 hash 进行分区?

KeyGroupStreamPartitioner.java

@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
K key;
try {
key = keySelector.getKey(record.getInstance().getValue());
} catch (Exception e) {
throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
}
return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
}


 

Reply | Threaded
Open this post in threaded view
|

Re: Flink1.2对 key 进行分区,和 hash 分区有什么区别?

nobleyd
首先,本身就是对key做的hash哈。只不过不是直接分配到并行的subtask,而是先分到maxParallelism,然后再分到subtask。加了一层主要是方便状态scala。

刘文 <[hidden email]> 于2021年4月6日周二 上午9:33写道:

>
>
> Flink1.2对 key 进行分区,和 hash 分区有什么区别?
> 如: 分区数值 = key 的 hash值 % 并行度?
>
> 为什么不直接使用 hash 进行分区?
>
> KeyGroupStreamPartitioner.java
>
> @Override
> public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
>    K key;
>    try {
>       key = keySelector.getKey(record.getInstance().getValue());
>    } catch (Exception e) {
>       throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
>    }
>    return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
> }
>
>
>
>
>