如何自定义带有状态的UDF

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

如何自定义带有状态的UDF

阿华田
自定义UDF 实现CheckpointedFunction
伪代码如下 发现并没有执行initializeState






public class ClusterInfoCollectUdf   extends ScalarFunction implements CheckpointedFunction {
private static final Logger                            LOGGER = LoggerFactory.getLogger(ClusterInfoCollectUdf.class);
private transient     MapState<String,Integer >          mapState;
private               MapStateDescriptor<String,Integer> mapStateDescriptor;
   。。。。。



@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {


LOGGER.info("the snapshotState    is  started ");


}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
mapStateDescriptor = new MapStateDescriptor<>(
"app-status-map",
String.class,
Integer.class);

mapState = context.getKeyedStateStore().getMapState(mapStateDescriptor);
LOGGER.info("the initializeState    is  started ");




}



| |
阿华田
|
|
[hidden email]
|
签名由网易邮箱大师定制