flink cep result DataStream no data print

Zhou Zach-2
code:


val inpurtDS = streamTableEnv.toAppendStream[BehaviorInfo](behaviorTable)
inpurtDS.print()
val pattern = Pattern.begin[BehaviorInfo]("start")
  .where(_.clickCount > 7)
val patternStream = CEP.pattern(inpurtDS, pattern)
val result: DataStream[BehaviorInfo] = patternStream.process(
  new PatternProcessFunction[BehaviorInfo, BehaviorInfo]() {
    override def processMatch(
                               matchPattern: util.Map[String, util.List[BehaviorInfo]],
                               ctx: PatternProcessFunction.Context,
                               out: Collector[BehaviorInfo]): Unit = {
      try {
        println(
          s"""
             |matchPattern: $matchPattern
             |util.List[BehaviorInfo]: ${matchPattern.get("start")}
             |""".stripMargin)
        out.collect(matchPattern.get("start").get(0))
      } catch {
        case exception: Exception =>
          println(exception)
      }
    }
  })
result.print()



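Problem: inpurtDS.print() outputs data, and that data satisfies the pattern's condition, but result.print() outputs nothing. The println inside processMatch of the PatternProcessFunction prints nothing either, and no exception is printed. Could someone help me figure out what is wrong?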


Thanks a lot!