大佬们,请教一下,我现在使用CEP时遇到一个问题,我现在的场景是需要输入三次相同字符串打印一次匹配的List集合,但是遇到的问题是每次都需要输入第四条数据才会触发Pattern的select函数去打印List。 具体实现代码如下: public class
Run3
{ 环境如下: Flink 1.12.2 OS:Windows 10 编程工具:IDEA 2021.1.2 使用的是Flink默认的事件时间,水位线用的是单调递增的,使用的是系统时间 运行结果如下所示: |
hello ,使用EventTime的前提下是这样的。事件来了之后不会立即去触发匹配,而是会注册一个timer,然后将数据缓存起来。当后续有事件
advanceWatermark 触发 timer之后才会开始计算。 sherlock zw <[hidden email]> 于2021年6月10日周四 下午9:55写道: > 大佬们,请教一下,我现在使用CEP时遇到一个问题,我现在的场景是需要输入三次相同字符串打印一次匹配的List > 集合,但是遇到的问题是每次都需要输入第四条数据才会触发Pattern的select函数去打印List。 > > 具体实现代码如下: > > public class Run3 > *{ *public static void main*(*String*[] *args*) *throws Exception > *{ *final StreamExecutionEnvironment env = > StreamExecutionEnvironment.*getExecutionEnvironment**()*; > env.setParallelism*(*1*)*; > final DataStream*<*String*> *source = env.socketTextStream*(* > "localhost", 8888 > *) *.assignTimestampsAndWatermarks > *( *WatermarkStrategy.*<*String*>* > *forMonotonousTimestamps* > *() *.withTimestampAssigner*((*String s, > long ts*) *-> System.*currentTimeMillis* > > *()) ) *.keyBy*(*s -> s*)*; > source.print*(*"source "*)*; > final Pattern*<*String, String*> *pattern = Pattern.*<*String*>* > *begin**(*"begin", AfterMatchSkipStrategy.*skipPastLastEvent* > *()) *.where*(*new SimpleCondition*<*String > *>() { *@Override > public boolean filter*(*String s*) *throws Exception > *{ *return true; > > *} })*.times*(*3*)*; > final PatternStream*<*String*> *patternStream = CEP.*pattern**(* > source, pattern*)*; > patternStream.select*(*new PatternSelectFunction*<*String, Object > *>() { *@Override > public Object select*(*Map*<*String, List*<*String*>> *pattern > *) { *return pattern.get*(*"begin"*)*; > > *} })*.print*(*"result "*)*; > env.execute*()*; > > *} }* > > > > 环境如下: > > Flink 1.12.2 > > OS:Windows 10 > > 编程工具:IDEA 2021.1.2 > > 使用的是Flink默认的事件时间,水位线用的是单调递增的,使用的是系统时间 > > > > 运行结果如下所示: > > > |
Free forum by Nabble | Edit this page |