import java.io.Serializable; public abstractclassStreamPartitioner<T>implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable { private static final long serialVersionUID = 1L;
/** * 分区器将所有元素发送到下游算子中分区为 0 的分区中 * * @param <T> Type of the elements in the Stream being partitioned */ @Internal public classGlobalPartitioner<T>extendsStreamPartitioner<T>{ private static final long serialVersionUID = 1L;
@Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { // 无论数据怎样,都返回 0 号分区 return0; }
@Override public StreamPartitioner<T> copy() { returnthis; }
@Override public String toString() { return"GLOBAL"; } }
/** * 轮询 所有的分区,将数据均匀的发送出去 * @param <T> Type of the elements in the Stream being rebalanced */ @Internal public classRebalancePartitioner<T>extendsStreamPartitioner<T>{ private static final long serialVersionUID = 1L;
/** * 选择所有分区发送数据 */ @Internal public classBroadcastPartitioner<T>extendsStreamPartitioner<T>{ private static final long serialVersionUID = 1L;
/** * 广播模式可以直接在记录写入器中为所有输出通道处理,因此不需要通过此方法选择通道。 */ @Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { thrownewUnsupportedOperationException("Broadcast partitioner does not support select channels."); }
// 广播数据 @Override public boolean isBroadcast() { returntrue; }
@Override public StreamPartitioner<T> copy() { returnthis; }
@Override public String toString() { return"BROADCAST"; } }
/** * 通过 key 分区将数据发送到下游算子的分区中 * @param <T> Type of the elements in the Stream being partitioned */ @Internal public classKeyGroupStreamPartitioner<T, K>extendsStreamPartitioner<T>implementsConfigurableStreamPartitioner{ private static final long serialVersionUID = 1L;
/** Extracts the key from a record's value; {@code selectChannel} routes on that key. */
private final KeySelector<T, K> keySelector;

/** Maximum parallelism (number of key-groups); the constructor requires it to be greater than 0. */
private int maxParallelism;
public KeyGroupStreamPartitioner(KeySelector<T, K> keySelector, int maxParallelism) { Preconditions.checkArgument(maxParallelism > 0, "Number of key-groups must be > 0!"); this.keySelector = Preconditions.checkNotNull(keySelector); this.maxParallelism = maxParallelism; }
/**
 * Returns the maximum parallelism currently held by this partitioner.
 *
 * @return the value of the {@code maxParallelism} field
 */
public int getMaxParallelism() {
    return this.maxParallelism;
}
@Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { K key; try { key = keySelector.getKey(record.getInstance().getValue()); } catch (Exception e) { thrownewRuntimeException("Could not extract key from " + record.getInstance().getValue(), e); } // 调用KeyGroupRangeAssignment类的assignKeyToParallelOperator方法,返回应该发送的分区号码 returnKeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels); }
@Override public StreamPartitioner<T> copy() { returnthis; }
/**
 * Returns the display name of this partitioner.
 *
 * @return the constant string {@code "HASH"}
 */
@Override
public String toString() {
    return "HASH";
}
/** * 将指定的key 分配给指定的 operator index * * @param key key 值 * @param maxParallelism 最大并行度 * @param parallelism 当前算子并行度 * @return the index of the parallel operator to which the given key should be routed. */ public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) { Preconditions.checkNotNull(key, "Assigned key must not be null!"); // 调用 computeOperatorIndexForKeyGroup 方法 return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism)); }
/** * 在给定的并行度和最大并行度下计算键组所属的运算符的索引。 * 重要说明:maxParallelism必须为<= Short.MAX_VALUE,以避免此方法中的舍入问题。 * 如果我们想超越此界限,则此方法必须对长值执行算术运算。 * * @param maxParallelism 最大并行度 * 0 < parallelism <= maxParallelism <= Short.MAX_VALUE must hold. * @param parallelism 当前作业并行度. Must be <= maxParallelism. * @param keyGroupId Key 分区 ID. 0 <= keyGroupID < maxParallelism. * @return 返回分区号 在 */ public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) { // 计算数据发送到哪个分区 return keyGroupId * parallelism / maxParallelism; }
/** * 自定义分区方式 * @param <K> * Type of the key * @param <T> * Type of the data */ @Internal public classCustomPartitionerWrapper<K, T>extendsStreamPartitioner<T>{ private static final long serialVersionUID = 1L;