flink版本:1.11.2, 从kafka读取数据写到hdfs,运行一段时间报错

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

flink版本:1.11.2, 从kafka读取数据写到hdfs,运行一段时间报错

史 正超
flink版本:1.11.2, 从kafka读取数据写到hdfs,运行一段时间报错, sql 和 日志如下:

```sql
-- Kafka source: box/cell count events, one JSON object per record.
-- NOTE(review): 'kafka-0.11' is the version-specific connector, deprecated in
-- Flink 1.11 in favour of the universal 'kafka' connector — keep only if the
-- brokers really are 0.11.x; confirm before upgrading.
CREATE TABLE T_ED_CELL_NUM_INFO_SRC(
    bigBox STRING,
    edCode STRING,
    mBoxAmount INT,
    mFaultBoxAmount INT,
    mFaultReserveBoxAmount INT,
    mReserveBoxAmount INT,
    mUseReserveBox INT,
    mUsedBoxCount INT,
    mUsedReserveBoxCount INT,
    middleBox STRING,
    smallBox STRING,
    `time` BIGINT,   -- event timestamp; divided by 1000 downstream, so epoch milliseconds
    `type` STRING    -- backtick-quoted: 'time' and 'type' are reserved words in Flink SQL
) WITH (
    'connector' = 'kafka-0.11',
    'topic' = 'ed_cell_num_info',
    'scan.startup.mode' = 'group-offsets',   -- resume from committed consumer-group offsets
    'properties.group.id' = 'FlinkEdCellCollectFlink1.11',
    'properties.bootstrap.servers' = 'xxxx', -- redacted broker list
    'format' = 'json'
);

-- HDFS sink: streaming filesystem connector writing TAB-separated CSV,
-- partitioned by day (pt = 'yyyyMMdd'). Column order must match the
-- positional SELECT in the INSERT statement below.
-- NOTE(review): in streaming mode the filesystem sink only finalizes part
-- files on checkpoints — confirm checkpointing is enabled in the job config
-- (not visible here).
CREATE TABLE T_ED_CELL_NUM_INFO_SINK(
    sta_date INT,
    bigBox STRING,
    edCode STRING,
    mBoxAmount INT,
    mFaultBoxAmount INT,
    mFaultReserveBoxAmount INT,
    mReserveBoxAmount INT,
    mUseReserveBox INT,
    mUsedBoxCount INT,
    mUsedReserveBoxCount INT,
    middleBox STRING,
    smallBox STRING,
    `time` BIGINT,
    `type` STRING,
    pt   STRING   -- partition column, derived from `time` as 'yyyyMMdd'
) PARTITIONED BY (pt) WITH (
    'connector' = 'filesystem',
    'path' = 'hdfs://nameservice2/warehouse/rt_ods/ed_cell_num_info',
    'format' = 'csv',
    'csv.field-delimiter' = U&'\0009',         -- Unicode string literal: TAB (U+0009)
    'csv.disable-quote-character' = 'true',
    'sink.rolling-policy.file-size' = '200m',           -- roll part file at 200 MB
    'sink.rolling-policy.rollover-interval' = '15min',  -- or after 15 minutes open
    'sink.rolling-policy.check-interval' = '1min'       -- rolling policy checked every minute
);

-- Copy source rows to HDFS, adding a derived INT date (sta_date) and a
-- derived partition value (pt). The SELECT maps to the sink POSITIONALLY —
-- keep column order in lockstep with the sink DDL above.
INSERT INTO T_ED_CELL_NUM_INFO_SINK
SELECT
    -- `time` is epoch milliseconds, so /1000 before FROM_UNIXTIME;
    -- 'yyyyMMdd' string is cast to INT for sta_date.
    CAST(FROM_UNIXTIME(`time` / 1000, 'yyyyMMdd') AS INT) AS sta_date,
    bigBox,
    edCode,
    mBoxAmount,
    mFaultBoxAmount,
    mFaultReserveBoxAmount,
    mReserveBoxAmount,
    mUseReserveBox,
    mUsedBoxCount,
    mUsedReserveBoxCount,
    middleBox,
    smallBox,
    `time`,
    `type`,
    FROM_UNIXTIME(`time` / 1000, 'yyyyMMdd') AS pt   -- same expression, kept as STRING for the partition
FROM T_ED_CELL_NUM_INFO_SRC
-- Sanity window on event time relative to wall clock.
-- NOTE(review): UNIX_TIMESTAMP() is non-deterministic — presumably evaluated
-- per record at processing time; confirm this matches the intended semantics.
where `time` > UNIX_TIMESTAMP() * 1000 -  (96 * 3600 * 1000)  -- drop records more than 4 days in the past or more than 2 days in the future
   and `time` < UNIX_TIMESTAMP() * 1000 + (48 * 3600 * 1000);
```

```java
org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1117)
at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1091)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1222)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$16(StreamTask.java:1211)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:282)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.streaming.runtime.tasks.TimerException: org.apache.hadoop.ipc.RemoteException(java.io.IOException): BP-504689274-10.204.4.58-1507792652938:blk_3265799450_2192171234 does not exist or is not under Constructionnull
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkUCBlock(FSNamesystem.java:7146)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.updateBlockForPipeline(FSNamesystem.java:7214)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.updateBlockForPipeline(NameNodeRpcServer.java:764)
at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.updateBlockForPipeline(AuthorizationProviderProxyClientProtocol.java:639)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.updateBlockForPipeline(ClientNamenodeProtocolServerSideTranslatorPB.java:949)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2211)

... 12 common frames omitted
Caused by: org.apache.hadoop.ipc.RemoteException: BP-504689274-10.204.4.58-1507792652938:blk_3265799450_2192171234 does not exist or is not under Constructionnull
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkUCBlock(FSNamesystem.java:7146)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.updateBlockForPipeline(FSNamesystem.java:7214)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.updateBlockForPipeline(NameNodeRpcServer.java:764)
at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.updateBlockForPipeline(AuthorizationProviderProxyClientProtocol.java:639)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.updateBlockForPipeline(ClientNamenodeProtocolServerSideTranslatorPB.java:949)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2211)

at org.apache.hadoop.ipc.Client.call(Client.java:1470)
at org.apache.hadoop.ipc.Client.call(Client.java:1401)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)


```