flink 1.11 sql作业提交JM报错

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

flink 1.11 sql作业提交JM报错

sunfulin
hi,
我使用flink 1.11提交sql作业,从JM日志中看到有如下异常。我的作业里会通过tEnv.executeSQL执行多个ddl语句,通过tEnv.createStatementSet add多个dml语句,并执行execute。
如下异常可能原因是啥呢?还有个问题,这个异常虽然抛出来了,但是作业还是正常启动执行了。这又是为何?是不是不推荐在作业里同时使用executeSQL和statementset.execute?


Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment.
at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:139) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:127) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57) ~[flink-table-blink_2.12-1.11.0.jar:1.11.0]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699) ~[flink-table_2.12-1.11.0.jar:1.11.0]
... 24 more
Reply | Threaded
Open this post in threaded view
|

Re:flink 1.11 sql作业提交JM报错

sunfulin



hi,
在JM日志中还有如下异常:这个也比较诡异。求大神帮忙解答下。


java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1872) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1863) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1848) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
at com.htsc.crm_realtime.fatjob.Jobs.JobEntryBase.run(JobEntryBase.java:67) ~[763f9e05-39d4-4c70-bf9c-e3bea7ef0e0f_FatJob-1.0.jar:?]
at com.htsc.crm_realtime.fatjob.Jobs.sensordata.SensorDataETLTask.main(SensorDataETLTask.java:47) ~[763f9e05-39d4-4c70-bf9c-e3bea7ef0e0f_FatJob-1.0.jar:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_201]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_201]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_201]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_201]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:99) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) [?:1.8.0_201]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_201]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_201]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_201]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_201]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_201]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_201]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]














在 2020-07-11 22:24:51,"sunfulin" <[hidden email]> 写道:

>hi,
>我使用flink 1.11提交sql作业,从JM日志中看到有如下异常。我的作业里会通过tEnv.executeSQL执行多个ddl语句,通过tEnv.createStatementSet add多个dml语句,并执行execute。
>如下异常可能原因是啥呢?还有个问题,这个异常虽然抛出来了,但是作业还是正常启动执行了。这又是为何?是不是不推荐在作业里同时使用executeSQL和statementset.execute?
>
>
>Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment.
>at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:139) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:127) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57) ~[flink-table-blink_2.12-1.11.0.jar:1.11.0]
>at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699) ~[flink-table_2.12-1.11.0.jar:1.11.0]
>... 24 more
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 sql作业提交JM报错

Leonard Xu
In reply to this post by sunfulin
HI, fulin

能大致贴下代码吗?能复现异常即可。简单说下这两个方法,
 TableEnvironment.executeSql(String statement)是为了用于执行单条的 sql 语句, SQL语句可以是 DDL/DML/DCL/DQL, DML(如insert)和DQL(如select)的执行是等 Flink job提交后返回该方法的执行结果,DDL(create table ...) 和 DCL(use database …) 的执行是对应的SQL语句执行完成就返回,理解起来就是需要提交 Flink job 的SQL需要等 job 提交后返回结果,其他是立即执行并返回。
Statementset.execute() 主要用于执行批量的 sql 语句,sql 语句只能是 insert xx,可以看接口的方法, 这个接口主要是为了 SQL 里有多个query的情况, (比如multiple sink:insert tableA from xx ;insert tableB from xx), 如果调用 TableEnvironment.executeSql(“insert tableA from xx”), TableEnvironment.executeSql(“insert tableA from xx”) 就会起两个 Flink job, 这应该不是用户需要的。
具体使用根据你的需要来使用。


Best,
Leonard Xu


> 在 2020年7月11日,22:24,sunfulin <[hidden email]> 写道:
>
> statementset.execute

Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 sql作业提交JM报错

godfrey he
hi sunfulin,

1.11 对 StreamTableEnvironment.execute()
和 StreamExecutionEnvironment.execute() 的执行方式有所调整,
简单概述为:
1. StreamTableEnvironment.execute() 只能执行 sqlUpdate 和 insertInto 方法执行作业;
2. Table 转化为 DataStream 后只能通过 StreamExecutionEnvironment.execute() 来执行作业;
3. 新引入的 TableEnvironment.executeSql() 和 StatementSet.execute() 方法是直接执行sql作业
(异步提交作业),不需要再调用 StreamTableEnvironment.execute()
或 StreamExecutionEnvironment.execute()

详细可以参考 [1] [2]



对于 “No operators defined in streaming topology.”,如果使用
TableEnvironment.executeSql() 或者 StatementSet.execute() 方法提交的作业后再调用
StreamTableEnvironment.execute() 或 StreamExecutionEnvironment.execute()
提交作业,就会出现前面的错误。

对于
“是不是不推荐在作业里同时使用executeSQL和StatementSet.execute?”,这个答案是no。executeSql和StatementSet不会相互干扰。对于出现的错误,能给一个更详细的提交作业的流程描述吗?


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E7%BF%BB%E8%AF%91%E4%B8%8E%E6%89%A7%E8%A1%8C%E6%9F%A5%E8%AF%A2
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E5%B0%86%E8%A1%A8%E8%BD%AC%E6%8D%A2%E6%88%90-datastream-%E6%88%96-dataset

