Flink 1.10.1: StreamingFileSink fails to write to HDFS

Flink 1.10.1: StreamingFileSink fails to write to HDFS

Jian Wang
Hi all,

We are running flink-1.10.1 on YARN (version 3.0.0-cdh6.3.2) and hit an exception when using StreamingFileSink. Details below.

The code:

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public static <IN> StreamingFileSink<IN> build(
        String dir, BucketAssigner<IN, String> assigner, String prefix) {
    return StreamingFileSink.forRowFormat(new Path(dir), new SimpleStringEncoder<IN>())
        .withRollingPolicy(
            DefaultRollingPolicy.builder()
                .withRolloverInterval(TimeUnit.HOURS.toMillis(2))
                .withInactivityInterval(TimeUnit.MINUTES.toMillis(10))
                .withMaxPartSize(1024L * 1024L * 1024L * 50) // max part size: 50 GB
                .build())
        .withBucketAssigner(assigner)
        .withOutputFileConfig(OutputFileConfig.builder().withPartPrefix(prefix).build())
        .build();
}
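
(For context, a hypothetical call site for this helper; the DateTimeBucketAssigner, path, and prefix below are illustrative examples, not our actual values:)

// Hypothetical usage; additionally requires
// org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> lines = env.fromElements("example-record");
lines.addSink(build(
    "hdfs:///home",                             // base output directory
    new DateTimeBucketAssigner<>("yyyy-MM-dd"), // one bucket per day, e.g. 2020-08-11
    "home"));                                   // part-file prefix
env.execute("streaming-file-sink-demo");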


After the job has been running for a while, it throws the following exception:


java.io.IOException: Problem while truncating file: hdfs:///home/2020-08-11/.home-69-71.inprogress.29cb86c7-a943-411f-aa22-6d12692ae523
	at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:168)
	at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.<init>(HadoopRecoverableFsDataOutputStream.java:91)
	at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.recover(HadoopRecoverableWriter.java:83)
	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restoreInProgressFile(Bucket.java:144)
	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:131)
	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:407)
	at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:67)
	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:182)
	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:170)
	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:154)
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:434)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to TRUNCATE_FILE /home/2020-08-11/.shop_home_recommend-69-71.inprogress.29cb86c7-a943-411f-aa22-6d12692ae523 for DFSClient_NONMAPREDUCE_-1035692182_1 on 10.131.79.228 because DFSClient_NONMAPREDUCE_-1035692182_1 is already the current lease holder.
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2522)
	at org.apache.hadoop.hdfs.server.namenode.FSDirTruncateOp.truncate(FSDirTruncateOp.java:119)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:2091)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:1070)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:669)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:869)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:815)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2675)

	at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1499)
	at org.apache.hadoop.ipc.Client.call(Client.java:1445)
	at org.apache.hadoop.ipc.Client.call(Client.java:1355)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:228)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
	at com.sun.proxy.$Proxy10.truncate(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.truncate(ClientNamenodeProtocolTranslatorPB.java:366)
	at sun.reflect.GeneratedMethodAccessor146.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
	at com.sun.proxy.$Proxy11.truncate(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.truncate(DFSClient.java:1545)
	at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:862)
	at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:859)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.truncate(DistributedFileSystem.java:869)
	at sun.reflect.GeneratedMethodAccessor145.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.truncate(HadoopRecoverableFsDataOutputStream.java:203)
	at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:166)
	... 22 more

From the error stack trace and the logs, read together with the Flink source code, the cause is that in the safelyTruncateFile method of HadoopRecoverableFsDataOutputStream, the waitUntilLeaseIsRevoked method does not actually wait until the HDFS file lease has been revoked, so when the truncate code runs it cannot obtain the file lease and the exception is thrown.
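For reference, an abridged paraphrase of the two methods in Flink 1.10.1's HadoopRecoverableFsDataOutputStream; this is a simplified sketch written against the public HDFS client API, not the verbatim Flink source, and the timeout constant is indicative only:

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

// Abridged paraphrase of Flink 1.10.1's HadoopRecoverableFsDataOutputStream;
// simplified for illustration, not the verbatim source.
final class LeaseTruncateSketch {

    // Bounded wait for the NameNode to revoke the previous lease
    // (the real constant is on this order; treat it as indicative).
    private static final long LEASE_TIMEOUT_MS = 100_000L;

    static void safelyTruncateFile(DistributedFileSystem dfs, Path path, long offset)
            throws IOException {
        // The boolean result is discarded: truncation goes ahead even when the
        // lease was NOT revoked within the timeout. This is the gap described above.
        waitUntilLeaseIsRevoked(dfs, path);

        try {
            // The NameNode rejects this with AlreadyBeingCreatedException
            // if another client still holds the lease.
            dfs.truncate(path, offset);
        } catch (Exception e) {
            throw new IOException("Problem while truncating file: " + path, e);
        }
    }

    static boolean waitUntilLeaseIsRevoked(DistributedFileSystem dfs, Path path)
            throws IOException {
        dfs.recoverLease(path); // ask the NameNode to start lease recovery

        final long deadline = System.currentTimeMillis() + LEASE_TIMEOUT_MS;
        boolean isClosed = dfs.isFileClosed(path);

        // Loops only while the file is open AND the deadline has time left;
        // once the deadline expires with the file still open, it falls through
        // and returns false, a result the caller above never checks.
        while (!isClosed && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                throw new IOException("Interrupted while waiting for lease revocation", e);
            }
            isClosed = dfs.isFileClosed(path);
        }
        return isClosed;
    }
}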



In waitUntilLeaseIsRevoked, the while loop keeps waiting only while the file is not yet closed and the deadline still has time left. In practice, however, the file is not closed when the deadline expires; at that point the method stops sleeping and simply returns (with the file still open), and safelyTruncateFile, without checking that result, goes straight on to request the lease and truncate, which is where the exception occurs.

I have already asked about this on Stack Overflow:
https://stackoverflow.com/questions/63027777/flink-streamingfilesink-fails-due-to-truncating-hdfs-file-failure


and filed a ticket on JIRA:
https://stackoverflow.com/questions/63027777/flink-streamingfilesink-fails-due-to-truncating-hdfs-file-failure



Thanks.

Jian Wang
Re: Flink 1.10.1: StreamingFileSink fails to write to HDFS

Yu Li
Hi Jian Wang,

I think your analysis is correct: once the lease's soft limit has been exceeded, the relevant code should proactively invoke the recover-lease logic. Would you be willing to submit a patch to fix this? I have also left a comment on the JIRA ticket, so we can continue the discussion there.
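
To illustrate the direction, a hypothetical sketch building on the paraphrased LeaseTruncateSketch earlier in this thread; this is only one possible shape of such a fix, not an actual Flink patch:

// Hypothetical sketch only: re-trigger lease recovery instead of truncating
// after a single bounded wait. The retry bound is illustrative.
static void truncateWithLeaseRecovery(DistributedFileSystem dfs, Path path, long offset)
        throws IOException {
    final int maxAttempts = 3; // illustrative bound, not from Flink
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
        // waitUntilLeaseIsRevoked (sketched earlier) calls dfs.recoverLease and
        // polls isFileClosed; invoking it again past the soft limit retries
        // recovery instead of giving up silently.
        if (waitUntilLeaseIsRevoked(dfs, path)) {
            dfs.truncate(path, offset);
            return;
        }
    }
    throw new IOException("Lease on " + path + " was not revoked after "
            + maxAttempts + " recovery attempts; refusing to truncate");
}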

Also, a correction to the JIRA link: https://issues.apache.org/jira/browse/FLINK-18592

Best Regards,
Yu


On Tue, 11 Aug 2020 at 15:16, Jian Wang <[hidden email]> wrote:

Re: Re: Flink 1.10.1: StreamingFileSink fails to write to HDFS

amenhub@163.com
hi,

I have now run into this problem again while writing to HDFS with Flink 1.12. Is there any way to avoid it?

best,
amenhub



 
From: Yu Li
Sent: 2020-08-11 21:43
To: user-zh
Subject: Re: Flink 1.10.1: StreamingFileSink fails to write to HDFS