Flinksql通过phoenix查询维表,报Caused by: org.apache.calcite.avatica.NoSuchStatementException

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Flinksql通过phoenix查询维表,报Caused by: org.apache.calcite.avatica.NoSuchStatementException

黑色
flinksql消费kafka,自定义的 connector phoenix 查询维表
任务在启动一段时间有时候一周左右后,任务挂掉,看日志是:
2020-11-24 00:52:38,534 ERROR com.custom.jdbc.table.JdbcRowDataLookupFunction [] - JDBC executeBatch error, retry times = 2
java.sql.SQLException: null
        at org.apache.calcite.avatica.Helper.createException(Helper.java:56) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
        at org.apache.calcite.avatica.Helper.createException(Helper.java:41) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
        at org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:557) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
        at org.apache.calcite.avatica.AvaticaPreparedStatement.executeQuery(AvaticaPreparedStatement.java:137) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
        at com.custom.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:145) [sql-client-1.0-SNAPSHOT.jar:?]
        at LookupFunction$2.flatMap(Unknown Source) [flink-table-blink_2.11-1.11.1.jar:?]
        at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82) [flink-table-blink_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36) [flink-table-blink_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) [flink-dist_2.11-1.11.1.jar:1.11.1]
Caused by: org.apache.calcite.avatica.NoSuchStatementException
        at org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:349) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
        at org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:343) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
        at org.apache.calcite.avatica.AvaticaConnection.invokeWithRetries(AvaticaConnection.java:793) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
        at org.apache.calcite.avatica.remote.RemoteMeta.execute(RemoteMeta.java:342) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
        at org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:548) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
        ... 20 more
2020-11-24 00:52:40,539 ERROR org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction [] - JDBC executeBatch error, retry times = 3
java.sql.SQLException: null
        at org.apache.calcite.avatica.Helper.createException(Helper.java:56) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
        at org.apache.calcite.avatica.Helper.createException(Helper.java:41) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
        at org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:557) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
        at org.apache.calcite.avatica.AvaticaPreparedStatement.executeQuery(AvaticaPreparedStatement.java:137) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
        at com.custom.phoenix.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:145) [sql-client-1.0-SNAPSHOT.jar:?]
        at LookupFunction$2.flatMap(Unknown Source) [flink-table-blink_2.11-1.11.1.jar:?]
        at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82) [flink-table-blink_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36) [flink-table-blink_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) [flink-dist_2.11-1.11.1.jar:1.11.1]
Caused by: org.apache.calcite.avatica.NoSuchStatementException
        at org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:349) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
        at org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:343) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
        at org.apache.calcite.avatica.AvaticaConnection.invokeWithRetries(AvaticaConnection.java:793) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
        at org.apache.calcite.avatica.remote.RemoteMeta.execute(RemoteMeta.java:342) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
        at org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:548) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
        ... 20 more
2020-11-24 00:52:40,635 WARN  org.apache.flink.runtime.taskmanager.Task                switched from RUNNING to FAILED.
java.lang.RuntimeException: Execution of JDBC statement failed.



各位大佬帮我看下哪的问题?
感谢
Reply | Threaded
Open this post in threaded view
|

Re:Flinksql通过phoenix查询维表,报Caused by: org.apache.calcite.avatica.NoSuchStatementException

hailongwang
Hi,
   从你的堆栈看,你自定义的 “com.custom.jdbc.table.JdbcRowDataLookupFunction” 函数引用的 PreparedStatement 包不对。
   具体实现可以参考:https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
我理解如果 phoenix 支持标准的 SQL 协议的话,直接用提供的 JDBCRowDataLookupFunction 也可以?


Best ,
Hailong
在 2020-12-01 16:40:48,"hoose" <[hidden email]> 写道:

