建表如下:
CREATE TABLE test ( `id` INT, `name` VARCHAR(255), `time` TIMESTAMP(3), `status` INT, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector'='kafka', 'topic'='test', 'properties.group.id'='c_mysql_binlog_es', 'properties.bootstrap.servers'='localhost:9092', 'scan.startup.mode'='latest-offset', 'format'='canal-json', 'canal-json.ignore-parse-errors'='true' ); # 输出表至es CREATE TABLE test_mirror_es ( `id` INT, `name` VARCHAR(255), `time` TIMESTAMP(3), `status` INT, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'http://localhost:9200', 'index' = 'test_mirror' ); INSERT into test_mirror_es SELECT * from test where test.id >=0; 日志:Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.test_mirror_es'. 完整日志如下: 2020-08-12 13:07:20,815 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, localhost 2020-08-12 13:07:20,820 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123 2020-08-12 13:07:20,820 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.memory.process.size, 1600m 2020-08-12 13:07:20,820 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 1728m 2020-08-12 13:07:20,820 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 10 2020-08-12 13:07:20,820 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: parallelism.default, 5 2020-08-12 13:07:20,821 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.savepoints.dir, hdfs://localhost:9000/flink-1.11.0/flink-savepoints 2020-08-12 13:07:20,821 INFO 
org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.execution.failover-strategy, region 2020-08-12 13:07:21,198 INFO org.apache.flink.table.client.config.entries.ExecutionEntry [] - Property 'execution.restart-strategy.type' not specified. Using default value: fallback 2020-08-12 13:07:22,099 INFO org.apache.flink.table.client.gateway.local.ExecutionContext [] - Executor config: {taskmanager.memory.process.size=1728m, jobmanager.execution.failover-strategy=region, jobmanager.rpc.address=localhost, execution.target=remote, jobmanager.memory.process.size=1600m, state.savepoints.dir=hdfs://localhost:9000/flink-1.11.0/flink-savepoints, jobmanager.rpc.port=6123, execution.savepoint.ignore-unclaimed-state=false, execution.attached=true, execution.shutdown-on-attached-exit=false, pipeline.jars=[file:/data1/home/xxx/flink-demo/flink-1.11.0/opt/flink-sql-client_2.11-1.11.0.jar], parallelism.default=5, taskmanager.numberOfTaskSlots=10, pipeline.classpaths=[]} 2020-08-12 13:07:22,286 INFO org.apache.flink.table.client.cli.CliClient [] - Command history file path: /root/.flink-sql-history 2020-08-12 13:07:46,637 INFO org.apache.flink.table.client.gateway.local.ProgramDeployer [] - Submitting job org.apache.flink.streaming.api.graph.StreamGraph@41a16eb3 for query default: INSERT into test_mirror_es SELECT * from test where id >0` 2020-08-12 13:07:46,709 INFO org.apache.flink.configuration.Configuration [] - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address' 2020-08-12 13:10:17,512 INFO org.apache.flink.table.client.gateway.local.ProgramDeployer [] - Submitting job org.apache.flink.streaming.api.graph.StreamGraph@3ff8a3ad for query default: INSERT into test_mirror_es SELECT * from test where id >0` 2020-08-12 13:10:17,516 INFO org.apache.flink.configuration.Configuration [] - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address' 2020-08-12 
13:10:38,360 WARN org.apache.flink.table.client.cli.CliClient [] - Could not execute SQL statement. org.apache.flink.table.client.gateway.SqlExecutionException: Invalidate SQL statement. at org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:99) ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.cli.SqlCommandParser.parse(SqlCommandParser.java:90) ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.cli.CliClient.parseCommand(CliClient.java:257) [flink-sql-client_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211) [flink-sql-client_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142) [flink-sql-client_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114) [flink-sql-client_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201) [flink-sql-client_2.11-1.11.0.jar:1.11.0] Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.test_mirror_es'. 
Table options are: 'connector'='elasticsearch-7' 'hosts'='http://localhost:9200' 'index'='test_mirror' at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.gateway.local.LocalExecutor$1.lambda$parse$0(LocalExecutor.java:430) ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.gateway.local.LocalExecutor$1$$Lambda$460/590884726.get(Unknown Source) ~[?:?] at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:255) ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.gateway.local.LocalExecutor$1.parse(LocalExecutor.java:430) ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:97) ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] ... 6 more Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='elasticsearch-7''. 
at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.gateway.local.LocalExecutor$1.lambda$parse$0(LocalExecutor.java:430) ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.gateway.local.LocalExecutor$1$$Lambda$460/590884726.get(Unknown Source) ~[?:?] at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:255) ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.gateway.local.LocalExecutor$1.parse(LocalExecutor.java:430) ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:97) ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] ... 6 more Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'elasticsearch-7' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath. 
Available factory identifiers are: datagen jdbc kafka at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.gateway.local.LocalExecutor$1.lambda$parse$0(LocalExecutor.java:430) ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.gateway.local.LocalExecutor$1$$Lambda$460/590884726.get(Unknown Source) ~[?:?] at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:255) ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.gateway.local.LocalExecutor$1.parse(LocalExecutor.java:430) ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:97) ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] ... 6 more -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Hello
1. 使用CDC功能的话请用1.11.1,该版本修复了一个CDC的bug[1] 2. 另外你这个异常栈是没有找到对应的 connector jar,你的表使用的是 'elasticsearch-7' connector,确认下用的是 flink-sql-connector-elasticsearch7_2.11-1.11.0 这个jar. 祝好 Leonard [1] https://issues.apache.org/jira/browse/FLINK-18461 <https://issues.apache.org/jira/browse/FLINK-18461> > 在 2020年8月12日,13:31,jindy_liu <[hidden email]> 写道: > > 建表如下: > > CREATE TABLE test ( > `id` INT, > `name` VARCHAR(255), > `time` TIMESTAMP(3), > `status` INT, > PRIMARY KEY(id) NOT ENFORCED > ) WITH ( > 'connector'='kafka', > 'topic'='test', > 'properties.group.id'='c_mysql_binlog_es', > 'properties.bootstrap.servers'='localhost:9092', > 'scan.startup.mode'='latest-offset', > 'format'='canal-json', > 'canal-json.ignore-parse-errors'='true' > ); > > > # 输出表至es > CREATE TABLE test_mirror_es ( > `id` INT, > `name` VARCHAR(255), > `time` TIMESTAMP(3), > `status` INT, > PRIMARY KEY(id) NOT ENFORCED > ) WITH ( > 'connector' = 'elasticsearch-7', > 'hosts' = 'http://localhost:9200', > 'index' = 'test_mirror' > ); > > INSERT into test_mirror_es SELECT * from test where test.id >=0; > > 日志:Caused by: org.apache.flink.table.api.ValidationException: Unable to > create a source for reading table > 'default_catalog.default_database.test_mirror_es'. 
> > 完整日志如下: > > > 2020-08-12 13:07:20,815 INFO > org.apache.flink.configuration.GlobalConfiguration [] - Loading > configuration property: jobmanager.rpc.address, localhost > 2020-08-12 13:07:20,820 INFO > org.apache.flink.configuration.GlobalConfiguration [] - Loading > configuration property: jobmanager.rpc.port, 6123 > 2020-08-12 13:07:20,820 INFO > org.apache.flink.configuration.GlobalConfiguration [] - Loading > configuration property: jobmanager.memory.process.size, 1600m > 2020-08-12 13:07:20,820 INFO > org.apache.flink.configuration.GlobalConfiguration [] - Loading > configuration property: taskmanager.memory.process.size, 1728m > 2020-08-12 13:07:20,820 INFO > org.apache.flink.configuration.GlobalConfiguration [] - Loading > configuration property: taskmanager.numberOfTaskSlots, 10 > 2020-08-12 13:07:20,820 INFO > org.apache.flink.configuration.GlobalConfiguration [] - Loading > configuration property: parallelism.default, 5 > 2020-08-12 13:07:20,821 INFO > org.apache.flink.configuration.GlobalConfiguration [] - Loading > configuration property: state.savepoints.dir, > hdfs://localhost:9000/flink-1.11.0/flink-savepoints > 2020-08-12 13:07:20,821 INFO > org.apache.flink.configuration.GlobalConfiguration [] - Loading > configuration property: jobmanager.execution.failover-strategy, region > 2020-08-12 13:07:21,198 INFO > org.apache.flink.table.client.config.entries.ExecutionEntry [] - Property > 'execution.restart-strategy.type' not specified. 
Using default value: > fallback > 2020-08-12 13:07:22,099 INFO > org.apache.flink.table.client.gateway.local.ExecutionContext [] - Executor > config: {taskmanager.memory.process.size=1728m, > jobmanager.execution.failover-strategy=region, > jobmanager.rpc.address=localhost, execution.target=remote, > jobmanager.memory.process.size=1600m, > state.savepoints.dir=hdfs://localhost:9000/flink-1.11.0/flink-savepoints, > jobmanager.rpc.port=6123, execution.savepoint.ignore-unclaimed-state=false, > execution.attached=true, execution.shutdown-on-attached-exit=false, > pipeline.jars=[file:/data1/home/xxx/flink-demo/flink-1.11.0/opt/flink-sql-client_2.11-1.11.0.jar], > parallelism.default=5, taskmanager.numberOfTaskSlots=10, > pipeline.classpaths=[]} > 2020-08-12 13:07:22,286 INFO org.apache.flink.table.client.cli.CliClient > [] - Command history file path: /root/.flink-sql-history > 2020-08-12 13:07:46,637 INFO > org.apache.flink.table.client.gateway.local.ProgramDeployer [] - Submitting > job org.apache.flink.streaming.api.graph.StreamGraph@41a16eb3 for query > default: INSERT into test_mirror_es SELECT * from test where id >0` > 2020-08-12 13:07:46,709 INFO org.apache.flink.configuration.Configuration > [] - Config uses fallback configuration key 'jobmanager.rpc.address' instead > of key 'rest.address' > 2020-08-12 13:10:17,512 INFO > org.apache.flink.table.client.gateway.local.ProgramDeployer [] - Submitting > job org.apache.flink.streaming.api.graph.StreamGraph@3ff8a3ad for query > default: INSERT into test_mirror_es SELECT * from test where id >0` > 2020-08-12 13:10:17,516 INFO org.apache.flink.configuration.Configuration > [] - Config uses fallback configuration key 'jobmanager.rpc.address' instead > of key 'rest.address' > 2020-08-12 13:10:38,360 WARN org.apache.flink.table.client.cli.CliClient > [] - Could not execute SQL statement. > org.apache.flink.table.client.gateway.SqlExecutionException: Invalidate SQL > statement. 
> at > org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:99) > ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.client.cli.SqlCommandParser.parse(SqlCommandParser.java:90) > ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.client.cli.CliClient.parseCommand(CliClient.java:257) > [flink-sql-client_2.11-1.11.0.jar:1.11.0] > at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211) > [flink-sql-client_2.11-1.11.0.jar:1.11.0] > at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142) > [flink-sql-client_2.11-1.11.0.jar:1.11.0] > at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114) > [flink-sql-client_2.11-1.11.0.jar:1.11.0] > at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201) > [flink-sql-client_2.11-1.11.0.jar:1.11.0] > Caused by: org.apache.flink.table.api.ValidationException: Unable to create > a source for reading table > 'default_catalog.default_database.test_mirror_es'. 
> > Table options are: > > 'connector'='elasticsearch-7' > 'hosts'='http://localhost:9200' > 'index'='test_mirror' > at > org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125) > ~[flink-table_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135) > ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78) > ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > at > org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492) > ~[flink-table_2.11-1.11.0.jar:1.11.0] > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415) > ~[flink-table_2.11-1.11.0.jar:1.11.0] > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102) > ~[flink-table_2.11-1.11.0.jar:1.11.0] > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) > ~[flink-table_2.11-1.11.0.jar:1.11.0] > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661) > ~[flink-table_2.11-1.11.0.jar:1.11.0] > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642) > ~[flink-table_2.11-1.11.0.jar:1.11.0] > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345) > ~[flink-table_2.11-1.11.0.jar:1.11.0] > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568) > ~[flink-table_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164) > ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151) > 
~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773) > ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745) > ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238) > ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) > ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.client.gateway.local.LocalExecutor$1.lambda$parse$0(LocalExecutor.java:430) > ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.client.gateway.local.LocalExecutor$1$$Lambda$460/590884726.get(Unknown > Source) ~[?:?] > at > org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:255) > ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.client.gateway.local.LocalExecutor$1.parse(LocalExecutor.java:430) > ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:97) > ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] > ... 6 more > Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a > connector using option ''connector'='elasticsearch-7''. 
> at > org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329) > ~[flink-table_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118) > ~[flink-table_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135) > ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78) > ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > at > org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492) > ~[flink-table_2.11-1.11.0.jar:1.11.0] > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415) > ~[flink-table_2.11-1.11.0.jar:1.11.0] > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102) > ~[flink-table_2.11-1.11.0.jar:1.11.0] > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) > ~[flink-table_2.11-1.11.0.jar:1.11.0] > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661) > ~[flink-table_2.11-1.11.0.jar:1.11.0] > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642) > ~[flink-table_2.11-1.11.0.jar:1.11.0] > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345) > ~[flink-table_2.11-1.11.0.jar:1.11.0] > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568) > ~[flink-table_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164) > ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151) > ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > 
at > org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773) > ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745) > ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238) > ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) > ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.client.gateway.local.LocalExecutor$1.lambda$parse$0(LocalExecutor.java:430) > ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.client.gateway.local.LocalExecutor$1$$Lambda$460/590884726.get(Unknown > Source) ~[?:?] > at > org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:255) > ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.client.gateway.local.LocalExecutor$1.parse(LocalExecutor.java:430) > ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:97) > ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] > ... 6 more > Caused by: org.apache.flink.table.api.ValidationException: Could not find > any factory for identifier 'elasticsearch-7' that implements > 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the > classpath. 
> > Available factory identifiers are: > > datagen > jdbc > kafka > at > org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240) > ~[flink-table_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326) > ~[flink-table_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118) > ~[flink-table_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135) > ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78) > ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > at > org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492) > ~[flink-table_2.11-1.11.0.jar:1.11.0] > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415) > ~[flink-table_2.11-1.11.0.jar:1.11.0] > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102) > ~[flink-table_2.11-1.11.0.jar:1.11.0] > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) > ~[flink-table_2.11-1.11.0.jar:1.11.0] > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661) > ~[flink-table_2.11-1.11.0.jar:1.11.0] > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642) > ~[flink-table_2.11-1.11.0.jar:1.11.0] > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345) > ~[flink-table_2.11-1.11.0.jar:1.11.0] > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568) > ~[flink-table_2.11-1.11.0.jar:1.11.0] > at > 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164) > ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151) > ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773) > ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745) > ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238) > ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) > ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.client.gateway.local.LocalExecutor$1.lambda$parse$0(LocalExecutor.java:430) > ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.client.gateway.local.LocalExecutor$1$$Lambda$460/590884726.get(Unknown > Source) ~[?:?] > at > org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:255) > ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.client.gateway.local.LocalExecutor$1.parse(LocalExecutor.java:430) > ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:97) > ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] > ... 6 more > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ |
Free forum by Nabble | Edit this page |