Hi, all

I am creating a table with the Flink SQL client:

CREATE TABLE MyUserTable (
  column_name1 INT,
  column_name2 STRING
) PARTITIONED BY (part_name1, part_name2) WITH (
  'connector' = 'filesystem',          -- required: specify the connector
  'path' = 'file:///path/to/whatever', -- required: path to a directory
  'format' = 'json'                    -- required: file system connector
)

When the path has an extra "/" at the end, e.g. 'path' = 'file:///path/to/whatever/', the SQL client becomes very slow to submit the job and finally fails with:

Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has already been submitted.
    at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:280)
    at sun.reflect.GeneratedMethodAccessor127.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
End of exception on server side>]
    at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)

While this is happening the Flink session cluster's job web UI is mostly unreachable and only recovers after a long time. In the end the job does turn out to have been submitted successfully.

Has anyone run into this?

Best Wishes.
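For anyone trying to reproduce this outside the SQL client, here is a minimal sketch of the same setup via the Table API. It assumes Flink 1.11 with the Blink planner; the class name FilesystemTableRepro, the partition columns added to the schema, and the path are placeholders, not details from the thread:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FilesystemTableRepro {
    public static void main(String[] args) {
        // Blink planner in batch mode, matching the SQL client's default planner in 1.11.
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        // Same DDL as above; note the path has no trailing "/" and the
        // partition columns are declared in the schema, as the connector requires.
        tEnv.executeSql(
                "CREATE TABLE MyUserTable (" +
                "  column_name1 INT," +
                "  column_name2 STRING," +
                "  part_name1 STRING," +
                "  part_name2 STRING" +
                ") PARTITIONED BY (part_name1, part_name2) WITH (" +
                "  'connector' = 'filesystem'," +
                "  'path' = 'file:///path/to/whatever'," +
                "  'format' = 'json'" +
                ")");

        // Simple read to trigger job submission.
        tEnv.executeSql("SELECT * FROM MyUserTable LIMIT 10").print();
    }
}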
Hi, all
When partitions are specified, this problem can no longer be worked around by fixing the path:

CREATE TABLE MyUserTable (
  column_name1 INT,
  column_name2 STRING,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',          -- required: specify the connector
  'path' = 'file:///path/to/whatever', -- required: path to a directory
  'format' = 'json'                    -- required: file system connector
)

select * from MyUserTable limit 10;

The job stays stuck at one point:
[image: image.png]

How should this be resolved?

Peihui He <[hidden email]> wrote on Fri, Sep 4, 2020 at 18:02:
> hi, all — I am creating a table with the Flink SQL client; when the path has an extra "/" at the end, job submission becomes very slow ...
Hi, all

After testing over the past few days, I found that the situation above appears whenever the HDFS directory contains a large number of files; in my case there are close to 2k files. In a simple test with only one or two files the job is submitted quickly, and the Flink session web UI does not feel stuck at all.

Is there a good way to solve this?

Best Wishes.

Peihui He <[hidden email]> wrote on Fri, Sep 4, 2020 at 18:25:
> Hi, all — when partitions are specified, this problem can no longer be worked around by fixing the path ...
Hi,
Could you take a jstack on the JobMaster and see where exactly it is stuck?

On Sat, Sep 5, 2020 at 11:11 PM Peihui He <[hidden email]> wrote:
> Hi, all — after testing over the past few days, I found that the situation above appears whenever the HDFS directory contains a large number of files; in my case close to 2k files ...

--
Best, Jingsong Lee
Also, could this be related to using the local file system? Could you try HDFS instead?

On Mon, Sep 7, 2020 at 11:15 AM Jingsong Li <[hidden email]> wrote:
> Hi, could you take a jstack on the JobMaster and see where exactly it is stuck? ...

--
Best, Jingsong Lee
In reply to this post by Jingsong Li
Hi,
From the jstack, it appears to be stuck at the point below. Looking at the code, it seems it has to walk every file under the specified path on HDFS — is that right? With many files, wouldn't that be very slow?

"flink-akka.actor.default-dispatcher-30" #103 prio=5 os_prio=0 tid=0x00007f6264001000 nid=0x4a93 in Object.wait() [0x00007f62964f1000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:502)
    at org.apache.hadoop.util.concurrent.AsyncGet$Util.wait(AsyncGet.java:59)
    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1533)
    - locked <0x00000000ebd49e50> (a org.apache.hadoop.ipc.Client$Call)
    at org.apache.hadoop.ipc.Client.call(Client.java:1491)
    at org.apache.hadoop.ipc.Client.call(Client.java:1388)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:118)
    at com.sun.proxy.$Proxy45.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:324)
    at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
    - locked <0x00000000ebd49d40> (a org.apache.hadoop.io.retry.RetryInvocationHandler$Call)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
    at com.sun.proxy.$Proxy46.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:864)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:853)
    at org.apache.hadoop.hdfs.DFSClient.getBlockLocations(DFSClient.java:910)
    at org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:267)
    at org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:264)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:274)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:248)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileBlockLocations(HadoopFileSystem.java:98)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:652)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:62)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:258)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:814)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:228)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242)
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119)
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
    at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140)
    at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388)
    at org.apache.flink.runtime.dispatcher.Dispatcher$$Lambda$241/1691741073.get(Unknown Source)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Jingsong Li <[hidden email]> wrote on Mon, Sep 7, 2020 at 11:15:
> Hi, could you take a jstack on the JobMaster and see where exactly it is stuck? ...
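As an aside, here is a rough sketch of the per-file lookup this stack points at. It is an illustration of the pattern only, not the actual Flink source; the class SplitEnumerationSketch and its loop are my own simplification. The idea: FileInputFormat.createInputSplits lists the files under the path and then fetches the block locations of every file with a synchronous NameNode RPC, all on the thread that is building the ExecutionGraph, which would explain why ~2k files make submission crawl and the web UI unresponsive.

import org.apache.flink.core.fs.BlockLocation;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class SplitEnumerationSketch {
    // Hypothetical simplification of the behavior suggested by the stack trace above.
    public static void enumerate(Path dir) throws Exception {
        FileSystem fs = dir.getFileSystem();
        for (FileStatus file : fs.listStatus(dir)) {
            // One blocking RPC to the HDFS NameNode per file; with ~2k files
            // this loop alone keeps the JobMaster dispatcher thread busy.
            BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, file.getLen());
            // ...input splits would then be derived from these block locations.
        }
    }
}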
In reply to this post by Jingsong Li
Hi,
We are already using HDFS.

Jingsong Li <[hidden email]> wrote on Mon, Sep 7, 2020 at 11:16:
> Also, could this be related to using the local file system? Could you try HDFS instead? ...
In reply to this post by Peihui He
Hi,

The detailed jstack output is in the attachment.

Peihui He <[hidden email]> wrote on Mon, Sep 7, 2020 at 19:22:
Attachment: 5.txt (199K)