flink DataStream scala api can not overloaded method aggregate

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

flink DataStream scala api can not overloaded method aggregate

Zhou Zach
Hi all,
    我在使用scala 开发streaming应用时,使用了AggregateFunction  with a ProcessWindowFunction,flink 版本是1.11.1,但是idea报错:
Error:(80, 8) overloaded method value aggregate with alternatives:
很奇怪为什么不能重载下面的方法
def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation](preAggregator: AggregateFunction[T, ACC, V],windowFunction: ProcessWindowFunction[V, R, K, W]): DataStream[R]


my code:
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.util.Collector


val aggStream = timeStream
      .keyBy(0)
      .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
      .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
      .aggregate(new AccumulativeForDayAggregate(),new DayResultProcessWindowFunction())



case class DayInfo(ts: Long, category: String, value: Long)
case class DayResult(category: String, cnt: Long)

class AccumulativeForDayAggregate extends AggregateFunction[DayInfo, DayResult, DayResult] {

  override def createAccumulator(): DayResult = {
    DayResult("", 0L)
  }

  override def merge(acc1: DayResult, acc2: DayResult): DayResult = {
    DayResult(acc1.category, acc1.cnt + acc2.cnt)
  }

  override def add(value: DayInfo, acc: DayResult): DayResult = {
    if (acc.category.equals(value.category)) {
      DayResult(acc.category, value.value + acc.cnt)
    } else {
      DayResult(value.category, value.value)
    }
  }

  override def getResult(acc: DayResult): DayResult = {
    acc
  }
}


case class WindowDayResult(startWindow: Long, endWindow: Long, category: String, cnt: Long)

class DayResultProcessWindowFunction extends ProcessWindowFunction[DayResult, WindowDayResult, String, TimeWindow] {

  def process(key: String, context: Context, elements: Iterable[DayResult], out: Collector[WindowDayResult]) = {
    val startWindow = context.window.getStart
    val endWindow = context.window.getEnd
    val dayResult = elements.iterator.next()
    out.collect(WindowDayResult(startWindow,endWindow,dayResult.category,dayResult.cnt))
  }
}
Thanks for your help!


| |
Zhou Zach
|
|
[hidden email]
|
签名由网易邮箱大师定制