hi all:
之前用flink sql查询hive的数据,hive的数据文件是150个,sql client配置文件设置的并行度是10,source通过自动推断生成了150并发,但是通过看web ui发现只有前十个子任务是读到数据了,其他的任务显示没有读到数据,请问是我设置有问题吗? |
你的数据量有多大?有一个可能的原因是source的其他并发调度起来的时候,数据已经被先调度起来的并发读完了。
Best, Kurt On Tue, Mar 24, 2020 at 10:39 PM Chief <[hidden email]> wrote: > hi all: > 之前用flink sql查询hive的数据,hive的数据文件是150个,sql > client配置文件设置的并行度是10,source通过自动推断生成了150并发,但是通过看web > ui发现只有前十个子任务是读到数据了,其他的任务显示没有读到数据,请问是我设置有问题吗? |
hi,Chief:
目前flink读取hive的时候,如果开启了自动推断,系统会根据所读取的文件数来推断并发,如果没有超过最大并发数(默认1000),source的并行度就等于你文件的个数, 你可以通过table.exec.hive.infer-source-parallelism.max来设置source的最大并发度。 Best Jun ------------------ 原始邮件 ------------------ 发件人: Kurt Young <[hidden email]> 发送时间: 2020年3月25日 08:53 收件人: user-zh <[hidden email]> 主题: 回复:关于flink sql 1.10 source并行度自动推断的疑问 你的数据量有多大?有一个可能的原因是source的其他并发调度起来的时候,数据已经被先调度起来的并发读完了。 Best, Kurt On Tue, Mar 24, 2020 at 10:39 PM Chief <[hidden email]> wrote: > hi all: > 之前用flink sql查询hive的数据,hive的数据文件是150个,sql > client配置文件设置的并行度是10,source通过自动推断生成了150并发,但是通过看web > ui发现只有前十个子任务是读到数据了,其他的任务显示没有读到数据,请问是我设置有问题吗? |
In reply to this post by Kurt Young
hi,Chief:
目前flink读取hive的时候,如果开启了自动推断,系统会根据所读取的文件数来推断并发,如果没有超过最大并发数(默认1000),source的并行度就等于你文件的个数, 你可以通过table.exec.hive.infer-source-parallelism.max来设置source的最大并发度。 Kurt Young <[hidden email]> 于2020年3月25日周三 上午8:53写道: > 你的数据量有多大?有一个可能的原因是source的其他并发调度起来的时候,数据已经被先调度起来的并发读完了。 > > Best, > Kurt > > > On Tue, Mar 24, 2020 at 10:39 PM Chief <[hidden email]> wrote: > > > hi all: > > 之前用flink sql查询hive的数据,hive的数据文件是150个,sql > > client配置文件设置的并行度是10,source通过自动推断生成了150并发,但是通过看web > > ui发现只有前十个子任务是读到数据了,其他的任务显示没有读到数据,请问是我设置有问题吗? > |
In reply to this post by Kurt Young
hi Kurt Young
hive的数据13万多,然后下午我在web ui 上观察了别的语句的任务执行,发现确实source虽然是150的并行,但执行的时候确实是分批调度的,那为什么会在前十个任务就把数据读完了,如果是你说的这种情况那为什么还要自动推断出这么多的并行任务?我一开始的理解是如果根据文件数量推断source的并行度是不是应该每个任务对应一个文件的读取呢?主要对这不太理解 ------------------ 原始邮件 ------------------ 发件人: "Kurt Young"<[hidden email]>; 发送时间: 2020年3月25日(星期三) 上午8:52 收件人: "user-zh"<[hidden email]>; 主题: Re: 关于flink sql 1.10 source并行度自动推断的疑问 你的数据量有多大?有一个可能的原因是source的其他并发调度起来的时候,数据已经被先调度起来的并发读完了。 Best, Kurt On Tue, Mar 24, 2020 at 10:39 PM Chief <[hidden email]> wrote: > hi all: > 之前用flink sql查询hive的数据,hive的数据文件是150个,sql > client配置文件设置的并行度是10,source通过自动推断生成了150并发,但是通过看web > ui发现只有前十个子任务是读到数据了,其他的任务显示没有读到数据,请问是我设置有问题吗? |
In reply to this post by Jun Zhang
hi Jun Zhang
您说的我明白,就是不太理解,为什么根据文件数量自动推断任务并行后,不是每个并行任务读取一个文件? ------------------ 原始邮件 ------------------ 发件人: "Jun Zhang"<[hidden email]>; 发送时间: 2020年3月25日(星期三) 上午9:08 收件人: "user-zh"<[hidden email]>; 主题: Re: 关于flink sql 1.10 source并行度自动推断的疑问 hi,Chief: 目前flink读取hive的时候,如果开启了自动推断,系统会根据所读取的文件数来推断并发,如果没有超过最大并发数(默认1000),source的并行度就等于你文件的个数, 你可以通过table.exec.hive.infer-source-parallelism.max来设置source的最大并发度。 Kurt Young <[hidden email]> 于2020年3月25日周三 上午8:53写道: > 你的数据量有多大?有一个可能的原因是source的其他并发调度起来的时候,数据已经被先调度起来的并发读完了。 > > Best, > Kurt > > > On Tue, Mar 24, 2020 at 10:39 PM Chief <[hidden email]> wrote: > > > hi all: > > 之前用flink sql查询hive的数据,hive的数据文件是150个,sql > > client配置文件设置的并行度是10,source通过自动推断生成了150并发,但是通过看web > > ui发现只有前十个子任务是读到数据了,其他的任务显示没有读到数据,请问是我设置有问题吗? > |
Hi Chief,
目前Hive connector读取数据是通过 InputFormatSourceFunction 来实现的。 InputFormatSourceFunction 的工作模式不是预分配的模式,而是每个source task向master请求split。 如果某些source task提前调度起来且读完了所有的split,后调度起来的source task就没有数据可读了。 你可以看看JM/TM日志,确认下是不是前十个调度起来的source task读完了所有的数据。 *Best Regards,* *Zhenghua Gao* On Wed, Mar 25, 2020 at 3:31 PM Chief <[hidden email]> wrote: > hi Jun Zhang > 您说的我明白,就是不太理解,为什么根据文件数量自动推断任务并行后,不是每个并行任务读取一个文件? > > > > > > ------------------ 原始邮件 ------------------ > 发件人: "Jun Zhang"<[hidden email]>; > 发送时间: 2020年3月25日(星期三) 上午9:08 > 收件人: "user-zh"<[hidden email]>; > > 主题: Re: 关于flink sql 1.10 source并行度自动推断的疑问 > > > > hi,Chief: > > > 目前flink读取hive的时候,如果开启了自动推断,系统会根据所读取的文件数来推断并发,如果没有超过最大并发数(默认1000),source的并行度就等于你文件的个数, > 你可以通过table.exec.hive.infer-source-parallelism.max来设置source的最大并发度。 > > Kurt Young <[hidden email]> 于2020年3月25日周三 上午8:53写道: > > > 你的数据量有多大?有一个可能的原因是source的其他并发调度起来的时候,数据已经被先调度起来的并发读完了。 > > > > Best, > > Kurt > > > > > > On Tue, Mar 24, 2020 at 10:39 PM Chief <[hidden email]> > wrote: > > > > > hi all: > > > 之前用flink sql查询hive的数据,hive的数据文件是150个,sql > > > client配置文件设置的并行度是10,source通过自动推断生成了150并发,但是通过看web > > > ui发现只有前十个子任务是读到数据了,其他的任务显示没有读到数据,请问是我设置有问题吗? > > |
hi Zhenghua Gao
好的,谢谢,我拉日志看看。 ------------------ 原始邮件 ------------------ 发件人: "Zhenghua Gao"<[hidden email]>; 发送时间: 2020年3月25日(星期三) 晚上6:09 收件人: "user-zh"<[hidden email]>; 主题: Re: 关于flink sql 1.10 source并行度自动推断的疑问 Hi Chief, 目前Hive connector读取数据是通过 InputFormatSourceFunction 来实现的。 InputFormatSourceFunction 的工作模式不是预分配的模式,而是每个source task向master请求split。 如果某些source task提前调度起来且读完了所有的split,后调度起来的source task就没有数据可读了。 你可以看看JM/TM日志,确认下是不是前十个调度起来的source task读完了所有的数据。 *Best Regards,* *Zhenghua Gao* On Wed, Mar 25, 2020 at 3:31 PM Chief <[hidden email]> wrote: > hi&nbsp;Jun Zhang > 您说的我明白,就是不太理解,为什么根据文件数量自动推断任务并行后,不是每个并行任务读取一个文件? > > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > 发件人:&nbsp;"Jun Zhang"<[hidden email]&gt;; > 发送时间:&nbsp;2020年3月25日(星期三) 上午9:08 > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > 主题:&nbsp;Re: 关于flink sql 1.10 source并行度自动推断的疑问 > > > > hi,Chief: > > > 目前flink读取hive的时候,如果开启了自动推断,系统会根据所读取的文件数来推断并发,如果没有超过最大并发数(默认1000),source的并行度就等于你文件的个数, > 你可以通过table.exec.hive.infer-source-parallelism.max来设置source的最大并发度。 > > Kurt Young <[hidden email]&gt; 于2020年3月25日周三 上午8:53写道: > > &gt; 你的数据量有多大?有一个可能的原因是source的其他并发调度起来的时候,数据已经被先调度起来的并发读完了。 > &gt; > &gt; Best, > &gt; Kurt > &gt; > &gt; > &gt; On Tue, Mar 24, 2020 at 10:39 PM Chief <[hidden email]&gt; > wrote: > &gt; > &gt; &gt; hi all: > &gt; &gt; 之前用flink sql查询hive的数据,hive的数据文件是150个,sql > &gt; &gt; client配置文件设置的并行度是10,source通过自动推断生成了150并发,但是通过看web > &gt; &gt; ui发现只有前十个子任务是读到数据了,其他的任务显示没有读到数据,请问是我设置有问题吗? > &gt; |
Hi,
就像Zhenghua所说,各个tasks是去抢split的,而不是平均分配,所以一旦后面的tasks没有调度起来,前面的tasks会把split抢光的。 但是少slots多并发的场景并不少见,前面tasks读取太多数据可能会让性能/容错都不友好。所以我们也需要引入平均分配的策略。创建了个JIRA [1], FYI. [1]https://issues.apache.org/jira/browse/FLINK-16787 Best, Jingsong Lee On Wed, Mar 25, 2020 at 6:25 PM Chief <[hidden email]> wrote: > hi Zhenghua Gao > 好的,谢谢,我拉日志看看。 > > > ------------------ 原始邮件 ------------------ > 发件人: "Zhenghua Gao"<[hidden email]>; > 发送时间: 2020年3月25日(星期三) 晚上6:09 > 收件人: "user-zh"<[hidden email]>; > > 主题: Re: 关于flink sql 1.10 source并行度自动推断的疑问 > > > > Hi Chief, > > 目前Hive connector读取数据是通过 InputFormatSourceFunction 来实现的。 > InputFormatSourceFunction 的工作模式不是预分配的模式,而是每个source task向master请求split。 > 如果某些source task提前调度起来且读完了所有的split,后调度起来的source task就没有数据可读了。 > 你可以看看JM/TM日志,确认下是不是前十个调度起来的source task读完了所有的数据。 > > *Best Regards,* > *Zhenghua Gao* > > > On Wed, Mar 25, 2020 at 3:31 PM Chief <[hidden email]> wrote: > > > hi&nbsp;Jun Zhang > > 您说的我明白,就是不太理解,为什么根据文件数量自动推断任务并行后,不是每个并行任务读取一个文件? > > > > > > > > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > > 发件人:&nbsp;"Jun Zhang"<[hidden email]&gt;; > > 发送时间:&nbsp;2020年3月25日(星期三) 上午9:08 > > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > > > 主题:&nbsp;Re: 关于flink sql 1.10 source并行度自动推断的疑问 > > > > > > > > hi,Chief: > > > > > > > 目前flink读取hive的时候,如果开启了自动推断,系统会根据所读取的文件数来推断并发,如果没有超过最大并发数(默认1000),source的并行度就等于你文件的个数, > > 你可以通过table.exec.hive.infer-source-parallelism.max来设置source的最大并发度。 > > > > Kurt Young <[hidden email]&gt; 于2020年3月25日周三 上午8:53写道: > > > > &gt; 你的数据量有多大?有一个可能的原因是source的其他并发调度起来的时候,数据已经被先调度起来的并发读完了。 > > &gt; > > &gt; Best, > > &gt; Kurt > > &gt; > > &gt; > > &gt; On Tue, Mar 24, 2020 at 10:39 PM Chief <[hidden email] > &gt; > > wrote: > > &gt; > > &gt; &gt; hi all: > > &gt; &gt; 之前用flink sql查询hive的数据,hive的数据文件是150个,sql > > &gt; &gt; client配置文件设置的并行度是10,source通过自动推断生成了150并发,但是通过看web > > &gt; &gt; ui发现只有前十个子任务是读到数据了,其他的任务显示没有读到数据,请问是我设置有问题吗? > > &gt; -- Best, Jingsong Lee |
In reply to this post by Chief
大家好
目前环境是flink 1.7.2,使用YARN Session模式提交任务,Hadoop 版本2.7.3,hdfs namenode配置了ha模式,提交任务的时候报以下错误,系统环境变量中已经设置了HADOOP_HOME,YARN_CONF_DIR,HADOOP_CONF_DIR,HADOOP_CLASSPATH,在flink_conf.yaml中配置了fs.hdfs.hadoopconf 2020-04-10 19:12:02,908 INFO org.apache.flink.runtime.jobmaster.JobMaster - Connecting to ResourceManager akka.tcp://flink@trusfortpoc1:23584/user/resourcemanager(00000000000000000000000000000000) 2020-04-10 19:12:02,909 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{0feacbb4fe16c8c7a70249f1396565d0}] 2020-04-10 19:12:02,911 INFO org.apache.flink.runtime.jobmaster.JobMaster - Resolved ResourceManager address, beginning registration 2020-04-10 19:12:02,911 INFO org.apache.flink.runtime.jobmaster.JobMaster - Registration at ResourceManager attempt 1 (timeout=100ms) 2020-04-10 19:12:02,912 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{35ad2384e9cd0efd30b43f5302db24b6}] 2020-04-10 19:12:02,913 INFO org.apache.flink.yarn.YarnResourceManager - Registering job manager [hidden email]://flink@trusfortpoc1:23584/user/jobmanager_0 for job 24691b33c18d7ad73b1f52edb3d68ae4. 2020-04-10 19:12:02,917 INFO org.apache.flink.yarn.YarnResourceManager - Registered job manager [hidden email]://flink@trusfortpoc1:23584/user/jobmanager_0 for job 24691b33c18d7ad73b1f52edb3d68ae4. 2020-04-10 19:12:02,919 INFO org.apache.flink.runtime.jobmaster.JobMaster - JobManager successfully registered at ResourceManager, leader id: 00000000000000000000000000000000. 2020-04-10 19:12:02,919 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Requesting new slot [SlotRequestId{35ad2384e9cd0efd30b43f5302db24b6}] and profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager. 2020-04-10 19:12:02,920 INFO org.apache.flink.yarn.YarnResourceManager - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 24691b33c18d7ad73b1f52edb3d68ae4 with allocation id AllocationID{5a12237c7f2bd8b1cc760ddcbab5a1c0}. 2020-04-10 19:12:02,921 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Requesting new slot [SlotRequestId{0feacbb4fe16c8c7a70249f1396565d0}] and profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager. 2020-04-10 19:12:02,924 INFO org.apache.flink.yarn.YarnResourceManager - Requesting new TaskExecutor container with resources <memory:4096, vCores:6>. Number pending requests 1. 2020-04-10 19:12:02,926 INFO org.apache.flink.yarn.YarnResourceManager - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 24691b33c18d7ad73b1f52edb3d68ae4 with allocation id AllocationID{37dd666a18040bf63ffbf2e022b2ea9b}. 2020-04-10 19:12:06,531 INFO org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl - Received new token for : trusfortpoc3:35206 2020-04-10 19:12:06,543 INFO org.apache.flink.yarn.YarnResourceManager - Received new container: container_1586426824930_0006_01_000002 - Remaining pending container requests: 1 2020-04-10 19:12:06,543 INFO org.apache.flink.yarn.YarnResourceManager - Removing container request Capability[<memory:4096, vCores:6>]Priority[1]. Pending container requests 0. 2020-04-10 19:12:06,568 ERROR org.apache.flink.yarn.YarnResourceManager - Could not start TaskManager in container container_1586426824930_0006_01_000002. java.lang.IllegalArgumentException: java.net.UnknownHostException: hdfsClusterForML at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378) at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:320) at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176) at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:687) at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:628) at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:93) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2701) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2683) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:372) at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295) at org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:453) at org.apache.flink.yarn.YarnResourceManager.createTaskExecutorLaunchContext(YarnResourceManager.java:555) at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:390) at org.apache.flink.yarn.YarnResourceManager$$Lambda$183/1182651376.run(Unknown Source) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) at akka.actor.Actor$class.aroundReceive(Actor.scala:502) at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) at akka.actor.ActorCell.invoke(ActorCell.scala:495) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) at akka.dispatch.Mailbox.exec(Mailbox.scala:234) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.net.UnknownHostException: hdfsClusterForML ... 33 more 这个hdfsClusterForML是namenode ha 的nameservice,经过分析是没加载hdfs-site.xml配置导致的, 也尝试过把Hadoop的几个配置文件放到flink 的conf目录下但都无效,最终通过改YarnResourceManager源码后能够正常提交任务。 public YarnResourceManager( RpcService rpcService, String resourceManagerEndpointId, ResourceID resourceId, Configuration flinkConfig, Map<String, String> env, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, SlotManager slotManager, MetricRegistry metricRegistry, JobLeaderIdService jobLeaderIdService, ClusterInformation clusterInformation, FatalErrorHandler fatalErrorHandler, @Nullable String webInterfaceUrl, JobManagerMetricGroup jobManagerMetricGroup) { super( rpcService, resourceManagerEndpointId, resourceId, highAvailabilityServices, heartbeatServices, slotManager, metricRegistry, jobLeaderIdService, clusterInformation, fatalErrorHandler, jobManagerMetricGroup); this.flinkConfig = flinkConfig; this.yarnConfig = new YarnConfiguration(HadoopUtils.getHadoopConfiguration(flinkConfig)); 但我认为这肯定不是解决这个问题的方法,所以向大家求助,是不是我忽略什么。 |
Free forum by Nabble | Edit this page |