你好!
我通过表环境执行insert into语句提交作业,我该如何设置我的job名称呢? 程序: EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build(); TableEnvironment bsTableEnv = TableEnvironment.create(bbSettings); String sourceDDL = "CREATE TABLE datagen ( " + " f_random INT, " + " f_random_str STRING, " + " ts AS localtimestamp, " + " WATERMARK FOR ts AS ts " + ") WITH ( " + " 'connector' = 'datagen', " + " 'rows-per-second'='10', " + " 'fields.f_random.min'='1', " + " 'fields.f_random.max'='5', " + " 'fields.f_random_str.length'='10' " + ")"; bsTableEnv.executeSql(sourceDDL); Table datagen = bsTableEnv.from("datagen"); System.out.println(datagen.getSchema()); String sinkDDL = "CREATE TABLE print_table (" + " f_random int," + " c_val bigint, " + " wStart TIMESTAMP(3) " + ") WITH ('connector' = 'print') "; bsTableEnv.executeSql(sinkDDL); System.out.println(bsTableEnv.from("print_table").getSchema()); Table table = bsTableEnv.sqlQuery("select f_random, count(f_random_str), TUMBLE_START(ts, INTERVAL '5' second) as wStart from datagen group by TUMBLE(ts, INTERVAL '5' second), f_random"); bsTableEnv.executeSql("insert into print_table select * from " + table); |
据我所知,这种执行方式目前没法设置 jobName
> 2020年8月21日 上午11:11,Asahi Lee <[hidden email]> 写道: > > 你好! > 我通过表环境执行insert into语句提交作业,我该如何设置我的job名称呢? > > > 程序: > EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build(); > TableEnvironment bsTableEnv = TableEnvironment.create(bbSettings); > > String sourceDDL = "CREATE TABLE datagen ( " + > " f_random INT, " + > " f_random_str STRING, " + > " ts AS localtimestamp, " + > " WATERMARK FOR ts AS ts " + > ") WITH ( " + > " 'connector' = 'datagen', " + > " 'rows-per-second'='10', " + > " 'fields.f_random.min'='1', " + > " 'fields.f_random.max'='5', " + > " 'fields.f_random_str.length'='10' " + > ")"; > > bsTableEnv.executeSql(sourceDDL); > Table datagen = bsTableEnv.from("datagen"); > > System.out.println(datagen.getSchema()); > > String sinkDDL = "CREATE TABLE print_table (" + > " f_random int," + > " c_val bigint, " + > " wStart TIMESTAMP(3) " + > ") WITH ('connector' = 'print') "; > bsTableEnv.executeSql(sinkDDL); > > System.out.println(bsTableEnv.from("print_table").getSchema()); > > Table table = bsTableEnv.sqlQuery("select f_random, count(f_random_str), TUMBLE_START(ts, INTERVAL '5' second) as wStart from datagen group by TUMBLE(ts, INTERVAL '5' second), f_random"); > bsTableEnv.executeSql("insert into print_table select * from " + table); |
FYI: 有一个issue[1] 正在跟进和解决这个问题
[1] https://issues.apache.org/jira/browse/FLINK-18545 Zou Dan <[hidden email]> 于2020年8月23日周日 下午2:29写道: > 据我所知,这种执行方式目前没法设置 jobName > > > 2020年8月21日 上午11:11,Asahi Lee <[hidden email]> 写道: > > > > 你好! > > 我通过表环境执行insert into语句提交作业,我该如何设置我的job名称呢? > > > > > > 程序: > > EnvironmentSettings bbSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().build(); > > TableEnvironment bsTableEnv = TableEnvironment.create(bbSettings); > > > > String sourceDDL = "CREATE TABLE datagen ( " + > > " f_random INT, " + > > " f_random_str STRING, " + > > " ts AS localtimestamp, " + > > " WATERMARK FOR ts AS ts " + > > ") WITH ( " + > > " 'connector' = 'datagen', " + > > " 'rows-per-second'='10', " + > > " 'fields.f_random.min'='1', " + > > " 'fields.f_random.max'='5', " + > > " 'fields.f_random_str.length'='10' " + > > ")"; > > > > bsTableEnv.executeSql(sourceDDL); > > Table datagen = bsTableEnv.from("datagen"); > > > > System.out.println(datagen.getSchema()); > > > > String sinkDDL = "CREATE TABLE print_table (" + > > " f_random int," + > > " c_val bigint, " + > > " wStart TIMESTAMP(3) " + > > ") WITH ('connector' = 'print') "; > > bsTableEnv.executeSql(sinkDDL); > > > > System.out.println(bsTableEnv.from("print_table").getSchema()); > > > > Table table = bsTableEnv.sqlQuery("select f_random, count(f_random_str), > TUMBLE_START(ts, INTERVAL '5' second) as wStart from datagen group by > TUMBLE(ts, INTERVAL '5' second), f_random"); > > bsTableEnv.executeSql("insert into print_table select * from " + table); > > > -- Best, Benchao Li |
In reply to this post by Zou Dan
那sql执行的这种模式,是否可以考虑添加job名称的设置呢?
------------------ 原始邮件 ------------------ 发件人: "user-zh" <[hidden email]>; 发送时间: 2020年8月23日(星期天) 下午2:29 收件人: "user-zh"<[hidden email]>; 主题: Re: 1.11版本,关于TableEnvironment.executeSql("insert into ..."),job名称设置的问题 据我所知,这种执行方式目前没法设置 jobName > 2020年8月21日 上午11:11,Asahi Lee <[hidden email]> 写道: > > 你好! > &nbsp; &nbsp; &nbsp;我通过表环境执行insert into语句提交作业,我该如何设置我的job名称呢? > > > 程序: > EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build(); > TableEnvironment bsTableEnv = TableEnvironment.create(bbSettings); > > String sourceDDL = "CREATE TABLE datagen ( " + > " f_random INT, " + > " f_random_str STRING, " + > " ts AS localtimestamp, " + > " WATERMARK FOR ts AS ts " + > ") WITH ( " + > " 'connector' = 'datagen', " + > " 'rows-per-second'='10', " + > " 'fields.f_random.min'='1', " + > " 'fields.f_random.max'='5', " + > " 'fields.f_random_str.length'='10' " + > ")"; > > bsTableEnv.executeSql(sourceDDL); > Table datagen = bsTableEnv.from("datagen"); > > System.out.println(datagen.getSchema()); > > String sinkDDL = "CREATE TABLE print_table (" + > " f_random int," + > " c_val bigint, " + > " wStart TIMESTAMP(3) " + > ") WITH ('connector' = 'print') "; > bsTableEnv.executeSql(sinkDDL); > > System.out.println(bsTableEnv.from("print_table").getSchema()); > > Table table = bsTableEnv.sqlQuery("select f_random, count(f_random_str), TUMBLE_START(ts, INTERVAL '5' second) as wStart from datagen group by TUMBLE(ts, INTERVAL '5' second), f_random"); > bsTableEnv.executeSql("insert into print_table select * from " + table); |
In reply to this post by Asahi Lee
>我通过表环境执行insert into语句提交作业,我该如何设置我的job名称呢?
Hi, 社区也有相关讨论 https://issues.apache.org/jira/browse/FLINK-18545 之前在有看到一些相关的解决方案,可以参考 https://www.jianshu.com/p/5981646cb1d4 https://www.cnblogs.com/Springmoon-venn/p/13375972.html Best, DanielGu -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Free forum by Nabble | Edit this page |