On Thu, May 21, 2020 at 10:59 AM, jimandlice wrote:

What is the recommended approach for writing data into Hive from Flink using the API?

On May 21, 2020 at 10:57, 阿华田 (王志华) wrote:

How can Flink read the files in a directory by regex, for example reading only the files whose names (formatted as timestamps) fall within a given time range?
On Thu, May 21, 2020 at 12:24 PM, Jingsong Li wrote:

Hi,

志华,
At the DataStream level you can use FileInputFormat.setFilesFilter to set a filter on the input files.
The Table layer does not natively support such a filter; you could write your own table connector, but the more natural way is to turn this into a partition-based layout, which is supported much better.

jimandlice,
- On 1.10 or earlier you need to write a DataStream job with a StreamingFileSink to write into Hive, and Parquet is the only supported columnar format. [1]
- In 1.11 (currently going through release testing), the Table/SQL layer natively supports a streaming file sink; the documentation is still being written.

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/streamfile_sink.html

Best,
Jingsong Lee
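For 志华's case, a minimal sketch of the setFilesFilter approach on the DataSet side (the directory, the date range, and the filename-parsing logic below are assumptions for illustration; FileInputFormat.setFilesFilter and FilePathFilter are the actual Flink API):

    import org.apache.flink.api.common.io.FilePathFilter;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.core.fs.Path;

    public class FilteredDirectoryRead {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Assumed input directory with files named by date, e.g. 2020-05-12.log
            TextInputFormat format = new TextInputFormat(new Path("hdfs://arc_c/fid_flow"));
            format.setNestedFileEnumeration(true);
            format.setFilesFilter(new FilePathFilter() {
                @Override
                public boolean filterPath(Path filePath) {
                    // Returning true means the path is SKIPPED.
                    String name = filePath.getName();
                    // Only reject names that clearly start with a date outside
                    // [2020-05-10, 2020-05-20]; directories and other names pass through.
                    if (!name.matches("\\d{4}-\\d{2}-\\d{2}.*")) {
                        return false;
                    }
                    String date = name.substring(0, 10);
                    return date.compareTo("2020-05-10") < 0 || date.compareTo("2020-05-20") > 0;
                }
            });

            env.createInput(format).print();
        }
    }

For the 1.10 route jimandlice asked about, a rough sketch of a DataStream job that writes Parquet files under a Hive table's directory with StreamingFileSink (requires the flink-parquet dependency; the record type, the stand-in source, the target path, and the 'dt=' bucket pattern are assumptions, and the resulting partitions still have to be registered in the Hive metastore separately, as discussed later in the thread):

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

    public class WriteParquetForHive {

        // Hypothetical record type; it should mirror the Hive table schema.
        public static class UserEvent {
            public String userId;
            public long ts;
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Bulk (Parquet) part files are only finalized when checkpoints complete.
            env.enableCheckpointing(60_000);

            UserEvent sample = new UserEvent();
            sample.userId = "u1";
            sample.ts = System.currentTimeMillis();
            // Stand-in source; in practice this would be Kafka or similar.
            DataStream<UserEvent> events = env.fromElements(sample);

            StreamingFileSink<UserEvent> sink = StreamingFileSink
                    .forBulkFormat(
                            // Assumed Hive table location.
                            new Path("hdfs:///warehouse/mydb.db/user_event"),
                            ParquetAvroWriters.forReflectRecord(UserEvent.class))
                    // Write into partition-style directories such as dt=2020-05-21/.
                    .withBucketAssigner(new DateTimeBucketAssigner<UserEvent>("'dt='yyyy-MM-dd"))
                    .build();

            events.addSink(sink);
            env.execute("parquet-into-hive-dir");
        }
    }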
On Thu, May 21, 2020 at 2:57 PM, 阿华田 (王志华) wrote:

    public static void main(String[] args) throws Exception {
        // Initialize the job parameters
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        Job job = Job.getInstance();
        // Read HDFS through a Hadoop input format
        HadoopInputFormat<LongWritable, Text> hadoopIF = new HadoopInputFormat<LongWritable, Text>(
                new TextInputFormat(), LongWritable.class, Text.class, job);
        // Filter the sub-directories that need to be read
        ArrayList<Path> inputPaths = HadoopUtil.getHdfsFileName("hdfs://arc_c/fid_flow/*", "2020-05-10", "2020-05-20");
        TextInputFormat.setInputPaths(job, (Path[]) inputPaths.toArray(new Path[inputPaths.size()]));
        // Read HDFS with the custom input format
        DataSet<Tuple2<LongWritable, Text>> source = env.createInput(hadoopIF);
        source.output(new HdfsTrainSinktest());
        env.execute("offline_train");
    }

Reading this way works locally, but when the job is submitted to YARN it fails with the following error:

Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSource (at createInput(ExecutionEnvironment.java:552) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat))': Loading the input/output formats failed: org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat@458544e0
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:218)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106)
    at org.apache.flink.runtime.scheduler.LegacyScheduler.createExecutionGraph(LegacyScheduler.java:207)
    at org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:184)
    at org.apache.flink.runtime.scheduler.LegacyScheduler.<init>(LegacyScheduler.java:176)
    at org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70)
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:275)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:265)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
    at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146)
    ... 10 more
Caused by: java.lang.Exception: Loading the input/output formats failed: org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat@458544e0
    at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:156)
    at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:60)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:214)
    ... 20 more
Caused by: java.lang.RuntimeException: Deserializing the input/output formats failed: unread block data
    at org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:68)
    at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:153)
    ... 22 more
Caused by: java.lang.IllegalStateException: unread block data
    at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2783)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1605)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
    at org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:66)
    ... 23 more

End of exception on server side>]
    at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
    ... 4 more
On Thu, May 21, 2020 at 3:02 PM, Jingsong Li wrote:

This looks like a problem caused by a dependency mismatch between your client and the server side; could you check the jars on the client?
Best,
Jingsong Lee
On Thu, May 21, 2020 at 7:11 PM, jimandlice wrote:

After the write, do I still need a script to load the data into Hive?
On Thu, May 21, 2020 at 7:31 PM, Jingsong Li wrote:

> After the write, do I still need a script to load the data into Hive?

- If you write with DataStream, yes, you still do.
- If you write with the 1.11 Table layer, you just configure it and the partitions are added to the Hive metastore automatically.

Best,
Jingsong Lee
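As a rough illustration of that 1.11-style configuration (1.11 was still unreleased at this point, so everything below, including the Hive catalog setup, the sink.partition-commit.* table properties, and the table and field names, is an assumption sketched against the API the 1.11 release was expected to ship rather than official documentation):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.SqlDialect;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.catalog.hive.HiveCatalog;

    public class StreamIntoHive {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Partitions are committed when checkpoints complete.
            env.enableCheckpointing(60_000);
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                    env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

            // Register a Hive catalog (catalog name, database, and conf dir are assumed).
            tEnv.registerCatalog("myhive", new HiveCatalog("myhive", "default", "/etc/hive/conf"));
            tEnv.useCatalog("myhive");

            // Create a partitioned Hive table whose partitions are auto-committed to the metastore.
            tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
            tEnv.executeSql(
                    "CREATE TABLE IF NOT EXISTS user_event_hive (user_id STRING, ts BIGINT) " +
                    "PARTITIONED BY (dt STRING) STORED AS parquet TBLPROPERTIES (" +
                    " 'sink.partition-commit.trigger'='process-time'," +
                    " 'sink.partition-commit.policy.kind'='metastore,success-file')");

            // Continuously insert from a streaming source table (assumed to be registered already).
            tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
            tEnv.executeSql(
                    "INSERT INTO user_event_hive " +
                    "SELECT user_id, ts, FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd') AS dt " +
                    "FROM kafka_user_event");
        }
    }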
On Thu, May 21, 2020 at 7:33 PM, jimandlice wrote:

For 1.11, could you provide a demo?
On Thu, May 21, 2020 at 7:42 PM, Jingsong Li wrote:

1.11 has not been released yet, and the documentation is still being written.
Best,
Jingsong Lee
jimandlice wrote:

OK, thank you.