hi,
我使用flink 1.11提交sql作业,从JM日志中看到有如下异常。我的作业里会通过tEnv.executeSQL执行多个ddl语句,通过tEnv.createStatementSet add多个dml语句,并执行execute。 如下异常可能原因是啥呢?还有个问题,这个异常虽然抛出来了,但是作业还是正常启动执行了。这又是为何?是不是不推荐在作业里同时使用executeSQL和statementset.execute? Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment. at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:139) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:127) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57) ~[flink-table-blink_2.12-1.11.0.jar:1.11.0] at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699) ~[flink-table_2.12-1.11.0.jar:1.11.0] ... 24 more |
hi, 在JM日志中还有如下异常:这个也比较诡异。求大神帮忙解答下。 java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute. at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1872) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1863) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1848) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at com.htsc.crm_realtime.fatjob.Jobs.JobEntryBase.run(JobEntryBase.java:67) ~[763f9e05-39d4-4c70-bf9c-e3bea7ef0e0f_FatJob-1.0.jar:?] at com.htsc.crm_realtime.fatjob.Jobs.sensordata.SensorDataETLTask.main(SensorDataETLTask.java:47) ~[763f9e05-39d4-4c70-bf9c-e3bea7ef0e0f_FatJob-1.0.jar:?] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_201] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_201] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_201] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_201] at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:99) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) [?:1.8.0_201] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_201] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_201] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_201] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_201] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_201] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_201] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201] 在 2020-07-11 22:24:51,"sunfulin" <[hidden email]> 写道: >hi, >我使用flink 1.11提交sql作业,从JM日志中看到有如下异常。我的作业里会通过tEnv.executeSQL执行多个ddl语句,通过tEnv.createStatementSet add多个dml语句,并执行execute。 >如下异常可能原因是啥呢?还有个问题,这个异常虽然抛出来了,但是作业还是正常启动执行了。这又是为何?是不是不推荐在作业里同时使用executeSQL和statementset.execute? > > >Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment. >at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:139) ~[flink-dist_2.12-1.11.0.jar:1.11.0] >at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:127) ~[flink-dist_2.12-1.11.0.jar:1.11.0] >at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57) ~[flink-table-blink_2.12-1.11.0.jar:1.11.0] >at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699) ~[flink-table_2.12-1.11.0.jar:1.11.0] >... 24 more |
In reply to this post by sunfulin
HI, fulin
能大致贴下代码吗?能复现异常即可。简单说下这两个方法, TableEnvironment.executeSql(String statement)是为了用于执行单条的 sql 语句, SQL语句可以是 DDL/DML/DCL/DQL, DML(如insert)和DQL(如select)的执行是等 Flink job提交后返回该方法的执行结果,DDL(create table ...) 和 DCL(use database …) 的执行是对应的SQL语句执行完成就返回,理解起来就是需要提交 Flink job 的SQL需要等 job 提交后返回结果,其他是立即执行并返回。 Statementset.execute() 主要用于执行批量的 sql 语句,sql 语句只能是 insert xx,可以看接口的方法, 这个接口主要是为了 SQL 里有多个query的情况, (比如multiple sink:insert tableA from xx ;insert tableB from xx), 如果调用 TableEnvironment.executeSql(“insert tableA from xx”), TableEnvironment.executeSql(“insert tableA from xx”) 就会起两个 Flink job, 这应该不是用户需要的。 具体使用根据你的需要来使用。 Best, Leonard Xu > 在 2020年7月11日,22:24,sunfulin <[hidden email]> 写道: > > statementset.execute |
hi sunfulin,
1.11 对 StreamTableEnvironment.execute() 和 StreamExecutionEnvironment.execute() 的执行方式有所调整, 简单概述为: 1. StreamTableEnvironment.execute() 只能执行 sqlUpdate 和 insertInto 方法执行作业; 2. Table 转化为 DataStream 后只能通过 StreamExecutionEnvironment.execute() 来执行作业; 3. 新引入的 TableEnvironment.executeSql() 和 StatementSet.execute() 方法是直接执行sql作业 (异步提交作业),不需要再调用 StreamTableEnvironment.execute() 或 StreamExecutionEnvironment.execute() 详细可以参考 [1] [2] 对于 “No operators defined in streaming topology.”,如果使用 TableEnvironment.executeSql() 或者 StatementSet.execute() 方法提交的作业后再调用 StreamTableEnvironment.execute() 或 StreamExecutionEnvironment.execute() 提交作业,就会出现前面的错误。 对于 “是不是不推荐在作业里同时使用executeSQL和StatementSet.execute?”,这个答案是no。executeSql和StatementSet不会相互干扰。对于出现的错误,能给一个更详细的提交作业的流程描述吗? [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E7%BF%BB%E8%AF%91%E4%B8%8E%E6%89%A7%E8%A1%8C%E6%9F%A5%E8%AF%A2 [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E5%B0%86%E8%A1%A8%E8%BD%AC%E6%8D%A2%E6%88%90-datastream-%E6%88%96-dataset Best, Godfrey Leonard Xu <[hidden email]> 于2020年7月12日周日 下午1:48写道: > HI, fulin > > 能大致贴下代码吗?能复现异常即可。简单说下这两个方法, > TableEnvironment.executeSql(String statement)是为了用于执行单条的 sql 语句, SQL语句可以是 > DDL/DML/DCL/DQL, DML(如insert)和DQL(如select)的执行是等 Flink > job提交后返回该方法的执行结果,DDL(create table ...) 和 DCL(use database …) > 的执行是对应的SQL语句执行完成就返回,理解起来就是需要提交 Flink job 的SQL需要等 job 提交后返回结果,其他是立即执行并返回。 > Statementset.execute() 主要用于执行批量的 sql 语句,sql 语句只能是 insert xx,可以看接口的方法, > 这个接口主要是为了 SQL 里有多个query的情况, (比如multiple sink:insert tableA from xx ;insert > tableB from xx), 如果调用 TableEnvironment.executeSql(“insert tableA from xx”), > TableEnvironment.executeSql(“insert tableA from xx”) 就会起两个 Flink job, > 这应该不是用户需要的。 > 具体使用根据你的需要来使用。 > > > Best, > Leonard Xu > > > 在 2020年7月11日,22:24,sunfulin <[hidden email]> 写道: > > statementset.execute > > > |
hi,
感谢详细的解释和回复。那问题就清楚了。之前我们的job提交框架里统一都使用了StreamExecutionEnvironment.execute(jobName)方法,现在基于这个解释就明白了。 在 2020-07-12 22:55:34,"godfrey he" <[hidden email]> 写道: >hi sunfulin, > >1.11 对 StreamTableEnvironment.execute() >和 StreamExecutionEnvironment.execute() 的执行方式有所调整, >简单概述为: >1. StreamTableEnvironment.execute() 只能执行 sqlUpdate 和 insertInto 方法执行作业; >2. Table 转化为 DataStream 后只能通过 StreamExecutionEnvironment.execute() 来执行作业; >3. 新引入的 TableEnvironment.executeSql() 和 StatementSet.execute() 方法是直接执行sql作业 >(异步提交作业),不需要再调用 StreamTableEnvironment.execute() >或 StreamExecutionEnvironment.execute() > >详细可以参考 [1] [2] > > > >对于 “No operators defined in streaming topology.”,如果使用 >TableEnvironment.executeSql() 或者 StatementSet.execute() 方法提交的作业后再调用 >StreamTableEnvironment.execute() 或 StreamExecutionEnvironment.execute() >提交作业,就会出现前面的错误。 > >对于 >“是不是不推荐在作业里同时使用executeSQL和StatementSet.execute?”,这个答案是no。executeSql和StatementSet不会相互干扰。对于出现的错误,能给一个更详细的提交作业的流程描述吗? > > >[1] >https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E7%BF%BB%E8%AF%91%E4%B8%8E%E6%89%A7%E8%A1%8C%E6%9F%A5%E8%AF%A2 >[2] >https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E5%B0%86%E8%A1%A8%E8%BD%AC%E6%8D%A2%E6%88%90-datastream-%E6%88%96-dataset > >Best, >Godfrey > >Leonard Xu <[hidden email]> 于2020年7月12日周日 下午1:48写道: > >> HI, fulin >> >> 能大致贴下代码吗?能复现异常即可。简单说下这两个方法, >> TableEnvironment.executeSql(String statement)是为了用于执行单条的 sql 语句, SQL语句可以是 >> DDL/DML/DCL/DQL, DML(如insert)和DQL(如select)的执行是等 Flink >> job提交后返回该方法的执行结果,DDL(create table ...) 和 DCL(use database …) >> 的执行是对应的SQL语句执行完成就返回,理解起来就是需要提交 Flink job 的SQL需要等 job 提交后返回结果,其他是立即执行并返回。 >> Statementset.execute() 主要用于执行批量的 sql 语句,sql 语句只能是 insert xx,可以看接口的方法, >> 这个接口主要是为了 SQL 里有多个query的情况, (比如multiple sink:insert tableA from xx ;insert >> tableB from xx), 如果调用 TableEnvironment.executeSql(“insert tableA from xx”), >> TableEnvironment.executeSql(“insert tableA from xx”) 就会起两个 Flink job, >> 这应该不是用户需要的。 >> 具体使用根据你的需要来使用。 >> >> >> Best, >> Leonard Xu >> >> >> 在 2020年7月11日,22:24,sunfulin <[hidden email]> 写道: >> >> statementset.execute >> >> >> |
Free forum by Nabble | Edit this page |