hi all
Flink version: 1.11.1. We submit jobs through the sql-client, with the following flink-conf.yaml settings:

state.backend: filesystem
state.backend.fs.checkpointdir: hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-data/23252
state.checkpoints.dir: hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23252
state.savepoints.dir: hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/savepoint/23252
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
execution.checkpointing.interval: 60s
execution.checkpointing.mode: EXACTLY_ONCE
jobmanager.execution.failover-strategy: full
state.backend.incremental: true

After the job starts, the web UI shows every checkpoint as successful, but on HDFS there is only ever a single metadata file, similar to:

hdfs://10.218.60.57:8020/ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23250/c72c1ee4362c3d0ba72db32698363fcf/chk-5/_metadata

Apart from that file, there is nothing else. Our source is Kafka, and the Kafka source definitely keeps state. Could anyone explain what is causing this?

Thanks

--
Best Regards,
Harold Miao
Hi
If all of your state is very small, it may be stored inline in the metadata file, in which case _metadata is the only file you will see. You can find the logic here [1].

[1] https://github.com/apache/flink/blob/9b0fb562898b809b860cf0065ded7a45c49300af/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L442

Best,
Congxian
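Whether a state handle gets inlined into _metadata is governed by a size threshold of the filesystem state backend. A minimal flink-conf.yaml sketch, assuming the `state.backend.fs.memory-threshold` option (its default is around 1 KB in 1.11; the value below is purely illustrative, chosen to force every state handle into its own file for testing):

```yaml
# State handles smaller than this many bytes are serialized into the
# _metadata file instead of being written as separate files next to it.
# Illustrative value: 0 disables inlining, so state files appear on HDFS.
state.backend.fs.memory-threshold: 0
```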
One more thing: we modified the sql-client code so that the job restores from a cp. The change is:

private StreamExecutionEnvironment createStreamExecutionEnvironment() {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    LOG.info("restore cp exist: {}", environment.getExecution().getRestoreSp().isPresent());
    if (environment.getExecution().getRestoreSp().isPresent()) {
        LOG.info("restore cp path: {}", environment.getExecution().getRestoreSp().get());
        if (!environment.getExecution().getRestoreSp().get().contains("none")) {
            SavepointRestoreSettings savepointRestoreSettings =
                    SavepointRestoreSettings.forPath(environment.getExecution().getRestoreSp().get(), true);
            env.getStreamGraph().setSavepointRestoreSettings(savepointRestoreSettings);
        }
    }
    // for TimeCharacteristic validation in StreamTableEnvironmentImpl
    env.setStreamTimeCharacteristic(environment.getExecution().getTimeCharacteristic());
    if (env.getStreamTimeCharacteristic() == TimeCharacteristic.EventTime) {
        env.getConfig().setAutoWatermarkInterval(environment.getExecution().getPeriodicWatermarksInterval());
    }
    return env;
}

When we pass in the path that contains only the metadata file, it fails with:

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
    at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:879)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1870)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1861)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1846)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1834)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.createStreamExecutionEnvironment(ExecutionContext.java:691)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableEnvironment(ExecutionContext.java:593)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:498)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:184)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:137)
    at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:868)
    ... 3 more

The error clearly indicates that no operators (and hence no operator state) are present in the topology.

--
Best Regards,
Harold Miao
Jark Wu (Administrator)
Could it be that your cp-restore run does not execute any INSERT INTO statement?
It is the same INSERT job. I only added that code when restarting the job, to construct a SavepointRestoreSettings and restore from the cp.

How can I verify that the checkpoint was really written to HDFS? Is there any tool that can parse the metadata file?

Thanks

--
Best Regards,
Harold Miao
Hi
You can refer to [1] and adapt it a bit to analyze the metadata file.

[1] https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java

Best,
Congxian
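A standalone sketch of what such an inspector could look like. It leans on Flink's internal `Checkpoints.loadCheckpointMetadata` helper (the same machinery the linked test exercises); since it is internal API, the exact signature varies between versions, and the three-argument form below is an assumption for 1.11.x:

```java
import java.io.DataInputStream;
import java.io.FileInputStream;

import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;

// Hypothetical helper, not part of Flink: parse a local copy of a _metadata
// file (e.g. fetched via `hdfs dfs -get`) and print what state it references.
public class MetadataInspector {
    public static void main(String[] args) throws Exception {
        String path = args[0]; // local path to the _metadata file
        try (DataInputStream in = new DataInputStream(new FileInputStream(path))) {
            CheckpointMetadata meta = Checkpoints.loadCheckpointMetadata(
                    in, MetadataInspector.class.getClassLoader(), path);
            System.out.println("checkpoint id: " + meta.getCheckpointId());
            // Each OperatorState shows parallelism and state sizes, which tells
            // you whether state was inlined into _metadata or stored separately.
            for (OperatorState os : meta.getOperatorStates()) {
                System.out.println(os);
            }
        }
    }
}
```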
In reply to this post by Jark
It was a problem in my code: at the point where I set the savepoint path, the operators had not yet been built into the StreamGraph. The correct approach is to set it when the JobGraph is generated. Thanks!
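The fix described above can be sketched roughly like this; the helper class and its wiring are illustrative, not the actual sql-client patch:

```java
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.graph.StreamGraph;

// Hypothetical helper: apply SavepointRestoreSettings only once the pipeline
// (e.g. the translated INSERT INTO statement) has populated the StreamGraph
// with operators, i.e. at JobGraph-generation time rather than on the
// freshly created StreamExecutionEnvironment.
public final class RestoreHelper {
    private RestoreHelper() {}

    public static JobGraph createRestoredJobGraph(StreamGraph streamGraph, String checkpointPath) {
        JobGraph jobGraph = streamGraph.getJobGraph();
        // allowNonRestoredState = true, matching the original patch
        jobGraph.setSavepointRestoreSettings(
                SavepointRestoreSettings.forPath(checkpointPath, true));
        return jobGraph;
    }
}
```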
--
Best Regards,
Harold Miao