[sql-client][checkpoint] Job submitted via sql-client: checkpoints not written to HDFS

Harold.Miao
Hi all,

Flink version: 1.11.1

We submit jobs through the sql-client. Our flink-conf.yaml is configured as follows:

state.backend: filesystem
state.backend.fs.checkpointdir: hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-data/23252
state.checkpoints.dir: hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23252
state.savepoints.dir: hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/savepoint/23252

execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
execution.checkpointing.interval: 60s
execution.checkpointing.mode: EXACTLY_ONCE
jobmanager.execution.failover-strategy: full
state.backend.incremental: true


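After the job starts, the web UI shows every checkpoint as successful, but on HDFS there is only ever a single meta file, like the following:

hdfs://10.218.60.57:8020/ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23250/c72c1ee4362c3d0ba72db32698363fcf/chk-5/_metadata

Apart from this file, there is nothing else.

Our source is Kafka, and the Kafka source certainly keeps state.

Does anyone know what could be causing this?

Thanks
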
--

Best Regards,
Harold Miao

Re: [sql-client][checkpoint] Job submitted via sql-client: checkpoints not written to HDFS

Congxian Qiu
Hi,
   If all of your state is very small, it may be stored inline in the meta file, in which case _metadata is the only file. You can find the exact logic here [1].

[1]
https://github.com/apache/flink/blob/9b0fb562898b809b860cf0065ded7a45c49300af/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L442
Best,
Congxian


In reply to Harold.Miao <[hidden email]> (Mon, Sep 14, 2020, 6:44 PM)
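
Congxian's point can be pictured with a small, self-contained model. With the filesystem state backend, a piece of checkpointed state at or below a size threshold is embedded in `_metadata` instead of being written to its own file; the real setting is `state.backend.fs.memory-threshold`, which as far as we know defaults to 20 KB in Flink 1.11. The class below is a hypothetical sketch, not Flink's code: `InlineStateSketch`, its methods, the exact `<=` boundary and the byte sizes are assumptions for illustration. A Kafka source mostly checkpoints partition offsets, which are tiny, so in a model like this nothing spills into a separate file and `_metadata` is the only file on HDFS.

```java
/**
 * Hypothetical model (not Flink's implementation) of how a filesystem state
 * backend decides whether a piece of checkpointed state is embedded in
 * _metadata or written to its own file, based on a size threshold.
 */
final class InlineStateSketch {

    /** Where one piece of state ends up in this model. */
    enum Location { INLINED_IN_METADATA, SEPARATE_FILE }

    /** Assumption for this sketch: state at or below the threshold is inlined. */
    static Location locate(long stateBytes, long thresholdBytes) {
        return stateBytes <= thresholdBytes ? Location.INLINED_IN_METADATA : Location.SEPARATE_FILE;
    }

    /** Number of state files a checkpoint would contain besides _metadata. */
    static int separateFileCount(long[] stateSizes, long thresholdBytes) {
        int files = 0;
        for (long size : stateSizes) {
            if (locate(size, thresholdBytes) == Location.SEPARATE_FILE) {
                files++;
            }
        }
        return files;
    }

    public static void main(String[] args) {
        long threshold = 20 * 1024;                     // assumed 20 KB threshold
        long[] kafkaOffsetState = {96, 96, 96};         // hypothetical sizes: a few offsets per subtask
        long[] withLargeState = {96, 5L * 1024 * 1024}; // hypothetical: one 5 MB state handle
        System.out.println(separateFileCount(kafkaOffsetState, threshold)); // 0
        System.out.println(separateFileCount(withLargeState, threshold));   // 1
    }
}
```

The sizes above are made up; to relate this to a real job, compare against the `state.backend.fs.memory-threshold` value (if set) in your own flink-conf.yaml.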

Re: [sql-client][checkpoint] Job submitted via sql-client: checkpoints not written to HDFS

Harold.Miao
One more thing: we modified the sql-client code so that the job restores from a checkpoint. The change is as follows:

private StreamExecutionEnvironment createStreamExecutionEnvironment() {
   final StreamExecutionEnvironment env =
         StreamExecutionEnvironment.getExecutionEnvironment();

   // --- our modification: restore from a checkpoint/savepoint path unless it contains "none" ---
   LOG.info("restore cp exist: {}", environment.getExecution().getRestoreSp().isPresent());
   if (environment.getExecution().getRestoreSp().isPresent()) {
      LOG.info("restore cp path: {}", environment.getExecution().getRestoreSp().get());
      if (!environment.getExecution().getRestoreSp().get().contains("none")) {
         SavepointRestoreSettings savepointRestoreSettings =
               SavepointRestoreSettings.forPath(environment.getExecution().getRestoreSp().get(), true);
         // getStreamGraph() is called here, before any operators have been added to env
         env.getStreamGraph().setSavepointRestoreSettings(savepointRestoreSettings);
      }
   }
   // --- end of modification ---

   // for TimeCharacteristic validation in StreamTableEnvironmentImpl
   env.setStreamTimeCharacteristic(environment.getExecution().getTimeCharacteristic());
   if (env.getStreamTimeCharacteristic() == TimeCharacteristic.EventTime) {
      env.getConfig().setAutoWatermarkInterval(environment.getExecution().getPeriodicWatermarksInterval());
   }
   return env;
}


When we pass in the path of the checkpoint above (the one that contains only the meta file), we get the following error:

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
        at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:879)
        at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227)
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1870)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1861)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1846)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1834)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.createStreamExecutionEnvironment(ExecutionContext.java:691)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableEnvironment(ExecutionContext.java:593)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:498)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:184)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:137)
        at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:868)
        ... 3 more


The error clearly shows that there is no operator state.

--

Best Regards,
Harold Miao

In reply to Congxian Qiu <[hidden email]> (Mon, Sep 14, 2020, 7:53 PM)
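
The stack trace suggests the restore code fails on its own, independently of what `_metadata` contains: `env.getStreamGraph()` is called from `createStreamExecutionEnvironment()` while the session is still being opened (`LocalExecutor.openSession`), so no `INSERT INTO` has been translated into operators yet and the graph generator refuses an empty topology. As far as we understand Flink 1.11, the no-argument `getStreamGraph()` also clears the environment's transformations after building a graph, so settings placed on that returned `StreamGraph` would not carry over to the job submitted later. `MiniEnv` below is a hypothetical model that only mimics these two behaviors and the error message; it is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the StreamExecutionEnvironment behavior seen in the stack trace (not Flink code). */
final class MiniEnv {

    /** Stand-in for a generated stream graph: just the operators it contains. */
    static final class MiniGraph {
        final List<String> operators;

        MiniGraph(List<String> operators) {
            this.operators = operators;
        }
    }

    private final List<String> transformations = new ArrayList<>();

    /** Stand-in for translating a SQL statement such as INSERT INTO into operators. */
    void addOperator(String name) {
        transformations.add(name);
    }

    /** Mimics getStreamGraph(): rejects an empty topology, then clears the transformations. */
    MiniGraph getStreamGraph() {
        if (transformations.isEmpty()) {
            throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
        }
        MiniGraph graph = new MiniGraph(new ArrayList<>(transformations));
        transformations.clear();
        return graph;
    }

    public static void main(String[] args) {
        MiniEnv env = new MiniEnv();
        try {
            // Like the modified createStreamExecutionEnvironment(): nothing has been translated yet.
            env.getStreamGraph();
        } catch (IllegalStateException e) {
            System.out.println("first call: " + e.getMessage());
        }

        env.addOperator("KafkaSource");
        env.addOperator("Sink");
        System.out.println("operators: " + env.getStreamGraph().operators); // operators: [KafkaSource, Sink]

        try {
            // The previous call already consumed the transformations.
            env.getStreamGraph();
        } catch (IllegalStateException e) {
            System.out.println("second call: " + e.getMessage());
        }
    }
}
```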

Re: [sql-client][checkpoint] Job submitted via sql-client: checkpoints not written to HDFS

Jark
Could it be that your checkpoint-restore code path does not execute any INSERT INTO statement?

In reply to Harold.Miao <[hidden email]> (Mon, Sep 14, 2020, 8:15 PM)

Re: [sql-client][checkpoint] Job submitted via sql-client: checkpoints not written to HDFS

Harold.Miao
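It is the same INSERT job. I only added this code for restarting the job, to build a SavepointRestoreSettings and restore from the checkpoint.

One question: how can I tell whether a checkpoint was really written to HDFS? Is there any tool that can parse the meta file?

Thanks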

--

Best Regards,
Harold Miao

In reply to Jark Wu <[hidden email]> (Tue, Sep 15, 2020, 11:31 AM)

Re: [sql-client][checkpoint] Job submitted via sql-client: checkpoints not written to HDFS

Congxian Qiu
Hi,
   You can use [1] as a reference and adapt it yourself to analyze the metadata file.
[1]
https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
Best,
Congxian


In reply to Harold.Miao <[hidden email]> (Tue, Sep 15, 2020, 1:58 PM)
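
Besides adapting `CheckpointMetadataLoadingTest`, a quick first check is to copy `_metadata` to local disk (for example with `hdfs dfs -get`) and read its header. As far as we know, Flink writes a 4-byte magic number (`0x4960672d`) followed by a 4-byte metadata format version, both big-endian, and Flink 1.11 writes version 3; treat both constants as assumptions to verify against your Flink sources. `MetadataHeaderCheck` is a hypothetical helper: it only confirms that the file looks like checkpoint metadata and prints its size; it does not decode the state handles the way the linked test does with Flink's own classes.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper: sanity-checks the header of a locally copied _metadata file. */
final class MetadataHeaderCheck {

    /** Assumed magic number at the start of Flink checkpoint metadata; verify against your Flink sources. */
    static final int ASSUMED_MAGIC = 0x4960672d;

    /** Reads the magic number and returns the metadata format version that follows it. */
    static int readVersion(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        int magic = data.readInt(); // big-endian, matching DataOutputStream.writeInt
        if (magic != ASSUMED_MAGIC) {
            throw new IOException(String.format("Not a checkpoint metadata file, magic = 0x%08x", magic));
        }
        return data.readInt();
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java MetadataHeaderCheck <local copy of _metadata>");
            return;
        }
        Path file = Paths.get(args[0]);
        try (InputStream in = Files.newInputStream(file)) {
            System.out.println("metadata format version: " + readVersion(in));
        }
        // Inlined state is stored inside this file, so its size grows with the embedded state.
        System.out.println("file size in bytes: " + Files.size(file));
    }
}
```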

Re: [sql-client][checkpoint] Job submitted via sql-client: checkpoints not written to HDFS

Harold.Miao
In reply to this post by Jark
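It was a problem in my code: when I set the savepoint settings, the operators in the StreamGraph had not been built yet. The correct approach is to set them when the JobGraph is generated. Thanks!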

--

Best Regards,
Harold Miao
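
The resolution, attaching the restore settings only once a job graph with operators exists, can be sketched with hypothetical stand-ins so the ordering is visible without a Flink dependency. `MiniJobGraph`, `RestoreOrderingSketch` and the `hdfs:///path/to/chk-5` path are invented for illustration; the `"none"` check and `allowNonRestoredState = true` mirror the modified sql-client code earlier in the thread. In real code the corresponding call would, as far as we know, be `JobGraph#setSavepointRestoreSettings(SavepointRestoreSettings.forPath(path, true))` on the JobGraph that is actually submitted; exactly where the 1.11 sql-client generates that JobGraph is not shown in this thread, so the placement is an assumption.

```java
import java.util.Optional;

/** Hypothetical stand-in for Flink's JobGraph; models only the operator count and the restore setting. */
final class MiniJobGraph {
    private final int operatorCount;
    private String restorePath; // null means: start without restoring
    private boolean allowNonRestoredState;

    MiniJobGraph(int operatorCount) {
        this.operatorCount = operatorCount;
    }

    void setRestore(String path, boolean allowNonRestored) {
        this.restorePath = path;
        this.allowNonRestoredState = allowNonRestored;
    }

    int operatorCount() { return operatorCount; }
    Optional<String> restorePath() { return Optional.ofNullable(restorePath); }
    boolean allowNonRestoredState() { return allowNonRestoredState; }
}

final class RestoreOrderingSketch {

    /**
     * Attaches the restore path only to a job graph that already contains operators,
     * skipping paths that contain "none" as the modified sql-client code does.
     * Assumed real-Flink counterpart:
     * jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(path, true)).
     */
    static MiniJobGraph applyRestore(MiniJobGraph jobGraph, Optional<String> restoreSp) {
        if (jobGraph.operatorCount() == 0) {
            throw new IllegalStateException("Translate the INSERT INTO statement before applying restore settings");
        }
        restoreSp.filter(path -> !path.contains("none"))
                .ifPresent(path -> jobGraph.setRestore(path, true));
        return jobGraph;
    }

    public static void main(String[] args) {
        // Hypothetical job graph generated from the INSERT job: a source and a sink.
        MiniJobGraph restored = applyRestore(new MiniJobGraph(2), Optional.of("hdfs:///path/to/chk-5"));
        System.out.println(restored.restorePath().orElse("<no restore>")); // hdfs:///path/to/chk-5

        MiniJobGraph fresh = applyRestore(new MiniJobGraph(2), Optional.of("none"));
        System.out.println(fresh.restorePath().orElse("<no restore>"));    // <no restore>
    }
}
```

The point of the design is only the ordering: restore settings are a property of the submitted job, so they are applied after translation, not while the execution environment is still empty.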