在sink 执行notifyCheckpointComplete 方法时能否收到上游 snapshot collect 发送的数据?

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

在sink 执行notifyCheckpointComplete 方法时能否收到上游 snapshot collect 发送的数据?

key lou
各位大佬:
   在如下代码中: FCombine  执行snapshot  collect 发送数据之后如果不执行sleep 则  FSubmit
在执行 notifyCheckpointComplete 方法时,list 集合 ls 为空。
如果在  FCombine  执行snapshot  collect 发送数据之后如果执行sleep,
在执行 notifyCheckpointComplete 方法时 则就可以收到  snapshot  collect 发送的数据。
我之前的理解是每个算子在执行完checkpoint 之后 才会把 barrier 广播到下游算子。 所以觉得下游无论如何应该在执行
notifyCheckpointComplete 之前就会收到 上游 snapshot  collect 发送数据(因为 snapshot
 collect 在前,广播 barrier  在后,然后下游在收到了 barrier  才会执行 chekpoint
的相关方法,所以在执行相关方法前 上游 snapshot  collect 发出的数据就应该已经到达了下游)。
但是根据如下代码的测试来看,不是这样的。请大佬帮忙解答下原因。

public class FlinkCheckpointTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment steamEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
        steamEnv.enableCheckpointing(1000L*2);
        steamEnv
            .addSource(new FSource()).setParallelism(4)
            .transform("开始事务", Types.STRING,new FStart()).setParallelism(1)
            .process(new FCombine()).name("事务预处理").setParallelism(4)
            .addSink(new FSubmit()).name("提交事务").setParallelism(1)
        ;
        steamEnv.execute("test");
    }

   static class FSource extends RichParallelSourceFunction<String>{
        @Override
        public void run(SourceContext<String> sourceContext) throws Exception {
            int I =0;
            while (true){
                I = I + 1;
                sourceContext.collect("thread " +
Thread.currentThread().getId() +"-" +I);
                Thread.sleep(1000);
            }
        }
        @Override
        public void cancel() {}
    }

    static class FStart extends AbstractStreamOperator<String>
implements OneInputStreamOperator<String,String>{
       volatile Long ckid = 0L;
        @Override
        public void processElement(StreamRecord<String> streamRecord)
throws Exception {
            log("收到数据: " + streamRecord.getValue() + "..ckid:" + ckid);
            output.collect(streamRecord);
        }
        @Override
        public void prepareSnapshotPreBarrier(long checkpointId)
throws Exception {
            log("开启事务: " + checkpointId);
            ckid = checkpointId;
            super.prepareSnapshotPreBarrier(checkpointId);
        }
    }

    static class FCombine extends ProcessFunction<String,String>
implements CheckpointedFunction {
        List ls = new ArrayList<String>();
        Collector<String> collector =null;
        volatile Long ckid = 0L;

        @Override
        public void snapshotState(FunctionSnapshotContext
functionSnapshotContext) throws Exception {
            StringBuffer sb = new StringBuffer();
            ls.forEach(x->{sb.append(x).append(";");});
            log("批处理 " + functionSnapshotContext.getCheckpointId() +
": 时收到数据:" + sb.toString());
            Thread.sleep(5*1000);
            collector.collect(sb.toString());
            ls.clear();
            Thread.sleep(5*1000);
            //Thread.sleep(20*1000);
        }
        @Override
        public void initializeState(FunctionInitializationContext
functionInitializationContext) throws Exception {        }
        @Override
        public void processElement(String s, Context context,
Collector<String> out) throws Exception {
            if(StringUtils.isNotBlank(s)){
                ls.add(s);
            }
            log("收到数据 :" + s + "; 这批数据大小为:" + ls.size() + "..ckid:" + ckid);
            if(collector ==null){
                collector = out;
            }
        }
    }

    static class FSubmit extends RichSinkFunction<String> implements
