你好
感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多 附: userbehavior建表语句 CREATE TABLE user_behavior ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts TIMESTAMP(3), proctime as PROCTIME(), -- 通过计算列产生一个处理时间列 WATERMARK FOR ts as ts - INTERVAL '5' SECOND -- 在ts上定义watermark,ts成为事件时间列 ) WITH ( 'connector.type' = 'kafka', -- 使用 kafka connector 'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本 'connector.topic' = 'user_behavior', -- kafka topic 'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取 'connector.properties.zookeeper.connect' = '192.168.0.150:2181', -- zookeeper 地址 'connector.properties.bootstrap.servers' = '192.168.0.150:9092', -- kafka broker 地址 'format.type' = 'json' -- 数据源格式为 json ) 每小时购买数建表语句 CREATE TABLE buy_cnt_per_hour ( hour_of_day BIGINT, buy_cnt BIGINT ) WITH ( 'connector.type' = 'elasticsearch', -- 使用 elasticsearch connector 'connector.version' = '6', -- elasticsearch 版本,6 能支持 es 6+ 以及 7+ 的版本 'connector.hosts' = '<a href="http://192.168.0.150:9200'">http://192.168.0.150:9200', -- elasticsearch 地址 'connector.index' = 'buy_cnt_per_hour', -- elasticsearch 索引名,相当于数据库的表名 'connector.document-type' = 'user_behavior', -- elasticsearch 的 type,相当于数据库的库名 'connector.bulk-flush.max-actions' = '1', -- 每条数据都刷新 'format.type' = 'json', -- 输出数据格式 json 'update-mode' = 'append' ) 插入语句 INSERT INTO buy_cnt_per_hour SELECT HOUR(TUMBLE_START(ts,INTERVAL '1' HOUR)),COUNT(*) FROM user_behavior WHERE behavior = 'buy' GROUP BY TUMBLE(ts,INTERVAL '1' HOUR) kafka数据发送代码 import com.alibaba.fastjson.JSONObject; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.text.SimpleDateFormat; import java.util.*; public class UserBehaviorProducer { public static final String brokerList = "192.168.0.150:9092"; // public static final String topic="user_behavior"; public static final String topic = "user_behavior"; public static void main(String args[]) { //配置生产者客户端参数 //将配置序列化 Properties properties = new Properties(); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("bootstrap.servers", brokerList); //创建KafkaProducer 实例 KafkaProducer<String, String> producer = new KafkaProducer<>(properties); //构建待发送的消息 //{"user_id": "952483", "item_id":"310884", "category_id": "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} //{"user_id": "794777", "item_id":"5119439", "category_id": "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} String[] behaviors = {"pv", "buy", "coll", "cart"};//浏览,购买,收藏,加入购物车 JSONObject jsonObject = new JSONObject(); HashMap<String, String> info = new HashMap<>(); Random random = new Random(); SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); long date_long=getDate(); while (true) { jsonObject.put("user_id", random.nextInt(900000) + 100000 + ""); jsonObject.put("item_id", random.nextInt(900000) + 100000 + ""); jsonObject.put("category_id", random.nextInt(1000) + ""); jsonObject.put("behavior", behaviors[random.nextInt(4)]); jsonObject.put("ts", format.format(new Date(date_long))); String msg = jsonObject.toString(); System.out.println(msg); ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg); producer.send(record); // date_long +=500+random.nextGaussian()*1000; date_long +=800+random.nextGaussian()*1500; try { Thread.sleep(60); } catch (InterruptedException e) { e.printStackTrace(); } } } private static long getDate() { Date date = new Date(); Calendar c = Calendar.getInstance(); c.setTime(date); //设置为1号,当前日期既为本月第一天 c.set(Calendar.DAY_OF_MONTH, 1); //将小时至0 c.set(Calendar.HOUR_OF_DAY, 0); //将分钟至0 c.set(Calendar.MINUTE, 0); //将秒至0 c.set(Calendar.SECOND,0); //将毫秒至0 c.set(Calendar.MILLISECOND, 0); // 本月第一天的时间戳转换为字符串 return c.getTimeInMillis(); } } ------------------ 原始邮件 ------------------ 发件人: "Jark Wu"<[hidden email]>; 发送时间: 2020年4月18日(星期六) 晚上10:08 收件人: "user-zh"<[hidden email]>; 主题: Re: 问题请教-flinksql的kafkasource方面 Hi, 关于第二个问题,Flink 1.10 只在 SQL CLI 支持了 CREATE VIEW 语法, TableEnvironment 上还未支持。 关于第一个问题,抱歉,没有太看懂。"每小时全网购买数/partition的的数量,数据不准确了" 是什么意思呢?要不举个例子看下? Best, Jark On Sat, 18 Apr 2020 at 18:38, 人生若只如初见 <[hidden email]> wrote: > 大佬好: > &nbsp; &nbsp; > &nbsp;请教下,之前我看了云邪大佬的基于flinkSQL快速构建流式应用的视频,兵按照其步骤进行了复现。过程中我遇到了两个问题。 > &nbsp; &nbsp; > &nbsp;问题一是当我用先在sql-client中创建kafkasource表,再用程序向kafka发送数据,统计出来的每小时全网购买数是对的;可如果把kafka数据清空,然后先启动程序向kafka发送用户行为数据,发送了一会后再创建kafkasource表并进行业务处理,此时得到的结果是:每小时全网购买数/partition的的数量,数据不准确了。我记得kafkasource读取标记的watermark是partition中最小的那个,为什么会出现这个问题。 > &nbsp;&nbsp; > &nbsp; &nbsp; 问题二是我在sql-client中可以创建view,可我用程序写tableEnv.sqlUpdate("create > view as ...")却会报错。报错如下: > Exception in thread "main" org.apache.flink.table.api.TableException: > Unsupported query: CREATE VIEW rich_user_behavior AS > SELECT U.user_id, U.item_id, U.behavior,&nbsp; > &nbsp; CASE C.parent_category_id > &nbsp; &nbsp; WHEN 1 THEN '服饰鞋包' > &nbsp; &nbsp; WHEN 2 THEN '家装家饰' > &nbsp; &nbsp; WHEN 3 THEN '家电' > &nbsp; &nbsp; WHEN 4 THEN '美妆' > &nbsp; &nbsp; WHEN 5 THEN '母婴' > &nbsp; &nbsp; WHEN 6 THEN '3C数码' > &nbsp; &nbsp; WHEN 7 THEN '运动户外' > &nbsp; &nbsp; WHEN 8 THEN '食品' > &nbsp; &nbsp; ELSE '其他' > &nbsp; END AS category_name > FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF > U.proctime AS C > ON U.category_id = C.sub_category_id > at > org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67) > at > org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown > Source) > at java.util.Optional.orElseThrow(Optional.java:290) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > at > com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51) > > > > > > &nbsp; &nbsp; &nbsp; &nbsp; 望解答,十分感谢! |
Administrator
|
Hi,
根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。 根据你的 Java 代码,数据的 event time 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark 能容忍 5s 乱序). 但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些 partition 进度快很多的现象, 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。 完美的解决方案还需要等 FLIP-27 的完成。 当前可以通过增加 watermark delay来增大迟到数据的容忍。 Best, Jark On Sat, 18 Apr 2020 at 22:53, 人生若只如初见 <[hidden email]> wrote: > 你好 > > 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多 > > > > 附: > userbehavior建表语句 > CREATE TABLE user_behavior ( > user_id BIGINT, > item_id BIGINT, > category_id BIGINT, > behavior STRING, > ts TIMESTAMP(3), > proctime as PROCTIME(), -- 通过计算列产生一个处理时间列 > WATERMARK FOR ts as ts - INTERVAL '5' SECOND -- > 在ts上定义watermark,ts成为事件时间列 > ) WITH ( > 'connector.type' = 'kafka', -- 使用 kafka connector > 'connector.version' = 'universal', -- kafka > 版本,universal 支持 0.11 以上的版本 > 'connector.topic' = 'user_behavior', -- kafka topic > 'connector.startup-mode' = 'earliest-offset', -- 从起始 > offset 开始读取 > 'connector.properties.zookeeper.connect' = ' > 192.168.0.150:2181', -- zookeeper 地址 > 'connector.properties.bootstrap.servers' = ' > 192.168.0.150:9092', -- kafka broker 地址 > 'format.type' = 'json' -- 数据源格式为 json > ) > > 每小时购买数建表语句 > CREATE TABLE buy_cnt_per_hour ( > hour_of_day BIGINT, > buy_cnt BIGINT > ) WITH ( > 'connector.type' = 'elasticsearch', -- 使用 elasticsearch > connector > 'connector.version' = '6', -- elasticsearch 版本,6 能支持 > es 6+ 以及 7+ 的版本 > 'connector.hosts' = '<a href="http://192.168.0.150:9200'">http://192.168.0.150:9200', -- > elasticsearch 地址 > 'connector.index' = 'buy_cnt_per_hour', -- > elasticsearch 索引名,相当于数据库的表名 > 'connector.document-type' = 'user_behavior', -- > elasticsearch 的 type,相当于数据库的库名 > 'connector.bulk-flush.max-actions' = '1', -- 每条数据都刷新 > 'format.type' = 'json', -- 输出数据格式 json > 'update-mode' = 'append' > ) > > 插入语句 > INSERT INTO buy_cnt_per_hour > SELECT HOUR(TUMBLE_START(ts,INTERVAL '1' HOUR)),COUNT(*) > FROM user_behavior > WHERE behavior = 'buy' > GROUP BY TUMBLE(ts,INTERVAL '1' HOUR) > > kafka数据发送代码 > > import com.alibaba.fastjson.JSONObject; > import org.apache.kafka.clients.producer.KafkaProducer; > import org.apache.kafka.clients.producer.ProducerRecord; > > import java.text.SimpleDateFormat; > import java.util.*; > > > public class UserBehaviorProducer { > public static final String brokerList = "192.168.0.150:9092"; > > // public static final String topic="user_behavior"; > public static final String topic = "user_behavior"; > > public static void main(String args[]) { > > //配置生产者客户端参数 > //将配置序列化 > Properties properties = new Properties(); > properties.put("key.serializer", > "org.apache.kafka.common.serialization.StringSerializer"); > properties.put("value.serializer", > "org.apache.kafka.common.serialization.StringSerializer"); > properties.put("bootstrap.servers", brokerList); > //创建KafkaProducer 实例 > KafkaProducer<String, String> producer = new > KafkaProducer<>(properties); > //构建待发送的消息 > //{"user_id": "952483", "item_id":"310884", "category_id": > "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} > //{"user_id": "794777", "item_id":"5119439", "category_id": > "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} > String[] behaviors = {"pv", "buy", "coll", "cart"};//浏览,购买,收藏,加入购物车 > JSONObject jsonObject = new JSONObject(); > HashMap<String, String> info = new HashMap<>(); > Random random = new Random(); > SimpleDateFormat format = new > SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); > long date_long=getDate(); > while (true) { > jsonObject.put("user_id", random.nextInt(900000) + 100000 + > ""); > jsonObject.put("item_id", random.nextInt(900000) + 100000 + > ""); > jsonObject.put("category_id", random.nextInt(1000) + ""); > jsonObject.put("behavior", behaviors[random.nextInt(4)]); > jsonObject.put("ts", format.format(new Date(date_long))); > String msg = jsonObject.toString(); > System.out.println(msg); > ProducerRecord<String, String> record = new > ProducerRecord<>(topic, msg); > producer.send(record); > // date_long +=500+random.nextGaussian()*1000; > date_long +=800+random.nextGaussian()*1500; > try { > Thread.sleep(60); > } catch (InterruptedException e) { > e.printStackTrace(); > } > } > > } > > private static long getDate() { > Date date = new Date(); > Calendar c = Calendar.getInstance(); > c.setTime(date); > //设置为1号,当前日期既为本月第一天 > c.set(Calendar.DAY_OF_MONTH, 1); > //将小时至0 > c.set(Calendar.HOUR_OF_DAY, 0); > //将分钟至0 > c.set(Calendar.MINUTE, 0); > //将秒至0 > c.set(Calendar.SECOND,0); > //将毫秒至0 > c.set(Calendar.MILLISECOND, 0); > // 本月第一天的时间戳转换为字符串 > return c.getTimeInMillis(); > } > } > > ------------------ 原始邮件 ------------------ > 发件人: "Jark Wu"<[hidden email]>; > 发送时间: 2020年4月18日(星期六) 晚上10:08 > 收件人: "user-zh"<[hidden email]>; > > 主题: Re: 问题请教-flinksql的kafkasource方面 > > > > Hi, > > 关于第二个问题,Flink 1.10 只在 SQL CLI 支持了 CREATE VIEW 语法, TableEnvironment 上还未支持。 > 关于第一个问题,抱歉,没有太看懂。"每小时全网购买数/partition的的数量,数据不准确了" 是什么意思呢?要不举个例子看下? > > Best, > Jark > > On Sat, 18 Apr 2020 at 18:38, 人生若只如初见 <[hidden email]> wrote: > > > 大佬好: > > &nbsp; &nbsp; > > &nbsp;请教下,之前我看了云邪大佬的基于flinkSQL快速构建流式应用的视频,兵按照其步骤进行了复现。过程中我遇到了两个问题。 > > &nbsp; &nbsp; > > > &nbsp;问题一是当我用先在sql-client中创建kafkasource表,再用程序向kafka发送数据,统计出来的每小时全网购买数是对的;可如果把kafka数据清空,然后先启动程序向kafka发送用户行为数据,发送了一会后再创建kafkasource表并进行业务处理,此时得到的结果是:每小时全网购买数/partition的的数量,数据不准确了。我记得kafkasource读取标记的watermark是partition中最小的那个,为什么会出现这个问题。 > > &nbsp;&nbsp; > > &nbsp; &nbsp; > 问题二是我在sql-client中可以创建view,可我用程序写tableEnv.sqlUpdate("create > > view as ...")却会报错。报错如下: > > Exception in thread "main" org.apache.flink.table.api.TableException: > > Unsupported query: CREATE VIEW rich_user_behavior AS > > SELECT U.user_id, U.item_id, U.behavior,&nbsp; > > &nbsp; CASE C.parent_category_id > > &nbsp; &nbsp; WHEN 1 THEN '服饰鞋包' > > &nbsp; &nbsp; WHEN 2 THEN '家装家饰' > > &nbsp; &nbsp; WHEN 3 THEN '家电' > > &nbsp; &nbsp; WHEN 4 THEN '美妆' > > &nbsp; &nbsp; WHEN 5 THEN '母婴' > > &nbsp; &nbsp; WHEN 6 THEN '3C数码' > > &nbsp; &nbsp; WHEN 7 THEN '运动户外' > > &nbsp; &nbsp; WHEN 8 THEN '食品' > > &nbsp; &nbsp; ELSE '其他' > > &nbsp; END AS category_name > > FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF > > U.proctime AS C > > ON U.category_id = C.sub_category_id > > at > > > org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67) > > at > > > org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown > > Source) > > at > java.util.Optional.orElseThrow(Optional.java:290) > > at > > > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67) > > at > > > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > > at > > > com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51) > > > > > > > > > > > > &nbsp; &nbsp; &nbsp; &nbsp; 望解答,十分感谢! |
如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。
Jark Wu <[hidden email]> 于2020年4月19日周日 下午8:22写道: > Hi, > > 根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。 > 根据你的 Java 代码,数据的 event time 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark > 能容忍 5s 乱序). > 但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些 partition > 进度快很多的现象, > 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。 > > 完美的解决方案还需要等 FLIP-27 的完成。 > 当前可以通过增加 watermark delay来增大迟到数据的容忍。 > > Best, > Jark > > > On Sat, 18 Apr 2020 at 22:53, 人生若只如初见 <[hidden email]> wrote: > > > 你好 > > > > > 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多 > > > > > > > > 附: > > userbehavior建表语句 > > CREATE TABLE user_behavior ( > > user_id BIGINT, > > item_id BIGINT, > > category_id BIGINT, > > behavior STRING, > > ts TIMESTAMP(3), > > proctime as PROCTIME(), -- 通过计算列产生一个处理时间列 > > WATERMARK FOR ts as ts - INTERVAL '5' SECOND -- > > 在ts上定义watermark,ts成为事件时间列 > > ) WITH ( > > 'connector.type' = 'kafka', -- 使用 kafka connector > > 'connector.version' = 'universal', -- kafka > > 版本,universal 支持 0.11 以上的版本 > > 'connector.topic' = 'user_behavior', -- kafka topic > > 'connector.startup-mode' = 'earliest-offset', -- 从起始 > > offset 开始读取 > > 'connector.properties.zookeeper.connect' = ' > > 192.168.0.150:2181', -- zookeeper 地址 > > 'connector.properties.bootstrap.servers' = ' > > 192.168.0.150:9092', -- kafka broker 地址 > > 'format.type' = 'json' -- 数据源格式为 json > > ) > > > > 每小时购买数建表语句 > > CREATE TABLE buy_cnt_per_hour ( > > hour_of_day BIGINT, > > buy_cnt BIGINT > > ) WITH ( > > 'connector.type' = 'elasticsearch', -- 使用 elasticsearch > > connector > > 'connector.version' = '6', -- elasticsearch 版本,6 能支持 > > es 6+ 以及 7+ 的版本 > > 'connector.hosts' = '<a href="http://192.168.0.150:9200'">http://192.168.0.150:9200', -- > > elasticsearch 地址 > > 'connector.index' = 'buy_cnt_per_hour', -- > > elasticsearch 索引名,相当于数据库的表名 > > 'connector.document-type' = 'user_behavior', -- > > elasticsearch 的 type,相当于数据库的库名 > > 'connector.bulk-flush.max-actions' = '1', -- 每条数据都刷新 > > 'format.type' = 'json', -- 输出数据格式 json > > 'update-mode' = 'append' > > ) > > > > 插入语句 > > INSERT INTO buy_cnt_per_hour > > SELECT HOUR(TUMBLE_START(ts,INTERVAL '1' HOUR)),COUNT(*) > > FROM user_behavior > > WHERE behavior = 'buy' > > GROUP BY TUMBLE(ts,INTERVAL '1' HOUR) > > > > kafka数据发送代码 > > > > import com.alibaba.fastjson.JSONObject; > > import org.apache.kafka.clients.producer.KafkaProducer; > > import org.apache.kafka.clients.producer.ProducerRecord; > > > > import java.text.SimpleDateFormat; > > import java.util.*; > > > > > > public class UserBehaviorProducer { > > public static final String brokerList = "192.168.0.150:9092"; > > > > // public static final String topic="user_behavior"; > > public static final String topic = "user_behavior"; > > > > public static void main(String args[]) { > > > > //配置生产者客户端参数 > > //将配置序列化 > > Properties properties = new Properties(); > > properties.put("key.serializer", > > "org.apache.kafka.common.serialization.StringSerializer"); > > properties.put("value.serializer", > > "org.apache.kafka.common.serialization.StringSerializer"); > > properties.put("bootstrap.servers", brokerList); > > //创建KafkaProducer 实例 > > KafkaProducer<String, String> producer = new > > KafkaProducer<>(properties); > > //构建待发送的消息 > > //{"user_id": "952483", "item_id":"310884", "category_id": > > "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} > > //{"user_id": "794777", "item_id":"5119439", "category_id": > > "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} > > String[] behaviors = {"pv", "buy", "coll", > "cart"};//浏览,购买,收藏,加入购物车 > > JSONObject jsonObject = new JSONObject(); > > HashMap<String, String> info = new HashMap<>(); > > Random random = new Random(); > > SimpleDateFormat format = new > > SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); > > long date_long=getDate(); > > while (true) { > > jsonObject.put("user_id", random.nextInt(900000) + 100000 + > > ""); > > jsonObject.put("item_id", random.nextInt(900000) + 100000 + > > ""); > > jsonObject.put("category_id", random.nextInt(1000) + ""); > > jsonObject.put("behavior", behaviors[random.nextInt(4)]); > > jsonObject.put("ts", format.format(new Date(date_long))); > > String msg = jsonObject.toString(); > > System.out.println(msg); > > ProducerRecord<String, String> record = new > > ProducerRecord<>(topic, msg); > > producer.send(record); > > // date_long +=500+random.nextGaussian()*1000; > > date_long +=800+random.nextGaussian()*1500; > > try { > > Thread.sleep(60); > > } catch (InterruptedException e) { > > e.printStackTrace(); > > } > > } > > > > } > > > > private static long getDate() { > > Date date = new Date(); > > Calendar c = Calendar.getInstance(); > > c.setTime(date); > > //设置为1号,当前日期既为本月第一天 > > c.set(Calendar.DAY_OF_MONTH, 1); > > //将小时至0 > > c.set(Calendar.HOUR_OF_DAY, 0); > > //将分钟至0 > > c.set(Calendar.MINUTE, 0); > > //将秒至0 > > c.set(Calendar.SECOND,0); > > //将毫秒至0 > > c.set(Calendar.MILLISECOND, 0); > > // 本月第一天的时间戳转换为字符串 > > return c.getTimeInMillis(); > > } > > } > > > > ------------------ 原始邮件 ------------------ > > 发件人: "Jark Wu"<[hidden email]>; > > 发送时间: 2020年4月18日(星期六) 晚上10:08 > > 收件人: "user-zh"<[hidden email]>; > > > > 主题: Re: 问题请教-flinksql的kafkasource方面 > > > > > > > > Hi, > > > > 关于第二个问题,Flink 1.10 只在 SQL CLI 支持了 CREATE VIEW 语法, TableEnvironment 上还未支持。 > > 关于第一个问题,抱歉,没有太看懂。"每小时全网购买数/partition的的数量,数据不准确了" 是什么意思呢?要不举个例子看下? > > > > Best, > > Jark > > > > On Sat, 18 Apr 2020 at 18:38, 人生若只如初见 <[hidden email]> wrote: > > > > > 大佬好: > > > &nbsp; &nbsp; > > > > &nbsp;请教下,之前我看了云邪大佬的基于flinkSQL快速构建流式应用的视频,兵按照其步骤进行了复现。过程中我遇到了两个问题。 > > > &nbsp; &nbsp; > > > > > > &nbsp;问题一是当我用先在sql-client中创建kafkasource表,再用程序向kafka发送数据,统计出来的每小时全网购买数是对的;可如果把kafka数据清空,然后先启动程序向kafka发送用户行为数据,发送了一会后再创建kafkasource表并进行业务处理,此时得到的结果是:每小时全网购买数/partition的的数量,数据不准确了。我记得kafkasource读取标记的watermark是partition中最小的那个,为什么会出现这个问题。 > > > &nbsp;&nbsp; > > > &nbsp; &nbsp; > > 问题二是我在sql-client中可以创建view,可我用程序写tableEnv.sqlUpdate("create > > > view as ...")却会报错。报错如下: > > > Exception in thread "main" > org.apache.flink.table.api.TableException: > > > Unsupported query: CREATE VIEW rich_user_behavior AS > > > SELECT U.user_id, U.item_id, U.behavior,&nbsp; > > > &nbsp; CASE C.parent_category_id > > > &nbsp; &nbsp; WHEN 1 THEN '服饰鞋包' > > > &nbsp; &nbsp; WHEN 2 THEN '家装家饰' > > > &nbsp; &nbsp; WHEN 3 THEN '家电' > > > &nbsp; &nbsp; WHEN 4 THEN '美妆' > > > &nbsp; &nbsp; WHEN 5 THEN '母婴' > > > &nbsp; &nbsp; WHEN 6 THEN '3C数码' > > > &nbsp; &nbsp; WHEN 7 THEN '运动户外' > > > &nbsp; &nbsp; WHEN 8 THEN '食品' > > > &nbsp; &nbsp; ELSE '其他' > > > &nbsp; END AS category_name > > > FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF > > > U.proctime AS C > > > ON U.category_id = C.sub_category_id > > > at > > > > > > org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67) > > > at > > > > > > org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown > > > Source) > > > at > > java.util.Optional.orElseThrow(Optional.java:290) > > > at > > > > > > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67) > > > at > > > > > > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > > > at > > > > > > com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51) > > > > > > > > > > > > > > > > > > &nbsp; &nbsp; &nbsp; &nbsp; 望解答,十分感谢! > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
嗯嗯,十分感谢
------------------ 原始邮件 ------------------ 发件人: "Benchao Li"<[hidden email]>; 发送时间: 2020年4月19日(星期天) 晚上9:25 收件人: "user-zh"<[hidden email]>; 主题: Re: 问题请教-flinksql的kafkasource方面 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。 Jark Wu <[hidden email]> 于2020年4月19日周日 下午8:22写道: > Hi, > > 根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。 > 根据你的 Java 代码,数据的 event time 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark > 能容忍 5s 乱序). > 但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些 partition > 进度快很多的现象, > 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。 > > 完美的解决方案还需要等 FLIP-27 的完成。 > 当前可以通过增加 watermark delay来增大迟到数据的容忍。 > > Best, > Jark > > > On Sat, 18 Apr 2020 at 22:53, 人生若只如初见 <[hidden email]> wrote: > > > 你好 > > > > > 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多 > > > > > > > > 附: > > userbehavior建表语句 > > CREATE TABLE user_behavior ( > > &nbsp; &nbsp; user_id BIGINT, > > &nbsp; &nbsp; item_id BIGINT, > > &nbsp; &nbsp; category_id BIGINT, > > &nbsp; &nbsp; behavior STRING, > > &nbsp; &nbsp; ts TIMESTAMP(3), > > &nbsp; &nbsp; proctime as PROCTIME(), &nbsp; -- 通过计算列产生一个处理时间列 > > &nbsp; &nbsp; WATERMARK FOR ts as ts - INTERVAL '5' SECOND &nbsp;-- > > 在ts上定义watermark,ts成为事件时间列 > > ) WITH ( > > &nbsp; &nbsp; 'connector.type' = 'kafka', &nbsp;-- 使用 kafka connector > > &nbsp; &nbsp; 'connector.version' = 'universal', &nbsp;-- kafka > > 版本,universal 支持 0.11 以上的版本 > > &nbsp; &nbsp; 'connector.topic' = 'user_behavior', &nbsp;-- kafka topic > > &nbsp; &nbsp; 'connector.startup-mode' = 'earliest-offset', &nbsp;-- 从起始 > > offset 开始读取 > > &nbsp; &nbsp; 'connector.properties.zookeeper.connect' = ' > > 192.168.0.150:2181', &nbsp;-- zookeeper 地址 > > &nbsp; &nbsp; 'connector.properties.bootstrap.servers' = ' > > 192.168.0.150:9092', &nbsp;-- kafka broker 地址 > > &nbsp; &nbsp; 'format.type' = 'json' &nbsp;-- 数据源格式为 json > > ) > > > > 每小时购买数建表语句 > > CREATE TABLE buy_cnt_per_hour (&nbsp; > > &nbsp; &nbsp; hour_of_day BIGINT, > > &nbsp; &nbsp; buy_cnt BIGINT > > ) WITH ( > > &nbsp; &nbsp; 'connector.type' = 'elasticsearch', -- 使用 elasticsearch > > connector > > &nbsp; &nbsp; 'connector.version' = '6', &nbsp;-- elasticsearch 版本,6 能支持 > > es 6+ 以及 7+ 的版本 > > &nbsp; &nbsp; 'connector.hosts' = '<a href="http://192.168.0.150:9200'">http://192.168.0.150:9200', &nbsp;-- > > elasticsearch 地址 > > &nbsp; &nbsp; 'connector.index' = 'buy_cnt_per_hour', &nbsp;-- > > elasticsearch 索引名,相当于数据库的表名 > > &nbsp; &nbsp; 'connector.document-type' = 'user_behavior', -- > > elasticsearch 的 type,相当于数据库的库名 > > &nbsp; &nbsp; 'connector.bulk-flush.max-actions' = '1', &nbsp;-- 每条数据都刷新 > > &nbsp; &nbsp; 'format.type' = 'json', &nbsp;-- 输出数据格式 json > > &nbsp; &nbsp; 'update-mode' = 'append' > > ) > > > > 插入语句 > > INSERT INTO buy_cnt_per_hour&nbsp; > > SELECT HOUR(TUMBLE_START(ts,INTERVAL '1' HOUR)),COUNT(*)&nbsp; > > FROM user_behavior > > WHERE behavior = 'buy' > > GROUP BY TUMBLE(ts,INTERVAL '1' HOUR) > > > > kafka数据发送代码 > > > > import com.alibaba.fastjson.JSONObject; > > import org.apache.kafka.clients.producer.KafkaProducer; > > import org.apache.kafka.clients.producer.ProducerRecord; > > > > import java.text.SimpleDateFormat; > > import java.util.*; > > > > > > public class UserBehaviorProducer { > > public static final String brokerList = "192.168.0.150:9092"; > > > > // public static final String topic="user_behavior"; > > public static final String topic = "user_behavior"; > > > > public static void main(String args[]) { > > > > //配置生产者客户端参数 > > //将配置序列化 > > Properties properties = new Properties(); > > properties.put("key.serializer", > > "org.apache.kafka.common.serialization.StringSerializer"); > > properties.put("value.serializer", > > "org.apache.kafka.common.serialization.StringSerializer"); > > properties.put("bootstrap.servers", brokerList); > > //创建KafkaProducer 实例 > > KafkaProducer<String, String&gt; producer = new > > KafkaProducer<&gt;(properties); > > //构建待发送的消息 > > //{"user_id": "952483", "item_id":"310884", "category_id": > > "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} > > //{"user_id": "794777", "item_id":"5119439", "category_id": > > "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} > > String[] behaviors = {"pv", "buy", "coll", > "cart"};//浏览,购买,收藏,加入购物车 > > JSONObject jsonObject = new JSONObject(); > > HashMap<String, String&gt; info = new HashMap<&gt;(); > > Random random = new Random(); > > SimpleDateFormat format = new > > SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); > > long date_long=getDate(); > > while (true) { > > jsonObject.put("user_id", random.nextInt(900000) + 100000 + > > ""); > > jsonObject.put("item_id", random.nextInt(900000) + 100000 + > > ""); > > jsonObject.put("category_id", random.nextInt(1000) + ""); > > jsonObject.put("behavior", behaviors[random.nextInt(4)]); > > jsonObject.put("ts", format.format(new Date(date_long))); > > String msg = jsonObject.toString(); > > System.out.println(msg); > > ProducerRecord<String, String&gt; record = new > > ProducerRecord<&gt;(topic, msg); > > producer.send(record); > > // date_long +=500+random.nextGaussian()*1000; > > date_long +=800+random.nextGaussian()*1500; > > try { > > Thread.sleep(60); > > } catch (InterruptedException e) { > > e.printStackTrace(); > > } > > } > > > > } > > > > private static long getDate() { > > Date date = new Date(); > > Calendar c = Calendar.getInstance(); > > c.setTime(date); > > //设置为1号,当前日期既为本月第一天 > > c.set(Calendar.DAY_OF_MONTH, 1); > > //将小时至0 > > c.set(Calendar.HOUR_OF_DAY, 0); > > //将分钟至0 > > c.set(Calendar.MINUTE, 0); > > //将秒至0 > > c.set(Calendar.SECOND,0); > > //将毫秒至0 > > c.set(Calendar.MILLISECOND, 0); > > // 本月第一天的时间戳转换为字符串 > > return c.getTimeInMillis(); > > } > > } > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > > 发件人:&nbsp;"Jark Wu"<[hidden email]&gt;; > > 发送时间:&nbsp;2020年4月18日(星期六) 晚上10:08 > > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > > > 主题:&nbsp;Re: 问题请教-flinksql的kafkasource方面 > > > > > > > > Hi, > > > > 关于第二个问题,Flink 1.10 只在 SQL CLI 支持了 CREATE VIEW 语法, TableEnvironment 上还未支持。 > > 关于第一个问题,抱歉,没有太看懂。"每小时全网购买数/partition的的数量,数据不准确了" 是什么意思呢?要不举个例子看下? > > > > Best, > > Jark > > > > On Sat, 18 Apr 2020 at 18:38, 人生若只如初见 <[hidden email]&gt; wrote: > > > > &gt; 大佬好: > > &gt; &amp;nbsp; &amp;nbsp; > > &gt; > &amp;nbsp;请教下,之前我看了云邪大佬的基于flinkSQL快速构建流式应用的视频,兵按照其步骤进行了复现。过程中我遇到了两个问题。 > > &gt; &amp;nbsp; &amp;nbsp; > > &gt; > > > &amp;nbsp;问题一是当我用先在sql-client中创建kafkasource表,再用程序向kafka发送数据,统计出来的每小时全网购买数是对的;可如果把kafka数据清空,然后先启动程序向kafka发送用户行为数据,发送了一会后再创建kafkasource表并进行业务处理,此时得到的结果是:每小时全网购买数/partition的的数量,数据不准确了。我记得kafkasource读取标记的watermark是partition中最小的那个,为什么会出现这个问题。 > > &gt; &amp;nbsp;&amp;nbsp; > > &gt; &amp;nbsp; &amp;nbsp; > > 问题二是我在sql-client中可以创建view,可我用程序写tableEnv.sqlUpdate("create > > &gt; view as ...")却会报错。报错如下: > > &gt; Exception in thread "main" > org.apache.flink.table.api.TableException: > > &gt; Unsupported query: CREATE VIEW rich_user_behavior AS > > &gt; SELECT U.user_id, U.item_id, U.behavior,&amp;nbsp; > > &gt; &amp;nbsp; CASE C.parent_category_id > > &gt; &amp;nbsp; &amp;nbsp; WHEN 1 THEN '服饰鞋包' > > &gt; &amp;nbsp; &amp;nbsp; WHEN 2 THEN '家装家饰' > > &gt; &amp;nbsp; &amp;nbsp; WHEN 3 THEN '家电' > > &gt; &amp;nbsp; &amp;nbsp; WHEN 4 THEN '美妆' > > &gt; &amp;nbsp; &amp;nbsp; WHEN 5 THEN '母婴' > > &gt; &amp;nbsp; &amp;nbsp; WHEN 6 THEN '3C数码' > > &gt; &amp;nbsp; &amp;nbsp; WHEN 7 THEN '运动户外' > > &gt; &amp;nbsp; &amp;nbsp; WHEN 8 THEN '食品' > > &gt; &amp;nbsp; &amp;nbsp; ELSE '其他' > > &gt; &amp;nbsp; END AS category_name > > &gt; FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF > > &gt; U.proctime AS C > > &gt; ON U.category_id = C.sub_category_id > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at > > &gt; > > > org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67) > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at > > &gt; > > > org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown > > &gt; Source) > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at > > java.util.Optional.orElseThrow(Optional.java:290) > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at > > &gt; > > > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67) > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at > > &gt; > > > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at > > &gt; > > > com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51) > > &gt; > > &gt; > > &gt; > > &gt; > > &gt; > > &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 望解答,十分感谢! > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧
| | Sun.Zhu | | 邮箱:[hidden email] | Signature is customized by Netease Mail Master 在2020年04月19日 22:43,人生若只如初见 写道: 嗯嗯,十分感谢 ------------------ 原始邮件 ------------------ 发件人: "Benchao Li"<[hidden email]>; 发送时间: 2020年4月19日(星期天) 晚上9:25 收件人: "user-zh"<[hidden email]>; 主题: Re: 问题请教-flinksql的kafkasource方面 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。 Jark Wu <[hidden email]> 于2020年4月19日周日 下午8:22写道: > Hi, > > 根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。 > 根据你的 Java 代码,数据的 event time 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark > 能容忍 5s 乱序). > 但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些 partition > 进度快很多的现象, > 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。 > > 完美的解决方案还需要等 FLIP-27 的完成。 > 当前可以通过增加 watermark delay来增大迟到数据的容忍。 > > Best, > Jark > > > On Sat, 18 Apr 2020 at 22:53, 人生若只如初见 <[hidden email]> wrote: > > > 你好 > > > > > 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多 > > > > > > > > 附: > > userbehavior建表语句 > > CREATE TABLE user_behavior ( > > &nbsp; &nbsp; user_id BIGINT, > > &nbsp; &nbsp; item_id BIGINT, > > &nbsp; &nbsp; category_id BIGINT, > > &nbsp; &nbsp; behavior STRING, > > &nbsp; &nbsp; ts TIMESTAMP(3), > > &nbsp; &nbsp; proctime as PROCTIME(), &nbsp; -- 通过计算列产生一个处理时间列 > > &nbsp; &nbsp; WATERMARK FOR ts as ts - INTERVAL '5' SECOND &nbsp;-- > > 在ts上定义watermark,ts成为事件时间列 > > ) WITH ( > > &nbsp; &nbsp; 'connector.type' = 'kafka', &nbsp;-- 使用 kafka connector > > &nbsp; &nbsp; 'connector.version' = 'universal', &nbsp;-- kafka > > 版本,universal 支持 0.11 以上的版本 > > &nbsp; &nbsp; 'connector.topic' = 'user_behavior', &nbsp;-- kafka topic > > &nbsp; &nbsp; 'connector.startup-mode' = 'earliest-offset', &nbsp;-- 从起始 > > offset 开始读取 > > &nbsp; &nbsp; 'connector.properties.zookeeper.connect' = ' > > 192.168.0.150:2181', &nbsp;-- zookeeper 地址 > > &nbsp; &nbsp; 'connector.properties.bootstrap.servers' = ' > > 192.168.0.150:9092', &nbsp;-- kafka broker 地址 > > &nbsp; &nbsp; 'format.type' = 'json' &nbsp;-- 数据源格式为 json > > ) > > > > 每小时购买数建表语句 > > CREATE TABLE buy_cnt_per_hour (&nbsp; > > &nbsp; &nbsp; hour_of_day BIGINT, > > &nbsp; &nbsp; buy_cnt BIGINT > > ) WITH ( > > &nbsp; &nbsp; 'connector.type' = 'elasticsearch', -- 使用 elasticsearch > > connector > > &nbsp; &nbsp; 'connector.version' = '6', &nbsp;-- elasticsearch 版本,6 能支持 > > es 6+ 以及 7+ 的版本 > > &nbsp; &nbsp; 'connector.hosts' = '<a href="http://192.168.0.150:9200'">http://192.168.0.150:9200', &nbsp;-- > > elasticsearch 地址 > > &nbsp; &nbsp; 'connector.index' = 'buy_cnt_per_hour', &nbsp;-- > > elasticsearch 索引名,相当于数据库的表名 > > &nbsp; &nbsp; 'connector.document-type' = 'user_behavior', -- > > elasticsearch 的 type,相当于数据库的库名 > > &nbsp; &nbsp; 'connector.bulk-flush.max-actions' = '1', &nbsp;-- 每条数据都刷新 > > &nbsp; &nbsp; 'format.type' = 'json', &nbsp;-- 输出数据格式 json > > &nbsp; &nbsp; 'update-mode' = 'append' > > ) > > > > 插入语句 > > INSERT INTO buy_cnt_per_hour&nbsp; > > SELECT HOUR(TUMBLE_START(ts,INTERVAL '1' HOUR)),COUNT(*)&nbsp; > > FROM user_behavior > > WHERE behavior = 'buy' > > GROUP BY TUMBLE(ts,INTERVAL '1' HOUR) > > > > kafka数据发送代码 > > > > import com.alibaba.fastjson.JSONObject; > > import org.apache.kafka.clients.producer.KafkaProducer; > > import org.apache.kafka.clients.producer.ProducerRecord; > > > > import java.text.SimpleDateFormat; > > import java.util.*; > > > > > > public class UserBehaviorProducer { > > public static final String brokerList = "192.168.0.150:9092"; > > > > // public static final String topic="user_behavior"; > > public static final String topic = "user_behavior"; > > > > public static void main(String args[]) { > > > > //配置生产者客户端参数 > > //将配置序列化 > > Properties properties = new Properties(); > > properties.put("key.serializer", > > "org.apache.kafka.common.serialization.StringSerializer"); > > properties.put("value.serializer", > > "org.apache.kafka.common.serialization.StringSerializer"); > > properties.put("bootstrap.servers", brokerList); > > //创建KafkaProducer 实例 > > KafkaProducer<String, String&gt; producer = new > > KafkaProducer<&gt;(properties); > > //构建待发送的消息 > > //{"user_id": "952483", "item_id":"310884", "category_id": > > "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} > > //{"user_id": "794777", "item_id":"5119439", "category_id": > > "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} > > String[] behaviors = {"pv", "buy", "coll", > "cart"};//浏览,购买,收藏,加入购物车 > > JSONObject jsonObject = new JSONObject(); > > HashMap<String, String&gt; info = new HashMap<&gt;(); > > Random random = new Random(); > > SimpleDateFormat format = new > > SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); > > long date_long=getDate(); > > while (true) { > > jsonObject.put("user_id", random.nextInt(900000) + 100000 + > > ""); > > jsonObject.put("item_id", random.nextInt(900000) + 100000 + > > ""); > > jsonObject.put("category_id", random.nextInt(1000) + ""); > > jsonObject.put("behavior", behaviors[random.nextInt(4)]); > > jsonObject.put("ts", format.format(new Date(date_long))); > > String msg = jsonObject.toString(); > > System.out.println(msg); > > ProducerRecord<String, String&gt; record = new > > ProducerRecord<&gt;(topic, msg); > > producer.send(record); > > // date_long +=500+random.nextGaussian()*1000; > > date_long +=800+random.nextGaussian()*1500; > > try { > > Thread.sleep(60); > > } catch (InterruptedException e) { > > e.printStackTrace(); > > } > > } > > > > } > > > > private static long getDate() { > > Date date = new Date(); > > Calendar c = Calendar.getInstance(); > > c.setTime(date); > > //设置为1号,当前日期既为本月第一天 > > c.set(Calendar.DAY_OF_MONTH, 1); > > //将小时至0 > > c.set(Calendar.HOUR_OF_DAY, 0); > > //将分钟至0 > > c.set(Calendar.MINUTE, 0); > > //将秒至0 > > c.set(Calendar.SECOND,0); > > //将毫秒至0 > > c.set(Calendar.MILLISECOND, 0); > > // 本月第一天的时间戳转换为字符串 > > return c.getTimeInMillis(); > > } > > } > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > > 发件人:&nbsp;"Jark Wu"<[hidden email]&gt;; > > 发送时间:&nbsp;2020年4月18日(星期六) 晚上10:08 > > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > > > 主题:&nbsp;Re: 问题请教-flinksql的kafkasource方面 > > > > > > > > Hi, > > > > 关于第二个问题,Flink 1.10 只在 SQL CLI 支持了 CREATE VIEW 语法, TableEnvironment 上还未支持。 > > 关于第一个问题,抱歉,没有太看懂。"每小时全网购买数/partition的的数量,数据不准确了" 是什么意思呢?要不举个例子看下? > > > > Best, > > Jark > > > > On Sat, 18 Apr 2020 at 18:38, 人生若只如初见 <[hidden email]&gt; wrote: > > > > &gt; 大佬好: > > &gt; &amp;nbsp; &amp;nbsp; > > &gt; > &amp;nbsp;请教下,之前我看了云邪大佬的基于flinkSQL快速构建流式应用的视频,兵按照其步骤进行了复现。过程中我遇到了两个问题。 > > &gt; &amp;nbsp; &amp;nbsp; > > &gt; > > > &amp;nbsp;问题一是当我用先在sql-client中创建kafkasource表,再用程序向kafka发送数据,统计出来的每小时全网购买数是对的;可如果把kafka数据清空,然后先启动程序向kafka发送用户行为数据,发送了一会后再创建kafkasource表并进行业务处理,此时得到的结果是:每小时全网购买数/partition的的数量,数据不准确了。我记得kafkasource读取标记的watermark是partition中最小的那个,为什么会出现这个问题。 > > &gt; &amp;nbsp;&amp;nbsp; > > &gt; &amp;nbsp; &amp;nbsp; > > 问题二是我在sql-client中可以创建view,可我用程序写tableEnv.sqlUpdate("create > > &gt; view as ...")却会报错。报错如下: > > &gt; Exception in thread "main" > org.apache.flink.table.api.TableException: > > &gt; Unsupported query: CREATE VIEW rich_user_behavior AS > > &gt; SELECT U.user_id, U.item_id, U.behavior,&amp;nbsp; > > &gt; &amp;nbsp; CASE C.parent_category_id > > &gt; &amp;nbsp; &amp;nbsp; WHEN 1 THEN '服饰鞋包' > > &gt; &amp;nbsp; &amp;nbsp; WHEN 2 THEN '家装家饰' > > &gt; &amp;nbsp; &amp;nbsp; WHEN 3 THEN '家电' > > &gt; &amp;nbsp; &amp;nbsp; WHEN 4 THEN '美妆' > > &gt; &amp;nbsp; &amp;nbsp; WHEN 5 THEN '母婴' > > &gt; &amp;nbsp; &amp;nbsp; WHEN 6 THEN '3C数码' > > &gt; &amp;nbsp; &amp;nbsp; WHEN 7 THEN '运动户外' > > &gt; &amp;nbsp; &amp;nbsp; WHEN 8 THEN '食品' > > &gt; &amp;nbsp; &amp;nbsp; ELSE '其他' > > &gt; &amp;nbsp; END AS category_name > > &gt; FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF > > &gt; U.proctime AS C > > &gt; ON U.category_id = C.sub_category_id > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at > > &gt; > > > org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67) > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at > > &gt; > > > org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown > > &gt; Source) > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at > > java.util.Optional.orElseThrow(Optional.java:290) > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at > > &gt; > > > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67) > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at > > &gt; > > > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at > > &gt; > > > com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51) > > &gt; > > &gt; > > &gt; > > &gt; > > &gt; > > &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 望解答,十分感谢! > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
应该是不会的。分配不到partition的source会标记为idle状态。
Sun.Zhu <[hidden email]> 于2020年4月20日周一 上午10:28写道: > Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧 > > > > > | | > Sun.Zhu > | > | > 邮箱:[hidden email] > | > > Signature is customized by Netease Mail Master > > 在2020年04月19日 22:43,人生若只如初见 写道: > 嗯嗯,十分感谢 > > > > > ------------------ 原始邮件 ------------------ > 发件人: "Benchao Li"<[hidden email]>; > 发送时间: 2020年4月19日(星期天) 晚上9:25 > 收件人: "user-zh"<[hidden email]>; > > 主题: Re: 问题请教-flinksql的kafkasource方面 > > > > 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。 > > Jark Wu <[hidden email]> 于2020年4月19日周日 下午8:22写道: > > > Hi, > > > > 根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。 > > 根据你的 Java 代码,数据的 event time > 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark > > 能容忍 5s 乱序). > > 但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些 > partition > > 进度快很多的现象, > > 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。 > > > > 完美的解决方案还需要等 FLIP-27 的完成。 > > 当前可以通过增加 watermark delay来增大迟到数据的容忍。 > > > > Best, > > Jark > > > > > > On Sat, 18 Apr 2020 at 22:53, 人生若只如初见 <[hidden email]> wrote: > > > > > 你好 > > > > > > > > > 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多 > > > > > > > > > > > > 附: > > > userbehavior建表语句 > > > CREATE TABLE user_behavior ( > > > &nbsp; &nbsp; user_id BIGINT, > > > &nbsp; &nbsp; item_id BIGINT, > > > &nbsp; &nbsp; category_id BIGINT, > > > &nbsp; &nbsp; behavior STRING, > > > &nbsp; &nbsp; ts TIMESTAMP(3), > > > &nbsp; &nbsp; proctime as PROCTIME(), &nbsp; -- > 通过计算列产生一个处理时间列 > > > &nbsp; &nbsp; WATERMARK FOR ts as ts - INTERVAL '5' > SECOND &nbsp;-- > > > 在ts上定义watermark,ts成为事件时间列 > > > ) WITH ( > > > &nbsp; &nbsp; 'connector.type' = 'kafka', &nbsp;-- > 使用 kafka connector > > > &nbsp; &nbsp; 'connector.version' = 'universal', > &nbsp;-- kafka > > > 版本,universal 支持 0.11 以上的版本 > > > &nbsp; &nbsp; 'connector.topic' = 'user_behavior', > &nbsp;-- kafka topic > > > &nbsp; &nbsp; 'connector.startup-mode' = > 'earliest-offset', &nbsp;-- 从起始 > > > offset 开始读取 > > > &nbsp; &nbsp; 'connector.properties.zookeeper.connect' = > ' > > > 192.168.0.150:2181', &nbsp;-- zookeeper 地址 > > > &nbsp; &nbsp; 'connector.properties.bootstrap.servers' = > ' > > > 192.168.0.150:9092', &nbsp;-- kafka broker 地址 > > > &nbsp; &nbsp; 'format.type' = 'json' &nbsp;-- 数据源格式为 > json > > > ) > > > > > > 每小时购买数建表语句 > > > CREATE TABLE buy_cnt_per_hour (&nbsp; > > > &nbsp; &nbsp; hour_of_day BIGINT, > > > &nbsp; &nbsp; buy_cnt BIGINT > > > ) WITH ( > > > &nbsp; &nbsp; 'connector.type' = 'elasticsearch', -- 使用 > elasticsearch > > > connector > > > &nbsp; &nbsp; 'connector.version' = '6', &nbsp;-- > elasticsearch 版本,6 能支持 > > > es 6+ 以及 7+ 的版本 > > > &nbsp; &nbsp; 'connector.hosts' = ' > <a href="http://192.168.0.150:9200'">http://192.168.0.150:9200', &nbsp;-- > > > elasticsearch 地址 > > > &nbsp; &nbsp; 'connector.index' = 'buy_cnt_per_hour', > &nbsp;-- > > > elasticsearch 索引名,相当于数据库的表名 > > > &nbsp; &nbsp; 'connector.document-type' = > 'user_behavior', -- > > > elasticsearch 的 type,相当于数据库的库名 > > > &nbsp; &nbsp; 'connector.bulk-flush.max-actions' = '1', > &nbsp;-- 每条数据都刷新 > > > &nbsp; &nbsp; 'format.type' = 'json', &nbsp;-- > 输出数据格式 json > > > &nbsp; &nbsp; 'update-mode' = 'append' > > > ) > > > > > > 插入语句 > > > INSERT INTO buy_cnt_per_hour&nbsp; > > > SELECT HOUR(TUMBLE_START(ts,INTERVAL '1' > HOUR)),COUNT(*)&nbsp; > > > FROM user_behavior > > > WHERE behavior = 'buy' > > > GROUP BY TUMBLE(ts,INTERVAL '1' HOUR) > > > > > > kafka数据发送代码 > > > > > > import com.alibaba.fastjson.JSONObject; > > > import org.apache.kafka.clients.producer.KafkaProducer; > > > import org.apache.kafka.clients.producer.ProducerRecord; > > > > > > import java.text.SimpleDateFormat; > > > import java.util.*; > > > > > > > > > public class UserBehaviorProducer { > > > public static final String brokerList = " > 192.168.0.150:9092"; > > > > > > // public static final > String topic="user_behavior"; > > > public static final String topic = > "user_behavior"; > > > > > > public static void main(String args[]) { > > > > > > //配置生产者客户端参数 > > > //将配置序列化 > > > Properties > properties = new Properties(); > > > > properties.put("key.serializer", > > > "org.apache.kafka.common.serialization.StringSerializer"); > > > > properties.put("value.serializer", > > > "org.apache.kafka.common.serialization.StringSerializer"); > > > > properties.put("bootstrap.servers", brokerList); > > > > //创建KafkaProducer 实例 > > > > KafkaProducer<String, String&gt; producer = new > > > KafkaProducer<&gt;(properties); > > > //构建待发送的消息 > > > //{"user_id": > "952483", "item_id":"310884", "category_id": > > > "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} > > > //{"user_id": > "794777", "item_id":"5119439", "category_id": > > > "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} > > > String[] > behaviors = {"pv", "buy", "coll", > > "cart"};//浏览,购买,收藏,加入购物车 > > > JSONObject > jsonObject = new JSONObject(); > > > HashMap<String, > String&gt; info = new HashMap<&gt;(); > > > Random random = > new Random(); > > > SimpleDateFormat > format = new > > > SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); > > > long > date_long=getDate(); > > > while (true) { > > > > > jsonObject.put("user_id", random.nextInt(900000) + 100000 + > > > ""); > > > > > jsonObject.put("item_id", random.nextInt(900000) + 100000 + > > > ""); > > > > > jsonObject.put("category_id", random.nextInt(1000) + ""); > > > > > jsonObject.put("behavior", behaviors[random.nextInt(4)]); > > > > > jsonObject.put("ts", format.format(new Date(date_long))); > > > > > String msg = jsonObject.toString(); > > > > > System.out.println(msg); > > > > > ProducerRecord<String, String&gt; record = new > > > ProducerRecord<&gt;(topic, msg); > > > > > producer.send(record); > > > > // > date_long +=500+random.nextGaussian()*1000; > > > > > date_long +=800+random.nextGaussian()*1500; > > > > > try { > > > > > Thread.sleep(60); > > > > > } catch (InterruptedException e) { > > > > > e.printStackTrace(); > > > > > } > > > } > > > > > > } > > > > > > private static long getDate() { > > > Date date = new > Date(); > > > Calendar c = > Calendar.getInstance(); > > > c.setTime(date); > > > > //设置为1号,当前日期既为本月第一天 > > > > c.set(Calendar.DAY_OF_MONTH, 1); > > > //将小时至0 > > > > c.set(Calendar.HOUR_OF_DAY, 0); > > > //将分钟至0 > > > > c.set(Calendar.MINUTE, 0); > > > //将秒至0 > > > > c.set(Calendar.SECOND,0); > > > //将毫秒至0 > > > > c.set(Calendar.MILLISECOND, 0); > > > // > 本月第一天的时间戳转换为字符串 > > > return > c.getTimeInMillis(); > > > } > > > } > > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > > > 发件人:&nbsp;"Jark Wu"<[hidden email]&gt;; > > > 发送时间:&nbsp;2020年4月18日(星期六) 晚上10:08 > > > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > > > > > 主题:&nbsp;Re: 问题请教-flinksql的kafkasource方面 > > > > > > > > > > > > Hi, > > > > > > 关于第二个问题,Flink 1.10 只在 SQL CLI 支持了 CREATE VIEW 语法, > TableEnvironment 上还未支持。 > > > 关于第一个问题,抱歉,没有太看懂。"每小时全网购买数/partition的的数量,数据不准确了" 是什么意思呢?要不举个例子看下? > > > > > > Best, > > > Jark > > > > > > On Sat, 18 Apr 2020 at 18:38, 人生若只如初见 <[hidden email]&gt; > wrote: > > > > > > &gt; 大佬好: > > > &gt; &amp;nbsp; &amp;nbsp; > > > &gt; > > > &amp;nbsp;请教下,之前我看了云邪大佬的基于flinkSQL快速构建流式应用的视频,兵按照其步骤进行了复现。过程中我遇到了两个问题。 > > > &gt; &amp;nbsp; &amp;nbsp; > > > &gt; > > > > > > &amp;nbsp;问题一是当我用先在sql-client中创建kafkasource表,再用程序向kafka发送数据,统计出来的每小时全网购买数是对的;可如果把kafka数据清空,然后先启动程序向kafka发送用户行为数据,发送了一会后再创建kafkasource表并进行业务处理,此时得到的结果是:每小时全网购买数/partition的的数量,数据不准确了。我记得kafkasource读取标记的watermark是partition中最小的那个,为什么会出现这个问题。 > > > &gt; &amp;nbsp;&amp;nbsp; > > > &gt; &amp;nbsp; &amp;nbsp; > > > 问题二是我在sql-client中可以创建view,可我用程序写tableEnv.sqlUpdate("create > > > &gt; view as ...")却会报错。报错如下: > > > &gt; Exception in thread "main" > > org.apache.flink.table.api.TableException: > > > &gt; Unsupported query: CREATE VIEW rich_user_behavior AS > > > &gt; SELECT U.user_id, U.item_id, U.behavior,&amp;nbsp; > > > &gt; &amp;nbsp; CASE C.parent_category_id > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 1 THEN '服饰鞋包' > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 2 THEN '家装家饰' > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 3 THEN '家电' > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 4 THEN '美妆' > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 5 THEN '母婴' > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 6 THEN '3C数码' > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 7 THEN '运动户外' > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 8 THEN '食品' > > > &gt; &amp;nbsp; &amp;nbsp; ELSE '其他' > > > &gt; &amp;nbsp; END AS category_name > > > &gt; FROM user_behavior AS U LEFT JOIN category_dim FOR > SYSTEM_TIME AS OF > > > &gt; U.proctime AS C > > > &gt; ON U.category_id = C.sub_category_id > > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > at > > > &gt; > > > > > > org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67) > > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > at > > > &gt; > > > > > > org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown > > > &gt; Source) > > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > at > > > java.util.Optional.orElseThrow(Optional.java:290) > > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > at > > > &gt; > > > > > > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67) > > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > at > > > &gt; > > > > > > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > at > > > &gt; > > > > > > com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51) > > > &gt; > > > &gt; > > > &gt; > > > &gt; > > > &gt; > > > &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; > &amp;nbsp; 望解答,十分感谢! > > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint ,源码CheckpointCoordinator#triggerCheckpoint也有说明
| | Sun.Zhu | | 邮箱:[hidden email] | Signature is customized by Netease Mail Master 在2020年04月20日 10:37,Benchao Li 写道: 应该是不会的。分配不到partition的source会标记为idle状态。 Sun.Zhu <[hidden email]> 于2020年4月20日周一 上午10:28写道: > Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧 > > > > > | | > Sun.Zhu > | > | > 邮箱:[hidden email] > | > > Signature is customized by Netease Mail Master > > 在2020年04月19日 22:43,人生若只如初见 写道: > 嗯嗯,十分感谢 > > > > > ------------------ 原始邮件 ------------------ > 发件人: "Benchao Li"<[hidden email]>; > 发送时间: 2020年4月19日(星期天) 晚上9:25 > 收件人: "user-zh"<[hidden email]>; > > 主题: Re: 问题请教-flinksql的kafkasource方面 > > > > 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。 > > Jark Wu <[hidden email]> 于2020年4月19日周日 下午8:22写道: > > > Hi, > > > > 根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。 > > 根据你的 Java 代码,数据的 event time > 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark > > 能容忍 5s 乱序). > > 但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些 > partition > > 进度快很多的现象, > > 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。 > > > > 完美的解决方案还需要等 FLIP-27 的完成。 > > 当前可以通过增加 watermark delay来增大迟到数据的容忍。 > > > > Best, > > Jark > > > > > > On Sat, 18 Apr 2020 at 22:53, 人生若只如初见 <[hidden email]> wrote: > > > > > 你好 > > > > > > > > > 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多 > > > > > > > > > > > > 附: > > > userbehavior建表语句 > > > CREATE TABLE user_behavior ( > > > &nbsp; &nbsp; user_id BIGINT, > > > &nbsp; &nbsp; item_id BIGINT, > > > &nbsp; &nbsp; category_id BIGINT, > > > &nbsp; &nbsp; behavior STRING, > > > &nbsp; &nbsp; ts TIMESTAMP(3), > > > &nbsp; &nbsp; proctime as PROCTIME(), &nbsp; -- > 通过计算列产生一个处理时间列 > > > &nbsp; &nbsp; WATERMARK FOR ts as ts - INTERVAL '5' > SECOND &nbsp;-- > > > 在ts上定义watermark,ts成为事件时间列 > > > ) WITH ( > > > &nbsp; &nbsp; 'connector.type' = 'kafka', &nbsp;-- > 使用 kafka connector > > > &nbsp; &nbsp; 'connector.version' = 'universal', > &nbsp;-- kafka > > > 版本,universal 支持 0.11 以上的版本 > > > &nbsp; &nbsp; 'connector.topic' = 'user_behavior', > &nbsp;-- kafka topic > > > &nbsp; &nbsp; 'connector.startup-mode' = > 'earliest-offset', &nbsp;-- 从起始 > > > offset 开始读取 > > > &nbsp; &nbsp; 'connector.properties.zookeeper.connect' = > ' > > > 192.168.0.150:2181', &nbsp;-- zookeeper 地址 > > > &nbsp; &nbsp; 'connector.properties.bootstrap.servers' = > ' > > > 192.168.0.150:9092', &nbsp;-- kafka broker 地址 > > > &nbsp; &nbsp; 'format.type' = 'json' &nbsp;-- 数据源格式为 > json > > > ) > > > > > > 每小时购买数建表语句 > > > CREATE TABLE buy_cnt_per_hour (&nbsp; > > > &nbsp; &nbsp; hour_of_day BIGINT, > > > &nbsp; &nbsp; buy_cnt BIGINT > > > ) WITH ( > > > &nbsp; &nbsp; 'connector.type' = 'elasticsearch', -- 使用 > elasticsearch > > > connector > > > &nbsp; &nbsp; 'connector.version' = '6', &nbsp;-- > elasticsearch 版本,6 能支持 > > > es 6+ 以及 7+ 的版本 > > > &nbsp; &nbsp; 'connector.hosts' = ' > <a href="http://192.168.0.150:9200'">http://192.168.0.150:9200', &nbsp;-- > > > elasticsearch 地址 > > > &nbsp; &nbsp; 'connector.index' = 'buy_cnt_per_hour', > &nbsp;-- > > > elasticsearch 索引名,相当于数据库的表名 > > > &nbsp; &nbsp; 'connector.document-type' = > 'user_behavior', -- > > > elasticsearch 的 type,相当于数据库的库名 > > > &nbsp; &nbsp; 'connector.bulk-flush.max-actions' = '1', > &nbsp;-- 每条数据都刷新 > > > &nbsp; &nbsp; 'format.type' = 'json', &nbsp;-- > 输出数据格式 json > > > &nbsp; &nbsp; 'update-mode' = 'append' > > > ) > > > > > > 插入语句 > > > INSERT INTO buy_cnt_per_hour&nbsp; > > > SELECT HOUR(TUMBLE_START(ts,INTERVAL '1' > HOUR)),COUNT(*)&nbsp; > > > FROM user_behavior > > > WHERE behavior = 'buy' > > > GROUP BY TUMBLE(ts,INTERVAL '1' HOUR) > > > > > > kafka数据发送代码 > > > > > > import com.alibaba.fastjson.JSONObject; > > > import org.apache.kafka.clients.producer.KafkaProducer; > > > import org.apache.kafka.clients.producer.ProducerRecord; > > > > > > import java.text.SimpleDateFormat; > > > import java.util.*; > > > > > > > > > public class UserBehaviorProducer { > > > public static final String brokerList = " > 192.168.0.150:9092"; > > > > > > // public static final > String topic="user_behavior"; > > > public static final String topic = > "user_behavior"; > > > > > > public static void main(String args[]) { > > > > > > //配置生产者客户端参数 > > > //将配置序列化 > > > Properties > properties = new Properties(); > > > > properties.put("key.serializer", > > > "org.apache.kafka.common.serialization.StringSerializer"); > > > > properties.put("value.serializer", > > > "org.apache.kafka.common.serialization.StringSerializer"); > > > > properties.put("bootstrap.servers", brokerList); > > > > //创建KafkaProducer 实例 > > > > KafkaProducer<String, String&gt; producer = new > > > KafkaProducer<&gt;(properties); > > > //构建待发送的消息 > > > //{"user_id": > "952483", "item_id":"310884", "category_id": > > > "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} > > > //{"user_id": > "794777", "item_id":"5119439", "category_id": > > > "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} > > > String[] > behaviors = {"pv", "buy", "coll", > > "cart"};//浏览,购买,收藏,加入购物车 > > > JSONObject > jsonObject = new JSONObject(); > > > HashMap<String, > String&gt; info = new HashMap<&gt;(); > > > Random random = > new Random(); > > > SimpleDateFormat > format = new > > > SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); > > > long > date_long=getDate(); > > > while (true) { > > > > > jsonObject.put("user_id", random.nextInt(900000) + 100000 + > > > ""); > > > > > jsonObject.put("item_id", random.nextInt(900000) + 100000 + > > > ""); > > > > > jsonObject.put("category_id", random.nextInt(1000) + ""); > > > > > jsonObject.put("behavior", behaviors[random.nextInt(4)]); > > > > > jsonObject.put("ts", format.format(new Date(date_long))); > > > > > String msg = jsonObject.toString(); > > > > > System.out.println(msg); > > > > > ProducerRecord<String, String&gt; record = new > > > ProducerRecord<&gt;(topic, msg); > > > > > producer.send(record); > > > > // > date_long +=500+random.nextGaussian()*1000; > > > > > date_long +=800+random.nextGaussian()*1500; > > > > > try { > > > > > Thread.sleep(60); > > > > > } catch (InterruptedException e) { > > > > > e.printStackTrace(); > > > > > } > > > } > > > > > > } > > > > > > private static long getDate() { > > > Date date = new > Date(); > > > Calendar c = > Calendar.getInstance(); > > > c.setTime(date); > > > > //设置为1号,当前日期既为本月第一天 > > > > c.set(Calendar.DAY_OF_MONTH, 1); > > > //将小时至0 > > > > c.set(Calendar.HOUR_OF_DAY, 0); > > > //将分钟至0 > > > > c.set(Calendar.MINUTE, 0); > > > //将秒至0 > > > > c.set(Calendar.SECOND,0); > > > //将毫秒至0 > > > > c.set(Calendar.MILLISECOND, 0); > > > // > 本月第一天的时间戳转换为字符串 > > > return > c.getTimeInMillis(); > > > } > > > } > > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > > > 发件人:&nbsp;"Jark Wu"<[hidden email]&gt;; > > > 发送时间:&nbsp;2020年4月18日(星期六) 晚上10:08 > > > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > > > > > 主题:&nbsp;Re: 问题请教-flinksql的kafkasource方面 > > > > > > > > > > > > Hi, > > > > > > 关于第二个问题,Flink 1.10 只在 SQL CLI 支持了 CREATE VIEW 语法, > TableEnvironment 上还未支持。 > > > 关于第一个问题,抱歉,没有太看懂。"每小时全网购买数/partition的的数量,数据不准确了" 是什么意思呢?要不举个例子看下? > > > > > > Best, > > > Jark > > > > > > On Sat, 18 Apr 2020 at 18:38, 人生若只如初见 <[hidden email]&gt; > wrote: > > > > > > &gt; 大佬好: > > > &gt; &amp;nbsp; &amp;nbsp; > > > &gt; > > > &amp;nbsp;请教下,之前我看了云邪大佬的基于flinkSQL快速构建流式应用的视频,兵按照其步骤进行了复现。过程中我遇到了两个问题。 > > > &gt; &amp;nbsp; &amp;nbsp; > > > &gt; > > > > > > &amp;nbsp;问题一是当我用先在sql-client中创建kafkasource表,再用程序向kafka发送数据,统计出来的每小时全网购买数是对的;可如果把kafka数据清空,然后先启动程序向kafka发送用户行为数据,发送了一会后再创建kafkasource表并进行业务处理,此时得到的结果是:每小时全网购买数/partition的的数量,数据不准确了。我记得kafkasource读取标记的watermark是partition中最小的那个,为什么会出现这个问题。 > > > &gt; &amp;nbsp;&amp;nbsp; > > > &gt; &amp;nbsp; &amp;nbsp; > > > 问题二是我在sql-client中可以创建view,可我用程序写tableEnv.sqlUpdate("create > > > &gt; view as ...")却会报错。报错如下: > > > &gt; Exception in thread "main" > > org.apache.flink.table.api.TableException: > > > &gt; Unsupported query: CREATE VIEW rich_user_behavior AS > > > &gt; SELECT U.user_id, U.item_id, U.behavior,&amp;nbsp; > > > &gt; &amp;nbsp; CASE C.parent_category_id > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 1 THEN '服饰鞋包' > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 2 THEN '家装家饰' > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 3 THEN '家电' > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 4 THEN '美妆' > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 5 THEN '母婴' > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 6 THEN '3C数码' > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 7 THEN '运动户外' > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 8 THEN '食品' > > > &gt; &amp;nbsp; &amp;nbsp; ELSE '其他' > > > &gt; &amp;nbsp; END AS category_name > > > &gt; FROM user_behavior AS U LEFT JOIN category_dim FOR > SYSTEM_TIME AS OF > > > &gt; U.proctime AS C > > > &gt; ON U.category_id = C.sub_category_id > > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > at > > > &gt; > > > > > > org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67) > > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > at > > > &gt; > > > > > > org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown > > > &gt; Source) > > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > at > > > java.util.Optional.orElseThrow(Optional.java:290) > > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > at > > > &gt; > > > > > > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67) > > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > at > > > &gt; > > > > > > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > at > > > &gt; > > > > > > com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51) > > > &gt; > > > &gt; > > > &gt; > > > &gt; > > > &gt; > > > &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; > &amp;nbsp; 望解答,十分感谢! > > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
你们用的是哪个版本?我们用的是1.9,还没有遇到过这个问题呢。
Sun.Zhu <[hidden email]> 于2020年4月20日周一 下午5:43写道: > 我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint > ,源码CheckpointCoordinator#triggerCheckpoint也有说明 > > > > > > | | > Sun.Zhu > | > | > 邮箱:[hidden email] > | > > Signature is customized by Netease Mail Master > > 在2020年04月20日 10:37,Benchao Li 写道: > 应该是不会的。分配不到partition的source会标记为idle状态。 > > Sun.Zhu <[hidden email]> 于2020年4月20日周一 上午10:28写道: > > > Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧 > > > > > > > > > > | | > > Sun.Zhu > > | > > | > > 邮箱:[hidden email] > > | > > > > Signature is customized by Netease Mail Master > > > > 在2020年04月19日 22:43,人生若只如初见 写道: > > 嗯嗯,十分感谢 > > > > > > > > > > ------------------ 原始邮件 ------------------ > > 发件人: "Benchao Li"<[hidden email]>; > > 发送时间: 2020年4月19日(星期天) 晚上9:25 > > 收件人: "user-zh"<[hidden email]>; > > > > 主题: Re: 问题请教-flinksql的kafkasource方面 > > > > > > > > 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。 > > > > Jark Wu <[hidden email]> 于2020年4月19日周日 下午8:22写道: > > > > > Hi, > > > > > > 根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。 > > > 根据你的 Java 代码,数据的 event time > > 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark > > > 能容忍 5s 乱序). > > > 但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些 > > partition > > > 进度快很多的现象, > > > 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。 > > > > > > 完美的解决方案还需要等 FLIP-27 的完成。 > > > 当前可以通过增加 watermark delay来增大迟到数据的容忍。 > > > > > > Best, > > > Jark > > > > > > > > > On Sat, 18 Apr 2020 at 22:53, 人生若只如初见 <[hidden email]> wrote: > > > > > > > 你好 > > > > > > > > > > > > > > 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多 > > > > > > > > > > > > > > > > 附: > > > > userbehavior建表语句 > > > > CREATE TABLE user_behavior ( > > > > &nbsp; &nbsp; user_id BIGINT, > > > > &nbsp; &nbsp; item_id BIGINT, > > > > &nbsp; &nbsp; category_id BIGINT, > > > > &nbsp; &nbsp; behavior STRING, > > > > &nbsp; &nbsp; ts TIMESTAMP(3), > > > > &nbsp; &nbsp; proctime as PROCTIME(), &nbsp; -- > > 通过计算列产生一个处理时间列 > > > > &nbsp; &nbsp; WATERMARK FOR ts as ts - INTERVAL '5' > > SECOND &nbsp;-- > > > > 在ts上定义watermark,ts成为事件时间列 > > > > ) WITH ( > > > > &nbsp; &nbsp; 'connector.type' = 'kafka', &nbsp;-- > > 使用 kafka connector > > > > &nbsp; &nbsp; 'connector.version' = 'universal', > > &nbsp;-- kafka > > > > 版本,universal 支持 0.11 以上的版本 > > > > &nbsp; &nbsp; 'connector.topic' = 'user_behavior', > > &nbsp;-- kafka topic > > > > &nbsp; &nbsp; 'connector.startup-mode' = > > 'earliest-offset', &nbsp;-- 从起始 > > > > offset 开始读取 > > > > &nbsp; &nbsp; 'connector.properties.zookeeper.connect' > = > > ' > > > > 192.168.0.150:2181', &nbsp;-- zookeeper 地址 > > > > &nbsp; &nbsp; 'connector.properties.bootstrap.servers' > = > > ' > > > > 192.168.0.150:9092', &nbsp;-- kafka broker 地址 > > > > &nbsp; &nbsp; 'format.type' = 'json' &nbsp;-- > 数据源格式为 > > json > > > > ) > > > > > > > > 每小时购买数建表语句 > > > > CREATE TABLE buy_cnt_per_hour (&nbsp; > > > > &nbsp; &nbsp; hour_of_day BIGINT, > > > > &nbsp; &nbsp; buy_cnt BIGINT > > > > ) WITH ( > > > > &nbsp; &nbsp; 'connector.type' = 'elasticsearch', -- 使用 > > elasticsearch > > > > connector > > > > &nbsp; &nbsp; 'connector.version' = '6', &nbsp;-- > > elasticsearch 版本,6 能支持 > > > > es 6+ 以及 7+ 的版本 > > > > &nbsp; &nbsp; 'connector.hosts' = ' > > <a href="http://192.168.0.150:9200'">http://192.168.0.150:9200', &nbsp;-- > > > > elasticsearch 地址 > > > > &nbsp; &nbsp; 'connector.index' = 'buy_cnt_per_hour', > > &nbsp;-- > > > > elasticsearch 索引名,相当于数据库的表名 > > > > &nbsp; &nbsp; 'connector.document-type' = > > 'user_behavior', -- > > > > elasticsearch 的 type,相当于数据库的库名 > > > > &nbsp; &nbsp; 'connector.bulk-flush.max-actions' = '1', > > &nbsp;-- 每条数据都刷新 > > > > &nbsp; &nbsp; 'format.type' = 'json', &nbsp;-- > > 输出数据格式 json > > > > &nbsp; &nbsp; 'update-mode' = 'append' > > > > ) > > > > > > > > 插入语句 > > > > INSERT INTO buy_cnt_per_hour&nbsp; > > > > SELECT HOUR(TUMBLE_START(ts,INTERVAL '1' > > HOUR)),COUNT(*)&nbsp; > > > > FROM user_behavior > > > > WHERE behavior = 'buy' > > > > GROUP BY TUMBLE(ts,INTERVAL '1' HOUR) > > > > > > > > kafka数据发送代码 > > > > > > > > import com.alibaba.fastjson.JSONObject; > > > > import org.apache.kafka.clients.producer.KafkaProducer; > > > > import org.apache.kafka.clients.producer.ProducerRecord; > > > > > > > > import java.text.SimpleDateFormat; > > > > import java.util.*; > > > > > > > > > > > > public class UserBehaviorProducer { > > > > public static final String brokerList > = " > > 192.168.0.150:9092"; > > > > > > > > // public static > final > > String topic="user_behavior"; > > > > public static final String topic = > > "user_behavior"; > > > > > > > > public static void main(String args[]) > { > > > > > > > > //配置生产者客户端参数 > > > > //将配置序列化 > > > > Properties > > properties = new Properties(); > > > > > > properties.put("key.serializer", > > > > "org.apache.kafka.common.serialization.StringSerializer"); > > > > > > properties.put("value.serializer", > > > > "org.apache.kafka.common.serialization.StringSerializer"); > > > > > > properties.put("bootstrap.servers", brokerList); > > > > > > //创建KafkaProducer 实例 > > > > > > KafkaProducer<String, String&gt; producer = new > > > > KafkaProducer<&gt;(properties); > > > > //构建待发送的消息 > > > > //{"user_id": > > "952483", "item_id":"310884", "category_id": > > > > "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} > > > > //{"user_id": > > "794777", "item_id":"5119439", "category_id": > > > > "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} > > > > String[] > > behaviors = {"pv", "buy", "coll", > > > "cart"};//浏览,购买,收藏,加入购物车 > > > > JSONObject > > jsonObject = new JSONObject(); > > > > HashMap<String, > > String&gt; info = new HashMap<&gt;(); > > > > Random random = > > new Random(); > > > > > SimpleDateFormat > > format = new > > > > SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); > > > > long > > date_long=getDate(); > > > > while (true) { > > > > > > > > > jsonObject.put("user_id", random.nextInt(900000) + 100000 + > > > > ""); > > > > > > > > > jsonObject.put("item_id", random.nextInt(900000) + 100000 + > > > > ""); > > > > > > > > > jsonObject.put("category_id", random.nextInt(1000) + ""); > > > > > > > > > jsonObject.put("behavior", behaviors[random.nextInt(4)]); > > > > > > > > > jsonObject.put("ts", format.format(new Date(date_long))); > > > > > > > > > String msg = jsonObject.toString(); > > > > > > > > > System.out.println(msg); > > > > > > > > > ProducerRecord<String, String&gt; record = new > > > > ProducerRecord<&gt;(topic, msg); > > > > > > > > > producer.send(record); > > > > > > // > > date_long +=500+random.nextGaussian()*1000; > > > > > > > > > date_long +=800+random.nextGaussian()*1500; > > > > > > > > > try { > > > > > > > > > Thread.sleep(60); > > > > > > > > > } catch (InterruptedException e) { > > > > > > > > > e.printStackTrace(); > > > > > > > > > } > > > > } > > > > > > > > } > > > > > > > > private static long getDate() { > > > > Date date = new > > Date(); > > > > Calendar c = > > Calendar.getInstance(); > > > > > c.setTime(date); > > > > > > //设置为1号,当前日期既为本月第一天 > > > > > > c.set(Calendar.DAY_OF_MONTH, 1); > > > > //将小时至0 > > > > > > c.set(Calendar.HOUR_OF_DAY, 0); > > > > //将分钟至0 > > > > > > c.set(Calendar.MINUTE, 0); > > > > //将秒至0 > > > > > > c.set(Calendar.SECOND,0); > > > > //将毫秒至0 > > > > > > c.set(Calendar.MILLISECOND, 0); > > > > // > > 本月第一天的时间戳转换为字符串 > > > > return > > c.getTimeInMillis(); > > > > } > > > > } > > > > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > > > > 发件人:&nbsp;"Jark Wu"<[hidden email]&gt;; > > > > 发送时间:&nbsp;2020年4月18日(星期六) 晚上10:08 > > > > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > > > > > > > 主题:&nbsp;Re: 问题请教-flinksql的kafkasource方面 > > > > > > > > > > > > > > > > Hi, > > > > > > > > 关于第二个问题,Flink 1.10 只在 SQL CLI 支持了 CREATE VIEW 语法, > > TableEnvironment 上还未支持。 > > > > 关于第一个问题,抱歉,没有太看懂。"每小时全网购买数/partition的的数量,数据不准确了" > 是什么意思呢?要不举个例子看下? > > > > > > > > Best, > > > > Jark > > > > > > > > On Sat, 18 Apr 2020 at 18:38, 人生若只如初见 <[hidden email]&gt; > > wrote: > > > > > > > > &gt; 大佬好: > > > > &gt; &amp;nbsp; &amp;nbsp; > > > > &gt; > > > > > > &amp;nbsp;请教下,之前我看了云邪大佬的基于flinkSQL快速构建流式应用的视频,兵按照其步骤进行了复现。过程中我遇到了两个问题。 > > > > &gt; &amp;nbsp; &amp;nbsp; > > > > &gt; > > > > > > > > > > &amp;nbsp;问题一是当我用先在sql-client中创建kafkasource表,再用程序向kafka发送数据,统计出来的每小时全网购买数是对的;可如果把kafka数据清空,然后先启动程序向kafka发送用户行为数据,发送了一会后再创建kafkasource表并进行业务处理,此时得到的结果是:每小时全网购买数/partition的的数量,数据不准确了。我记得kafkasource读取标记的watermark是partition中最小的那个,为什么会出现这个问题。 > > > > &gt; &amp;nbsp;&amp;nbsp; > > > > &gt; &amp;nbsp; &amp;nbsp; > > > > 问题二是我在sql-client中可以创建view,可我用程序写tableEnv.sqlUpdate("create > > > > &gt; view as ...")却会报错。报错如下: > > > > &gt; Exception in thread "main" > > > org.apache.flink.table.api.TableException: > > > > &gt; Unsupported query: CREATE VIEW rich_user_behavior AS > > > > &gt; SELECT U.user_id, U.item_id, U.behavior,&amp;nbsp; > > > > &gt; &amp;nbsp; CASE C.parent_category_id > > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 1 THEN '服饰鞋包' > > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 2 THEN '家装家饰' > > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 3 THEN '家电' > > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 4 THEN '美妆' > > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 5 THEN '母婴' > > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 6 THEN '3C数码' > > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 7 THEN '运动户外' > > > > &gt; &amp;nbsp; &amp;nbsp; WHEN 8 THEN '食品' > > > > &gt; &amp;nbsp; &amp;nbsp; ELSE '其他' > > > > &gt; &amp;nbsp; END AS category_name > > > > &gt; FROM user_behavior AS U LEFT JOIN category_dim FOR > > SYSTEM_TIME AS OF > > > > &gt; U.proctime AS C > > > > &gt; ON U.category_id = C.sub_category_id > > > > > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > at > > > > &gt; > > > > > > > > > > org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67) > > > > > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > at > > > > &gt; > > > > > > > > > > org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown > > > > &gt; Source) > > > > > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > at > > > > java.util.Optional.orElseThrow(Optional.java:290) > > > > > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > at > > > > &gt; > > > > > > > > > > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67) > > > > > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > at > > > > &gt; > > > > > > > > > > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > > > > > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > at > > > > &gt; > > > > > > > > > > com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51) > > > > &gt; > > > > &gt; > > > > &gt; > > > > &gt; > > > > &gt; > > > > &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; > > &amp;nbsp; 望解答,十分感谢! > > > > > > > > > -- > > > > Benchao Li > > School of Electronics Engineering and Computer Science, Peking University > > Tel:+86-15650713730 > > Email: [hidden email]; [hidden email] > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
我们是1.8版本,但是这段源码应该是没变把
// check if all tasks that we need to trigger are running. // if not, abort the checkpoint Execution[] executions = new Execution[tasksToTrigger.length]; for (int i = 0; i < tasksToTrigger.length; i++) { Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt(); if (ee == null) { LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.", tasksToTrigger[i].getTaskNameWithSubtaskIndex(), job); throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING); } else if (ee.getState() == ExecutionState.RUNNING) { executions[i] = ee; } else { LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.", tasksToTrigger[i].getTaskNameWithSubtaskIndex(), job, ExecutionState.RUNNING, ee.getState()); throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING); } } 还是我理解的不对 > 2020年4月20日 下午6:21,Benchao Li <[hidden email]> 写道: > > 你们用的是哪个版本?我们用的是1.9,还没有遇到过这个问题呢。 > > Sun.Zhu <[hidden email]> 于2020年4月20日周一 下午5:43写道: > >> 我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint >> ,源码CheckpointCoordinator#triggerCheckpoint也有说明 >> >> >> >> >> >> | | >> Sun.Zhu >> | >> | >> 邮箱:[hidden email] >> | >> >> Signature is customized by Netease Mail Master >> >> 在2020年04月20日 10:37,Benchao Li 写道: >> 应该是不会的。分配不到partition的source会标记为idle状态。 >> >> Sun.Zhu <[hidden email]> 于2020年4月20日周一 上午10:28写道: >> >>> Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧 >>> >>> >>> >>> >>> | | >>> Sun.Zhu >>> | >>> | >>> 邮箱:[hidden email] >>> | >>> >>> Signature is customized by Netease Mail Master >>> >>> 在2020年04月19日 22:43,人生若只如初见 写道: >>> 嗯嗯,十分感谢 >>> >>> >>> >>> >>> ------------------ 原始邮件 ------------------ >>> 发件人: "Benchao Li"<[hidden email]>; >>> 发送时间: 2020年4月19日(星期天) 晚上9:25 >>> 收件人: "user-zh"<[hidden email]>; >>> >>> 主题: Re: 问题请教-flinksql的kafkasource方面 >>> >>> >>> >>> 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。 >>> >>> Jark Wu <[hidden email]> 于2020年4月19日周日 下午8:22写道: >>> >>> > Hi, >>> > >>> > 根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。 >>> > 根据你的 Java 代码,数据的 event time >>> 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark >>> > 能容忍 5s 乱序). >>> > 但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些 >>> partition >>> > 进度快很多的现象, >>> > 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。 >>> > >>> > 完美的解决方案还需要等 FLIP-27 的完成。 >>> > 当前可以通过增加 watermark delay来增大迟到数据的容忍。 >>> > >>> > Best, >>> > Jark >>> > >>> > >>> > On Sat, 18 Apr 2020 at 22:53, 人生若只如初见 <[hidden email]> wrote: >>> > >>> > > 你好 >>> > > >>> > > >>> > >>> >> 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多 >>> > > >>> > > >>> > > >>> > > 附: >>> > > userbehavior建表语句 >>> > > CREATE TABLE user_behavior ( >>> > > &nbsp; &nbsp; user_id BIGINT, >>> > > &nbsp; &nbsp; item_id BIGINT, >>> > > &nbsp; &nbsp; category_id BIGINT, >>> > > &nbsp; &nbsp; behavior STRING, >>> > > &nbsp; &nbsp; ts TIMESTAMP(3), >>> > > &nbsp; &nbsp; proctime as PROCTIME(), &nbsp; -- >>> 通过计算列产生一个处理时间列 >>> > > &nbsp; &nbsp; WATERMARK FOR ts as ts - INTERVAL '5' >>> SECOND &nbsp;-- >>> > > 在ts上定义watermark,ts成为事件时间列 >>> > > ) WITH ( >>> > > &nbsp; &nbsp; 'connector.type' = 'kafka', &nbsp;-- >>> 使用 kafka connector >>> > > &nbsp; &nbsp; 'connector.version' = 'universal', >>> &nbsp;-- kafka >>> > > 版本,universal 支持 0.11 以上的版本 >>> > > &nbsp; &nbsp; 'connector.topic' = 'user_behavior', >>> &nbsp;-- kafka topic >>> > > &nbsp; &nbsp; 'connector.startup-mode' = >>> 'earliest-offset', &nbsp;-- 从起始 >>> > > offset 开始读取 >>> > > &nbsp; &nbsp; 'connector.properties.zookeeper.connect' >> = >>> ' >>> > > 192.168.0.150:2181', &nbsp;-- zookeeper 地址 >>> > > &nbsp; &nbsp; 'connector.properties.bootstrap.servers' >> = >>> ' >>> > > 192.168.0.150:9092', &nbsp;-- kafka broker 地址 >>> > > &nbsp; &nbsp; 'format.type' = 'json' &nbsp;-- >> 数据源格式为 >>> json >>> > > ) >>> > > >>> > > 每小时购买数建表语句 >>> > > CREATE TABLE buy_cnt_per_hour (&nbsp; >>> > > &nbsp; &nbsp; hour_of_day BIGINT, >>> > > &nbsp; &nbsp; buy_cnt BIGINT >>> > > ) WITH ( >>> > > &nbsp; &nbsp; 'connector.type' = 'elasticsearch', -- 使用 >>> elasticsearch >>> > > connector >>> > > &nbsp; &nbsp; 'connector.version' = '6', &nbsp;-- >>> elasticsearch 版本,6 能支持 >>> > > es 6+ 以及 7+ 的版本 >>> > > &nbsp; &nbsp; 'connector.hosts' = ' >>> <a href="http://192.168.0.150:9200'">http://192.168.0.150:9200', &nbsp;-- >>> > > elasticsearch 地址 >>> > > &nbsp; &nbsp; 'connector.index' = 'buy_cnt_per_hour', >>> &nbsp;-- >>> > > elasticsearch 索引名,相当于数据库的表名 >>> > > &nbsp; &nbsp; 'connector.document-type' = >>> 'user_behavior', -- >>> > > elasticsearch 的 type,相当于数据库的库名 >>> > > &nbsp; &nbsp; 'connector.bulk-flush.max-actions' = '1', >>> &nbsp;-- 每条数据都刷新 >>> > > &nbsp; &nbsp; 'format.type' = 'json', &nbsp;-- >>> 输出数据格式 json >>> > > &nbsp; &nbsp; 'update-mode' = 'append' >>> > > ) >>> > > >>> > > 插入语句 >>> > > INSERT INTO buy_cnt_per_hour&nbsp; >>> > > SELECT HOUR(TUMBLE_START(ts,INTERVAL '1' >>> HOUR)),COUNT(*)&nbsp; >>> > > FROM user_behavior >>> > > WHERE behavior = 'buy' >>> > > GROUP BY TUMBLE(ts,INTERVAL '1' HOUR) >>> > > >>> > > kafka数据发送代码 >>> > > >>> > > import com.alibaba.fastjson.JSONObject; >>> > > import org.apache.kafka.clients.producer.KafkaProducer; >>> > > import org.apache.kafka.clients.producer.ProducerRecord; >>> > > >>> > > import java.text.SimpleDateFormat; >>> > > import java.util.*; >>> > > >>> > > >>> > > public class UserBehaviorProducer { >>> > > public static final String brokerList >> = " >>> 192.168.0.150:9092"; >>> > > >>> > > // public static >> final >>> String topic="user_behavior"; >>> > > public static final String topic = >>> "user_behavior"; >>> > > >>> > > public static void main(String args[]) >> { >>> > > >>> > > //配置生产者客户端参数 >>> > > //将配置序列化 >>> > > Properties >>> properties = new Properties(); >>> > > >>> properties.put("key.serializer", >>> > > "org.apache.kafka.common.serialization.StringSerializer"); >>> > > >>> properties.put("value.serializer", >>> > > "org.apache.kafka.common.serialization.StringSerializer"); >>> > > >>> properties.put("bootstrap.servers", brokerList); >>> > > >>> //创建KafkaProducer 实例 >>> > > >>> KafkaProducer<String, String&gt; producer = new >>> > > KafkaProducer<&gt;(properties); >>> > > //构建待发送的消息 >>> > > //{"user_id": >>> "952483", "item_id":"310884", "category_id": >>> > > "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} >>> > > //{"user_id": >>> "794777", "item_id":"5119439", "category_id": >>> > > "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} >>> > > String[] >>> behaviors = {"pv", "buy", "coll", >>> > "cart"};//浏览,购买,收藏,加入购物车 >>> > > JSONObject >>> jsonObject = new JSONObject(); >>> > > HashMap<String, >>> String&gt; info = new HashMap<&gt;(); >>> > > Random random = >>> new Random(); >>> > > >> SimpleDateFormat >>> format = new >>> > > SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); >>> > > long >>> date_long=getDate(); >>> > > while (true) { >>> > >>> >> > >>> jsonObject.put("user_id", random.nextInt(900000) + 100000 + >>> > > ""); >>> > >>> >> > >>> jsonObject.put("item_id", random.nextInt(900000) + 100000 + >>> > > ""); >>> > >>> >> > >>> jsonObject.put("category_id", random.nextInt(1000) + ""); >>> > >>> >> > >>> jsonObject.put("behavior", behaviors[random.nextInt(4)]); >>> > >>> >> > >>> jsonObject.put("ts", format.format(new Date(date_long))); >>> > >>> >> > >>> String msg = jsonObject.toString(); >>> > >>> >> > >>> System.out.println(msg); >>> > >>> >> > >>> ProducerRecord<String, String&gt; record = new >>> > > ProducerRecord<&gt;(topic, msg); >>> > >>> >> > >>> producer.send(record); >>> > > >>> // >>> date_long +=500+random.nextGaussian()*1000; >>> > >>> >> > >>> date_long +=800+random.nextGaussian()*1500; >>> > >>> >> > >>> try { >>> > >>> >> > >>> Thread.sleep(60); >>> > >>> >> > >>> } catch (InterruptedException e) { >>> > >>> >> > >>> e.printStackTrace(); >>> > >>> >> > >>> } >>> > > } >>> > > >>> > > } >>> > > >>> > > private static long getDate() { >>> > > Date date = new >>> Date(); >>> > > Calendar c = >>> Calendar.getInstance(); >>> > > >> c.setTime(date); >>> > > >>> //设置为1号,当前日期既为本月第一天 >>> > > >>> c.set(Calendar.DAY_OF_MONTH, 1); >>> > > //将小时至0 >>> > > >>> c.set(Calendar.HOUR_OF_DAY, 0); >>> > > //将分钟至0 >>> > > >>> c.set(Calendar.MINUTE, 0); >>> > > //将秒至0 >>> > > >>> c.set(Calendar.SECOND,0); >>> > > //将毫秒至0 >>> > > >>> c.set(Calendar.MILLISECOND, 0); >>> > > // >>> 本月第一天的时间戳转换为字符串 >>> > > return >>> c.getTimeInMillis(); >>> > > } >>> > > } >>> > > >>> > > ------------------&nbsp;原始邮件&nbsp;------------------ >>> > > 发件人:&nbsp;"Jark Wu"<[hidden email]&gt;; >>> > > 发送时间:&nbsp;2020年4月18日(星期六) 晚上10:08 >>> > > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; >>> > > >>> > > 主题:&nbsp;Re: 问题请教-flinksql的kafkasource方面 >>> > > >>> > > >>> > > >>> > > Hi, >>> > > >>> > > 关于第二个问题,Flink 1.10 只在 SQL CLI 支持了 CREATE VIEW 语法, >>> TableEnvironment 上还未支持。 >>> > > 关于第一个问题,抱歉,没有太看懂。"每小时全网购买数/partition的的数量,数据不准确了" >> 是什么意思呢?要不举个例子看下? >>> > > >>> > > Best, >>> > > Jark >>> > > >>> > > On Sat, 18 Apr 2020 at 18:38, 人生若只如初见 <[hidden email]&gt; >>> wrote: >>> > > >>> > > &gt; 大佬好: >>> > > &gt; &amp;nbsp; &amp;nbsp; >>> > > &gt; >>> > >>> >> &amp;nbsp;请教下,之前我看了云邪大佬的基于flinkSQL快速构建流式应用的视频,兵按照其步骤进行了复现。过程中我遇到了两个问题。 >>> > > &gt; &amp;nbsp; &amp;nbsp; >>> > > &gt; >>> > > >>> > >>> >> &amp;nbsp;问题一是当我用先在sql-client中创建kafkasource表,再用程序向kafka发送数据,统计出来的每小时全网购买数是对的;可如果把kafka数据清空,然后先启动程序向kafka发送用户行为数据,发送了一会后再创建kafkasource表并进行业务处理,此时得到的结果是:每小时全网购买数/partition的的数量,数据不准确了。我记得kafkasource读取标记的watermark是partition中最小的那个,为什么会出现这个问题。 >>> > > &gt; &amp;nbsp;&amp;nbsp; >>> > > &gt; &amp;nbsp; &amp;nbsp; >>> > > 问题二是我在sql-client中可以创建view,可我用程序写tableEnv.sqlUpdate("create >>> > > &gt; view as ...")却会报错。报错如下: >>> > > &gt; Exception in thread "main" >>> > org.apache.flink.table.api.TableException: >>> > > &gt; Unsupported query: CREATE VIEW rich_user_behavior AS >>> > > &gt; SELECT U.user_id, U.item_id, U.behavior,&amp;nbsp; >>> > > &gt; &amp;nbsp; CASE C.parent_category_id >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 1 THEN '服饰鞋包' >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 2 THEN '家装家饰' >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 3 THEN '家电' >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 4 THEN '美妆' >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 5 THEN '母婴' >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 6 THEN '3C数码' >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 7 THEN '运动户外' >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 8 THEN '食品' >>> > > &gt; &amp;nbsp; &amp;nbsp; ELSE '其他' >>> > > &gt; &amp;nbsp; END AS category_name >>> > > &gt; FROM user_behavior AS U LEFT JOIN category_dim FOR >>> SYSTEM_TIME AS OF >>> > > &gt; U.proctime AS C >>> > > &gt; ON U.category_id = C.sub_category_id >>> > > >>> >> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; >>> at >>> > > &gt; >>> > > >>> > >>> >> org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67) >>> > > >>> >> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; >>> at >>> > > &gt; >>> > > >>> > >>> >> org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown >>> > > &gt; Source) >>> > > >>> >> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; >>> at >>> > > java.util.Optional.orElseThrow(Optional.java:290) >>> > > >>> >> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; >>> at >>> > > &gt; >>> > > >>> > >>> >> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67) >>> > > >>> >> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; >>> at >>> > > &gt; >>> > > >>> > >>> >> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) >>> > > >>> >> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; >>> at >>> > > &gt; >>> > > >>> > >>> >> com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51) >>> > > &gt; >>> > > &gt; >>> > > &gt; >>> > > &gt; >>> > > &gt; >>> > > &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; >>> &amp;nbsp; 望解答,十分感谢! >>> > >>> >>> >>> -- >>> >>> Benchao Li >>> School of Electronics Engineering and Computer Science, Peking University >>> Tel:+86-15650713730 >>> Email: [hidden email]; [hidden email] >> >> >> >> -- >> >> Benchao Li >> School of Electronics Engineering and Computer Science, Peking University >> Tel:+86-15650713730 >> Email: [hidden email]; [hidden email] >> > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] |
我对checkpoint这块不是很了解。不过subtask标记为idle跟finish还是有区别的吧。
祝尚 <[hidden email]> 于2020年4月20日周一 下午10:29写道: > 我们是1.8版本,但是这段源码应该是没变把 > // check if all tasks that we need to trigger are running. > // if not, abort the checkpoint > Execution[] executions = new Execution[tasksToTrigger.length]; > for (int i = 0; i < tasksToTrigger.length; i++) { > Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt(); > if (ee == null) { > LOG.info("Checkpoint triggering task {} of job {} is not being > executed at the moment. Aborting checkpoint.", > tasksToTrigger[i].getTaskNameWithSubtaskIndex(), > job); > throw new > CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING); > } else if (ee.getState() == ExecutionState.RUNNING) { > executions[i] = ee; > } else { > LOG.info("Checkpoint triggering task {} of job {} is not in state {} > but {} instead. Aborting checkpoint.", > tasksToTrigger[i].getTaskNameWithSubtaskIndex(), > job, > ExecutionState.RUNNING, > ee.getState()); > throw new > CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING); > } > } > 还是我理解的不对 > > > 2020年4月20日 下午6:21,Benchao Li <[hidden email]> 写道: > > > > 你们用的是哪个版本?我们用的是1.9,还没有遇到过这个问题呢。 > > > > Sun.Zhu <[hidden email]> 于2020年4月20日周一 下午5:43写道: > > > >> > 我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint > >> ,源码CheckpointCoordinator#triggerCheckpoint也有说明 > >> > >> > >> > >> > >> > >> | | > >> Sun.Zhu > >> | > >> | > >> 邮箱:[hidden email] > >> | > >> > >> Signature is customized by Netease Mail Master > >> > >> 在2020年04月20日 10:37,Benchao Li 写道: > >> 应该是不会的。分配不到partition的source会标记为idle状态。 > >> > >> Sun.Zhu <[hidden email]> 于2020年4月20日周一 上午10:28写道: > >> > >>> Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧 > >>> > >>> > >>> > >>> > >>> | | > >>> Sun.Zhu > >>> | > >>> | > >>> 邮箱:[hidden email] > >>> | > >>> > >>> Signature is customized by Netease Mail Master > >>> > >>> 在2020年04月19日 22:43,人生若只如初见 写道: > >>> 嗯嗯,十分感谢 > >>> > >>> > >>> > >>> > >>> ------------------ 原始邮件 ------------------ > >>> 发件人: "Benchao Li"<[hidden email]>; > >>> 发送时间: 2020年4月19日(星期天) 晚上9:25 > >>> 收件人: "user-zh"<[hidden email]>; > >>> > >>> 主题: Re: 问题请教-flinksql的kafkasource方面 > >>> > >>> > >>> > >>> 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。 > >>> > >>> Jark Wu <[hidden email]> 于2020年4月19日周日 下午8:22写道: > >>> > >>> > Hi, > >>> > > >>> > 根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。 > >>> > 根据你的 Java 代码,数据的 event time > >>> 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark > >>> > 能容忍 5s 乱序). > >>> > 但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些 > >>> partition > >>> > 进度快很多的现象, > >>> > 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。 > >>> > > >>> > 完美的解决方案还需要等 FLIP-27 的完成。 > >>> > 当前可以通过增加 watermark delay来增大迟到数据的容忍。 > >>> > > >>> > Best, > >>> > Jark > >>> > > >>> > > >>> > On Sat, 18 Apr 2020 at 22:53, 人生若只如初见 <[hidden email]> wrote: > >>> > > >>> > > 你好 > >>> > > > >>> > > > >>> > > >>> > >> > 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多 > >>> > > > >>> > > > >>> > > > >>> > > 附: > >>> > > userbehavior建表语句 > >>> > > CREATE TABLE user_behavior ( > >>> > > &nbsp; &nbsp; user_id BIGINT, > >>> > > &nbsp; &nbsp; item_id BIGINT, > >>> > > &nbsp; &nbsp; category_id BIGINT, > >>> > > &nbsp; &nbsp; behavior STRING, > >>> > > &nbsp; &nbsp; ts TIMESTAMP(3), > >>> > > &nbsp; &nbsp; proctime as PROCTIME(), &nbsp; -- > >>> 通过计算列产生一个处理时间列 > >>> > > &nbsp; &nbsp; WATERMARK FOR ts as ts - INTERVAL '5' > >>> SECOND &nbsp;-- > >>> > > 在ts上定义watermark,ts成为事件时间列 > >>> > > ) WITH ( > >>> > > &nbsp; &nbsp; 'connector.type' = 'kafka', > &nbsp;-- > >>> 使用 kafka connector > >>> > > &nbsp; &nbsp; 'connector.version' = 'universal', > >>> &nbsp;-- kafka > >>> > > 版本,universal 支持 0.11 以上的版本 > >>> > > &nbsp; &nbsp; 'connector.topic' = 'user_behavior', > >>> &nbsp;-- kafka topic > >>> > > &nbsp; &nbsp; 'connector.startup-mode' = > >>> 'earliest-offset', &nbsp;-- 从起始 > >>> > > offset 开始读取 > >>> > > &nbsp; &nbsp; > 'connector.properties.zookeeper.connect' > >> = > >>> ' > >>> > > 192.168.0.150:2181', &nbsp;-- zookeeper 地址 > >>> > > &nbsp; &nbsp; > 'connector.properties.bootstrap.servers' > >> = > >>> ' > >>> > > 192.168.0.150:9092', &nbsp;-- kafka broker 地址 > >>> > > &nbsp; &nbsp; 'format.type' = 'json' &nbsp;-- > >> 数据源格式为 > >>> json > >>> > > ) > >>> > > > >>> > > 每小时购买数建表语句 > >>> > > CREATE TABLE buy_cnt_per_hour (&nbsp; > >>> > > &nbsp; &nbsp; hour_of_day BIGINT, > >>> > > &nbsp; &nbsp; buy_cnt BIGINT > >>> > > ) WITH ( > >>> > > &nbsp; &nbsp; 'connector.type' = 'elasticsearch', -- > 使用 > >>> elasticsearch > >>> > > connector > >>> > > &nbsp; &nbsp; 'connector.version' = '6', &nbsp;-- > >>> elasticsearch 版本,6 能支持 > >>> > > es 6+ 以及 7+ 的版本 > >>> > > &nbsp; &nbsp; 'connector.hosts' = ' > >>> <a href="http://192.168.0.150:9200'">http://192.168.0.150:9200', &nbsp;-- > >>> > > elasticsearch 地址 > >>> > > &nbsp; &nbsp; 'connector.index' = 'buy_cnt_per_hour', > >>> &nbsp;-- > >>> > > elasticsearch 索引名,相当于数据库的表名 > >>> > > &nbsp; &nbsp; 'connector.document-type' = > >>> 'user_behavior', -- > >>> > > elasticsearch 的 type,相当于数据库的库名 > >>> > > &nbsp; &nbsp; 'connector.bulk-flush.max-actions' = > '1', > >>> &nbsp;-- 每条数据都刷新 > >>> > > &nbsp; &nbsp; 'format.type' = 'json', &nbsp;-- > >>> 输出数据格式 json > >>> > > &nbsp; &nbsp; 'update-mode' = 'append' > >>> > > ) > >>> > > > >>> > > 插入语句 > >>> > > INSERT INTO buy_cnt_per_hour&nbsp; > >>> > > SELECT HOUR(TUMBLE_START(ts,INTERVAL '1' > >>> HOUR)),COUNT(*)&nbsp; > >>> > > FROM user_behavior > >>> > > WHERE behavior = 'buy' > >>> > > GROUP BY TUMBLE(ts,INTERVAL '1' HOUR) > >>> > > > >>> > > kafka数据发送代码 > >>> > > > >>> > > import com.alibaba.fastjson.JSONObject; > >>> > > import org.apache.kafka.clients.producer.KafkaProducer; > >>> > > import org.apache.kafka.clients.producer.ProducerRecord; > >>> > > > >>> > > import java.text.SimpleDateFormat; > >>> > > import java.util.*; > >>> > > > >>> > > > >>> > > public class UserBehaviorProducer { > >>> > > public static final String brokerList > >> = " > >>> 192.168.0.150:9092"; > >>> > > > >>> > > // public static > >> final > >>> String topic="user_behavior"; > >>> > > public static final String topic = > >>> "user_behavior"; > >>> > > > >>> > > public static void main(String > args[]) > >> { > >>> > > > >>> > > //配置生产者客户端参数 > >>> > > //将配置序列化 > >>> > > Properties > >>> properties = new Properties(); > >>> > > > >>> properties.put("key.serializer", > >>> > > "org.apache.kafka.common.serialization.StringSerializer"); > >>> > > > >>> properties.put("value.serializer", > >>> > > "org.apache.kafka.common.serialization.StringSerializer"); > >>> > > > >>> properties.put("bootstrap.servers", brokerList); > >>> > > > >>> //创建KafkaProducer 实例 > >>> > > > >>> KafkaProducer<String, String&gt; producer = new > >>> > > KafkaProducer<&gt;(properties); > >>> > > //构建待发送的消息 > >>> > > //{"user_id": > >>> "952483", "item_id":"310884", "category_id": > >>> > > "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} > >>> > > //{"user_id": > >>> "794777", "item_id":"5119439", "category_id": > >>> > > "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} > >>> > > String[] > >>> behaviors = {"pv", "buy", "coll", > >>> > "cart"};//浏览,购买,收藏,加入购物车 > >>> > > JSONObject > >>> jsonObject = new JSONObject(); > >>> > > > HashMap<String, > >>> String&gt; info = new HashMap<&gt;(); > >>> > > Random > random = > >>> new Random(); > >>> > > > >> SimpleDateFormat > >>> format = new > >>> > > SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); > >>> > > long > >>> date_long=getDate(); > >>> > > while (true) > { > >>> > > >>> > >> > > > >>> jsonObject.put("user_id", random.nextInt(900000) + 100000 + > >>> > > ""); > >>> > > >>> > >> > > > >>> jsonObject.put("item_id", random.nextInt(900000) + 100000 + > >>> > > ""); > >>> > > >>> > >> > > > >>> jsonObject.put("category_id", random.nextInt(1000) + ""); > >>> > > >>> > >> > > > >>> jsonObject.put("behavior", behaviors[random.nextInt(4)]); > >>> > > >>> > >> > > > >>> jsonObject.put("ts", format.format(new Date(date_long))); > >>> > > >>> > >> > > > >>> String msg = jsonObject.toString(); > >>> > > >>> > >> > > > >>> System.out.println(msg); > >>> > > >>> > >> > > > >>> ProducerRecord<String, String&gt; record = new > >>> > > ProducerRecord<&gt;(topic, msg); > >>> > > >>> > >> > > > >>> producer.send(record); > >>> > > > >>> // > >>> date_long +=500+random.nextGaussian()*1000; > >>> > > >>> > >> > > > >>> date_long +=800+random.nextGaussian()*1500; > >>> > > >>> > >> > > > >>> try { > >>> > > >>> > >> > > > >>> Thread.sleep(60); > >>> > > >>> > >> > > > >>> } catch (InterruptedException e) { > >>> > > >>> > >> > > > >>> e.printStackTrace(); > >>> > > >>> > >> > > > >>> } > >>> > > } > >>> > > > >>> > > } > >>> > > > >>> > > private static long getDate() { > >>> > > Date date = > new > >>> Date(); > >>> > > Calendar c = > >>> Calendar.getInstance(); > >>> > > > >> c.setTime(date); > >>> > > > >>> //设置为1号,当前日期既为本月第一天 > >>> > > > >>> c.set(Calendar.DAY_OF_MONTH, 1); > >>> > > //将小时至0 > >>> > > > >>> c.set(Calendar.HOUR_OF_DAY, 0); > >>> > > //将分钟至0 > >>> > > > >>> c.set(Calendar.MINUTE, 0); > >>> > > //将秒至0 > >>> > > > >>> c.set(Calendar.SECOND,0); > >>> > > //将毫秒至0 > >>> > > > >>> c.set(Calendar.MILLISECOND, 0); > >>> > > // > >>> 本月第一天的时间戳转换为字符串 > >>> > > return > >>> c.getTimeInMillis(); > >>> > > } > >>> > > } > >>> > > > >>> > > ------------------&nbsp;原始邮件&nbsp;------------------ > >>> > > 发件人:&nbsp;"Jark Wu"<[hidden email]&gt;; > >>> > > 发送时间:&nbsp;2020年4月18日(星期六) 晚上10:08 > >>> > > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > >>> > > > >>> > > 主题:&nbsp;Re: 问题请教-flinksql的kafkasource方面 > >>> > > > >>> > > > >>> > > > >>> > > Hi, > >>> > > > >>> > > 关于第二个问题,Flink 1.10 只在 SQL CLI 支持了 CREATE VIEW 语法, > >>> TableEnvironment 上还未支持。 > >>> > > 关于第一个问题,抱歉,没有太看懂。"每小时全网购买数/partition的的数量,数据不准确了" > >> 是什么意思呢?要不举个例子看下? > >>> > > > >>> > > Best, > >>> > > Jark > >>> > > > >>> > > On Sat, 18 Apr 2020 at 18:38, 人生若只如初见 <[hidden email]&gt; > >>> wrote: > >>> > > > >>> > > &gt; 大佬好: > >>> > > &gt; &amp;nbsp; &amp;nbsp; > >>> > > &gt; > >>> > > >>> > >> > &amp;nbsp;请教下,之前我看了云邪大佬的基于flinkSQL快速构建流式应用的视频,兵按照其步骤进行了复现。过程中我遇到了两个问题。 > >>> > > &gt; &amp;nbsp; &amp;nbsp; > >>> > > &gt; > >>> > > > >>> > > >>> > >> > &amp;nbsp;问题一是当我用先在sql-client中创建kafkasource表,再用程序向kafka发送数据,统计出来的每小时全网购买数是对的;可如果把kafka数据清空,然后先启动程序向kafka发送用户行为数据,发送了一会后再创建kafkasource表并进行业务处理,此时得到的结果是:每小时全网购买数/partition的的数量,数据不准确了。我记得kafkasource读取标记的watermark是partition中最小的那个,为什么会出现这个问题。 > >>> > > &gt; &amp;nbsp;&amp;nbsp; > >>> > > &gt; &amp;nbsp; &amp;nbsp; > >>> > > 问题二是我在sql-client中可以创建view,可我用程序写tableEnv.sqlUpdate("create > >>> > > &gt; view as ...")却会报错。报错如下: > >>> > > &gt; Exception in thread "main" > >>> > org.apache.flink.table.api.TableException: > >>> > > &gt; Unsupported query: CREATE VIEW rich_user_behavior AS > >>> > > &gt; SELECT U.user_id, U.item_id, > U.behavior,&amp;nbsp; > >>> > > &gt; &amp;nbsp; CASE C.parent_category_id > >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 1 THEN '服饰鞋包' > >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 2 THEN '家装家饰' > >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 3 THEN '家电' > >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 4 THEN '美妆' > >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 5 THEN '母婴' > >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 6 THEN '3C数码' > >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 7 THEN '运动户外' > >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 8 THEN '食品' > >>> > > &gt; &amp;nbsp; &amp;nbsp; ELSE '其他' > >>> > > &gt; &amp;nbsp; END AS category_name > >>> > > &gt; FROM user_behavior AS U LEFT JOIN category_dim FOR > >>> SYSTEM_TIME AS OF > >>> > > &gt; U.proctime AS C > >>> > > &gt; ON U.category_id = C.sub_category_id > >>> > > > >>> > >> > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > >>> at > >>> > > &gt; > >>> > > > >>> > > >>> > >> > org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67) > >>> > > > >>> > >> > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > >>> at > >>> > > &gt; > >>> > > > >>> > > >>> > >> > org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown > >>> > > &gt; Source) > >>> > > > >>> > >> > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > >>> at > >>> > > java.util.Optional.orElseThrow(Optional.java:290) > >>> > > > >>> > >> > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > >>> at > >>> > > &gt; > >>> > > > >>> > > >>> > >> > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67) > >>> > > > >>> > >> > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > >>> at > >>> > > &gt; > >>> > > > >>> > > >>> > >> > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > >>> > > > >>> > >> > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > >>> at > >>> > > &gt; > >>> > > > >>> > > >>> > >> > com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51) > >>> > > &gt; > >>> > > &gt; > >>> > > &gt; > >>> > > &gt; > >>> > > &gt; > >>> > > &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; > >>> &amp;nbsp; 望解答,十分感谢! > >>> > > >>> > >>> > >>> -- > >>> > >>> Benchao Li > >>> School of Electronics Engineering and Computer Science, Peking > University > >>> Tel:+86-15650713730 > >>> Email: [hidden email]; [hidden email] > >> > >> > >> > >> -- > >> > >> Benchao Li > >> School of Electronics Engineering and Computer Science, Peking > University > >> Tel:+86-15650713730 > >> Email: [hidden email]; [hidden email] > >> > > > > > > -- > > > > Benchao Li > > School of Electronics Engineering and Computer Science, Peking University > > Tel:+86-15650713730 > > Email: [hidden email]; [hidden email] > > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
Administrator
|
Hi,
你可以将 kafka 并行度设置成等于 kafka partition 个数。这个方式肯定能 work,且不浪费 task 资源。 Best, Jark On Mon, 20 Apr 2020 at 22:33, Benchao Li <[hidden email]> wrote: > 我对checkpoint这块不是很了解。不过subtask标记为idle跟finish还是有区别的吧。 > > 祝尚 <[hidden email]> 于2020年4月20日周一 下午10:29写道: > > > 我们是1.8版本,但是这段源码应该是没变把 > > // check if all tasks that we need to trigger are running. > > // if not, abort the checkpoint > > Execution[] executions = new Execution[tasksToTrigger.length]; > > for (int i = 0; i < tasksToTrigger.length; i++) { > > Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt(); > > if (ee == null) { > > LOG.info("Checkpoint triggering task {} of job {} is not being > > executed at the moment. Aborting checkpoint.", > > tasksToTrigger[i].getTaskNameWithSubtaskIndex(), > > job); > > throw new > > > CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING); > > } else if (ee.getState() == ExecutionState.RUNNING) { > > executions[i] = ee; > > } else { > > LOG.info("Checkpoint triggering task {} of job {} is not in state > {} > > but {} instead. Aborting checkpoint.", > > tasksToTrigger[i].getTaskNameWithSubtaskIndex(), > > job, > > ExecutionState.RUNNING, > > ee.getState()); > > throw new > > > CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING); > > } > > } > > 还是我理解的不对 > > > > > 2020年4月20日 下午6:21,Benchao Li <[hidden email]> 写道: > > > > > > 你们用的是哪个版本?我们用的是1.9,还没有遇到过这个问题呢。 > > > > > > Sun.Zhu <[hidden email]> 于2020年4月20日周一 下午5:43写道: > > > > > >> > > > 我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint > > >> ,源码CheckpointCoordinator#triggerCheckpoint也有说明 > > >> > > >> > > >> > > >> > > >> > > >> | | > > >> Sun.Zhu > > >> | > > >> | > > >> 邮箱:[hidden email] > > >> | > > >> > > >> Signature is customized by Netease Mail Master > > >> > > >> 在2020年04月20日 10:37,Benchao Li 写道: > > >> 应该是不会的。分配不到partition的source会标记为idle状态。 > > >> > > >> Sun.Zhu <[hidden email]> 于2020年4月20日周一 上午10:28写道: > > >> > > >>> Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧 > > >>> > > >>> > > >>> > > >>> > > >>> | | > > >>> Sun.Zhu > > >>> | > > >>> | > > >>> 邮箱:[hidden email] > > >>> | > > >>> > > >>> Signature is customized by Netease Mail Master > > >>> > > >>> 在2020年04月19日 22:43,人生若只如初见 写道: > > >>> 嗯嗯,十分感谢 > > >>> > > >>> > > >>> > > >>> > > >>> ------------------ 原始邮件 ------------------ > > >>> 发件人: "Benchao Li"<[hidden email]>; > > >>> 发送时间: 2020年4月19日(星期天) 晚上9:25 > > >>> 收件人: "user-zh"<[hidden email]>; > > >>> > > >>> 主题: Re: 问题请教-flinksql的kafkasource方面 > > >>> > > >>> > > >>> > > >>> 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。 > > >>> > > >>> Jark Wu <[hidden email]> 于2020年4月19日周日 下午8:22写道: > > >>> > > >>> > Hi, > > >>> > > > >>> > 根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。 > > >>> > 根据你的 Java 代码,数据的 event time > > >>> 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark > > >>> > 能容忍 5s 乱序). > > >>> > 但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition > 进度比某些 > > >>> partition > > >>> > 进度快很多的现象, > > >>> > > 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。 > > >>> > > > >>> > 完美的解决方案还需要等 FLIP-27 的完成。 > > >>> > 当前可以通过增加 watermark delay来增大迟到数据的容忍。 > > >>> > > > >>> > Best, > > >>> > Jark > > >>> > > > >>> > > > >>> > On Sat, 18 Apr 2020 at 22:53, 人生若只如初见 <[hidden email]> wrote: > > >>> > > > >>> > > 你好 > > >>> > > > > >>> > > > > >>> > > > >>> > > >> > > > 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多 > > >>> > > > > >>> > > > > >>> > > > > >>> > > 附: > > >>> > > userbehavior建表语句 > > >>> > > CREATE TABLE user_behavior ( > > >>> > > &nbsp; &nbsp; user_id BIGINT, > > >>> > > &nbsp; &nbsp; item_id BIGINT, > > >>> > > &nbsp; &nbsp; category_id BIGINT, > > >>> > > &nbsp; &nbsp; behavior STRING, > > >>> > > &nbsp; &nbsp; ts TIMESTAMP(3), > > >>> > > &nbsp; &nbsp; proctime as PROCTIME(), &nbsp; -- > > >>> 通过计算列产生一个处理时间列 > > >>> > > &nbsp; &nbsp; WATERMARK FOR ts as ts - INTERVAL '5' > > >>> SECOND &nbsp;-- > > >>> > > 在ts上定义watermark,ts成为事件时间列 > > >>> > > ) WITH ( > > >>> > > &nbsp; &nbsp; 'connector.type' = 'kafka', > > &nbsp;-- > > >>> 使用 kafka connector > > >>> > > &nbsp; &nbsp; 'connector.version' = 'universal', > > >>> &nbsp;-- kafka > > >>> > > 版本,universal 支持 0.11 以上的版本 > > >>> > > &nbsp; &nbsp; 'connector.topic' = 'user_behavior', > > >>> &nbsp;-- kafka topic > > >>> > > &nbsp; &nbsp; 'connector.startup-mode' = > > >>> 'earliest-offset', &nbsp;-- 从起始 > > >>> > > offset 开始读取 > > >>> > > &nbsp; &nbsp; > > 'connector.properties.zookeeper.connect' > > >> = > > >>> ' > > >>> > > 192.168.0.150:2181', &nbsp;-- zookeeper 地址 > > >>> > > &nbsp; &nbsp; > > 'connector.properties.bootstrap.servers' > > >> = > > >>> ' > > >>> > > 192.168.0.150:9092', &nbsp;-- kafka broker 地址 > > >>> > > &nbsp; &nbsp; 'format.type' = 'json' &nbsp;-- > > >> 数据源格式为 > > >>> json > > >>> > > ) > > >>> > > > > >>> > > 每小时购买数建表语句 > > >>> > > CREATE TABLE buy_cnt_per_hour (&nbsp; > > >>> > > &nbsp; &nbsp; hour_of_day BIGINT, > > >>> > > &nbsp; &nbsp; buy_cnt BIGINT > > >>> > > ) WITH ( > > >>> > > &nbsp; &nbsp; 'connector.type' = 'elasticsearch', > -- > > 使用 > > >>> elasticsearch > > >>> > > connector > > >>> > > &nbsp; &nbsp; 'connector.version' = '6', > &nbsp;-- > > >>> elasticsearch 版本,6 能支持 > > >>> > > es 6+ 以及 7+ 的版本 > > >>> > > &nbsp; &nbsp; 'connector.hosts' = ' > > >>> <a href="http://192.168.0.150:9200'">http://192.168.0.150:9200', &nbsp;-- > > >>> > > elasticsearch 地址 > > >>> > > &nbsp; &nbsp; 'connector.index' = > 'buy_cnt_per_hour', > > >>> &nbsp;-- > > >>> > > elasticsearch 索引名,相当于数据库的表名 > > >>> > > &nbsp; &nbsp; 'connector.document-type' = > > >>> 'user_behavior', -- > > >>> > > elasticsearch 的 type,相当于数据库的库名 > > >>> > > &nbsp; &nbsp; 'connector.bulk-flush.max-actions' = > > '1', > > >>> &nbsp;-- 每条数据都刷新 > > >>> > > &nbsp; &nbsp; 'format.type' = 'json', &nbsp;-- > > >>> 输出数据格式 json > > >>> > > &nbsp; &nbsp; 'update-mode' = 'append' > > >>> > > ) > > >>> > > > > >>> > > 插入语句 > > >>> > > INSERT INTO buy_cnt_per_hour&nbsp; > > >>> > > SELECT HOUR(TUMBLE_START(ts,INTERVAL '1' > > >>> HOUR)),COUNT(*)&nbsp; > > >>> > > FROM user_behavior > > >>> > > WHERE behavior = 'buy' > > >>> > > GROUP BY TUMBLE(ts,INTERVAL '1' HOUR) > > >>> > > > > >>> > > kafka数据发送代码 > > >>> > > > > >>> > > import com.alibaba.fastjson.JSONObject; > > >>> > > import org.apache.kafka.clients.producer.KafkaProducer; > > >>> > > import org.apache.kafka.clients.producer.ProducerRecord; > > >>> > > > > >>> > > import java.text.SimpleDateFormat; > > >>> > > import java.util.*; > > >>> > > > > >>> > > > > >>> > > public class UserBehaviorProducer { > > >>> > > public static final String > brokerList > > >> = " > > >>> 192.168.0.150:9092"; > > >>> > > > > >>> > > // public static > > >> final > > >>> String topic="user_behavior"; > > >>> > > public static final String topic = > > >>> "user_behavior"; > > >>> > > > > >>> > > public static void main(String > > args[]) > > >> { > > >>> > > > > >>> > > > //配置生产者客户端参数 > > >>> > > //将配置序列化 > > >>> > > Properties > > >>> properties = new Properties(); > > >>> > > > > >>> properties.put("key.serializer", > > >>> > > "org.apache.kafka.common.serialization.StringSerializer"); > > >>> > > > > >>> properties.put("value.serializer", > > >>> > > "org.apache.kafka.common.serialization.StringSerializer"); > > >>> > > > > >>> properties.put("bootstrap.servers", brokerList); > > >>> > > > > >>> //创建KafkaProducer 实例 > > >>> > > > > >>> KafkaProducer<String, String&gt; producer = new > > >>> > > KafkaProducer<&gt;(properties); > > >>> > > //构建待发送的消息 > > >>> > > > //{"user_id": > > >>> "952483", "item_id":"310884", "category_id": > > >>> > > "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} > > >>> > > > //{"user_id": > > >>> "794777", "item_id":"5119439", "category_id": > > >>> > > "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} > > >>> > > String[] > > >>> behaviors = {"pv", "buy", "coll", > > >>> > "cart"};//浏览,购买,收藏,加入购物车 > > >>> > > JSONObject > > >>> jsonObject = new JSONObject(); > > >>> > > > > HashMap<String, > > >>> String&gt; info = new HashMap<&gt;(); > > >>> > > Random > > random = > > >>> new Random(); > > >>> > > > > >> SimpleDateFormat > > >>> format = new > > >>> > > SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); > > >>> > > long > > >>> date_long=getDate(); > > >>> > > while > (true) > > { > > >>> > > > >>> > > >> > > > > > > >>> jsonObject.put("user_id", random.nextInt(900000) + 100000 + > > >>> > > ""); > > >>> > > > >>> > > >> > > > > > > >>> jsonObject.put("item_id", random.nextInt(900000) + 100000 + > > >>> > > ""); > > >>> > > > >>> > > >> > > > > > > >>> jsonObject.put("category_id", random.nextInt(1000) + ""); > > >>> > > > >>> > > >> > > > > > > >>> jsonObject.put("behavior", behaviors[random.nextInt(4)]); > > >>> > > > >>> > > >> > > > > > > >>> jsonObject.put("ts", format.format(new Date(date_long))); > > >>> > > > >>> > > >> > > > > > > >>> String msg = jsonObject.toString(); > > >>> > > > >>> > > >> > > > > > > >>> System.out.println(msg); > > >>> > > > >>> > > >> > > > > > > >>> ProducerRecord<String, String&gt; record = new > > >>> > > ProducerRecord<&gt;(topic, msg); > > >>> > > > >>> > > >> > > > > > > >>> producer.send(record); > > >>> > > > > >>> // > > >>> date_long +=500+random.nextGaussian()*1000; > > >>> > > > >>> > > >> > > > > > > >>> date_long +=800+random.nextGaussian()*1500; > > >>> > > > >>> > > >> > > > > > > >>> try { > > >>> > > > >>> > > >> > > > > > > >>> Thread.sleep(60); > > >>> > > > >>> > > >> > > > > > > >>> } catch (InterruptedException e) { > > >>> > > > >>> > > >> > > > > > > >>> e.printStackTrace(); > > >>> > > > >>> > > >> > > > > > > >>> } > > >>> > > } > > >>> > > > > >>> > > } > > >>> > > > > >>> > > private static long getDate() { > > >>> > > Date date = > > new > > >>> Date(); > > >>> > > Calendar c > = > > >>> Calendar.getInstance(); > > >>> > > > > >> c.setTime(date); > > >>> > > > > >>> //设置为1号,当前日期既为本月第一天 > > >>> > > > > >>> c.set(Calendar.DAY_OF_MONTH, 1); > > >>> > > //将小时至0 > > >>> > > > > >>> c.set(Calendar.HOUR_OF_DAY, 0); > > >>> > > //将分钟至0 > > >>> > > > > >>> c.set(Calendar.MINUTE, 0); > > >>> > > //将秒至0 > > >>> > > > > >>> c.set(Calendar.SECOND,0); > > >>> > > //将毫秒至0 > > >>> > > > > >>> c.set(Calendar.MILLISECOND, 0); > > >>> > > // > > >>> 本月第一天的时间戳转换为字符串 > > >>> > > return > > >>> c.getTimeInMillis(); > > >>> > > } > > >>> > > } > > >>> > > > > >>> > > > ------------------&nbsp;原始邮件&nbsp;------------------ > > >>> > > 发件人:&nbsp;"Jark Wu"<[hidden email]&gt;; > > >>> > > 发送时间:&nbsp;2020年4月18日(星期六) 晚上10:08 > > >>> > > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > >>> > > > > >>> > > 主题:&nbsp;Re: 问题请教-flinksql的kafkasource方面 > > >>> > > > > >>> > > > > >>> > > > > >>> > > Hi, > > >>> > > > > >>> > > 关于第二个问题,Flink 1.10 只在 SQL CLI 支持了 CREATE VIEW 语法, > > >>> TableEnvironment 上还未支持。 > > >>> > > 关于第一个问题,抱歉,没有太看懂。"每小时全网购买数/partition的的数量,数据不准确了" > > >> 是什么意思呢?要不举个例子看下? > > >>> > > > > >>> > > Best, > > >>> > > Jark > > >>> > > > > >>> > > On Sat, 18 Apr 2020 at 18:38, 人生若只如初见 <[hidden email] > &gt; > > >>> wrote: > > >>> > > > > >>> > > &gt; 大佬好: > > >>> > > &gt; &amp;nbsp; &amp;nbsp; > > >>> > > &gt; > > >>> > > > >>> > > >> > > > &amp;nbsp;请教下,之前我看了云邪大佬的基于flinkSQL快速构建流式应用的视频,兵按照其步骤进行了复现。过程中我遇到了两个问题。 > > >>> > > &gt; &amp;nbsp; &amp;nbsp; > > >>> > > &gt; > > >>> > > > > >>> > > > >>> > > >> > > > &amp;nbsp;问题一是当我用先在sql-client中创建kafkasource表,再用程序向kafka发送数据,统计出来的每小时全网购买数是对的;可如果把kafka数据清空,然后先启动程序向kafka发送用户行为数据,发送了一会后再创建kafkasource表并进行业务处理,此时得到的结果是:每小时全网购买数/partition的的数量,数据不准确了。我记得kafkasource读取标记的watermark是partition中最小的那个,为什么会出现这个问题。 > > >>> > > &gt; &amp;nbsp;&amp;nbsp; > > >>> > > &gt; &amp;nbsp; &amp;nbsp; > > >>> > > 问题二是我在sql-client中可以创建view,可我用程序写tableEnv.sqlUpdate("create > > >>> > > &gt; view as ...")却会报错。报错如下: > > >>> > > &gt; Exception in thread "main" > > >>> > org.apache.flink.table.api.TableException: > > >>> > > &gt; Unsupported query: CREATE VIEW rich_user_behavior > AS > > >>> > > &gt; SELECT U.user_id, U.item_id, > > U.behavior,&amp;nbsp; > > >>> > > &gt; &amp;nbsp; CASE C.parent_category_id > > >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 1 THEN '服饰鞋包' > > >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 2 THEN '家装家饰' > > >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 3 THEN '家电' > > >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 4 THEN '美妆' > > >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 5 THEN '母婴' > > >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 6 THEN '3C数码' > > >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 7 THEN '运动户外' > > >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 8 THEN '食品' > > >>> > > &gt; &amp;nbsp; &amp;nbsp; ELSE '其他' > > >>> > > &gt; &amp;nbsp; END AS category_name > > >>> > > &gt; FROM user_behavior AS U LEFT JOIN category_dim FOR > > >>> SYSTEM_TIME AS OF > > >>> > > &gt; U.proctime AS C > > >>> > > &gt; ON U.category_id = C.sub_category_id > > >>> > > > > >>> > > >> > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > >>> at > > >>> > > &gt; > > >>> > > > > >>> > > > >>> > > >> > > > org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67) > > >>> > > > > >>> > > >> > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > >>> at > > >>> > > &gt; > > >>> > > > > >>> > > > >>> > > >> > > > org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown > > >>> > > &gt; Source) > > >>> > > > > >>> > > >> > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > >>> at > > >>> > > java.util.Optional.orElseThrow(Optional.java:290) > > >>> > > > > >>> > > >> > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > >>> at > > >>> > > &gt; > > >>> > > > > >>> > > > >>> > > >> > > > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67) > > >>> > > > > >>> > > >> > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > >>> at > > >>> > > &gt; > > >>> > > > > >>> > > > >>> > > >> > > > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > > >>> > > > > >>> > > >> > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > >>> at > > >>> > > &gt; > > >>> > > > > >>> > > > >>> > > >> > > > com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51) > > >>> > > &gt; > > >>> > > &gt; > > >>> > > &gt; > > >>> > > &gt; > > >>> > > &gt; > > >>> > > &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; > > >>> &amp;nbsp; 望解答,十分感谢! > > >>> > > > >>> > > >>> > > >>> -- > > >>> > > >>> Benchao Li > > >>> School of Electronics Engineering and Computer Science, Peking > > University > > >>> Tel:+86-15650713730 > > >>> Email: [hidden email]; [hidden email] > > >> > > >> > > >> > > >> -- > > >> > > >> Benchao Li > > >> School of Electronics Engineering and Computer Science, Peking > > University > > >> Tel:+86-15650713730 > > >> Email: [hidden email]; [hidden email] > > >> > > > > > > > > > -- > > > > > > Benchao Li > > > School of Electronics Engineering and Computer Science, Peking > University > > > Tel:+86-15650713730 > > > Email: [hidden email]; [hidden email] > > > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] > |
嗯是的,都设置成小于等于partition数
| | Sun.Zhu | | 邮箱:[hidden email] | Signature is customized by Netease Mail Master 在2020年04月21日 00:28,Jark Wu 写道: Hi, 你可以将 kafka 并行度设置成等于 kafka partition 个数。这个方式肯定能 work,且不浪费 task 资源。 Best, Jark On Mon, 20 Apr 2020 at 22:33, Benchao Li <[hidden email]> wrote: > 我对checkpoint这块不是很了解。不过subtask标记为idle跟finish还是有区别的吧。 > > 祝尚 <[hidden email]> 于2020年4月20日周一 下午10:29写道: > > > 我们是1.8版本,但是这段源码应该是没变把 > > // check if all tasks that we need to trigger are running. > > // if not, abort the checkpoint > > Execution[] executions = new Execution[tasksToTrigger.length]; > > for (int i = 0; i < tasksToTrigger.length; i++) { > > Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt(); > > if (ee == null) { > > LOG.info("Checkpoint triggering task {} of job {} is not being > > executed at the moment. Aborting checkpoint.", > > tasksToTrigger[i].getTaskNameWithSubtaskIndex(), > > job); > > throw new > > > CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING); > > } else if (ee.getState() == ExecutionState.RUNNING) { > > executions[i] = ee; > > } else { > > LOG.info("Checkpoint triggering task {} of job {} is not in state > {} > > but {} instead. Aborting checkpoint.", > > tasksToTrigger[i].getTaskNameWithSubtaskIndex(), > > job, > > ExecutionState.RUNNING, > > ee.getState()); > > throw new > > > CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING); > > } > > } > > 还是我理解的不对 > > > > > 2020年4月20日 下午6:21,Benchao Li <[hidden email]> 写道: > > > > > > 你们用的是哪个版本?我们用的是1.9,还没有遇到过这个问题呢。 > > > > > > Sun.Zhu <[hidden email]> 于2020年4月20日周一 下午5:43写道: > > > > > >> > > > 我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint > > >> ,源码CheckpointCoordinator#triggerCheckpoint也有说明 > > >> > > >> > > >> > > >> > > >> > > >> | | > > >> Sun.Zhu > > >> | > > >> | > > >> 邮箱:[hidden email] > > >> | > > >> > > >> Signature is customized by Netease Mail Master > > >> > > >> 在2020年04月20日 10:37,Benchao Li 写道: > > >> 应该是不会的。分配不到partition的source会标记为idle状态。 > > >> > > >> Sun.Zhu <[hidden email]> 于2020年4月20日周一 上午10:28写道: > > >> > > >>> Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧 > > >>> > > >>> > > >>> > > >>> > > >>> | | > > >>> Sun.Zhu > > >>> | > > >>> | > > >>> 邮箱:[hidden email] > > >>> | > > >>> > > >>> Signature is customized by Netease Mail Master > > >>> > > >>> 在2020年04月19日 22:43,人生若只如初见 写道: > > >>> 嗯嗯,十分感谢 > > >>> > > >>> > > >>> > > >>> > > >>> ------------------ 原始邮件 ------------------ > > >>> 发件人: "Benchao Li"<[hidden email]>; > > >>> 发送时间: 2020年4月19日(星期天) 晚上9:25 > > >>> 收件人: "user-zh"<[hidden email]>; > > >>> > > >>> 主题: Re: 问题请教-flinksql的kafkasource方面 > > >>> > > >>> > > >>> > > >>> 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。 > > >>> > > >>> Jark Wu <[hidden email]> 于2020年4月19日周日 下午8:22写道: > > >>> > > >>> > Hi, > > >>> > > > >>> > 根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。 > > >>> > 根据你的 Java 代码,数据的 event time > > >>> 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark > > >>> > 能容忍 5s 乱序). > > >>> > 但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition > 进度比某些 > > >>> partition > > >>> > 进度快很多的现象, > > >>> > > 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。 > > >>> > > > >>> > 完美的解决方案还需要等 FLIP-27 的完成。 > > >>> > 当前可以通过增加 watermark delay来增大迟到数据的容忍。 > > >>> > > > >>> > Best, > > >>> > Jark > > >>> > > > >>> > > > >>> > On Sat, 18 Apr 2020 at 22:53, 人生若只如初见 <[hidden email]> wrote: > > >>> > > > >>> > > 你好 > > >>> > > > > >>> > > > > >>> > > > >>> > > >> > > > 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多 > > >>> > > > > >>> > > > > >>> > > > > >>> > > 附: > > >>> > > userbehavior建表语句 > > >>> > > CREATE TABLE user_behavior ( > > >>> > > &nbsp; &nbsp; user_id BIGINT, > > >>> > > &nbsp; &nbsp; item_id BIGINT, > > >>> > > &nbsp; &nbsp; category_id BIGINT, > > >>> > > &nbsp; &nbsp; behavior STRING, > > >>> > > &nbsp; &nbsp; ts TIMESTAMP(3), > > >>> > > &nbsp; &nbsp; proctime as PROCTIME(), &nbsp; -- > > >>> 通过计算列产生一个处理时间列 > > >>> > > &nbsp; &nbsp; WATERMARK FOR ts as ts - INTERVAL '5' > > >>> SECOND &nbsp;-- > > >>> > > 在ts上定义watermark,ts成为事件时间列 > > >>> > > ) WITH ( > > >>> > > &nbsp; &nbsp; 'connector.type' = 'kafka', > > &nbsp;-- > > >>> 使用 kafka connector > > >>> > > &nbsp; &nbsp; 'connector.version' = 'universal', > > >>> &nbsp;-- kafka > > >>> > > 版本,universal 支持 0.11 以上的版本 > > >>> > > &nbsp; &nbsp; 'connector.topic' = 'user_behavior', > > >>> &nbsp;-- kafka topic > > >>> > > &nbsp; &nbsp; 'connector.startup-mode' = > > >>> 'earliest-offset', &nbsp;-- 从起始 > > >>> > > offset 开始读取 > > >>> > > &nbsp; &nbsp; > > 'connector.properties.zookeeper.connect' > > >> = > > >>> ' > > >>> > > 192.168.0.150:2181', &nbsp;-- zookeeper 地址 > > >>> > > &nbsp; &nbsp; > > 'connector.properties.bootstrap.servers' > > >> = > > >>> ' > > >>> > > 192.168.0.150:9092', &nbsp;-- kafka broker 地址 > > >>> > > &nbsp; &nbsp; 'format.type' = 'json' &nbsp;-- > > >> 数据源格式为 > > >>> json > > >>> > > ) > > >>> > > > > >>> > > 每小时购买数建表语句 > > >>> > > CREATE TABLE buy_cnt_per_hour (&nbsp; > > >>> > > &nbsp; &nbsp; hour_of_day BIGINT, > > >>> > > &nbsp; &nbsp; buy_cnt BIGINT > > >>> > > ) WITH ( > > >>> > > &nbsp; &nbsp; 'connector.type' = 'elasticsearch', > -- > > 使用 > > >>> elasticsearch > > >>> > > connector > > >>> > > &nbsp; &nbsp; 'connector.version' = '6', > &nbsp;-- > > >>> elasticsearch 版本,6 能支持 > > >>> > > es 6+ 以及 7+ 的版本 > > >>> > > &nbsp; &nbsp; 'connector.hosts' = ' > > >>> <a href="http://192.168.0.150:9200'">http://192.168.0.150:9200', &nbsp;-- > > >>> > > elasticsearch 地址 > > >>> > > &nbsp; &nbsp; 'connector.index' = > 'buy_cnt_per_hour', > > >>> &nbsp;-- > > >>> > > elasticsearch 索引名,相当于数据库的表名 > > >>> > > &nbsp; &nbsp; 'connector.document-type' = > > >>> 'user_behavior', -- > > >>> > > elasticsearch 的 type,相当于数据库的库名 > > >>> > > &nbsp; &nbsp; 'connector.bulk-flush.max-actions' = > > '1', > > >>> &nbsp;-- 每条数据都刷新 > > >>> > > &nbsp; &nbsp; 'format.type' = 'json', &nbsp;-- > > >>> 输出数据格式 json > > >>> > > &nbsp; &nbsp; 'update-mode' = 'append' > > >>> > > ) > > >>> > > > > >>> > > 插入语句 > > >>> > > INSERT INTO buy_cnt_per_hour&nbsp; > > >>> > > SELECT HOUR(TUMBLE_START(ts,INTERVAL '1' > > >>> HOUR)),COUNT(*)&nbsp; > > >>> > > FROM user_behavior > > >>> > > WHERE behavior = 'buy' > > >>> > > GROUP BY TUMBLE(ts,INTERVAL '1' HOUR) > > >>> > > > > >>> > > kafka数据发送代码 > > >>> > > > > >>> > > import com.alibaba.fastjson.JSONObject; > > >>> > > import org.apache.kafka.clients.producer.KafkaProducer; > > >>> > > import org.apache.kafka.clients.producer.ProducerRecord; > > >>> > > > > >>> > > import java.text.SimpleDateFormat; > > >>> > > import java.util.*; > > >>> > > > > >>> > > > > >>> > > public class UserBehaviorProducer { > > >>> > > public static final String > brokerList > > >> = " > > >>> 192.168.0.150:9092"; > > >>> > > > > >>> > > // public static > > >> final > > >>> String topic="user_behavior"; > > >>> > > public static final String topic = > > >>> "user_behavior"; > > >>> > > > > >>> > > public static void main(String > > args[]) > > >> { > > >>> > > > > >>> > > > //配置生产者客户端参数 > > >>> > > //将配置序列化 > > >>> > > Properties > > >>> properties = new Properties(); > > >>> > > > > >>> properties.put("key.serializer", > > >>> > > "org.apache.kafka.common.serialization.StringSerializer"); > > >>> > > > > >>> properties.put("value.serializer", > > >>> > > "org.apache.kafka.common.serialization.StringSerializer"); > > >>> > > > > >>> properties.put("bootstrap.servers", brokerList); > > >>> > > > > >>> //创建KafkaProducer 实例 > > >>> > > > > >>> KafkaProducer<String, String&gt; producer = new > > >>> > > KafkaProducer<&gt;(properties); > > >>> > > //构建待发送的消息 > > >>> > > > //{"user_id": > > >>> "952483", "item_id":"310884", "category_id": > > >>> > > "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} > > >>> > > > //{"user_id": > > >>> "794777", "item_id":"5119439", "category_id": > > >>> > > "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} > > >>> > > String[] > > >>> behaviors = {"pv", "buy", "coll", > > >>> > "cart"};//浏览,购买,收藏,加入购物车 > > >>> > > JSONObject > > >>> jsonObject = new JSONObject(); > > >>> > > > > HashMap<String, > > >>> String&gt; info = new HashMap<&gt;(); > > >>> > > Random > > random = > > >>> new Random(); > > >>> > > > > >> SimpleDateFormat > > >>> format = new > > >>> > > SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); > > >>> > > long > > >>> date_long=getDate(); > > >>> > > while > (true) > > { > > >>> > > > >>> > > >> > > > > > > >>> jsonObject.put("user_id", random.nextInt(900000) + 100000 + > > >>> > > ""); > > >>> > > > >>> > > >> > > > > > > >>> jsonObject.put("item_id", random.nextInt(900000) + 100000 + > > >>> > > ""); > > >>> > > > >>> > > >> > > > > > > >>> jsonObject.put("category_id", random.nextInt(1000) + ""); > > >>> > > > >>> > > >> > > > > > > >>> jsonObject.put("behavior", behaviors[random.nextInt(4)]); > > >>> > > > >>> > > >> > > > > > > >>> jsonObject.put("ts", format.format(new Date(date_long))); > > >>> > > > >>> > > >> > > > > > > >>> String msg = jsonObject.toString(); > > >>> > > > >>> > > >> > > > > > > >>> System.out.println(msg); > > >>> > > > >>> > > >> > > > > > > >>> ProducerRecord<String, String&gt; record = new > > >>> > > ProducerRecord<&gt;(topic, msg); > > >>> > > > >>> > > >> > > > > > > >>> producer.send(record); > > >>> > > > > >>> // > > >>> date_long +=500+random.nextGaussian()*1000; > > >>> > > > >>> > > >> > > > > > > >>> date_long +=800+random.nextGaussian()*1500; > > >>> > > > >>> > > >> > > > > > > >>> try { > > >>> > > > >>> > > >> > > > > > > >>> Thread.sleep(60); > > >>> > > > >>> > > >> > > > > > > >>> } catch (InterruptedException e) { > > >>> > > > >>> > > >> > > > > > > >>> e.printStackTrace(); > > >>> > > > >>> > > >> > > > > > > >>> } > > >>> > > } > > >>> > > > > >>> > > } > > >>> > > > > >>> > > private static long getDate() { > > >>> > > Date date = > > new > > >>> Date(); > > >>> > > Calendar c > = > > >>> Calendar.getInstance(); > > >>> > > > > >> c.setTime(date); > > >>> > > > > >>> //设置为1号,当前日期既为本月第一天 > > >>> > > > > >>> c.set(Calendar.DAY_OF_MONTH, 1); > > >>> > > //将小时至0 > > >>> > > > > >>> c.set(Calendar.HOUR_OF_DAY, 0); > > >>> > > //将分钟至0 > > >>> > > > > >>> c.set(Calendar.MINUTE, 0); > > >>> > > //将秒至0 > > >>> > > > > >>> c.set(Calendar.SECOND,0); > > >>> > > //将毫秒至0 > > >>> > > > > >>> c.set(Calendar.MILLISECOND, 0); > > >>> > > // > > >>> 本月第一天的时间戳转换为字符串 > > >>> > > return > > >>> c.getTimeInMillis(); > > >>> > > } > > >>> > > } > > >>> > > > > >>> > > > ------------------&nbsp;原始邮件&nbsp;------------------ > > >>> > > 发件人:&nbsp;"Jark Wu"<[hidden email]&gt;; > > >>> > > 发送时间:&nbsp;2020年4月18日(星期六) 晚上10:08 > > >>> > > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > >>> > > > > >>> > > 主题:&nbsp;Re: 问题请教-flinksql的kafkasource方面 > > >>> > > > > >>> > > > > >>> > > > > >>> > > Hi, > > >>> > > > > >>> > > 关于第二个问题,Flink 1.10 只在 SQL CLI 支持了 CREATE VIEW 语法, > > >>> TableEnvironment 上还未支持。 > > >>> > > 关于第一个问题,抱歉,没有太看懂。"每小时全网购买数/partition的的数量,数据不准确了" > > >> 是什么意思呢?要不举个例子看下? > > >>> > > > > >>> > > Best, > > >>> > > Jark > > >>> > > > > >>> > > On Sat, 18 Apr 2020 at 18:38, 人生若只如初见 <[hidden email] > &gt; > > >>> wrote: > > >>> > > > > >>> > > &gt; 大佬好: > > >>> > > &gt; &amp;nbsp; &amp;nbsp; > > >>> > > &gt; > > >>> > > > >>> > > >> > > > &amp;nbsp;请教下,之前我看了云邪大佬的基于flinkSQL快速构建流式应用的视频,兵按照其步骤进行了复现。过程中我遇到了两个问题。 > > >>> > > &gt; &amp;nbsp; &amp;nbsp; > > >>> > > &gt; > > >>> > > > > >>> > > > >>> > > >> > > > &amp;nbsp;问题一是当我用先在sql-client中创建kafkasource表,再用程序向kafka发送数据,统计出来的每小时全网购买数是对的;可如果把kafka数据清空,然后先启动程序向kafka发送用户行为数据,发送了一会后再创建kafkasource表并进行业务处理,此时得到的结果是:每小时全网购买数/partition的的数量,数据不准确了。我记得kafkasource读取标记的watermark是partition中最小的那个,为什么会出现这个问题。 > > >>> > > &gt; &amp;nbsp;&amp;nbsp; > > >>> > > &gt; &amp;nbsp; &amp;nbsp; > > >>> > > 问题二是我在sql-client中可以创建view,可我用程序写tableEnv.sqlUpdate("create > > >>> > > &gt; view as ...")却会报错。报错如下: > > >>> > > &gt; Exception in thread "main" > > >>> > org.apache.flink.table.api.TableException: > > >>> > > &gt; Unsupported query: CREATE VIEW rich_user_behavior > AS > > >>> > > &gt; SELECT U.user_id, U.item_id, > > U.behavior,&amp;nbsp; > > >>> > > &gt; &amp;nbsp; CASE C.parent_category_id > > >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 1 THEN '服饰鞋包' > > >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 2 THEN '家装家饰' > > >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 3 THEN '家电' > > >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 4 THEN '美妆' > > >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 5 THEN '母婴' > > >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 6 THEN '3C数码' > > >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 7 THEN '运动户外' > > >>> > > &gt; &amp;nbsp; &amp;nbsp; WHEN 8 THEN '食品' > > >>> > > &gt; &amp;nbsp; &amp;nbsp; ELSE '其他' > > >>> > > &gt; &amp;nbsp; END AS category_name > > >>> > > &gt; FROM user_behavior AS U LEFT JOIN category_dim FOR > > >>> SYSTEM_TIME AS OF > > >>> > > &gt; U.proctime AS C > > >>> > > &gt; ON U.category_id = C.sub_category_id > > >>> > > > > >>> > > >> > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > >>> at > > >>> > > &gt; > > >>> > > > > >>> > > > >>> > > >> > > > org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67) > > >>> > > > > >>> > > >> > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > >>> at > > >>> > > &gt; > > >>> > > > > >>> > > > >>> > > >> > > > org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown > > >>> > > &gt; Source) > > >>> > > > > >>> > > >> > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > >>> at > > >>> > > java.util.Optional.orElseThrow(Optional.java:290) > > >>> > > > > >>> > > >> > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > >>> at > > >>> > > &gt; > > >>> > > > > >>> > > > >>> > > >> > > > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67) > > >>> > > > > >>> > > >> > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > >>> at > > >>> > > &gt; > > >>> > > > > >>> > > > >>> > > >> > > > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > > >>> > > > > >>> > > >> > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > >>> at > > >>> > > &gt; > > >>> > > > > >>> > > > >>> > > >> > > > com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51) > > >>> > > &gt; > > >>> > > &gt; > > >>> > > &gt; > > >>> > > &gt; > > >>> > > &gt; > > >>> > > &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; > > >>> &amp;nbsp; 望解答,十分感谢! > > >>> > > > >>> > > >>> > > >>> -- > > >>> > > >>> Benchao Li > > >>> School of Electronics Engineering and Computer Science, Peking > > University > > >>> Tel:+86-15650713730 > > >>> Email: [hidden email]; [hidden email] > > >> > > >> > > >> > > >> -- > > >> > > >> Benchao Li > > >> School of Electronics Engineering and Computer Science, Peking > > University > > >> Tel:+86-15650713730 > > >> Email: [hidden email]; [hidden email] > > >> > > > > > > > > > -- > > > > > > Benchao Li > > > School of Electronics Engineering and Computer Science, Peking > University > > > Tel:+86-15650713730 > > > Email: [hidden email]; [hidden email] > > > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] > |
Free forum by Nabble | Edit this page |