private static class HiveRollingPolicy extends CheckpointRollingPolicy<RowData, String> {

    private final long rollingFileSize;
    private final long rollingTimeInterval;

    private HiveRollingPolicy(
            long rollingFileSize,
            long rollingTimeInterval) {
        Preconditions.checkArgument(rollingFileSize > 0L);
        Preconditions.checkArgument(rollingTimeInterval > 0L);
        this.rollingFileSize = rollingFileSize;
        this.rollingTimeInterval = rollingTimeInterval;
    }

    @Override
    public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) {
        return true;
    }

    @Override
    public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, RowData element) {
        return false;
    }

    @Override
    public boolean shouldRollOnProcessingTime(
            PartFileInfo<String> partFileState, long currentTime) {
        try {
            return currentTime - partFileState.getCreationTime() >= rollingTimeInterval ||
                    partFileState.getSize() > rollingFileSize;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
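I don't quite understand why the file has to be force-rolled on every checkpoint. With shouldRollOnCheckpoint always returning true, the rolling parameters I configured effectively stop working:
'sink.rolling-policy.check-interval'='30s',
'sink.rolling-policy.rollover-interval'='10min',
'sink.rolling-policy.file-size'='128MB'
My goal is to have files roll at a fixed size, a fixed record count, or a fixed time interval. How can I achieve that?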
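To make the behavior concrete, here is a minimal standalone sketch that models the decisions of the policy above without any Flink dependency. The class RollingPolicySketch, the PartFile stand-in, the simulate helper, the 60s checkpoint interval, and the 1 MiB/s write rate are all assumptions made up for illustration; this is not how Flink schedules the checks internally, only a model of the three decision methods.

```java
import java.util.ArrayList;
import java.util.List;

/** Standalone model of the roll decisions in HiveRollingPolicy (hypothetical, no Flink classes). */
class RollingPolicySketch {

    /** Hypothetical stand-in for PartFileInfo: only the two values the policy reads. */
    static final class PartFile {
        final long creationTime;
        long size;

        PartFile(long creationTime) {
            this.creationTime = creationTime;
        }
    }

    final long rollingFileSize;
    final long rollingTimeInterval;

    RollingPolicySketch(long rollingFileSize, long rollingTimeInterval) {
        this.rollingFileSize = rollingFileSize;
        this.rollingTimeInterval = rollingTimeInterval;
    }

    // Same decision as the policy above: always roll when a checkpoint happens.
    boolean shouldRollOnCheckpoint() {
        return true;
    }

    // Same decision as the policy above: roll once the file is old enough or large enough.
    boolean shouldRollOnProcessingTime(PartFile file, long currentTime) {
        return currentTime - file.creationTime >= rollingTimeInterval
                || file.size > rollingFileSize;
    }

    /** Steps time in 1s ticks and returns the size of every file that was rolled. */
    static List<Long> simulate(
            RollingPolicySketch policy,
            long checkpointIntervalMs,
            long checkIntervalMs,
            long bytesPerSecond,
            long durationMs) {
        List<Long> rolledSizes = new ArrayList<>();
        PartFile current = new PartFile(0L);
        for (long t = 1000L; t <= durationMs; t += 1000L) {
            current.size += bytesPerSecond;
            boolean onCheckpoint = t % checkpointIntervalMs == 0 && policy.shouldRollOnCheckpoint();
            boolean onTimer = t % checkIntervalMs == 0 && policy.shouldRollOnProcessingTime(current, t);
            if (onCheckpoint || onTimer) {
                rolledSizes.add(current.size);
                current = new PartFile(t);
            }
        }
        return rolledSizes;
    }

    public static void main(String[] args) {
        // file-size = 128MB, rollover-interval = 10min, check-interval = 30s (from the config above);
        // checkpoint interval = 60s and write rate = 1 MiB/s are assumed values.
        RollingPolicySketch policy = new RollingPolicySketch(128L * 1024 * 1024, 10 * 60 * 1000L);
        List<Long> sizes = simulate(policy, 60_000L, 30_000L, 1024L * 1024, 10 * 60 * 1000L);
        System.out.println(sizes.size() + " files, first size = " + sizes.get(0));
        // prints "10 files, first size = 62914560"
    }
}
```

Under these assumed numbers every file is closed by the checkpoint at about 60 MiB, so neither the 128MB size limit nor the 10min rollover interval is ever reached. That is the effect I am asking about.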