Here is an example of using PyFlink to connect to Elasticsearch. I'm running ES 5.4.1 and PyFlink 1.10.1, with flink-sql-connector-elasticsearch5_2.11-1.10.1.jar as the connector jar. I also downloaded the Kafka and JSON connector jars, and the Kafka connection tested successfully.

When connecting to ES, the job fails with "findAndCreateTableSink failed". Is this caused by the ES connector jar? If anyone has hit a similar problem, please share some guidance. Thanks.

Caused by: Could not find a suitable factory for 'org.apache.flink.table.factories.TableSinkFactory' in the classpath.
Reason: Required context properties mismatch

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime, Elasticsearch


def area_cnts():
    s_env = StreamExecutionEnvironment.get_execution_environment()
    s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    s_env.set_parallelism(1)

    # use blink table planner
    st_env = StreamTableEnvironment \
        .create(s_env, environment_settings=EnvironmentSettings
                .new_instance()
                .in_streaming_mode()
                .use_blink_planner().build())

    # register source and sink
    register_rides_source(st_env)
    register_cnt_sink(st_env)

    # query
    st_env.from_path("source")\
        .group_by("taxiId")\
        .select("taxiId, count(1) as cnt")\
        .insert_into("sink")

    # execute
    st_env.execute("6-write_with_elasticsearch")


def register_rides_source(st_env):
    st_env \
        .connect(  # declare the external system to connect to
            Kafka()
            .version("universal")
            .topic("Rides")
            .start_from_earliest()
            .property("zookeeper.connect", "zookeeper:2181")
            .property("bootstrap.servers", "kafka:9092")) \
        .with_format(  # declare a format for this system
            Json()
            .fail_on_missing_field(True)
            .schema(DataTypes.ROW([
                DataTypes.FIELD("rideId", DataTypes.BIGINT()),
                DataTypes.FIELD("isStart", DataTypes.BOOLEAN()),
                DataTypes.FIELD("eventTime", DataTypes.TIMESTAMP()),
                DataTypes.FIELD("lon", DataTypes.FLOAT()),
                DataTypes.FIELD("lat", DataTypes.FLOAT()),
                DataTypes.FIELD("psgCnt", DataTypes.INT()),
                DataTypes.FIELD("taxiId", DataTypes.BIGINT())]))) \
        .with_schema(  # declare the schema of the table
            Schema()
            .field("rideId", DataTypes.BIGINT())
            .field("taxiId", DataTypes.BIGINT())
            .field("isStart", DataTypes.BOOLEAN())
            .field("lon", DataTypes.FLOAT())
            .field("lat", DataTypes.FLOAT())
            .field("psgCnt", DataTypes.INT())
            .field("rideTime", DataTypes.TIMESTAMP())
            .rowtime(
                Rowtime()
                .timestamps_from_field("eventTime")
                .watermarks_periodic_bounded(60000))) \
        .in_append_mode() \
        .register_table_source("source")


def register_cnt_sink(st_env):
    st_env.connect(
        Elasticsearch()
        .version("6")
        .host("elasticsearch", 9200, "http")
        .index("taxiid-cnts")
        .document_type('taxiidcnt')
        .key_delimiter("$")) \
        .with_schema(
            Schema()
            .field("taxiId", DataTypes.BIGINT())
            .field("cnt", DataTypes.BIGINT())) \
        .with_format(
            Json()
            .derive_schema()) \
        .in_upsert_mode() \
        .register_table_sink("sink")


if __name__ == '__main__':
    area_cnts()
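A side note for reproducibility: the connector fat-jars have to be visible to PyFlink. With a pip-installed PyFlink 1.10 (assuming the default installation layout) a common approach is to copy them into the package's lib directory, which can be located like this:

    import os
    import pyflink

    # print the lib directory of a pip-installed pyflink; connector
    # fat-jars such as flink-sql-connector-elasticsearch5_2.11-1.10.1.jar
    # are typically copied here so the table factories can be found
    print(os.path.join(os.path.dirname(pyflink.__file__), "lib"))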
I guess it's because the ES version specified in the job is `6`, while the jar used is for ES `5`.
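If that's the cause, one thing to try (a sketch only, reusing the names from the example above) is to make the declared version match the elasticsearch5 jar on the classpath:

    st_env.connect(
        Elasticsearch()
        .version("5")  # hypothetical fix: align with flink-sql-connector-elasticsearch5_2.11-1.10.1.jar
        .host("elasticsearch", 9200, "http")
        .index("taxiid-cnts")
        .document_type('taxiidcnt')
        .key_delimiter("$")) \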
I've already changed the version to 5 in my local job, and the error below occurred:
    st_env.connect(
        Elasticsearch()
        .version("5")
        .host("localhost", 9200, "http")
        .index("taxiid-cnts")
        .document_type('taxiidcnt')
        .key_delimiter("$")) \
Could you post the complete exception?
Hi,
As far as I know, Flink 1.10 does not officially support an Elasticsearch 5.x SQL connector.

Best,
Jark

On Tue, 16 Jun 2020 at 16:08, Dian Fu <[hidden email]> wrote:

> Could you post the complete exception?