I ran into the following problem with Flink 1.11.1 while running Flink SQL in the SQL client.
The SQL statements are as follows:

CREATE TABLE sls_log_sz_itsp (
    request STRING,
    http_bundleId STRING,
    upstream_addr STRING,
    http_appid STRING,
    bodyUserId STRING,
    http_sequence STRING,
    http_version STRING,
    response_body STRING,
    uri STRING,
    bytes_sent STRING,
    http_userId STRING,
    http_cityId STRING,
    http_user_agent STRING,
    http_deviceType STRING,
    record_time STRING,
    rt AS TO_TIMESTAMP(DATE_FORMAT(record_time,'yyyy-MM-dd HH:mm:ss')),
    WATERMARK FOR rt AS rt - INTERVAL '5' SECOND,
    request_time STRING,
    request_body STRING,
    request_length STRING,
    nginx_id STRING,
    proxy_add_x_forwarded_for STRING,
    http_deviceId STRING,
    host STRING,
    upstream_response_time STRING,
    status STRING
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = '0.11',
    'connector.topic' = 'sls',
    'connector.properties.zookeeper.connect' = 'hadoop85:2181,hadoop86:2181,hadoop87:2181',
    'connector.properties.bootstrap.servers' = 'hadoop85:9092,hadoop86:9092,hadoop87:9092',
    'connector.properties.group.id' = 'log-sz-itsp',
    'connector.startup-mode' = 'latest-offset',
    'format.type' = 'json'
);

CREATE TABLE sz_itsp_test(
    request STRING,
    request_count BIGINT NOT NULL,
    window_end TIMESTAMP(3)
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://hadoop85:3306/test?useSSL=false&autoReconnect=true',
    'connector.table' = 'sz_itsp_test',
    'connector.driver' = 'com.mysql.jdbc.Driver',
    'connector.username' = 'root',
    'connector.password' = '000000',
    'connector.write.flush.max-rows' = '1',
    'connector.write.flush.interval' = '2s',
    'connector.write.max-retries' = '3'
);

INSERT INTO sz_itsp_test
SELECT
    request,
    count(request) request_count,
    TUMBLE_END(rt, INTERVAL '5' MINUTE) AS window_end
FROM sls_log_sz_itsp
WHERE nginx_id = 'sz-itsp' AND nginx_id IS NOT NULL
GROUP BY TUMBLE(rt, INTERVAL '5' MINUTE), request
;

The SQL client reports the following error:

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190)
Caused by: java.lang.RuntimeException: Error running SQL job.
    at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:608)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:529)
    at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:537)
    at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:299)
    at java.util.Optional.ifPresent(Optional.java:159)
    at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200)
    at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:605)
    ... 8 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:359)
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:999)
    at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$14(FutureUtils.java:427)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException
    ... 10 more

Judging from the error message, is this a bug?

zzy
[hidden email]
It looks like the job submission timed out. Please check that:

1. The Flink cluster is up and running.
2. The SQL client's environment can reach the Flink cluster.
3. sql-client-defaults.yaml sets the correct gateway-address (not needed if the cluster is local).

Best,
Jark

On Wed, 2 Dec 2020 at 14:12, zzy <[hidden email]> wrote:
> I ran into the following problem with Flink 1.11.1 while running Flink SQL
> in the SQL client.
> [...]
> Judging from the error message, is this a bug?
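The first two checks in the reply can be approximated from the machine that runs the SQL client. The following is a minimal Python sketch, not part of the original thread: it assumes the JobManager REST endpoint listens on port 8081 (Flink's default `rest.port`; adjust if your cluster overrides it) and that the REST API's `/overview` endpoint is reachable over plain HTTP. The helper names `rest_port_open` and `cluster_overview` are hypothetical.

```python
import json
import socket
import urllib.request


def rest_port_open(host: str, port: int = 8081, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout.

    The default port 8081 is an assumption (Flink's default rest.port).
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, DNS failures and timeouts.
        return False


def cluster_overview(host: str, port: int = 8081, timeout: float = 3.0) -> dict:
    """Fetch the cluster overview (GET /overview) from the JobManager REST API.

    Raises an exception from urllib if the endpoint cannot be reached.
    """
    url = f"http://{host}:{port}/overview"
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.load(resp)
```

For instance, calling `rest_port_open("hadoop85")` on the SQL client host would show whether the port is reachable at all, and `cluster_overview("hadoop85")` would then show whether a cluster is actually answering. Here `hadoop85` is only a guess at the JobManager host, borrowed from the hostnames in the question. If the port is not reachable, a submission timeout like the one in the stack trace is the expected symptom rather than a bug in the SQL client.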