大佬好:
请教下,之前我看了云邪大佬的基于flinkSQL快速构建流式应用的视频,兵按照其步骤进行了复现。过程中我遇到了两个问题。 问题一是当我用先在sql-client中创建kafkasource表,再用程序向kafka发送数据,统计出来的每小时全网购买数是对的;可如果把kafka数据清空,然后先启动程序向kafka发送用户行为数据,发送了一会后再创建kafkasource表并进行业务处理,此时得到的结果是:每小时全网购买数/partition的的数量,数据不准确了。我记得kafkasource读取标记的watermark是partition中最小的那个,为什么会出现这个问题。 问题二是我在sql-client中可以创建view,可我用程序写tableEnv.sqlUpdate("create view as ...")却会报错。报错如下: Exception in thread "main" org.apache.flink.table.api.TableException: Unsupported query: CREATE VIEW rich_user_behavior AS SELECT U.user_id, U.item_id, U.behavior, CASE C.parent_category_id WHEN 1 THEN '服饰鞋包' WHEN 2 THEN '家装家饰' WHEN 3 THEN '家电' WHEN 4 THEN '美妆' WHEN 5 THEN '母婴' WHEN 6 THEN '3C数码' WHEN 7 THEN '运动户外' WHEN 8 THEN '食品' ELSE '其他' END AS category_name FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C ON U.category_id = C.sub_category_id at org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67) at org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown Source) at java.util.Optional.orElseThrow(Optional.java:290) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) at com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51) 望解答,十分感谢! |
Administrator
|
Hi,
关于第二个问题,Flink 1.10 只在 SQL CLI 支持了 CREATE VIEW 语法, TableEnvironment 上还未支持。 关于第一个问题,抱歉,没有太看懂。"每小时全网购买数/partition的的数量,数据不准确了" 是什么意思呢?要不举个例子看下? Best, Jark On Sat, 18 Apr 2020 at 18:38, 人生若只如初见 <[hidden email]> wrote: > 大佬好: > > 请教下,之前我看了云邪大佬的基于flinkSQL快速构建流式应用的视频,兵按照其步骤进行了复现。过程中我遇到了两个问题。 > > 问题一是当我用先在sql-client中创建kafkasource表,再用程序向kafka发送数据,统计出来的每小时全网购买数是对的;可如果把kafka数据清空,然后先启动程序向kafka发送用户行为数据,发送了一会后再创建kafkasource表并进行业务处理,此时得到的结果是:每小时全网购买数/partition的的数量,数据不准确了。我记得kafkasource读取标记的watermark是partition中最小的那个,为什么会出现这个问题。 > > 问题二是我在sql-client中可以创建view,可我用程序写tableEnv.sqlUpdate("create > view as ...")却会报错。报错如下: > Exception in thread "main" org.apache.flink.table.api.TableException: > Unsupported query: CREATE VIEW rich_user_behavior AS > SELECT U.user_id, U.item_id, U.behavior, > CASE C.parent_category_id > WHEN 1 THEN '服饰鞋包' > WHEN 2 THEN '家装家饰' > WHEN 3 THEN '家电' > WHEN 4 THEN '美妆' > WHEN 5 THEN '母婴' > WHEN 6 THEN '3C数码' > WHEN 7 THEN '运动户外' > WHEN 8 THEN '食品' > ELSE '其他' > END AS category_name > FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF > U.proctime AS C > ON U.category_id = C.sub_category_id > at > org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67) > at > org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown > Source) > at java.util.Optional.orElseThrow(Optional.java:290) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > at > com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51) > > > > > > 望解答,十分感谢! |
Free forum by Nabble | Edit this page |