/*  CheckpointedFunction,*/ CheckpointListener {
        List ls = new ArrayList<String>();
        volatile Long ckid = 0L;
        @Override
        public void notifyCheckpointComplete(long l) throws Exception {
            ckid = l;
            StringBuffer sb = new StringBuffer();
            ls.forEach(x->{sb.append(x).append("||");});
            log("submit checkpoint " + l + " over data:list size" +
ls.size()+ "; detail" + sb.toString());
            ls.clear();
        }
        @Override
        public void invoke(String value, Context context) throws Exception {
            if(StringUtils.isNotBlank(value)){
                ls.add(value);
            }
            log("收到数据 :" + value + " list zie:" + ls.size() + "..ckid:" + ckid);
        }
    }
    public static void log(String s){
        String name = Thread.currentThread().getName();
        System.out.println(new SimpleDateFormat("HH:mm:ss").format(new
Date())+":"+name + ":" + s);
    }
}
Reply | Threaded
Open this post in threaded view
|

Re: 在sink 执行notifyCheckpointComplete 方法时能否收到上游 snapshot collect 发送的数据?

Congxian Qiu
Hi
    上游 snapshot 的逻辑和下游收到之前的 notifyCheckpointComplete
之间是没有必然联系的,所以这个从理论上是不保证先后顺序的。
Best,
Congxian


key lou <[hidden email]> 于2020年8月16日周日 下午9:27写道:

> 各位大佬:
>    在如下代码中: FCombine  执行snapshot  collect 发送数据之后如果不执行sleep 则  FSubmit
> 在执行 notifyCheckpointComplete 方法时,list 集合 ls 为空。
> 如果在  FCombine  执行snapshot  collect 发送数据之后如果执行sleep,
> 在执行 notifyCheckpointComplete 方法时 则就可以收到  snapshot  collect 发送的数据。
> 我之前的理解是每个算子在执行完checkpoint 之后 才会把 barrier 广播到下游算子。 所以觉得下游无论如何应该在执行
> notifyCheckpointComplete 之前就会收到 上游 snapshot  collect 发送数据(因为 snapshot
>  collect 在前,广播 barrier  在后,然后下游在收到了 barrier  才会执行 chekpoint
> 的相关方法,所以在执行相关方法前 上游 snapshot  collect 发出的数据就应该已经到达了下游)。
> 但是根据如下代码的测试来看,不是这样的。请大佬帮忙解答下原因。
>
> public class FlinkCheckpointTest {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment steamEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
>         steamEnv.enableCheckpointing(1000L*2);
>         steamEnv
>             .addSource(new FSource()).setParallelism(4)
>             .transform("开始事务", Types.STRING,new FStart()).setParallelism(1)
>             .process(new FCombine()).name("事务预处理").setParallelism(4)
>             .addSink(new FSubmit()).name("提交事务").setParallelism(1)
>         ;
>         steamEnv.execute("test");
>     }
>
>    static class FSource extends RichParallelSourceFunction<String>{
>         @Override
>         public void run(SourceContext<String> sourceContext) throws
> Exception {
>             int I =0;
>             while (true){
>                 I = I + 1;
>                 sourceContext.collect("thread " +
> Thread.currentThread().getId() +"-" +I);
>                 Thread.sleep(1000);
>             }
>         }
>         @Override
>         public void cancel() {}
>     }
>
>     static class FStart extends AbstractStreamOperator<String>
> implements OneInputStreamOperator<String,String>{
>        volatile Long ckid = 0L;
>         @Override
>         public void processElement(StreamRecord<String> streamRecord)
> throws Exception {
>             log("收到数据: " + streamRecord.getValue() + "..ckid:" + ckid);
>             output.collect(streamRecord);
>         }
>         @Override
>         public void prepareSnapshotPreBarrier(long checkpointId)
> throws Exception {
>             log("开启事务: " + checkpointId);
>             ckid = checkpointId;
>             super.prepareSnapshotPreBarrier(checkpointId);
>         }
>     }
>
>     static class FCombine extends ProcessFunction<String,String>
> implements CheckpointedFunction {
>         List ls = new ArrayList<String>();
>         Collector<String> collector =null;
>         volatile Long ckid = 0L;
>
>         @Override
>         public void snapshotState(FunctionSnapshotContext
> functionSnapshotContext) throws Exception {
>             StringBuffer sb = new StringBuffer();
>             ls.forEach(x->{sb.append(x).append(";");});
>             log("批处理 " + functionSnapshotContext.getCheckpointId() +
> ": 时收到数据:" + sb.toString());
>             Thread.sleep(5*1000);
>             collector.collect(sb.toString());
>             ls.clear();
>             Thread.sleep(5*1000);
>             //Thread.sleep(20*1000);
>         }
>         @Override
>         public void initializeState(FunctionInitializationContext
> functionInitializationContext) throws Exception {        }
>         @Override
>         public void processElement(String s, Context context,
> Collector<String> out) throws Exception {
>             if(StringUtils.isNotBlank(s)){
>                 ls.add(s);
>             }
>             log("收到数据 :" + s + "; 这批数据大小为:" + ls.size() + "..ckid:" +
> ckid);
>             if(collector ==null){
>                 collector = out;
>             }
>         }
>     }
>
>     static class FSubmit extends RichSinkFunction<String> implements
> /*  CheckpointedFunction,*/ CheckpointListener {
>         List ls = new ArrayList<String>();
>         volatile Long ckid = 0L;
>         @Override
>         public void notifyCheckpointComplete(long l) throws Exception {
>             ckid = l;
>             StringBuffer sb = new StringBuffer();
>             ls.forEach(x->{sb.append(x).append("||");});
>             log("submit checkpoint " + l + " over data:list size" +
> ls.size()+ "; detail" + sb.toString());
>             ls.clear();
>         }
>         @Override
>         public void invoke(String value, Context context) throws Exception
> {
>             if(StringUtils.isNotBlank(value)){
>                 ls.add(value);
>             }
>             log("收到数据 :" + value + " list zie:" + ls.size() + "..ckid:" +
> ckid);
>         }
>     }
>     public static void log(String s){
>         String name = Thread.currentThread().getName();
>         System.out.println(new SimpleDateFormat("HH:mm:ss").format(new
> Date())+":"+name + ":" + s);
>     }
> }
>
Reply | Threaded
Open this post in threaded view
|

