The source code is as follows:

CREATE TABLE buy_cnt_per_hour (
    hour_of_day BIGINT,
    buy_cnt BIGINT
) WITH (
    'connector.type' = 'elasticsearch',
    'connector.version' = '6',
    'connector.hosts' = 'http://localhost:9200',
    'connector.index' = 'buy_cnt_per_hour',
    'connector.document-type' = 'user_behavior',
    'connector.bulk-flush.max-actions' = '1',
    'format.type' = 'json',
    'update-mode' = 'append'
)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

The specific error:

The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'elasticsearch'
'format.type' expects 'csv', but is 'json'
The following properties are requested:
connector.bulk-flush.max-actions=1
connector.document-type=user_behavior
connector.hosts=http://localhost:9200
connector.index=buy_cnt_per_hour
connector.type=elasticsearch
connector.version=6
format.type=json
schema.0.data-type=BIGINT
schema.0.name=hour_of_day
schema.1.data-type=BIGINT
schema.1.name=buy_cnt
update-mode=append
Hi 出发,

It looks like you are missing the dependency on the ES connector [1], so only the built-in filesystem connector can be found. The built-in filesystem connector currently supports only the csv format, which is why this error occurs. Adding the missing dependencies to your project should fix it. If you use the SQL CLI, the dependency jars also need to be placed in Flink's lib directory.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-elasticsearch6_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>

Best,
Leonard

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector

> On March 23, 2020, at 23:30, 出发 <[hidden email]> wrote:
>
> The source code is as follows:
> CREATE TABLE buy_cnt_per_hour (
>     hour_of_day BIGINT,
>     buy_cnt BIGINT
> ) WITH (
>     'connector.type' = 'elasticsearch',
>     'connector.version' = '6',
>     'connector.hosts' = 'http://localhost:9200',
>     'connector.index' = 'buy_cnt_per_hour',
>     'connector.document-type' = 'user_behavior',
>     'connector.bulk-flush.max-actions' = '1',
>     'format.type' = 'json',
>     'update-mode' = 'append'
> )
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
>
> public class ESTest {
>
>     public static void main(String[] args) throws Exception {
>
>         // 2. Set up the execution environment
>         StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>         EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
>         streamEnv.setParallelism(1);
>         String sinkDDL = " CREATE TABLE test_es ( hour_of_day BIGINT, buy_cnt BIGINT "
>                 + ") WITH ( 'connector.type' = 'elasticsearch', 'connector.version' = '6',"
>                 + " 'connector.hosts' = 'http://localhost:9200', 'connector.index' = 'buy_cnt_per_hour',"
>                 + " 'connector.document-type' = 'user_behavior',"
>                 + " 'connector.bulk-flush.max-actions' = '1',\n" + " 'format.type' = 'json',"
>                 + " 'update-mode' = 'append' )";
>         tableEnv.sqlUpdate(sinkDDL);
>         Table table = tableEnv.sqlQuery("select * from test_es ");
>         tableEnv.toRetractStream(table, Row.class).print();
>         streamEnv.execute("");
>     }
>
> }
>
> The specific error:
> The matching candidates:
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> Mismatched properties:
> 'connector.type' expects 'filesystem', but is 'elasticsearch'
> 'format.type' expects 'csv', but is 'json'
>
> The following properties are requested:
> connector.bulk-flush.max-actions=1
> connector.document-type=user_behavior
> connector.hosts=http://localhost:9200
> connector.index=buy_cnt_per_hour
> connector.type=elasticsearch
> connector.version=6
> format.type=json
> schema.0.data-type=BIGINT
> schema.0.name=hour_of_day
> schema.1.data-type=BIGINT
> schema.1.name=buy_cnt
> update-mode=append
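
To picture where the "Mismatched properties" lines in the error come from, here is a minimal sketch in plain Java. It is a simplified, hypothetical model of property-based factory matching, not Flink's actual discovery code: the class FactoryMatchSketch and its methods are invented for illustration, and the required properties of the CSV factory are taken only from the error message in this thread.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of property-based factory matching.
// It only mirrors the shape of the error message; it is not Flink code.
final class FactoryMatchSketch {

    /** Returns one message per required property that the request does not satisfy. */
    static List<String> mismatches(Map<String, String> required, Map<String, String> requested) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, String> e : required.entrySet()) {
            String actual = requested.get(e.getKey());
            if (!e.getValue().equals(actual)) {
                result.add("'" + e.getKey() + "' expects '" + e.getValue()
                        + "', but is '" + actual + "'");
            }
        }
        return result;
    }

    /** What the only candidate factory requires, according to the error message. */
    static Map<String, String> csvFactoryContext() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("connector.type", "filesystem");
        m.put("format.type", "csv");
        return m;
    }

    /** The two properties of the DDL in this thread that are relevant for matching. */
    static Map<String, String> esRequest() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("connector.type", "elasticsearch");
        m.put("format.type", "json");
        return m;
    }

    public static void main(String[] args) {
        // With no matching factory available, the only candidate reports both mismatches.
        for (String line : mismatches(csvFactoryContext(), esRequest())) {
            System.out.println(line);
        }
    }
}
```

In this model the request can only succeed once a factory whose required properties equal the requested ones is available to the lookup.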
Hi Leonard Xu,

The official ES DDL currently does not support specifying a username and password for the ES cluster. I have extended it at my company to add this feature. Does the community need it, and how should I contribute it?

The result is shown in this screenshot: http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-25-053948.png

Best Wishes!
zhisheng
Excellent! You could file an improvement issue.

Best Regards
[hidden email]
👍, zhisheng

I think support for ES authentication is quite a useful feature in production, so it is nice to have. As jinhai said, you can first file an improvement issue and discuss it in the community (the specific parameter names; these parameters should be optional). Once the discussion reaches agreement, you can open a PR.

Best,
Leonard
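
As an illustration of what "optional" could mean for such parameters, here is a minimal sketch in plain Java. The property keys connector.username and connector.password are hypothetical (the thread leaves the actual names open for discussion), and EsAuthOptionsSketch is an invented helper, not part of any Flink connector.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of optional authentication properties for an ES table.
final class EsAuthOptionsSketch {

    // Assumed key names, for illustration only.
    static final String USERNAME_KEY = "connector.username";
    static final String PASSWORD_KEY = "connector.password";

    /** Simple holder for a username/password pair. */
    static final class Credentials {
        final String username;
        final String password;

        Credentials(String username, String password) {
            this.username = username;
            this.password = password;
        }
    }

    /**
     * Returns credentials if both keys are set, an empty Optional if neither is set,
     * and rejects the half-configured case where only one of the two is present.
     */
    static Optional<Credentials> parse(Map<String, String> props) {
        String user = props.get(USERNAME_KEY);
        String pass = props.get(PASSWORD_KEY);
        if (user == null && pass == null) {
            return Optional.empty();
        }
        if (user == null || pass == null) {
            throw new IllegalArgumentException(
                    USERNAME_KEY + " and " + PASSWORD_KEY + " must be set together");
        }
        return Optional.of(new Credentials(user, pass));
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        System.out.println(parse(props).isPresent()); // no auth configured

        props.put(USERNAME_KEY, "elastic");
        props.put(PASSWORD_KEY, "secret");
        System.out.println(parse(props).isPresent()); // both keys configured
    }
}
```

Treating the pair as absent by default keeps existing DDL statements without credentials valid, which is one way to read the "should be optional" remark.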
In reply to this post by kcz
Hello,
I ran into this error as well on the latest Flink 1.11, doing the same thing as you. The real cause is that this DDL defines a Flink sink table, i.e. the side that data is written to, so its data cannot be printed. The printing call tableEnv.toRetractStream(table, Row.class).print(); is only suitable for a Flink source table, i.e. the data input side. A Kafka table, for example, works fine.

2020-07-09 15:31:56

------------------ Original message ------------------
From: "出发" <[hidden email]>
Sent: Monday, March 23, 2020, 11:30 PM
To: "user-zh" <[hidden email]>
Subject: ddl es 报错 (DDL ES error)
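
Following that explanation, the ES table would be the target of an INSERT INTO rather than the input of a SELECT. Below is a minimal sketch in plain Java that only assembles the two SQL statements. The source table name hourly_buy_counts is hypothetical and is assumed to be registered already (for example a Kafka-backed table); each statement would then be submitted the same way the original program submits its DDL, via tableEnv.sqlUpdate(...).

```java
import java.util.Arrays;
import java.util.List;

// Sketch: build the statements for writing into the ES sink table.
// It does not touch Flink; it only prepares the SQL text.
final class EsSinkStatementsSketch {

    // Hypothetical, already-registered source table with the same two columns.
    static final String SOURCE_TABLE = "hourly_buy_counts";

    /** The sink DDL from this thread, unchanged apart from formatting. */
    static String sinkDdl() {
        return "CREATE TABLE test_es ( hour_of_day BIGINT, buy_cnt BIGINT "
                + ") WITH ( 'connector.type' = 'elasticsearch', 'connector.version' = '6',"
                + " 'connector.hosts' = 'http://localhost:9200',"
                + " 'connector.index' = 'buy_cnt_per_hour',"
                + " 'connector.document-type' = 'user_behavior',"
                + " 'connector.bulk-flush.max-actions' = '1',"
                + " 'format.type' = 'json',"
                + " 'update-mode' = 'append' )";
    }

    /** Writes into the sink instead of reading from it. */
    static String insertIntoSink() {
        return "INSERT INTO test_es SELECT hour_of_day, buy_cnt FROM " + SOURCE_TABLE;
    }

    /** Statements in submission order: define the sink first, then write to it. */
    static List<String> statements() {
        return Arrays.asList(sinkDdl(), insertIntoSink());
    }

    public static void main(String[] args) {
        for (String statement : statements()) {
            System.out.println(statement);
        }
    }
}
```

The ES connector and JSON format dependencies mentioned earlier in the thread would still need to be on the classpath for these statements to run.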