1: When a DataStream is converted from a Table that went through a GROUP BY on rowtime, the window never fires.
For example:
tableEnv.createTemporaryView("test3",
    tableEnv.sqlQuery("select msg,rowtime from test group by msg,rowtime"));
// Get the DataStream and define watermark generation
SingleOutputStreamOperator<Tuple2<String, Long>> r =
    tableEnv.toRetractStream(tableEnv.from("test3"), Row.class)
        // keep only the accumulate (true) messages of the retract stream
        .filter(x -> x.f0)
        // map ........
        .returns(Types.TUPLE(Types.STRING, Types.LONG))
        .assignTimestampsAndWatermarks(
            WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                .withTimestampAssigner((element, recordTimestamp) -> element.f1)
        );
Following the official documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
// Convert the stream back to a Table and declare the rowtime attribute
tableEnv.createTemporaryView("test5",
    r,
    $("msg"),
    $("rowtime").rowtime());
String sql5 = "select " +
    "msg," +
    "count(1) cnt" +
    " from test5 " +
    " group by TUMBLE(rowtime, INTERVAL '30' SECOND), msg ";
tableEnv.executeSql("insert into printlnRetractSink " + sql5);
Result: the window never fires.
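For context, here is a minimal sketch of when an event-time tumbling window can fire, in plain Java with no Flink on the classpath. The class and method names (`TumbleFiring`, `windowStart`, `maxTimestamp`, `canFire`) are made up for illustration. The arithmetic follows the usual event-time rule that a window fires once the watermark reaches the window end minus one millisecond, and it assumes non-negative timestamps and a window offset of 0.

```java
public class TumbleFiring {
    // Window size matching TUMBLE(rowtime, INTERVAL '30' SECOND)
    static final long SIZE_MS = 30_000L;

    // Start of the tumbling window that contains the timestamp
    // (offset 0, non-negative timestamps assumed).
    public static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % SIZE_MS);
    }

    // Largest timestamp that still belongs to that window.
    public static long maxTimestamp(long timestampMs) {
        return windowStart(timestampMs) + SIZE_MS - 1;
    }

    // The window is eligible to fire once the watermark has reached
    // the window's maximum timestamp.
    public static boolean canFire(long timestampMs, long watermark) {
        return watermark >= maxTimestamp(timestampMs);
    }

    public static void main(String[] args) {
        long ts = 65_000L; // falls into the window [60000, 90000)
        System.out.println(windowStart(ts));
        System.out.println(canFire(ts, 89_999L));
        System.out.println(canFire(ts, Long.MIN_VALUE));
    }
}
```

With the watermark at Long.MIN_VALUE the condition can never hold, which matches the symptom that no window output is produced.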
Debugging into the source: org.apache.flink.table.runtime.operators.window.WindowOperator
// The watermark returned here is always -9223372036854775808
public long getCurrentWatermark() {
    return internalTimerService.currentWatermark();
}
Looking at the job, watermarks are being generated normally, and InternalTimerServiceImpl.advanceWatermark does assign currentWatermark as expected. Yet internalTimerService.currentWatermark() returns -9223372036854775808.
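Note that -9223372036854775808 is Long.MIN_VALUE, the value a watermark has before any watermark has reached the operator. One possible explanation, which is only an assumption and is not confirmed by the debugging above: an operator with several input channels advances its watermark to the minimum across those channels, so a single upstream subtask that never emits a watermark (for example because the shuffle introduced by the GROUP BY leaves it without data) would hold the window operator at Long.MIN_VALUE. The sketch below models that behaviour in plain Java; `MinWatermarkTracker` is a hypothetical class for illustration, not a Flink API.

```java
import java.util.Arrays;

public class MinWatermarkTracker {
    // Last watermark seen on each input channel; Long.MIN_VALUE means "none yet".
    private final long[] channelWatermarks;

    public MinWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Record a watermark from one channel; watermarks never move backwards.
    public void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    // The operator's watermark is the minimum over all input channels.
    public long currentWatermark() {
        return Arrays.stream(channelWatermarks).min().getAsLong();
    }

    public static void main(String[] args) {
        MinWatermarkTracker tracker = new MinWatermarkTracker(2);
        tracker.onWatermark(0, 120_000L);               // channel 0 is active
        System.out.println(tracker.currentWatermark()); // channel 1 is silent, still Long.MIN_VALUE
        tracker.onWatermark(1, 90_000L);                // channel 1 finally emits
        System.out.println(tracker.currentWatermark());
    }
}
```

If this is what happens here, running the job with parallelism 1, or adding withIdleness(...) to the WatermarkStrategy, would be worth trying to see whether the window then fires.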
When the statement
tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select msg,rowtime from test group by msg,rowtime"));
is changed to
tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select msg,rowtime from test"));
the result is correct.
So is this a bug?
--
Sent from:
http://apache-flink.147419.n8.nabble.com/