Re: 在sink 执行notifyCheckpointComplete 方法时能否收到上游 snapshot collect 发送的数据?

key lou
谢谢 解答。也就是假如 A->B 这样一个 graph。在一次checkpoint 中 A 调用  snapshot 往下游发的数据,在B 执行
notifyCheckpointComplete 与 A    snapshot 下发的数据到达B   这2者没有必然的先后顺序。

另外就是 如果没有先后顺序,有没有什么办法 或者是在 B执行 某某操作前 能确保 这次 checkpoint 中 A  snapshot  发出的数据
到达了B.

 我的场景是 有3个核心算子  start->proccess->submit . 其中 start和 submit 并行度为1, proccess
并行度为N, start  会开启一个事务 编号    proccess  用这个事务 编号
去做预处理(赞一批处理一次,并把这一次处理结果下发,给下游做事务提交),  submit  收到上游批处理的结果 用 同样的事务编号去提交


Congxian Qiu <[hidden email]> 于2020年8月17日周一 上午10:42写道:

> Hi
>     上游 snapshot 的逻辑和下游收到之前的 notifyCheckpointComplete
> 之间是没有必然联系的,所以这个从理论上是不保证先后顺序的。
> Best,
> Congxian
>
>
> key lou <[hidden email]> 于2020年8月16日周日 下午9:27写道:
>
> > 各位大佬:
> >    在如下代码中: FCombine  执行snapshot  collect 发送数据之后如果不执行sleep 则  FSubmit
> > 在执行 notifyCheckpointComplete 方法时,list 集合 ls 为空。
> > 如果在  FCombine  执行snapshot  collect 发送数据之后如果执行sleep,
> > 在执行 notifyCheckpointComplete 方法时 则就可以收到  snapshot  collect 发送的数据。
> > 我之前的理解是每个算子在执行完checkpoint 之后 才会把 barrier 广播到下游算子。 所以觉得下游无论如何应该在执行
> > notifyCheckpointComplete 之前就会收到 上游 snapshot  collect 发送数据(因为 snapshot
> >  collect 在前,广播 barrier  在后,然后下游在收到了 barrier  才会执行 chekpoint
> > 的相关方法,所以在执行相关方法前 上游 snapshot  collect 发出的数据就应该已经到达了下游)。
> > 但是根据如下代码的测试来看,不是这样的。请大佬帮忙解答下原因。
> >
> > public class FlinkCheckpointTest {
> >     public static void main(String[] args) throws Exception {
> >         StreamExecutionEnvironment steamEnv =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> >         steamEnv.enableCheckpointing(1000L*2);
> >         steamEnv
> >             .addSource(new FSource()).setParallelism(4)
> >             .transform("开始事务", Types.STRING,new
> FStart()).setParallelism(1)
> >             .process(new FCombine()).name("事务预处理").setParallelism(4)
> >             .addSink(new FSubmit()).name("提交事务").setParallelism(1)
> >         ;
> >         steamEnv.execute("test");
> >     }
> >
> >    static class FSource extends RichParallelSourceFunction<String>{
> >         @Override
> >         public void run(SourceContext<String> sourceContext) throws
> > Exception {
> >             int I =0;
> >             while (true){
> >                 I = I + 1;
> >                 sourceContext.collect("thread " +
> > Thread.currentThread().getId() +"-" +I);
> >                 Thread.sleep(1000);
> >             }
> >         }
> >         @Override
> >         public void cancel() {}
> >     }
> >
> >     static class FStart extends AbstractStreamOperator<String>
> > implements OneInputStreamOperator<String,String>{
> >        volatile Long ckid = 0L;
> >         @Override
> >         public void processElement(StreamRecord<String> streamRecord)
> > throws Exception {
> >             log("收到数据: " + streamRecord.getValue() + "..ckid:" + ckid);
> >             output.collect(streamRecord);
> >         }
> >         @Override
> >         public void prepareSnapshotPreBarrier(long checkpointId)
> > throws Exception {
> >             log("开启事务: " + checkpointId);
> >             ckid = checkpointId;
> >             super.prepareSnapshotPreBarrier(checkpointId);
> >         }
> >     }
> >
> >     static class FCombine extends ProcessFunction<String,String>
> > implements CheckpointedFunction {
> >         List ls = new ArrayList<String>();
> >         Collector<String> collector =null;
> >         volatile Long ckid = 0L;
> >
> >         @Override
> >         public void snapshotState(FunctionSnapshotContext
> > functionSnapshotContext) throws Exception {
> >             StringBuffer sb = new StringBuffer();
> >             ls.forEach(x->{sb.append(x).append(";");});
> >             log("批处理 " + functionSnapshotContext.getCheckpointId() +
> > ": 时收到数据:" + sb.toString());
> >             Thread.sleep(5*1000);
> >             collector.collect(sb.toString());
> >             ls.clear();
> >             Thread.sleep(5*1000);
> >             //Thread.sleep(20*1000);
> >         }
> >         @Override
> >         public void initializeState(FunctionInitializationContext
> > functionInitializationContext) throws Exception {        }
> >         @Override
> >         public void processElement(String s, Context context,
> > Collector<String> out) throws Exception {
> >             if(StringUtils.isNotBlank(s)){
> >                 ls.add(s);
> >             }
> >             log("收到数据 :" + s + "; 这批数据大小为:" + ls.size() + "..ckid:" +
> > ckid);
> >             if(collector ==null){
> >                 collector = out;
> >             }
> >         }
> >     }
> >
> >     static class FSubmit extends RichSinkFunction<String> implements
> > /*  CheckpointedFunction,*/ CheckpointListener {
> >         List ls = new ArrayList<String>();
> >         volatile Long ckid = 0L;
> >         @Override
> >         public void notifyCheckpointComplete(long l) throws Exception {
> >             ckid = l;
> >             StringBuffer sb = new StringBuffer();
> >             ls.forEach(x->{sb.append(x).append("||");});
> >             log("submit checkpoint " + l + " over data:list size" +
> > ls.size()+ "; detail" + sb.toString());
> >             ls.clear();
> >         }
> >         @Override
> >         public void invoke(String value, Context context) throws
> Exception
> > {
> >             if(StringUtils.isNotBlank(value)){
> >                 ls.add(value);
> >             }
> >             log("收到数据 :" + value + " list zie:" + ls.size() + "..ckid:" +
> > ckid);
> >         }
> >     }
> >     public static void log(String s){
> >         String name = Thread.currentThread().getName();
> >         System.out.println(new SimpleDateFormat("HH:mm:ss").format(new
> > Date())+":"+name + ":" + s);
> >     }
> > }
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: 在sink 执行notifyCheckpointComplete 方法时能否收到上游 snapshot collect 发送的数据?

