[Flink-1.8.1]POJO类型状态升级后字段赋值不正

classic Classic list List threaded Threaded
1 message Options
cs
Reply | Threaded
Open this post in threaded view
|

[Flink-1.8.1]POJO类型状态升级后字段赋值不正

cs
POJO类-未升级状态之前
public class User {


    private String name;
    private String age;


    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                ", age='" + age + '\'' +
                '}';
    }


    public User(String name, String age) {
        this.name = name;
        this.age = age;
    }


    public String getName() {
        return name;
    }


    public void setName(String name) {
        this.name = name;
    }


    public String getAge() {
        return age;
    }


    public void setAge(String age) {
        this.age = age;
    }


}
------------------------------------------------------------------
运行类
public class FlinkTest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
&nbsp; &nbsp; &nbsp; &nbsp; env.addSource(new SourceFunction<String&gt;() {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public void run(SourceContext<String&gt; sourceContext) throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; while (true) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; sourceContext.collect("xxx");
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; TimeUnit.SECONDS.sleep(5);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }


&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public void cancel() {


&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; }).keyBy(s -&gt; s).process(new KeyedProcessFunction<String, String, Object&gt;() {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ValueState<User&gt; state;


&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public void open(Configuration parameters) throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; super.open(parameters);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; final ValueStateDescriptor<User&gt; vsd = new ValueStateDescriptor<&gt;("valueState", User.class);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; state = getRuntimeContext().getState(vsd);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }


&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public void processElement(String s, Context context, Collector<Object&gt; collector) throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; final User value = state.value();
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (value == null) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; state.update(new User("zhangsan", "26"));
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; } else {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println(value);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; }).uid("PRO-1");
&nbsp; &nbsp; &nbsp; &nbsp; env.execute();
&nbsp; &nbsp; }
}

该程序输出 User{name='zhangsan', age='26'}/n
等到程序做完checkpoint后停止任务。
修改POJO类
public class User {


&nbsp; &nbsp; private String name;
&nbsp; &nbsp; private String age;
&nbsp; &nbsp; private String addr;


&nbsp; &nbsp; @Override
&nbsp; &nbsp; public String toString() {
&nbsp; &nbsp; &nbsp; &nbsp; return "User{" +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "name='" + name + '\'' +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ", age='" + age + '\'' +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ", addr='" + addr + '\'' +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; '}';
&nbsp; &nbsp; }


&nbsp; &nbsp; public User(String name, String age) {
&nbsp; &nbsp; &nbsp; &nbsp; this.name = name;
&nbsp; &nbsp; &nbsp; &nbsp; this.age = age;
&nbsp; &nbsp; }


&nbsp; &nbsp; public String getName() {
&nbsp; &nbsp; &nbsp; &nbsp; return name;
&nbsp; &nbsp; }


&nbsp; &nbsp; public void setName(String name) {
&nbsp; &nbsp; &nbsp; &nbsp; this.name = name;
&nbsp; &nbsp; }


&nbsp; &nbsp; public String getAge() {
&nbsp; &nbsp; &nbsp; &nbsp; return age;
&nbsp; &nbsp; }


&nbsp; &nbsp; public void setAge(String age) {
&nbsp; &nbsp; &nbsp; &nbsp; this.age = age;
&nbsp; &nbsp; }


&nbsp; &nbsp; public String getAddr() {
&nbsp; &nbsp; &nbsp; &nbsp; return addr;
&nbsp; &nbsp; }


&nbsp; &nbsp; public void setAddr(String addr) {
&nbsp; &nbsp; &nbsp; &nbsp; this.addr = addr;
&nbsp; &nbsp; }
}

POJO类增加了一个addr字段,重启打包从checkpoint恢复程序
程序输出 User{name='null', age='zhangsan', addr='26'}/n
理论上应该是addr字段为空,为什么从状态恢复后name字段为空了呢?