flink sql client: CDC to Elasticsearch is broken (version 1.11.0) -- the job submits successfully, but it does not show up on the web dashboard! Is this a bug, or is something wrong with my configuration?


jindy_liu
The table DDL is as follows:

CREATE TABLE test (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
 'connector'='kafka',
 'topic'='test',
 'properties.group.id'='c_mysql_binlog_es',
 'properties.bootstrap.servers'='localhost:9092',
 'scan.startup.mode'='latest-offset',
 'format'='canal-json',
 'canal-json.ignore-parse-errors'='true'
);


# output table to Elasticsearch
CREATE TABLE test_mirror_es (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'test_mirror'
);

INSERT into test_mirror_es SELECT * from test where test.id >=0;

Log excerpt: Caused by: org.apache.flink.table.api.ValidationException: Unable to
create a source for reading table
'default_catalog.default_database.test_mirror_es'.

The full log follows:


2020-08-12 13:07:20,815 INFO
org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: jobmanager.rpc.address, localhost
2020-08-12 13:07:20,820 INFO
org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: jobmanager.rpc.port, 6123
2020-08-12 13:07:20,820 INFO
org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: jobmanager.memory.process.size, 1600m
2020-08-12 13:07:20,820 INFO
org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: taskmanager.memory.process.size, 1728m
2020-08-12 13:07:20,820 INFO
org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: taskmanager.numberOfTaskSlots, 10
2020-08-12 13:07:20,820 INFO
org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: parallelism.default, 5
2020-08-12 13:07:20,821 INFO
org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: state.savepoints.dir,
hdfs://localhost:9000/flink-1.11.0/flink-savepoints
2020-08-12 13:07:20,821 INFO
org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: jobmanager.execution.failover-strategy, region
2020-08-12 13:07:21,198 INFO
org.apache.flink.table.client.config.entries.ExecutionEntry  [] - Property
'execution.restart-strategy.type' not specified. Using default value:
fallback
2020-08-12 13:07:22,099 INFO
org.apache.flink.table.client.gateway.local.ExecutionContext [] - Executor
config: {taskmanager.memory.process.size=1728m,
jobmanager.execution.failover-strategy=region,
jobmanager.rpc.address=localhost, execution.target=remote,
jobmanager.memory.process.size=1600m,
state.savepoints.dir=hdfs://localhost:9000/flink-1.11.0/flink-savepoints,
jobmanager.rpc.port=6123, execution.savepoint.ignore-unclaimed-state=false,
execution.attached=true, execution.shutdown-on-attached-exit=false,
pipeline.jars=[file:/data1/home/xxx/flink-demo/flink-1.11.0/opt/flink-sql-client_2.11-1.11.0.jar],
parallelism.default=5, taskmanager.numberOfTaskSlots=10,
pipeline.classpaths=[]}
2020-08-12 13:07:22,286 INFO  org.apache.flink.table.client.cli.CliClient                
[] - Command history file path: /root/.flink-sql-history
2020-08-12 13:07:46,637 INFO
org.apache.flink.table.client.gateway.local.ProgramDeployer  [] - Submitting
job org.apache.flink.streaming.api.graph.StreamGraph@41a16eb3 for query
default: INSERT into test_mirror_es SELECT * from test where id >0`
2020-08-12 13:07:46,709 INFO  org.apache.flink.configuration.Configuration                
[] - Config uses fallback configuration key 'jobmanager.rpc.address' instead
of key 'rest.address'
2020-08-12 13:10:17,512 INFO
org.apache.flink.table.client.gateway.local.ProgramDeployer  [] - Submitting
job org.apache.flink.streaming.api.graph.StreamGraph@3ff8a3ad for query
default: INSERT into test_mirror_es SELECT * from test where id >0`
2020-08-12 13:10:17,516 INFO  org.apache.flink.configuration.Configuration                
[] - Config uses fallback configuration key 'jobmanager.rpc.address' instead
of key 'rest.address'
2020-08-12 13:10:38,360 WARN  org.apache.flink.table.client.cli.CliClient                
[] - Could not execute SQL statement.
org.apache.flink.table.client.gateway.SqlExecutionException: Invalidate SQL
statement.
        at
org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:99)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.client.cli.SqlCommandParser.parse(SqlCommandParser.java:90)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.client.cli.CliClient.parseCommand(CliClient.java:257)
[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142)
[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114)
[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
[flink-sql-client_2.11-1.11.0.jar:1.11.0]
Caused by: org.apache.flink.table.api.ValidationException: Unable to create
a source for reading table
'default_catalog.default_database.test_mirror_es'.

Table options are:

'connector'='elasticsearch-7'
'hosts'='http://localhost:9200'
'index'='test_mirror'
        at
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.client.gateway.local.LocalExecutor$1.lambda$parse$0(LocalExecutor.java:430)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.client.gateway.local.LocalExecutor$1$$Lambda$460/590884726.get(Unknown
Source) ~[?:?]
        at
org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:255)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.client.gateway.local.LocalExecutor$1.parse(LocalExecutor.java:430)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:97)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        ... 6 more
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a
connector using option ''connector'='elasticsearch-7''.
        at
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.client.gateway.local.LocalExecutor$1.lambda$parse$0(LocalExecutor.java:430)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.client.gateway.local.LocalExecutor$1$$Lambda$460/590884726.get(Unknown
Source) ~[?:?]
        at
org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:255)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.client.gateway.local.LocalExecutor$1.parse(LocalExecutor.java:430)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:97)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        ... 6 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find
any factory for identifier 'elasticsearch-7' that implements
'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
classpath.

Available factory identifiers are:

datagen
jdbc
kafka
        at
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.client.gateway.local.LocalExecutor$1.lambda$parse$0(LocalExecutor.java:430)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.client.gateway.local.LocalExecutor$1$$Lambda$460/590884726.get(Unknown
Source) ~[?:?]
        at
org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:255)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.client.gateway.local.LocalExecutor$1.parse(LocalExecutor.java:430)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:97)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        ... 6 more





Re: flink sql client: CDC to Elasticsearch is broken (version 1.11.0) -- the job submits successfully, but it does not show up on the web dashboard! Is this a bug, or is something wrong with my configuration?

Leonard Xu
Hello

1. If you are using the CDC feature, please use 1.11.1; that release fixes a CDC bug [1].
2. Also, this stack trace means the matching connector jar could not be found. Please confirm that the flink-sql-connector-elasticsearch7_2.11-1.11.0 jar is on the SQL client's classpath (see the sketch below).
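For reference, a minimal sketch of putting the connector jar in place; /path/to/flink-1.11.0 is only a placeholder for the installation directory, so adjust the paths to your environment:

  # copy the ES7 SQL connector jar into the distribution's lib/ directory
  cp flink-sql-connector-elasticsearch7_2.11-1.11.0.jar /path/to/flink-1.11.0/lib/
  # restart the cluster and the SQL client so the new jar is picked up
  ./bin/stop-cluster.sh && ./bin/start-cluster.sh
  ./bin/sql-client.sh embedded

Alternatively, the jar can be passed to the SQL client directly when starting it, e.g. ./bin/sql-client.sh embedded -j flink-sql-connector-elasticsearch7_2.11-1.11.0.jar.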

Best,
Leonard
[1] https://issues.apache.org/jira/browse/FLINK-18461 <https://issues.apache.org/jira/browse/FLINK-18461>