Congxian Qiu
Hi
    notifyCheckpointComplete 是整个 checkpoint 完成后调用的(也就是所有算子都做完了 snapshot,而且
JM 也做完了一些其他的工作),你的需求看上去只是要在算子间做一些顺序操作,这个应该不需要依赖 notifyCheckpointComplete
的,你可以自己写一个逻辑,在 submit 收集到 N 个信号后再做相应的事情。
Best,
Congxian


key lou <[hidden email]> 于2020年8月17日周一 上午11:42写道:

> 谢谢 解答。也就是假如 A->B 这样一个 graph。在一次checkpoint 中 A 调用  snapshot 往下游发的数据,在B 执行
> notifyCheckpointComplete 与 A    snapshot 下发的数据到达B   这2者没有必然的先后顺序。
>
> 另外就是 如果没有先后顺序,有没有什么办法 或者是在 B执行 某某操作前 能确保 这次 checkpoint 中 A  snapshot  发出的数据
> 到达了B.
>
>  我的场景是 有3个核心算子  start->proccess->submit . 其中 start和 submit 并行度为1, proccess
> 并行度为N, start  会开启一个事务 编号    proccess  用这个事务 编号
> 去做预处理(赞一批处理一次,并把这一次处理结果下发,给下游做事务提交),  submit  收到上游批处理的结果 用 同样的事务编号去提交
>
>
> Congxian Qiu <[hidden email]> 于2020年8月17日周一 上午10:42写道:
>
> > Hi
> >     上游 snapshot 的逻辑和下游收到之前的 notifyCheckpointComplete
> > 之间是没有必然联系的,所以这个从理论上是不保证先后顺序的。
> > Best,
> > Congxian
> >
> >
> > key lou <[hidden email]> 于2020年8月16日周日 下午9:27写道:
> >
> > > 各位大佬:
> > >    在如下代码中: FCombine  执行snapshot  collect 发送数据之后如果不执行sleep 则  FSubmit
> > > 在执行 notifyCheckpointComplete 方法时,list 集合 ls 为空。
> > > 如果在  FCombine  执行snapshot  collect 发送数据之后如果执行sleep,
> > > 在执行 notifyCheckpointComplete 方法时 则就可以收到  snapshot  collect 发送的数据。
> > > 我之前的理解是每个算子在执行完checkpoint 之后 才会把 barrier 广播到下游算子。 所以觉得下游无论如何应该在执行
> > > notifyCheckpointComplete 之前就会收到 上游 snapshot  collect 发送数据(因为 snapshot
> > >  collect 在前,广播 barrier  在后,然后下游在收到了 barrier  才会执行 chekpoint
> > > 的相关方法,所以在执行相关方法前 上游 snapshot  collect 发出的数据就应该已经到达了下游)。
> > > 但是根据如下代码的测试来看,不是这样的。请大佬帮忙解答下原因。
> > >
> > > public class FlinkCheckpointTest {
> > >     public static void main(String[] args) throws Exception {
> > >         StreamExecutionEnvironment steamEnv =
> > > StreamExecutionEnvironment.getExecutionEnvironment();
> > >         steamEnv.enableCheckpointing(1000L*2);
> > >         steamEnv
> > >             .addSource(new FSource()).setParallelism(4)
> > >             .transform("开始事务", Types.STRING,new
> > FStart()).setParallelism(1)
> > >             .process(new FCombine()).name("事务预处理").setParallelism(4)
> > >             .addSink(new FSubmit()).name("提交事务").setParallelism(1)
> > >         ;
> > >         steamEnv.execute("test");
> > >     }
> > >
> > >    static class FSource extends RichParallelSourceFunction<String>{
> > >         @Override
> > >         public void run(SourceContext<String> sourceContext) throws
> > > Exception {
> > >             int I =0;
> > >             while (true){
> > >                 I = I + 1;
> > >                 sourceContext.collect("thread " +
> > > Thread.currentThread().getId() +"-" +I);
> > >                 Thread.sleep(1000);
> > >             }
> > >         }
> > >         @Override
> > >         public void cancel() {}
> > >     }
> > >
> > >     static class FStart extends AbstractStreamOperator<String>
> > > implements OneInputStreamOperator<String,String>{
> > >        volatile Long ckid = 0L;
> > >         @Override
> > >         public void processElement(StreamRecord<String> streamRecord)
> > > throws Exception {
> > >             log("收到数据: " + streamRecord.getValue() + "..ckid:" + ckid);
> > >             output.collect(streamRecord);
> > >         }
> > >         @Override
> > >         public void prepareSnapshotPreBarrier(long checkpointId)
> > > throws Exception {
> > >             log("开启事务: " + checkpointId);
> > >             ckid = checkpointId;
> > >             super.prepareSnapshotPreBarrier(checkpointId);
> > >         }
> > >     }
> > >
> > >     static class FCombine extends ProcessFunction<String,String>
> > > implements CheckpointedFunction {
> > >         List ls = new ArrayList<String>();
> > >         Collector<String> collector =null;
> > >         volatile Long ckid = 0L;
> > >
> > >         @Override
> > >         public void snapshotState(FunctionSnapshotContext
> > > functionSnapshotContext) throws Exception {
> > >             StringBuffer sb = new StringBuffer();
> > >             ls.forEach(x->{sb.append(x).append(";");});
> > >             log("批处理 " + functionSnapshotContext.getCheckpointId() +
> > > ": 时收到数据:" + sb.toString());
> > >             Thread.sleep(5*1000);
> > >             collector.collect(sb.toString());
> > >             ls.clear();
> > >             Thread.sleep(5*1000);
> > >             //Thread.sleep(20*1000);
> > >         }
> > >         @Override
> > >         public void initializeState(FunctionInitializationContext
> > > functionInitializationContext) throws Exception {        }
> > >         @Override
> > >         public void processElement(String s, Context context,
> > > Collector<String> out) throws Exception {
> > >             if(StringUtils.isNotBlank(s)){
> > >                 ls.add(s);
> > >             }
> > >             log("收到数据 :" + s + "; 这批数据大小为:" + ls.size() + "..ckid:" +
> > > ckid);
> > >             if(collector ==null){
> > >                 collector = out;
> > >             }
> > >         }
> > >     }
> > >
> > >     static class FSubmit extends RichSinkFunction<String> implements
> > > /*  CheckpointedFunction,*/ CheckpointListener {
> > >         List ls = new ArrayList<String>();
> > >         volatile Long ckid = 0L;
> > >         @Override
> > >         public void notifyCheckpointComplete(long l) throws Exception {
> > >             ckid = l;
> > >             StringBuffer sb = new StringBuffer();
> > >             ls.forEach(x->{sb.append(x).append("||");});
> > >             log("submit checkpoint " + l + " over data:list size" +
> > > ls.size()+ "; detail" + sb.toString());
> > >             ls.clear();
> > >         }
> > >         @Override
> > >         public void invoke(String value, Context context) throws
> > Exception
> > > {
> > >             if(StringUtils.isNotBlank(value)){
> > >                 ls.add(value);
> > >             }
> > >             log("收到数据 :" + value + " list zie:" + ls.size() +
> "..ckid:" +
> > > ckid);
> > >         }
> > >     }
> > >     public static void log(String s){
> > >         String name = Thread.currentThread().getName();
> > >         System.out.println(new SimpleDateFormat("HH:mm:ss").format(new
> > > Date())+":"+name + ":" + s);
> > >     }
> > > }
> > >
> >
>