Discussion: savepoint still references the original cluster address after Flink cluster migration

jackjiang
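Hi all,

We are migrating a Flink cluster that runs Flink 1.6.2. We took a savepoint, moved it to the new HDFS cluster, and tried to restore from it. The restore fails with an error saying the previous HDFS cluster cannot be found: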

java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_cbc357ccb763df2852fee8c4fc7d55f2_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140)
    ... 5 more
Caused by: java.io.IOException: Cannot instantiate file system for URI: hdfs://xxx/user/flink_1.6/savepoints/savepoint-ab68c7-562a00439efe/2e56b78a-fa36-4cf4-a383-990358fcd539
    at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
    at org.apache.flink.runtime.state.filesystem.FileStateHandle.getFileSystem(FileStateHandle.java:100)
    at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
    at org.apache.flink.runtime.state.OperatorStreamStateHandle.openInputStream(OperatorStreamStateHandle.java:66)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:495)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:64)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
    ... 7 more
Caused by: java.lang.IllegalArgumentException: java.net.UnknownHostException: xxx
    at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:439)
    at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:321)
    at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:690)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:631)
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:160)
    at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:159)
    ... 16 more
Caused by: java.net.UnknownHostException: xxx
    ... 23 more

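Additional note:
Here xxx is the cluster name of the original HDFS. Migrating a job this way triggers the problem above, and checkpoints fail with a similar error. Why does this happen, and how can it be resolved? Thanks.

Best regards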

Re: Discussion: savepoint still references the original cluster address after Flink cluster migration

Qi Kang
Hi,

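This happens because the savepoint's _metadata file stores the absolute paths of the savepoint files rather than relative paths, so the restore still looks for the old cluster.

Based on the material currently available, savepoints still cannot be relocated. See [1] and [2] for details.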

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/savepoints.html#triggering-savepoints
[2] https://issues.apache.org/jira/browse/FLINK-5763

BR,
Qi Kang




Re: Discussion: savepoint still references the original cluster address after Flink cluster migration

pengchenglins@163.com
In reply to this post by jackjiang
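Hi, after the savepoint was created, did you move it somewhere else? According to the official documentation, savepoints cannot be moved.

From: 蒋涛涛
Sent: 2019-10-10 17:13
To: user-zh
Subject: Discussion: savepoint still references the original cluster address after Flink cluster migration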

Kerberos authentication enabled on the big-data cluster; Kafka connection fails on the TaskManager

andrew
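@[hidden email]:
Kafka on our CDH cluster uses SASL_PLAINTEXT as its security protocol.
After submitting a Flink per-job task, we found that the TaskManager connects to Kafka without initializing authentication, so the Kafka client cannot connect to the broker.

The submit script is as follows: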
   ./flink/bin/flink  run   \
   -m yarn-cluster  \
   -yD env.java.opts="-Djava.security.auth.login.config=/software/servers/keytab/kafka_client_jass.conf" \
   -yqu root.budata.ads  \
   -c $clazz  $jar_path


Is there a sample solution for this?

Re: Discussion: savepoint still references the original cluster address after Flink cluster migration

Congxian Qiu
In reply to this post by pengchenglins@163.com
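Hi,

Savepoints currently store absolute file paths, so moving a savepoint is not supported for now. If you really need to migrate, you can write your own program that rewrites the addresses in the savepoint's meta file and then generates a new meta file.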
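
For illustration only, here is a minimal sketch of the path-rewriting step such a program would need. It assumes the state handle paths have already been read out of the meta file as plain strings; reading and re-serializing the _metadata file depends on Flink's internal savepoint serializers and is not shown. The class name SavepointPathRewriter, the method rewriteAuthority, and the target name new-cluster are hypothetical and not part of Flink.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper, not a Flink class: rewrites the HDFS authority
// (cluster name) of one absolute path taken from a savepoint meta file.
public class SavepointPathRewriter {

    /**
     * Returns the path with its authority replaced by newAuthority when it is
     * an hdfs:// path on oldAuthority; any other path is returned unchanged.
     */
    public static String rewriteAuthority(String path, String oldAuthority, String newAuthority)
            throws URISyntaxException {
        URI uri = new URI(path);
        boolean onOldCluster = "hdfs".equals(uri.getScheme())
                && oldAuthority.equals(uri.getAuthority());
        if (!onOldCluster) {
            return path; // not stored on the old cluster, leave untouched
        }
        // Rebuild the URI with the same scheme, path, query and fragment.
        return new URI(uri.getScheme(), newAuthority, uri.getPath(),
                uri.getQuery(), uri.getFragment()).toString();
    }

    public static void main(String[] args) throws URISyntaxException {
        // Path taken from the error message earlier in this thread.
        String oldPath = "hdfs://xxx/user/flink_1.6/savepoints/"
                + "savepoint-ab68c7-562a00439efe/2e56b78a-fa36-4cf4-a383-990358fcd539";
        // "new-cluster" is an assumed name for the new HDFS cluster.
        System.out.println(rewriteAuthority(oldPath, "xxx", "new-cluster"));
    }
}
```

Parsing each path as a URI and comparing scheme and authority, rather than doing a blind string replacement of xxx, avoids touching paths that merely contain the old cluster name somewhere else.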

Best,
Congxian


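[hidden email] <[hidden email]> wrote on Thursday, October 10, 2019 at 5:31 PM:

> Hi, after the savepoint was created, did you move it somewhere else? According to the official documentation, savepoints cannot be moved.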