How can Flink read files under HDFS matching a regex?

How can Flink read files under HDFS matching a regex?

阿华田
How can Flink read the files in a directory by regex, for example reading only the files whose names (which are dates/timestamps) fall within a given time range?


| |
王志华
|
|
[hidden email]
|
签名由网易邮箱大师定制

Re: How can Flink read files under HDFS matching a regex?

jimandlice
What is the recommended way to write into Hive from Flink through the API?


Re: How can Flink read files under HDFS matching a regex?

Jingsong Li
Hi,

志华,
At the DataStream level you can use FileInputFormat.setFilesFilter to set a filter on the files to read.
The Table layer does not natively support such a filter yet; you could consider writing your own table connector.
That said, the recommended approach is to turn this into partitions instead; that is supported much more naturally.
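
A minimal sketch of this setFilesFilter approach, assuming the files sit directly under hdfs://arc_c/fid_flow and are named by date as yyyy-MM-dd (the path and naming are taken from the code posted later in this thread), might look like:

import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;

public class DateRangeFileRead {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Flink's own TextInputFormat (not Hadoop's) exposes setFilesFilter.
        TextInputFormat format = new TextInputFormat(new Path("hdfs://arc_c/fid_flow"));
        format.setNestedFileEnumeration(true); // also descend into sub-directories

        // Keep only files whose name starts with a date inside [2020-05-10, 2020-05-20].
        format.setFilesFilter(new FilePathFilter() {
            @Override
            public boolean filterPath(Path filePath) {
                String name = filePath.getName();
                if (!name.matches("\\d{4}-\\d{2}-\\d{2}.*")) {
                    // keep directories and non-dated entries so enumeration can continue
                    return false;
                }
                String date = name.substring(0, 10);
                // returning true means "exclude this path"
                return date.compareTo("2020-05-10") < 0 || date.compareTo("2020-05-20") > 0;
            }
        });

        DataSet<String> lines = env.createInput(format, Types.STRING);
        lines.print(); // print() triggers execution of this batch job
    }
}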

jimandlice,
- If you are on 1.10 or earlier, you need to write a DataStream job with a StreamingFileSink to write into Hive, and Parquet is the only columnar format supported. [1]
- If you are on 1.11 (currently in release testing), the Table/SQL layer supports the streaming file sink natively; the documentation for it is still being written.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/streamfile_sink.html
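
A rough sketch of such a pre-1.11 StreamingFileSink job, assuming a made-up Record POJO and output path and the flink-parquet / parquet-avro dependencies on the classpath, might look like the following. As noted further down the thread, the directories written this way still have to be added to the Hive metastore as partitions separately.

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

public class ParquetFilesForHive {

    // Hypothetical record type; Avro reflection derives the Parquet schema from its fields.
    public static class Record {
        public String user;
        public long ts;
        public Record() {}
        public Record(String user, long ts) { this.user = user; this.ts = ts; }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // bulk (Parquet) part files are rolled on checkpoints

        DataStream<Record> records = env.fromElements(
                new Record("a", 1L), new Record("b", 2L));

        StreamingFileSink<Record> sink = StreamingFileSink
                .forBulkFormat(new Path("hdfs:///warehouse/fid_flow"),
                        ParquetAvroWriters.forReflectRecord(Record.class))
                // one bucket (directory) per day, e.g. .../2020-05-21/part-0-0
                .withBucketAssigner(new DateTimeBucketAssigner<Record>("yyyy-MM-dd"))
                .build();

        records.addSink(sink);
        env.execute("write_parquet_for_hive");
    }
}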

Best,
Jingsong Lee

Re: How can Flink read files under HDFS matching a regex?

阿华田
import java.util.ArrayList;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public static void main(String[] args) throws Exception {
    // Initialize the execution environment and a Hadoop Job for the input format
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    Job job = Job.getInstance();
    // Read HDFS through the Hadoop (mapreduce) TextInputFormat wrapped by Flink
    HadoopInputFormat<LongWritable, Text> hadoopIF = new HadoopInputFormat<LongWritable, Text>(
            new TextInputFormat(), LongWritable.class, Text.class, job);
    // Pick only the sub-paths in the wanted date range (HadoopUtil is our own helper)
    ArrayList<Path> inputPaths = HadoopUtil.getHdfsFileName("hdfs://arc_c/fid_flow/*", "2020-05-10", "2020-05-20");
    TextInputFormat.setInputPaths(job, inputPaths.toArray(new Path[inputPaths.size()]));
    // Create the DataSet from the wrapped input format
    DataSet<Tuple2<LongWritable, Text>> source = env.createInput(hadoopIF);
    source.output(new HdfsTrainSinktest()); // HdfsTrainSinktest is our own OutputFormat
    env.execute("offline_train");
}
Reading works fine locally this way, but when the job is submitted to YARN it fails with the following error:


Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSource (at createInput(ExecutionEnvironment.java:552) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat))': Loading the input/output formats failed: org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat@458544e0
        at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:218)
        at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106)
        at org.apache.flink.runtime.scheduler.LegacyScheduler.createExecutionGraph(LegacyScheduler.java:207)
        at org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:184)
        at org.apache.flink.runtime.scheduler.LegacyScheduler.<init>(LegacyScheduler.java:176)
        at org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70)
        at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:275)
        at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:265)
        at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
        at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
        at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146)
        ... 10 more
Caused by: java.lang.Exception: Loading the input/output formats failed: org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat@458544e0
        at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:156)
        at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:60)
        at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:214)
        ... 20 more
Caused by: java.lang.RuntimeException: Deserializing the input/output formats failed: unread block data
        at org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:68)
        at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:153)
        ... 22 more
Caused by: java.lang.IllegalStateException: unread block data
        at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2783)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1605)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
        at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
        at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
        at org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:66)
        ... 23 more


End of exception on server side>]
        at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
        at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
        ... 4 more


Re: How can Flink read files under HDFS matching a regex?

Jingsong Li
This looks like a dependency mismatch between your client and the server side; could you check the jars on the client?

Best,
Jingsong Lee

Re: How can Flink read files under HDFS matching a regex?

jimandlice
After writing the files, do I still need a script to load the data into Hive?




Re: How can Flink read files under HDFS matching a regex?

Jingsong Li
> After writing the files, do I still need a script to load the data into Hive?
- If you write with DataStream, yes, you do.
- If you write with the 1.11 Table layer, with a bit of configuration the partitions are added to the Hive metastore automatically.

Best,
Jingsong Lee

Re: How can Flink read files under HDFS matching a regex?

jimandlice
For 1.11, could you provide a demo?




Re: How can Flink read files under HDFS matching a regex?

Jingsong Li
1.11 has not been released yet, and the documentation is still being written.

Best,
Jingsong Lee

Re: How can Flink read files under HDFS matching a regex?

jimandlice
OK, thanks.



