FileInputFormat usage problem

阿华田
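I'm using FileInputFormat to read files on HDFS recursively, with a file filter added. The program runs without any error, but it finishes very quickly and reads no data. In a local test the filter works and the data is read; the problem only appears when the job runs on the YARN cluster.
Code: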
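import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
// Initialize the job
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TextInputFormat fileInputFormat = new TextInputFormat(new Path("hdfs://arc/success_fid_flow"));
// Also read files in nested sub-directories
fileInputFormat.setNestedFileEnumeration(true);
// User-defined FilePathFilter; paths for which it returns true are excluded
fileInputFormat.setFilesFilter(new RegexExcludePathAndTimeFilter("2020-05-24", "2020-05-24"));
DataSet<String> source = env.createInput(fileInputFormat);
source.output(new HdfsTrainSinktest());
// output() only registers the sink; execute() actually runs the job
env.execute();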
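The snippet combines `setNestedFileEnumeration(true)` with `setFilesFilter(...)`. As we understand `FileInputFormat`'s enumeration logic, the filter's `filterPath(Path)` is consulted for sub-directories as well as for files, a return value of `true` excludes the path, and names starting with `_` or `.` are always skipped. A filter written with only file names in mind can therefore reject every directory, in which case no input files are found and the job ends without reading anything. The sketch below is a stdlib-only model of that behavior, not Flink code; the `Node` type, the one-directory-per-day layout and both example filters are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Simplified model of recursive file enumeration with an exclude-filter.
// Not Flink code: Node and the sample paths are hypothetical.
public class NestedEnumerationModel {

    static final class Node {
        final String path;
        final boolean isDir;
        final List<Node> children = new ArrayList<>();

        Node(String path, boolean isDir) {
            this.path = path;
            this.isDir = isDir;
        }

        Node add(Node child) {
            children.add(child);
            return this;
        }

        String name() {
            return path.substring(path.lastIndexOf('/') + 1);
        }
    }

    // Names starting with "_" or "." are skipped; the filter returns true for paths to EXCLUDE.
    static boolean accept(Node n, Predicate<String> exclude) {
        String name = n.name();
        return !name.startsWith("_") && !name.startsWith(".") && !exclude.test(n.path);
    }

    // Collects files recursively. Directories go through the same filter,
    // so a rejected directory is never descended into.
    static List<String> collect(Node dir, Predicate<String> exclude) {
        List<String> out = new ArrayList<>();
        for (Node child : dir.children) {
            if (child.isDir) {
                if (accept(child, exclude)) {
                    out.addAll(collect(child, exclude));
                }
            } else if (accept(child, exclude)) {
                out.add(child.path);
            }
        }
        return out;
    }

    // Hypothetical layout: one sub-directory per day.
    static Node sampleTree() {
        return new Node("/success_fid_flow", true)
            .add(new Node("/success_fid_flow/2020-05-24", true)
                .add(new Node("/success_fid_flow/2020-05-24/part-0", false)))
            .add(new Node("/success_fid_flow/2020-05-25", true)
                .add(new Node("/success_fid_flow/2020-05-25/part-0", false)));
    }

    public static void main(String[] args) {
        // Regex written for file names only: it also rejects every directory.
        Predicate<String> filesOnly = p -> !p.matches(".*/part-\\d+");
        // Excludes one day and lets the other directories through.
        Predicate<String> excludeDay = p -> p.contains("/2020-05-24");

        System.out.println(collect(sampleTree(), filesOnly));
        System.out.println(collect(sampleTree(), excludeDay));
    }
}
```

`main` prints `[]` for the filter that rejects directories and `[/success_fid_flow/2020-05-25/part-0]` for the one that only excludes `2020-05-24`. Whether this is what happened on the YARN cluster depends on how `RegexExcludePathAndTimeFilter` (not shown in the post) treats directory paths.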
Log output:


2020-06-01 14:43:41,848 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, localhost
2020-06-01 14:43:41,848 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2020-06-01 14:43:41,848 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.size, 1024m
2020-06-01 14:43:41,848 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.size, 1024m
2020-06-01 14:43:41,849 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2020-06-01 14:43:41,849 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
2020-06-01 14:43:41,849 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.execution.failover-strategy, region
2020-06-01 14:43:41,849 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: historyserver.web.address, 0.0.0.0
2020-06-01 14:43:41,849 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: historyserver.web.port, 8082
2020-06-01 14:43:41,890 INFO  org.apache.flink.api.java.ExecutionEnvironment                - The job has 0 registered types and 0 default Kryo serializers
2020-06-01 14:43:41,993 INFO  org.apache.flink.optimizer.Optimizer                          - Compiler could not determine the size of input 'TextInputFormat ([hdfs://arc/success_fid_flow]) - UTF-8'. Using default estimates.
2020-06-01 14:43:42,022 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, localhost
2020-06-01 14:43:42,022 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2020-06-01 14:43:42,022 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.size, 1024m
2020-06-01 14:43:42,022 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.size, 1024m
2020-06-01 14:43:42,022 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2020-06-01 14:43:42,022 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
2020-06-01 14:43:42,023 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.execution.failover-strategy, region
2020-06-01 14:43:42,023 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.archive.fs.dir, hdfs://dap/tmp/completed-jobs/
2020-06-01 14:43:42,023 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: historyserver.web.address, 0.0.0.0
2020-06-01 14:43:42,023 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: historyserver.web.port, 8082
2020-06-01 14:43:42,023 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: historyserver.archive.fs.dir, hdfs://dap/tmp/completed-jobs/
2020-06-01 14:43:42,023 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: historyserver.archive.fs.refresh-interval, 10000
2020-06-01 14:43:42,069 INFO  org.apache.flink.client.program.rest.RestClusterClient        - Submitting job 410508f08b0775c0529e84b221dd909d (detached: false).
2020-06-01 14:43:52,134 INFO  org.apache.flink.api.java.ExecutionEnvironment                - The job has 0 registered types and 0 default Kryo serializers
2020-06-01 14:43:52,167 INFO  org.apache.flink.optimizer.Optimizer                          -Compiler could not determine the size of input 'TextInputFormat ([hdfs://arc/success_fid_flow]) - UTF-8'. Using default estimates.
2020-06-01 14:43:52,171 INFO  org.apache.flink.client.program.rest.RestClusterClient        - Submitting job 10ac036dbb1d546996f2db76a2901253 (detached: false).
2020-06-01 14:43:55,649 INFO  org.apache.flink.client.cli.CliFrontend                       - Program execution finished
Program execution finished
Job with JobID 10ac036dbb1d546996f2db76a2901253 has finished.
Job Runtime: 1143 ms
2020-06-01 14:43:55,663 INFO  org.apache.flink.runtime.rest.RestClient                      - Shutting down rest endpoint.
2020-06-01 14:43:55,665 INFO  org.apache.flink.runtime.rest.RestClient                      - Rest endpoint shutdown complete.
王志华
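The log shows the job finishing in about a second without any error, which would be consistent with the input format enumerating no files at all. If the filter is involved, one option is to decide only on a date fragment found in the path and to keep any path that has no date, so the root and intermediate directories are still descended into. The sketch below assumes a hypothetical layout with `yyyy-MM-dd` in directory names and assumes the intent is to read only the days within `[start, end]`; `DateRangeExcludeLogic` is a hypothetical name, and the class is plain Java rather than a Flink API.

```java
import java.time.LocalDate;
import java.time.format.DateTimeParseException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink: decides whether a path should be excluded,
// using the same convention as FilePathFilter.filterPath (true = exclude).
public class DateRangeExcludeLogic {

    // Looks for a yyyy-MM-dd fragment anywhere in the path, so the scheme and
    // authority (hdfs://arc/... on the cluster, file:/... locally) do not matter.
    private static final Pattern DATE = Pattern.compile("\\d{4}-\\d{2}-\\d{2}");

    private final LocalDate start;
    private final LocalDate end;

    DateRangeExcludeLogic(String start, String end) {
        this.start = LocalDate.parse(start);
        this.end = LocalDate.parse(end);
    }

    boolean exclude(String path) {
        Matcher m = DATE.matcher(path);
        if (!m.find()) {
            // No date (root or intermediate directory): keep it so that
            // nested enumeration can still reach the files below it.
            return false;
        }
        LocalDate day;
        try {
            day = LocalDate.parse(m.group());
        } catch (DateTimeParseException e) {
            // Looks like a date but is not a valid one: treat it like "no date".
            return false;
        }
        // Exclude everything outside the inclusive range [start, end].
        return day.isBefore(start) || day.isAfter(end);
    }

    public static void main(String[] args) {
        DateRangeExcludeLogic f = new DateRangeExcludeLogic("2020-05-24", "2020-05-24");
        System.out.println(f.exclude("hdfs://arc/success_fid_flow"));
        System.out.println(f.exclude("hdfs://arc/success_fid_flow/2020-05-24"));
        System.out.println(f.exclude("hdfs://arc/success_fid_flow/2020-05-24/part-0"));
        System.out.println(f.exclude("hdfs://arc/success_fid_flow/2020-05-25"));
    }
}
```

`main` prints `false`, `false`, `false`, `true`. Because the match is on the date fragment rather than the full URI, the same logic gives the same answer for `hdfs://arc/...` paths on the cluster and `file:` paths in a local test. To use it in the job, one would presumably subclass `org.apache.flink.api.common.io.FilePathFilter` and return `exclude(path.toString())` from `filterPath`; that wiring is an untested assumption.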

Re: FileInputFormat usage problem

john
Hi, did you find the cause? I'm running into the same problem.

> On June 1, 2020, at 2:48 PM, 阿华田 wrote:
>
> // Initialize the job
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> FileInputFormat fileInputFormat = new TextInputFormat(new Path("hdfs://arc/success_fid_flow"));
> fileInputFormat.setNestedFileEnumeration(true);
> // Paths for which the filter returns true are excluded
> fileInputFormat.setFilesFilter(new RegexExcludePathAndTimeFilter("2020-05-24","2020-05-24"));
> DataSet<String> source =env.createInput(fileInputFormat);
> source.output(new HdfsTrainSinktest());

Re: FileInputFormat usage problem

阿华田
Solved.


> On June 18, 2020, at 5:56 PM, john wrote:
> Hi, did you find the cause? I'm running into the same problem.
