retract stream UDAF使用问题

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

retract stream UDAF使用问题

bulterman
上游是upsert-kafka connector 创建的table, 使用UDAF时发现accumlator里的变量一直没被更新?如果使用kafka connector是正常更新的
(为了测试方便,table里只有同一个PK的数据)
Reply | Threaded
Open this post in threaded view
|

Re: retract stream UDAF使用问题

Jark
Administrator
可以发下代码吗?

On Thu, 10 Dec 2020 at 11:19, bulterman <[hidden email]> wrote:

> 上游是upsert-kafka connector 创建的table,
> 使用UDAF时发现accumlator里的变量一直没被更新?如果使用kafka connector是正常更新的
> (为了测试方便,table里只有同一个PK的数据)
Reply | Threaded
Open this post in threaded view
|

Re:Re: retract stream UDAF使用问题

bulterman
// kafka table
tableEnv.execuetSql("CREATE TABLE  market_stock(\n" +

                "    Code STRING,\n" +

                "    Amount BIGINT,\n" +

                ......

                "    PRIMARY KEY (Code) NOT ENFORCED\n" +

                ") WITH (\n" +

                "    'connector' = 'upsert-kafka',\n" +

                "    'topic' = 'zzz',\n" +

                "    'properties.bootstrap.servers' = '10.0.3.20:9092,10.0.3.24:9092,10.0.3.26:9092',\n" +

                "    'properties.group.id' = 'sqltest46',\n" +

                "    'key.format' = 'raw',\n" +

                "    'value.format' = 'json'\n" +

                ")");
// 使用UDAF计算
Table table = bsTableEnv.sqlQuery("select Code,MainFundFlowFunc(Amount,AskPrice1,BidPrice1,Last) from market_stock GROUP BY Code");
env.toRetractStream(table,Row.class).print();


// UDAF的定义如下
public class MainFundFlowFunc extends AggregateFunction<Row, AmountAccum> {
    @Override
    public Row getValue(AmountAccum acc) {
        Long mf = acc.ebb + acc.bb - acc.ebs - acc.bs;
        double mfr = acc.lastAmount > 0 ? MyNumericCalculator.divide(mf,acc.lastAmount,2).doubleValue() : 0.0;
        return Row.of(acc.ebs,acc.bs,acc.ms,acc.ss,acc.ebb,acc.bb,acc.mb,acc.sb,mf,mfr);
    }
    @Override
    public AmountAccum createAccumulator() {
        return new AmountAccum();
    }
   
    public void accumulate(AmountAccum acc, Long amount, Double askPrice1, Double bidPrice1, Double last) {
        //......
       acc.lastAmount = amount;
        acc.lastAskPrice1 = askPrice1;
        acc.lastBidPrice1 = bidPrice1;
    }
    public void retract(AmountAccum acc, Long amount, Double askPrice1, Double bidPrice1, Double last) {
        acc.lastAmount = amount;
        acc.lastAskPrice1 = askPrice1;
        acc.lastBidPrice1 = bidPrice1;
    }
   
}




// acc
public class AmountAccum {
    public Double lastAskPrice1;
    public Double lastBidPrice1;
   
    public Long lastAmount = 0L;
   
    public Long ebs = 0L;
   
    public Long bs = 0L;
   
    public Long ms = 0L;
   
    public Long ss = 0L;
   
    public Long ebb = 0L;
   
    public Long bb = 0L;
   
    public Long mb = 0L;
   
    public Long sb = 0L;
}


debug观察acc的lastAmount值,一直是0.


刚才试了一下用sum()函数,执行select Code,sum(Amount) from market_stock GROUP BY Code,发现并没有累加Amount字段的值,每一次输出都是最新的那个Amount值。
是我的使用姿势不对吗= =

在 2020-12-10 11:30:31,"Jark Wu" <[hidden email]> 写道:
>可以发下代码吗?
>
>On Thu, 10 Dec 2020 at 11:19, bulterman <[hidden email]> wrote:
>
>> 上游是upsert-kafka connector 创建的table,
>> 使用UDAF时发现accumlator里的变量一直没被更新?如果使用kafka connector是正常更新的
>> (为了测试方便,table里只有同一个PK的数据)
Reply | Threaded
Open this post in threaded view
|

