flink filesystem 1.7.2 on Hadoop 2.7: BucketingSink.reflectTruncate() risks writing many small files to HDFS

aven.wu
The source is in BucketingSink, at line 615:
// Create a UUID-named test file and write "hello" into it.
Path testPath = new Path(basePath, UUID.randomUUID().toString());
try (FSDataOutputStream outputStream = fs.create(testPath)) {
    outputStream.writeUTF("hello");
} catch (IOException e) {
    LOG.error("Could not create file for checking if truncate works.", e);
    throw new RuntimeException("Could not create file for checking if truncate works. " +
        "You can disable support for truncate() completely via " +
        "BucketingSink.setUseTruncate(false).", e);
}

// Try the reflectively looked-up truncate method on the test file.
try {
    m.invoke(fs, testPath, 2);
} catch (IllegalAccessException | InvocationTargetException e) {
    LOG.debug("Truncate is not supported.", e);
    m = null;
}

// Delete the test file (only reached if nothing above threw).
try {
    fs.delete(testPath, false);
} catch (IOException e) {
    LOG.error("Could not delete truncate test file.", e);
    throw new RuntimeException("Could not delete truncate test file. " +
        "You can disable support for truncate() completely via " +
        "BucketingSink.setUseTruncate(false).", e);
}
line 635 creates a test file: FSDataOutputStream outputStream = fs.create(testPath)
line 636 tries to write the test string "hello": outputStream.writeUTF("hello")
line 645 calls the truncate method: m.invoke(fs, testPath, 2);
line 652 deletes the test file: fs.delete(testPath, false);
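The reflective call at line 645 can be illustrated in isolation. The sketch below is a hypothetical stand-in, not Flink or Hadoop code: FakeFs and probe are made-up names, and a String path replaces Hadoop's Path. It shows two properties of Method.invoke that the quoted code relies on: the int argument 2 is unboxed and widened to the long parameter, and an exception thrown inside truncate itself reaches the caller wrapped in InvocationTargetException, so it lands in the "m = null" branch.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class ReflectiveTruncateDemo {

    // Hypothetical stand-in for a file system class; NOT Hadoop's FileSystem.
    public static class FakeFs {
        private final boolean failTruncate;

        public FakeFs(boolean failTruncate) {
            this.failTruncate = failTruncate;
        }

        // Same shape as the probed call: truncate(path, long newLength).
        public boolean truncate(String path, long newLength) {
            if (failTruncate) {
                throw new UnsupportedOperationException("truncate not supported");
            }
            return true;
        }
    }

    // Looks up truncate reflectively and test-invokes it. Returns the Method if the
    // call succeeded, or null if it failed (the "m = null" branch in the quoted code).
    public static Method probe(Object fs) {
        Method m;
        try {
            m = fs.getClass().getMethod("truncate", String.class, long.class);
        } catch (NoSuchMethodException e) {
            return null; // no such method: treat truncate as unsupported
        }
        try {
            // The int literal 2 is boxed to Integer; Method.invoke unboxes and widens it to long.
            m.invoke(fs, "/tmp/truncate-test", 2);
        } catch (IllegalAccessException | InvocationTargetException e) {
            // An exception thrown inside truncate() arrives wrapped in InvocationTargetException.
            m = null;
        }
        return m;
    }

    public static void main(String[] args) {
        System.out.println(probe(new FakeFs(false)) != null); // true
        System.out.println(probe(new FakeFs(true)) != null);  // false
    }
}
```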
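This logic has some flaws:
     1. After line 635 creates the test file, writing "hello" at line 636 fails and throws an exception (causing the program to restart or exit).
     2. The m.invoke call at line 645 fails and throws an exception (causing the program to restart or exit).
In both cases the exception terminates or restarts the program, so the test file that was already created is never deleted. In the extreme case the program keeps throwing and restarting, and because every attempt creates a new file under a random UUID name, the test files pile up. From my reading of the code, reflectTruncate(FileSystem fs) runs when the program initializes its state.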
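The accumulation described above can be reproduced with a small simulation on the local disk. This is a hypothetical sketch, not Flink code: java.nio.file stands in for HDFS, and probeWithoutCleanup and leftoverFiles are made-up names. Each simulated start creates a UUID-named file, fails before the delete step, and is "restarted", so the directory gains one file per failed start.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;
import java.util.stream.Stream;

public class TruncateProbeLeakDemo {

    // Hypothetical local-disk re-creation of the 1.7.2 probe order:
    // create + write "hello", then (optionally) fail, then delete. No cleanup on failure.
    static void probeWithoutCleanup(Path baseDir, boolean failAfterCreate) {
        Path testPath = baseDir.resolve(UUID.randomUUID().toString());
        try {
            Files.write(testPath, "hello".getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (failAfterCreate) {
            // Stands in for an exception thrown before the delete step is reached.
            throw new IllegalStateException("probe failed after creating " + testPath);
        }
        try {
            Files.delete(testPath); // only reached on the happy path
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Runs the probe once per simulated job start in a fresh temp directory and
    // returns how many test files are left behind afterwards.
    public static long leftoverFiles(int starts, boolean failEveryTime) {
        try {
            Path dir = Files.createTempDirectory("truncate-probe");
            for (int i = 0; i < starts; i++) {
                try {
                    probeWithoutCleanup(dir, failEveryTime);
                } catch (IllegalStateException e) {
                    // Simulated failure + restart: the next loop iteration probes again.
                }
            }
            try (Stream<Path> files = Files.list(dir)) {
                return files.count();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(leftoverFiles(5, true));  // 5: one leaked file per failed start
        System.out.println(leftoverFiles(5, false)); // 0: every file was deleted
    }
}
```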


I'd appreciate some pointers: am I using this the wrong way, or is there a flaw in this part of the design?

Biao Liu
Hi, I looked at the code. 1.7.2 does have this problem, and it has been fixed in the latest code; see [1].
If you can, upgrade to 1.8.0, which includes the fix.

[1] https://github.com/apache/flink/commit/24c2e17c8d52ae2f0f897a5806a3a44fdf62b0a5
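The linked commit is not reproduced here, and the sketch below does not claim to match it. It only illustrates the general remedy for the leak: run the delete in a finally block so the test file is removed on every exit path. Again java.nio.file stands in for HDFS; SafeTruncateProbe, TruncateCall, and localTruncate are hypothetical names, and FileChannel.truncate plays the role of the reflective truncate call.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.UUID;
import java.util.stream.Stream;

public class SafeTruncateProbe {

    // Hypothetical hook standing in for the reflective truncate call.
    @FunctionalInterface
    public interface TruncateCall {
        void truncate(Path path, long newLength) throws Exception;
    }

    // Returns true if truncate worked, false if it failed. The test file is
    // removed on every exit path, including when the write itself throws.
    public static boolean probe(Path baseDir, TruncateCall call) {
        Path testPath = baseDir.resolve(UUID.randomUUID().toString());
        try {
            try {
                Files.write(testPath, "hello".getBytes(StandardCharsets.UTF_8));
            } catch (IOException e) {
                throw new UncheckedIOException("Could not create file for checking if truncate works.", e);
            }
            try {
                call.truncate(testPath, 2);
                return true;
            } catch (Exception e) {
                return false; // any failure means "do not use truncate"
            }
        } finally {
            try {
                Files.deleteIfExists(testPath);
            } catch (IOException e) {
                // Best effort only; a real implementation would at least log this.
            }
        }
    }

    // Real truncation on the local disk via FileChannel, for the happy path.
    public static void localTruncate(Path path, long newLength) throws IOException {
        try (FileChannel ch = FileChannel.open(path, StandardOpenOption.WRITE)) {
            ch.truncate(newLength);
        }
    }

    public static long fileCount(Path dir) {
        try (Stream<Path> files = Files.list(dir)) {
            return files.count();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static Path newTempDir() {
        try {
            return Files.createTempDirectory("safe-truncate-probe");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path dir = newTempDir();
        System.out.println(probe(dir, SafeTruncateProbe::localTruncate)); // true
        System.out.println(probe(dir, (p, len) -> {
            throw new UnsupportedOperationException("truncate not supported");
        })); // false
        System.out.println(fileCount(dir)); // 0
    }
}
```

Swallowing a failed delete is a choice made for the sketch: throwing from the finally block would mask whatever exception was already propagating.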

On Mon, Jun 24, 2019 at 2:40 PM, 巫旭阳 <[hidden email]> wrote:

aven.wu
Thanks for the solution; 1.8 has indeed changed this.

> On Jun 24, 2019, at 3:49 PM, Biao Liu <[hidden email]> wrote: