Question about HiveRollingPolicy, the file rolling policy for streaming writes in HiveTableSink

kandy.wang
    private static class HiveRollingPolicy extends CheckpointRollingPolicy<RowData, String> {

        private final long rollingFileSize;
        private final long rollingTimeInterval;

        private HiveRollingPolicy(
                long rollingFileSize,
                long rollingTimeInterval) {
            Preconditions.checkArgument(rollingFileSize > 0L);
            Preconditions.checkArgument(rollingTimeInterval > 0L);
            this.rollingFileSize = rollingFileSize;
            this.rollingTimeInterval = rollingTimeInterval;
        }

        @Override
        public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) {
            return true;
        }

        @Override
        public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, RowData element) {
            return false;
        }

        @Override
        public boolean shouldRollOnProcessingTime(
                PartFileInfo<String> partFileState, long currentTime) {
            try {
                return currentTime - partFileState.getCreationTime() >= rollingTimeInterval ||
                        partFileState.getSize() > rollingFileSize;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

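I don't quite understand why the file has to be force-rolled on every checkpoint. With that behavior, the file rolling parameters I configured no longer take effect:

  'sink.rolling-policy.check-interval' = '30s',
  'sink.rolling-policy.rollover-interval' = '10min',
  'sink.rolling-policy.file-size' = '128MB'

What I want is for files to roll at a fixed size, a fixed record count, or a fixed time interval. How can I achieve that?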
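
To make the concern concrete, here is a minimal standalone sketch of the behavior I am describing. It is not Flink code: PartFile, simulate, the 60 s checkpoint interval, and the 1 MB/s write rate are all assumptions made up for illustration. Only the two decision methods mirror the logic of the policy quoted above. Under these assumptions every roll is triggered by a checkpoint, and the size and interval thresholds are never reached.

```java
// Simplified simulation; none of these types are Flink classes.
final class RollingPolicySketch {

    /** Hypothetical stand-in for the part-file state (not Flink's PartFileInfo). */
    static final class PartFile {
        final long creationTimeMs;
        long sizeBytes;

        PartFile(long creationTimeMs) {
            this.creationTimeMs = creationTimeMs;
        }
    }

    // Mirrors the quoted policy: always roll when a checkpoint happens.
    static boolean shouldRollOnCheckpoint(PartFile file) {
        return true;
    }

    // Mirrors the quoted policy: roll when the file is old enough or large enough.
    static boolean shouldRollOnProcessingTime(
            PartFile file, long nowMs, long rolloverIntervalMs, long fileSizeBytes) {
        return nowMs - file.creationTimeMs >= rolloverIntervalMs
                || file.sizeBytes > fileSizeBytes;
    }

    /** Steps one second at a time; returns {rolls on checkpoint, rolls on timer check}. */
    static int[] simulate(
            long durationSec, long checkpointIntervalSec, long checkIntervalSec,
            long rolloverIntervalSec, long fileSizeBytes, long bytesPerSec) {
        int onCheckpoint = 0;
        int onTimer = 0;
        PartFile file = new PartFile(0L);
        for (long t = 1; t <= durationSec; t++) {
            long nowMs = t * 1000L;
            file.sizeBytes += bytesPerSec; // assumed constant write rate

            // Periodic check, like 'sink.rolling-policy.check-interval'.
            if (t % checkIntervalSec == 0
                    && shouldRollOnProcessingTime(
                            file, nowMs, rolloverIntervalSec * 1000L, fileSizeBytes)) {
                onTimer++;
                file = new PartFile(nowMs);
            }

            // Checkpoint: a non-empty in-progress file is always rolled.
            if (t % checkpointIntervalSec == 0
                    && file.sizeBytes > 0
                    && shouldRollOnCheckpoint(file)) {
                onCheckpoint++;
                file = new PartFile(nowMs);
            }
        }
        return new int[] {onCheckpoint, onTimer};
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // One hour, checkpoint every 60 s, check every 30 s,
        // rollover after 10 min or 128 MB, writing 1 MB per second.
        int[] rolls = simulate(3600, 60, 30, 600, 128 * mb, mb);
        System.out.println("checkpoint rolls=" + rolls[0] + ", timer rolls=" + rolls[1]);
        // prints "checkpoint rolls=60, timer rolls=0"
    }
}
```

In this sketch, if the checkpoint interval is made longer than the time needed to write 128 MB, the timer check starts producing rolls instead. So the configured thresholds only seem to matter when checkpoints are less frequent than the thresholds.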