temparol table join后无法sink

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

temparol table join后无法sink

Lucas
Hi,

流程是从两个kafka队列中取数据,做完temparol table join后取滚动窗口做UDAF,然后sink,代码大概如下
joined_table = t_env.sql_query("""
    SELECT
        o.exchangeCode_ as code,
        o.price,
        o.volume,
        o.eventTime
    FROM orders AS o INNER JOIN quotes FOR SYSTEM_TIME AS OF o.eventTime q
    ON o.exchangeCode_ = q.exchangeCode_
""")
tumble_window = Tumble.over(expr.lit(500).millis) \
    .on(expr.col("eventTime")) \
    .alias("w")
aggregate_table = joined_table.window(tumble_window) \
    .group_by("w") \
    .select("orderCalc(code, price, volume) as aggValue") \
    .execute_insert("kafkaSink")
然后执行的时候数据都堆积在TemporalJoin环节,没法进入sink环节。执行图如下
https://ftp.bmp.ovh/imgs/2020/12/702ccb600bb01968.png
最后sink环节的bytes received一直是0,然后运行到最后就因为内存不足失败。
看了taskmanager的日志里面没有报错。

想问下这种问题应该从哪里进行排查,多谢。


[hidden email]