源码在 BucketingSink 615行
Path testPath = new Path(basePath, UUID.randomUUID().toString()); try (FSDataOutputStream outputStream = fs.create(testPath)) { outputStream.writeUTF("hello"); } catch (IOException e) { LOG.error("Could not create file for checking if truncate works.", e); throw new RuntimeException("Could not create file for checking if truncate works. " + "You can disable support for truncate() completely via " + "BucketingSink.setUseTruncate(false).", e); } try { m.invoke(fs, testPath, 2); } catch (IllegalAccessException | InvocationTargetException e) { LOG.debug("Truncate is not supported.", e); m = null; } try { fs.delete(testPath, false); } catch (IOException e) { LOG.error("Could not delete truncate test file.", e); throw new RuntimeException("Could not delete truncate test file. " + "You can disable support for truncate() completely via " + "BucketingSink.setUseTruncate(false).", e); } line 635 开始创建一个测试文件 “FSDataOutputStream outputStream = fs.create(testPath)” line 636 尝试写入 一段 测试文字"hello" "outputStream.writeUTF("hello")" line 645 调用 truncate 方法“m.invoke(fs, testPath, 2);” line 652 删除测试文件 “fs.delete(testPath, false);“ 上述逻辑有一些瑕疵 : 1 在635行创建一个测试文件后,636行写入hello 失败,抛出异常(导致程序重启或退出) 2 在645行调用m.invocate 失败 抛出异常(导致程序重启或退出) 两行操作都抛出异常终止程序或重启程序,导致创建的测试文件无法被删除,极端情况下。程序一直在抛出异常然后重启,根据我阅读的代码 reflectTruncat(Filesystem fs)是程序初始化 state的时候会执行。 望大佬能指点一下,是我的姿势不对还是这块的设计有瑕疵。 |
你好,看了下代码,1.7.2 确实有这问题,最新的代码已经 fix,见[1]
如果可以的话,升级到1.8.0就包含了该 fixing 1. https://github.com/apache/flink/commit/24c2e17c8d52ae2f0f897a5806a3a44fdf62b0a5 巫旭阳 <[hidden email]> 于2019年6月24日周一 下午2:40写道: > 源码在 BucketingSink 615行 > Path testPath = new Path(basePath, UUID.randomUUID().toString()); > try (FSDataOutputStream outputStream = fs.create(testPath)) { > outputStream.writeUTF("hello"); > } catch (IOException e) { > LOG.error("Could not create file for checking if truncate works.", e); > throw new RuntimeException("Could not create file for checking if > truncate works. " + > "You can disable support for truncate() completely via " + > "BucketingSink.setUseTruncate(false).", e); > } > > try { > m.invoke(fs, testPath, 2); > } catch (IllegalAccessException | InvocationTargetException e) { > LOG.debug("Truncate is not supported.", e); > m = null; > } > > try { > fs.delete(testPath, false); > } catch (IOException e) { > LOG.error("Could not delete truncate test file.", e); > throw new RuntimeException("Could not delete truncate test file. " + > "You can disable support for truncate() completely via " + > "BucketingSink.setUseTruncate(false).", e); > } > line 635 开始创建一个测试文件 “FSDataOutputStream outputStream = fs.create(testPath)” > line 636 尝试写入 一段 测试文字"hello" "outputStream.writeUTF("hello")" > line 645 调用 truncate 方法“m.invoke(fs, testPath, 2);” > line 652 删除测试文件 “fs.delete(testPath, false);“ > 上述逻辑有一些瑕疵 : > 1 在635行创建一个测试文件后,636行写入hello 失败,抛出异常(导致程序重启或退出) > 2 在645行调用m.invocate 失败 抛出异常(导致程序重启或退出) > 两行操作都抛出异常终止程序或重启程序,导致创建的测试文件无法被删除,极端情况下。程序一直在抛出异常然后重启,根据我阅读的代码 > reflectTruncat(Filesystem fs)是程序初始化 state的时候会执行。 > > > 望大佬能指点一下,是我的姿势不对还是这块的设计有瑕疵。 > > |
感谢大佬的解决,1.8确实已经修改
> On Jun 24, 2019, at 3:49 PM, Biao Liu <[hidden email]> wrote: > > 你好,看了下代码,1.7.2 确实有这问题,最新的代码已经 fix,见[1] > 如果可以的话,升级到1.8.0就包含了该 fixing > > 1. > https://github.com/apache/flink/commit/24c2e17c8d52ae2f0f897a5806a3a44fdf62b0a5 > > 巫旭阳 <[hidden email]> 于2019年6月24日周一 下午2:40写道: > >> 源码在 BucketingSink 615行 >> Path testPath = new Path(basePath, UUID.randomUUID().toString()); >> try (FSDataOutputStream outputStream = fs.create(testPath)) { >> outputStream.writeUTF("hello"); >> } catch (IOException e) { >> LOG.error("Could not create file for checking if truncate works.", e); >> throw new RuntimeException("Could not create file for checking if >> truncate works. " + >> "You can disable support for truncate() completely via " + >> "BucketingSink.setUseTruncate(false).", e); >> } >> >> try { >> m.invoke(fs, testPath, 2); >> } catch (IllegalAccessException | InvocationTargetException e) { >> LOG.debug("Truncate is not supported.", e); >> m = null; >> } >> >> try { >> fs.delete(testPath, false); >> } catch (IOException e) { >> LOG.error("Could not delete truncate test file.", e); >> throw new RuntimeException("Could not delete truncate test file. " + >> "You can disable support for truncate() completely via " + >> "BucketingSink.setUseTruncate(false).", e); >> } >> line 635 开始创建一个测试文件 “FSDataOutputStream outputStream = fs.create(testPath)” >> line 636 尝试写入 一段 测试文字"hello" "outputStream.writeUTF("hello")" >> line 645 调用 truncate 方法“m.invoke(fs, testPath, 2);” >> line 652 删除测试文件 “fs.delete(testPath, false);“ >> 上述逻辑有一些瑕疵 : >> 1 在635行创建一个测试文件后,636行写入hello 失败,抛出异常(导致程序重启或退出) >> 2 在645行调用m.invocate 失败 抛出异常(导致程序重启或退出) >> 两行操作都抛出异常终止程序或重启程序,导致创建的测试文件无法被删除,极端情况下。程序一直在抛出异常然后重启,根据我阅读的代码 >> reflectTruncat(Filesystem fs)是程序初始化 state的时候会执行。 >> >> >> 望大佬能指点一下,是我的姿势不对还是这块的设计有瑕疵。 >> >> |
Free forum by Nabble | Edit this page |