Error triggering a savepoint in yarn-per-job mode

Error triggering a savepoint in yarn-per-job mode

刘海
Hi,
I am currently testing savepoints. Triggering one with the command below fails with the error that follows. It looks like a timeout, but there is no further detail. Any pointers to the likely cause would be much appreciated. Many thanks!


Flink 1.12, yarn-per-job mode
jobID: fea3d87f138ef4c260ffe9324acc0e51
yarnID: application_1610788069646_0021
The command executed:
./bin/flink savepoint -t yarn-per-job -D yarn.application.id=application_1610788069646_0021 fea3d87f138ef4c260ffe9324acc0e51


The error:


org.apache.flink.util.FlinkException: Triggering a savepoint for the job fea3d87f138ef4c260ffe9324acc0e51 failed.
        at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:712)
        at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:690)
        at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:919)
        at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:687)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:989)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
        at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
Caused by: java.util.concurrent.TimeoutException
        at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1168)
        at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:549)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)


Best regards,
刘海
[hidden email]

Re: Error triggering a savepoint in yarn-per-job mode

guanyq
The syntax is:

./bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId

so for your job:

./bin/flink savepoint fea3d87f138ef4c260ffe9324acc0e51 [:targetDirectory] -yid application_1610788069646_0021

where [:targetDirectory] is the savepoint directory, e.g. hdfs:///flink/savepoints.
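Putting it together with that directory (a sketch only; the path is the example above, not a verified setup):

./bin/flink savepoint fea3d87f138ef4c260ffe9324acc0e51 hdfs:///flink/savepoints -yid application_1610788069646_0021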

Re: Error triggering a savepoint in yarn-per-job mode

zhisheng
In reply to this post by 刘海
Check whether the job is under backpressure. We have run into this too: while a job is backpressured, a savepoint is very hard to complete and frequently times out, and the current community releases do not support setting a separate timeout just for savepoints.
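If it is simply a matter of waiting longer, a possible workaround (a hedged sketch, not something verified in this thread) is to raise the two timeouts involved in flink-conf.yaml: the CLI's client-side wait, and the checkpoint timeout that savepoints share with checkpoints in Flink 1.12. Whether these resolve this particular TimeoutException is an assumption.

# flink-conf.yaml -- sketch only; tune the values to the job
client.timeout: 600s                     # how long the CLI waits for the savepoint to be acknowledged
execution.checkpointing.timeout: 30 min  # timeout applied by the checkpoint coordinator, savepoints included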



Flink 1.12.0: Versioned Tables

guanyq
In reply to this post by 刘海
Could someone share a demo of Versioned Tables on Flink 1.12.0?
CREATE TABLE products (
    product_id STRING,
    product_name STRING,
    price DECIMAL(32, 2),
    update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
    PRIMARY KEY (product_id) NOT ENFORCED,
    WATERMARK FOR update_time AS update_time
) WITH (...);
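For context, the documented way to consume such a versioned table is a temporal join. A minimal sketch (the Orders table and its columns are assumptions for illustration, not part of this thread):

SELECT o.order_id, o.order_time, p.product_name, p.price
FROM Orders AS o
LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_time AS p
    ON o.product_id = p.product_id;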

Flink 1.12.0: Distinct Aggregation

guanyq
The code is attached; it follows the demo on the official site. I cannot tell where the problem is; could someone take a look?

root
 |-- orderId: STRING
 |-- userId: INT
 |-- money: INT
 |-- createTime: BIGINT
 |-- pt: TIMESTAMP(3) *PROCTIME*
17:17:11,935 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.

Exception in thread "main" java.lang.RuntimeException: Unknown call expression: count(orderId)
        at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:102)
        at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72)
        at org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:126)
        at org.apache.flink.table.planner.expressions.converter.ExpressionConverter$1.toRexNode(ExpressionConverter.java:226)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
        at org.apache.flink.table.planner.expressions.converter.OverConvertRule.convert(OverConvertRule.java:81)
        at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:97)
        at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72)
        at org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:126)
        at org.apache.flink.table.planner.expressions.converter.ExpressionConverter$1.toRexNode(ExpressionConverter.java:226)
        at org.apache.flink.table.planner.expressions.converter.CustomizedConvertRule.convertAs(CustomizedConvertRule.java:251)
        at org.apache.flink.table.planner.expressions.converter.CustomizedConvertRule.lambda$convert$0(CustomizedConvertRule.java:93)
        at java.util.Optional.map(Optional.java:215)
        at org.apache.flink.table.planner.expressions.converter.CustomizedConvertRule.convert(CustomizedConvertRule.java:93)
        at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:97)
        at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72)
        at org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:126)
        at org.apache.flink.table.planner.plan.QueryOperationConverter.convertExprToRexNode(QueryOperationConverter.java:734)
        at org.apache.flink.table.planner.plan.QueryOperationConverter.access$800(QueryOperationConverter.java:129)
        at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.lambda$convertToRexNodes$6(QueryOperationConverter.java:540)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
        at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.convertToRexNodes(QueryOperationConverter.java:541)
        at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:153)
        at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:149)
        at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
        at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:146)
        at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
        at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:47)
        at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
        at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.scala:186)
        at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:250)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:164)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:164)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
        at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:331)
        at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:307)
        at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:298)
        at com.guanyq.study.TableAPIAndSQL.TableAPI.Aggregations.DistinctAggregation3.main(DistinctAggregation3.java:68)

Attachment: DistinctAggregation3.java (3K)
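The attachment is not reproduced here, but since the trace passes through OverConvertRule, the failing expression is presumably a count aggregate over an over-window. For reference, a minimal sketch of an over-window distinct count in the Flink 1.12 expression DSL, using the field names from the schema printed above (everything beyond those names is an assumption about the attached code):

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.UNBOUNDED_RANGE;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Table;

// orders is the Table whose schema is printed above (orderId, userId, money, createTime, pt)
Table result = orders
        .window(Over.partitionBy($("userId"))
                .orderBy($("pt"))            // pt is the processing-time attribute
                .preceding(UNBOUNDED_RANGE)
                .as("w"))
        .select(
                $("userId"),
                $("orderId").count().distinct().over($("w")).as("distinctOrders"));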

Flink 1.12.0: how does FlinkStreaming commit offsets back to Kafka?

guanyq
In reply to this post by guanyq
Kafka version 0.11.
When checking the consumer group's lag, it reports that the consumer group does not exist.

Flink 1.12.0: how does FlinkStreaming commit offsets back to Kafka 0.11?

guanyq
Flink 1.12
Kafka broker version 0.11
Consuming works, but the offsets are never committed back to Kafka.

With the same code against Kafka 2.4.1 the offsets are committed fine; only Kafka 0.11 fails to commit.

Could anyone suggest how to get around this version problem?
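For reference, with the FlinkKafkaConsumer offsets are committed back to Kafka only when a checkpoint completes (if checkpointing is enabled) or via the Kafka client's auto-commit (if it is not). A minimal sketch of the checkpoint-based setup; broker address, topic, and group id are placeholders, not taken from this thread:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaOffsetCommitSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // offsets are committed when each checkpoint completes

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        props.setProperty("group.id", "my-group");             // placeholder; required for commits

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props);
        consumer.setCommitOffsetsOnCheckpoints(true); // the default, shown here for clarity

        env.addSource(consumer).print();
        env.execute("kafka-offset-commit-sketch");
    }
}

Whether this works against a 0.11 broker is uncertain: the Flink 1.12 universal connector bundles a 2.x kafka-clients, and an old broker may not accept all of its requests, which would be consistent with 2.4.1 committing fine while 0.11 does not.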