Hi all,
我们在使用flink-1.10.1 on YARN(版本: 3.0.0-cdh6.3.2)的时候,使用StreamingFileSink时遇到异常信息,详细信息如下: 代码部分: public static <IN> StreamingFileSink<IN> build(String dir, BucketAssigner<IN, String> assigner, String prefix){ return StreamingFileSink.forRowFormat(new Path(dir), new SimpleStringEncoder<IN>()) .withRollingPolicy( DefaultRollingPolicy.builder() .withRolloverInterval(TimeUnit.HOURS.toMillis(2)) .withInactivityInterval(TimeUnit.MINUTES.toMillis(10)) .withMaxPartSize(1024L * 1024L * 1024L * 50) // Max 50GB .build() ) .withBucketAssigner(assigner) .withOutputFileConfig(OutputFileConfig.builder().withPartPrefix(prefix).build()) .build(); } 当任务执行一段时间后,会抛出异常: java.io.IOException: Problem while truncating file: hdfs:///home/2020-08-11/.home-69-71.inprogress.29cb86c7-a943-411f-aa22-6d12692ae523 at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:168) at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.<init>(HadoopRecoverableFsDataOutputStream.java:91) at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.recover(HadoopRecoverableWriter.java:83) at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restoreInProgressFile(Bucket.java:144) at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:131) at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:407) at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:67) at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:182) at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:170) at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:154) at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:434) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to TRUNCATE_FILE /home/2020-08-11/.shop_home_recommend-69-71.inprogress.29cb86c7-a943-411f-aa22-6d12692ae523 for DFSClient_NONMAPREDUCE_-1035692182_1 on 10.131.79.228 because DFSClient_NONMAPREDUCE_-1035692182_1 is already the current lease holder. at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2522) at org.apache.hadoop.hdfs.server.namenode.FSDirTruncateOp.truncate(FSDirTruncateOp.java:119) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:2091) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:1070) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:669) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991) at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:869) at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:815) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2675) at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1499) at org.apache.hadoop.ipc.Client.call(Client.java:1445) at org.apache.hadoop.ipc.Client.call(Client.java:1355) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:228) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116) at com.sun.proxy.$Proxy10.truncate(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.truncate(ClientNamenodeProtocolTranslatorPB.java:366) at sun.reflect.GeneratedMethodAccessor146.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422) at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165) at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157) at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359) at com.sun.proxy.$Proxy11.truncate(Unknown Source) at org.apache.hadoop.hdfs.DFSClient.truncate(DFSClient.java:1545) at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:862) at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:859) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.truncate(DistributedFileSystem.java:869) at sun.reflect.GeneratedMethodAccessor145.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.truncate(HadoopRecoverableFsDataOutputStream.java:203) at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:166) ... 22 more 根据错误堆栈信息和log信息,结合flink源码可以看到,是由于HadoopRecoverableFsDataOutputStream类的safelyTruncateFile方法中,waitUntilLeaseIsRevoked方法没有做到真正的等待hdfs文件租约取消,导致执行truncate代码的时候,拿不到文件租约而抛出异常。 [image: image.png] 在waitUntilLeaseIsRevoked方法中,while循环的判断条件是文件没有被关闭并且deadline还有剩余时间,但是实际情况中,文件没有关闭,但是deadline剩余时间已过,这个时候不会继续sleep等待,而是方法直接返回true,这个时候safelyTruncateFile没有任何判断而直接申请租约,这就会出现异常。 [image: image.png] 这个问题已经在Stack Overflow上提交了issue: https://stackoverflow.com/questions/63027777/flink-streamingfilesink-fails-due-to-truncating-hdfs-file-failure JIRA上提交了ticket: https://stackoverflow.com/questions/63027777/flink-streamingfilesink-fails-due-to-truncating-hdfs-file-failure 谢谢。 王剑 |
Hi 王剑,
我认为你的分析是正确的,相关代码在超过lease的soft limit之后应该主动调用一下recover lease的逻辑。你是否愿意提交一个patch来fix该问题?我在JIRA上也留言了,后续可以直接在JIRA上讨论。 另外,更正一下JIRA链接:https://issues.apache.org/jira/browse/FLINK-18592 Best Regards, Yu On Tue, 11 Aug 2020 at 15:16, Jian Wang <[hidden email]> wrote: > Hi all, > > 我们在使用flink-1.10.1 on YARN(版本: > 3.0.0-cdh6.3.2)的时候,使用StreamingFileSink时遇到异常信息,详细信息如下: > > 代码部分: > > public static <IN> StreamingFileSink<IN> build(String dir, > BucketAssigner<IN, String> assigner, String prefix){ > return StreamingFileSink.forRowFormat(new Path(dir), new > SimpleStringEncoder<IN>()) > .withRollingPolicy( > DefaultRollingPolicy.builder() > .withRolloverInterval(TimeUnit.HOURS.toMillis(2)) > .withInactivityInterval(TimeUnit.MINUTES.toMillis(10)) > .withMaxPartSize(1024L * 1024L * 1024L * 50) // Max 50GB > .build() > ) > .withBucketAssigner(assigner) > > .withOutputFileConfig(OutputFileConfig.builder().withPartPrefix(prefix).build()) > .build(); > } > > > 当任务执行一段时间后,会抛出异常: > > > java.io.IOException: Problem while truncating file: > > hdfs:///home/2020-08-11/.home-69-71.inprogress.29cb86c7-a943-411f-aa22-6d12692ae523 > at > > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:168) > at > > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.<init>(HadoopRecoverableFsDataOutputStream.java:91) > at > > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.recover(HadoopRecoverableWriter.java:83) > at > > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restoreInProgressFile(Bucket.java:144) > at > > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:131) > at > > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:407) > at > > org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:67) > at > > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:182) > at > > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:170) > at > > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:154) > at > > org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:434) > at > > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) > at > > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) > at > > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) > at > > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453) > at > > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) > at java.lang.Thread.run(Thread.java:748) > Caused by: > > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): > Failed to TRUNCATE_FILE > > /home/2020-08-11/.shop_home_recommend-69-71.inprogress.29cb86c7-a943-411f-aa22-6d12692ae523 > for DFSClient_NONMAPREDUCE_-1035692182_1 on 10.131.79.228 because > DFSClient_NONMAPREDUCE_-1035692182_1 is already the current lease holder. > at > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2522) > at > > org.apache.hadoop.hdfs.server.namenode.FSDirTruncateOp.truncate(FSDirTruncateOp.java:119) > at > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:2091) > at > > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:1070) > at > > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:669) > at > > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) > at > > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523) > at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991) > at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:869) > at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:815) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875) > at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2675) > > at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1499) > at org.apache.hadoop.ipc.Client.call(Client.java:1445) > at org.apache.hadoop.ipc.Client.call(Client.java:1355) > at > > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:228) > at > > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116) > at com.sun.proxy.$Proxy10.truncate(Unknown Source) > at > > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.truncate(ClientNamenodeProtocolTranslatorPB.java:366) > at sun.reflect.GeneratedMethodAccessor146.invoke(Unknown Source) > at > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422) > at > > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165) > at > > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157) > at > > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95) > at > > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359) > at com.sun.proxy.$Proxy11.truncate(Unknown Source) > at org.apache.hadoop.hdfs.DFSClient.truncate(DFSClient.java:1545) > at > > org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:862) > at > > org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:859) > at > > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > at > > org.apache.hadoop.hdfs.DistributedFileSystem.truncate(DistributedFileSystem.java:869) > at sun.reflect.GeneratedMethodAccessor145.invoke(Unknown Source) > at > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.truncate(HadoopRecoverableFsDataOutputStream.java:203) > at > > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:166) > ... 22 more > > > 根据错误堆栈信息和log信息,结合flink源码可以看到,是由于HadoopRecoverableFsDataOutputStream类的safelyTruncateFile方法中,waitUntilLeaseIsRevoked方法没有做到真正的等待hdfs文件租约取消,导致执行truncate代码的时候,拿不到文件租约而抛出异常。 > [image: image.png] > > > > > 在waitUntilLeaseIsRevoked方法中,while循环的判断条件是文件没有被关闭并且deadline还有剩余时间,但是实际情况中,文件没有关闭,但是deadline剩余时间已过,这个时候不会继续sleep等待,而是方法直接返回true,这个时候safelyTruncateFile没有任何判断而直接申请租约,这就会出现异常。 > [image: image.png] > > 这个问题已经在Stack Overflow上提交了issue: > > https://stackoverflow.com/questions/63027777/flink-streamingfilesink-fails-due-to-truncating-hdfs-file-failure > > > JIRA上提交了ticket: > > https://stackoverflow.com/questions/63027777/flink-streamingfilesink-fails-due-to-truncating-hdfs-file-failure > > > > 谢谢。 > > 王剑 > |
hi,
我现在使用flink1.12写HDFS又重新碰到了这个问题,请问有什么办法可以避免吗? best, amenhub 发件人: Yu Li 发送时间: 2020-08-11 21:43 收件人: user-zh 主题: Re: Flink 1.10.1版本StreamingFileSink写入HDFS失败 Hi 王剑, 我认为你的分析是正确的,相关代码在超过lease的soft limit之后应该主动调用一下recover lease的逻辑。你是否愿意提交一个patch来fix该问题?我在JIRA上也留言了,后续可以直接在JIRA上讨论。 另外,更正一下JIRA链接:https://issues.apache.org/jira/browse/FLINK-18592 Best Regards, Yu On Tue, 11 Aug 2020 at 15:16, Jian Wang <[hidden email]> wrote: > Hi all, > > 我们在使用flink-1.10.1 on YARN(版本: > 3.0.0-cdh6.3.2)的时候,使用StreamingFileSink时遇到异常信息,详细信息如下: > > 代码部分: > > public static <IN> StreamingFileSink<IN> build(String dir, > BucketAssigner<IN, String> assigner, String prefix){ > return StreamingFileSink.forRowFormat(new Path(dir), new > SimpleStringEncoder<IN>()) > .withRollingPolicy( > DefaultRollingPolicy.builder() > .withRolloverInterval(TimeUnit.HOURS.toMillis(2)) > .withInactivityInterval(TimeUnit.MINUTES.toMillis(10)) > .withMaxPartSize(1024L * 1024L * 1024L * 50) // Max 50GB > .build() > ) > .withBucketAssigner(assigner) > > .withOutputFileConfig(OutputFileConfig.builder().withPartPrefix(prefix).build()) > .build(); > } > > > 当任务执行一段时间后,会抛出异常: > > > java.io.IOException: Problem while truncating file: > > hdfs:///home/2020-08-11/.home-69-71.inprogress.29cb86c7-a943-411f-aa22-6d12692ae523 > at > > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:168) > at > > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.<init>(HadoopRecoverableFsDataOutputStream.java:91) > at > > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.recover(HadoopRecoverableWriter.java:83) > at > > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restoreInProgressFile(Bucket.java:144) > at > > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:131) > at > > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:407) > at > > org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:67) > at > > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:182) > at > > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:170) > at > > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:154) > at > > org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:434) > at > > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) > at > > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) > at > > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) > at > > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453) > at > > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) > at java.lang.Thread.run(Thread.java:748) > Caused by: > > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): > Failed to TRUNCATE_FILE > > /home/2020-08-11/.shop_home_recommend-69-71.inprogress.29cb86c7-a943-411f-aa22-6d12692ae523 > for DFSClient_NONMAPREDUCE_-1035692182_1 on 10.131.79.228 because > DFSClient_NONMAPREDUCE_-1035692182_1 is already the current lease holder. > at > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2522) > at > > org.apache.hadoop.hdfs.server.namenode.FSDirTruncateOp.truncate(FSDirTruncateOp.java:119) > at > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:2091) > at > > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:1070) > at > > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:669) > at > > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) > at > > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523) > at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991) > at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:869) > at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:815) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875) > at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2675) > > at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1499) > at org.apache.hadoop.ipc.Client.call(Client.java:1445) > at org.apache.hadoop.ipc.Client.call(Client.java:1355) > at > > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:228) > at > > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116) > at com.sun.proxy.$Proxy10.truncate(Unknown Source) > at > > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.truncate(ClientNamenodeProtocolTranslatorPB.java:366) > at sun.reflect.GeneratedMethodAccessor146.invoke(Unknown Source) > at > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422) > at > > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165) > at > > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157) > at > > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95) > at > > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359) > at com.sun.proxy.$Proxy11.truncate(Unknown Source) > at org.apache.hadoop.hdfs.DFSClient.truncate(DFSClient.java:1545) > at > > org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:862) > at > > org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:859) > at > > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > at > > org.apache.hadoop.hdfs.DistributedFileSystem.truncate(DistributedFileSystem.java:869) > at sun.reflect.GeneratedMethodAccessor145.invoke(Unknown Source) > at > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.truncate(HadoopRecoverableFsDataOutputStream.java:203) > at > > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:166) > ... 22 more > > > 根据错误堆栈信息和log信息,结合flink源码可以看到,是由于HadoopRecoverableFsDataOutputStream类的safelyTruncateFile方法中,waitUntilLeaseIsRevoked方法没有做到真正的等待hdfs文件租约取消,导致执行truncate代码的时候,拿不到文件租约而抛出异常。 > [image: image.png] > > > > > 在waitUntilLeaseIsRevoked方法中,while循环的判断条件是文件没有被关闭并且deadline还有剩余时间,但是实际情况中,文件没有关闭,但是deadline剩余时间已过,这个时候不会继续sleep等待,而是方法直接返回true,这个时候safelyTruncateFile没有任何判断而直接申请租约,这就会出现异常。 > [image: image.png] > > 这个问题已经在Stack Overflow上提交了issue: > > https://stackoverflow.com/questions/63027777/flink-streamingfilesink-fails-due-to-truncating-hdfs-file-failure > > > JIRA上提交了ticket: > > https://stackoverflow.com/questions/63027777/flink-streamingfilesink-fails-due-to-truncating-hdfs-file-failure > > > > 谢谢。 > > 王剑 > |
Free forum by Nabble | Edit this page |