|
Hi all,
我在使用scala 开发streaming应用时,使用了AggregateFunction with a ProcessWindowFunction,flink 版本是1.11.1,但是idea报错:
Error:(80, 8) overloaded method value aggregate with alternatives:
很奇怪为什么不能重载下面的方法
def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation](preAggregator: AggregateFunction[T, ACC, V],windowFunction: ProcessWindowFunction[V, R, K, W]): DataStream[R]
my code:
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.util.Collector
val aggStream = timeStream
.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
.aggregate(new AccumulativeForDayAggregate(),new DayResultProcessWindowFunction())
case class DayInfo(ts: Long, category: String, value: Long)
case class DayResult(category: String, cnt: Long)
class AccumulativeForDayAggregate extends AggregateFunction[DayInfo, DayResult, DayResult] {
override def createAccumulator(): DayResult = {
DayResult("", 0L)
}
override def merge(acc1: DayResult, acc2: DayResult): DayResult = {
DayResult(acc1.category, acc1.cnt + acc2.cnt)
}
override def add(value: DayInfo, acc: DayResult): DayResult = {
if (acc.category.equals(value.category)) {
DayResult(acc.category, value.value + acc.cnt)
} else {
DayResult(value.category, value.value)
}
}
override def getResult(acc: DayResult): DayResult = {
acc
}
}
case class WindowDayResult(startWindow: Long, endWindow: Long, category: String, cnt: Long)
class DayResultProcessWindowFunction extends ProcessWindowFunction[DayResult, WindowDayResult, String, TimeWindow] {
def process(key: String, context: Context, elements: Iterable[DayResult], out: Collector[WindowDayResult]) = {
val startWindow = context.window.getStart
val endWindow = context.window.getEnd
val dayResult = elements.iterator.next()
out.collect(WindowDayResult(startWindow,endWindow,dayResult.category,dayResult.cnt))
}
}
|