Hi all,
我在使用scala 开发streaming应用时,使用了AggregateFunction with a ProcessWindowFunction,flink 版本是1.11.1,但是idea报错: Error:(80, 8) overloaded method value aggregate with alternatives: 很奇怪为什么不能重载下面的方法 def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation](preAggregator: AggregateFunction[T, ACC, V],windowFunction: ProcessWindowFunction[V, R, K, W]): DataStream[R] my code: import org.apache.flink.api.common.eventtime.WatermarkStrategy import org.apache.flink.api.common.functions.AggregateFunction import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic} import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper} import org.apache.flink.util.Collector val aggStream = timeStream .keyBy(0) .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8))) .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1))) .aggregate(new AccumulativeForDayAggregate(),new DayResultProcessWindowFunction()) case class DayInfo(ts: Long, category: String, value: Long) case class DayResult(category: String, cnt: Long) class AccumulativeForDayAggregate extends AggregateFunction[DayInfo, DayResult, DayResult] { override def createAccumulator(): DayResult = { DayResult("", 0L) } override def merge(acc1: DayResult, acc2: DayResult): DayResult = { DayResult(acc1.category, acc1.cnt + acc2.cnt) } override def add(value: DayInfo, acc: DayResult): DayResult = { if (acc.category.equals(value.category)) { DayResult(acc.category, value.value + acc.cnt) } else { DayResult(value.category, value.value) } } override def getResult(acc: DayResult): DayResult = { acc } } case class WindowDayResult(startWindow: Long, endWindow: Long, category: String, cnt: Long) class DayResultProcessWindowFunction extends ProcessWindowFunction[DayResult, WindowDayResult, String, TimeWindow] { def process(key: String, context: Context, elements: Iterable[DayResult], out: Collector[WindowDayResult]) = { val startWindow = context.window.getStart val endWindow = context.window.getEnd val dayResult = elements.iterator.next() out.collect(WindowDayResult(startWindow,endWindow,dayResult.category,dayResult.cnt)) } } Thanks for your help! | | Zhou Zach | | [hidden email] | 签名由网易邮箱大师定制 |
Free forum by Nabble | Edit this page |