Re: Re: retract stream UDAF使用问题

Jark
Administrator
因为你的 upsert kafka table 的 pk 是 code,所以 code 分组下,数据已经是唯一的了 (一个 code
下,只有一行,取最后一行作为最新数据)。估计你同样的 code 下,amount 值是一样的,所以 sum(amount) 自然不会变化。

Best,
Jark

On Thu, 10 Dec 2020 at 12:36, bulterman <[hidden email]> wrote:

> // kafka table
> tableEnv.execuetSql("CREATE TABLE  market_stock(\n" +
>
>                 "    Code STRING,\n" +
>
>                 "    Amount BIGINT,\n" +
>
>                 ......
>
>                 "    PRIMARY KEY (Code) NOT ENFORCED\n" +
>
>                 ") WITH (\n" +
>
>                 "    'connector' = 'upsert-kafka',\n" +
>
>                 "    'topic' = 'zzz',\n" +
>
>                 "    'properties.bootstrap.servers' = '10.0.3.20:9092,
> 10.0.3.24:9092,10.0.3.26:9092',\n" +
>
>                 "    'properties.group.id' = 'sqltest46',\n" +
>
>                 "    'key.format' = 'raw',\n" +
>
>                 "    'value.format' = 'json'\n" +
>
>                 ")");
> // 使用UDAF计算
> Table table = bsTableEnv.sqlQuery("select
> Code,MainFundFlowFunc(Amount,AskPrice1,BidPrice1,Last) from market_stock
> GROUP BY Code");
> env.toRetractStream(table,Row.class).print();
>
>
> // UDAF的定义如下
> public class MainFundFlowFunc extends AggregateFunction<Row, AmountAccum> {
>     @Override
>     public Row getValue(AmountAccum acc) {
>         Long mf = acc.ebb + acc.bb - acc.ebs - acc.bs;
>         double mfr = acc.lastAmount > 0 ?
> MyNumericCalculator.divide(mf,acc.lastAmount,2).doubleValue() : 0.0;
>         return Row.of(acc.ebs,acc.bs,acc.ms,acc.ss,acc.ebb,acc.bb,acc.mb,
> acc.sb,mf,mfr);
>     }
>     @Override
>     public AmountAccum createAccumulator() {
>         return new AmountAccum();
>     }
>
>     public void accumulate(AmountAccum acc, Long amount, Double askPrice1,
> Double bidPrice1, Double last) {
>         //......
>        acc.lastAmount = amount;
>         acc.lastAskPrice1 = askPrice1;
>         acc.lastBidPrice1 = bidPrice1;
>     }
>     public void retract(AmountAccum acc, Long amount, Double askPrice1,
> Double bidPrice1, Double last) {
>         acc.lastAmount = amount;
>         acc.lastAskPrice1 = askPrice1;
>         acc.lastBidPrice1 = bidPrice1;
>     }
>
> }
>
>
>
>
> // acc
> public class AmountAccum {
>     public Double lastAskPrice1;
>     public Double lastBidPrice1;
>
>     public Long lastAmount = 0L;
>
>     public Long ebs = 0L;
>
>     public Long bs = 0L;
>
>     public Long ms = 0L;
>
>     public Long ss = 0L;
>
>     public Long ebb = 0L;
>
>     public Long bb = 0L;
>
>     public Long mb = 0L;
>
>     public Long sb = 0L;
> }
>
>
> debug观察acc的lastAmount值,一直是0.
>
>
> 刚才试了一下用sum()函数,执行select Code,sum(Amount) from market_stock GROUP BY
> Code,发现并没有累加Amount字段的值,每一次输出都是最新的那个Amount值。
> 是我的使用姿势不对吗= =
>
> 在 2020-12-10 11:30:31,"Jark Wu" <[hidden email]> 写道:
> >可以发下代码吗?
> >
> >On Thu, 10 Dec 2020 at 11:19, bulterman <[hidden email]> wrote:
> >
> >> 上游是upsert-kafka connector 创建的table,
> >> 使用UDAF时发现accumlator里的变量一直没被更新?如果使用kafka connector是正常更新的
> >> (为了测试方便,table里只有同一个PK的数据)
>
Reply | Threaded
Open this post in threaded view
|

Re:Re: Re: retract stream UDAF使用问题

bulterman
假设Code X,第一条数据X.Amount=0,第二条数据X.Amount=100,第三条数据X.Amount=200
1、由于Code是主键,table中每次仅保留了第最新那条X的数据,因此select sum(X.Amount) from table的输出是 :0, 100, 200
2、我定义UDAF中,对于同一个Code X来说,在accumulate方法中每次都会执行acc.lastAmount = Amount去更新acc的状态,但从结果来看,对于同一个Code X,每一次进入方法acc.lastAmount都是0? 也是因为表中仅保留一条Code X的数据的关系吗?


那在upsert kafka table中(Code X只保留最新一条数据),假设要累加Code X的Amount,期望的输出是:0,100,300...,应该如何实现?
求大佬解惑><

















在 2020-12-10 13:47:57,"Jark Wu" <[hidden email]> 写道:

>因为你的 upsert kafka table 的 pk 是 code,所以 code 分组下,数据已经是唯一的了 (一个 code
>下,只有一行,取最后一行作为最新数据)。估计你同样的 code 下,amount 值是一样的,所以 sum(amount) 自然不会变化。
>
>Best,
>Jark
>
>On Thu, 10 Dec 2020 at 12:36, bulterman <[hidden email]> wrote:
>
>> // kafka table
>> tableEnv.execuetSql("CREATE TABLE  market_stock(\n" +
>>
>>                 "    Code STRING,\n" +
>>
>>                 "    Amount BIGINT,\n" +
>>
>>                 ......
>>
>>                 "    PRIMARY KEY (Code) NOT ENFORCED\n" +
>>
>>                 ") WITH (\n" +
>>
>>                 "    'connector' = 'upsert-kafka',\n" +
>>
>>                 "    'topic' = 'zzz',\n" +
>>
>>                 "    'properties.bootstrap.servers' = '10.0.3.20:9092,
>> 10.0.3.24:9092,10.0.3.26:9092',\n" +
>>
>>                 "    'properties.group.id' = 'sqltest46',\n" +
>>
>>                 "    'key.format' = 'raw',\n" +
>>
>>                 "    'value.format' = 'json'\n" +
>>
>>                 ")");
>> // 使用UDAF计算
>> Table table = bsTableEnv.sqlQuery("select
>> Code,MainFundFlowFunc(Amount,AskPrice1,BidPrice1,Last) from market_stock
>> GROUP BY Code");
>> env.toRetractStream(table,Row.class).print();
>>
>>
>> // UDAF的定义如下
>> public class MainFundFlowFunc extends AggregateFunction<Row, AmountAccum> {
>>     @Override
>>     public Row getValue(AmountAccum acc) {
>>         Long mf = acc.ebb + acc.bb - acc.ebs - acc.bs;
>>         double mfr = acc.lastAmount > 0 ?
>> MyNumericCalculator.divide(mf,acc.lastAmount,2).doubleValue() : 0.0;
>>         return Row.of(acc.ebs,acc.bs,acc.ms,acc.ss,acc.ebb,acc.bb,acc.mb,
>> acc.sb,mf,mfr);
>>     }
>>     @Override
>>     public AmountAccum createAccumulator() {
>>         return new AmountAccum();
>>     }
>>
>>     public void accumulate(AmountAccum acc, Long amount, Double askPrice1,
>> Double bidPrice1, Double last) {
>>         //......
>>        acc.lastAmount = amount;
>>         acc.lastAskPrice1 = askPrice1;
>>         acc.lastBidPrice1 = bidPrice1;
>>     }
>>     public void retract(AmountAccum acc, Long amount, Double askPrice1,
>> Double bidPrice1, Double last) {
>>         acc.lastAmount = amount;
>>         acc.lastAskPrice1 = askPrice1;
>>         acc.lastBidPrice1 = bidPrice1;
>>     }
>>
>> }
>>
>>
>>
>>
>> // acc
>> public class AmountAccum {
>>     public Double lastAskPrice1;
>>     public Double lastBidPrice1;
>>
>>     public Long lastAmount = 0L;
>>
>>     public Long ebs = 0L;
>>
>>     public Long bs = 0L;
>>
>>     public Long ms = 0L;
>>
>>     public Long ss = 0L;
>>
>>     public Long ebb = 0L;
>>
>>     public Long bb = 0L;
>>
>>     public Long mb = 0L;
>>
>>     public Long sb = 0L;
>> }
>>
>>
>> debug观察acc的lastAmount值,一直是0.
>>
>>
>> 刚才试了一下用sum()函数,执行select Code,sum(Amount) from market_stock GROUP BY
>> Code,发现并没有累加Amount字段的值,每一次输出都是最新的那个Amount值。
>> 是我的使用姿势不对吗= =
>>
>> 在 2020-12-10 11:30:31,"Jark Wu" <[hidden email]> 写道:
>> >可以发下代码吗?
>> >
>> >On Thu, 10 Dec 2020 at 11:19, bulterman <[hidden email]> wrote:
>> >
>> >> 上游是upsert-kafka connector 创建的table,
>> >> 使用UDAF时发现accumlator里的变量一直没被更新?如果使用kafka connector是正常更新的
>> >> (为了测试方便,table里只有同一个PK的数据)
>>
Reply | Threaded
Open this post in threaded view
|

Re: Re: Re: retract stream UDAF使用问题

Jark
Administrator
你可以把 upsert kafka 想象成是 mysql 表的实时物化视图,
你在 mysql 里面 code 是 key,amount 是 value。当你把 amount 从0 更新成 100, 200。
那么最后的 sum(amount) 结果自然是 200。

如果你想要 0 -> 100 -> 300, 说明你不想把这个数据看成是有 pk 更新的数据,而是一条条独立的数据,这个时候你声明成 kafka
connector,不定义 pk 即可,也就是当成普通 log 处理了。

关于你的 UDAF 的问题,估计是你实现的问题,因为你在 retract 方法中又把值设回 previous 值了。

Best,
Jark



On Thu, 10 Dec 2020 at 15:04, bulterman <[hidden email]> wrote:

> 假设Code X,第一条数据X.Amount=0,第二条数据X.Amount=100,第三条数据X.Amount=200
> 1、由于Code是主键,table中每次仅保留了第最新那条X的数据,因此select sum(X.Amount) from table的输出是
> :0, 100, 200
> 2、我定义UDAF中,对于同一个Code X来说,在accumulate方法中每次都会执行acc.lastAmount =
> Amount去更新acc的状态,但从结果来看,对于同一个Code X,每一次进入方法acc.lastAmount都是0?
> 也是因为表中仅保留一条Code X的数据的关系吗?
>
>
> 那在upsert kafka table中(Code X只保留最新一条数据),假设要累加Code
> X的Amount,期望的输出是:0,100,300...,应该如何实现?
> 求大佬解惑><
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-12-10 13:47:57,"Jark Wu" <[hidden email]> 写道:
> >因为你的 upsert kafka table 的 pk 是 code,所以 code 分组下,数据已经是唯一的了 (一个 code
> >下,只有一行,取最后一行作为最新数据)。估计你同样的 code 下,amount 值是一样的,所以 sum(amount) 自然不会变化。
> >
> >Best,
> >Jark
> >
> >On Thu, 10 Dec 2020 at 12:36, bulterman <[hidden email]> wrote:
> >
> >> // kafka table
> >> tableEnv.execuetSql("CREATE TABLE  market_stock(\n" +
> >>
> >>                 "    Code STRING,\n" +
> >>
> >>                 "    Amount BIGINT,\n" +
> >>
> >>                 ......
> >>
> >>                 "    PRIMARY KEY (Code) NOT ENFORCED\n" +
> >>
> >>                 ") WITH (\n" +
> >>
> >>                 "    'connector' = 'upsert-kafka',\n" +
> >>
> >>                 "    'topic' = 'zzz',\n" +
> >>
> >>                 "    'properties.bootstrap.servers' = '10.0.3.20:9092,
> >> 10.0.3.24:9092,10.0.3.26:9092',\n" +
> >>
> >>                 "    'properties.group.id' = 'sqltest46',\n" +
> >>
> >>                 "    'key.format' = 'raw',\n" +
> >>
> >>                 "    'value.format' = 'json'\n" +
> >>
> >>                 ")");
> >> // 使用UDAF计算
> >> Table table = bsTableEnv.sqlQuery("select
> >> Code,MainFundFlowFunc(Amount,AskPrice1,BidPrice1,Last) from market_stock
> >> GROUP BY Code");
> >> env.toRetractStream(table,Row.class).print();
> >>
> >>
> >> // UDAF的定义如下
> >> public class MainFundFlowFunc extends AggregateFunction<Row,
> AmountAccum> {
> >>     @Override
> >>     public Row getValue(AmountAccum acc) {
> >>         Long mf = acc.ebb + acc.bb - acc.ebs - acc.bs;
> >>         double mfr = acc.lastAmount > 0 ?
> >> MyNumericCalculator.divide(mf,acc.lastAmount,2).doubleValue() : 0.0;
> >>         return Row.of(acc.ebs,acc.bs,acc.ms,acc.ss,acc.ebb,acc.bb
> ,acc.mb,
> >> acc.sb,mf,mfr);
> >>     }
> >>     @Override
> >>     public AmountAccum createAccumulator() {
> >>         return new AmountAccum();
> >>     }
> >>
> >>     public void accumulate(AmountAccum acc, Long amount, Double
> askPrice1,
> >> Double bidPrice1, Double last) {
> >>         //......
> >>        acc.lastAmount = amount;
> >>         acc.lastAskPrice1 = askPrice1;
> >>         acc.lastBidPrice1 = bidPrice1;
> >>     }
> >>     public void retract(AmountAccum acc, Long amount, Double askPrice1,
> >> Double bidPrice1, Double last) {
> >>         acc.lastAmount = amount;
> >>         acc.lastAskPrice1 = askPrice1;
> >>         acc.lastBidPrice1 = bidPrice1;
> >>     }
> >>
> >> }
> >>
> >>
> >>
> >>
> >> // acc
> >> public class AmountAccum {
> >>     public Double lastAskPrice1;
> >>     public Double lastBidPrice1;
> >>
> >>     public Long lastAmount = 0L;
> >>
> >>     public Long ebs = 0L;
> >>
> >>     public Long bs = 0L;
> >>
> >>     public Long ms = 0L;
> >>
> >>     public Long ss = 0L;
> >>
> >>     public Long ebb = 0L;
> >>
> >>     public Long bb = 0L;
> >>
> >>     public Long mb = 0L;
> >>
> >>     public Long sb = 0L;
> >> }
> >>
> >>
> >> debug观察acc的lastAmount值,一直是0.
> >>
> >>
> >> 刚才试了一下用sum()函数,执行select Code,sum(Amount) from market_stock GROUP BY
> >> Code,发现并没有累加Amount字段的值,每一次输出都是最新的那个Amount值。
> >> 是我的使用姿势不对吗= =
> >>
> >> 在 2020-12-10 11:30:31,"Jark Wu" <[hidden email]> 写道:
> >> >可以发下代码吗?
> >> >
> >> >On Thu, 10 Dec 2020 at 11:19, bulterman <[hidden email]> wrote:
> >> >
> >> >> 上游是upsert-kafka connector 创建的table,
> >> >> 使用UDAF时发现accumlator里的变量一直没被更新?如果使用kafka connector是正常更新的
> >> >> (为了测试方便,table里只有同一个PK的数据)
> >>
>