请教一个flink CEP的问题

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

请教一个flink CEP的问题

Jun Zhang-2
大家好:
    请教一个flink cep的问题,我想做一个简单的报警,比如连续三次大于5就报警,连续三次小于等于5就算报警恢复。
 
   示例程序如下:


     DataStream<Integer&gt; stream = env.fromElements(
1, 2, 3, 4, 13, 7, 8, 9, 3, 14, 2, 2, 3, 4, 10, 11, 9, 1, 1, 3);


Pattern pattern = Pattern.<Integer&gt;begin("alert").where(new IterativeCondition<Integer&gt;(){
@Override
public boolean filter(
Integer i, Context<Integer&gt; context) throws Exception{
return i &gt; 5;
}
}).times(3).consecutive().followedBy("recovery").where(new IterativeCondition<Integer&gt;(){
@Override
public boolean filter(
Integer i,
Context<Integer&gt; context) throws Exception{
return i <= 5;
}
}).times(3).consecutive();


DataStream dataStream = org.apache.flink.cep.CEP.pattern(
stream,
pattern).select((PatternSelectFunction) pattern1-&gt;pattern1);
dataStream.print();





&nbsp; 实际得到的结果是这样的
{alert=[10, 11, 9], recovery=[1, 1, 3]}


但是我期望的是还有以下的数据,因为这个也是符合要求的
{alert=[13, 7, 8], recovery=[2, 2, 3]}
{alert=[7, 8, 9], recovery=[2, 2, 3]}


请问这个是什么原因呢?谢谢