|
错误信息:
Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph. at org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47) at org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47) at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197) at com.huahui.sqldemo.DDLSource.main(DDLSource.java:49) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at com.intellij.uiDesigner.snapShooter.SnapShooter.main(SnapShooter.java:59) 代码: public class DDLSource { public static void main(String[] args) throws Exception { StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); String create_sql= "create table test\n" + "(\n" + "name varchar,\n" + "city varchar\n" + ")with (\n" + "'connector.type' = 'kafka', \n" + "'connector.version' = 'universal',\n" + "'connector.topic' = 'test',\n" + "'connector.properties.0.key' = 'group.id',\n" + "'connector.properties.0.value' = 'test_gd',\n" + "'connector.properties.1.key' = 'bootstrap.servers',\n" + "'connector.properties.1.value' = '127.0.0.1:9092',\n" + "'connector.property-version' = '1',\n" + "'connector.startup-mode' = 'latest-offset',\n" + "'format.type' = 'json',\n" + "'format.property-version' = '1',\n" + "'format.derive-schema' = 'true',\n" + "'update-mode' = 'append')"; tableEnv.executeSql(create_sql); Table table = tableEnv.sqlQuery("select name from test "); TableSchema schema = 
table.getSchema(); System.out.println(schema); DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnv.toRetractStream(table, Row.class); tuple2DataStream.print(); tableEnv.execute("test"); //bsEnv.execute("fff"); } } | | 阿华田 | | [hidden email] | 签名由网易邮箱大师定制 在2020年08月7日 13:49,阿华田<[hidden email]> 写道: 代码如下 | | 阿华田 | | [hidden email] | 签名由网易邮箱大师定制 |
In reply to this post by 阿华田
你好:
图片是看不到的,建议直接粘贴文本再发送一次。 [hidden email] 发件人: 阿华田 发送时间: 2020-08-07 13:49 收件人: user-zh 主题: flink1.11 DDL定义kafka source报错 代码如下 阿华田 [hidden email] 签名由 网易邮箱大师 定制 |
In reply to this post by 阿华田
你好:
你使用的是Flink 1.11版本,但是你的建表语句还是用的老版本,建议更换新版本的建表语句后再试一下 参考如下: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html [hidden email] 发件人: 阿华田 发送时间: 2020-08-07 14:03 收件人: [hidden email] 主题: 回复:flink1.11 DDL定义kafka source报错 错误信息: Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph. at org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47) at org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47) at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197) at com.huahui.sqldemo.DDLSource.main(DDLSource.java:49) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at com.intellij.uiDesigner.snapShooter.SnapShooter.main(SnapShooter.java:59) 代码: public class DDLSource { public static void main(String[] args) throws Exception { StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); String create_sql= "create table test\n" + "(\n" + "name varchar,\n" + "city varchar\n" + ")with (\n" + "'connector.type' = 'kafka', \n" + "'connector.version' = 'universal',\n" + "'connector.topic' = 'test',\n" + "'connector.properties.0.key' = 'group.id',\n" + "'connector.properties.0.value' = 'test_gd',\n" + "'connector.properties.1.key' = 'bootstrap.servers',\n" + "'connector.properties.1.value' = '127.0.0.1:9092',\n" + "'connector.property-version' = '1',\n" + "'connector.startup-mode' = 
'latest-offset',\n" + "'format.type' = 'json',\n" + "'format.property-version' = '1',\n" + "'format.derive-schema' = 'true',\n" + "'update-mode' = 'append')"; tableEnv.executeSql(create_sql); Table table = tableEnv.sqlQuery("select name from test "); TableSchema schema = table.getSchema(); System.out.println(schema); DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnv.toRetractStream(table, Row.class); tuple2DataStream.print(); tableEnv.execute("test"); //bsEnv.execute("fff"); } } | | 阿华田 | | [hidden email] | 签名由网易邮箱大师定制 在2020年08月7日 13:49,阿华田<[hidden email]> 写道: 代码如下 | | 阿华田 | | [hidden email] | 签名由网易邮箱大师定制 |
Free forum by Nabble | Edit this page |