Re: Question about the Kafka source in Flink SQL

Re: Question about the Kafka source in Flink SQL

人生若只如初见
Hi,
Thanks for the reply. About the first question: when I run the INSERT statement after the producer has already written a bit more than 3 hours of user data into Kafka, the job immediately produces the purchase counts for those first three hours, only about 140 per hour, while the counts it then computes for the following hours are all around 1100. The two differ by a lot.



Attached:
DDL for the user_behavior table
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime as PROCTIME(),   -- processing-time attribute generated via a computed column
    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- defines the watermark on ts, making ts the event-time column
) WITH (
    'connector.type' = 'kafka',  -- use the kafka connector
    'connector.version' = 'universal',  -- kafka version; universal supports 0.11 and above
    'connector.topic' = 'user_behavior',  -- kafka topic
    'connector.startup-mode' = 'earliest-offset',  -- start reading from the earliest offset
    'connector.properties.zookeeper.connect' = '192.168.0.150:2181',  -- zookeeper address
    'connector.properties.bootstrap.servers' = '192.168.0.150:9092',  -- kafka broker address
    'format.type' = 'json'  -- source data format is json
)

DDL for the hourly purchase-count table
CREATE TABLE buy_cnt_per_hour ( 
    hour_of_day BIGINT,
    buy_cnt BIGINT
) WITH (
    'connector.type' = 'elasticsearch', -- use the elasticsearch connector
    'connector.version' = '6',  -- elasticsearch version; 6 works with both ES 6.x and 7.x
    'connector.hosts' = 'http://192.168.0.150:9200',  -- elasticsearch address
    'connector.index' = 'buy_cnt_per_hour',  -- elasticsearch index name, comparable to a table name in a database
    'connector.document-type' = 'user_behavior', -- elasticsearch document type
    'connector.bulk-flush.max-actions' = '1',  -- flush every single record
    'format.type' = 'json',  -- output format is json
    'update-mode' = 'append'
)

INSERT statement
INSERT INTO buy_cnt_per_hour
SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)

Kafka producer code

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.text.SimpleDateFormat;
import java.util.*;


public class UserBehaviorProducer {
    public static final String brokerList = "192.168.0.150:9092";

    public static final String topic = "user_behavior";

    public static void main(String[] args) {

        // configure the producer client, including the key/value serializers
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers", brokerList);
        // create the KafkaProducer instance
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // build the messages to send, e.g.
        //{"user_id": "952483", "item_id":"310884", "category_id": "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
        //{"user_id": "794777", "item_id":"5119439", "category_id": "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
        String[] behaviors = {"pv", "buy", "coll", "cart"}; // page view, buy, favorite, add to cart
        JSONObject jsonObject = new JSONObject();
        Random random = new Random();
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
        long date_long = getDate();
        while (true) {
            jsonObject.put("user_id", random.nextInt(900000) + 100000 + "");
            jsonObject.put("item_id", random.nextInt(900000) + 100000 + "");
            jsonObject.put("category_id", random.nextInt(1000) + "");
            jsonObject.put("behavior", behaviors[random.nextInt(4)]);
            jsonObject.put("ts", format.format(new Date(date_long)));
            String msg = jsonObject.toString();
            System.out.println(msg);
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
            producer.send(record);
            // advance the event time by ~800 ms on average with Gaussian jitter,
            // so timestamps are not strictly monotonic
//            date_long += 500 + random.nextGaussian() * 1000;
            date_long += 800 + random.nextGaussian() * 1500;
            try {
                Thread.sleep(60);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

    private static long getDate() {
        Date date = new Date();
        Calendar c = Calendar.getInstance();
        c.setTime(date);
        // set the day of month to 1, i.e. the first day of the current month
        c.set(Calendar.DAY_OF_MONTH, 1);
        // zero out hours, minutes, seconds and milliseconds
        c.set(Calendar.HOUR_OF_DAY, 0);
        c.set(Calendar.MINUTE, 0);
        c.set(Calendar.SECOND, 0);
        c.set(Calendar.MILLISECOND, 0);
        // return the timestamp of the first day of this month
        return c.getTimeInMillis();
    }
}

------------------ Original message ------------------
From: "Jark Wu" <[hidden email]>
Sent: Saturday, April 18, 2020, 10:08 PM
To: "user-zh" <[hidden email]>

Subject: Re: Question about the Kafka source in Flink SQL



Hi,

Regarding the second question: Flink 1.10 supports the CREATE VIEW syntax only in the SQL CLI; it is not supported on TableEnvironment yet.
Regarding the first question: sorry, I don't quite follow. What does "hourly site-wide purchase count / number of partitions, the numbers are inaccurate" mean? Could you give an example?

Best,
Jark
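
A minimal workaround sketch for 1.10 (not from the thread itself): register the view through the Table API instead of CREATE VIEW. It assumes tEnv is the StreamTableEnvironment used in MyUserBehaviorExample and that user_behavior and category_dim are already registered.

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

// Build the joined result with sqlQuery(...) instead of sqlUpdate("CREATE VIEW ...").
Table richUserBehavior = tEnv.sqlQuery(
        "SELECT U.user_id, U.item_id, U.behavior, C.parent_category_id " +
        "FROM user_behavior AS U " +
        "LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C " +
        "ON U.category_id = C.sub_category_id");
// Register it under a name so later SQL statements can refer to it like the CLI view.
tEnv.createTemporaryView("rich_user_behavior", richUserBehavior);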

On Sat, 18 Apr 2020 at 18:38, 人生若只如初见 <[hidden email]> wrote:

> Hi,
> I watched 云邪 (Jark)'s video on quickly building a streaming application with Flink SQL and reproduced it step by step. Along the way I ran into two problems.
>
> Problem 1: if I first create the Kafka source table in the sql-client and then start the program that sends data to Kafka, the hourly site-wide purchase counts come out correct. But if I clear the Kafka topic, start the program that sends user-behavior data first, let it run for a while, and only then create the Kafka source table and run the query, the result I get is roughly the hourly site-wide purchase count divided by the number of partitions, i.e. the numbers are no longer accurate. As far as I remember, the Kafka source takes the smallest watermark across partitions, so why does this happen?
>
> Problem 2: I can create a view in the sql-client, but calling tableEnv.sqlUpdate("create view as ...") from a program fails with the following error:
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Unsupported query: CREATE VIEW rich_user_behavior AS
> SELECT U.user_id, U.item_id, U.behavior,
>   CASE C.parent_category_id
>     WHEN 1 THEN '服饰鞋包'
>     WHEN 2 THEN '家装家饰'
>     WHEN 3 THEN '家电'
>     WHEN 4 THEN '美妆'
>     WHEN 5 THEN '母婴'
>     WHEN 6 THEN '3C数码'
>     WHEN 7 THEN '运动户外'
>     WHEN 8 THEN '食品'
>     ELSE '其他'
>   END AS category_name
> FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
> ON U.category_id = C.sub_category_id
>         at org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67)
>         at org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown Source)
>         at java.util.Optional.orElseThrow(Optional.java:290)
>         at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67)
>         at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
>         at com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51)
>
> Looking forward to your answer, many thanks!

Re: Question about the Kafka source in Flink SQL

Jark
Administrator
Hi,

Based on the behaviour you describe and the code you provided, I think the cause is out-of-order data.
In your Java code the event time is not monotonically increasing, so the data is somewhat out of order. While the job is running normally this matters little
(the watermark tolerates 5s of disorder).
But when catching up on backlogged data, Flink does not yet align event time across partitions, so some partitions can run far ahead of other
partitions,
which widens the disorder (data that used to be at most 4s late may now be 10s late). More records are therefore dropped as late, which is why the counts are lower while catching up.

A complete fix has to wait for FLIP-27.
For now you can increase the watermark delay to tolerate later data.
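
For example, a sketch based on the user_behavior DDL above (the 1-minute delay is only a placeholder and should be tuned to the disorder actually observed while catching up):

CREATE TABLE user_behavior (
    ...  -- same columns as above
    ts TIMESTAMP(3),
    proctime as PROCTIME(),
    WATERMARK FOR ts as ts - INTERVAL '1' MINUTE  -- was INTERVAL '5' SECOND
) WITH ( ... )  -- same connector options as above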

Best,
Jark


Re: Question about the Kafka source in Flink SQL

Benchao Li
If that is your situation, you can work around it for now by making the source parallelism greater than or equal to the number of Kafka partitions.
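
In a program this could look like the sketch below (the value 3 is only an assumption about the partition count of user_behavior; when the job is submitted from the SQL CLI, the corresponding knob should be the execution parallelism in sql-client-defaults.yaml):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Run the job, and therefore the Kafka source, with parallelism >= the number of
// partitions of user_behavior (assumed to be 3 here), so each source subtask reads
// at most one partition and the minimum watermark tracks the slowest partition.
env.setParallelism(3);
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);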



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]

Re: Question about the Kafka source in Flink SQL

人生若只如初见
Got it, thanks a lot.
 

 


Re: Question about the Kafka source in Flink SQL

admin
Hi Benchao, if the source parallelism is greater than the number of partitions, wouldn't that cause the problem that checkpoints cannot be completed?




Sun.Zhu
Email: [hidden email]


Re: Question about the Kafka source in Flink SQL

Benchao Li
It shouldn't. Source subtasks that are not assigned any partition are marked as idle.
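
Roughly, "idle" means the subtask tells the runtime it currently has nothing to emit, so it does not hold back downstream event-time progress. A minimal sketch of that mechanism with a hand-written source, only to illustrate the API (the built-in Kafka consumer handles this itself when a subtask gets no partition):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class PossiblyIdleSource implements SourceFunction<String> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // This subtask has no data to read (e.g. it was assigned no Kafka partition),
        // so it declares itself temporarily idle; downstream watermark progress is
        // then not held back by it.
        ctx.markAsTemporarilyIdle();
        while (running) {
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}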


Reply: Question about the flinksql kafkasource

admin
We have run into this problem in production: source subtasks that were assigned no partition switched to the FINISHED state after a short while. When triggering a checkpoint, the coordinator checks whether every task to be triggered is RUNNING; otherwise no checkpoint is made. The source of CheckpointCoordinator#triggerCheckpoint documents this as well.
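
A hedged sketch of one way to sidestep the situation described above: keep the source parallelism no higher than the topic's partition count so that no subtask is left empty. The class name, group.id and the assumed partition count of 3 are illustrative; the broker address and topic are taken from the earlier messages.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class CappedParallelismJob {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.0.150:9092");
        props.setProperty("group.id", "user_behavior_demo");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("user_behavior", new SimpleStringSchema(), props);

        // Assuming the topic has 3 partitions: with a source parallelism of at most 3,
        // no subtask is left without a partition, so none can reach FINISHED and
        // block checkpointing as described above.
        env.addSource(consumer).setParallelism(3).print();

        env.execute("capped-parallelism-demo");
    }
}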





Sun.Zhu
Email: [hidden email]


Re: Question about the flinksql kafkasource

Benchao Li
Which version are you on? We are on 1.9 and have not run into this problem.


Re: Question about the flinksql kafkasource

admin
We are on 1.8, but this part of the source code should not have changed, right?
// check if all tasks that we need to trigger are running.
// if not, abort the checkpoint
Execution[] executions = new Execution[tasksToTrigger.length];
for (int i = 0; i < tasksToTrigger.length; i++) {
   Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
   if (ee == null) {
      LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
            tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
            job);
      throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
   } else if (ee.getState() == ExecutionState.RUNNING) {
      executions[i] = ee;
   } else {
      LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
            tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
            job,
            ExecutionState.RUNNING,
            ee.getState());
      throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
   }
}
Or is my understanding off here?

> 2020年4月20日 下午6:21,Benchao Li <[hidden email]> 写道:
>
> 你们用的是哪个版本?我们用的是1.9,还没有遇到过这个问题呢。
>
> Sun.Zhu <[hidden email]> 于2020年4月20日周一 下午5:43写道:
>
>> 我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint
>> ,源码CheckpointCoordinator#triggerCheckpoint也有说明
>>
>>
>>
>>
>>
>> | |
>> Sun.Zhu
>> |
>> |
>> 邮箱:[hidden email]
>> |
>>
>> Signature is customized by Netease Mail Master
>>
>> 在2020年04月20日 10:37,Benchao Li 写道:
>> 应该是不会的。分配不到partition的source会标记为idle状态。
>>
>> Sun.Zhu <[hidden email]> 于2020年4月20日周一 上午10:28写道:
>>
>>> Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧
>>>
>>>
>>>
>>>
>>> | |
>>> Sun.Zhu
>>> |
>>> |
>>> 邮箱:[hidden email]
>>> |
>>>
>>> Signature is customized by Netease Mail Master
>>>
>>> 在2020年04月19日 22:43,人生若只如初见 写道:
>>> 嗯嗯,十分感谢
>>>
>>>
>>>
>>>
>>> ------------------&nbsp;原始邮件&nbsp;------------------
>>> 发件人:&nbsp;"Benchao Li"<[hidden email]&gt;;
>>> 发送时间:&nbsp;2020年4月19日(星期天) 晚上9:25
>>> 收件人:&nbsp;"user-zh"<[hidden email]&gt;;
>>>
>>> 主题:&nbsp;Re: 问题请教-flinksql的kafkasource方面
>>>
>>>
>>>
>>> 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。
>>>
>>> Jark Wu <[hidden email]&gt; 于2020年4月19日周日 下午8:22写道:
>>>
>>> &gt; Hi,
>>> &gt;
>>> &gt; 根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。
>>> &gt; 根据你的 Java 代码,数据的 event time
>>> 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark
>>> &gt; 能容忍 5s 乱序).
>>> &gt; 但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些
>>> partition
>>> &gt; 进度快很多的现象,
>>> &gt; 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。
>>> &gt;
>>> &gt; 完美的解决方案还需要等 FLIP-27 的完成。
>>> &gt; 当前可以通过增加 watermark delay来增大迟到数据的容忍。
>>> &gt;
>>> &gt; Best,
>>> &gt; Jark
>>> &gt;
>>> &gt;
>>> &gt; On Sat, 18 Apr 2020 at 22:53, 人生若只如初见 <[hidden email]&gt; wrote:
>>> &gt;
>>> &gt; &gt; 你好
>>> &gt; &gt;
>>> &gt; &gt;
>>> &gt;
>>>
>> 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多
>>> &gt; &gt;
>>> &gt; &gt;
>>> &gt; &gt;
>>> &gt; &gt; 附:
>>> &gt; &gt; userbehavior建表语句
>>> &gt; &gt; CREATE TABLE user_behavior (
>>> &gt; &gt; &amp;nbsp; &amp;nbsp; user_id BIGINT,
>>> &gt; &gt; &amp;nbsp; &amp;nbsp; item_id BIGINT,
>>> &gt; &gt; &amp;nbsp; &amp;nbsp; category_id BIGINT,
>>> &gt; &gt; &amp;nbsp; &amp;nbsp; behavior STRING,
>>> &gt; &gt; &amp;nbsp; &amp;nbsp; ts TIMESTAMP(3),
>>> &gt; &gt; &amp;nbsp; &amp;nbsp; proctime as PROCTIME(), &amp;nbsp; --
>>> 通过计算列产生一个处理时间列
>>> &gt; &gt; &amp;nbsp; &amp;nbsp; WATERMARK FOR ts as ts - INTERVAL '5'
>>> SECOND &amp;nbsp;--
>>> &gt; &gt; 在ts上定义watermark,ts成为事件时间列
>>> &gt; &gt; ) WITH (
>>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.type' = 'kafka', &amp;nbsp;--
>>> 使用 kafka connector
>>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.version' = 'universal',
>>> &amp;nbsp;-- kafka
>>> &gt; &gt; 版本,universal 支持 0.11 以上的版本
>>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.topic' = 'user_behavior',
>>> &amp;nbsp;-- kafka topic
>>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.startup-mode' =
>>> 'earliest-offset', &amp;nbsp;-- 从起始
>>> &gt; &gt; offset 开始读取
>>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.properties.zookeeper.connect'
>> =
>>> '
>>> &gt; &gt; 192.168.0.150:2181', &amp;nbsp;-- zookeeper 地址
>>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.properties.bootstrap.servers'
>> =
>>> '
>>> &gt; &gt; 192.168.0.150:9092', &amp;nbsp;-- kafka broker 地址
>>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'format.type' = 'json' &amp;nbsp;--
>> 数据源格式为
>>> json
>>> &gt; &gt; )
>>> &gt; &gt;
>>> &gt; &gt; 每小时购买数建表语句
>>> &gt; &gt; CREATE TABLE buy_cnt_per_hour (&amp;nbsp;
>>> &gt; &gt; &amp;nbsp; &amp;nbsp; hour_of_day BIGINT,
>>> &gt; &gt; &amp;nbsp; &amp;nbsp; buy_cnt BIGINT
>>> &gt; &gt; ) WITH (
>>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.type' = 'elasticsearch', -- 使用
>>> elasticsearch
>>> &gt; &gt; connector
>>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.version' = '6', &amp;nbsp;--
>>> elasticsearch 版本,6 能支持
>>> &gt; &gt; es 6+ 以及 7+ 的版本
>>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.hosts' = '
>>> <a href="http://192.168.0.150:9200'">http://192.168.0.150:9200', &amp;nbsp;--
>>> &gt; &gt; elasticsearch 地址
>>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.index' = 'buy_cnt_per_hour',
>>> &amp;nbsp;--
>>> &gt; &gt; elasticsearch 索引名,相当于数据库的表名
>>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.document-type' =
>>> 'user_behavior', --
>>> &gt; &gt; elasticsearch 的 type,相当于数据库的库名
>>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.bulk-flush.max-actions' = '1',
>>> &amp;nbsp;-- 每条数据都刷新
>>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'format.type' = 'json', &amp;nbsp;--
>>> 输出数据格式 json
>>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'update-mode' = 'append'
>>> &gt; &gt; )
>>> &gt; &gt;
>>> &gt; &gt; 插入语句
>>> &gt; &gt; INSERT INTO buy_cnt_per_hour&amp;nbsp;
>>> &gt; &gt; SELECT HOUR(TUMBLE_START(ts,INTERVAL '1'
>>> HOUR)),COUNT(*)&amp;nbsp;
>>> &gt; &gt; FROM user_behavior
>>> &gt; &gt; WHERE behavior = 'buy'
>>> &gt; &gt; GROUP BY TUMBLE(ts,INTERVAL '1' HOUR)
>>> &gt; &gt;
>>> &gt; &gt; kafka数据发送代码
>>> &gt; &gt;
>>> &gt; &gt; import com.alibaba.fastjson.JSONObject;
>>> &gt; &gt; import org.apache.kafka.clients.producer.KafkaProducer;
>>> &gt; &gt; import org.apache.kafka.clients.producer.ProducerRecord;
>>> &gt; &gt;
>>> &gt; &gt; import java.text.SimpleDateFormat;
>>> &gt; &gt; import java.util.*;
>>> &gt; &gt;
>>> &gt; &gt;
>>> &gt; &gt; public class UserBehaviorProducer {
>>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; public static final String brokerList
>> = "
>>> 192.168.0.150:9092";
>>> &gt; &gt;
>>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; //&nbsp;&nbsp;&nbsp; public static
>> final
>>> String topic="user_behavior";
>>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; public static final String topic =
>>> "user_behavior";
>>> &gt; &gt;
>>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; public static void main(String args[])
>> {
>>> &gt; &gt;
>>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //配置生产者客户端参数
>>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //将配置序列化
>>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Properties
>>> properties = new Properties();
>>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
>>> properties.put("key.serializer",
>>> &gt; &gt; "org.apache.kafka.common.serialization.StringSerializer");
>>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
>>> properties.put("value.serializer",
>>> &gt; &gt; "org.apache.kafka.common.serialization.StringSerializer");
>>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
>>> properties.put("bootstrap.servers", brokerList);
>>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
>>> //创建KafkaProducer 实例
>>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
>>> KafkaProducer<String, String&amp;gt; producer = new
>>> &gt; &gt; KafkaProducer<&amp;gt;(properties);
>>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //构建待发送的消息
>>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //{"user_id":
>>> "952483", "item_id":"310884", "category_id":
>>> &gt; &gt; "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
        //{"user_id": "794777", "item_id":"5119439", "category_id": "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
        String[] behaviors = {"pv", "buy", "coll", "cart"};//浏览,购买,收藏,加入购物车
        JSONObject jsonObject = new JSONObject();
        HashMap<String, String> info = new HashMap<>();
        Random random = new Random();
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
        long date_long = getDate();
        while (true) {
            jsonObject.put("user_id", random.nextInt(900000) + 100000 + "");
            jsonObject.put("item_id", random.nextInt(900000) + 100000 + "");
            jsonObject.put("category_id", random.nextInt(1000) + "");
            jsonObject.put("behavior", behaviors[random.nextInt(4)]);
            jsonObject.put("ts", format.format(new Date(date_long)));
            String msg = jsonObject.toString();
            System.out.println(msg);
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
            producer.send(record);
//          date_long += 500 + random.nextGaussian() * 1000;
            date_long += 800 + random.nextGaussian() * 1500;
            try {
                Thread.sleep(60);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private static long getDate() {
        Date date = new Date();
        Calendar c = Calendar.getInstance();
        c.setTime(date);
        //设置为1号,当前日期即为本月第一天
        c.set(Calendar.DAY_OF_MONTH, 1);
        //将小时置0
        c.set(Calendar.HOUR_OF_DAY, 0);
        //将分钟置0
        c.set(Calendar.MINUTE, 0);
        //将秒置0
        c.set(Calendar.SECOND, 0);
        //将毫秒置0
        c.set(Calendar.MILLISECOND, 0);
        //返回本月第一天的时间戳
        return c.getTimeInMillis();
    }
}

Reply | Threaded
Open this post in threaded view
|

Re: 问题请教-flinksql的kafkasource方面

Benchao Li
我对checkpoint这块不是很了解。不过subtask标记为idle跟finish还是有区别的吧。
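
A concrete way to see the difference: an idle source subtask is still in the RUNNING state, so it does not prevent checkpoints from being triggered, while a subtask that returns from run() becomes FINISHED and trips exactly the check quoted below. The following is only an illustrative sketch (class and variable names are invented, not from this thread) of a parallel source that marks itself idle via SourceContext#markAsTemporarilyIdle() instead of finishing:

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

// Illustrative sketch only; names are invented for the example.
public class IdleAwareSource extends RichParallelSourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        int subtask = getRuntimeContext().getIndexOfThisSubtask();
        // Pretend only subtasks 0 and 1 were assigned a Kafka partition.
        boolean hasWork = subtask < 2;

        if (!hasWork) {
            // Keep the subtask alive but exclude it from watermark calculation.
            // The task stays RUNNING, unlike a source that simply returns from run().
            ctx.markAsTemporarilyIdle();
        }

        while (running) {
            if (hasWork) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect("record from subtask " + subtask);
                }
            }
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}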

祝尚 <[hidden email]> 于2020年4月20日周一 下午10:29写道:

> 我们是1.8版本,但是这段源码应该是没变吧
> // check if all tasks that we need to trigger are running.
> // if not, abort the checkpoint
> Execution[] executions = new Execution[tasksToTrigger.length];
> for (int i = 0; i < tasksToTrigger.length; i++) {
>    Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
>    if (ee == null) {
>       LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
>             tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
>             job);
>       throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
>    } else if (ee.getState() == ExecutionState.RUNNING) {
>       executions[i] = ee;
>    } else {
>       LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
>             tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
>             job,
>             ExecutionState.RUNNING,
>             ee.getState());
>       throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
>    }
> }
> 还是我理解的不对
>
> > 2020年4月20日 下午6:21,Benchao Li <[hidden email]> 写道:
> >
> > 你们用的是哪个版本?我们用的是1.9,还没有遇到过这个问题呢。
> >
> > Sun.Zhu <[hidden email]> 于2020年4月20日周一 下午5:43写道:
> >
> >>
> 我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint
> >> ,源码CheckpointCoordinator#triggerCheckpoint也有说明
> >>
> >>
> >>
> >>
> >>
> >> | |
> >> Sun.Zhu
> >> |
> >> |
> >> 邮箱:[hidden email]
> >> |
> >>
> >> Signature is customized by Netease Mail Master
> >>
> >> 在2020年04月20日 10:37,Benchao Li 写道:
> >> 应该是不会的。分配不到partition的source会标记为idle状态。
> >>
> >> Sun.Zhu <[hidden email]> 于2020年4月20日周一 上午10:28写道:
> >>
> >>> Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧
> >>>
> >>>
> >>>
> >>>
> >>> | |
> >>> Sun.Zhu
> >>> |
> >>> |
> >>> 邮箱:[hidden email]
> >>> |
> >>>
> >>> Signature is customized by Netease Mail Master
> >>>
> >>> 在2020年04月19日 22:43,人生若只如初见 写道:
> >>> 嗯嗯,十分感谢
> >>>
> >>>
> >>>
> >>>
> >>> ------------------&nbsp;原始邮件&nbsp;------------------
> >>> 发件人:&nbsp;"Benchao Li"<[hidden email]&gt;;
> >>> 发送时间:&nbsp;2020年4月19日(星期天) 晚上9:25
> >>> 收件人:&nbsp;"user-zh"<[hidden email]&gt;;
> >>>
> >>> 主题:&nbsp;Re: 问题请教-flinksql的kafkasource方面
> >>>
> >>>
> >>>
> >>> 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。
> >>>
> >>> Jark Wu <[hidden email]&gt; 于2020年4月19日周日 下午8:22写道:
> >>>
> >>> &gt; Hi,
> >>> &gt;
> >>> &gt; 根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。
> >>> &gt; 根据你的 Java 代码,数据的 event time
> >>> 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark
> >>> &gt; 能容忍 5s 乱序).
> >>> &gt; 但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition 进度比某些
> >>> partition
> >>> &gt; 进度快很多的现象,
> >>> &gt; 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。
> >>> &gt;
> >>> &gt; 完美的解决方案还需要等 FLIP-27 的完成。
> >>> &gt; 当前可以通过增加 watermark delay来增大迟到数据的容忍。
> >>> &gt;
> >>> &gt; Best,
> >>> &gt; Jark
> >>> &gt;
> >>> &gt;
> >>> &gt; On Sat, 18 Apr 2020 at 22:53, 人生若只如初见 <[hidden email]&gt; wrote:
> >>> &gt;
> >>> &gt; &gt; 你好
> >>> &gt; &gt;
> >>> &gt; &gt;
> >>> &gt;
> >>>
> >>
> 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多
> >>> &gt; &gt;
> >>> &gt; &gt;
> >>> &gt; &gt;
> >>> &gt; &gt; 附:
> >>> &gt; &gt; userbehavior建表语句
> >>> &gt; &gt; CREATE TABLE user_behavior (
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; user_id BIGINT,
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; item_id BIGINT,
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; category_id BIGINT,
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; behavior STRING,
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; ts TIMESTAMP(3),
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; proctime as PROCTIME(), &amp;nbsp; --
> >>> 通过计算列产生一个处理时间列
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; WATERMARK FOR ts as ts - INTERVAL '5'
> >>> SECOND &amp;nbsp;--
> >>> &gt; &gt; 在ts上定义watermark,ts成为事件时间列
> >>> &gt; &gt; ) WITH (
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.type' = 'kafka',
> &amp;nbsp;--
> >>> 使用 kafka connector
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.version' = 'universal',
> >>> &amp;nbsp;-- kafka
> >>> &gt; &gt; 版本,universal 支持 0.11 以上的版本
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.topic' = 'user_behavior',
> >>> &amp;nbsp;-- kafka topic
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.startup-mode' =
> >>> 'earliest-offset', &amp;nbsp;-- 从起始
> >>> &gt; &gt; offset 开始读取
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp;
> 'connector.properties.zookeeper.connect'
> >> =
> >>> '
> >>> &gt; &gt; 192.168.0.150:2181', &amp;nbsp;-- zookeeper 地址
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp;
> 'connector.properties.bootstrap.servers'
> >> =
> >>> '
> >>> &gt; &gt; 192.168.0.150:9092', &amp;nbsp;-- kafka broker 地址
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'format.type' = 'json' &amp;nbsp;--
> >> 数据源格式为
> >>> json
> >>> &gt; &gt; )
> >>> &gt; &gt;
> >>> &gt; &gt; 每小时购买数建表语句
> >>> &gt; &gt; CREATE TABLE buy_cnt_per_hour (&amp;nbsp;
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; hour_of_day BIGINT,
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; buy_cnt BIGINT
> >>> &gt; &gt; ) WITH (
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.type' = 'elasticsearch', --
> 使用
> >>> elasticsearch
> >>> &gt; &gt; connector
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.version' = '6', &amp;nbsp;--
> >>> elasticsearch 版本,6 能支持
> >>> &gt; &gt; es 6+ 以及 7+ 的版本
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.hosts' = '
> >>> <a href="http://192.168.0.150:9200'">http://192.168.0.150:9200', &amp;nbsp;--
> >>> &gt; &gt; elasticsearch 地址
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.index' = 'buy_cnt_per_hour',
> >>> &amp;nbsp;--
> >>> &gt; &gt; elasticsearch 索引名,相当于数据库的表名
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.document-type' =
> >>> 'user_behavior', --
> >>> &gt; &gt; elasticsearch 的 type,相当于数据库的库名
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.bulk-flush.max-actions' =
> '1',
> >>> &amp;nbsp;-- 每条数据都刷新
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'format.type' = 'json', &amp;nbsp;--
> >>> 输出数据格式 json
> >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'update-mode' = 'append'
> >>> &gt; &gt; )
> >>> &gt; &gt;
> >>> &gt; &gt; 插入语句
> >>> &gt; &gt; INSERT INTO buy_cnt_per_hour&amp;nbsp;
> >>> &gt; &gt; SELECT HOUR(TUMBLE_START(ts,INTERVAL '1'
> >>> HOUR)),COUNT(*)&amp;nbsp;
> >>> &gt; &gt; FROM user_behavior
> >>> &gt; &gt; WHERE behavior = 'buy'
> >>> &gt; &gt; GROUP BY TUMBLE(ts,INTERVAL '1' HOUR)
> >>> &gt; &gt;
> >>> &gt; &gt; kafka数据发送代码
> >>> &gt; &gt;
> >>> &gt; &gt; import com.alibaba.fastjson.JSONObject;
> >>> &gt; &gt; import org.apache.kafka.clients.producer.KafkaProducer;
> >>> &gt; &gt; import org.apache.kafka.clients.producer.ProducerRecord;
> >>> &gt; &gt;
> >>> &gt; &gt; import java.text.SimpleDateFormat;
> >>> &gt; &gt; import java.util.*;
> >>> &gt; &gt;
> >>> &gt; &gt;
> >>> &gt; &gt; public class UserBehaviorProducer {
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; public static final String brokerList
> >> = "
> >>> 192.168.0.150:9092";
> >>> &gt; &gt;
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; //&nbsp;&nbsp;&nbsp; public static
> >> final
> >>> String topic="user_behavior";
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; public static final String topic =
> >>> "user_behavior";
> >>> &gt; &gt;
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; public static void main(String
> args[])
> >> {
> >>> &gt; &gt;
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //配置生产者客户端参数
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //将配置序列化
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Properties
> >>> properties = new Properties();
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> properties.put("key.serializer",
> >>> &gt; &gt; "org.apache.kafka.common.serialization.StringSerializer");
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> properties.put("value.serializer",
> >>> &gt; &gt; "org.apache.kafka.common.serialization.StringSerializer");
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> properties.put("bootstrap.servers", brokerList);
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> //创建KafkaProducer 实例
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> KafkaProducer<String, String&amp;gt; producer = new
> >>> &gt; &gt; KafkaProducer<&amp;gt;(properties);
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //构建待发送的消息
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //{"user_id":
> >>> "952483", "item_id":"310884", "category_id":
> >>> &gt; &gt; "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //{"user_id":
> >>> "794777", "item_id":"5119439", "category_id":
> >>> &gt; &gt; "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; String[]
> >>> behaviors = {"pv", "buy", "coll",
> >>> &gt; "cart"};//浏览,购买,收藏,加入购物车
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; JSONObject
> >>> jsonObject = new JSONObject();
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> HashMap<String,
> >>> String&amp;gt; info = new HashMap<&amp;gt;();
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Random
> random =
> >>> new Random();
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >> SimpleDateFormat
> >>> format = new
> >>> &gt; &gt; SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; long
> >>> date_long=getDate();
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; while (true)
> {
> >>> &gt;
> >>>
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> jsonObject.put("user_id", random.nextInt(900000) + 100000 +
> >>> &gt; &gt; "");
> >>> &gt;
> >>>
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> jsonObject.put("item_id", random.nextInt(900000) + 100000 +
> >>> &gt; &gt; "");
> >>> &gt;
> >>>
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> jsonObject.put("category_id", random.nextInt(1000) + "");
> >>> &gt;
> >>>
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> jsonObject.put("behavior", behaviors[random.nextInt(4)]);
> >>> &gt;
> >>>
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> jsonObject.put("ts", format.format(new Date(date_long)));
> >>> &gt;
> >>>
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> String msg = jsonObject.toString();
> >>> &gt;
> >>>
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> System.out.println(msg);
> >>> &gt;
> >>>
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> ProducerRecord<String, String&amp;gt; record = new
> >>> &gt; &gt; ProducerRecord<&amp;gt;(topic, msg);
> >>> &gt;
> >>>
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> producer.send(record);
> >>> &gt; &gt;
> >>> //&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> date_long +=500+random.nextGaussian()*1000;
> >>> &gt;
> >>>
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> date_long +=800+random.nextGaussian()*1500;
> >>> &gt;
> >>>
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> try {
> >>> &gt;
> >>>
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> Thread.sleep(60);
> >>> &gt;
> >>>
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> } catch (InterruptedException e) {
> >>> &gt;
> >>>
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> e.printStackTrace();
> >>> &gt;
> >>>
> >>
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> }
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
> >>> &gt; &gt;
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; }
> >>> &gt; &gt;
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; private static long getDate() {
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Date date =
> new
> >>> Date();
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Calendar c =
> >>> Calendar.getInstance();
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >> c.setTime(date);
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> //设置为1号,当前日期既为本月第一天
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> c.set(Calendar.DAY_OF_MONTH, 1);
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //将小时至0
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> c.set(Calendar.HOUR_OF_DAY, 0);
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //将分钟至0
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> c.set(Calendar.MINUTE, 0);
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //将秒至0
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> c.set(Calendar.SECOND,0);
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //将毫秒至0
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >>> c.set(Calendar.MILLISECOND, 0);
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //
> >>> 本月第一天的时间戳转换为字符串
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return
> >>> c.getTimeInMillis();
> >>> &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; }
> >>> &gt; &gt; }
> >>> &gt; &gt;
> >>> &gt; &gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
> >>> &gt; &gt; 发件人:&amp;nbsp;"Jark Wu"<[hidden email]&amp;gt;;
> >>> &gt; &gt; 发送时间:&amp;nbsp;2020年4月18日(星期六) 晚上10:08
> >>> &gt; &gt; 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;;
> >>> &gt; &gt;
> >>> &gt; &gt; 主题:&amp;nbsp;Re: 问题请教-flinksql的kafkasource方面
> >>> &gt; &gt;
> >>> &gt; &gt;
> >>> &gt; &gt;
> >>> &gt; &gt; Hi,
> >>> &gt; &gt;
> >>> &gt; &gt; 关于第二个问题,Flink 1.10 只在 SQL CLI 支持了 CREATE VIEW 语法,
> >>> TableEnvironment 上还未支持。
> >>> &gt; &gt; 关于第一个问题,抱歉,没有太看懂。"每小时全网购买数/partition的的数量,数据不准确了"
> >> 是什么意思呢?要不举个例子看下?
> >>> &gt; &gt;
> >>> &gt; &gt; Best,
> >>> &gt; &gt; Jark
> >>> &gt; &gt;
> >>> &gt; &gt; On Sat, 18 Apr 2020 at 18:38, 人生若只如初见 <[hidden email]&amp;gt;
> >>> wrote:
> >>> &gt; &gt;
> >>> &gt; &gt; &amp;gt; 大佬好:
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;
> >>> &gt; &gt; &amp;gt;
> >>> &gt;
> >>>
> >>
> &amp;amp;nbsp;请教下,之前我看了云邪大佬的基于flinkSQL快速构建流式应用的视频,兵按照其步骤进行了复现。过程中我遇到了两个问题。
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;
> >>> &gt; &gt; &amp;gt;
> >>> &gt; &gt;
> >>> &gt;
> >>>
> >>
> &amp;amp;nbsp;问题一是当我用先在sql-client中创建kafkasource表,再用程序向kafka发送数据,统计出来的每小时全网购买数是对的;可如果把kafka数据清空,然后先启动程序向kafka发送用户行为数据,发送了一会后再创建kafkasource表并进行业务处理,此时得到的结果是:每小时全网购买数/partition的的数量,数据不准确了。我记得kafkasource读取标记的watermark是partition中最小的那个,为什么会出现这个问题。
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;
> >>> &gt; &gt; 问题二是我在sql-client中可以创建view,可我用程序写tableEnv.sqlUpdate("create
> >>> &gt; &gt; &amp;gt; view as ...")却会报错。报错如下:
> >>> &gt; &gt; &amp;gt; Exception in thread "main"
> >>> &gt; org.apache.flink.table.api.TableException:
> >>> &gt; &gt; &amp;gt; Unsupported query: CREATE VIEW rich_user_behavior AS
> >>> &gt; &gt; &amp;gt; SELECT U.user_id, U.item_id,
> U.behavior,&amp;amp;nbsp;
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp; CASE C.parent_category_id
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; WHEN 1 THEN '服饰鞋包'
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; WHEN 2 THEN '家装家饰'
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; WHEN 3 THEN '家电'
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; WHEN 4 THEN '美妆'
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; WHEN 5 THEN '母婴'
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; WHEN 6 THEN '3C数码'
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; WHEN 7 THEN '运动户外'
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; WHEN 8 THEN '食品'
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; ELSE '其他'
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp; END AS category_name
> >>> &gt; &gt; &amp;gt; FROM user_behavior AS U LEFT JOIN category_dim FOR
> >>> SYSTEM_TIME AS OF
> >>> &gt; &gt; &amp;gt; U.proctime AS C
> >>> &gt; &gt; &amp;gt; ON U.category_id = C.sub_category_id
> >>> &gt; &gt;
> >>>
> >>
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> >>> at
> >>> &gt; &gt; &amp;gt;
> >>> &gt; &gt;
> >>> &gt;
> >>>
> >>
> org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67)
> >>> &gt; &gt;
> >>>
> >>
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> >>> at
> >>> &gt; &gt; &amp;gt;
> >>> &gt; &gt;
> >>> &gt;
> >>>
> >>
> org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown
> >>> &gt; &gt; &amp;gt; Source)
> >>> &gt; &gt;
> >>>
> >>
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> >>> at
> >>> &gt; &gt; java.util.Optional.orElseThrow(Optional.java:290)
> >>> &gt; &gt;
> >>>
> >>
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> >>> at
> >>> &gt; &gt; &amp;gt;
> >>> &gt; &gt;
> >>> &gt;
> >>>
> >>
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67)
> >>> &gt; &gt;
> >>>
> >>
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> >>> at
> >>> &gt; &gt; &amp;gt;
> >>> &gt; &gt;
> >>> &gt;
> >>>
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
> >>> &gt; &gt;
> >>>
> >>
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> >>> at
> >>> &gt; &gt; &amp;gt;
> >>> &gt; &gt;
> >>> &gt;
> >>>
> >>
> com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51)
> >>> &gt; &gt; &amp;gt;
> >>> &gt; &gt; &amp;gt;
> >>> &gt; &gt; &amp;gt;
> >>> &gt; &gt; &amp;gt;
> >>> &gt; &gt; &amp;gt;
> >>> &gt; &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
> >>> &amp;amp;nbsp; 望解答,十分感谢!
> >>> &gt;
> >>>
> >>>
> >>> --
> >>>
> >>> Benchao Li
> >>> School of Electronics Engineering and Computer Science, Peking
> University
> >>> Tel:+86-15650713730
> >>> Email: [hidden email]; [hidden email]
> >>
> >>
> >>
> >> --
> >>
> >> Benchao Li
> >> School of Electronics Engineering and Computer Science, Peking
> University
> >> Tel:+86-15650713730
> >> Email: [hidden email]; [hidden email]
> >>
> >
> >
> > --
> >
> > Benchao Li
> > School of Electronics Engineering and Computer Science, Peking University
> > Tel:+86-15650713730
> > Email: [hidden email]; [hidden email]
>
>

--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: 问题请教-flinksql的kafkasource方面

Jark
Administrator
Hi,

你可以将 kafka 并行度设置成等于 kafka partition 个数。这个方式肯定能 work,且不浪费 task 资源。
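
As a reference, here is a minimal DataStream-style sketch of that suggestion, assuming the universal FlinkKafkaConsumer and a user_behavior topic with 3 partitions (the partition count and the group.id are assumptions, not from this thread). The pure SQL DDL in 1.10 does not expose per-source parallelism, so this is the DataStream analogue of the same idea:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

// Sketch only: pin the Kafka source parallelism to the topic's partition count,
// so no source subtask is left without a partition.
public class KafkaSourceParallelismExample {
    public static void main(String[] args) throws Exception {
        final int kafkaPartitions = 3;  // assumption: user_behavior has 3 partitions

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.0.150:9092");
        props.put("group.id", "user_behavior_demo");  // arbitrary example value

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("user_behavior", new SimpleStringSchema(), props);
        consumer.setStartFromEarliest();

        env.addSource(consumer)
                .setParallelism(kafkaPartitions)  // source parallelism == partition count
                .print();

        env.execute("kafka-source-parallelism-demo");
    }
}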

Best,
Jark

On Mon, 20 Apr 2020 at 22:33, Benchao Li <[hidden email]> wrote:

> 我对checkpoint这块不是很了解。不过subtask标记为idle跟finish还是有区别的吧。
Reply | Threaded
Open this post in threaded view
|

回复:问题请教-flinksql的kafkasource方面

admin
嗯是的,都设置成小于等于partition数
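
For the Table/SQL job discussed in this thread, one rough way to follow that advice is to cap the job-wide default parallelism at (or below) the topic's partition count, since the 1.10 DDL options do not let you set parallelism per table source. A hedged sketch, assuming the Blink planner and 3 Kafka partitions (the class name and the value 3 are assumptions):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

// Sketch only: keep every operator's parallelism no higher than the Kafka partition count.
public class SqlJobParallelismExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);  // assumption: the user_behavior topic has 3 partitions

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // The CREATE TABLE / INSERT INTO statements from this thread would be registered
        // here with tableEnv.sqlUpdate(...) and submitted with tableEnv.execute("..."),
        // inheriting the default parallelism set above.
    }
}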




Sun.Zhu
邮箱:[hidden email]

Signature is customized by Netease Mail Master

在2020年04月21日 00:28,Jark Wu 写道:
Hi,

你可以将 kafka 并行度设置成等于 kafka partition 个数。这个方式肯定能 work,且不浪费 task 资源。

Best,
Jark

On Mon, 20 Apr 2020 at 22:33, Benchao Li <[hidden email]> wrote:

> 我对checkpoint这块不是很了解。不过subtask标记为idle跟finish还是有区别的吧。
>
> 祝尚 <[hidden email]> 于2020年4月20日周一 下午10:29写道:
>
> > 我们是1.8版本,但是这段源码应该是没变把
> > // check if all tasks that we need to trigger are running.
> > // if not, abort the checkpoint
> > Execution[] executions = new Execution[tasksToTrigger.length];
> > for (int i = 0; i < tasksToTrigger.length; i++) {
> >    Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
> >    if (ee == null) {
> >       LOG.info("Checkpoint triggering task {} of job {} is not being
> > executed at the moment. Aborting checkpoint.",
> >             tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
> >             job);
> >       throw new
> >
> CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
> >    } else if (ee.getState() == ExecutionState.RUNNING) {
> >       executions[i] = ee;
> >    } else {
> >       LOG.info("Checkpoint triggering task {} of job {} is not in state
> {}
> > but {} instead. Aborting checkpoint.",
> >             tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
> >             job,
> >             ExecutionState.RUNNING,
> >             ee.getState());
> >       throw new
> >
> CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
> >    }
> > }
> > 还是我理解的不对
> >
> > > 2020年4月20日 下午6:21,Benchao Li <[hidden email]> 写道:
> > >
> > > 你们用的是哪个版本?我们用的是1.9,还没有遇到过这个问题呢。
> > >
> > > Sun.Zhu <[hidden email]> 于2020年4月20日周一 下午5:43写道:
> > >
> > >>
> >
> 我们生产上遇到过这个问题,分不到partition的source短暂时间后变成了finished状态,在做checkpoint时会检查所有executor的状态是不是running,否则将不做checkpoint
> > >> ,源码CheckpointCoordinator#triggerCheckpoint也有说明
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> | |
> > >> Sun.Zhu
> > >> |
> > >> |
> > >> 邮箱:[hidden email]
> > >> |
> > >>
> > >> Signature is customized by Netease Mail Master
> > >>
> > >> 在2020年04月20日 10:37,Benchao Li 写道:
> > >> 应该是不会的。分配不到partition的source会标记为idle状态。
> > >>
> > >> Sun.Zhu <[hidden email]> 于2020年4月20日周一 上午10:28写道:
> > >>
> > >>> Hi,benchao,source并发度大于partition数的话,会导致不做checkpoint的问题吧
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> | |
> > >>> Sun.Zhu
> > >>> |
> > >>> |
> > >>> 邮箱:[hidden email]
> > >>> |
> > >>>
> > >>> Signature is customized by Netease Mail Master
> > >>>
> > >>> 在2020年04月19日 22:43,人生若只如初见 写道:
> > >>> 嗯嗯,十分感谢
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> ------------------&nbsp;原始邮件&nbsp;------------------
> > >>> 发件人:&nbsp;"Benchao Li"<[hidden email]&gt;;
> > >>> 发送时间:&nbsp;2020年4月19日(星期天) 晚上9:25
> > >>> 收件人:&nbsp;"user-zh"<[hidden email]&gt;;
> > >>>
> > >>> 主题:&nbsp;Re: 问题请教-flinksql的kafkasource方面
> > >>>
> > >>>
> > >>>
> > >>> 如果是这种情况,可以让你的source的并发度大于等于kafka partition的数量来避免一下。
> > >>>
> > >>> Jark Wu <[hidden email]&gt; 于2020年4月19日周日 下午8:22写道:
> > >>>
> > >>> &gt; Hi,
> > >>> &gt;
> > >>> &gt; 根据你描述的现象,以及提供的代码。我觉得原因应该是数据乱序导致的。
> > >>> &gt; 根据你的 Java 代码,数据的 event time
> > >>> 不是单调递增的,会有一定程度的乱序,这种乱序在作业正常运行时影响不大(watermark
> > >>> &gt; 能容忍 5s 乱序).
> > >>> &gt; 但是在追数据时,由于 flink 目前还没有做到event time 对齐,所以会导致追数据时某些 partition
> 进度比某些
> > >>> partition
> > >>> &gt; 进度快很多的现象,
> > >>> &gt;
> 导致乱序程度拉大(如原先迟到最久的数据时4s,现在可能是10s),所以会导致丢弃的数据更多,也就造成了追数据时,统计值偏低的现象。
> > >>> &gt;
> > >>> &gt; 完美的解决方案还需要等 FLIP-27 的完成。
> > >>> &gt; 当前可以通过增加 watermark delay来增大迟到数据的容忍。
> > >>> &gt;
> > >>> &gt; Best,
> > >>> &gt; Jark
> > >>> &gt;
> > >>> &gt;
> > >>> &gt; On Sat, 18 Apr 2020 at 22:53, 人生若只如初见 <[hidden email]&gt; wrote:
> > >>> &gt;
> > >>> &gt; &gt; 你好
> > >>> &gt; &gt;
> > >>> &gt; &gt;
> > >>> &gt;
> > >>>
> > >>
> >
> 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多
> > >>> &gt; &gt;
> > >>> &gt; &gt;
> > >>> &gt; &gt;
> > >>> &gt; &gt; 附:
> > >>> &gt; &gt; userbehavior建表语句
> > >>> &gt; &gt; CREATE TABLE user_behavior (
> > >>> &gt; &gt; &amp;nbsp; &amp;nbsp; user_id BIGINT,
> > >>> &gt; &gt; &amp;nbsp; &amp;nbsp; item_id BIGINT,
> > >>> &gt; &gt; &amp;nbsp; &amp;nbsp; category_id BIGINT,
> > >>> &gt; &gt; &amp;nbsp; &amp;nbsp; behavior STRING,
> > >>> &gt; &gt; &amp;nbsp; &amp;nbsp; ts TIMESTAMP(3),
> > >>> &gt; &gt; &amp;nbsp; &amp;nbsp; proctime as PROCTIME(), &amp;nbsp; --
> > >>> 通过计算列产生一个处理时间列
> > >>> &gt; &gt; &amp;nbsp; &amp;nbsp; WATERMARK FOR ts as ts - INTERVAL '5'
> > >>> SECOND &amp;nbsp;--
> > >>> &gt; &gt; 在ts上定义watermark,ts成为事件时间列
> > >>> &gt; &gt; ) WITH (
> > >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.type' = 'kafka',
> > &amp;nbsp;--
> > >>> 使用 kafka connector
> > >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.version' = 'universal',
> > >>> &amp;nbsp;-- kafka
> > >>> &gt; &gt; 版本,universal 支持 0.11 以上的版本
> > >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.topic' = 'user_behavior',
> > >>> &amp;nbsp;-- kafka topic
> > >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.startup-mode' =
> > >>> 'earliest-offset', &amp;nbsp;-- 从起始
> > >>> &gt; &gt; offset 开始读取
> > >>> &gt; &gt; &amp;nbsp; &amp;nbsp;
> > 'connector.properties.zookeeper.connect'
> > >> =
> > >>> '
> > >>> &gt; &gt; 192.168.0.150:2181', &amp;nbsp;-- zookeeper 地址
> > >>> &gt; &gt; &amp;nbsp; &amp;nbsp;
> > 'connector.properties.bootstrap.servers'
> > >> =
> > >>> '
> > >>> &gt; &gt; 192.168.0.150:9092', &amp;nbsp;-- kafka broker 地址
> > >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'format.type' = 'json' &amp;nbsp;--
> > >> 数据源格式为
> > >>> json
> > >>> &gt; &gt; )
> > >>> &gt; &gt;
> > >>> &gt; &gt; 每小时购买数建表语句
> > >>> &gt; &gt; CREATE TABLE buy_cnt_per_hour (&amp;nbsp;
> > >>> &gt; &gt; &amp;nbsp; &amp;nbsp; hour_of_day BIGINT,
> > >>> &gt; &gt; &amp;nbsp; &amp;nbsp; buy_cnt BIGINT
> > >>> &gt; &gt; ) WITH (
> > >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.type' = 'elasticsearch',
> --
> > 使用
> > >>> elasticsearch
> > >>> &gt; &gt; connector
> > >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.version' = '6',
> &amp;nbsp;--
> > >>> elasticsearch 版本,6 能支持
> > >>> &gt; &gt; es 6+ 以及 7+ 的版本
> > >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.hosts' = '
> > >>> <a href="http://192.168.0.150:9200'">http://192.168.0.150:9200', &amp;nbsp;--
> > >>> &gt; &gt; elasticsearch 地址
> > >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.index' =
> 'buy_cnt_per_hour',
> > >>> &amp;nbsp;--
> > >>> &gt; &gt; elasticsearch 索引名,相当于数据库的表名
> > >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.document-type' =
> > >>> 'user_behavior', --
> > >>> &gt; &gt; elasticsearch 的 type,相当于数据库的库名
> > >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'connector.bulk-flush.max-actions' =
> > '1',
> > >>> &amp;nbsp;-- 每条数据都刷新
> > >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'format.type' = 'json', &amp;nbsp;--
> > >>> 输出数据格式 json
> > >>> &gt; &gt; &amp;nbsp; &amp;nbsp; 'update-mode' = 'append'
> > >>> &gt; &gt; )
> > >>> &gt; &gt;
> > >>> &gt; &gt; 插入语句
> > >>> &gt; &gt; INSERT INTO buy_cnt_per_hour&amp;nbsp;
> > >>> &gt; &gt; SELECT HOUR(TUMBLE_START(ts,INTERVAL '1'
> > >>> HOUR)),COUNT(*)&amp;nbsp;
> > >>> &gt; &gt; FROM user_behavior
> > >>> &gt; &gt; WHERE behavior = 'buy'
> > >>> &gt; &gt; GROUP BY TUMBLE(ts,INTERVAL '1' HOUR)
> > >>> &gt; &gt;
Kafka data producer code:

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.text.SimpleDateFormat;
import java.util.*;


public class UserBehaviorProducer {
    public static final String brokerList = "192.168.0.150:9092";

    //    public static final String topic="user_behavior";
    public static final String topic = "user_behavior";

    public static void main(String args[]) {

        // configure the producer client parameters
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers", brokerList);
        // create the KafkaProducer instance
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // build the messages to send, e.g.
        // {"user_id": "952483", "item_id":"310884", "category_id": "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
        // {"user_id": "794777", "item_id":"5119439", "category_id": "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
        String[] behaviors = {"pv", "buy", "coll", "cart"}; // page view, buy, collect, add to cart
        JSONObject jsonObject = new JSONObject();
        HashMap<String, String> info = new HashMap<>();
        Random random = new Random();
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
        long date_long = getDate();
        while (true) {
            jsonObject.put("user_id", random.nextInt(900000) + 100000 + "");
            jsonObject.put("item_id", random.nextInt(900000) + 100000 + "");
            jsonObject.put("category_id", random.nextInt(1000) + "");
            jsonObject.put("behavior", behaviors[random.nextInt(4)]);
            jsonObject.put("ts", format.format(new Date(date_long)));
            String msg = jsonObject.toString();
            System.out.println(msg);
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
            producer.send(record);
//            date_long += 500 + random.nextGaussian() * 1000;
            // advance the fake event time by ~800 ms (with Gaussian jitter) per record
            date_long += 800 + random.nextGaussian() * 1500;
            try {
                Thread.sleep(60);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private static long getDate() {
        Date date = new Date();
        Calendar c = Calendar.getInstance();
        c.setTime(date);
        // set the calendar to the 1st day of the current month, 00:00:00.000
        c.set(Calendar.DAY_OF_MONTH, 1);
        c.set(Calendar.HOUR_OF_DAY, 0);
        c.set(Calendar.MINUTE, 0);
        c.set(Calendar.SECOND, 0);
        c.set(Calendar.MILLISECOND, 0);
        // return the timestamp of the first day of the month
        return c.getTimeInMillis();
    }
}

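A rough sanity check on the producer's pacing, using the average increments above and ignoring the Gaussian jitter:

    event-time step per record  ~ 800 ms
    wall-clock step per record  ~ 60 ms  (Thread.sleep(60))
    -> event time runs roughly 800 / 60 ~ 13x faster than wall-clock time
    -> one event-time hour holds about 3,600,000 / 800 ~ 4,500 records and is produced in about 4.5 wall-clock minutes
    -> with the four behavior values drawn uniformly, roughly a quarter of those records per event-time hour are 'buy' events
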
------------------ Original Message ------------------
From: "Jark Wu" <[hidden email]>
Sent: Saturday, 18 April 2020, 10:08 PM
To: "user-zh" <[hidden email]>
Subject: Re: 问题请教-flinksql的kafkasource方面

Hi,

Regarding the second question: Flink 1.10 only supports the CREATE VIEW syntax in the SQL CLI; it is not yet supported on TableEnvironment.
Regarding the first question: sorry, I didn't quite follow. What do you mean by "hourly site-wide purchase count / number of partitions, the numbers are no longer accurate"? Could you give an example?

Best,
Jark

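For reference, a common way around that 1.10 limitation is to build the view with the Table API and register it as a temporary view, instead of issuing CREATE VIEW through sqlUpdate(). A minimal sketch, assuming the user_behavior and category_dim tables are already registered; the environment setup, the shortened query, and the "some_sink" table are illustrative only:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class RegisterViewSketch {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // ... register user_behavior / category_dim with tEnv.sqlUpdate("CREATE TABLE ...") ...

        // Run the SELECT that the view would wrap and register the result under a name.
        Table richUserBehavior = tEnv.sqlQuery(
                "SELECT U.user_id, U.item_id, U.behavior, C.parent_category_id " +
                "FROM user_behavior AS U " +
                "LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C " +
                "ON U.category_id = C.sub_category_id");
        tEnv.createTemporaryView("rich_user_behavior", richUserBehavior);

        // Later statements can refer to the registered view by name.
        // "some_sink" is a placeholder sink table for illustration.
        tEnv.sqlUpdate("INSERT INTO some_sink SELECT user_id, behavior FROM rich_user_behavior");
        tEnv.execute("register view sketch");
    }
}
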
On Sat, 18 Apr 2020 at 18:38, 人生若只如初见 <[hidden email]> wrote:

Hi,

I watched 云邪 (Jark)'s video on quickly building streaming applications with Flink SQL and reproduced it step by step. Along the way I ran into two problems.

Problem 1: if I first create the Kafka source table in the SQL client and then start the program that sends data to Kafka, the computed hourly site-wide purchase count is correct. But if I clear the Kafka data, start the program producing user-behavior data first, let it run for a while, and only then create the Kafka source table and run the query, the result I get is roughly (hourly site-wide purchase count / number of partitions), so the numbers are no longer accurate. As far as I remember, the watermark the Kafka source emits is the minimum across its partitions, so why does this happen?

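As background for the per-partition watermark point above: per-partition watermarking is what you get when the timestamp/watermark assigner is attached directly to the Kafka consumer, in which case the source tracks a watermark for each partition and emits the minimum of them. A minimal DataStream-API sketch of that setup, assuming Flink 1.10; the group id and the crude JSON field extraction are illustrative only, and this is separate from the SQL job in this thread:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.time.Instant;
import java.util.Properties;

public class PerPartitionWatermarkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.0.150:9092");
        props.put("group.id", "user_behavior_demo");  // illustrative group id

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("user_behavior", new SimpleStringSchema(), props);
        consumer.setStartFromEarliest();

        // Because the assigner is attached to the consumer itself, watermarks are
        // tracked per Kafka partition inside the source and the emitted watermark
        // is the minimum over all partitions.
        consumer.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(5)) {
                    @Override
                    public long extractTimestamp(String json) {
                        // crude extraction of the "ts" field; a real job would use a JSON parser
                        String ts = json.replaceAll(".*\"ts\"\\s*:\\s*\"([^\"]+)\".*", "$1");
                        return Instant.parse(ts).toEpochMilli();
                    }
                });

        env.addSource(consumer).print();
        env.execute("per-partition watermark sketch");
    }
}
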
Problem 2: I can create a view in the SQL client, but when I call tableEnv.sqlUpdate("create view as ...") from a program it throws an error:

Exception in thread "main" org.apache.flink.table.api.TableException: Unsupported query: CREATE VIEW rich_user_behavior AS
SELECT U.user_id, U.item_id, U.behavior,
  CASE C.parent_category_id
    WHEN 1 THEN '服饰鞋包'
    WHEN 2 THEN '家装家饰'
    WHEN 3 THEN '家电'
    WHEN 4 THEN '美妆'
    WHEN 5 THEN '母婴'
    WHEN 6 THEN '3C数码'
    WHEN 7 THEN '运动户外'
    WHEN 8 THEN '食品'
    ELSE '其他'
  END AS category_name
FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.sub_category_id
        at org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67)
        at org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown Source)
        at java.util.Optional.orElseThrow(Optional.java:290)
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
        at com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51)

Looking forward to your answers, many thanks!

--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel: +86-15650713730
Email: [hidden email]; [hidden email]