关于CEP处理事件的问题

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

关于CEP处理事件的问题

sherlock zw

大佬们,请教一下,我现在使用CEP时遇到一个问题,我现在的场景是需要输入三次相同字符串打印一次匹配的List集合,但是遇到的问题是每次都需要输入第四条数据才会触发Patternselect函数去打印List

具体实现代码如下:

public class Run3 {
   
public static void main(String[] args) throws Exception {
       
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       
env.setParallelism(1);
        final
DataStream<String> source = env.socketTextStream("localhost", 8888)
               
.assignTimestampsAndWatermarks(
                       
WatermarkStrategy.<String>forMonotonousTimestamps()
                               
.withTimestampAssigner((String s, long ts) -> System.currentTimeMillis())
                )
               
.keyBy(s -> s);
       
source.print("source ");
        final
Pattern<String, String> pattern = Pattern.<String>begin("begin", AfterMatchSkipStrategy.skipPastLastEvent())
               
.where(new SimpleCondition<String>() {
                   
@Override
                   
public boolean filter(String s) throws Exception {
                       
return true;
                   
}
                })
.times(3);
        final
PatternStream<String> patternStream = CEP.pattern(source, pattern);
       
patternStream.select(new PatternSelectFunction<String, Object>() {
           
@Override
           
public Object select(Map<String, List<String>> pattern) {
               
return pattern.get("begin");
           
}
        })
.print("result ");
       
env.execute();
   
}
}

 

环境如下:

Flink 1.12.2

OSWindows 10

编程工具:IDEA 2021.1.2

使用的是Flink默认的事件时间,水位线用的是单调递增的,使用的是系统时间

 

运行结果如下所示:

 

Reply | Threaded
Open this post in threaded view
|

Re: 关于CEP处理事件的问题

yue ma
hello ,使用EventTime的前提下是这样的。事件来了之后不会立即去触发匹配,而是会注册一个timer,然后将数据缓存起来。当后续有事件
advanceWatermark 触发 timer之后才会开始计算。

sherlock zw <[hidden email]> 于2021年6月10日周四 下午9:55写道:

> 大佬们,请教一下,我现在使用CEP时遇到一个问题,我现在的场景是需要输入三次相同字符串打印一次匹配的List
> 集合,但是遇到的问题是每次都需要输入第四条数据才会触发Pattern的select函数去打印List。
>
> 具体实现代码如下:
>
> public class Run3
> *{     *public static void main*(*String*[] *args*) *throws Exception
> *{         *final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.*getExecutionEnvironment**()*;
>         env.setParallelism*(*1*)*;
>         final DataStream*<*String*> *source = env.socketTextStream*(*
> "localhost", 8888
> *)                 *.assignTimestampsAndWatermarks
> *(                         *WatermarkStrategy.*<*String*>*
> *forMonotonousTimestamps*
> *()                                 *.withTimestampAssigner*((*String s,
> long ts*) *-> System.*currentTimeMillis*
>
> *())                 )                 *.keyBy*(*s -> s*)*;
>         source.print*(*"source "*)*;
>         final Pattern*<*String, String*> *pattern = Pattern.*<*String*>*
> *begin**(*"begin", AfterMatchSkipStrategy.*skipPastLastEvent*
> *())                 *.where*(*new SimpleCondition*<*String
> *>() {                     *@Override
>                     public boolean filter*(*String s*) *throws Exception
> *{                         *return true;
>
> *}                 })*.times*(*3*)*;
>         final PatternStream*<*String*> *patternStream = CEP.*pattern**(*
> source, pattern*)*;
>         patternStream.select*(*new PatternSelectFunction*<*String, Object
> *>() {             *@Override
>             public Object select*(*Map*<*String, List*<*String*>> *pattern
> *) {                 *return pattern.get*(*"begin"*)*;
>
> *}         })*.print*(*"result "*)*;
>         env.execute*()*;
>
> *} }*
>
>
>
> 环境如下:
>
> Flink 1.12.2
>
> OS:Windows 10
>
> 编程工具:IDEA 2021.1.2
>
> 使用的是Flink默认的事件时间,水位线用的是单调递增的,使用的是系统时间
>
>
>
> 运行结果如下所示:
>
>
>