Recoverable writers on Hadoop are only supported for HDFS
As the error above says: we access storage through the hadoop:// protocol, but the underlying system is not HDFS — it is our company's in-house distributed file system. Writing with Spark and reading with Spark SQL both work fine, but so far neither writing nor reading with Flink has succeeded.
The full error is as follows:
java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
    at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:61)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:210)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink.java:260)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:412)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:107)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:264)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Beyond that, reading our existing Hive data warehouse from Flink SQL also fails. With the Hive catalog configured, the table metadata all shows up fine, but any SELECT against those tables fails.
In reply to this post by nobleyd
This is probably because an HDFS-specific API is used here, and your file system is not compatible with it:

public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {...}
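For reference, the guard Michael is pointing at sits right in that constructor. Paraphrased from Flink 1.12 around HadoopRecoverableWriter.java:61 (a sketch, not an exact copy), it rejects any FileSystem whose scheme is not "hdfs", regardless of how HDFS-compatible the implementation otherwise is:

```
// Paraphrase of the check in org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.
// Recoverable writers rely on truncate(), hence the scheme and version check; any
// custom file system behind the hadoop:// protocol fails here up front.
public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {
    this.fs = fs;

    if (!"hdfs".equalsIgnoreCase(fs.getScheme()) || !HadoopUtils.isMinHadoopVersion(2, 7)) {
        throw new UnsupportedOperationException(
                "Recoverable writers on Hadoop are only supported for HDFS");
    }
}
```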
In reply to this post by nobleyd
hi all,
Is there any way to convert a JSON string into a MAP, similar to the str_to_map function?

Version: Flink 1.11, blink planner, SQL.

Background: a UDF fetches a JSON string via an HTTP request, and we would like to use the fields inside that JSON directly. A UDTF covers part of the requirement but is inflexible, because a UDTF must declare its output fields and types up front, which is very limiting.
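One option is a scalar UDF that returns MAP<STRING, STRING>, so no output fields need to be declared up front. A minimal sketch of the core conversion is below; in Flink 1.11 you would wrap it in a ScalarFunction whose eval() returns Map<String, String> (annotated with @DataTypeHint("MAP<STRING, STRING>") so the blink planner can infer the type). The hand-rolled parsing here is an illustration that only handles a flat JSON object of string values; a real UDF should use a proper JSON library such as Jackson.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class JsonToMap {

    // Convert a flat JSON object of string values into a Map<String, String>.
    public static Map<String, String> jsonToMap(String json) {
        Map<String, String> result = new LinkedHashMap<>();
        String body = json.trim();
        // strip the surrounding braces
        body = body.substring(1, body.length() - 1).trim();
        if (body.isEmpty()) {
            return result;
        }
        for (String pair : body.split(",")) {
            String[] kv = pair.split(":", 2);
            result.put(stripQuotes(kv[0]), stripQuotes(kv[1]));
        }
        return result;
    }

    private static String stripQuotes(String s) {
        s = s.trim();
        if (s.length() >= 2 && s.startsWith("\"") && s.endsWith("\"")) {
            s = s.substring(1, s.length() - 1);
        }
        return s;
    }

    public static void main(String[] args) {
        Map<String, String> m = jsonToMap("{\"city\":\"bj\",\"pv\":\"42\"}");
        System.out.println(m.get("city") + " " + m.get("pv")); // prints "bj 42"
    }
}
```

Once registered, the UDF result can be indexed directly in SQL, e.g. json_to_map(resp)['city'].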
In reply to this post by Michael Ran
@Michael Ran
So is there any workaround? I hit this exception when writing to HDFS with Flink's StreamingFileSink.
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}

Try this approach instead: build a BucketingSink and configure it via sink.setBucketer(...) and sink.setWriter(...).
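A sketch of the suggested legacy BucketingSink setup (flink-connector-filesystem, Flink 1.10 and earlier). Unlike StreamingFileSink it does not go through createRecoverableWriter(), so it may not trip the HDFS-only check; the path, bucket format, and batch size below are placeholders, not values from the thread.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

public class BucketingSinkExample {

    public static void attachSink(DataStream<String> stream) {
        BucketingSink<String> sink =
                new BucketingSink<>("hdfs://namenode:8020/tmp/out"); // placeholder path
        sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH")); // one bucket per hour
        sink.setWriter(new StringWriter<>());                       // plain-text rows
        sink.setBatchSize(1024L * 1024L * 128L);                    // roll part files at 128 MB
        stream.addSink(sink);
    }
}
```

Note that BucketingSink is deprecated and removed in newer releases, so this only helps on 1.10-era versions.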
Sorry, I have not used this in a long time. But you can analyze the exception together with the API source code to determine whether a direct write is possible. If you want to write to a custom file system, you will have to implement your own sink, unless your file system's write path is compatible with HDFS's upper-level API. The existing sink implementations are a good reference.
@Michael Ran: understood, no problem.

@张锴 Which Flink version and which connector do you mean — DataStream or SQL? I searched and could not find it; I am on 1.12, DataStream.

The docs now list both StreamingFileSink and FileSink, and their usage looks nearly identical. I plan to try FileSink, but I am not clear on how FileSink differs from StreamingFileSink, or whether either can write to an HDFS-like file system, since this comes down to atomic writes — many distributed file systems do not support append or in-place edits.
I am on Flink 1.10, where the file sink is BucketingSink; that is what I use to write to HDFS.
@赵一旦
Also, about the question I asked you last time: I tried the idea you suggested, but it does not seem to work quite right. Could you take a look?
Solved. I overrode this part of the Flink source and removed the scheme check that rejects non-HDFS file systems.
张锴 <[hidden email]> wrote on Thu, Jan 21, 2021, 7:35 PM:

> @赵一旦
> One more thing: I asked you about another issue earlier. I tried the idea you suggested, but something seems off; could you take a look?
>
> 张锴 <[hidden email]> wrote on Thu, Jan 21, 2021, 7:13 PM:
>
>> I am on Flink 1.10. The FileSink there is BucketingSink; that is what I use to write to HDFS.
>>
>> 赵一旦 <[hidden email]> wrote on Thu, Jan 21, 2021, 7:05 PM:
>>
>>> @Michael Ran: understood, no problem.
>>>
>>> @张锴: which Flink version and which connector do you mean, DataStream or SQL? I searched and my version does not have it; I am on 1.12, DataStream.
>>>
>>> The docs list both StreamingFileSink and FileSink, and their usage looks similar. I plan to try FileSink, but I am not clear on how it differs from StreamingFileSink, or whether either can write to Hadoop-style filesystems. Atomic writes matter here, since distributed filesystems generally do not support append or in-place edits.
>>>
>>> Michael Ran <[hidden email]> wrote on Thu, Jan 21, 2021, 7:01 PM:
>>>
>>>> Sorry, I have not used this in a long time. But you can work from the exception message and the API source to determine whether a direct write is possible. If you need to write to a custom filesystem, you will have to implement your own sink, unless your filesystem's write path is compatible with HDFS's upper-layer API. The existing sink implementations are a good reference.
>>>>
>>>> On 2021-01-21 18:45:06, 张锴 <[hidden email]> wrote:
>>>>
>>>>> import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
>>>>>
>>>>> Try the sink.setBucketer / sink.setWriter approach.
>>>>>
>>>>> 赵一旦 <[hidden email]> wrote on Thu, Jan 21, 2021, 6:37 PM:
>>>>>
>>>>>> @Michael Ran
>>>>>> Is there a workaround, then? This exception occurs when I write to HDFS via Flink's StreamingFileSink.
>>>>>>
>>>>>> Michael Ran <[hidden email]> wrote on Thu, Jan 21, 2021, 5:23 PM:
>>>>>>
>>>>>>> This probably uses an HDFS-specific API, so the filesystem is not compatible:
>>>>>>> public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {...}
>>>>>>>
>>>>>>> On 2021-01-21 17:18:23, 赵一旦 <[hidden email]> wrote:
>>>>>>>
>>>>>>>> The full error is:
>>>>>>>>
>>>>>>>> java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
>>>>>>>>     at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:61)
>>>>>>>>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:210)
>>>>>>>>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
>>>>>>>>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink.java:260)
>>>>>>>>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270)
>>>>>>>>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:412)
>>>>>>>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
>>>>>>>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
>>>>>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>>>>>>>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:107)
>>>>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:264)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>
>>>>>>>> 赵一旦 <[hidden email]> wrote on Thu, Jan 21, 2021, 5:17 PM:
>>>>>>>>
>>>>>>>>> Recoverable writers on Hadoop are only supported for HDFS
>>>>>>>>>
>>>>>>>>> As above, we use the hadoop protocol, but the underlying storage is not HDFS; it is our company's in-house distributed filesystem. Writing with Spark and reading with Spark SQL both work fine, but so far neither writing nor reading with Flink has succeeded.
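The incompatibility discussed above comes down to a scheme check in HadoopRecoverableWriter's constructor (HadoopRecoverableWriter.java:61 in the trace). Below is a minimal standalone sketch of that guard, paraphrased from memory rather than copied from the Flink source; the class and method names here are illustrative, and Flink's real check also verifies the Hadoop version.

```java
// Sketch (assumption, not Flink's actual code) of the guard that rejects
// non-HDFS Hadoop filesystems. This is the check that the "override the
// source" workaround removes.
public class RecoverableWriterGuard {

    // Throws if the filesystem scheme is anything other than "hdfs".
    static void checkScheme(String scheme) {
        if (!"hdfs".equalsIgnoreCase(scheme)) {
            throw new UnsupportedOperationException(
                    "Recoverable writers on Hadoop are only supported for HDFS");
        }
    }

    public static void main(String[] args) {
        checkScheme("hdfs"); // passes
        try {
            checkScheme("myfs"); // a hypothetical in-house scheme: rejected
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Note that removing or widening this check lets the writer proceed, but exactly-once recovery then depends on the filesystem actually supporting the truncate semantics the guard is protecting, so the workaround should be validated against failure/restore scenarios.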
Additionally, for writing ORC-format files: if anyone knows how to write Map-type columns, please share an example.
As below: once I have the MapColumnVector, how do I actually write into it? Plain (non-Map) fields are clear enough, you just set xxxColumnVector.vector[rowId] directly, but the MapColumnVector API is confusing and I could not work out how to use it.

MapColumnVector dColumnVector = (MapColumnVector) batch.cols[2];

赵一旦 <[hidden email]> wrote on Sat, Jan 23, 2021, 1:42 PM:

> Solved. I overrode that part of the Flink source to remove the scheme restriction on non-HDFS filesystems.
>
> [remainder of quoted thread trimmed; identical to the messages quoted above]
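On the MapColumnVector question: in ORC's vectorized API, all rows of a map column share the `keys`/`values` child vectors, and each row records its slice of them via `offsets[row]`, `lengths[row]`, and the running `childCount`. Here is a hedged sketch assuming a `map<string,bigint>` column at index 2 (taken from the snippet above); the schema, column name `d`, and demo class are assumptions, and writer setup plus the other columns are omitted:

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.TypeDescription;

public class OrcMapColumnDemo {

    // Appends one map<string,bigint> value for row `row` into column 2.
    static void writeMap(VectorizedRowBatch batch, int row, Map<String, Long> value) {
        MapColumnVector mapCol = (MapColumnVector) batch.cols[2];
        BytesColumnVector keyCol = (BytesColumnVector) mapCol.keys;
        LongColumnVector valCol = (LongColumnVector) mapCol.values;

        // Record where this row's entries start and how many there are;
        // childCount tracks the total entries appended so far across rows.
        mapCol.offsets[row] = mapCol.childCount;
        mapCol.lengths[row] = value.size();
        mapCol.childCount += value.size();

        // Grow the shared child vectors if needed, preserving earlier rows.
        mapCol.keys.ensureSize(mapCol.childCount, true);
        mapCol.values.ensureSize(mapCol.childCount, true);

        int i = (int) mapCol.offsets[row];
        for (Map.Entry<String, Long> e : value.entrySet()) {
            byte[] k = e.getKey().getBytes(StandardCharsets.UTF_8);
            keyCol.setVal(i, k, 0, k.length); // copies the key bytes in
            valCol.vector[i] = e.getValue();
            i++;
        }
    }

    public static void main(String[] args) {
        TypeDescription schema = TypeDescription.fromString(
                "struct<a:string,b:bigint,d:map<string,bigint>>");
        VectorizedRowBatch batch = schema.createRowBatch();
        writeMap(batch, 0, Map.of("x", 1L));
        batch.size = 1;
        // Hand `batch` to Writer.addRowBatch(batch) as with plain columns.
    }
}
```

The key point is that, unlike the scalar `xxxColumnVector.vector[rowId]` pattern, nothing is written at `rowId` in the map vector itself; the row only points into the child vectors.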