|
大家好:
请教一个flink cep的问题,我想做一个简单的报警,比如连续三次大于5就报警,连续三次小于等于5就算报警恢复。
示例程序如下:
DataStream<Integer> stream = env.fromElements(
1, 2, 3, 4, 13, 7, 8, 9, 3, 14, 2, 2, 3, 4, 10, 11, 9, 1, 1, 3);
Pattern pattern = Pattern.<Integer>begin("alert").where(new IterativeCondition<Integer>(){
@Override
public boolean filter(
Integer i, Context<Integer> context) throws Exception{
return i > 5;
}
}).times(3).consecutive().followedBy("recovery").where(new IterativeCondition<Integer>(){
@Override
public boolean filter(
Integer i,
Context<Integer> context) throws Exception{
return i <= 5;
}
}).times(3).consecutive();
DataStream dataStream = org.apache.flink.cep.CEP.pattern(
stream,
pattern).select((PatternSelectFunction) pattern1->pattern1);
dataStream.print();
实际得到的结果是这样的
{alert=[10, 11, 9], recovery=[1, 1, 3]}
但是我期望的是还有以下的数据,因为这个也是符合要求的
{alert=[13, 7, 8], recovery=[2, 2, 3]}
{alert=[7, 8, 9], recovery=[2, 2, 3]}
请问这个是什么原因呢?谢谢
|