POJO class, before the state upgrade

public class User {
    private String name;
    private String age;

    @Override
    public String toString() {
        return "User{" + "name='" + name + '\'' + ", age='" + age + '\'' + '}';
    }

    public User(String name, String age) {
        this.name = name;
        this.age = age;
    }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getAge() { return age; }
    public void setAge(String age) { this.age = age; }
}

------------------------------------------------------------------

Job class

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

public class FlinkTest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source that emits the same record every 5 seconds.
        env.addSource(new SourceFunction<String>() {
            @Override
            public void run(SourceContext<String> sourceContext) throws Exception {
                while (true) {
                    sourceContext.collect("xxx");
                    TimeUnit.SECONDS.sleep(5);
                }
            }

            @Override
            public void cancel() {
            }
        }).keyBy(s -> s).process(new KeyedProcessFunction<String, String, Object>() {
            ValueState<User> state;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                final ValueStateDescriptor<User> vsd = new ValueStateDescriptor<>("valueState", User.class);
                state = getRuntimeContext().getState(vsd);
            }

            @Override
            public void processElement(String s, Context context, Collector<Object> collector) throws Exception {
                // The first element writes the state; later elements print it.
                final User value = state.value();
                if (value == null) {
                    state.update(new User("zhangsan", "26"));
                } else {
                    System.out.println(value);
                }
            }
        }).uid("PRO-1");

        env.execute();
    }
}

This program prints:

User{name='zhangsan', age='26'}

I wait until the job has completed a checkpoint, then stop it.

Modified POJO class

public class User {
    private String name;
    private String age;
    private String addr;

    @Override
    public String toString() {
        return "User{" + "name='" + name + '\'' + ", age='" + age + '\'' + ", addr='" + addr + '\'' + '}';
    }

    public User(String name, String age) {
        this.name = name;
        this.age = age;
    }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getAge() { return age; }
    public void setAge(String age) { this.age = age; }
    public String getAddr() { return addr; }
    public void setAddr(String addr) { this.addr = addr; }
}

The POJO class now has an additional addr field. After repackaging, I restart the job and restore it from the checkpoint. The program prints:

User{name='null', age='zhangsan', addr='26'}

In theory only the addr field should be null. Why is the name field null after restoring from state?
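
One possible explanation, offered as an assumption rather than a confirmed diagnosis: User has no public no-argument constructor, so Flink may not recognize it as a POJO type and may fall back to its generic Kryo-based serializer, which does not support schema evolution. If that fallback writes field values positionally in alphabetical field-name order (an assumption about the serializer's layout), the old bytes (age, name) would be read back into the new layout (addr, age, name). The sketch below is a dependency-free simulation of that mismatch, not Flink or Kryo code; the names FieldOrderDemo, UserV1, UserV2, write and read are hypothetical.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class FieldOrderDemo {

    // Hypothetical stand-in for the original User class (two fields).
    static class UserV1 {
        String name;
        String age;

        UserV1(String name, String age) {
            this.name = name;
            this.age = age;
        }
    }

    // Hypothetical stand-in for the modified User class (addr added).
    static class UserV2 {
        String name;
        String age;
        String addr;

        @Override
        public String toString() {
            return "User{name='" + name + "', age='" + age + "', addr='" + addr + "'}";
        }
    }

    // Non-synthetic declared fields sorted by name: the assumed positional layout.
    static Field[] sortedFields(Class<?> type) {
        Field[] fields = Arrays.stream(type.getDeclaredFields())
                .filter(f -> !f.isSynthetic())
                .sorted(Comparator.comparing(Field::getName))
                .toArray(Field[]::new);
        for (Field f : fields) {
            f.setAccessible(true);
        }
        return fields;
    }

    // "Serialize": emit field values by position only, without field names.
    static List<Object> write(Object value) throws ReflectiveOperationException {
        List<Object> out = new ArrayList<>();
        for (Field f : sortedFields(value.getClass())) {
            out.add(f.get(value));
        }
        return out;
    }

    // "Deserialize": assign values by position; fields with no value stay null.
    static UserV2 read(List<Object> data) throws ReflectiveOperationException {
        UserV2 user = new UserV2();
        Field[] fields = sortedFields(UserV2.class);
        for (int i = 0; i < fields.length && i < data.size(); i++) {
            fields[i].set(user, data.get(i));
        }
        return user;
    }

    // Writes with the old layout (age, name) and reads with the new one (addr, age, name).
    static String demo() {
        try {
            return read(write(new UserV1("zhangsan", "26"))).toString();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Prints: User{name='null', age='zhangsan', addr='26'}
        System.out.println(demo());
    }
}
```

The simulated result matches the shifted values in the restored state. If this is indeed the cause, giving User a public no-argument constructor before the first checkpoint is taken should let Flink treat it as a POJO type, for which adding a field is a supported schema change; state already written by the fallback serializer would likely not be migrated automatically.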