I am using the DDL API of Flink 1.11.0 with the debezium-json format for CDC. After the job is restored from a checkpoint, newly arriving op=d (delete) records fail to be deleted from the sink.

Restart command:

    ./bin/flink run -m yarn-cluster /root/bigdata-flink-1.0.jar -s hdfs://prehadoop01:8020/flink/checkpoints/4cc5df8b96e90c1c2a4d3719a77f51d1/chk-819/_metadata

Code:

    EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointTimeout(6000L);            // checkpoint timeout
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);         // max checkpoints allowed in flight at the same time
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10L);     // minimum pause between checkpoints
    env.getCheckpointConfig().setPreferCheckpointForRecovery(true);   // prefer checkpoints for failure recovery
    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1); // number of tolerable checkpoint failures
    // checkpoint retention policy
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    // externalized checkpoint path
    env.setStateBackend(new FsStateBackend(new URI("hdfs://172.22.20.205:8020/flink/checkpoints"), false));
    TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));               // (this line is truncated in the original post)

    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

    String sourceDDL = String.format(
            "CREATE TABLE debezium_source (" +
            "  id INT NOT NULL," +
            "  name STRING," +
            "  description STRING," +
            "  weight DOUBLE" +
            ") WITH (" +
            "  'connector' = 'kafka-0.11'," +
            "  'topic' = '%s'," +
            "  'properties.bootstrap.servers' = '%s'," +
            "  'scan.startup.mode' = 'group-offsets'," +
            "  'format' = 'debezium-json'" +
            ")", "ddd", "172.22.20.206:9092");

    String sinkDDL = "CREATE TABLE sink (" +
            "  id INT NOT NULL," +
            "  name STRING," +
            "  description STRING," +
            "  weight DOUBLE," +
            "  PRIMARY KEY (id, name, description, weight) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'jdbc'," +
            "  'url' = 'jdbc:mysql://172.27.4.22:3306/test?autoReconnect=true'," +
            "  'table-name' = 'products'," +
            "  'driver' = 'com.mysql.cj.jdbc.Driver'," +
            "  'username' = 'DataPip'," +
            "  'password' = 'DataPip'" +
            ")";

    String dml = "INSERT INTO sink SELECT id, name, description, weight FROM debezium_source " +
            "GROUP BY id, name, description, weight";

    tEnv.executeSql(sourceDDL);
    tEnv.executeSql(sinkDDL);
    tEnv.executeSql(dml);
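One line of the posted code, "TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));", is clearly truncated. It looks like the tail of a restart-strategy setting; the sketch below is a purely hypothetical reconstruction, and the chosen method and its first argument are assumptions rather than anything taken from the original code.

    // Hypothetical reconstruction (an assumption, not the poster's actual code).
    // RestartStrategies.failureRateRestart(maxFailuresPerInterval, failureRateInterval, delayBetweenAttempts)
    // is the standard API whose signature matches the two surviving Time arguments.
    // Requires: import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    env.setRestartStrategy(RestartStrategies.failureRateRestart(
            3, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));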
godfrey he replied:

Why GROUP BY id, name, description, weight at all? Doesn't a plain "INSERT INTO sink SELECT id, name, description, weight FROM debezium_source" meet your needs?
Jark replied:

Hi,

Can you confirm that the Kafka topic contains the complete snapshot data, i.e. that there is a corresponding INSERT message before this DELETE message? If there is not, this behavior can occur: the DELETE is treated as dirty data at the GROUP BY node and dropped. Alternatively, as godfrey suggested, drop the GROUP BY and INSERT all columns into the sink directly; then the DELETE will not be discarded.

Best,
Jark
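To make the above concrete, here is a hypothetical pair of debezium-json change events for a single row (the field values are invented for illustration). The GROUP BY operator can only retract a row that is already in its state, so the delete event is only applied if the matching snapshot/insert event was consumed from the topic first.

    // Hypothetical Debezium change events, shown as Java string literals purely for illustration.
    // A snapshot read or insert ("op" = "r" or "c") puts the row into the GROUP BY state ...
    String insertEvent = "{\"before\": null, "
            + "\"after\": {\"id\": 101, \"name\": \"scooter\", \"description\": \"Small 2-wheel scooter\", \"weight\": 3.14}, "
            + "\"op\": \"c\"}";
    // ... and only then can the delete ("op" = "d", old row carried in "before") retract it.
    // A delete that arrives without the earlier event is dropped as dirty data at the GROUP BY node.
    String deleteEvent = "{\"before\": {\"id\": 101, \"name\": \"scooter\", \"description\": \"Small 2-wheel scooter\", \"weight\": 3.14}, "
            + "\"after\": null, "
            + "\"op\": \"d\"}";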
曹武 replied:

If I remove the GROUP BY, the job throws the exception below. Is there a known way around it?

    Exception in thread "main" org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue. Current node is TableSourceScan(table=[[default_catalog, default_database, ddd]], fields=[id, age])
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitChildren$2(FlinkChangelogModeInferenceProgram.scala:626)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike.map(TraversableLike.scala:233)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:614)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitSink$1(FlinkChangelogModeInferenceProgram.scala:690)
        at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:240)
曹武 replied:

It feels as if the restore from the checkpoint failed, or as if the checkpoint files do not contain the intermediate GROUP BY state. How can I troubleshoot this?
Leonard Xu replied:

Hi, 曹武

This is a known bug that has already been fixed in 1.11.1 and 1.12.0. If you need it urgently, you can build the release-1.11 branch yourself.

https://issues.apache.org/jira/browse/FLINK-18461

Best,
Leonard Xu