原先sql任务是:
CREATE TABLE A_source(...) CREATE TABLE B_sink (...) INSERT INTO B_sink SELECT 1 FROM A_source ; 我基于这个FlinkSQL任务生成了savepoint后,我重新修改为 CREATE TABLE A_source(...) CREATE TABLE B_sink (...) CREATE TABLE C_source(...) CREATE TABLE D_sink (...) INSERT INTO B_sink SELECT 1 FROM A_source ; INSERT INTO C_sink SELECT 1 FROM D_source ; 并基于Savepoint提交,结果显示 Cannot map checkpoint/savepoint state for operator 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI. 想请教一下底层是因为什么原因导致了opertor匹配不上? |
可以查看org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint方法了解为什么加载失败了
对于这个问题,文档和代码中都提供了解决办法,就是使用 --allowNonRestoredState参数,具体使用方法参见文档 https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html 如有错误,欢迎补充回答。 陈赋赟 <[hidden email]> 于2020年6月8日周一 上午11:53写道: > 原先sql任务是: > CREATE TABLE A_source(...) > CREATE TABLE B_sink (...) > INSERT INTO B_sink > SELECT > 1 > FROM > A_source > ; > 我基于这个FlinkSQL任务生成了savepoint后,我重新修改为 > > > CREATE TABLE A_source(...) > CREATE TABLE B_sink (...) > CREATE TABLE C_source(...) > CREATE TABLE D_sink (...) > INSERT INTO B_sink > SELECT > 1 > FROM > A_source > ; > > > INSERT INTO C_sink > SELECT > 1 > FROM > D_source > ; > 并基于Savepoint提交,结果显示 > > Cannot map checkpoint/savepoint state for operator > 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator > is not available in the new program. > If you want to allow to skip this, you can set the --allowNonRestoredState > option on the CLI. > > > 想请教一下底层是因为什么原因导致了opertor匹配不上? |
我又仔细读了文档和代码,显然org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint中是通过operator
ID进行匹配,也就是说,只有在新作业的operator id和savepoint中的id相匹配的情况下,才允许加载成功。 而operator id的生成有两种方法:1.用户制定;2. 自动生成。 显然你的作业中没有指定相应的id, 因此flink会为你自动生成相应的id,但是operator的id生成方法对结构非常敏感,显然由于作业的修改导致了新旧两个作业的生成operator id不一致。具体的可以参考文档 我提到的文档中Assiging Operator IDs这一节的内容。 注意: sql语句并不能和operator画上等号,也就是说一个sql语句并不代表一个operator。 方盛凯 <[hidden email]> 于2020年6月9日周二 下午9:26写道: > > 可以查看org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint方法了解为什么加载失败了 > 对于这个问题,文档和代码中都提供了解决办法,就是使用 --allowNonRestoredState参数,具体使用方法参见文档 > https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html > > 如有错误,欢迎补充回答。 > > 陈赋赟 <[hidden email]> 于2020年6月8日周一 上午11:53写道: > >> 原先sql任务是: >> CREATE TABLE A_source(...) >> CREATE TABLE B_sink (...) >> INSERT INTO B_sink >> SELECT >> 1 >> FROM >> A_source >> ; >> 我基于这个FlinkSQL任务生成了savepoint后,我重新修改为 >> >> >> CREATE TABLE A_source(...) >> CREATE TABLE B_sink (...) >> CREATE TABLE C_source(...) >> CREATE TABLE D_sink (...) >> INSERT INTO B_sink >> SELECT >> 1 >> FROM >> A_source >> ; >> >> >> INSERT INTO C_sink >> SELECT >> 1 >> FROM >> D_source >> ; >> 并基于Savepoint提交,结果显示 >> >> Cannot map checkpoint/savepoint state for operator >> 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator >> is not available in the new program. >> If you want to allow to skip this, you can set the >> --allowNonRestoredState option on the CLI. >> >> >> 想请教一下底层是因为什么原因导致了opertor匹配不上? > > |
sql可以指定 operatorID吗?我突然发现我的代码没有指定这个。。。而且我还没找到指定ID的文档。
------------------ 原始邮件 ------------------ 发件人: "方盛凯"<[hidden email]>; 发送时间: 2020年6月10日(星期三) 中午11:00 收件人: "user-zh"<[hidden email]>; 主题: Re: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题 我又仔细读了文档和代码,显然org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint中是通过operator ID进行匹配,也就是说,只有在新作业的operator id和savepoint中的id相匹配的情况下,才允许加载成功。 而operator id的生成有两种方法:1.用户制定;2. 自动生成。 显然你的作业中没有指定相应的id, 因此flink会为你自动生成相应的id,但是operator的id生成方法对结构非常敏感,显然由于作业的修改导致了新旧两个作业的生成operator id不一致。具体的可以参考文档 我提到的文档中Assiging Operator IDs这一节的内容。 注意: sql语句并不能和operator画上等号,也就是说一个sql语句并不代表一个operator。 方盛凯 <[hidden email]> 于2020年6月9日周二 下午9:26写道: > > 可以查看org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint方法了解为什么加载失败了 > 对于这个问题,文档和代码中都提供了解决办法,就是使用 --allowNonRestoredState参数,具体使用方法参见文档 > https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html > > 如有错误,欢迎补充回答。 > > 陈赋赟 <[hidden email]> 于2020年6月8日周一 上午11:53写道: > >> 原先sql任务是: >> CREATE TABLE A_source(...) >> CREATE TABLE B_sink (...) >> INSERT INTO B_sink >> SELECT >> 1 >> FROM >> A_source >> ; >> 我基于这个FlinkSQL任务生成了savepoint后,我重新修改为 >> >> >> CREATE TABLE A_source(...) >> CREATE TABLE B_sink (...) >> CREATE TABLE C_source(...) >> CREATE TABLE D_sink (...) >> INSERT INTO B_sink >> SELECT >> 1 >> FROM >> A_source >> ; >> >> >> INSERT INTO C_sink >> SELECT >> 1 >> FROM >> D_source >> ; >> 并基于Savepoint提交,结果显示 >> >> Cannot map checkpoint/savepoint state for operator >> 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator >> is not available in the new program. >> If you want to allow to skip this, you can set the >> --allowNonRestoredState option on the CLI. >> >> >> 想请教一下底层是因为什么原因导致了opertor匹配不上? > > |
Hi
Flink sql 目前不支持给算子自定义uid的。如果这种sql修改比较频繁,建议使用datastream api来支持。 Best, Yichao Yang ------------------ 原始邮件 ------------------ 发件人: "kcz"<[hidden email]>; 发送时间: 2020年6月10日(星期三) 中午11:27 收件人: "user-zh"<[hidden email]>; 主题: 回复: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题 sql可以指定 operatorID吗?我突然发现我的代码没有指定这个。。。而且我还没找到指定ID的文档。 ------------------&nbsp;原始邮件&nbsp;------------------ 发件人:&nbsp;"方盛凯"<[hidden email]&gt;; 发送时间:&nbsp;2020年6月10日(星期三) 中午11:00 收件人:&nbsp;"user-zh"<[hidden email]&gt;; 主题:&nbsp;Re: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题 我又仔细读了文档和代码,显然org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint中是通过operator ID进行匹配,也就是说,只有在新作业的operator id和savepoint中的id相匹配的情况下,才允许加载成功。 而operator id的生成有两种方法:1.用户制定;2. 自动生成。 显然你的作业中没有指定相应的id, 因此flink会为你自动生成相应的id,但是operator的id生成方法对结构非常敏感,显然由于作业的修改导致了新旧两个作业的生成operator id不一致。具体的可以参考文档 我提到的文档中Assiging Operator IDs这一节的内容。 注意: sql语句并不能和operator画上等号,也就是说一个sql语句并不代表一个operator。 方盛凯 <[hidden email]&gt; 于2020年6月9日周二 下午9:26写道: &gt; &gt; 可以查看org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint方法了解为什么加载失败了 &gt; 对于这个问题,文档和代码中都提供了解决办法,就是使用 --allowNonRestoredState参数,具体使用方法参见文档 &gt; https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html &gt; &gt; 如有错误,欢迎补充回答。 &gt; &gt; 陈赋赟 <[hidden email]&gt; 于2020年6月8日周一 上午11:53写道: &gt; &gt;&gt; 原先sql任务是: &gt;&gt; CREATE TABLE A_source(...) &gt;&gt; CREATE TABLE B_sink (...) &gt;&gt; INSERT INTO B_sink &gt;&gt; SELECT &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 1 &gt;&gt; FROM &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; A_source &gt;&gt; ; &gt;&gt; 我基于这个FlinkSQL任务生成了savepoint后,我重新修改为 &gt;&gt; &gt;&gt; &gt;&gt; CREATE TABLE A_source(...) &gt;&gt; CREATE TABLE B_sink (...) &gt;&gt; CREATE TABLE C_source(...) &gt;&gt; CREATE TABLE D_sink (...) &gt;&gt; INSERT INTO B_sink &gt;&gt; SELECT &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 1 &gt;&gt; FROM &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; A_source &gt;&gt; ; &gt;&gt; &gt;&gt; &gt;&gt; INSERT INTO C_sink &gt;&gt; SELECT &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 1 &gt;&gt; FROM &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; D_source &gt;&gt; ; &gt;&gt; 并基于Savepoint提交,结果显示 &gt;&gt; &gt;&gt; Cannot map checkpoint/savepoint state for operator &gt;&gt; 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator &gt;&gt; is not available in the new program. &gt;&gt; If you want to allow to skip this, you can set the &gt;&gt; --allowNonRestoredState option on the CLI. &gt;&gt; &gt;&gt; &gt;&gt; 想请教一下底层是因为什么原因导致了opertor匹配不上? &gt; &gt; |
嗯呢tks,收到。
------------------ 原始邮件 ------------------ 发件人: "Yichao Yang"<[hidden email]>; 发送时间: 2020年6月10日(星期三) 中午11:32 收件人: "user-zh"<[hidden email]>; 主题: 回复: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题 Hi Flink sql 目前不支持给算子自定义uid的。如果这种sql修改比较频繁,建议使用datastream api来支持。 Best, Yichao Yang ------------------&nbsp;原始邮件&nbsp;------------------ 发件人:&nbsp;"kcz"<[hidden email]&gt;; 发送时间:&nbsp;2020年6月10日(星期三) 中午11:27 收件人:&nbsp;"user-zh"<[hidden email]&gt;; 主题:&nbsp;回复: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题 sql可以指定 operatorID吗?我突然发现我的代码没有指定这个。。。而且我还没找到指定ID的文档。 ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------ 发件人:&amp;nbsp;"方盛凯"<[hidden email]&amp;gt;; 发送时间:&amp;nbsp;2020年6月10日(星期三) 中午11:00 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;; 主题:&amp;nbsp;Re: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题 我又仔细读了文档和代码,显然org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint中是通过operator ID进行匹配,也就是说,只有在新作业的operator id和savepoint中的id相匹配的情况下,才允许加载成功。 而operator id的生成有两种方法:1.用户制定;2. 自动生成。 显然你的作业中没有指定相应的id, 因此flink会为你自动生成相应的id,但是operator的id生成方法对结构非常敏感,显然由于作业的修改导致了新旧两个作业的生成operator id不一致。具体的可以参考文档 我提到的文档中Assiging Operator IDs这一节的内容。 注意: sql语句并不能和operator画上等号,也就是说一个sql语句并不代表一个operator。 方盛凯 <[hidden email]&amp;gt; 于2020年6月9日周二 下午9:26写道: &amp;gt; &amp;gt; 可以查看org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint方法了解为什么加载失败了 &amp;gt; 对于这个问题,文档和代码中都提供了解决办法,就是使用 --allowNonRestoredState参数,具体使用方法参见文档 &amp;gt; https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html &amp;gt; &amp;gt; 如有错误,欢迎补充回答。 &amp;gt; &amp;gt; 陈赋赟 <[hidden email]&amp;gt; 于2020年6月8日周一 上午11:53写道: &amp;gt; &amp;gt;&amp;gt; 原先sql任务是: &amp;gt;&amp;gt; CREATE TABLE A_source(...) &amp;gt;&amp;gt; CREATE TABLE B_sink (...) &amp;gt;&amp;gt; INSERT INTO B_sink &amp;gt;&amp;gt; SELECT &amp;gt;&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 1 &amp;gt;&amp;gt; FROM &amp;gt;&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; A_source &amp;gt;&amp;gt; ; &amp;gt;&amp;gt; 我基于这个FlinkSQL任务生成了savepoint后,我重新修改为 &amp;gt;&amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;gt; CREATE TABLE A_source(...) &amp;gt;&amp;gt; CREATE TABLE B_sink (...) &amp;gt;&amp;gt; CREATE TABLE C_source(...) &amp;gt;&amp;gt; CREATE TABLE D_sink (...) &amp;gt;&amp;gt; INSERT INTO B_sink &amp;gt;&amp;gt; SELECT &amp;gt;&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 1 &amp;gt;&amp;gt; FROM &amp;gt;&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; A_source &amp;gt;&amp;gt; ; &amp;gt;&amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;gt; INSERT INTO C_sink &amp;gt;&amp;gt; SELECT &amp;gt;&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 1 &amp;gt;&amp;gt; FROM &amp;gt;&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; D_source &amp;gt;&amp;gt; ; &amp;gt;&amp;gt; 并基于Savepoint提交,结果显示 &amp;gt;&amp;gt; &amp;gt;&amp;gt; Cannot map checkpoint/savepoint state for operator &amp;gt;&amp;gt; 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator &amp;gt;&amp;gt; is not available in the new program. &amp;gt;&amp;gt; If you want to allow to skip this, you can set the &amp;gt;&amp;gt; --allowNonRestoredState option on the CLI. &amp;gt;&amp;gt; &amp;gt;&amp;gt; &amp;gt;&amp;gt; 想请教一下底层是因为什么原因导致了opertor匹配不上? &amp;gt; &amp;gt; |
Free forum by Nabble | Edit this page |