Custom UDF that implements CheckpointedFunction
The pseudocode is below. I found that initializeState is never executed:
public class ClusterInfoCollectUdf extends ScalarFunction implements CheckpointedFunction {

    private static final Logger LOGGER = LoggerFactory.getLogger(ClusterInfoCollectUdf.class);

    private transient MapState<String, Integer> mapState;
    private MapStateDescriptor<String, Integer> mapStateDescriptor;

    // ...

    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        LOGGER.info("the snapshotState is started ");
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        mapStateDescriptor = new MapStateDescriptor<>(
                "app-status-map",
                String.class,
                Integer.class);
        mapState = context.getKeyedStateStore().getMapState(mapStateDescriptor);
        LOGGER.info("the initializeState is started ");
    }
}
阿华田