flink版本:1.11.2, 从kafka读取数据写到hdfs,运行一段时间报错, sql 和 日志如下:
```sql CREATE TABLE T_ED_CELL_NUM_INFO_SRC( bigBox STRING, edCode STRING, mBoxAmount INT, mFaultBoxAmount INT, mFaultReserveBoxAmount INT, mReserveBoxAmount INT, mUseReserveBox INT, mUsedBoxCount INT, mUsedReserveBoxCount INT, middleBox STRING, smallBox STRING, `time` BIGINT, `type` STRING ) WITH ( 'connector' = 'kafka-0.11', 'topic' = 'ed_cell_num_info', 'scan.startup.mode' = 'group-offsets', 'properties.group.id' = 'FlinkEdCellCollectFlink1.11', 'properties.bootstrap.servers' = 'xxxx', 'format' = 'json' ); --HDFS SINK CREATE TABLE T_ED_CELL_NUM_INFO_SINK( sta_date INT, bigBox STRING, edCode STRING, mBoxAmount INT, mFaultBoxAmount INT, mFaultReserveBoxAmount INT, mReserveBoxAmount INT, mUseReserveBox INT, mUsedBoxCount INT, mUsedReserveBoxCount INT, middleBox STRING, smallBox STRING, `time` BIGINT, `type` STRING, pt STRING ) PARTITIONED BY (pt) WITH ( 'connector' = 'filesystem', 'path' = 'hdfs://nameservice2/warehouse/rt_ods/ed_cell_num_info', 'format' = 'csv', 'csv.field-delimiter' = U&'\0009', 'csv.disable-quote-character' = 'true', 'sink.rolling-policy.file-size' = '200m', 'sink.rolling-policy.rollover-interval' = '15min', 'sink.rolling-policy.check-interval' = '1min' ); INSERT INTO T_ED_CELL_NUM_INFO_SINK SELECT CAST(FROM_UNIXTIME(`time` / 1000, 'yyyyMMdd') AS INT) AS sta_date, bigBox, edCode, mBoxAmount, mFaultBoxAmount, mFaultReserveBoxAmount, mReserveBoxAmount, mUseReserveBox, mUsedBoxCount, mUsedReserveBoxCount, middleBox, smallBox, `time`, `type`, FROM_UNIXTIME(`time` / 1000, 'yyyyMMdd') AS pt FROM T_ED_CELL_NUM_INFO_SRC where `time` > UNIX_TIMESTAMP() * 1000 - (96 * 3600 * 1000) --小于前4天和大于后2天的数据不要 and `time` < UNIX_TIMESTAMP() * 1000 + (48 * 3600 * 1000); ``` ```java org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer. at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1117) at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1091) at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1222) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$16(StreamTask.java:1211) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:282) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.flink.streaming.runtime.tasks.TimerException: org.apache.hadoop.ipc.RemoteException(java.io.IOException): BP-504689274-10.204.4.58-1507792652938:blk_3265799450_2192171234 does not exist or is not under Constructionnull at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkUCBlock(FSNamesystem.java:7146) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.updateBlockForPipeline(FSNamesystem.java:7214) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.updateBlockForPipeline(NameNodeRpcServer.java:764) at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.updateBlockForPipeline(AuthorizationProviderProxyClientProtocol.java:639) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.updateBlockForPipeline(ClientNamenodeProtocolServerSideTranslatorPB.java:949) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2211) ... 12 common frames omitted Caused by: org.apache.hadoop.ipc.RemoteException: BP-504689274-10.204.4.58-1507792652938:blk_3265799450_2192171234 does not exist or is not under Constructionnull at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkUCBlock(FSNamesystem.java:7146) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.updateBlockForPipeline(FSNamesystem.java:7214) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.updateBlockForPipeline(NameNodeRpcServer.java:764) at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.updateBlockForPipeline(AuthorizationProviderProxyClientProtocol.java:639) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.updateBlockForPipeline(ClientNamenodeProtocolServerSideTranslatorPB.java:949) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2211) at org.apache.hadoop.ipc.Client.call(Client.java:1470) at org.apache.hadoop.ipc.Client.call(Client.java:1401) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) ``` |
Free forum by Nabble | Edit this page |