Hi all
小弟遇到个问题期望大佬解答解答: 通过 env.setStateBackend(new RocksDBStateBackend("file:///data/flink/checkpoints"));设置状态存储位置,job运行起来后找不到状态数据, flink1.12 yarn pre job 模式,下面是我的配置,job运行起来后在服务器上找不到 “/data/flink/checkpoints”这个目录,像我设置了状态的存储位置是不是job一运行起来对应的存储位置就应该有状态的数据呢? public class FlinkTestDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(60000); env.getConfig().setAutoWatermarkInterval(200); env.setStateBackend(new RocksDBStateBackend("file:///data/flink/checkpoints")); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings); bsTableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE); CheckpointConfig config = env.getCheckpointConfig(); config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); bsTableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(5)); Configuration configuration = bsTableEnv.getConfig().getConfiguration(); configuration.setString("table.exec.mini-batch.enabled", "true"); configuration.setString("table.exec.mini-batch.allow-latency", "6000"); configuration.setString("table.exec.mini-batch.size", "5000"); | | 刘海 | | [hidden email] | 签名由网易邮箱大师定制 |
你配置的是本地目录,不是 hdfs
目录,当重启后,可能新的任务运行的机器不是之前的那台机器了,那么之前作业的状态信息(在其他机器上)是不在新的机器上的,那么就会发现找不到状态文件,建议配置成 HDFS 的 Best zhisheng 刘海 <[hidden email]> 于2021年1月20日周三 下午9:05写道: > Hi all > 小弟遇到个问题期望大佬解答解答: > 通过 env.setStateBackend(new > RocksDBStateBackend("file:///data/flink/checkpoints"));设置状态存储位置,job运行起来后找不到状态数据, > > > flink1.12 yarn pre job 模式,下面是我的配置,job运行起来后在服务器上找不到 > “/data/flink/checkpoints”这个目录,像我设置了状态的存储位置是不是job一运行起来对应的存储位置就应该有状态的数据呢? > > > public class FlinkTestDemo { > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.enableCheckpointing(60000); > env.getConfig().setAutoWatermarkInterval(200); > env.setStateBackend(new > RocksDBStateBackend("file:///data/flink/checkpoints")); > EnvironmentSettings bsSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, > bsSettings); > > bsTableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, > CheckpointingMode.EXACTLY_ONCE); > CheckpointConfig config = env.getCheckpointConfig(); > > config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > bsTableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, > Duration.ofMinutes(5)); > > Configuration configuration = bsTableEnv.getConfig().getConfiguration(); > configuration.setString("table.exec.mini-batch.enabled", "true"); > configuration.setString("table.exec.mini-batch.allow-latency", "6000"); > configuration.setString("table.exec.mini-batch.size", "5000"); > > | | > 刘海 > | > | > [hidden email] > | > 签名由网易邮箱大师定制 |
Free forum by Nabble | Edit this page |