Flink job fails on submission when writing to HDFS: "Recoverable writers on Hadoop are only supported for HDFS"


nobleyd
Recoverable writers on Hadoop are only supported for HDFS

As above. We use the Hadoop protocol, but the underlying storage is not HDFS; it is our company's in-house distributed file system.

Writing with Spark and reading with Spark SQL both work fine, but so far we have not succeeded in either writing or reading with Flink.

Re: Flink job fails on submission when writing to HDFS: "Recoverable writers on Hadoop are only supported for HDFS"

nobleyd
The full stack trace is:

java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
    at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:61)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:210)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink.java:260)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:412)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:107)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:264)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)



Re: Flink job fails on submission when writing to HDFS: "Recoverable writers on Hadoop are only supported for HDFS"

nobleyd
Besides this, Flink SQL also fails to read our existing Hive data warehouse. The Hive catalog is configured correctly and all the table metadata shows up, but SELECT queries fail.


Re: Re: Flink job fails on submission when writing to HDFS: "Recoverable writers on Hadoop are only supported for HDFS"

Michael Ran
In reply to this post by nobleyd
This is probably because an HDFS-specific API is used here, and your file system does not implement it: public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {...}
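To make the point above concrete: judging by the stack trace, the constructor of HadoopRecoverableWriter rejects any file system whose URI scheme is not hdfs. A minimal stand-alone sketch of that kind of guard follows; the class and condition below are illustrative, and the exact check in the Flink source may differ.

```java
import java.net.URI;

// Illustrative only: mimics the scheme guard implied by the stack trace.
// The real check lives in the constructor of
// org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.
final class RecoverableWriterGuard {
    static void checkSupported(URI fsUri) {
        // Anything that is not literally the "hdfs" scheme is rejected,
        // even if the file system speaks the Hadoop FileSystem API.
        if (!"hdfs".equalsIgnoreCase(fsUri.getScheme())) {
            throw new UnsupportedOperationException(
                    "Recoverable writers on Hadoop are only supported for HDFS");
        }
    }
}
```

This is why a Hadoop-compatible but non-HDFS file system (a custom scheme) fails at StreamingFileSink initialization even though plain reads and writes through the Hadoop API work elsewhere.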

Does Flink SQL have a json_to_map function? If not, how could one be implemented?

Jeff
In reply to this post by nobleyd
hi all,


Is there a way to convert a JSON string into a map, similar to the str_to_map function?


Version: Flink 1.11
Planner: Blink SQL


Background: a UDF fetches a JSON string via an HTTP request, and we would like to use the fields inside that JSON directly. A UDTF can meet part of the requirement, but it is inflexible, because defining a UDTF requires specifying the output fields and their types up front.
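One workaround is a scalar UDF that parses the JSON into a MAP&lt;STRING, STRING&gt; so the query can index into it by key, sidestepping the fixed output schema of a UDTF. The parsing core can be sketched without any Flink dependency; the method below is a simplified sketch that handles only flat JSON objects (no nesting, no escaped quotes), and a real UDF would wrap this in a ScalarFunction and should use a proper JSON library such as Jackson. All names here are illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch of a json_to_map core: flat JSON objects only.
// In a Flink UDF this logic would live in the eval() of a ScalarFunction
// declared to return MAP<STRING, STRING>; use a real JSON parser in production.
final class JsonToMap {
    // Matches "key": "string" | number | true | false | null
    private static final Pattern ENTRY = Pattern.compile(
            "\"([^\"]+)\"\\s*:\\s*(\"([^\"]*)\"|[-0-9.eE+]+|true|false|null)");

    static Map<String, String> jsonToMap(String json) {
        Map<String, String> result = new LinkedHashMap<>();
        Matcher m = ENTRY.matcher(json);
        while (m.find()) {
            // Group 3 is the unquoted string value; group 2 covers bare literals.
            String value = m.group(3) != null ? m.group(3) : m.group(2);
            result.put(m.group(1), value);
        }
        return result;
    }
}
```

With the map in hand, a query can then pick fields dynamically, e.g. m['code'], without declaring each field as a UDTF output column.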




Re: Re: Flink job fails on submission when writing to HDFS: "Recoverable writers on Hadoop are only supported for HDFS"

nobleyd
In reply to this post by Michael Ran
@Michael Ran
Is there any workaround? I hit this exception when writing to HDFS with Flink's StreamingFileSink.


Re: Re: Flink job fails on submission when writing to HDFS: "Recoverable writers on Hadoop are only supported for HDFS"

张锴
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}

Try this approach instead, using sink.setBucketer and sink.setWriter.




Re: Re: Re: Flink job fails on submission when writing to HDFS: "Recoverable writers on Hadoop are only supported for HDFS"

Michael Ran
Sorry, I haven't used this in a long time. But you can analyze the exception together with the API source code to determine whether a direct write is possible. If you need to write to a custom file system, you will either have to implement your own sink, or make your file system's write path compatible with the upper-level HDFS API. The existing sink implementations are a good reference.

Re: Re: Re: Flink job fails on submission when writing to HDFS: "Recoverable writers on Hadoop are only supported for HDFS"

nobleyd
@Michael Ran: understood, no problem.

@张锴 Which Flink version and which connector do you mean, DataStream or SQL? I searched and could not find it; I am on 1.12 with the DataStream API.
Looking at the docs, there are both StreamingFileSink and FileSink, and their usage looks similar. I plan to try FileSink, but I am not clear what the difference between FileSink and StreamingFileSink is, nor whether either can write to Hadoop-style file systems, since atomicity of writes matters here: our distributed file system does not support append or in-place edits.


Re: Re: Re: Flink job fails on submission when writing to HDFS: "Recoverable writers on Hadoop are only supported for HDFS"

张锴
I am on Flink 1.10, where the file sink is BucketingSink; that is what I use to write to HDFS.


Re: Re: Re: Flink job fails on submission when writing to HDFS: "Recoverable writers on Hadoop are only supported for HDFS"

张锴
@赵一旦
Also, about the other question I asked you last time: I tried the idea you suggested, but it seems to have a problem. Could you take a look?


Re: Re: Re: Flink job fails on submission when writing to HDFS: "Recoverable writers on Hadoop are only supported for HDFS"

nobleyd
Solved: I overrode this part of the Flink source code and removed the restriction that rejects non-HDFS schemes.
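For anyone hitting the same issue, a relaxed guard along these lines is roughly what such an override amounts to. This is a stand-alone illustration, not the actual patched Flink code, and "myfs" stands in for the in-house file system's scheme (a hypothetical name). Note this is only safe if the file system really provides the truncate and atomic-rename semantics the recoverable writer depends on for its exactly-once guarantees.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.List;

// Illustration of relaxing the scheme check: accept additional schemes that
// are known to behave like HDFS. Whether this is safe depends entirely on the
// file system supporting truncate() and atomic rename.
final class RelaxedSchemeCheck {
    // "myfs" is a placeholder for the custom file system's URI scheme.
    private static final List<String> SUPPORTED = Arrays.asList("hdfs", "myfs");

    static boolean isSupported(URI fsUri) {
        String scheme = fsUri.getScheme();
        return scheme != null && SUPPORTED.contains(scheme.toLowerCase());
    }
}
```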

张锴 <[hidden email]> wrote on Thu, Jan 21, 2021 at 7:35 PM:

> @赵一旦
> Also, I asked you another question last time. I tried the idea you suggested, but it doesn't seem to work; could you take a look?
>
> 张锴 <[hidden email]> wrote on Thu, Jan 21, 2021 at 7:13 PM:
>
> > I'm on Flink 1.10. The "FileSink" I meant is BucketingSink; that's what I use to write to HDFS.
> >
> > 赵一旦 <[hidden email]> wrote on Thu, Jan 21, 2021 at 7:05 PM:
> >
> > > @Michael Ran: understood, no problem.
> > >
> > > @张锴: which Flink version and connector do you mean, stream or SQL? I searched and don't have it; I'm on 1.12, DataStream.
> > >
> > > The docs show both StreamingFileSink and FileSink, and their usage looks similar. I plan to try FileSink, but I'm not clear on the difference between FileSink and StreamingFileSink, or whether both can write to Hadoop-style file systems. Atomic writes are the concern, since distributed file systems often don't support append or in-place edits.
> > >
> > > Michael Ran <[hidden email]> wrote on Thu, Jan 21, 2021 at 7:01 PM:
> > >
> > > > Sorry, I haven't used this in a long time. But you can work from the exception message and the API source to determine whether direct writes are possible. If you want to write to a custom file system, you either implement your own sink, or make your file system's write path compatible with the upper-level HDFS API; the existing sink implementations are a good reference.
> > > >
> > > > On 2021-01-21 18:45:06, "张锴" <[hidden email]> wrote:
> > > >
> > > > > import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
> > > > >
> > > > > Try it this way, with sink.setBucketer and sink.setWriter.
> > > > >
> > > > > 赵一旦 <[hidden email]> wrote on Thu, Jan 21, 2021 at 6:37 PM:
> > > > >
> > > > > > @Michael Ran
> > > > > > Is there a workaround, then? The exception occurs when I write to HDFS via Flink's StreamingFileSink.
> > > > > >
> > > > > > Michael Ran <[hidden email]> wrote on Thu, Jan 21, 2021 at 5:23 PM:
> > > > > >
> > > > > > > This probably uses an HDFS-specific API that your file system doesn't satisfy:
> > > > > > > public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {...}
> > > > > > >
> > > > > > > On 2021-01-21 17:18:23, "赵一旦" <[hidden email]> wrote:
> > > > > > >
> > > > > > > > The full error is:
> > > > > > > >
> > > > > > > > java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
> > > > > > > >     at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:61)
> > > > > > > >     ... (remaining frames identical to the trace quoted earlier in the thread)
> > > > > > > >
> > > > > > > > 赵一旦 <[hidden email]> wrote on Thu, Jan 21, 2021 at 5:17 PM:
> > > > > > > >
> > > > > > > > > Recoverable writers on Hadoop are only supported for HDFS
> > > > > > > > >
> > > > > > > > > As above: we use the Hadoop protocol, but the underlying storage is not HDFS; it is our company's in-house distributed file system.
> > > > > > > > >
> > > > > > > > > Writing with Spark and reading with spark-sql both work fine, but so far neither writing nor reading with Flink has succeeded.

Re: Re: Re: Flink job writing to HDFS fails at submission: Recoverable writers on Hadoop are only supported for HDFS

nobleyd
One more question: when writing ORC files, does anyone know how to write a Map-typed column? An example would be appreciated.
As below, once I have the MapColumnVector, how do I write into it? The simple non-Map fields are straightforward: just set the value of xxxColumnVector.vector[rowId]. But the MapColumnVector API is messy and I can't figure out how to use it.

MapColumnVector dColumnVector = (MapColumnVector) batch.cols[2];
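Since this question went unanswered in the thread: as I understand the ORC vectorized API, a map column is stored as two flat child vectors (keys and values) plus per-row offsets and lengths, with childCount tracking the total entries written so far. Below is a minimal, self-contained sketch of that layout; the field names mirror MapColumnVector's, but the class itself is a stand-in written for illustration (in the real API the children are ColumnVector subtypes determined by your ORC schema).

```java
// Illustrates MapColumnVector-style storage: each row's map entries occupy a
// contiguous slice [offsets[row], offsets[row] + lengths[row]) of flat child arrays.
// This class is a simplified stand-in for the Hive/ORC vector types, using long
// keys and values; the bookkeeping is the point, not the concrete child types.
import java.util.LinkedHashMap;
import java.util.Map;

public class MapVectorSketch {
    static final int MAX_ENTRIES = 1024;

    long[] offsets = new long[16];   // per-row start index into keys/values
    long[] lengths = new long[16];   // per-row entry count
    int childCount = 0;              // total entries written across all rows
    long[] keys = new long[MAX_ENTRIES];
    long[] values = new long[MAX_ENTRIES];

    /** Append one row's map, the way you would fill a map vector for rowId. */
    void setRow(int rowId, Map<Long, Long> map) {
        offsets[rowId] = childCount;     // this row starts where the children end
        lengths[rowId] = map.size();
        for (Map.Entry<Long, Long> e : map.entrySet()) {
            keys[childCount] = e.getKey();
            values[childCount] = e.getValue();
            childCount++;
        }
    }

    /** Read a row back, to check that the layout round-trips. */
    Map<Long, Long> getRow(int rowId) {
        Map<Long, Long> out = new LinkedHashMap<>();
        int start = (int) offsets[rowId];
        for (int i = 0; i < lengths[rowId]; i++) {
            out.put(keys[start + i], values[start + i]);
        }
        return out;
    }

    public static void main(String[] args) {
        MapVectorSketch v = new MapVectorSketch();
        Map<Long, Long> row0 = new LinkedHashMap<>();
        row0.put(1L, 10L);
        row0.put(2L, 20L);
        v.setRow(0, row0);
        Map<Long, Long> row1 = new LinkedHashMap<>();
        row1.put(7L, 70L);
        v.setRow(1, row1);
        System.out.println(v.getRow(0)); // {1=10, 2=20}
        System.out.println(v.getRow(1)); // {7=70}
    }
}
```

With the real API the same bookkeeping should apply: per row, set mapVector.offsets[rowId] = mapVector.childCount and mapVector.lengths[rowId] to the entry count, write the entries into the mapVector.keys / mapVector.values child vectors at those positions, and advance childCount (growing the children with ensureSize when they may overflow). This reflects my reading of the API and is not verified against a specific ORC version.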


赵一旦 <[hidden email]> wrote on Sat, Jan 23, 2021 at 1:42 PM:

> Solved. I overrode this part of the Flink source to remove the scheme restriction on non-HDFS file systems.
>