>flinksql消费kafka,自定义的 connector phoenix 查询维表
>任务在启动一段时间有时候一周左右后,任务挂掉,看日志是:
>2020-11-24 00:52:38,534 ERROR com.custom.jdbc.table.JdbcRowDataLookupFunction&nbsp;[] - JDBC executeBatch error, retry times = 2
>java.sql.SQLException: null
> at org.apache.calcite.avatica.Helper.createException(Helper.java:56) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at org.apache.calcite.avatica.Helper.createException(Helper.java:41) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:557) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at org.apache.calcite.avatica.AvaticaPreparedStatement.executeQuery(AvaticaPreparedStatement.java:137) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at com.custom.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:145) [sql-client-1.0-SNAPSHOT.jar:?]
> at LookupFunction$2.flatMap(Unknown Source) [flink-table-blink_2.11-1.11.1.jar:?]
> at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82) [flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36) [flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) [flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) [flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) [flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) [flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) [flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) [flink-dist_2.11-1.11.1.jar:1.11.1]
>Caused by: org.apache.calcite.avatica.NoSuchStatementException
> at org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:349) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:343) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at org.apache.calcite.avatica.AvaticaConnection.invokeWithRetries(AvaticaConnection.java:793) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at org.apache.calcite.avatica.remote.RemoteMeta.execute(RemoteMeta.java:342) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:548) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> ... 20 more
>2020-11-24 00:52:40,539 ERROR org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction [] - JDBC executeBatch error, retry times = 3
>java.sql.SQLException: null
> at org.apache.calcite.avatica.Helper.createException(Helper.java:56) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at org.apache.calcite.avatica.Helper.createException(Helper.java:41) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:557) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at org.apache.calcite.avatica.AvaticaPreparedStatement.executeQuery(AvaticaPreparedStatement.java:137) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at com.custom.phoenix.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:145) [sql-client-1.0-SNAPSHOT.jar:?]
> at LookupFunction$2.flatMap(Unknown Source) [flink-table-blink_2.11-1.11.1.jar:?]
> at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82) [flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36) [flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) [flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) [flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) [flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) [flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) [flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) [flink-dist_2.11-1.11.1.jar:1.11.1]
>Caused by: org.apache.calcite.avatica.NoSuchStatementException
> at org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:349) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:343) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at org.apache.calcite.avatica.AvaticaConnection.invokeWithRetries(AvaticaConnection.java:793) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at org.apache.calcite.avatica.remote.RemoteMeta.execute(RemoteMeta.java:342) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:548) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> ... 20 more
>2020-11-24 00:52:40,635 WARN&nbsp; org.apache.flink.runtime.taskmanager.Task&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; switched from RUNNING to FAILED.
>java.lang.RuntimeException: Execution of JDBC statement failed.
>
>
>
>各位大佬帮我看下哪的问题?
>感谢
Reply | Threaded
Open this post in threaded view
|

回复:Flinksql通过phoenix查询维表,报Caused by: org.apache.calcite.avatica.NoSuchStatementException

黑色
感觉回复,
抱歉我没说清楚,这个自定义的connector其实就是根据jdbc的源码拷贝一份剥离出来的,只是把没用的去了,包路径不一样,里面内容都一样
只是不想在源码上修改影响原代码

statement.clearParameters();
statement = lookupKeyRowConverter.toExternal(keyRow, statement);
try (ResultSet resultSet = statement.executeQuery())
就是在statement.executeQuery()的时候报错,但不常常有,目前执行了一周看上去正常,但不知道啥时候会有错误出现。


------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
发送时间:&nbsp;2020年12月1日(星期二) 晚上11:17
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re:Flinksql通过phoenix查询维表,报Caused by: org.apache.calcite.avatica.NoSuchStatementException



Hi,
&nbsp;&nbsp; 从你的堆栈看,你自定义的 “com.custom.jdbc.table.JdbcRowDataLookupFunction” 函数引用的 PreparedStatement 包不对。
&nbsp;&nbsp; 具体实现可以参考:https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
我理解如果 phoenix 支持标准的 SQL 协议的话,直接用提供的 JDBCRowDataLookupFunction 也可以?


