上游是用 upsert-kafka connector 创建的 table,使用 UDAF 时发现 accumulator 里的变量一直没有被更新?如果改用 kafka connector,则能正常更新。
(为了测试方便,table里只有同一个PK的数据) |
Administrator
|
可以发下代码吗?
On Thu, 10 Dec 2020 at 11:19, bulterman <[hidden email]> wrote: > 上游是upsert-kafka connector 创建的table, > 使用UDAF时发现accumlator里的变量一直没被更新?如果使用kafka connector是正常更新的 > (为了测试方便,table里只有同一个PK的数据) |
// kafka table
tableEnv.execuetSql("CREATE TABLE market_stock(\n" + " Code STRING,\n" + " Amount BIGINT,\n" + ...... " PRIMARY KEY (Code) NOT ENFORCED\n" + ") WITH (\n" + " 'connector' = 'upsert-kafka',\n" + " 'topic' = 'zzz',\n" + " 'properties.bootstrap.servers' = '10.0.3.20:9092,10.0.3.24:9092,10.0.3.26:9092',\n" + " 'properties.group.id' = 'sqltest46',\n" + " 'key.format' = 'raw',\n" + " 'value.format' = 'json'\n" + ")"); // 使用UDAF计算 Table table = bsTableEnv.sqlQuery("select Code,MainFundFlowFunc(Amount,AskPrice1,BidPrice1,Last) from market_stock GROUP BY Code"); env.toRetractStream(table,Row.class).print(); // UDAF的定义如下 public class MainFundFlowFunc extends AggregateFunction<Row, AmountAccum> { @Override public Row getValue(AmountAccum acc) { Long mf = acc.ebb + acc.bb - acc.ebs - acc.bs; double mfr = acc.lastAmount > 0 ? MyNumericCalculator.divide(mf,acc.lastAmount,2).doubleValue() : 0.0; return Row.of(acc.ebs,acc.bs,acc.ms,acc.ss,acc.ebb,acc.bb,acc.mb,acc.sb,mf,mfr); } @Override public AmountAccum createAccumulator() { return new AmountAccum(); } public void accumulate(AmountAccum acc, Long amount, Double askPrice1, Double bidPrice1, Double last) { //...... acc.lastAmount = amount; acc.lastAskPrice1 = askPrice1; acc.lastBidPrice1 = bidPrice1; } public void retract(AmountAccum acc, Long amount, Double askPrice1, Double bidPrice1, Double last) { acc.lastAmount = amount; acc.lastAskPrice1 = askPrice1; acc.lastBidPrice1 = bidPrice1; } } // acc public class AmountAccum { public Double lastAskPrice1; public Double lastBidPrice1; public Long lastAmount = 0L; public Long ebs = 0L; public Long bs = 0L; public Long ms = 0L; public Long ss = 0L; public Long ebb = 0L; public Long bb = 0L; public Long mb = 0L; public Long sb = 0L; } debug观察acc的lastAmount值,一直是0. 刚才试了一下用sum()函数,执行select Code,sum(Amount) from market_stock GROUP BY Code,发现并没有累加Amount字段的值,每一次输出都是最新的那个Amount值。 是我的使用姿势不对吗= = 在 2020-12-10 11:30:31,"Jark Wu" <[hidden email]> 写道: >可以发下代码吗? 
> >On Thu, 10 Dec 2020 at 11:19, bulterman <[hidden email]> wrote: > >> 上游是upsert-kafka connector 创建的table, >> 使用UDAF时发现accumlator里的变量一直没被更新?如果使用kafka connector是正常更新的 >> (为了测试方便,table里只有同一个PK的数据) |
Administrator
|
因为你的 upsert kafka table 的 pk 是 code,所以 code 分组下,数据已经是唯一的了 (一个 code
下,只有一行,取最后一行作为最新数据)。估计你同样的 code 下,amount 值是一样的,所以 sum(amount) 自然不会变化。 Best, Jark On Thu, 10 Dec 2020 at 12:36, bulterman <[hidden email]> wrote: > // kafka table > tableEnv.execuetSql("CREATE TABLE market_stock(\n" + > > " Code STRING,\n" + > > " Amount BIGINT,\n" + > > ...... > > " PRIMARY KEY (Code) NOT ENFORCED\n" + > > ") WITH (\n" + > > " 'connector' = 'upsert-kafka',\n" + > > " 'topic' = 'zzz',\n" + > > " 'properties.bootstrap.servers' = '10.0.3.20:9092, > 10.0.3.24:9092,10.0.3.26:9092',\n" + > > " 'properties.group.id' = 'sqltest46',\n" + > > " 'key.format' = 'raw',\n" + > > " 'value.format' = 'json'\n" + > > ")"); > // 使用UDAF计算 > Table table = bsTableEnv.sqlQuery("select > Code,MainFundFlowFunc(Amount,AskPrice1,BidPrice1,Last) from market_stock > GROUP BY Code"); > env.toRetractStream(table,Row.class).print(); > > > // UDAF的定义如下 > public class MainFundFlowFunc extends AggregateFunction<Row, AmountAccum> { > @Override > public Row getValue(AmountAccum acc) { > Long mf = acc.ebb + acc.bb - acc.ebs - acc.bs; > double mfr = acc.lastAmount > 0 ? > MyNumericCalculator.divide(mf,acc.lastAmount,2).doubleValue() : 0.0; > return Row.of(acc.ebs,acc.bs,acc.ms,acc.ss,acc.ebb,acc.bb,acc.mb, > acc.sb,mf,mfr); > } > @Override > public AmountAccum createAccumulator() { > return new AmountAccum(); > } > > public void accumulate(AmountAccum acc, Long amount, Double askPrice1, > Double bidPrice1, Double last) { > //...... 
> acc.lastAmount = amount; > acc.lastAskPrice1 = askPrice1; > acc.lastBidPrice1 = bidPrice1; > } > public void retract(AmountAccum acc, Long amount, Double askPrice1, > Double bidPrice1, Double last) { > acc.lastAmount = amount; > acc.lastAskPrice1 = askPrice1; > acc.lastBidPrice1 = bidPrice1; > } > > } > > > > > // acc > public class AmountAccum { > public Double lastAskPrice1; > public Double lastBidPrice1; > > public Long lastAmount = 0L; > > public Long ebs = 0L; > > public Long bs = 0L; > > public Long ms = 0L; > > public Long ss = 0L; > > public Long ebb = 0L; > > public Long bb = 0L; > > public Long mb = 0L; > > public Long sb = 0L; > } > > > debug观察acc的lastAmount值,一直是0. > > > 刚才试了一下用sum()函数,执行select Code,sum(Amount) from market_stock GROUP BY > Code,发现并没有累加Amount字段的值,每一次输出都是最新的那个Amount值。 > 是我的使用姿势不对吗= = > > 在 2020-12-10 11:30:31,"Jark Wu" <[hidden email]> 写道: > >可以发下代码吗? > > > >On Thu, 10 Dec 2020 at 11:19, bulterman <[hidden email]> wrote: > > > >> 上游是upsert-kafka connector 创建的table, > >> 使用UDAF时发现accumlator里的变量一直没被更新?如果使用kafka connector是正常更新的 > >> (为了测试方便,table里只有同一个PK的数据) > |
假设Code X,第一条数据X.Amount=0,第二条数据X.Amount=100,第三条数据X.Amount=200
1、由于Code是主键,table中每次仅保留了第最新那条X的数据,因此select sum(X.Amount) from table的输出是 :0, 100, 200 2、我定义UDAF中,对于同一个Code X来说,在accumulate方法中每次都会执行acc.lastAmount = Amount去更新acc的状态,但从结果来看,对于同一个Code X,每一次进入方法acc.lastAmount都是0? 也是因为表中仅保留一条Code X的数据的关系吗? 那在upsert kafka table中(Code X只保留最新一条数据),假设要累加Code X的Amount,期望的输出是:0,100,300...,应该如何实现? 求大佬解惑>< 在 2020-12-10 13:47:57,"Jark Wu" <[hidden email]> 写道: >因为你的 upsert kafka table 的 pk 是 code,所以 code 分组下,数据已经是唯一的了 (一个 code >下,只有一行,取最后一行作为最新数据)。估计你同样的 code 下,amount 值是一样的,所以 sum(amount) 自然不会变化。 > >Best, >Jark > >On Thu, 10 Dec 2020 at 12:36, bulterman <[hidden email]> wrote: > >> // kafka table >> tableEnv.execuetSql("CREATE TABLE market_stock(\n" + >> >> " Code STRING,\n" + >> >> " Amount BIGINT,\n" + >> >> ...... >> >> " PRIMARY KEY (Code) NOT ENFORCED\n" + >> >> ") WITH (\n" + >> >> " 'connector' = 'upsert-kafka',\n" + >> >> " 'topic' = 'zzz',\n" + >> >> " 'properties.bootstrap.servers' = '10.0.3.20:9092, >> 10.0.3.24:9092,10.0.3.26:9092',\n" + >> >> " 'properties.group.id' = 'sqltest46',\n" + >> >> " 'key.format' = 'raw',\n" + >> >> " 'value.format' = 'json'\n" + >> >> ")"); >> // 使用UDAF计算 >> Table table = bsTableEnv.sqlQuery("select >> Code,MainFundFlowFunc(Amount,AskPrice1,BidPrice1,Last) from market_stock >> GROUP BY Code"); >> env.toRetractStream(table,Row.class).print(); >> >> >> // UDAF的定义如下 >> public class MainFundFlowFunc extends AggregateFunction<Row, AmountAccum> { >> @Override >> public Row getValue(AmountAccum acc) { >> Long mf = acc.ebb + acc.bb - acc.ebs - acc.bs; >> double mfr = acc.lastAmount > 0 ? >> MyNumericCalculator.divide(mf,acc.lastAmount,2).doubleValue() : 0.0; >> return Row.of(acc.ebs,acc.bs,acc.ms,acc.ss,acc.ebb,acc.bb,acc.mb, >> acc.sb,mf,mfr); >> } >> @Override >> public AmountAccum createAccumulator() { >> return new AmountAccum(); >> } >> >> public void accumulate(AmountAccum acc, Long amount, Double askPrice1, >> Double bidPrice1, Double last) { >> //...... 
>> acc.lastAmount = amount; >> acc.lastAskPrice1 = askPrice1; >> acc.lastBidPrice1 = bidPrice1; >> } >> public void retract(AmountAccum acc, Long amount, Double askPrice1, >> Double bidPrice1, Double last) { >> acc.lastAmount = amount; >> acc.lastAskPrice1 = askPrice1; >> acc.lastBidPrice1 = bidPrice1; >> } >> >> } >> >> >> >> >> // acc >> public class AmountAccum { >> public Double lastAskPrice1; >> public Double lastBidPrice1; >> >> public Long lastAmount = 0L; >> >> public Long ebs = 0L; >> >> public Long bs = 0L; >> >> public Long ms = 0L; >> >> public Long ss = 0L; >> >> public Long ebb = 0L; >> >> public Long bb = 0L; >> >> public Long mb = 0L; >> >> public Long sb = 0L; >> } >> >> >> debug观察acc的lastAmount值,一直是0. >> >> >> 刚才试了一下用sum()函数,执行select Code,sum(Amount) from market_stock GROUP BY >> Code,发现并没有累加Amount字段的值,每一次输出都是最新的那个Amount值。 >> 是我的使用姿势不对吗= = >> >> 在 2020-12-10 11:30:31,"Jark Wu" <[hidden email]> 写道: >> >可以发下代码吗? >> > >> >On Thu, 10 Dec 2020 at 11:19, bulterman <[hidden email]> wrote: >> > >> >> 上游是upsert-kafka connector 创建的table, >> >> 使用UDAF时发现accumlator里的变量一直没被更新?如果使用kafka connector是正常更新的 >> >> (为了测试方便,table里只有同一个PK的数据) >> |
Administrator
|
你可以把 upsert kafka 想象成是 mysql 表的实时物化视图,
你在 mysql 里面 code 是 key,amount 是 value。当你把 amount 从0 更新成 100, 200。 那么最后的 sum(amount) 结果自然是 200。 如果你想要 0 -> 100 -> 300, 说明你不想把这个数据看成是有 pk 更新的数据,而是一条条独立的数据,这个时候你声明成 kafka connector,不定义 pk 即可,也就是当成普通 log 处理了。 关于你的 UDAF 的问题,估计是你实现的问题,因为你在 retract 方法中又把值设回 previous 值了。 Best, Jark On Thu, 10 Dec 2020 at 15:04, bulterman <[hidden email]> wrote: > 假设Code X,第一条数据X.Amount=0,第二条数据X.Amount=100,第三条数据X.Amount=200 > 1、由于Code是主键,table中每次仅保留了第最新那条X的数据,因此select sum(X.Amount) from table的输出是 > :0, 100, 200 > 2、我定义UDAF中,对于同一个Code X来说,在accumulate方法中每次都会执行acc.lastAmount = > Amount去更新acc的状态,但从结果来看,对于同一个Code X,每一次进入方法acc.lastAmount都是0? > 也是因为表中仅保留一条Code X的数据的关系吗? > > > 那在upsert kafka table中(Code X只保留最新一条数据),假设要累加Code > X的Amount,期望的输出是:0,100,300...,应该如何实现? > 求大佬解惑>< > > > > > > > > > > > > > > > > > > 在 2020-12-10 13:47:57,"Jark Wu" <[hidden email]> 写道: > >因为你的 upsert kafka table 的 pk 是 code,所以 code 分组下,数据已经是唯一的了 (一个 code > >下,只有一行,取最后一行作为最新数据)。估计你同样的 code 下,amount 值是一样的,所以 sum(amount) 自然不会变化。 > > > >Best, > >Jark > > > >On Thu, 10 Dec 2020 at 12:36, bulterman <[hidden email]> wrote: > > > >> // kafka table > >> tableEnv.execuetSql("CREATE TABLE market_stock(\n" + > >> > >> " Code STRING,\n" + > >> > >> " Amount BIGINT,\n" + > >> > >> ...... 
> >> > >> " PRIMARY KEY (Code) NOT ENFORCED\n" + > >> > >> ") WITH (\n" + > >> > >> " 'connector' = 'upsert-kafka',\n" + > >> > >> " 'topic' = 'zzz',\n" + > >> > >> " 'properties.bootstrap.servers' = '10.0.3.20:9092, > >> 10.0.3.24:9092,10.0.3.26:9092',\n" + > >> > >> " 'properties.group.id' = 'sqltest46',\n" + > >> > >> " 'key.format' = 'raw',\n" + > >> > >> " 'value.format' = 'json'\n" + > >> > >> ")"); > >> // 使用UDAF计算 > >> Table table = bsTableEnv.sqlQuery("select > >> Code,MainFundFlowFunc(Amount,AskPrice1,BidPrice1,Last) from market_stock > >> GROUP BY Code"); > >> env.toRetractStream(table,Row.class).print(); > >> > >> > >> // UDAF的定义如下 > >> public class MainFundFlowFunc extends AggregateFunction<Row, > AmountAccum> { > >> @Override > >> public Row getValue(AmountAccum acc) { > >> Long mf = acc.ebb + acc.bb - acc.ebs - acc.bs; > >> double mfr = acc.lastAmount > 0 ? > >> MyNumericCalculator.divide(mf,acc.lastAmount,2).doubleValue() : 0.0; > >> return Row.of(acc.ebs,acc.bs,acc.ms,acc.ss,acc.ebb,acc.bb > ,acc.mb, > >> acc.sb,mf,mfr); > >> } > >> @Override > >> public AmountAccum createAccumulator() { > >> return new AmountAccum(); > >> } > >> > >> public void accumulate(AmountAccum acc, Long amount, Double > askPrice1, > >> Double bidPrice1, Double last) { > >> //...... 
> >> acc.lastAmount = amount; > >> acc.lastAskPrice1 = askPrice1; > >> acc.lastBidPrice1 = bidPrice1; > >> } > >> public void retract(AmountAccum acc, Long amount, Double askPrice1, > >> Double bidPrice1, Double last) { > >> acc.lastAmount = amount; > >> acc.lastAskPrice1 = askPrice1; > >> acc.lastBidPrice1 = bidPrice1; > >> } > >> > >> } > >> > >> > >> > >> > >> // acc > >> public class AmountAccum { > >> public Double lastAskPrice1; > >> public Double lastBidPrice1; > >> > >> public Long lastAmount = 0L; > >> > >> public Long ebs = 0L; > >> > >> public Long bs = 0L; > >> > >> public Long ms = 0L; > >> > >> public Long ss = 0L; > >> > >> public Long ebb = 0L; > >> > >> public Long bb = 0L; > >> > >> public Long mb = 0L; > >> > >> public Long sb = 0L; > >> } > >> > >> > >> debug观察acc的lastAmount值,一直是0. > >> > >> > >> 刚才试了一下用sum()函数,执行select Code,sum(Amount) from market_stock GROUP BY > >> Code,发现并没有累加Amount字段的值,每一次输出都是最新的那个Amount值。 > >> 是我的使用姿势不对吗= = > >> > >> 在 2020-12-10 11:30:31,"Jark Wu" <[hidden email]> 写道: > >> >可以发下代码吗? > >> > > >> >On Thu, 10 Dec 2020 at 11:19, bulterman <[hidden email]> wrote: > >> > > >> >> 上游是upsert-kafka connector 创建的table, > >> >> 使用UDAF时发现accumlator里的变量一直没被更新?如果使用kafka connector是正常更新的 > >> >> (为了测试方便,table里只有同一个PK的数据) > >> > |
Free forum by Nabble | Edit this page |