大家好 我使用的flink 1.9的blink planner
先按 月 和城市 去重id 注册成一张临时表(月表) 将结果实时输出到hbase表:monthtable rowkey是month+city 然后基于上面这张表 再按city汇总 输出到hbase表:totalTable ,rowkey是 city 运行了近18个小时 中间有过restore和ck失败,然后我统计了 两个表到cnt汇总值居然不一样,totalTable比monthtable小,而且还差了不了,请问这会是上面原因 伪代码如下: val env = StreamExecutionEnvironment.getExecutionEnvironment env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val tableEnv = StreamTableEnvironment.create(env, bsSettings) myDataStream=...... tableEnv.registerDataStream("monthtable", myDataStream,'month','city ,'id) //按月份,城市统计去重的id数量(id和city有关,同一个城市的id不会重复) val monthCount = tableEnv.sqlQuery( s""" select month,city,count(distinct id) as cnt from monthtable group by month,city """.stripMargin) //将月统计结果输出到hbase,rokey为month+city monthCount.toRetractStream[Row].filter(_._1).map(line=>{ val row=line._2 val month=row.getField(0).toString val city=row.getField(1).toString val cnt=row.getField(2).toString val map=new util.HashMap[String,String]() map.put("cnt",cnt) (month+city,map)// month+city是rowkey cnt是一个column }).addSink(new MyHbaseSink("monthHbaseTable") //将上面的月表注册成新表 monthStat tableEnv.registerTable("monthStat",monthCount) //按城市统计id的数量 val totalCount = tableEnv.sqlQuery( s""" |select city,sum(cnt) as cityCnt from monthStat group by city """.stripMargin) //将月统计结果输出到hbase,rokey为city totalCount.toRetractStream[Row].filter(_._1).map(line=>{ val row=line._2 val city=row.getField(0).toString val totalCnt=row.getField(1).toString val map=new util.HashMap[String,String]() map.put("totalCnt",totalCnt) (city,map) }).addSink("totalHbaseTable") |
Hi
如果没有使用窗口的话,状态是不会被自动清理掉的,需要自己主动配置状态的TTL ------------------ 原始邮件 ------------------ 发件人: "star"<[hidden email]>; 发送时间: 2020年6月5日(星期五) 上午10:22 收件人: "[hidden email]"<[hidden email]>; 主题: flink1.9 Sql 注册的中间临时表不自动存state的吗? 大家好&nbsp; &nbsp;我使用的flink 1.9的blink planner 先按 月 和城市 去重id&nbsp; 注册成一张临时表(月表) 将结果实时输出到hbase表:monthtable&nbsp; &nbsp;rowkey是month+city 然后基于上面这张表 再按city汇总 输出到hbase表:totalTable ,rowkey是 city 运行了近18个小时 中间有过restore和ck失败,然后我统计了&nbsp; 两个表到cnt汇总值居然不一样,totalTable比monthtable小,而且还差了不了,请问这会是上面原因 伪代码如下: val env = StreamExecutionEnvironment.getExecutionEnvironment env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val tableEnv = StreamTableEnvironment.create(env, bsSettings) myDataStream=...... tableEnv.registerDataStream("monthtable", myDataStream,'month','city ,'id) //按月份,城市统计去重的id数量(id和city有关,同一个城市的id不会重复) val monthCount = tableEnv.sqlQuery( &nbsp; &nbsp; &nbsp; s""" &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;select month,city,count(distinct id) as cnt from monthtable&nbsp; group by month,city &nbsp; &nbsp; &nbsp; """.stripMargin) //将月统计结果输出到hbase,rokey为month+city monthCount.toRetractStream[Row].filter(_._1).map(line=&gt;{ &nbsp; &nbsp; &nbsp; val row=line._2 &nbsp; &nbsp; &nbsp; val month=row.getField(0).toString &nbsp; &nbsp; &nbsp; val city=row.getField(1).toString &nbsp; &nbsp; &nbsp; val cnt=row.getField(2).toString &nbsp; &nbsp; &nbsp; val map=new util.HashMap[String,String]() &nbsp; &nbsp; &nbsp; map.put("cnt",cnt) &nbsp; &nbsp; &nbsp; (month+city,map)// month+city是rowkey&nbsp; cnt是一个column &nbsp; &nbsp; }).addSink(new MyHbaseSink("monthHbaseTable") &nbsp;//将上面的月表注册成新表 monthStat &nbsp;tableEnv.registerTable("monthStat",monthCount) &nbsp;//按城市统计id的数量 &nbsp;val totalCount = tableEnv.sqlQuery( &nbsp; &nbsp; &nbsp; s""" &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;|select city,sum(cnt) as cityCnt from monthStat&nbsp; group by city &nbsp; &nbsp; &nbsp; """.stripMargin) //将月统计结果输出到hbase,rokey为city &nbsp; &nbsp; totalCount.toRetractStream[Row].filter(_._1).map(line=&gt;{ &nbsp; &nbsp; &nbsp; val row=line._2 &nbsp; &nbsp; &nbsp; val city=row.getField(0).toString &nbsp; &nbsp; &nbsp; val totalCnt=row.getField(1).toString &nbsp; &nbsp; &nbsp; val map=new util.HashMap[String,String]() &nbsp; &nbsp; &nbsp; map.put("totalCnt",totalCnt) &nbsp; &nbsp; &nbsp; (city,map) &nbsp; &nbsp; }).addSink("totalHbaseTable") |
没有使用窗口,状态不该被清理,但从结果看貌似被清理过呀
------------------ 原始邮件 ------------------ 发件人: "zhiyezou"<[hidden email]>; 发送时间: 2020年6月5日(星期五) 上午10:31 收件人: "user-zh"<[hidden email]>; 主题: 回复:flink1.9 Sql 注册的中间临时表不自动存state的吗? Hi 如果没有使用窗口的话,状态是不会被自动清理掉的,需要自己主动配置状态的TTL ------------------&nbsp;原始邮件&nbsp;------------------ 发件人:&nbsp;"star"<[hidden email]&gt;; 发送时间:&nbsp;2020年6月5日(星期五) 上午10:22 收件人:&nbsp;"[hidden email]"<[hidden email]&gt;; 主题:&nbsp;flink1.9 Sql 注册的中间临时表不自动存state的吗? 大家好&amp;nbsp; &amp;nbsp;我使用的flink 1.9的blink planner 先按 月 和城市 去重id&amp;nbsp; 注册成一张临时表(月表) 将结果实时输出到hbase表:monthtable&amp;nbsp; &amp;nbsp;rowkey是month+city 然后基于上面这张表 再按city汇总 输出到hbase表:totalTable ,rowkey是 city 运行了近18个小时 中间有过restore和ck失败,然后我统计了&amp;nbsp; 两个表到cnt汇总值居然不一样,totalTable比monthtable小,而且还差了不了,请问这会是上面原因 伪代码如下: val env = StreamExecutionEnvironment.getExecutionEnvironment env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val tableEnv = StreamTableEnvironment.create(env, bsSettings) myDataStream=...... tableEnv.registerDataStream("monthtable", myDataStream,'month','city ,'id) //按月份,城市统计去重的id数量(id和city有关,同一个城市的id不会重复) val monthCount = tableEnv.sqlQuery( &amp;nbsp; &amp;nbsp; &amp;nbsp; s""" &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;select month,city,count(distinct id) as cnt from monthtable&amp;nbsp; group by month,city &amp;nbsp; &amp;nbsp; &amp;nbsp; """.stripMargin) //将月统计结果输出到hbase,rokey为month+city monthCount.toRetractStream[Row].filter(_._1).map(line=&amp;gt;{ &amp;nbsp; &amp;nbsp; &amp;nbsp; val row=line._2 &amp;nbsp; &amp;nbsp; &amp;nbsp; val month=row.getField(0).toString &amp;nbsp; &amp;nbsp; &amp;nbsp; val city=row.getField(1).toString &amp;nbsp; &amp;nbsp; &amp;nbsp; val cnt=row.getField(2).toString &amp;nbsp; &amp;nbsp; &amp;nbsp; val map=new util.HashMap[String,String]() &amp;nbsp; &amp;nbsp; &amp;nbsp; map.put("cnt",cnt) &amp;nbsp; &amp;nbsp; &amp;nbsp; (month+city,map)// month+city是rowkey&amp;nbsp; cnt是一个column &amp;nbsp; &amp;nbsp; }).addSink(new MyHbaseSink("monthHbaseTable") &amp;nbsp;//将上面的月表注册成新表 monthStat &amp;nbsp;tableEnv.registerTable("monthStat",monthCount) &amp;nbsp;//按城市统计id的数量 &amp;nbsp;val totalCount = tableEnv.sqlQuery( &amp;nbsp; &amp;nbsp; &amp;nbsp; s""" &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;|select city,sum(cnt) as cityCnt from monthStat&amp;nbsp; group by city &amp;nbsp; &amp;nbsp; &amp;nbsp; """.stripMargin) //将月统计结果输出到hbase,rokey为city &amp;nbsp; &amp;nbsp; totalCount.toRetractStream[Row].filter(_._1).map(line=&amp;gt;{ &amp;nbsp; &amp;nbsp; &amp;nbsp; val row=line._2 &amp;nbsp; &amp;nbsp; &amp;nbsp; val city=row.getField(0).toString &amp;nbsp; &amp;nbsp; &amp;nbsp; val totalCnt=row.getField(1).toString &amp;nbsp; &amp;nbsp; &amp;nbsp; val map=new util.HashMap[String,String]() &amp;nbsp; &amp;nbsp; &amp;nbsp; map.put("totalCnt",totalCnt) &amp;nbsp; &amp;nbsp; &amp;nbsp; (city,map) &amp;nbsp; &amp;nbsp; }).addSink("totalHbaseTable") |
各位大佬有遇到过类似问题吗?
------------------ 原始邮件 ------------------ 发件人: "star"<[hidden email]>; 发送时间: 2020年6月5日(星期五) 上午10:40 收件人: "[hidden email]"<[hidden email]>; 主题: 回复:flink1.9 Sql 注册的中间临时表不自动存state的吗? 没有使用窗口,状态不该被清理,但从结果看貌似被清理过呀 ------------------ 原始邮件 ------------------ 发件人: "zhiyezou"<[hidden email]>; 发送时间: 2020年6月5日(星期五) 上午10:31 收件人: "user-zh"<[hidden email]>; 主题: 回复:flink1.9 Sql 注册的中间临时表不自动存state的吗? Hi 如果没有使用窗口的话,状态是不会被自动清理掉的,需要自己主动配置状态的TTL ------------------&nbsp;原始邮件&nbsp;------------------ 发件人:&nbsp;"star"<[hidden email]&gt;; 发送时间:&nbsp;2020年6月5日(星期五) 上午10:22 收件人:&nbsp;"[hidden email]"<[hidden email]&gt;; 主题:flink1.9 Sql 注册的中间临时表不自动存state的吗? 大家好,我使用的flink 1.9的blink planner 先按 月 和城市 去重id; 注册成一张临时表(月表) 将结果实时输出到hbase表:monthtable;rowkey是month+city 然后基于上面这张表 再按city汇总 输出到hbase表:totalTable ,rowkey是 city 运行了近18个小时 中间有过restore和ck失败,然后我统计了&amp;nbsp; 两个表到cnt汇总值居然不一样,totalTable比monthtable小,而且还差了不了,请问这会是上面原因 伪代码如下: val env = StreamExecutionEnvironment.getExecutionEnvironment env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val tableEnv = StreamTableEnvironment.create(env, bsSettings) myDataStream=...... tableEnv.registerDataStream("monthtable", myDataStream,'month','city ,'id) //按月份,城市统计去重的id数量(id和city有关,同一个城市的id不会重复) val monthCount = tableEnv.sqlQuery( s""" select month,city,count(distinct id) as cnt from monthtable group by month,city """.stripMargin) //将月统计结果输出到hbase,rokey为month+city monthCount.toRetractStream[Row].filter(_._1).map(line=&amp;gt;{ val row=line._2 val month=row.getField(0).toString val city=row.getField(1).toString val cnt=row.getField(2).toString val map=new util.HashMap[String,String]() map.put("cnt",cnt) (month+city,map) // month+city是rowkey cnt是一个column }).addSink(new MyHbaseSink("monthHbaseTable") //将上面的月表注册成新表 monthStat tableEnv.registerTable("monthStat",monthCount) //按城市统计id的数量 val totalCount = tableEnv.sqlQuery( s""" select city,sum(cnt) as cityCnt from monthStat&amp;nbsp; group by city """.stripMargin) //将月统计结果输出到hbase,rokey为city totalCount.toRetractStream[Row].filter(_._1).map(line=&amp;gt;{ val row=line._2 val city=row.getField(0).toString val totalCnt=row.getField(1).toString val map=new util.HashMap[String,String]() map.put("totalCnt",totalCnt) (city,map) }).addSink("totalHbaseTable") |
Hi,
看你的问题描述,我们可能遇到过类似的问题。 我们的问题是,从cp恢复之后,某些key跟之前的状态里的key对不上了,所以就有点类似于丢失了一部分状态。 但是我们也没有查出来具体的原因,一方面是因为问题比较难以复现,我们用线上数据,也只是有部分数据有问题, 也看不出来这部分有问题的数据有什么规律;另一方面是blink planner底层用的都是binary的数据结构,debug起来也会 比较困难。 如果你能提供一个比较稳定的能复现的数据集和测试方法,我觉得这个问题我们可以再推进解决一下。 star <[hidden email]> 于2020年6月5日周五 下午4:02写道: > 各位大佬有遇到过类似问题吗? > > > > > ------------------ 原始邮件 ------------------ > 发件人: "star"<[hidden email]>; > 发送时间: 2020年6月5日(星期五) 上午10:40 > 收件人: "[hidden email]"<[hidden email]>; > > 主题: 回复:flink1.9 Sql 注册的中间临时表不自动存state的吗? > > > > 没有使用窗口,状态不该被清理,但从结果看貌似被清理过呀 > > > > > ------------------ 原始邮件 ------------------ > 发件人: "zhiyezou"<[hidden email]>; > 发送时间: 2020年6月5日(星期五) 上午10:31 > 收件人: "user-zh"<[hidden email]>; > > 主题: 回复:flink1.9 Sql 注册的中间临时表不自动存state的吗? > > > > Hi > > > 如果没有使用窗口的话,状态是不会被自动清理掉的,需要自己主动配置状态的TTL > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > 发件人:&nbsp;"star"<[hidden email]&gt;; > 发送时间:&nbsp;2020年6月5日(星期五) 上午10:22 > 收件人:&nbsp;"[hidden email]"<[hidden email]&gt;; > > 主题:flink1.9 Sql 注册的中间临时表不自动存state的吗? > > > > 大家好,我使用的flink 1.9的blink planner > > > > 先按 月 和城市 去重id; 注册成一张临时表(月表) 将结果实时输出到hbase表:monthtable;rowkey是month+city > > > > > 然后基于上面这张表 再按city汇总 输出到hbase表:totalTable ,rowkey是 city > > > > > > > 运行了近18个小时 中间有过restore和ck失败,然后我统计了&amp;nbsp; > 两个表到cnt汇总值居然不一样,totalTable比monthtable小,而且还差了不了,请问这会是上面原因 > > > 伪代码如下: > > > > > val env = StreamExecutionEnvironment.getExecutionEnvironment > > env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) > val bsSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > val tableEnv = StreamTableEnvironment.create(env, bsSettings) > > > myDataStream=...... > > > > > tableEnv.registerDataStream("monthtable", myDataStream,'month','city ,'id) > > > > > //按月份,城市统计去重的id数量(id和city有关,同一个城市的id不会重复) > val monthCount = tableEnv.sqlQuery( > s""" > select month,city,count(distinct id) as cnt from monthtable group by > month,city > """.stripMargin) > > > //将月统计结果输出到hbase,rokey为month+city > monthCount.toRetractStream[Row].filter(_._1).map(line=&amp;gt;{ > val row=line._2 > val month=row.getField(0).toString > val city=row.getField(1).toString > val cnt=row.getField(2).toString > val map=new util.HashMap[String,String]() > map.put("cnt",cnt) > (month+city,map) // month+city是rowkey cnt是一个column > }).addSink(new MyHbaseSink("monthHbaseTable") > > > > > > > //将上面的月表注册成新表 monthStat > tableEnv.registerTable("monthStat",monthCount) > > > //按城市统计id的数量 > val totalCount = tableEnv.sqlQuery( > s""" > select city,sum(cnt) as cityCnt from monthStat&amp;nbsp; group by city > """.stripMargin) > > > //将月统计结果输出到hbase,rokey为city > totalCount.toRetractStream[Row].filter(_._1).map(line=&amp;gt;{ > val row=line._2 > val city=row.getField(0).toString > val totalCnt=row.getField(1).toString > val map=new util.HashMap[String,String]() > map.put("totalCnt",totalCnt) > (city,map) > }).addSink("totalHbaseTable") -- Best, Benchao Li |
感谢您的回复,复现应该可以,就是数据集太大了,而且是生产数据,需要脱敏拿下来,操作起来比较难。
另外我看了对不上几条数据,数据的处理的时间和程序restore的时间差很多,好像还不是restore引起了 那个时间点也没有error日志 ------------------ 原始邮件 ------------------ 发件人: "Benchao Li"<[hidden email]>; 发送时间: 2020年6月5日(星期五) 下午5:53 收件人: "user-zh"<[hidden email]>; 主题: Re: flink1.9 Sql 注册的中间临时表不自动存state的吗? Hi, 看你的问题描述,我们可能遇到过类似的问题。 我们的问题是,从cp恢复之后,某些key跟之前的状态里的key对不上了,所以就有点类似于丢失了一部分状态。 但是我们也没有查出来具体的原因,一方面是因为问题比较难以复现,我们用线上数据,也只是有部分数据有问题, 也看不出来这部分有问题的数据有什么规律;另一方面是blink planner底层用的都是binary的数据结构,debug起来也会 比较困难。 如果你能提供一个比较稳定的能复现的数据集和测试方法,我觉得这个问题我们可以再推进解决一下。 star <[hidden email]> 于2020年6月5日周五 下午4:02写道: > 各位大佬有遇到过类似问题吗? > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > 发件人:&nbsp;"star"<[hidden email]&gt;; > 发送时间:&nbsp;2020年6月5日(星期五) 上午10:40 > 收件人:&nbsp;"[hidden email]"<[hidden email]&gt;; > > 主题:&nbsp;回复:flink1.9 Sql 注册的中间临时表不自动存state的吗? > > > > 没有使用窗口,状态不该被清理,但从结果看貌似被清理过呀 > > > > > ------------------ 原始邮件 ------------------ > 发件人:&nbsp;"zhiyezou"<[hidden email]&gt;; > 发送时间:&nbsp;2020年6月5日(星期五) 上午10:31 > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > 主题:&nbsp;回复:flink1.9 Sql 注册的中间临时表不自动存state的吗? > > > > Hi > > > 如果没有使用窗口的话,状态是不会被自动清理掉的,需要自己主动配置状态的TTL > > > > > ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------ > 发件人:&amp;nbsp;"star"<[hidden email]&amp;gt;; > 发送时间:&amp;nbsp;2020年6月5日(星期五) 上午10:22 > 收件人:&amp;nbsp;"[hidden email]"<[hidden email]&amp;gt;; > > 主题:flink1.9 Sql 注册的中间临时表不自动存state的吗? > > > > 大家好,我使用的flink 1.9的blink planner > > > > 先按 月 和城市 去重id; 注册成一张临时表(月表) 将结果实时输出到hbase表:monthtable;rowkey是month+city > > > > > 然后基于上面这张表 再按city汇总 输出到hbase表:totalTable ,rowkey是 city > > > > > > > 运行了近18个小时 中间有过restore和ck失败,然后我统计了&amp;amp;nbsp; > 两个表到cnt汇总值居然不一样,totalTable比monthtable小,而且还差了不了,请问这会是上面原因 > > > 伪代码如下: > > > > > val env = StreamExecutionEnvironment.getExecutionEnvironment > > env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) > val bsSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > val tableEnv = StreamTableEnvironment.create(env, bsSettings) > > > myDataStream=...... > > > > > tableEnv.registerDataStream("monthtable", myDataStream,'month','city ,'id) > > > > > //按月份,城市统计去重的id数量(id和city有关,同一个城市的id不会重复) > val monthCount = tableEnv.sqlQuery( > &nbsp;s""" > select month,city,count(distinct id) as cnt from monthtable group by > month,city > &nbsp;""".stripMargin) > > > //将月统计结果输出到hbase,rokey为month+city > monthCount.toRetractStream[Row].filter(_._1).map(line=&amp;amp;gt;{ > val row=line._2 > &nbsp;val month=row.getField(0).toString > val city=row.getField(1).toString > &nbsp;val cnt=row.getField(2).toString > val map=new util.HashMap[String,String]() > &nbsp;map.put("cnt",cnt) > &nbsp;(month+city,map)&nbsp; &nbsp; &nbsp;// month+city是rowkey cnt是一个column > &nbsp;}).addSink(new MyHbaseSink("monthHbaseTable") > > > > > > > //将上面的月表注册成新表 monthStat > tableEnv.registerTable("monthStat",monthCount) > > > //按城市统计id的数量 > val totalCount = tableEnv.sqlQuery( > s""" > select city,sum(cnt) as cityCnt from monthStat&amp;amp;nbsp; group by city > &nbsp;""".stripMargin) > > > //将月统计结果输出到hbase,rokey为city > &nbsp;totalCount.toRetractStream[Row].filter(_._1).map(line=&amp;amp;gt;{ > val row=line._2 > val city=row.getField(0).toString > val totalCnt=row.getField(1).toString > val map=new util.HashMap[String,String]() > map.put("totalCnt",totalCnt) > (city,map) > &nbsp;}).addSink("totalHbaseTable") -- Best, Benchao Li |
Free forum by Nabble | Edit this page |