问题请教-flinksql的kafkasource方面

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

问题请教-flinksql的kafkasource方面

人生若只如初见
大佬好:
     请教下,之前我看了云邪大佬的基于flinkSQL快速构建流式应用的视频,兵按照其步骤进行了复现。过程中我遇到了两个问题。
     问题一是当我用先在sql-client中创建kafkasource表,再用程序向kafka发送数据,统计出来的每小时全网购买数是对的;可如果把kafka数据清空,然后先启动程序向kafka发送用户行为数据,发送了一会后再创建kafkasource表并进行业务处理,此时得到的结果是:每小时全网购买数/partition的的数量,数据不准确了。我记得kafkasource读取标记的watermark是partition中最小的那个,为什么会出现这个问题。   
    问题二是我在sql-client中可以创建view,可我用程序写tableEnv.sqlUpdate("create view as ...")却会报错。报错如下:
Exception in thread "main" org.apache.flink.table.api.TableException: Unsupported query: CREATE VIEW rich_user_behavior AS
SELECT U.user_id, U.item_id, U.behavior, 
  CASE C.parent_category_id
    WHEN 1 THEN '服饰鞋包'
    WHEN 2 THEN '家装家饰'
    WHEN 3 THEN '家电'
    WHEN 4 THEN '美妆'
    WHEN 5 THEN '母婴'
    WHEN 6 THEN '3C数码'
    WHEN 7 THEN '运动户外'
    WHEN 8 THEN '食品'
    ELSE '其他'
  END AS category_name
FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.sub_category_id
        at org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67)
        at org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown Source)
        at java.util.Optional.orElseThrow(Optional.java:290)
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
        at com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51)





        望解答,十分感谢!
Reply | Threaded
Open this post in threaded view
|

Re: 问题请教-flinksql的kafkasource方面

Jark
Administrator
Hi,

关于第二个问题,Flink 1.10 只在 SQL CLI 支持了 CREATE VIEW 语法, TableEnvironment 上还未支持。
关于第一个问题,抱歉,没有太看懂。"每小时全网购买数/partition的的数量,数据不准确了" 是什么意思呢?要不举个例子看下?

Best,
Jark

On Sat, 18 Apr 2020 at 18:38, 人生若只如初见 <[hidden email]> wrote:

> 大佬好:
> &nbsp; &nbsp;
> &nbsp;请教下,之前我看了云邪大佬的基于flinkSQL快速构建流式应用的视频,兵按照其步骤进行了复现。过程中我遇到了两个问题。
> &nbsp; &nbsp;
> &nbsp;问题一是当我用先在sql-client中创建kafkasource表,再用程序向kafka发送数据,统计出来的每小时全网购买数是对的;可如果把kafka数据清空,然后先启动程序向kafka发送用户行为数据,发送了一会后再创建kafkasource表并进行业务处理,此时得到的结果是:每小时全网购买数/partition的的数量,数据不准确了。我记得kafkasource读取标记的watermark是partition中最小的那个,为什么会出现这个问题。
> &nbsp;&nbsp;
> &nbsp; &nbsp; 问题二是我在sql-client中可以创建view,可我用程序写tableEnv.sqlUpdate("create
> view as ...")却会报错。报错如下:
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Unsupported query: CREATE VIEW rich_user_behavior AS
> SELECT U.user_id, U.item_id, U.behavior,&nbsp;
> &nbsp; CASE C.parent_category_id
> &nbsp; &nbsp; WHEN 1 THEN '服饰鞋包'
> &nbsp; &nbsp; WHEN 2 THEN '家装家饰'
> &nbsp; &nbsp; WHEN 3 THEN '家电'
> &nbsp; &nbsp; WHEN 4 THEN '美妆'
> &nbsp; &nbsp; WHEN 5 THEN '母婴'
> &nbsp; &nbsp; WHEN 6 THEN '3C数码'
> &nbsp; &nbsp; WHEN 7 THEN '运动户外'
> &nbsp; &nbsp; WHEN 8 THEN '食品'
> &nbsp; &nbsp; ELSE '其他'
> &nbsp; END AS category_name
> FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF
> U.proctime AS C
> ON U.category_id = C.sub_category_id
>         at
> org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67)
>         at
> org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown
> Source)
>         at java.util.Optional.orElseThrow(Optional.java:290)
>         at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67)
>         at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
>         at
> com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51)
>
>
>
>
>
> &nbsp; &nbsp; &nbsp; &nbsp; 望解答,十分感谢!