Best,
Godfrey

Leonard Xu <[hidden email]> 于2020年7月12日周日 下午1:48写道:

> HI, fulin
>
> 能大致贴下代码吗?能复现异常即可。简单说下这两个方法,
>  TableEnvironment.executeSql(String statement)是为了用于执行单条的 sql 语句, SQL语句可以是
> DDL/DML/DCL/DQL, DML(如insert)和DQL(如select)的执行是等 Flink
> job提交后返回该方法的执行结果,DDL(create table ...) 和 DCL(use database …)
> 的执行是对应的SQL语句执行完成就返回,理解起来就是需要提交 Flink job 的SQL需要等 job 提交后返回结果,其他是立即执行并返回。
> Statementset.execute() 主要用于执行批量的 sql 语句,sql 语句只能是 insert xx,可以看接口的方法,
> 这个接口主要是为了 SQL 里有多个query的情况, (比如multiple sink:insert tableA from xx ;insert
> tableB from xx), 如果调用 TableEnvironment.executeSql(“insert tableA from xx”),
> TableEnvironment.executeSql(“insert tableA from xx”) 就会起两个 Flink job,
> 这应该不是用户需要的。
> 具体使用根据你的需要来使用。
>
>
> Best,
> Leonard Xu
>
>
> 在 2020年7月11日,22:24,sunfulin <[hidden email]> 写道:
>
> statementset.execute
>
>
>
Reply | Threaded
Open this post in threaded view
|

Re:Re: flink 1.11 sql作业提交JM报错

sunfulin
hi,
感谢详细的解释和回复。那问题就清楚了。之前我们的job提交框架里统一都使用了StreamExecutionEnvironment.execute(jobName)方法,现在基于这个解释就明白了。

















在 2020-07-12 22:55:34,"godfrey he" <[hidden email]> 写道:

>hi sunfulin,
>
>1.11 对 StreamTableEnvironment.execute()
>和 StreamExecutionEnvironment.execute() 的执行方式有所调整,
>简单概述为:
>1. StreamTableEnvironment.execute() 只能执行 sqlUpdate 和 insertInto 方法执行作业;
>2. Table 转化为 DataStream 后只能通过 StreamExecutionEnvironment.execute() 来执行作业;
>3. 新引入的 TableEnvironment.executeSql() 和 StatementSet.execute() 方法是直接执行sql作业
>(异步提交作业),不需要再调用 StreamTableEnvironment.execute()
>或 StreamExecutionEnvironment.execute()
>
>详细可以参考 [1] [2]
>
>
>
>对于 “No operators defined in streaming topology.”,如果使用
>TableEnvironment.executeSql() 或者 StatementSet.execute() 方法提交的作业后再调用
>StreamTableEnvironment.execute() 或 StreamExecutionEnvironment.execute()
>提交作业,就会出现前面的错误。
>
>对于
>“是不是不推荐在作业里同时使用executeSQL和StatementSet.execute?”,这个答案是no。executeSql和StatementSet不会相互干扰。对于出现的错误,能给一个更详细的提交作业的流程描述吗?
>
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E7%BF%BB%E8%AF%91%E4%B8%8E%E6%89%A7%E8%A1%8C%E6%9F%A5%E8%AF%A2
>[2]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E5%B0%86%E8%A1%A8%E8%BD%AC%E6%8D%A2%E6%88%90-datastream-%E6%88%96-dataset
>
>Best,
>Godfrey
>
>Leonard Xu <[hidden email]> 于2020年7月12日周日 下午1:48写道:
>
>> HI, fulin
>>
>> 能大致贴下代码吗?能复现异常即可。简单说下这两个方法,
>>  TableEnvironment.executeSql(String statement)是为了用于执行单条的 sql 语句, SQL语句可以是
>> DDL/DML/DCL/DQL, DML(如insert)和DQL(如select)的执行是等 Flink
>> job提交后返回该方法的执行结果,DDL(create table ...) 和 DCL(use database …)
>> 的执行是对应的SQL语句执行完成就返回,理解起来就是需要提交 Flink job 的SQL需要等 job 提交后返回结果,其他是立即执行并返回。
>> Statementset.execute() 主要用于执行批量的 sql 语句,sql 语句只能是 insert xx,可以看接口的方法,
>> 这个接口主要是为了 SQL 里有多个query的情况, (比如multiple sink:insert tableA from xx ;insert
>> tableB from xx), 如果调用 TableEnvironment.executeSql(“insert tableA from xx”),
>> TableEnvironment.executeSql(“insert tableA from xx”) 就会起两个 Flink job,
>> 这应该不是用户需要的。
>> 具体使用根据你的需要来使用。
>>
>>
>> Best,
>> Leonard Xu
>>
>>
>> 在 2020年7月11日,22:24,sunfulin <[hidden email]> 写道:
>>
>> statementset.execute
>>
>>
>>