Login  Register

flink1.11 DDL定义kafka source报错

classic Classic list List threaded Threaded
4 messages Options Options
Embed post
Permalink
Reply | Threaded
Open this post in threaded view
| More
Print post
Permalink

flink1.11 DDL定义kafka source报错

阿华田
33 posts
代码如下


Reply | Threaded
Open this post in threaded view
| More
Print post
Permalink

回复:flink1.11 DDL定义kafka source报错

阿华田
33 posts
错误信息:
Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph.
at org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
at org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
at com.huahui.sqldemo.DDLSource.main(DDLSource.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.uiDesigner.snapShooter.SnapShooter.main(SnapShooter.java:59)




代码:
public class DDLSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
String  create_sql=
"create table test\n" +
"(\n" +
"name varchar,\n" +
"city varchar\n" +
")with (\n" +
"'connector.type' = 'kafka', \n" +
"'connector.version' = 'universal',\n" +
"'connector.topic' = 'test',\n" +
"'connector.properties.0.key' = 'group.id',\n" +
"'connector.properties.0.value' = 'test_gd',\n" +
"'connector.properties.1.key' = 'bootstrap.servers',\n" +
"'connector.properties.1.value' = '127.0.0.1:9092',\n" +
"'connector.property-version' = '1',\n" +
"'connector.startup-mode' = 'latest-offset',\n" +
"'format.type' = 'json',\n" +
"'format.property-version' = '1',\n" +
"'format.derive-schema' = 'true',\n" +
"'update-mode' = 'append')";

tableEnv.executeSql(create_sql);
Table table = tableEnv.sqlQuery("select name from test ");
TableSchema schema = table.getSchema();
System.out.println(schema);
DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnv.toRetractStream(table, Row.class);
tuple2DataStream.print();
tableEnv.execute("test");
//bsEnv.execute("fff");
}
}



| |
阿华田
|
|
[hidden email]
|
签名由网易邮箱大师定制


在2020年08月7日 13:49,阿华田<[hidden email]> 写道:
代码如下


| |
阿华田
|
|
[hidden email]
|
签名由网易邮箱大师定制

Reply | Threaded
Open this post in threaded view
| More
Print post
Permalink

Re: flink1.11 DDL定义kafka source报错

Evan
55 posts
In reply to this post by 阿华田
你好 :
    图片是看不到的,建议直接粘贴文本再发送一次





[hidden email]
 
发件人: 阿华田
发送时间: 2020-08-07 13:49
收件人: user-zh
主题: flink1.11 DDL定义kafka source报错
代码如下

阿华田
[hidden email]
签名由 网易邮箱大师 定制

Reply | Threaded
Open this post in threaded view
| More
Print post
Permalink

Re: 回复:flink1.11 DDL定义kafka source报错

Evan
55 posts
In reply to this post by 阿华田
你好:
    你使用的是Flink 1.11版本,但是你的建表语句还是用的老版本,建议更换新版本的建表语句后再试一下
    参考如下:
    https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html






[hidden email]
 
发件人: 阿华田
发送时间: 2020-08-07 14:03
收件人: [hidden email]
主题: 回复:flink1.11 DDL定义kafka source报错
错误信息:
Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph.
at org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
at org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
at com.huahui.sqldemo.DDLSource.main(DDLSource.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.uiDesigner.snapShooter.SnapShooter.main(SnapShooter.java:59)
 
 
 
 
代码:
public class DDLSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
String  create_sql=
"create table test\n" +
"(\n" +
"name varchar,\n" +
"city varchar\n" +
")with (\n" +
"'connector.type' = 'kafka', \n" +
"'connector.version' = 'universal',\n" +
"'connector.topic' = 'test',\n" +
"'connector.properties.0.key' = 'group.id',\n" +
"'connector.properties.0.value' = 'test_gd',\n" +
"'connector.properties.1.key' = 'bootstrap.servers',\n" +
"'connector.properties.1.value' = '127.0.0.1:9092',\n" +
"'connector.property-version' = '1',\n" +
"'connector.startup-mode' = 'latest-offset',\n" +
"'format.type' = 'json',\n" +
"'format.property-version' = '1',\n" +
"'format.derive-schema' = 'true',\n" +
"'update-mode' = 'append')";
 
tableEnv.executeSql(create_sql);
Table table = tableEnv.sqlQuery("select name from test ");
TableSchema schema = table.getSchema();
System.out.println(schema);
DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnv.toRetractStream(table, Row.class);
tuple2DataStream.print();
tableEnv.execute("test");
//bsEnv.execute("fff");
}
}
 
 
 
| |
阿华田
|
|
[hidden email]
|
签名由网易邮箱大师定制
 
 
在2020年08月7日 13:49,阿华田<[hidden email]> 写道:
代码如下
 
 
| |
阿华田
|
|
[hidden email]
|
签名由网易邮箱大师定制