Best ,
Hailong
在 2020-12-01 16:40:48,"hoose" <[hidden email]&gt; 写道:
&gt;flinksql消费kafka,自定义的 connector phoenix 查询维表
&gt;任务在启动一段时间有时候一周左右后,任务挂掉,看日志是:
&gt;2020-11-24 00:52:38,534 ERROR com.custom.jdbc.table.JdbcRowDataLookupFunction&amp;nbsp;[] - JDBC executeBatch error, retry times = 2
&gt;java.sql.SQLException: null
&gt; at org.apache.calcite.avatica.Helper.createException(Helper.java:56) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.calcite.avatica.Helper.createException(Helper.java:41) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:557) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.calcite.avatica.AvaticaPreparedStatement.executeQuery(AvaticaPreparedStatement.java:137) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
&gt; at com.custom.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:145) [sql-client-1.0-SNAPSHOT.jar:?]
&gt; at LookupFunction$2.flatMap(Unknown Source) [flink-table-blink_2.11-1.11.1.jar:?]
&gt; at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82) [flink-table-blink_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36) [flink-table-blink_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) [flink-dist_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-dist_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-dist_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-dist_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-dist_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-dist_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) [flink-dist_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) [flink-dist_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) [flink-dist_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) [flink-dist_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) [flink-dist_2.11-1.11.1.jar:1.11.1]
&gt;Caused by: org.apache.calcite.avatica.NoSuchStatementException
&gt; at org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:349) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:343) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.calcite.avatica.AvaticaConnection.invokeWithRetries(AvaticaConnection.java:793) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.calcite.avatica.remote.RemoteMeta.execute(RemoteMeta.java:342) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:548) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
&gt; ... 20 more
&gt;2020-11-24 00:52:40,539 ERROR org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction [] - JDBC executeBatch error, retry times = 3
&gt;java.sql.SQLException: null
&gt; at org.apache.calcite.avatica.Helper.createException(Helper.java:56) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.calcite.avatica.Helper.createException(Helper.java:41) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:557) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.calcite.avatica.AvaticaPreparedStatement.executeQuery(AvaticaPreparedStatement.java:137) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
&gt; at com.custom.phoenix.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:145) [sql-client-1.0-SNAPSHOT.jar:?]
&gt; at LookupFunction$2.flatMap(Unknown Source) [flink-table-blink_2.11-1.11.1.jar:?]
&gt; at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82) [flink-table-blink_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36) [flink-table-blink_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) [flink-dist_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-dist_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-dist_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-dist_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-dist_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-dist_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) [flink-dist_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) [flink-dist_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) [flink-dist_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) [flink-dist_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) [flink-dist_2.11-1.11.1.jar:1.11.1]
&gt;Caused by: org.apache.calcite.avatica.NoSuchStatementException
&gt; at org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:349) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:343) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.calcite.avatica.AvaticaConnection.invokeWithRetries(AvaticaConnection.java:793) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.calcite.avatica.remote.RemoteMeta.execute(RemoteMeta.java:342) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
&gt; at org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:548) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
&gt; ... 20 more
&gt;2020-11-24 00:52:40,635 WARN&amp;nbsp; org.apache.flink.runtime.taskmanager.Task&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; switched from RUNNING to FAILED.
&gt;java.lang.RuntimeException: Execution of JDBC statement failed.
&gt;
&gt;
&gt;
&gt;各位大佬帮我看下哪的问题?
&gt;感谢
Reply | Threaded
Open this post in threaded view
|

Re:回复:Flinksql通过phoenix查询维表,报Caused by: org.apache.calcite.avatica.NoSuchStatementException

hailongwang


应该出现问题的之前任务都重启了下?
感觉是类加载顺序的问题,因为从栈看,正确的栈应该会出现 `PhoenixPreparedStatement`,但是却是 `AvaticaPreparedStatement `,说明是先加载到了 Avatica 下面的类。
1. 可以在启动的 tm 的 jvm 中加一个 `-verbose` 看下每次重启类是从哪个包加载出来的,是否符合预期,
2. 可以在 lookupfunction 里面直接使用 PhoenixPreparedStatement 类,而不是 `PreparedStatement` 接口,看能不能绕过。


Best ,
Hailong

在 2020-12-02 10:55:29,"黑色" <[hidden email]> 写道:

>感觉回复,
>抱歉我没说清楚,这个自定义的connector其实就是根据jdbc的源码拷贝一份剥离出来的,只是把没用的去了,包路径不一样,里面内容都一样
>只是不想在源码上修改影响原代码
>
>statement.clearParameters();
>statement = lookupKeyRowConverter.toExternal(keyRow, statement);
>try (ResultSet resultSet = statement.executeQuery())
>就是在statement.executeQuery()的时候报错,但不常常有,目前执行了一周看上去正常,但不知道啥时候会有错误出现。
>
>
>------------------&nbsp;原始邮件&nbsp;------------------
>发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
>发送时间:&nbsp;2020年12月1日(星期二) 晚上11:17
>收件人:&nbsp;"user-zh"<[hidden email]&gt;;
>
>主题:&nbsp;Re:Flinksql通过phoenix查询维表,报Caused by: org.apache.calcite.avatica.NoSuchStatementException
>
>
>
>Hi,
>&nbsp;&nbsp; 从你的堆栈看,你自定义的 “com.custom.jdbc.table.JdbcRowDataLookupFunction” 函数引用的 PreparedStatement 包不对。
>&nbsp;&nbsp; 具体实现可以参考:https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
>我理解如果 phoenix 支持标准的 SQL 协议的话,直接用提供的 JDBCRowDataLookupFunction 也可以?
>
>
>Best ,
>Hailong
>在 2020-12-01 16:40:48,"hoose" <[hidden email]&gt; 写道:
>&gt;flinksql消费kafka,自定义的 connector phoenix 查询维表
>&gt;任务在启动一段时间有时候一周左右后,任务挂掉,看日志是:
>&gt;2020-11-24 00:52:38,534 ERROR com.custom.jdbc.table.JdbcRowDataLookupFunction&amp;nbsp;[] - JDBC executeBatch error, retry times = 2
>&gt;java.sql.SQLException: null
>&gt; at org.apache.calcite.avatica.Helper.createException(Helper.java:56) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.calcite.avatica.Helper.createException(Helper.java:41) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:557) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.calcite.avatica.AvaticaPreparedStatement.executeQuery(AvaticaPreparedStatement.java:137) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>&gt; at com.custom.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:145) [sql-client-1.0-SNAPSHOT.jar:?]
>&gt; at LookupFunction$2.flatMap(Unknown Source) [flink-table-blink_2.11-1.11.1.jar:?]
>&gt; at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82) [flink-table-blink_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36) [flink-table-blink_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) [flink-dist_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-dist_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-dist_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-dist_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-dist_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-dist_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) [flink-dist_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) [flink-dist_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) [flink-dist_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) [flink-dist_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) [flink-dist_2.11-1.11.1.jar:1.11.1]
>&gt;Caused by: org.apache.calcite.avatica.NoSuchStatementException
>&gt; at org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:349) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:343) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.calcite.avatica.AvaticaConnection.invokeWithRetries(AvaticaConnection.java:793) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.calcite.avatica.remote.RemoteMeta.execute(RemoteMeta.java:342) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:548) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>&gt; ... 20 more
>&gt;2020-11-24 00:52:40,539 ERROR org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction [] - JDBC executeBatch error, retry times = 3
>&gt;java.sql.SQLException: null
>&gt; at org.apache.calcite.avatica.Helper.createException(Helper.java:56) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.calcite.avatica.Helper.createException(Helper.java:41) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:557) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.calcite.avatica.AvaticaPreparedStatement.executeQuery(AvaticaPreparedStatement.java:137) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>&gt; at com.custom.phoenix.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:145) [sql-client-1.0-SNAPSHOT.jar:?]
>&gt; at LookupFunction$2.flatMap(Unknown Source) [flink-table-blink_2.11-1.11.1.jar:?]
>&gt; at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82) [flink-table-blink_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36) [flink-table-blink_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) [flink-dist_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-dist_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-dist_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-dist_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-dist_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-dist_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) [flink-dist_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) [flink-dist_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) [flink-sql-connector-kafka_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) [flink-dist_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) [flink-dist_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) [flink-dist_2.11-1.11.1.jar:1.11.1]
>&gt;Caused by: org.apache.calcite.avatica.NoSuchStatementException
>&gt; at org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:349) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:343) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.calcite.avatica.AvaticaConnection.invokeWithRetries(AvaticaConnection.java:793) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.calcite.avatica.remote.RemoteMeta.execute(RemoteMeta.java:342) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>&gt; at org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:548) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>&gt; ... 20 more
>&gt;2020-11-24 00:52:40,635 WARN&amp;nbsp; org.apache.flink.runtime.taskmanager.Task&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; switched from RUNNING to FAILED.
>&gt;java.lang.RuntimeException: Execution of JDBC statement failed.
>&gt;
>&gt;
>&gt;
>&gt;各位大佬帮我看下哪的问题?
>&gt;感谢