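I am using FileInputFormat to read HDFS files recursively, with a file filter applied. The job runs without errors, but it finishes almost immediately and reads no data. In local tests the filter works and data is read; the problem only appears when the job runs on the YARN cluster.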
Code:

// Initialize job parameters
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
FileInputFormat fileInputFormat = new TextInputFormat(new Path("hdfs://arc/success_fid_flow"));
fileInputFormat.setNestedFileEnumeration(true);
// Paths for which the filter returns true are filtered out
fileInputFormat.setFilesFilter(new RegexExcludePathAndTimeFilter("2020-05-24","2020-05-24"));
DataSet<String> source = env.createInput(fileInputFormat);
source.output(new HdfsTrainSinktest());

Log output:

2020-06-01 14:43:41,848 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost
2020-06-01 14:43:41,848 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2020-06-01 14:43:41,848 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
2020-06-01 14:43:41,848 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m
2020-06-01 14:43:41,849 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2020-06-01 14:43:41,849 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
2020-06-01 14:43:41,849 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.execution.failover-strategy, region
2020-06-01 14:43:41,849 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: historyserver.web.address, 0.0.0.0
2020-06-01 14:43:41,849 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: historyserver.web.port, 8082
2020-06-01 14:43:41,890 INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
2020-06-01 14:43:41,993 INFO org.apache.flink.optimizer.Optimizer - Compiler could not determine the size of input 'TextInputFormat ([hdfs://arc/success_fid_flow]) - UTF-8'. Using default estimates.
2020-06-01 14:43:42,022 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost
2020-06-01 14:43:42,022 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2020-06-01 14:43:42,022 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
2020-06-01 14:43:42,022 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m
2020-06-01 14:43:42,022 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2020-06-01 14:43:42,022 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
2020-06-01 14:43:42,023 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.execution.failover-strategy, region
2020-06-01 14:43:42,023 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.archive.fs.dir, hdfs://dap/tmp/completed-jobs/
2020-06-01 14:43:42,023 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: historyserver.web.address, 0.0.0.0
2020-06-01 14:43:42,023 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: historyserver.web.port, 8082
2020-06-01 14:43:42,023 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: historyserver.archive.fs.dir, hdfs://dap/tmp/completed-jobs/
2020-06-01 14:43:42,023 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: historyserver.archive.fs.refresh-interval, 10000
2020-06-01 14:43:42,069 INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 410508f08b0775c0529e84b221dd909d (detached: false).
2020-06-01 14:43:52,134 INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
2020-06-01 14:43:52,167 INFO org.apache.flink.optimizer.Optimizer - Compiler could not determine the size of input 'TextInputFormat ([hdfs://arc/success_fid_flow]) - UTF-8'. Using default estimates.
2020-06-01 14:43:52,171 INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 10ac036dbb1d546996f2db76a2901253 (detached: false).
2020-06-01 14:43:55,649 INFO org.apache.flink.client.cli.CliFrontend - Program execution finished
Program execution finished
Job with JobID 10ac036dbb1d546996f2db76a2901253 has finished.
Job Runtime: 1143 ms
2020-06-01 14:43:55,663 INFO org.apache.flink.runtime.rest.RestClient - Shutting down rest endpoint.
2020-06-01 14:43:55,665 INFO org.apache.flink.runtime.rest.RestClient - Rest endpoint shutdown complete.

王志华
Hi, did you find the cause? I'm running into the same problem.
> On June 1, 2020, at 2:48 PM, 阿华田 <[hidden email]> wrote:
>
> // Initialize job parameters
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> FileInputFormat fileInputFormat = new TextInputFormat(new Path("hdfs://arc/success_fid_flow"));
> fileInputFormat.setNestedFileEnumeration(true);
> // Paths for which the filter returns true are filtered out
> fileInputFormat.setFilesFilter(new RegexExcludePathAndTimeFilter("2020-05-24","2020-05-24"));
> DataSet<String> source = env.createInput(fileInputFormat);
> source.output(new HdfsTrainSinktest());
In reply to this post by 阿华田
It's solved.
阿华田

On 2020-06-18 17:56, john <[hidden email]> wrote:
> Hi, did you find the cause? I'm running into the same problem.
>
>> On June 1, 2020, at 2:48 PM, 阿华田 <[hidden email]> wrote:
>>
>> // Initialize job parameters
>> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>> FileInputFormat fileInputFormat = new TextInputFormat(new Path("hdfs://arc/success_fid_flow"));
>> fileInputFormat.setNestedFileEnumeration(true);
>> // Paths for which the filter returns true are filtered out
>> fileInputFormat.setFilesFilter(new RegexExcludePathAndTimeFilter("2020-05-24","2020-05-24"));
>> DataSet<String> source = env.createInput(fileInputFormat);
>> source.output(new HdfsTrainSinktest());
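The thread does not say what the fix was, and the source of RegexExcludePathAndTimeFilter was never posted. For readers hitting the same symptom, one thing that may be worth checking is how the filter treats paths that carry no date. As far as I know, with nested enumeration enabled Flink's FileInputFormat consults the filter (FilePathFilter.filterPath, where true means "exclude") for directories as well as files, so a filter that rejects an intermediate directory also hides everything beneath it. The sketch below is plain Java with no Flink dependency and is not the poster's filter: the class name, the shouldExclude helper, the yyyy-MM-dd path layout, and the sample paths are all assumptions made for illustration.

```java
import java.time.LocalDate;
import java.time.format.DateTimeParseException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of date-range path filtering; not the poster's class.
public class DateRangePathFilterSketch {

    // Assumption: dated directories or files contain a yyyy-MM-dd segment.
    private static final Pattern DATE = Pattern.compile("(\\d{4}-\\d{2}-\\d{2})");

    // Returns true when the path should be excluded, mirroring the
    // "true means filter out" convention mentioned in the original post.
    public static boolean shouldExclude(String path, String startDate, String endDate) {
        LocalDate start = LocalDate.parse(startDate);
        LocalDate end = LocalDate.parse(endDate);

        Matcher m = DATE.matcher(path);
        if (!m.find()) {
            // No date in the path (for example the root or an intermediate
            // directory): keep it so that enumeration can descend further.
            return false;
        }
        try {
            LocalDate found = LocalDate.parse(m.group(1));
            // Exclude only paths whose date lies outside [start, end].
            return found.isBefore(start) || found.isAfter(end);
        } catch (DateTimeParseException e) {
            // Looks like a date but is not a valid one: keep the path.
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical paths; the real directory layout is not given in the thread.
        String[] paths = {
            "hdfs://arc/success_fid_flow",
            "hdfs://arc/success_fid_flow/2020-05-24",
            "hdfs://arc/success_fid_flow/2020-05-24/part-0",
            "hdfs://arc/success_fid_flow/2020-05-23/part-0"
        };
        for (String p : paths) {
            System.out.println(p + " excluded=" + shouldExclude(p, "2020-05-24", "2020-05-24"));
        }
    }
}
```

In a real job this logic would sit inside the filterPath method of a FilePathFilter subclass. Logging every path the filter receives, together with its decision, on both the local run and the YARN run is a simple way to see where the two environments diverge.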