Hi, I wrote a Kafka-to-Hive program here, but it fails to run. What could be the reason?
Exception:

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No operators defined in streaming topology. Cannot generate StreamGraph.
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
	at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph.
	at org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
	at org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
	at com.akulaku.data.flink.StreamingWriteToHive.main(StreamingWriteToHive.java:80)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
	... 11 more
Code:

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(environment, settings);

environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
environment.setStateBackend(new MemoryStateBackend());
environment.getCheckpointConfig().setCheckpointInterval(5000);

String name = "myhive";
String defaultDatabase = "tmp";
String hiveConfDir = "/etc/alternatives/hive-conf/";
String version = "1.1.0";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tableEnv.registerCatalog("myhive", hive);
tableEnv.useCatalog("myhive");

tableEnv.executeSql("CREATE TABLE tmp.user_behavior (\n" +
        "  user_id BIGINT,\n" +
        "  item_id STRING,\n" +
        "  behavior STRING,\n" +
        "  ts AS PROCTIME()\n" +
        ") WITH (\n" +
        "  'connector' = 'kafka-0.11',\n" +
        "  'topic' = 'user_behavior',\n" +
        "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
        "  'properties.group.id' = 'testGroup',\n" +
        "  'scan.startup.mode' = 'earliest-offset',\n" +
        "  'format' = 'json',\n" +
        "  'json.fail-on-missing-field' = 'false',\n" +
        "  'json.ignore-parse-errors' = 'true'\n" +
        ")");

// tableEnv.executeSql("CREATE TABLE print_table (\n" +
//         "  user_id BIGINT,\n" +
//         "  item_id STRING,\n" +
//         "  behavior STRING,\n" +
//         "  tsdata STRING\n" +
//         ") WITH (\n" +
//         "  'connector' = 'print'\n" +
//         ")");

tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql("CREATE TABLE tmp.streamhivetest (\n" +
        "  user_id BIGINT,\n" +
        "  item_id STRING,\n" +
        "  behavior STRING,\n" +
        "  tsdata STRING\n" +
        ") STORED AS parquet TBLPROPERTIES (\n" +
        "  'sink.rolling-policy.file-size' = '12MB',\n" +
        "  'sink.rolling-policy.rollover-interval' = '1 min',\n" +
        "  'sink.rolling-policy.check-interval' = '1 min',\n" +
        "  'execution.checkpointing.interval' = 'true'\n" +
        ")");

tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tableEnv.executeSql("insert into streamhivetest select user_id, item_id, behavior, " +
        "DATE_FORMAT(ts, 'yyyy-MM-dd') as tsdata from user_behavior");

tableEnv.execute("stream-write-hive");
tableEnv.executeSql() already submits the job, so there is no need to call execute() afterwards.
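For example, the tail of the program could look like this. It is a minimal sketch: executeSql() on an INSERT submits the job by itself, and the getJobClient() call is only my assumption about how you might keep a handle on the submitted job with the Flink 1.11 TableResult API:

import org.apache.flink.table.api.TableResult;

// executeSql() on an INSERT statement submits the streaming job on its own.
TableResult result = tableEnv.executeSql(
        "INSERT INTO streamhivetest " +
        "SELECT user_id, item_id, behavior, DATE_FORMAT(ts, 'yyyy-MM-dd') AS tsdata " +
        "FROM user_behavior");

// Do NOT call tableEnv.execute(...) afterwards: with no operators left in the
// environment's topology, it fails with "No operators defined in streaming topology".
result.getJobClient()
        .ifPresent(client -> System.out.println("Submitted job: " + client.getJobID()));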
--
Best regards!
Rui Li
Hi,

Got it. Two questions about streaming writes to Hive tables:

1. Does the Hive table have to be created from within Flink first?
2. Can I write directly to a Hive table that already exists (e.g. created through Hue)? Will the files still follow a rolling policy?
Hi Dream,
> 1. Does the Hive table have to be created from within Flink first?

No, it doesn't matter where you create it.

> 2. Can I write directly to a Hive table that already exists (e.g. created through Hue)? Will the files still follow a rolling policy?

Yes. By default, files roll at 128 MB, and they also roll on every checkpoint.
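For reference, those defaults map onto the same sink options you already set in your CREATE TABLE. A sketch with the defaults written out explicitly, reusing tableEnv from your program (the values in the comments are my reading of the Flink 1.11 docs, so please verify them against your version):

// Assumed Flink 1.11 defaults for the Hive streaming sink:
//   sink.rolling-policy.file-size         : 128MB   (roll when a part file reaches this size)
//   sink.rolling-policy.rollover-interval : 30 min  (roll when a part file has been open this long)
//   sink.rolling-policy.check-interval    : 1 min   (how often the conditions above are checked)
// With a bulk format such as parquet, part files additionally roll on every checkpoint.
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql("CREATE TABLE tmp.streamhivetest (" +
        " user_id BIGINT, item_id STRING, behavior STRING, tsdata STRING" +
        ") STORED AS parquet TBLPROPERTIES (" +
        " 'sink.rolling-policy.file-size' = '128MB'," +
        " 'sink.rolling-policy.rollover-interval' = '30 min'," +
        " 'sink.rolling-policy.check-interval' = '1 min')");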
--
Best,
Jingsong Lee
Hi,

About those two rolling triggers: is it whichever is reached first? That is, with the 1-minute checkpoint and the 128 MB file size, will whichever condition is hit first roll over and start a new file?

> Yes. By default, files roll at 128 MB, and they also roll on every checkpoint.
Yes.

But no matter which condition triggers the roll, the files only become visible after the checkpoint completes.
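In other words, the checkpoint interval is effectively the upper bound on how long newly written data stays invisible in Hive. A minimal sketch of the relevant setting, reusing the environment from the original program (the 1-minute interval is only an example value):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// Finished part files are committed (become visible to Hive readers) only when
// a checkpoint completes, regardless of which rolling condition closed them first.
environment.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE); // 1 minute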
--
Best,
Jingsong Lee
In reply to this post by Dream-底限
Yes, you can write to an existing table; the relevant configuration options [1] need to be added to the table's properties.
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html#streaming-writing
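For example, a sketch like the following could retrofit the rolling options onto a table created from Hue. The ALTER TABLE route through Flink's Hive dialect is my assumption; if your Flink version's dialect doesn't accept it, the same statement can be run from Hive/beeline instead. The property keys come from the docs linked above:

// Attach the streaming-sink options to an already-existing Hive table.
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql(
        "ALTER TABLE tmp.streamhivetest SET TBLPROPERTIES (" +
        " 'sink.rolling-policy.file-size' = '128MB'," +
        " 'sink.rolling-policy.rollover-interval' = '30 min'," +
        " 'sink.rolling-policy.check-interval' = '1 min')");
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);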
--
Best regards!
Rui Li