|
code:
val inpurtDS = streamTableEnv.toAppendStream[BehaviorInfo](behaviorTable)inpurtDS.print()val pattern = Pattern.begin[BehaviorInfo]("start")
.where(_.clickCount > 7)val patternStream = CEP.pattern(inpurtDS, pattern)
val result: DataStream[BehaviorInfo] = patternStream.process(
new PatternProcessFunction[BehaviorInfo, BehaviorInfo]() {
override def processMatch(
matchPattern: util.Map[String, util.List[BehaviorInfo]],
ctx: PatternProcessFunction.Context,
out: Collector[BehaviorInfo]): Unit = {
try {
println(
s"""
|matchPattern: $matchPattern
|util.List[BehaviorInfo]: ${matchPattern.get("start")}
|""".stripMargin)
out.collect(matchPattern.get("start").get(0))
} catch {
case exception: Exception =>
println(exception)
}
}
})
result.print()
问题:inpurtDS.print()可以输出数据,输出的数据满足pattern的条件result.print()没有输出PatternProcessFunction中的processMatch,没有输出数据,也没有异常输出,谁能帮忙看下,是哪有问题啊
Thanks a lot!
|