关于 stream-stream Interval Join 的问题

classic Classic list List threaded Threaded
17 messages Options
Reply | Threaded
Open this post in threaded view
|

关于 stream-stream Interval Join 的问题

macia kk
Hi, 各位大佬

  我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的
Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 ,表B 是 order item
信息,所以 我用:

 SELECT *
 FROM A
 LEFT OUT JOIN B
 ON order_id
 Where A.event_time > B.event_time + 30 s
     A.event_time > B.event_time - 30 s

我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用 Spark Structural
Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join,
所以无论如何,左边是一定会输出的,不知道Flink Interval Join 在具体实现的逻辑是什么,我在处理上哪里有问题?
Reply | Threaded
Open this post in threaded view
|

Re:关于 stream-stream Interval Join 的问题

hailongwang
Hi,
其中 条件是
`Where A.event_time < B.event_time + 30 s and A.event_time > B.event_time - 30 s ` 吧
可以参考以下例子[1],看下有木有写错。
[1] https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183


Best,
Hailong
在 2020-12-07 13:10:02,"macia kk" <[hidden email]> 写道:

>Hi, 各位大佬
>
>  我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的
>Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 ,表B 是 order item
>信息,所以 我用:
>
> SELECT *
> FROM A
> LEFT OUT JOIN B
> ON order_id
> Where A.event_time > B.event_time + 30 s
>     A.event_time > B.event_time - 30 s
>
>我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用 Spark Structural
>Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join,
>所以无论如何,左边是一定会输出的,不知道Flink Interval Join 在具体实现的逻辑是什么,我在处理上哪里有问题?
Reply | Threaded
Open this post in threaded view
|

Re: 关于 stream-stream Interval Join 的问题

macia kk
不好意思,我上边贴错了

SELECT *
 FROM A
 LEFT OUT JOIN B
 ON order_id
 Where A.event_time > B.event_time -  30 s
     A.event_time > B.event_time + 30 s

event_time 是 Time Attributes 设置的 event_time

这样是没有输出的。



interval join 左右表在 state 中是缓存多久的?






hailongwang <[hidden email]> 于2020年12月7日周一 下午8:05写道:

> Hi,
> 其中 条件是
> `Where A.event_time < B.event_time + 30 s and A.event_time > B.event_time
> - 30 s ` 吧
> 可以参考以下例子[1],看下有木有写错。
> [1]
> https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183
>
>
> Best,
> Hailong
> 在 2020-12-07 13:10:02,"macia kk" <[hidden email]> 写道:
> >Hi, 各位大佬
> >
> >  我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的
> >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 ,表B 是 order item
> >信息,所以 我用:
> >
> > SELECT *
> > FROM A
> > LEFT OUT JOIN B
> > ON order_id
> > Where A.event_time > B.event_time + 30 s
> >     A.event_time > B.event_time - 30 s
> >
> >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用 Spark
> Structural
> >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join,
> >所以无论如何,左边是一定会输出的,不知道Flink Interval Join 在具体实现的逻辑是什么,我在处理上哪里有问题?
>
Reply | Threaded
Open this post in threaded view
|

Re: 关于 stream-stream Interval Join 的问题

nobleyd
准确点,2个条件之间没and?2个都是>?

macia kk <[hidden email]> 于2020年12月7日周一 下午10:30写道:

> 不好意思,我上边贴错了
>
> SELECT *
>  FROM A
>  LEFT OUT JOIN B
>  ON order_id
>  Where A.event_time > B.event_time -  30 s
>      A.event_time > B.event_time + 30 s
>
> event_time 是 Time Attributes 设置的 event_time
>
> 这样是没有输出的。
>
>
>
> interval join 左右表在 state 中是缓存多久的?
>
>
>
>
>
>
> hailongwang <[hidden email]> 于2020年12月7日周一 下午8:05写道:
>
> > Hi,
> > 其中 条件是
> > `Where A.event_time < B.event_time + 30 s and A.event_time > B.event_time
> > - 30 s ` 吧
> > 可以参考以下例子[1],看下有木有写错。
> > [1]
> >
> https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183
> >
> >
> > Best,
> > Hailong
> > 在 2020-12-07 13:10:02,"macia kk" <[hidden email]> 写道:
> > >Hi, 各位大佬
> > >
> > >  我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的
> > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 ,表B 是 order
> item
> > >信息,所以 我用:
> > >
> > > SELECT *
> > > FROM A
> > > LEFT OUT JOIN B
> > > ON order_id
> > > Where A.event_time > B.event_time + 30 s
> > >     A.event_time > B.event_time - 30 s
> > >
> > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用 Spark
> > Structural
> > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join,
> > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join 在具体实现的逻辑是什么,我在处理上哪里有问题?
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: 关于 stream-stream Interval Join 的问题

macia kk
抱歉,是 >-30 and <+30

贴的只是demo,我的疑问是,既然是 Left Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有

赵一旦 <[hidden email]>于2020年12月7日 周一23:28写道:

> 准确点,2个条件之间没and?2个都是>?
>
> macia kk <[hidden email]> 于2020年12月7日周一 下午10:30写道:
>
> > 不好意思,我上边贴错了
> >
> > SELECT *
> >  FROM A
> >  LEFT OUT JOIN B
> >  ON order_id
> >  Where A.event_time > B.event_time -  30 s
> >      A.event_time > B.event_time + 30 s
> >
> > event_time 是 Time Attributes 设置的 event_time
> >
> > 这样是没有输出的。
> >
> >
> >
> > interval join 左右表在 state 中是缓存多久的?
> >
> >
> >
> >
> >
> >
> > hailongwang <[hidden email]> 于2020年12月7日周一 下午8:05写道:
> >
> > > Hi,
> > > 其中 条件是
> > > `Where A.event_time < B.event_time + 30 s and A.event_time >
> B.event_time
> > > - 30 s ` 吧
> > > 可以参考以下例子[1],看下有木有写错。
> > > [1]
> > >
> >
> https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183
> > >
> > >
> > > Best,
> > > Hailong
> > > 在 2020-12-07 13:10:02,"macia kk" <[hidden email]> 写道:
> > > >Hi, 各位大佬
> > > >
> > > >  我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的
> > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 ,表B 是 order
> > item
> > > >信息,所以 我用:
> > > >
> > > > SELECT *
> > > > FROM A
> > > > LEFT OUT JOIN B
> > > > ON order_id
> > > > Where A.event_time > B.event_time + 30 s
> > > >     A.event_time > B.event_time - 30 s
> > > >
> > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用 Spark
> > > Structural
> > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join,
> > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join 在具体实现的逻辑是什么,我在处理上哪里有问题?
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: 关于 stream-stream Interval Join 的问题

Benchao Li-2
hi macia,

事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?

macia kk <[hidden email]> 于2020年12月8日周二 上午1:15写道:

> 抱歉,是 >-30 and <+30
>
> 贴的只是demo,我的疑问是,既然是 Left Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有
>
> 赵一旦 <[hidden email]>于2020年12月7日 周一23:28写道:
>
> > 准确点,2个条件之间没and?2个都是>?
> >
> > macia kk <[hidden email]> 于2020年12月7日周一 下午10:30写道:
> >
> > > 不好意思,我上边贴错了
> > >
> > > SELECT *
> > >  FROM A
> > >  LEFT OUT JOIN B
> > >  ON order_id
> > >  Where A.event_time > B.event_time -  30 s
> > >      A.event_time > B.event_time + 30 s
> > >
> > > event_time 是 Time Attributes 设置的 event_time
> > >
> > > 这样是没有输出的。
> > >
> > >
> > >
> > > interval join 左右表在 state 中是缓存多久的?
> > >
> > >
> > >
> > >
> > >
> > >
> > > hailongwang <[hidden email]> 于2020年12月7日周一 下午8:05写道:
> > >
> > > > Hi,
> > > > 其中 条件是
> > > > `Where A.event_time < B.event_time + 30 s and A.event_time >
> > B.event_time
> > > > - 30 s ` 吧
> > > > 可以参考以下例子[1],看下有木有写错。
> > > > [1]
> > > >
> > >
> >
> https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183
> > > >
> > > >
> > > > Best,
> > > > Hailong
> > > > 在 2020-12-07 13:10:02,"macia kk" <[hidden email]> 写道:
> > > > >Hi, 各位大佬
> > > > >
> > > > >  我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的
> > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 ,表B 是 order
> > > item
> > > > >信息,所以 我用:
> > > > >
> > > > > SELECT *
> > > > > FROM A
> > > > > LEFT OUT JOIN B
> > > > > ON order_id
> > > > > Where A.event_time > B.event_time + 30 s
> > > > >     A.event_time > B.event_time - 30 s
> > > > >
> > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用 Spark
> > > > Structural
> > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join,
> > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join 在具体实现的逻辑是什么,我在处理上哪里有问题?
> > > >
> > >
> >
>


--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: 关于 stream-stream Interval Join 的问题

macia kk
@Benchao Li <[hidden email]>  感谢回复,这个问题困扰我半年了,导致我一直不能迁移到
FLink,可能我的Case 太特殊了.

我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的 Binlog,我需要
filter 出来 main_db__tansaction_tab, merchant_db__transaction_tab, 两个 DB
中的两个表。所以这里的字段我定义的是 两张表的字段的并集.

还要注意的是 even time 是 create_time, 这里问题非常大:
 1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱
 2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create time进来,可能会影响 watermark
forward on.

    bsTableEnv.executeSql("""
      CREATE TABLE input_database (
        `table` STRING,
        `database` STRING,
        `data` ROW(
          reference_id STRING,
          transaction_sn STRING,
          transaction_type BIGINT,
          merchant_id BIGINT,
          transaction_id BIGINT,
          status BIGINT
         ),
        ts BIGINT,
        event_time AS TO_TIMESTAMP(FROM_UNIXTIME(create_time)),
        WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR
     ) WITH (
       'connector.type' = 'kafka',
       'connector.version' = '0.11',
       'connector.topic' = 'mytopic',
       'connector.properties.bootstrap.servers' = 'xxxx',
       'format.type' = 'json'
     )
    """)


分别 filter 出来 两张表,进行 interval Join,这个是一直没有输出的,我两张表输出试过,没有任何问题。

    val main_db = bsTableEnv.sqlQuery("""
      | SELECT *
      | FROM input_database
      | WHERE `database` = 'main_db'
      |  AND `table` LIKE 'transaction_tab%'
      | """.stripMargin)

    val merchant_db = bsTableEnv.sqlQuery("""
      | SELECT *
      | FROM input_database
      | WHERE `database` = 'merchant_db'
      |   AND `table` LIKE 'transaction_tab%'
      | """.stripMargin)

    bsTableEnv.createTemporaryView("main_db", main_db)
    bsTableEnv.createTemporaryView("merchant_db", merchant_db)

    val result = bsTableEnv.sqlQuery("""
       SELECT *
       FROM (
          SELECT t1.`table`, t1.`database`, t1.transaction_type,
t1.transaction_id,
            t1.reference_id, t1.transaction_sn, t1.merchant_id,
t1.status, t1.event_time
          FROM main_db as t1
          LEFT JOIN merchant_db as t2
          ON t1.reference_id = t2.reference_id
          WHERE t1.event_time >= t2.event_time + INTERVAL '1' HOUR
           AND t1.event_time <= t2.event_time - INTERVAL '1' HOUR
       )
      """.stripMargin)



事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?
-----
你提到的这个问题,我估计我的 watermark 前进肯定是不正常的。但是我无法理解为什么 interval join 需要 watermark
来驱动。
我的理解是,他会把两边的数据都保留在 state 里,既然是 Left join,如果左边有数据查右边的state,如果可以 join上,就输出
join 的结果,如果没有 join上,那应该正常输出左边的数据,这才是 Left join 应有的逻辑把.






Benchao Li <[hidden email]> 于2020年12月8日周二 下午3:23写道:

> hi macia,
>
> 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?
>
> macia kk <[hidden email]> 于2020年12月8日周二 上午1:15写道:
>
> > 抱歉,是 >-30 and <+30
> >
> > 贴的只是demo,我的疑问是,既然是 Left Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有
> >
> > 赵一旦 <[hidden email]>于2020年12月7日 周一23:28写道:
> >
> > > 准确点,2个条件之间没and?2个都是>?
> > >
> > > macia kk <[hidden email]> 于2020年12月7日周一 下午10:30写道:
> > >
> > > > 不好意思,我上边贴错了
> > > >
> > > > SELECT *
> > > >  FROM A
> > > >  LEFT OUT JOIN B
> > > >  ON order_id
> > > >  Where A.event_time > B.event_time -  30 s
> > > >      A.event_time > B.event_time + 30 s
> > > >
> > > > event_time 是 Time Attributes 设置的 event_time
> > > >
> > > > 这样是没有输出的。
> > > >
> > > >
> > > >
> > > > interval join 左右表在 state 中是缓存多久的?
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > hailongwang <[hidden email]> 于2020年12月7日周一 下午8:05写道:
> > > >
> > > > > Hi,
> > > > > 其中 条件是
> > > > > `Where A.event_time < B.event_time + 30 s and A.event_time >
> > > B.event_time
> > > > > - 30 s ` 吧
> > > > > 可以参考以下例子[1],看下有木有写错。
> > > > > [1]
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183
> > > > >
> > > > >
> > > > > Best,
> > > > > Hailong
> > > > > 在 2020-12-07 13:10:02,"macia kk" <[hidden email]> 写道:
> > > > > >Hi, 各位大佬
> > > > > >
> > > > > >  我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的
> > > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 ,表B 是
> order
> > > > item
> > > > > >信息,所以 我用:
> > > > > >
> > > > > > SELECT *
> > > > > > FROM A
> > > > > > LEFT OUT JOIN B
> > > > > > ON order_id
> > > > > > Where A.event_time > B.event_time + 30 s
> > > > > >     A.event_time > B.event_time - 30 s
> > > > > >
> > > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用 Spark
> > > > > Structural
> > > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join,
> > > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join 在具体实现的逻辑是什么,我在处理上哪里有问题?
> > > > >
> > > >
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>
Reply | Threaded
Open this post in threaded view
|

Re: 关于 stream-stream Interval Join 的问题

nobleyd
重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。

(1)你这个的话我看到一个问题,就是watermark你定义10小时的maxOutOfOrderness,确定这么长嘛要,这么大的maxOutOfOrderness,会导致join到的则会及时输出,join不到的需要等10小时才能输出“仅左边”数据,即left
join。

(2)此外,还有一个点,这个我也不确认。如果是datastream
api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。
input_database中定义了watermark,从input_database到2个filter后的表不清楚是否还存在watermark(我感觉是存在的),只要存在那就没问题,唯一需要注意的是第1点。

macia kk <[hidden email]> 于2020年12月9日周三 上午1:17写道:

> @Benchao Li <[hidden email]>  感谢回复,这个问题困扰我半年了,导致我一直不能迁移到
> FLink,可能我的Case 太特殊了.
>
> 我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的 Binlog,我需要
> filter 出来 main_db__tansaction_tab, merchant_db__transaction_tab, 两个 DB
> 中的两个表。所以这里的字段我定义的是 两张表的字段的并集.
>
> 还要注意的是 even time 是 create_time, 这里问题非常大:
>  1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱
>  2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create time进来,可能会影响 watermark
> forward on.
>
>     bsTableEnv.executeSql("""
>       CREATE TABLE input_database (
>         `table` STRING,
>         `database` STRING,
>         `data` ROW(
>           reference_id STRING,
>           transaction_sn STRING,
>           transaction_type BIGINT,
>           merchant_id BIGINT,
>           transaction_id BIGINT,
>           status BIGINT
>          ),
>         ts BIGINT,
>         event_time AS TO_TIMESTAMP(FROM_UNIXTIME(create_time)),
>         WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR
>      ) WITH (
>        'connector.type' = 'kafka',
>        'connector.version' = '0.11',
>        'connector.topic' = 'mytopic',
>        'connector.properties.bootstrap.servers' = 'xxxx',
>        'format.type' = 'json'
>      )
>     """)
>
>
> 分别 filter 出来 两张表,进行 interval Join,这个是一直没有输出的,我两张表输出试过,没有任何问题。
>
>     val main_db = bsTableEnv.sqlQuery("""
>       | SELECT *
>       | FROM input_database
>       | WHERE `database` = 'main_db'
>       |  AND `table` LIKE 'transaction_tab%'
>       | """.stripMargin)
>
>     val merchant_db = bsTableEnv.sqlQuery("""
>       | SELECT *
>       | FROM input_database
>       | WHERE `database` = 'merchant_db'
>       |   AND `table` LIKE 'transaction_tab%'
>       | """.stripMargin)
>
>     bsTableEnv.createTemporaryView("main_db", main_db)
>     bsTableEnv.createTemporaryView("merchant_db", merchant_db)
>
>     val result = bsTableEnv.sqlQuery("""
>        SELECT *
>        FROM (
>           SELECT t1.`table`, t1.`database`, t1.transaction_type,
> t1.transaction_id,
>             t1.reference_id, t1.transaction_sn, t1.merchant_id,
> t1.status, t1.event_time
>           FROM main_db as t1
>           LEFT JOIN merchant_db as t2
>           ON t1.reference_id = t2.reference_id
>           WHERE t1.event_time >= t2.event_time + INTERVAL '1' HOUR
>            AND t1.event_time <= t2.event_time - INTERVAL '1' HOUR
>        )
>       """.stripMargin)
>
>
>
> 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?
> -----
> 你提到的这个问题,我估计我的 watermark 前进肯定是不正常的。但是我无法理解为什么 interval join 需要 watermark
> 来驱动。
> 我的理解是,他会把两边的数据都保留在 state 里,既然是 Left join,如果左边有数据查右边的state,如果可以 join上,就输出
> join 的结果,如果没有 join上,那应该正常输出左边的数据,这才是 Left join 应有的逻辑把.
>
>
>
>
>
>
> Benchao Li <[hidden email]> 于2020年12月8日周二 下午3:23写道:
>
> > hi macia,
> >
> > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?
> >
> > macia kk <[hidden email]> 于2020年12月8日周二 上午1:15写道:
> >
> > > 抱歉,是 >-30 and <+30
> > >
> > > 贴的只是demo,我的疑问是,既然是 Left Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有
> > >
> > > 赵一旦 <[hidden email]>于2020年12月7日 周一23:28写道:
> > >
> > > > 准确点,2个条件之间没and?2个都是>?
> > > >
> > > > macia kk <[hidden email]> 于2020年12月7日周一 下午10:30写道:
> > > >
> > > > > 不好意思,我上边贴错了
> > > > >
> > > > > SELECT *
> > > > >  FROM A
> > > > >  LEFT OUT JOIN B
> > > > >  ON order_id
> > > > >  Where A.event_time > B.event_time -  30 s
> > > > >      A.event_time > B.event_time + 30 s
> > > > >
> > > > > event_time 是 Time Attributes 设置的 event_time
> > > > >
> > > > > 这样是没有输出的。
> > > > >
> > > > >
> > > > >
> > > > > interval join 左右表在 state 中是缓存多久的?
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > hailongwang <[hidden email]> 于2020年12月7日周一 下午8:05写道:
> > > > >
> > > > > > Hi,
> > > > > > 其中 条件是
> > > > > > `Where A.event_time < B.event_time + 30 s and A.event_time >
> > > > B.event_time
> > > > > > - 30 s ` 吧
> > > > > > 可以参考以下例子[1],看下有木有写错。
> > > > > > [1]
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183
> > > > > >
> > > > > >
> > > > > > Best,
> > > > > > Hailong
> > > > > > 在 2020-12-07 13:10:02,"macia kk" <[hidden email]> 写道:
> > > > > > >Hi, 各位大佬
> > > > > > >
> > > > > > >  我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的
> > > > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 ,表B 是
> > order
> > > > > item
> > > > > > >信息,所以 我用:
> > > > > > >
> > > > > > > SELECT *
> > > > > > > FROM A
> > > > > > > LEFT OUT JOIN B
> > > > > > > ON order_id
> > > > > > > Where A.event_time > B.event_time + 30 s
> > > > > > >     A.event_time > B.event_time - 30 s
> > > > > > >
> > > > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用 Spark
> > > > > > Structural
> > > > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join,
> > > > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join 在具体实现的逻辑是什么,我在处理上哪里有问题?
> > > > > >
> > > > >
> > > >
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: 关于 stream-stream Interval Join 的问题

Benchao Li-2
Hi macia,

一旦回答的基本比较完整了。
watermark影响的主要是left join没有join到的情况下,+(left, null)这样的数据输出的时机。
如果是两侧都有数据,watermark不前进,也都可以正常输出。

关于watermark,如果你的事件时间忽高忽低,这个底层的watermark只会取当前source subtask见到的最大的watermark
作为这个source subtask的watermark。但是你的watermark计算逻辑本身就是事件时间delay 10个小时,这个已经会导致
你的没有join到的数据下发会延迟很多了。

你也可以尝试下用处理时间来做一下interval join,看看能不能达到预期。

赵一旦 <[hidden email]> 于2020年12月9日周三 上午10:15写道:

> 重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。
>
>
> (1)你这个的话我看到一个问题,就是watermark你定义10小时的maxOutOfOrderness,确定这么长嘛要,这么大的maxOutOfOrderness,会导致join到的则会及时输出,join不到的需要等10小时才能输出“仅左边”数据,即left
> join。
>
> (2)此外,还有一个点,这个我也不确认。如果是datastream
> api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。
>
> input_database中定义了watermark,从input_database到2个filter后的表不清楚是否还存在watermark(我感觉是存在的),只要存在那就没问题,唯一需要注意的是第1点。
>
> macia kk <[hidden email]> 于2020年12月9日周三 上午1:17写道:
>
> > @Benchao Li <[hidden email]>  感谢回复,这个问题困扰我半年了,导致我一直不能迁移到
> > FLink,可能我的Case 太特殊了.
> >
> > 我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的 Binlog,我需要
> > filter 出来 main_db__tansaction_tab, merchant_db__transaction_tab, 两个 DB
> > 中的两个表。所以这里的字段我定义的是 两张表的字段的并集.
> >
> > 还要注意的是 even time 是 create_time, 这里问题非常大:
> >  1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱
> >  2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create time进来,可能会影响
> watermark
> > forward on.
> >
> >     bsTableEnv.executeSql("""
> >       CREATE TABLE input_database (
> >         `table` STRING,
> >         `database` STRING,
> >         `data` ROW(
> >           reference_id STRING,
> >           transaction_sn STRING,
> >           transaction_type BIGINT,
> >           merchant_id BIGINT,
> >           transaction_id BIGINT,
> >           status BIGINT
> >          ),
> >         ts BIGINT,
> >         event_time AS TO_TIMESTAMP(FROM_UNIXTIME(create_time)),
> >         WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR
> >      ) WITH (
> >        'connector.type' = 'kafka',
> >        'connector.version' = '0.11',
> >        'connector.topic' = 'mytopic',
> >        'connector.properties.bootstrap.servers' = 'xxxx',
> >        'format.type' = 'json'
> >      )
> >     """)
> >
> >
> > 分别 filter 出来 两张表,进行 interval Join,这个是一直没有输出的,我两张表输出试过,没有任何问题。
> >
> >     val main_db = bsTableEnv.sqlQuery("""
> >       | SELECT *
> >       | FROM input_database
> >       | WHERE `database` = 'main_db'
> >       |  AND `table` LIKE 'transaction_tab%'
> >       | """.stripMargin)
> >
> >     val merchant_db = bsTableEnv.sqlQuery("""
> >       | SELECT *
> >       | FROM input_database
> >       | WHERE `database` = 'merchant_db'
> >       |   AND `table` LIKE 'transaction_tab%'
> >       | """.stripMargin)
> >
> >     bsTableEnv.createTemporaryView("main_db", main_db)
> >     bsTableEnv.createTemporaryView("merchant_db", merchant_db)
> >
> >     val result = bsTableEnv.sqlQuery("""
> >        SELECT *
> >        FROM (
> >           SELECT t1.`table`, t1.`database`, t1.transaction_type,
> > t1.transaction_id,
> >             t1.reference_id, t1.transaction_sn, t1.merchant_id,
> > t1.status, t1.event_time
> >           FROM main_db as t1
> >           LEFT JOIN merchant_db as t2
> >           ON t1.reference_id = t2.reference_id
> >           WHERE t1.event_time >= t2.event_time + INTERVAL '1' HOUR
> >            AND t1.event_time <= t2.event_time - INTERVAL '1' HOUR
> >        )
> >       """.stripMargin)
> >
> >
> >
> > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?
> > -----
> > 你提到的这个问题,我估计我的 watermark 前进肯定是不正常的。但是我无法理解为什么 interval join 需要 watermark
> > 来驱动。
> > 我的理解是,他会把两边的数据都保留在 state 里,既然是 Left join,如果左边有数据查右边的state,如果可以 join上,就输出
> > join 的结果,如果没有 join上,那应该正常输出左边的数据,这才是 Left join 应有的逻辑把.
> >
> >
> >
> >
> >
> >
> > Benchao Li <[hidden email]> 于2020年12月8日周二 下午3:23写道:
> >
> > > hi macia,
> > >
> > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?
> > >
> > > macia kk <[hidden email]> 于2020年12月8日周二 上午1:15写道:
> > >
> > > > 抱歉,是 >-30 and <+30
> > > >
> > > > 贴的只是demo,我的疑问是,既然是 Left Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有
> > > >
> > > > 赵一旦 <[hidden email]>于2020年12月7日 周一23:28写道:
> > > >
> > > > > 准确点,2个条件之间没and?2个都是>?
> > > > >
> > > > > macia kk <[hidden email]> 于2020年12月7日周一 下午10:30写道:
> > > > >
> > > > > > 不好意思,我上边贴错了
> > > > > >
> > > > > > SELECT *
> > > > > >  FROM A
> > > > > >  LEFT OUT JOIN B
> > > > > >  ON order_id
> > > > > >  Where A.event_time > B.event_time -  30 s
> > > > > >      A.event_time > B.event_time + 30 s
> > > > > >
> > > > > > event_time 是 Time Attributes 设置的 event_time
> > > > > >
> > > > > > 这样是没有输出的。
> > > > > >
> > > > > >
> > > > > >
> > > > > > interval join 左右表在 state 中是缓存多久的?
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > hailongwang <[hidden email]> 于2020年12月7日周一 下午8:05写道:
> > > > > >
> > > > > > > Hi,
> > > > > > > 其中 条件是
> > > > > > > `Where A.event_time < B.event_time + 30 s and A.event_time >
> > > > > B.event_time
> > > > > > > - 30 s ` 吧
> > > > > > > 可以参考以下例子[1],看下有木有写错。
> > > > > > > [1]
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183
> > > > > > >
> > > > > > >
> > > > > > > Best,
> > > > > > > Hailong
> > > > > > > 在 2020-12-07 13:10:02,"macia kk" <[hidden email]> 写道:
> > > > > > > >Hi, 各位大佬
> > > > > > > >
> > > > > > > >  我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的
> > > > > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 ,表B 是
> > > order
> > > > > > item
> > > > > > > >信息,所以 我用:
> > > > > > > >
> > > > > > > > SELECT *
> > > > > > > > FROM A
> > > > > > > > LEFT OUT JOIN B
> > > > > > > > ON order_id
> > > > > > > > Where A.event_time > B.event_time + 30 s
> > > > > > > >     A.event_time > B.event_time - 30 s
> > > > > > > >
> > > > > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用
> Spark
> > > > > > > Structural
> > > > > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join,
> > > > > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join
> 在具体实现的逻辑是什么,我在处理上哪里有问题?
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
>


--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: 关于 stream-stream Interval Join 的问题

macia kk
感谢 一旦 和 Benchao

  1. 如果是我的 watermark 设置过长,导致无法输出的话,是有点疑问的,因为我可以确定的是,一定会有 Join 上的数据,但是我 Job
跑了几天也没有一条输出。我还试了如下的SQL,自己 Join 自己,所以理论肯定是直接输出的,实际也是一条也没有。

    val result = bsTableEnv.sqlQuery("""
       SELECT *
       FROM (
          SELECT t1.`table`, t1.`database`, t1.transaction_type,
t1.transaction_id,
            t1.reference_id, t1.transaction_sn, t1.merchant_id,
t1.status, t1.event_time
          FROM main_db as t1
          LEFT JOIN main_db as t2
          ON t1.reference_id = t2.reference_id
          WHERE t1.event_time >= t2.event_time + INTERVAL '5' MINUTES
           AND t1.event_time <= t2.event_time - INTERVAL '5' MINUTES
       )
      """.stripMargin)

2. 一旦提到的 watermark 传递的问题,我可以确认的是,会传递下去,这可以在 UI 上看到

3. 这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source
subtask的watermark。
    -------------------------------------------------------
    这里应该是使用 source subtask 最小的 watermark 传递过去,因为我可以看到的是,我的 watermark
永远和现在相差8个小时,所以怀疑是有一张表,总是会迟8个小时才会有 BinLog.

4. Flink SQL 有没有方法在定义 schema 的时候,如果一个字段不存在,就是 null,我现在想换另外一个时间字段作为 event
time,但是有的表又没有这个字段,会导致解析的时候直接报错.

5. 我能不能不在 input_table 上注册 water mark,在 filter 出两张表后,再把 watermark
加载两张表上,这样可以避免因为别的表,导致 watermark 停止不前,混乱的行为.


Thanks and best regards


Benchao Li <[hidden email]> 于2020年12月9日周三 上午10:24写道:

> Hi macia,
>
> 一旦回答的基本比较完整了。
> watermark影响的主要是left join没有join到的情况下,+(left, null)这样的数据输出的时机。
> 如果是两侧都有数据,watermark不前进,也都可以正常输出。
>
> 关于watermark,如果你的事件时间忽高忽低,这个底层的watermark只会取当前source subtask见到的最大的watermark
> 作为这个source subtask的watermark。但是你的watermark计算逻辑本身就是事件时间delay 10个小时,这个已经会导致
> 你的没有join到的数据下发会延迟很多了。
>
> 你也可以尝试下用处理时间来做一下interval join,看看能不能达到预期。
>
> 赵一旦 <[hidden email]> 于2020年12月9日周三 上午10:15写道:
>
> > 重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。
> >
> >
> >
> (1)你这个的话我看到一个问题,就是watermark你定义10小时的maxOutOfOrderness,确定这么长嘛要,这么大的maxOutOfOrderness,会导致join到的则会及时输出,join不到的需要等10小时才能输出“仅左边”数据,即left
> > join。
> >
> > (2)此外,还有一个点,这个我也不确认。如果是datastream
> > api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。
> >
> >
> input_database中定义了watermark,从input_database到2个filter后的表不清楚是否还存在watermark(我感觉是存在的),只要存在那就没问题,唯一需要注意的是第1点。
> >
> > macia kk <[hidden email]> 于2020年12月9日周三 上午1:17写道:
> >
> > > @Benchao Li <[hidden email]>  感谢回复,这个问题困扰我半年了,导致我一直不能迁移到
> > > FLink,可能我的Case 太特殊了.
> > >
> > > 我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的
> Binlog,我需要
> > > filter 出来 main_db__tansaction_tab, merchant_db__transaction_tab, 两个 DB
> > > 中的两个表。所以这里的字段我定义的是 两张表的字段的并集.
> > >
> > > 还要注意的是 even time 是 create_time, 这里问题非常大:
> > >  1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱
> > >  2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create time进来,可能会影响
> > watermark
> > > forward on.
> > >
> > >     bsTableEnv.executeSql("""
> > >       CREATE TABLE input_database (
> > >         `table` STRING,
> > >         `database` STRING,
> > >         `data` ROW(
> > >           reference_id STRING,
> > >           transaction_sn STRING,
> > >           transaction_type BIGINT,
> > >           merchant_id BIGINT,
> > >           transaction_id BIGINT,
> > >           status BIGINT
> > >          ),
> > >         ts BIGINT,
> > >         event_time AS TO_TIMESTAMP(FROM_UNIXTIME(create_time)),
> > >         WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR
> > >      ) WITH (
> > >        'connector.type' = 'kafka',
> > >        'connector.version' = '0.11',
> > >        'connector.topic' = 'mytopic',
> > >        'connector.properties.bootstrap.servers' = 'xxxx',
> > >        'format.type' = 'json'
> > >      )
> > >     """)
> > >
> > >
> > > 分别 filter 出来 两张表,进行 interval Join,这个是一直没有输出的,我两张表输出试过,没有任何问题。
> > >
> > >     val main_db = bsTableEnv.sqlQuery("""
> > >       | SELECT *
> > >       | FROM input_database
> > >       | WHERE `database` = 'main_db'
> > >       |  AND `table` LIKE 'transaction_tab%'
> > >       | """.stripMargin)
> > >
> > >     val merchant_db = bsTableEnv.sqlQuery("""
> > >       | SELECT *
> > >       | FROM input_database
> > >       | WHERE `database` = 'merchant_db'
> > >       |   AND `table` LIKE 'transaction_tab%'
> > >       | """.stripMargin)
> > >
> > >     bsTableEnv.createTemporaryView("main_db", main_db)
> > >     bsTableEnv.createTemporaryView("merchant_db", merchant_db)
> > >
> > >     val result = bsTableEnv.sqlQuery("""
> > >        SELECT *
> > >        FROM (
> > >           SELECT t1.`table`, t1.`database`, t1.transaction_type,
> > > t1.transaction_id,
> > >             t1.reference_id, t1.transaction_sn, t1.merchant_id,
> > > t1.status, t1.event_time
> > >           FROM main_db as t1
> > >           LEFT JOIN merchant_db as t2
> > >           ON t1.reference_id = t2.reference_id
> > >           WHERE t1.event_time >= t2.event_time + INTERVAL '1' HOUR
> > >            AND t1.event_time <= t2.event_time - INTERVAL '1' HOUR
> > >        )
> > >       """.stripMargin)
> > >
> > >
> > >
> > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?
> > > -----
> > > 你提到的这个问题,我估计我的 watermark 前进肯定是不正常的。但是我无法理解为什么 interval join 需要
> watermark
> > > 来驱动。
> > > 我的理解是,他会把两边的数据都保留在 state 里,既然是 Left join,如果左边有数据查右边的state,如果可以
> join上,就输出
> > > join 的结果,如果没有 join上,那应该正常输出左边的数据,这才是 Left join 应有的逻辑把.
> > >
> > >
> > >
> > >
> > >
> > >
> > > Benchao Li <[hidden email]> 于2020年12月8日周二 下午3:23写道:
> > >
> > > > hi macia,
> > > >
> > > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?
> > > >
> > > > macia kk <[hidden email]> 于2020年12月8日周二 上午1:15写道:
> > > >
> > > > > 抱歉,是 >-30 and <+30
> > > > >
> > > > > 贴的只是demo,我的疑问是,既然是 Left Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有
> > > > >
> > > > > 赵一旦 <[hidden email]>于2020年12月7日 周一23:28写道:
> > > > >
> > > > > > 准确点,2个条件之间没and?2个都是>?
> > > > > >
> > > > > > macia kk <[hidden email]> 于2020年12月7日周一 下午10:30写道:
> > > > > >
> > > > > > > 不好意思,我上边贴错了
> > > > > > >
> > > > > > > SELECT *
> > > > > > >  FROM A
> > > > > > >  LEFT OUT JOIN B
> > > > > > >  ON order_id
> > > > > > >  Where A.event_time > B.event_time -  30 s
> > > > > > >      A.event_time > B.event_time + 30 s
> > > > > > >
> > > > > > > event_time 是 Time Attributes 设置的 event_time
> > > > > > >
> > > > > > > 这样是没有输出的。
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > interval join 左右表在 state 中是缓存多久的?
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > hailongwang <[hidden email]> 于2020年12月7日周一 下午8:05写道:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > > 其中 条件是
> > > > > > > > `Where A.event_time < B.event_time + 30 s and A.event_time >
> > > > > > B.event_time
> > > > > > > > - 30 s ` 吧
> > > > > > > > 可以参考以下例子[1],看下有木有写错。
> > > > > > > > [1]
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183
> > > > > > > >
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Hailong
> > > > > > > > 在 2020-12-07 13:10:02,"macia kk" <[hidden email]> 写道:
> > > > > > > > >Hi, 各位大佬
> > > > > > > > >
> > > > > > > > >  我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的
> > > > > > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件
> ,表B 是
> > > > order
> > > > > > > item
> > > > > > > > >信息,所以 我用:
> > > > > > > > >
> > > > > > > > > SELECT *
> > > > > > > > > FROM A
> > > > > > > > > LEFT OUT JOIN B
> > > > > > > > > ON order_id
> > > > > > > > > Where A.event_time > B.event_time + 30 s
> > > > > > > > >     A.event_time > B.event_time - 30 s
> > > > > > > > >
> > > > > > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用
> > Spark
> > > > > > > > Structural
> > > > > > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join,
> > > > > > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join
> > 在具体实现的逻辑是什么,我在处理上哪里有问题?
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Best,
> > > > Benchao Li
> > > >
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>
Reply | Threaded
Open this post in threaded view
|

Re: 关于 stream-stream Interval Join 的问题

macia kk
我刚才跑了很多任务,设置不同的 maxOutOfOrderness WATERMARK FOR event_time AS event_time -
INTERVAL 'x' HOUR

 发现一个很奇怪的问题 ,按理说 watermark = currentMaxTimestamp - maxOutOfOrderness

但是 我通过 页面上的 watermark 时间,和我设置 maxOutOfOrderness x,
能够反推出来数据的 currentMaxTimestamp

currentMaxTimestamp = watermark + maxOutOfOrderness

但是我无论设置多少的 maxOutOfOrderness, 反推出来的 currentMaxTimestamp 比现在此时此刻的时间快
8个小时,也就是说 currentMaxTimestamp 在未来后的 8个小时,这个数字一直是固定的8。


但是,我进行 Join, 直接输出任意一张表,得到的 evet time 都是对的,比如现在 00:55
{"table":"transaction_tab_00000122","database":"main_db","transaction_type":1,"transaction_id":111111,"reference_id":"111111","transaction_sn":"11111","merchant_id":1,"status":1,"event_time":"
*2020-12-10T01:02:24Z*"}

UI 上显示的 watermark 是 1607555031000(Your time zone: 2020年12月10日星期四早上7点02分
GMT+08:00)

这个 watermark 是未来的时间 😂





macia kk <[hidden email]> 于2020年12月9日周三 下午11:36写道:

> 感谢 一旦 和 Benchao
>
>   1. 如果是我的 watermark 设置过长,导致无法输出的话,是有点疑问的,因为我可以确定的是,一定会有 Join 上的数据,但是我 Job
> 跑了几天也没有一条输出。我还试了如下的SQL,自己 Join 自己,所以理论肯定是直接输出的,实际也是一条也没有。
>
>     val result = bsTableEnv.sqlQuery("""
>        SELECT *
>        FROM (
>           SELECT t1.`table`, t1.`database`, t1.transaction_type, t1.transaction_id,
>             t1.reference_id, t1.transaction_sn, t1.merchant_id, t1.status, t1.event_time
>           FROM main_db as t1
>           LEFT JOIN main_db as t2
>           ON t1.reference_id = t2.reference_id
>           WHERE t1.event_time >= t2.event_time + INTERVAL '5' MINUTES
>            AND t1.event_time <= t2.event_time - INTERVAL '5' MINUTES
>        )
>       """.stripMargin)
>
> 2. 一旦提到的 watermark 传递的问题,我可以确认的是,会传递下去,这可以在 UI 上看到
>
> 3. 这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source
> subtask的watermark。
>     -------------------------------------------------------
>     这里应该是使用 source subtask 最小的 watermark 传递过去,因为我可以看到的是,我的 watermark
> 永远和现在相差8个小时,所以怀疑是有一张表,总是会迟8个小时才会有 BinLog.
>
> 4. Flink SQL 有没有方法在定义 schema 的时候,如果一个字段不存在,就是 null,我现在想换另外一个时间字段作为 event
> time,但是有的表又没有这个字段,会导致解析的时候直接报错.
>
> 5. 我能不能不在 input_table 上注册 water mark,在 filter 出两张表后,再把 watermark
> 加载两张表上,这样可以避免因为别的表,导致 watermark 停止不前,混乱的行为.
>
>
> Thanks and best regards
>
>
> Benchao Li <[hidden email]> 于2020年12月9日周三 上午10:24写道:
>
>> Hi macia,
>>
>> 一旦回答的基本比较完整了。
>> watermark影响的主要是left join没有join到的情况下,+(left, null)这样的数据输出的时机。
>> 如果是两侧都有数据,watermark不前进,也都可以正常输出。
>>
>> 关于watermark,如果你的事件时间忽高忽低,这个底层的watermark只会取当前source subtask见到的最大的watermark
>> 作为这个source subtask的watermark。但是你的watermark计算逻辑本身就是事件时间delay 10个小时,这个已经会导致
>> 你的没有join到的数据下发会延迟很多了。
>>
>> 你也可以尝试下用处理时间来做一下interval join,看看能不能达到预期。
>>
>> 赵一旦 <[hidden email]> 于2020年12月9日周三 上午10:15写道:
>>
>> > 重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。
>> >
>> >
>> >
>> (1)你这个的话我看到一个问题,就是watermark你定义10小时的maxOutOfOrderness,确定这么长嘛要,这么大的maxOutOfOrderness,会导致join到的则会及时输出,join不到的需要等10小时才能输出“仅左边”数据,即left
>> > join。
>> >
>> > (2)此外,还有一个点,这个我也不确认。如果是datastream
>> > api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。
>> >
>> >
>> input_database中定义了watermark,从input_database到2个filter后的表不清楚是否还存在watermark(我感觉是存在的),只要存在那就没问题,唯一需要注意的是第1点。
>> >
>> > macia kk <[hidden email]> 于2020年12月9日周三 上午1:17写道:
>> >
>> > > @Benchao Li <[hidden email]>  感谢回复,这个问题困扰我半年了,导致我一直不能迁移到
>> > > FLink,可能我的Case 太特殊了.
>> > >
>> > > 我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的
>> Binlog,我需要
>> > > filter 出来 main_db__tansaction_tab, merchant_db__transaction_tab, 两个 DB
>> > > 中的两个表。所以这里的字段我定义的是 两张表的字段的并集.
>> > >
>> > > 还要注意的是 even time 是 create_time, 这里问题非常大:
>> > >  1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱
>> > >  2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create time进来,可能会影响
>> > watermark
>> > > forward on.
>> > >
>> > >     bsTableEnv.executeSql("""
>> > >       CREATE TABLE input_database (
>> > >         `table` STRING,
>> > >         `database` STRING,
>> > >         `data` ROW(
>> > >           reference_id STRING,
>> > >           transaction_sn STRING,
>> > >           transaction_type BIGINT,
>> > >           merchant_id BIGINT,
>> > >           transaction_id BIGINT,
>> > >           status BIGINT
>> > >          ),
>> > >         ts BIGINT,
>> > >         event_time AS TO_TIMESTAMP(FROM_UNIXTIME(create_time)),
>> > >         WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR
>> > >      ) WITH (
>> > >        'connector.type' = 'kafka',
>> > >        'connector.version' = '0.11',
>> > >        'connector.topic' = 'mytopic',
>> > >        'connector.properties.bootstrap.servers' = 'xxxx',
>> > >        'format.type' = 'json'
>> > >      )
>> > >     """)
>> > >
>> > >
>> > > 分别 filter 出来 两张表,进行 interval Join,这个是一直没有输出的,我两张表输出试过,没有任何问题。
>> > >
>> > >     val main_db = bsTableEnv.sqlQuery("""
>> > >       | SELECT *
>> > >       | FROM input_database
>> > >       | WHERE `database` = 'main_db'
>> > >       |  AND `table` LIKE 'transaction_tab%'
>> > >       | """.stripMargin)
>> > >
>> > >     val merchant_db = bsTableEnv.sqlQuery("""
>> > >       | SELECT *
>> > >       | FROM input_database
>> > >       | WHERE `database` = 'merchant_db'
>> > >       |   AND `table` LIKE 'transaction_tab%'
>> > >       | """.stripMargin)
>> > >
>> > >     bsTableEnv.createTemporaryView("main_db", main_db)
>> > >     bsTableEnv.createTemporaryView("merchant_db", merchant_db)
>> > >
>> > >     val result = bsTableEnv.sqlQuery("""
>> > >        SELECT *
>> > >        FROM (
>> > >           SELECT t1.`table`, t1.`database`, t1.transaction_type,
>> > > t1.transaction_id,
>> > >             t1.reference_id, t1.transaction_sn, t1.merchant_id,
>> > > t1.status, t1.event_time
>> > >           FROM main_db as t1
>> > >           LEFT JOIN merchant_db as t2
>> > >           ON t1.reference_id = t2.reference_id
>> > >           WHERE t1.event_time >= t2.event_time + INTERVAL '1' HOUR
>> > >            AND t1.event_time <= t2.event_time - INTERVAL '1' HOUR
>> > >        )
>> > >       """.stripMargin)
>> > >
>> > >
>> > >
>> > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?
>> > > -----
>> > > 你提到的这个问题,我估计我的 watermark 前进肯定是不正常的。但是我无法理解为什么 interval join 需要
>> watermark
>> > > 来驱动。
>> > > 我的理解是,他会把两边的数据都保留在 state 里,既然是 Left join,如果左边有数据查右边的state,如果可以
>> join上,就输出
>> > > join 的结果,如果没有 join上,那应该正常输出左边的数据,这才是 Left join 应有的逻辑把.
>> > >
>> > >
>> > >
>> > >
>> > >
>> > >
>> > > Benchao Li <[hidden email]> 于2020年12月8日周二 下午3:23写道:
>> > >
>> > > > hi macia,
>> > > >
>> > > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?
>> > > >
>> > > > macia kk <[hidden email]> 于2020年12月8日周二 上午1:15写道:
>> > > >
>> > > > > 抱歉,是 >-30 and <+30
>> > > > >
>> > > > > 贴的只是demo,我的疑问是,既然是 Left Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有
>> > > > >
>> > > > > 赵一旦 <[hidden email]>于2020年12月7日 周一23:28写道:
>> > > > >
>> > > > > > 准确点,2个条件之间没and?2个都是>?
>> > > > > >
>> > > > > > macia kk <[hidden email]> 于2020年12月7日周一 下午10:30写道:
>> > > > > >
>> > > > > > > 不好意思,我上边贴错了
>> > > > > > >
>> > > > > > > SELECT *
>> > > > > > >  FROM A
>> > > > > > >  LEFT OUT JOIN B
>> > > > > > >  ON order_id
>> > > > > > >  Where A.event_time > B.event_time -  30 s
>> > > > > > >      A.event_time > B.event_time + 30 s
>> > > > > > >
>> > > > > > > event_time 是 Time Attributes 设置的 event_time
>> > > > > > >
>> > > > > > > 这样是没有输出的。
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > interval join 左右表在 state 中是缓存多久的?
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > hailongwang <[hidden email]> 于2020年12月7日周一 下午8:05写道:
>> > > > > > >
>> > > > > > > > Hi,
>> > > > > > > > 其中 条件是
>> > > > > > > > `Where A.event_time < B.event_time + 30 s and A.event_time >
>> > > > > > B.event_time
>> > > > > > > > - 30 s ` 吧
>> > > > > > > > 可以参考以下例子[1],看下有木有写错。
>> > > > > > > > [1]
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > Best,
>> > > > > > > > Hailong
>> > > > > > > > 在 2020-12-07 13:10:02,"macia kk" <[hidden email]> 写道:
>> > > > > > > > >Hi, 各位大佬
>> > > > > > > > >
>> > > > > > > > >  我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的
>> > > > > > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件
>> ,表B 是
>> > > > order
>> > > > > > > item
>> > > > > > > > >信息,所以 我用:
>> > > > > > > > >
>> > > > > > > > > SELECT *
>> > > > > > > > > FROM A
>> > > > > > > > > LEFT OUT JOIN B
>> > > > > > > > > ON order_id
>> > > > > > > > > Where A.event_time > B.event_time + 30 s
>> > > > > > > > >     A.event_time > B.event_time - 30 s
>> > > > > > > > >
>> > > > > > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用
>> > Spark
>> > > > > > > > Structural
>> > > > > > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join,
>> > > > > > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join
>> > 在具体实现的逻辑是什么,我在处理上哪里有问题?
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > > >
>> > > > --
>> > > >
>> > > > Best,
>> > > > Benchao Li
>> > > >
>> > >
>> >
>>
>>
>> --
>>
>> Best,
>> Benchao Li
>>
>
Reply | Threaded
Open this post in threaded view
|

Re: 关于 stream-stream Interval Join 的问题

Benchao Li-2
你用的是哪个版本的Flink呢?

看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。
所以你的binlog是怎么读进来的呢?自定义的format?

macia kk <[hidden email]> 于2020年12月10日周四 上午1:06写道:

> 我刚才跑了很多任务,设置不同的 maxOutOfOrderness WATERMARK FOR event_time AS event_time -
> INTERVAL 'x' HOUR
>
>  发现一个很奇怪的问题 ,按理说 watermark = currentMaxTimestamp - maxOutOfOrderness
>
> 但是 我通过 页面上的 watermark 时间,和我设置 maxOutOfOrderness x,
> 能够反推出来数据的 currentMaxTimestamp
>
> currentMaxTimestamp = watermark + maxOutOfOrderness
>
> 但是我无论设置多少的 maxOutOfOrderness, 反推出来的 currentMaxTimestamp 比现在此时此刻的时间快
> 8个小时,也就是说 currentMaxTimestamp 在未来后的 8个小时,这个数字一直是固定的8。
>
>
> 但是,我进行 Join, 直接输出任意一张表,得到的 evet time 都是对的,比如现在 00:55
>
> {"table":"transaction_tab_00000122","database":"main_db","transaction_type":1,"transaction_id":111111,"reference_id":"111111","transaction_sn":"11111","merchant_id":1,"status":1,"event_time":"
> *2020-12-10T01:02:24Z*"}
>
> UI 上显示的 watermark 是 1607555031000(Your time zone: 2020年12月10日星期四早上7点02分
> GMT+08:00)
>
> 这个 watermark 是未来的时间 😂
>
>
>
>
>
> macia kk <[hidden email]> 于2020年12月9日周三 下午11:36写道:
>
> > 感谢 一旦 和 Benchao
> >
> >   1. 如果是我的 watermark 设置过长,导致无法输出的话,是有点疑问的,因为我可以确定的是,一定会有 Join 上的数据,但是我
> Job
> > 跑了几天也没有一条输出。我还试了如下的SQL,自己 Join 自己,所以理论肯定是直接输出的,实际也是一条也没有。
> >
> >     val result = bsTableEnv.sqlQuery("""
> >        SELECT *
> >        FROM (
> >           SELECT t1.`table`, t1.`database`, t1.transaction_type,
> t1.transaction_id,
> >             t1.reference_id, t1.transaction_sn, t1.merchant_id,
> t1.status, t1.event_time
> >           FROM main_db as t1
> >           LEFT JOIN main_db as t2
> >           ON t1.reference_id = t2.reference_id
> >           WHERE t1.event_time >= t2.event_time + INTERVAL '5' MINUTES
> >            AND t1.event_time <= t2.event_time - INTERVAL '5' MINUTES
> >        )
> >       """.stripMargin)
> >
> > 2. 一旦提到的 watermark 传递的问题,我可以确认的是,会传递下去,这可以在 UI 上看到
> >
> > 3. 这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source
> > subtask的watermark。
> >     -------------------------------------------------------
> >     这里应该是使用 source subtask 最小的 watermark 传递过去,因为我可以看到的是,我的 watermark
> > 永远和现在相差8个小时,所以怀疑是有一张表,总是会迟8个小时才会有 BinLog.
> >
> > 4. Flink SQL 有没有方法在定义 schema 的时候,如果一个字段不存在,就是 null,我现在想换另外一个时间字段作为 event
> > time,但是有的表又没有这个字段,会导致解析的时候直接报错.
> >
> > 5. 我能不能不在 input_table 上注册 water mark,在 filter 出两张表后,再把 watermark
> > 加载两张表上,这样可以避免因为别的表,导致 watermark 停止不前,混乱的行为.
> >
> >
> > Thanks and best regards
> >
> >
> > Benchao Li <[hidden email]> 于2020年12月9日周三 上午10:24写道:
> >
> >> Hi macia,
> >>
> >> 一旦回答的基本比较完整了。
> >> watermark影响的主要是left join没有join到的情况下,+(left, null)这样的数据输出的时机。
> >> 如果是两侧都有数据,watermark不前进,也都可以正常输出。
> >>
> >> 关于watermark,如果你的事件时间忽高忽低,这个底层的watermark只会取当前source
> subtask见到的最大的watermark
> >> 作为这个source subtask的watermark。但是你的watermark计算逻辑本身就是事件时间delay
> 10个小时,这个已经会导致
> >> 你的没有join到的数据下发会延迟很多了。
> >>
> >> 你也可以尝试下用处理时间来做一下interval join,看看能不能达到预期。
> >>
> >> 赵一旦 <[hidden email]> 于2020年12月9日周三 上午10:15写道:
> >>
> >> > 重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。
> >> >
> >> >
> >> >
> >>
> (1)你这个的话我看到一个问题,就是watermark你定义10小时的maxOutOfOrderness,确定这么长嘛要,这么大的maxOutOfOrderness,会导致join到的则会及时输出,join不到的需要等10小时才能输出“仅左边”数据,即left
> >> > join。
> >> >
> >> > (2)此外,还有一个点,这个我也不确认。如果是datastream
> >> > api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。
> >> >
> >> >
> >>
> input_database中定义了watermark,从input_database到2个filter后的表不清楚是否还存在watermark(我感觉是存在的),只要存在那就没问题,唯一需要注意的是第1点。
> >> >
> >> > macia kk <[hidden email]> 于2020年12月9日周三 上午1:17写道:
> >> >
> >> > > @Benchao Li <[hidden email]>  感谢回复,这个问题困扰我半年了,导致我一直不能迁移到
> >> > > FLink,可能我的Case 太特殊了.
> >> > >
> >> > > 我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的
> >> Binlog,我需要
> >> > > filter 出来 main_db__tansaction_tab, merchant_db__transaction_tab, 两个
> DB
> >> > > 中的两个表。所以这里的字段我定义的是 两张表的字段的并集.
> >> > >
> >> > > 还要注意的是 even time 是 create_time, 这里问题非常大:
> >> > >  1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱
> >> > >  2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create time进来,可能会影响
> >> > watermark
> >> > > forward on.
> >> > >
> >> > >     bsTableEnv.executeSql("""
> >> > >       CREATE TABLE input_database (
> >> > >         `table` STRING,
> >> > >         `database` STRING,
> >> > >         `data` ROW(
> >> > >           reference_id STRING,
> >> > >           transaction_sn STRING,
> >> > >           transaction_type BIGINT,
> >> > >           merchant_id BIGINT,
> >> > >           transaction_id BIGINT,
> >> > >           status BIGINT
> >> > >          ),
> >> > >         ts BIGINT,
> >> > >         event_time AS TO_TIMESTAMP(FROM_UNIXTIME(create_time)),
> >> > >         WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR
> >> > >      ) WITH (
> >> > >        'connector.type' = 'kafka',
> >> > >        'connector.version' = '0.11',
> >> > >        'connector.topic' = 'mytopic',
> >> > >        'connector.properties.bootstrap.servers' = 'xxxx',
> >> > >        'format.type' = 'json'
> >> > >      )
> >> > >     """)
> >> > >
> >> > >
> >> > > 分别 filter 出来 两张表,进行 interval Join,这个是一直没有输出的,我两张表输出试过,没有任何问题。
> >> > >
> >> > >     val main_db = bsTableEnv.sqlQuery("""
> >> > >       | SELECT *
> >> > >       | FROM input_database
> >> > >       | WHERE `database` = 'main_db'
> >> > >       |  AND `table` LIKE 'transaction_tab%'
> >> > >       | """.stripMargin)
> >> > >
> >> > >     val merchant_db = bsTableEnv.sqlQuery("""
> >> > >       | SELECT *
> >> > >       | FROM input_database
> >> > >       | WHERE `database` = 'merchant_db'
> >> > >       |   AND `table` LIKE 'transaction_tab%'
> >> > >       | """.stripMargin)
> >> > >
> >> > >     bsTableEnv.createTemporaryView("main_db", main_db)
> >> > >     bsTableEnv.createTemporaryView("merchant_db", merchant_db)
> >> > >
> >> > >     val result = bsTableEnv.sqlQuery("""
> >> > >        SELECT *
> >> > >        FROM (
> >> > >           SELECT t1.`table`, t1.`database`, t1.transaction_type,
> >> > > t1.transaction_id,
> >> > >             t1.reference_id, t1.transaction_sn, t1.merchant_id,
> >> > > t1.status, t1.event_time
> >> > >           FROM main_db as t1
> >> > >           LEFT JOIN merchant_db as t2
> >> > >           ON t1.reference_id = t2.reference_id
> >> > >           WHERE t1.event_time >= t2.event_time + INTERVAL '1' HOUR
> >> > >            AND t1.event_time <= t2.event_time - INTERVAL '1' HOUR
> >> > >        )
> >> > >       """.stripMargin)
> >> > >
> >> > >
> >> > >
> >> > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?
> >> > > -----
> >> > > 你提到的这个问题,我估计我的 watermark 前进肯定是不正常的。但是我无法理解为什么 interval join 需要
> >> watermark
> >> > > 来驱动。
> >> > > 我的理解是,他会把两边的数据都保留在 state 里,既然是 Left join,如果左边有数据查右边的state,如果可以
> >> join上,就输出
> >> > > join 的结果,如果没有 join上,那应该正常输出左边的数据,这才是 Left join 应有的逻辑把.
> >> > >
> >> > >
> >> > >
> >> > >
> >> > >
> >> > >
> >> > > Benchao Li <[hidden email]> 于2020年12月8日周二 下午3:23写道:
> >> > >
> >> > > > hi macia,
> >> > > >
> >> > > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?
> >> > > >
> >> > > > macia kk <[hidden email]> 于2020年12月8日周二 上午1:15写道:
> >> > > >
> >> > > > > 抱歉,是 >-30 and <+30
> >> > > > >
> >> > > > > 贴的只是demo,我的疑问是,既然是 Left Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有
> >> > > > >
> >> > > > > 赵一旦 <[hidden email]>于2020年12月7日 周一23:28写道:
> >> > > > >
> >> > > > > > 准确点,2个条件之间没and?2个都是>?
> >> > > > > >
> >> > > > > > macia kk <[hidden email]> 于2020年12月7日周一 下午10:30写道:
> >> > > > > >
> >> > > > > > > 不好意思,我上边贴错了
> >> > > > > > >
> >> > > > > > > SELECT *
> >> > > > > > >  FROM A
> >> > > > > > >  LEFT OUT JOIN B
> >> > > > > > >  ON order_id
> >> > > > > > >  Where A.event_time > B.event_time -  30 s
> >> > > > > > >      A.event_time > B.event_time + 30 s
> >> > > > > > >
> >> > > > > > > event_time 是 Time Attributes 设置的 event_time
> >> > > > > > >
> >> > > > > > > 这样是没有输出的。
> >> > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > interval join 左右表在 state 中是缓存多久的?
> >> > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > hailongwang <[hidden email]> 于2020年12月7日周一 下午8:05写道:
> >> > > > > > >
> >> > > > > > > > Hi,
> >> > > > > > > > 其中 条件是
> >> > > > > > > > `Where A.event_time < B.event_time + 30 s and
> A.event_time >
> >> > > > > > B.event_time
> >> > > > > > > > - 30 s ` 吧
> >> > > > > > > > 可以参考以下例子[1],看下有木有写错。
> >> > > > > > > > [1]
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > Best,
> >> > > > > > > > Hailong
> >> > > > > > > > 在 2020-12-07 13:10:02,"macia kk" <[hidden email]> 写道:
> >> > > > > > > > >Hi, 各位大佬
> >> > > > > > > > >
> >> > > > > > > > >  我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的
> >> > > > > > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件
> >> ,表B 是
> >> > > > order
> >> > > > > > > item
> >> > > > > > > > >信息,所以 我用:
> >> > > > > > > > >
> >> > > > > > > > > SELECT *
> >> > > > > > > > > FROM A
> >> > > > > > > > > LEFT OUT JOIN B
> >> > > > > > > > > ON order_id
> >> > > > > > > > > Where A.event_time > B.event_time + 30 s
> >> > > > > > > > >     A.event_time > B.event_time - 30 s
> >> > > > > > > > >
> >> > > > > > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join
> 之后就没有输出数据了,可以确认的是我用
> >> > Spark
> >> > > > > > > > Structural
> >> > > > > > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join,
> >> > > > > > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join
> >> > 在具体实现的逻辑是什么,我在处理上哪里有问题?
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > > >
> >> > > > --
> >> > > >
> >> > > > Best,
> >> > > > Benchao Li
> >> > > >
> >> > >
> >> >
> >>
> >>
> >> --
> >>
> >> Best,
> >> Benchao Li
> >>
> >
>


--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: 关于 stream-stream Interval Join 的问题

macia kk
你用的是哪个版本的Flink呢?
-----
1.11.2

看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。
所以你的binlog是怎么读进来的呢?自定义的format?
-----
ts 就是时间戳

    bsTableEnv.executeSql("""
      CREATE TABLE input_database (
        `table` STRING,
        `database` STRING,
        `data` ROW(
          reference_id STRING,
          transaction_sn STRING,
          transaction_type BIGINT,
          merchant_id BIGINT,
          transaction_id BIGINT,
          status BIGINT
         ),
        ts BIGINT,
        event_time AS TO_TIMESTAMP(FROM_UNIXTIME(ts)),
        WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR
     ) WITH (
       'connector.type' = 'kafka',
       'connector.version' = '0.11',
       'connector.topic' = 'mytopic',
       'connector.properties.bootstrap.servers' = 'xxxx',
       'format.type' = 'json'
     )
    )



```



Benchao Li <[hidden email]> 于2020年12月10日周四 下午6:14写道:

> 你用的是哪个版本的Flink呢?
>
> 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。
> 所以你的binlog是怎么读进来的呢?自定义的format?
>
> macia kk <[hidden email]> 于2020年12月10日周四 上午1:06写道:
>
> > 我刚才跑了很多任务,设置不同的 maxOutOfOrderness WATERMARK FOR event_time AS event_time
> -
> > INTERVAL 'x' HOUR
> >
> >  发现一个很奇怪的问题 ,按理说 watermark = currentMaxTimestamp - maxOutOfOrderness
> >
> > 但是 我通过 页面上的 watermark 时间,和我设置 maxOutOfOrderness x,
> > 能够反推出来数据的 currentMaxTimestamp
> >
> > currentMaxTimestamp = watermark + maxOutOfOrderness
> >
> > 但是我无论设置多少的 maxOutOfOrderness, 反推出来的 currentMaxTimestamp 比现在此时此刻的时间快
> > 8个小时,也就是说 currentMaxTimestamp 在未来后的 8个小时,这个数字一直是固定的8。
> >
> >
> > 但是,我进行 Join, 直接输出任意一张表,得到的 evet time 都是对的,比如现在 00:55
> >
> >
> {"table":"transaction_tab_00000122","database":"main_db","transaction_type":1,"transaction_id":111111,"reference_id":"111111","transaction_sn":"11111","merchant_id":1,"status":1,"event_time":"
> > *2020-12-10T01:02:24Z*"}
> >
> > UI 上显示的 watermark 是 1607555031000(Your time zone: 2020年12月10日星期四早上7点02分
> > GMT+08:00)
> >
> > 这个 watermark 是未来的时间 😂
> >
> >
> >
> >
> >
> > macia kk <[hidden email]> 于2020年12月9日周三 下午11:36写道:
> >
> > > 感谢 一旦 和 Benchao
> > >
> > >   1. 如果是我的 watermark 设置过长,导致无法输出的话,是有点疑问的,因为我可以确定的是,一定会有 Join 上的数据,但是我
> > Job
> > > 跑了几天也没有一条输出。我还试了如下的SQL,自己 Join 自己,所以理论肯定是直接输出的,实际也是一条也没有。
> > >
> > >     val result = bsTableEnv.sqlQuery("""
> > >        SELECT *
> > >        FROM (
> > >           SELECT t1.`table`, t1.`database`, t1.transaction_type,
> > t1.transaction_id,
> > >             t1.reference_id, t1.transaction_sn, t1.merchant_id,
> > t1.status, t1.event_time
> > >           FROM main_db as t1
> > >           LEFT JOIN main_db as t2
> > >           ON t1.reference_id = t2.reference_id
> > >           WHERE t1.event_time >= t2.event_time + INTERVAL '5' MINUTES
> > >            AND t1.event_time <= t2.event_time - INTERVAL '5' MINUTES
> > >        )
> > >       """.stripMargin)
> > >
> > > 2. 一旦提到的 watermark 传递的问题,我可以确认的是,会传递下去,这可以在 UI 上看到
> > >
> > > 3. 这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source
> > > subtask的watermark。
> > >     -------------------------------------------------------
> > >     这里应该是使用 source subtask 最小的 watermark 传递过去,因为我可以看到的是,我的 watermark
> > > 永远和现在相差8个小时,所以怀疑是有一张表,总是会迟8个小时才会有 BinLog.
> > >
> > > 4. Flink SQL 有没有方法在定义 schema 的时候,如果一个字段不存在,就是 null,我现在想换另外一个时间字段作为
> event
> > > time,但是有的表又没有这个字段,会导致解析的时候直接报错.
> > >
> > > 5. 我能不能不在 input_table 上注册 water mark,在 filter 出两张表后,再把 watermark
> > > 加载两张表上,这样可以避免因为别的表,导致 watermark 停止不前,混乱的行为.
> > >
> > >
> > > Thanks and best regards
> > >
> > >
> > > Benchao Li <[hidden email]> 于2020年12月9日周三 上午10:24写道:
> > >
> > >> Hi macia,
> > >>
> > >> 一旦回答的基本比较完整了。
> > >> watermark影响的主要是left join没有join到的情况下,+(left, null)这样的数据输出的时机。
> > >> 如果是两侧都有数据,watermark不前进,也都可以正常输出。
> > >>
> > >> 关于watermark,如果你的事件时间忽高忽低,这个底层的watermark只会取当前source
> > subtask见到的最大的watermark
> > >> 作为这个source subtask的watermark。但是你的watermark计算逻辑本身就是事件时间delay
> > 10个小时,这个已经会导致
> > >> 你的没有join到的数据下发会延迟很多了。
> > >>
> > >> 你也可以尝试下用处理时间来做一下interval join,看看能不能达到预期。
> > >>
> > >> 赵一旦 <[hidden email]> 于2020年12月9日周三 上午10:15写道:
> > >>
> > >> > 重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。
> > >> >
> > >> >
> > >> >
> > >>
> >
> (1)你这个的话我看到一个问题,就是watermark你定义10小时的maxOutOfOrderness,确定这么长嘛要,这么大的maxOutOfOrderness,会导致join到的则会及时输出,join不到的需要等10小时才能输出“仅左边”数据,即left
> > >> > join。
> > >> >
> > >> > (2)此外,还有一个点,这个我也不确认。如果是datastream
> > >> > api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。
> > >> >
> > >> >
> > >>
> >
> input_database中定义了watermark,从input_database到2个filter后的表不清楚是否还存在watermark(我感觉是存在的),只要存在那就没问题,唯一需要注意的是第1点。
> > >> >
> > >> > macia kk <[hidden email]> 于2020年12月9日周三 上午1:17写道:
> > >> >
> > >> > > @Benchao Li <[hidden email]>  感谢回复,这个问题困扰我半年了,导致我一直不能迁移到
> > >> > > FLink,可能我的Case 太特殊了.
> > >> > >
> > >> > > 我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的
> > >> Binlog,我需要
> > >> > > filter 出来 main_db__tansaction_tab, merchant_db__transaction_tab,
> 两个
> > DB
> > >> > > 中的两个表。所以这里的字段我定义的是 两张表的字段的并集.
> > >> > >
> > >> > > 还要注意的是 even time 是 create_time, 这里问题非常大:
> > >> > >  1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱
> > >> > >  2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create time进来,可能会影响
> > >> > watermark
> > >> > > forward on.
> > >> > >
> > >> > >     bsTableEnv.executeSql("""
> > >> > >       CREATE TABLE input_database (
> > >> > >         `table` STRING,
> > >> > >         `database` STRING,
> > >> > >         `data` ROW(
> > >> > >           reference_id STRING,
> > >> > >           transaction_sn STRING,
> > >> > >           transaction_type BIGINT,
> > >> > >           merchant_id BIGINT,
> > >> > >           transaction_id BIGINT,
> > >> > >           status BIGINT
> > >> > >          ),
> > >> > >         ts BIGINT,
> > >> > >         event_time AS TO_TIMESTAMP(FROM_UNIXTIME(create_time)),
> > >> > >         WATERMARK FOR event_time AS event_time - INTERVAL '10'
> HOUR
> > >> > >      ) WITH (
> > >> > >        'connector.type' = 'kafka',
> > >> > >        'connector.version' = '0.11',
> > >> > >        'connector.topic' = 'mytopic',
> > >> > >        'connector.properties.bootstrap.servers' = 'xxxx',
> > >> > >        'format.type' = 'json'
> > >> > >      )
> > >> > >     """)
> > >> > >
> > >> > >
> > >> > > 分别 filter 出来 两张表,进行 interval Join,这个是一直没有输出的,我两张表输出试过,没有任何问题。
> > >> > >
> > >> > >     val main_db = bsTableEnv.sqlQuery("""
> > >> > >       | SELECT *
> > >> > >       | FROM input_database
> > >> > >       | WHERE `database` = 'main_db'
> > >> > >       |  AND `table` LIKE 'transaction_tab%'
> > >> > >       | """.stripMargin)
> > >> > >
> > >> > >     val merchant_db = bsTableEnv.sqlQuery("""
> > >> > >       | SELECT *
> > >> > >       | FROM input_database
> > >> > >       | WHERE `database` = 'merchant_db'
> > >> > >       |   AND `table` LIKE 'transaction_tab%'
> > >> > >       | """.stripMargin)
> > >> > >
> > >> > >     bsTableEnv.createTemporaryView("main_db", main_db)
> > >> > >     bsTableEnv.createTemporaryView("merchant_db", merchant_db)
> > >> > >
> > >> > >     val result = bsTableEnv.sqlQuery("""
> > >> > >        SELECT *
> > >> > >        FROM (
> > >> > >           SELECT t1.`table`, t1.`database`, t1.transaction_type,
> > >> > > t1.transaction_id,
> > >> > >             t1.reference_id, t1.transaction_sn, t1.merchant_id,
> > >> > > t1.status, t1.event_time
> > >> > >           FROM main_db as t1
> > >> > >           LEFT JOIN merchant_db as t2
> > >> > >           ON t1.reference_id = t2.reference_id
> > >> > >           WHERE t1.event_time >= t2.event_time + INTERVAL '1' HOUR
> > >> > >            AND t1.event_time <= t2.event_time - INTERVAL '1' HOUR
> > >> > >        )
> > >> > >       """.stripMargin)
> > >> > >
> > >> > >
> > >> > >
> > >> > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?
> > >> > > -----
> > >> > > 你提到的这个问题,我估计我的 watermark 前进肯定是不正常的。但是我无法理解为什么 interval join 需要
> > >> watermark
> > >> > > 来驱动。
> > >> > > 我的理解是,他会把两边的数据都保留在 state 里,既然是 Left join,如果左边有数据查右边的state,如果可以
> > >> join上,就输出
> > >> > > join 的结果,如果没有 join上,那应该正常输出左边的数据,这才是 Left join 应有的逻辑把.
> > >> > >
> > >> > >
> > >> > >
> > >> > >
> > >> > >
> > >> > >
> > >> > > Benchao Li <[hidden email]> 于2020年12月8日周二 下午3:23写道:
> > >> > >
> > >> > > > hi macia,
> > >> > > >
> > >> > > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?
> > >> > > >
> > >> > > > macia kk <[hidden email]> 于2020年12月8日周二 上午1:15写道:
> > >> > > >
> > >> > > > > 抱歉,是 >-30 and <+30
> > >> > > > >
> > >> > > > > 贴的只是demo,我的疑问是,既然是 Left Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有
> > >> > > > >
> > >> > > > > 赵一旦 <[hidden email]>于2020年12月7日 周一23:28写道:
> > >> > > > >
> > >> > > > > > 准确点,2个条件之间没and?2个都是>?
> > >> > > > > >
> > >> > > > > > macia kk <[hidden email]> 于2020年12月7日周一 下午10:30写道:
> > >> > > > > >
> > >> > > > > > > 不好意思,我上边贴错了
> > >> > > > > > >
> > >> > > > > > > SELECT *
> > >> > > > > > >  FROM A
> > >> > > > > > >  LEFT OUT JOIN B
> > >> > > > > > >  ON order_id
> > >> > > > > > >  Where A.event_time > B.event_time -  30 s
> > >> > > > > > >      A.event_time > B.event_time + 30 s
> > >> > > > > > >
> > >> > > > > > > event_time 是 Time Attributes 设置的 event_time
> > >> > > > > > >
> > >> > > > > > > 这样是没有输出的。
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > > interval join 左右表在 state 中是缓存多久的?
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > > hailongwang <[hidden email]> 于2020年12月7日周一 下午8:05写道:
> > >> > > > > > >
> > >> > > > > > > > Hi,
> > >> > > > > > > > 其中 条件是
> > >> > > > > > > > `Where A.event_time < B.event_time + 30 s and
> > A.event_time >
> > >> > > > > > B.event_time
> > >> > > > > > > > - 30 s ` 吧
> > >> > > > > > > > 可以参考以下例子[1],看下有木有写错。
> > >> > > > > > > > [1]
> > >> > > > > > > >
> > >> > > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > > Best,
> > >> > > > > > > > Hailong
> > >> > > > > > > > 在 2020-12-07 13:10:02,"macia kk" <[hidden email]> 写道:
> > >> > > > > > > > >Hi, 各位大佬
> > >> > > > > > > > >
> > >> > > > > > > > >  我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog
> 打进去了。我的
> > >> > > > > > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是
> order事件
> > >> ,表B 是
> > >> > > > order
> > >> > > > > > > item
> > >> > > > > > > > >信息,所以 我用:
> > >> > > > > > > > >
> > >> > > > > > > > > SELECT *
> > >> > > > > > > > > FROM A
> > >> > > > > > > > > LEFT OUT JOIN B
> > >> > > > > > > > > ON order_id
> > >> > > > > > > > > Where A.event_time > B.event_time + 30 s
> > >> > > > > > > > >     A.event_time > B.event_time - 30 s
> > >> > > > > > > > >
> > >> > > > > > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join
> > 之后就没有输出数据了,可以确认的是我用
> > >> > Spark
> > >> > > > > > > > Structural
> > >> > > > > > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join,
> > >> > > > > > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join
> > >> > 在具体实现的逻辑是什么,我在处理上哪里有问题?
> > >> > > > > > > >
> > >> > > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > > >
> > >> > > > --
> > >> > > >
> > >> > > > Best,
> > >> > > > Benchao Li
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >>
> > >> --
> > >>
> > >> Best,
> > >> Benchao Li
> > >>
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>
Reply | Threaded
Open this post in threaded view
|

Re: 关于 stream-stream Interval Join 的问题

nobleyd
这个问题是存在的,只不过不清楚算不算bug,可能只算是FlinkSQL在这部分处理的不足感觉。

之所以出现这个问题,是因为你用了FROM_UNIXTIME等函数,这些函数会自动根据当前时区将ts转换为TIMESTAMP(这导致了在time
attribute部分不应该使用这种函数,否则会导致watermark显示超前8小时的问题)。

但是呢,目前不这么做好像也还不行。因为分窗必须基于time
attribute,同时flinkSQL目前不支持offset的指定,因此只能基于这种方式去间接实现分窗的正确性。


————————
比如:ts=0,代表的是 1970-1-1 00:00:00,FROM_UNIXTIME(0) 返回的是 1970-1-1 08:00:00
UTC+8
这个时间,而这个时间作为TIMESTAMP类型的eventtime被处理了,但TIMESTAMP本身是无时区含义的,你却给了它一个带时区含义的日期。这导致分窗可以正常按照中国人习惯分,但从底层考虑却不对,因为这个时间点,它的ts变为了28800s。因为flinkSQL将你那个eventtime按照UTC+0转换出来的ts就是ts=28800(s)。

————————
按我说的话,要么继续这么用,你忽略ui上的watermark,这并不影响你的代码逻辑和业务逻辑。
要么就是调整不使用那些函数,这样可以保证watermark那个ui展示正确,但是却导致小时/天窗口出问题,并暂时无解(因为不支持offset窗口)。



macia kk <[hidden email]> 于2020年12月11日周五 下午3:04写道:

> 你用的是哪个版本的Flink呢?
> -----
> 1.11.2
>
> 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。
> 所以你的binlog是怎么读进来的呢?自定义的format?
> -----
> ts 就是时间戳
>
>     bsTableEnv.executeSql("""
>       CREATE TABLE input_database (
>         `table` STRING,
>         `database` STRING,
>         `data` ROW(
>           reference_id STRING,
>           transaction_sn STRING,
>           transaction_type BIGINT,
>           merchant_id BIGINT,
>           transaction_id BIGINT,
>           status BIGINT
>          ),
>         ts BIGINT,
>         event_time AS TO_TIMESTAMP(FROM_UNIXTIME(ts)),
>         WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR
>      ) WITH (
>        'connector.type' = 'kafka',
>        'connector.version' = '0.11',
>        'connector.topic' = 'mytopic',
>        'connector.properties.bootstrap.servers' = 'xxxx',
>        'format.type' = 'json'
>      )
>     )
>
>
>
> ```
>
>
>
> Benchao Li <[hidden email]> 于2020年12月10日周四 下午6:14写道:
>
> > 你用的是哪个版本的Flink呢?
> >
> > 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。
> > 所以你的binlog是怎么读进来的呢?自定义的format?
> >
> > macia kk <[hidden email]> 于2020年12月10日周四 上午1:06写道:
> >
> > > 我刚才跑了很多任务,设置不同的 maxOutOfOrderness WATERMARK FOR event_time AS
> event_time
> > -
> > > INTERVAL 'x' HOUR
> > >
> > >  发现一个很奇怪的问题 ,按理说 watermark = currentMaxTimestamp - maxOutOfOrderness
> > >
> > > 但是 我通过 页面上的 watermark 时间,和我设置 maxOutOfOrderness x,
> > > 能够反推出来数据的 currentMaxTimestamp
> > >
> > > currentMaxTimestamp = watermark + maxOutOfOrderness
> > >
> > > 但是我无论设置多少的 maxOutOfOrderness, 反推出来的 currentMaxTimestamp 比现在此时此刻的时间快
> > > 8个小时,也就是说 currentMaxTimestamp 在未来后的 8个小时,这个数字一直是固定的8。
> > >
> > >
> > > 但是,我进行 Join, 直接输出任意一张表,得到的 evet time 都是对的,比如现在 00:55
> > >
> > >
> >
> {"table":"transaction_tab_00000122","database":"main_db","transaction_type":1,"transaction_id":111111,"reference_id":"111111","transaction_sn":"11111","merchant_id":1,"status":1,"event_time":"
> > > *2020-12-10T01:02:24Z*"}
> > >
> > > UI 上显示的 watermark 是 1607555031000(Your time zone: 2020年12月10日星期四早上7点02分
> > > GMT+08:00)
> > >
> > > 这个 watermark 是未来的时间 😂
> > >
> > >
> > >
> > >
> > >
> > > macia kk <[hidden email]> 于2020年12月9日周三 下午11:36写道:
> > >
> > > > 感谢 一旦 和 Benchao
> > > >
> > > >   1. 如果是我的 watermark 设置过长,导致无法输出的话,是有点疑问的,因为我可以确定的是,一定会有 Join
> 上的数据,但是我
> > > Job
> > > > 跑了几天也没有一条输出。我还试了如下的SQL,自己 Join 自己,所以理论肯定是直接输出的,实际也是一条也没有。
> > > >
> > > >     val result = bsTableEnv.sqlQuery("""
> > > >        SELECT *
> > > >        FROM (
> > > >           SELECT t1.`table`, t1.`database`, t1.transaction_type,
> > > t1.transaction_id,
> > > >             t1.reference_id, t1.transaction_sn, t1.merchant_id,
> > > t1.status, t1.event_time
> > > >           FROM main_db as t1
> > > >           LEFT JOIN main_db as t2
> > > >           ON t1.reference_id = t2.reference_id
> > > >           WHERE t1.event_time >= t2.event_time + INTERVAL '5' MINUTES
> > > >            AND t1.event_time <= t2.event_time - INTERVAL '5' MINUTES
> > > >        )
> > > >       """.stripMargin)
> > > >
> > > > 2. 一旦提到的 watermark 传递的问题,我可以确认的是,会传递下去,这可以在 UI 上看到
> > > >
> > > > 3. 这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source
> > > > subtask的watermark。
> > > >     -------------------------------------------------------
> > > >     这里应该是使用 source subtask 最小的 watermark 传递过去,因为我可以看到的是,我的 watermark
> > > > 永远和现在相差8个小时,所以怀疑是有一张表,总是会迟8个小时才会有 BinLog.
> > > >
> > > > 4. Flink SQL 有没有方法在定义 schema 的时候,如果一个字段不存在,就是 null,我现在想换另外一个时间字段作为
> > event
> > > > time,但是有的表又没有这个字段,会导致解析的时候直接报错.
> > > >
> > > > 5. 我能不能不在 input_table 上注册 water mark,在 filter 出两张表后,再把 watermark
> > > > 加载两张表上,这样可以避免因为别的表,导致 watermark 停止不前,混乱的行为.
> > > >
> > > >
> > > > Thanks and best regards
> > > >
> > > >
> > > > Benchao Li <[hidden email]> 于2020年12月9日周三 上午10:24写道:
> > > >
> > > >> Hi macia,
> > > >>
> > > >> 一旦回答的基本比较完整了。
> > > >> watermark影响的主要是left join没有join到的情况下,+(left, null)这样的数据输出的时机。
> > > >> 如果是两侧都有数据,watermark不前进,也都可以正常输出。
> > > >>
> > > >> 关于watermark,如果你的事件时间忽高忽低,这个底层的watermark只会取当前source
> > > subtask见到的最大的watermark
> > > >> 作为这个source subtask的watermark。但是你的watermark计算逻辑本身就是事件时间delay
> > > 10个小时,这个已经会导致
> > > >> 你的没有join到的数据下发会延迟很多了。
> > > >>
> > > >> 你也可以尝试下用处理时间来做一下interval join,看看能不能达到预期。
> > > >>
> > > >> 赵一旦 <[hidden email]> 于2020年12月9日周三 上午10:15写道:
> > > >>
> > > >> > 重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。
> > > >> >
> > > >> >
> > > >> >
> > > >>
> > >
> >
> (1)你这个的话我看到一个问题,就是watermark你定义10小时的maxOutOfOrderness,确定这么长嘛要,这么大的maxOutOfOrderness,会导致join到的则会及时输出,join不到的需要等10小时才能输出“仅左边”数据,即left
> > > >> > join。
> > > >> >
> > > >> > (2)此外,还有一个点,这个我也不确认。如果是datastream
> > > >> > api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。
> > > >> >
> > > >> >
> > > >>
> > >
> >
> input_database中定义了watermark,从input_database到2个filter后的表不清楚是否还存在watermark(我感觉是存在的),只要存在那就没问题,唯一需要注意的是第1点。
> > > >> >
> > > >> > macia kk <[hidden email]> 于2020年12月9日周三 上午1:17写道:
> > > >> >
> > > >> > > @Benchao Li <[hidden email]>  感谢回复,这个问题困扰我半年了,导致我一直不能迁移到
> > > >> > > FLink,可能我的Case 太特殊了.
> > > >> > >
> > > >> > > 我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的
> > > >> Binlog,我需要
> > > >> > > filter 出来 main_db__tansaction_tab, merchant_db__transaction_tab,
> > 两个
> > > DB
> > > >> > > 中的两个表。所以这里的字段我定义的是 两张表的字段的并集.
> > > >> > >
> > > >> > > 还要注意的是 even time 是 create_time, 这里问题非常大:
> > > >> > >  1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱
> > > >> > >  2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create
> time进来,可能会影响
> > > >> > watermark
> > > >> > > forward on.
> > > >> > >
> > > >> > >     bsTableEnv.executeSql("""
> > > >> > >       CREATE TABLE input_database (
> > > >> > >         `table` STRING,
> > > >> > >         `database` STRING,
> > > >> > >         `data` ROW(
> > > >> > >           reference_id STRING,
> > > >> > >           transaction_sn STRING,
> > > >> > >           transaction_type BIGINT,
> > > >> > >           merchant_id BIGINT,
> > > >> > >           transaction_id BIGINT,
> > > >> > >           status BIGINT
> > > >> > >          ),
> > > >> > >         ts BIGINT,
> > > >> > >         event_time AS TO_TIMESTAMP(FROM_UNIXTIME(create_time)),
> > > >> > >         WATERMARK FOR event_time AS event_time - INTERVAL '10'
> > HOUR
> > > >> > >      ) WITH (
> > > >> > >        'connector.type' = 'kafka',
> > > >> > >        'connector.version' = '0.11',
> > > >> > >        'connector.topic' = 'mytopic',
> > > >> > >        'connector.properties.bootstrap.servers' = 'xxxx',
> > > >> > >        'format.type' = 'json'
> > > >> > >      )
> > > >> > >     """)
> > > >> > >
> > > >> > >
> > > >> > > 分别 filter 出来 两张表,进行 interval Join,这个是一直没有输出的,我两张表输出试过,没有任何问题。
> > > >> > >
> > > >> > >     val main_db = bsTableEnv.sqlQuery("""
> > > >> > >       | SELECT *
> > > >> > >       | FROM input_database
> > > >> > >       | WHERE `database` = 'main_db'
> > > >> > >       |  AND `table` LIKE 'transaction_tab%'
> > > >> > >       | """.stripMargin)
> > > >> > >
> > > >> > >     val merchant_db = bsTableEnv.sqlQuery("""
> > > >> > >       | SELECT *
> > > >> > >       | FROM input_database
> > > >> > >       | WHERE `database` = 'merchant_db'
> > > >> > >       |   AND `table` LIKE 'transaction_tab%'
> > > >> > >       | """.stripMargin)
> > > >> > >
> > > >> > >     bsTableEnv.createTemporaryView("main_db", main_db)
> > > >> > >     bsTableEnv.createTemporaryView("merchant_db", merchant_db)
> > > >> > >
> > > >> > >     val result = bsTableEnv.sqlQuery("""
> > > >> > >        SELECT *
> > > >> > >        FROM (
> > > >> > >           SELECT t1.`table`, t1.`database`, t1.transaction_type,
> > > >> > > t1.transaction_id,
> > > >> > >             t1.reference_id, t1.transaction_sn, t1.merchant_id,
> > > >> > > t1.status, t1.event_time
> > > >> > >           FROM main_db as t1
> > > >> > >           LEFT JOIN merchant_db as t2
> > > >> > >           ON t1.reference_id = t2.reference_id
> > > >> > >           WHERE t1.event_time >= t2.event_time + INTERVAL '1'
> HOUR
> > > >> > >            AND t1.event_time <= t2.event_time - INTERVAL '1'
> HOUR
> > > >> > >        )
> > > >> > >       """.stripMargin)
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?
> > > >> > > -----
> > > >> > > 你提到的这个问题,我估计我的 watermark 前进肯定是不正常的。但是我无法理解为什么 interval join 需要
> > > >> watermark
> > > >> > > 来驱动。
> > > >> > > 我的理解是,他会把两边的数据都保留在 state 里,既然是 Left join,如果左边有数据查右边的state,如果可以
> > > >> join上,就输出
> > > >> > > join 的结果,如果没有 join上,那应该正常输出左边的数据,这才是 Left join 应有的逻辑把.
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > > Benchao Li <[hidden email]> 于2020年12月8日周二 下午3:23写道:
> > > >> > >
> > > >> > > > hi macia,
> > > >> > > >
> > > >> > > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?
> > > >> > > >
> > > >> > > > macia kk <[hidden email]> 于2020年12月8日周二 上午1:15写道:
> > > >> > > >
> > > >> > > > > 抱歉,是 >-30 and <+30
> > > >> > > > >
> > > >> > > > > 贴的只是demo,我的疑问是,既然是 Left
> Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有
> > > >> > > > >
> > > >> > > > > 赵一旦 <[hidden email]>于2020年12月7日 周一23:28写道:
> > > >> > > > >
> > > >> > > > > > 准确点,2个条件之间没and?2个都是>?
> > > >> > > > > >
> > > >> > > > > > macia kk <[hidden email]> 于2020年12月7日周一 下午10:30写道:
> > > >> > > > > >
> > > >> > > > > > > 不好意思,我上边贴错了
> > > >> > > > > > >
> > > >> > > > > > > SELECT *
> > > >> > > > > > >  FROM A
> > > >> > > > > > >  LEFT OUT JOIN B
> > > >> > > > > > >  ON order_id
> > > >> > > > > > >  Where A.event_time > B.event_time -  30 s
> > > >> > > > > > >      A.event_time > B.event_time + 30 s
> > > >> > > > > > >
> > > >> > > > > > > event_time 是 Time Attributes 设置的 event_time
> > > >> > > > > > >
> > > >> > > > > > > 这样是没有输出的。
> > > >> > > > > > >
> > > >> > > > > > >
> > > >> > > > > > >
> > > >> > > > > > > interval join 左右表在 state 中是缓存多久的?
> > > >> > > > > > >
> > > >> > > > > > >
> > > >> > > > > > >
> > > >> > > > > > >
> > > >> > > > > > >
> > > >> > > > > > >
> > > >> > > > > > > hailongwang <[hidden email]> 于2020年12月7日周一
> 下午8:05写道:
> > > >> > > > > > >
> > > >> > > > > > > > Hi,
> > > >> > > > > > > > 其中 条件是
> > > >> > > > > > > > `Where A.event_time < B.event_time + 30 s and
> > > A.event_time >
> > > >> > > > > > B.event_time
> > > >> > > > > > > > - 30 s ` 吧
> > > >> > > > > > > > 可以参考以下例子[1],看下有木有写错。
> > > >> > > > > > > > [1]
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183
> > > >> > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > > > Best,
> > > >> > > > > > > > Hailong
> > > >> > > > > > > > 在 2020-12-07 13:10:02,"macia kk" <[hidden email]>
> 写道:
> > > >> > > > > > > > >Hi, 各位大佬
> > > >> > > > > > > > >
> > > >> > > > > > > > >  我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog
> > 打进去了。我的
> > > >> > > > > > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是
> > order事件
> > > >> ,表B 是
> > > >> > > > order
> > > >> > > > > > > item
> > > >> > > > > > > > >信息,所以 我用:
> > > >> > > > > > > > >
> > > >> > > > > > > > > SELECT *
> > > >> > > > > > > > > FROM A
> > > >> > > > > > > > > LEFT OUT JOIN B
> > > >> > > > > > > > > ON order_id
> > > >> > > > > > > > > Where A.event_time > B.event_time + 30 s
> > > >> > > > > > > > >     A.event_time > B.event_time - 30 s
> > > >> > > > > > > > >
> > > >> > > > > > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join
> > > 之后就没有输出数据了,可以确认的是我用
> > > >> > Spark
> > > >> > > > > > > > Structural
> > > >> > > > > > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join,
> > > >> > > > > > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join
> > > >> > 在具体实现的逻辑是什么,我在处理上哪里有问题?
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > > >
> > > >> > > > --
> > > >> > > >
> > > >> > > > Best,
> > > >> > > > Benchao Li
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > > >>
> > > >> --
> > > >>
> > > >> Best,
> > > >> Benchao Li
> > > >>
> > > >
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: 关于 stream-stream Interval Join 的问题

nobleyd
这个问题很早前我提过,没人在意,或者说大家没觉得这是个问题。但实际上如果和DataStream
API去对比的话,FlinkSQL的这种表现肯定是有问题的。
换种说法,FlinkSQL通过更改ts方式实现了UTC+8时区下的分窗的合理性,但其“实现方式”本身就是“代价”,即使用了不合理的ts,ui上当然就展示不合理的ts。

这本来是应该在window分窗处基于offset实现的功能。

赵一旦 <[hidden email]> 于2020年12月15日周二 上午11:22写道:

> 这个问题是存在的,只不过不清楚算不算bug,可能只算是FlinkSQL在这部分处理的不足感觉。
>
> 之所以出现这个问题,是因为你用了FROM_UNIXTIME等函数,这些函数会自动根据当前时区将ts转换为TIMESTAMP(这导致了在time
> attribute部分不应该使用这种函数,否则会导致watermark显示超前8小时的问题)。
>
> 但是呢,目前不这么做好像也还不行。因为分窗必须基于time
> attribute,同时flinkSQL目前不支持offset的指定,因此只能基于这种方式去间接实现分窗的正确性。
>
>
> ————————
> 比如:ts=0,代表的是 1970-1-1 00:00:00,FROM_UNIXTIME(0) 返回的是 1970-1-1 08:00:00
> UTC+8
> 这个时间,而这个时间作为TIMESTAMP类型的eventtime被处理了,但TIMESTAMP本身是无时区含义的,你却给了它一个带时区含义的日期。这导致分窗可以正常按照中国人习惯分,但从底层考虑却不对,因为这个时间点,它的ts变为了28800s。因为flinkSQL将你那个eventtime按照UTC+0转换出来的ts就是ts=28800(s)。
>
> ————————
> 按我说的话,要么继续这么用,你忽略ui上的watermark,这并不影响你的代码逻辑和业务逻辑。
> 要么就是调整不使用那些函数,这样可以保证watermark那个ui展示正确,但是却导致小时/天窗口出问题,并暂时无解(因为不支持offset窗口)。
>
>
>
> macia kk <[hidden email]> 于2020年12月11日周五 下午3:04写道:
>
>> 你用的是哪个版本的Flink呢?
>> -----
>> 1.11.2
>>
>> 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。
>> 所以你的binlog是怎么读进来的呢?自定义的format?
>> -----
>> ts 就是时间戳
>>
>>     bsTableEnv.executeSql("""
>>       CREATE TABLE input_database (
>>         `table` STRING,
>>         `database` STRING,
>>         `data` ROW(
>>           reference_id STRING,
>>           transaction_sn STRING,
>>           transaction_type BIGINT,
>>           merchant_id BIGINT,
>>           transaction_id BIGINT,
>>           status BIGINT
>>          ),
>>         ts BIGINT,
>>         event_time AS TO_TIMESTAMP(FROM_UNIXTIME(ts)),
>>         WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR
>>      ) WITH (
>>        'connector.type' = 'kafka',
>>        'connector.version' = '0.11',
>>        'connector.topic' = 'mytopic',
>>        'connector.properties.bootstrap.servers' = 'xxxx',
>>        'format.type' = 'json'
>>      )
>>     )
>>
>>
>>
>> ```
>>
>>
>>
>> Benchao Li <[hidden email]> 于2020年12月10日周四 下午6:14写道:
>>
>> > 你用的是哪个版本的Flink呢?
>> >
>> > 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。
>> > 所以你的binlog是怎么读进来的呢?自定义的format?
>> >
>> > macia kk <[hidden email]> 于2020年12月10日周四 上午1:06写道:
>> >
>> > > 我刚才跑了很多任务,设置不同的 maxOutOfOrderness WATERMARK FOR event_time AS
>> event_time
>> > -
>> > > INTERVAL 'x' HOUR
>> > >
>> > >  发现一个很奇怪的问题 ,按理说 watermark = currentMaxTimestamp - maxOutOfOrderness
>> > >
>> > > 但是 我通过 页面上的 watermark 时间,和我设置 maxOutOfOrderness x,
>> > > 能够反推出来数据的 currentMaxTimestamp
>> > >
>> > > currentMaxTimestamp = watermark + maxOutOfOrderness
>> > >
>> > > 但是我无论设置多少的 maxOutOfOrderness, 反推出来的 currentMaxTimestamp 比现在此时此刻的时间快
>> > > 8个小时,也就是说 currentMaxTimestamp 在未来后的 8个小时,这个数字一直是固定的8。
>> > >
>> > >
>> > > 但是,我进行 Join, 直接输出任意一张表,得到的 evet time 都是对的,比如现在 00:55
>> > >
>> > >
>> >
>> {"table":"transaction_tab_00000122","database":"main_db","transaction_type":1,"transaction_id":111111,"reference_id":"111111","transaction_sn":"11111","merchant_id":1,"status":1,"event_time":"
>> > > *2020-12-10T01:02:24Z*"}
>> > >
>> > > UI 上显示的 watermark 是 1607555031000(Your time zone:
>> 2020年12月10日星期四早上7点02分
>> > > GMT+08:00)
>> > >
>> > > 这个 watermark 是未来的时间 😂
>> > >
>> > >
>> > >
>> > >
>> > >
>> > > macia kk <[hidden email]> 于2020年12月9日周三 下午11:36写道:
>> > >
>> > > > 感谢 一旦 和 Benchao
>> > > >
>> > > >   1. 如果是我的 watermark 设置过长,导致无法输出的话,是有点疑问的,因为我可以确定的是,一定会有 Join
>> 上的数据,但是我
>> > > Job
>> > > > 跑了几天也没有一条输出。我还试了如下的SQL,自己 Join 自己,所以理论肯定是直接输出的,实际也是一条也没有。
>> > > >
>> > > >     val result = bsTableEnv.sqlQuery("""
>> > > >        SELECT *
>> > > >        FROM (
>> > > >           SELECT t1.`table`, t1.`database`, t1.transaction_type,
>> > > t1.transaction_id,
>> > > >             t1.reference_id, t1.transaction_sn, t1.merchant_id,
>> > > t1.status, t1.event_time
>> > > >           FROM main_db as t1
>> > > >           LEFT JOIN main_db as t2
>> > > >           ON t1.reference_id = t2.reference_id
>> > > >           WHERE t1.event_time >= t2.event_time + INTERVAL '5'
>> MINUTES
>> > > >            AND t1.event_time <= t2.event_time - INTERVAL '5' MINUTES
>> > > >        )
>> > > >       """.stripMargin)
>> > > >
>> > > > 2. 一旦提到的 watermark 传递的问题,我可以确认的是,会传递下去,这可以在 UI 上看到
>> > > >
>> > > > 3. 这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source
>> > > > subtask的watermark。
>> > > >     -------------------------------------------------------
>> > > >     这里应该是使用 source subtask 最小的 watermark 传递过去,因为我可以看到的是,我的 watermark
>> > > > 永远和现在相差8个小时,所以怀疑是有一张表,总是会迟8个小时才会有 BinLog.
>> > > >
>> > > > 4. Flink SQL 有没有方法在定义 schema 的时候,如果一个字段不存在,就是 null,我现在想换另外一个时间字段作为
>> > event
>> > > > time,但是有的表又没有这个字段,会导致解析的时候直接报错.
>> > > >
>> > > > 5. 我能不能不在 input_table 上注册 water mark,在 filter 出两张表后,再把 watermark
>> > > > 加载两张表上,这样可以避免因为别的表,导致 watermark 停止不前,混乱的行为.
>> > > >
>> > > >
>> > > > Thanks and best regards
>> > > >
>> > > >
>> > > > Benchao Li <[hidden email]> 于2020年12月9日周三 上午10:24写道:
>> > > >
>> > > >> Hi macia,
>> > > >>
>> > > >> 一旦回答的基本比较完整了。
>> > > >> watermark影响的主要是left join没有join到的情况下,+(left, null)这样的数据输出的时机。
>> > > >> 如果是两侧都有数据,watermark不前进,也都可以正常输出。
>> > > >>
>> > > >> 关于watermark,如果你的事件时间忽高忽低,这个底层的watermark只会取当前source
>> > > subtask见到的最大的watermark
>> > > >> 作为这个source subtask的watermark。但是你的watermark计算逻辑本身就是事件时间delay
>> > > 10个小时,这个已经会导致
>> > > >> 你的没有join到的数据下发会延迟很多了。
>> > > >>
>> > > >> 你也可以尝试下用处理时间来做一下interval join,看看能不能达到预期。
>> > > >>
>> > > >> 赵一旦 <[hidden email]> 于2020年12月9日周三 上午10:15写道:
>> > > >>
>> > > >> > 重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。
>> > > >> >
>> > > >> >
>> > > >> >
>> > > >>
>> > >
>> >
>> (1)你这个的话我看到一个问题,就是watermark你定义10小时的maxOutOfOrderness,确定这么长嘛要,这么大的maxOutOfOrderness,会导致join到的则会及时输出,join不到的需要等10小时才能输出“仅左边”数据,即left
>> > > >> > join。
>> > > >> >
>> > > >> > (2)此外,还有一个点,这个我也不确认。如果是datastream
>> > > >> > api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。
>> > > >> >
>> > > >> >
>> > > >>
>> > >
>> >
>> input_database中定义了watermark,从input_database到2个filter后的表不清楚是否还存在watermark(我感觉是存在的),只要存在那就没问题,唯一需要注意的是第1点。
>> > > >> >
>> > > >> > macia kk <[hidden email]> 于2020年12月9日周三 上午1:17写道:
>> > > >> >
>> > > >> > > @Benchao Li <[hidden email]>  感谢回复,这个问题困扰我半年了,导致我一直不能迁移到
>> > > >> > > FLink,可能我的Case 太特殊了.
>> > > >> > >
>> > > >> > > 我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的
>> > > >> Binlog,我需要
>> > > >> > > filter 出来 main_db__tansaction_tab,
>> merchant_db__transaction_tab,
>> > 两个
>> > > DB
>> > > >> > > 中的两个表。所以这里的字段我定义的是 两张表的字段的并集.
>> > > >> > >
>> > > >> > > 还要注意的是 even time 是 create_time, 这里问题非常大:
>> > > >> > >  1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱
>> > > >> > >  2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create
>> time进来,可能会影响
>> > > >> > watermark
>> > > >> > > forward on.
>> > > >> > >
>> > > >> > >     bsTableEnv.executeSql("""
>> > > >> > >       CREATE TABLE input_database (
>> > > >> > >         `table` STRING,
>> > > >> > >         `database` STRING,
>> > > >> > >         `data` ROW(
>> > > >> > >           reference_id STRING,
>> > > >> > >           transaction_sn STRING,
>> > > >> > >           transaction_type BIGINT,
>> > > >> > >           merchant_id BIGINT,
>> > > >> > >           transaction_id BIGINT,
>> > > >> > >           status BIGINT
>> > > >> > >          ),
>> > > >> > >         ts BIGINT,
>> > > >> > >         event_time AS TO_TIMESTAMP(FROM_UNIXTIME(create_time)),
>> > > >> > >         WATERMARK FOR event_time AS event_time - INTERVAL '10'
>> > HOUR
>> > > >> > >      ) WITH (
>> > > >> > >        'connector.type' = 'kafka',
>> > > >> > >        'connector.version' = '0.11',
>> > > >> > >        'connector.topic' = 'mytopic',
>> > > >> > >        'connector.properties.bootstrap.servers' = 'xxxx',
>> > > >> > >        'format.type' = 'json'
>> > > >> > >      )
>> > > >> > >     """)
>> > > >> > >
>> > > >> > >
>> > > >> > > 分别 filter 出来 两张表,进行 interval Join,这个是一直没有输出的,我两张表输出试过,没有任何问题。
>> > > >> > >
>> > > >> > >     val main_db = bsTableEnv.sqlQuery("""
>> > > >> > >       | SELECT *
>> > > >> > >       | FROM input_database
>> > > >> > >       | WHERE `database` = 'main_db'
>> > > >> > >       |  AND `table` LIKE 'transaction_tab%'
>> > > >> > >       | """.stripMargin)
>> > > >> > >
>> > > >> > >     val merchant_db = bsTableEnv.sqlQuery("""
>> > > >> > >       | SELECT *
>> > > >> > >       | FROM input_database
>> > > >> > >       | WHERE `database` = 'merchant_db'
>> > > >> > >       |   AND `table` LIKE 'transaction_tab%'
>> > > >> > >       | """.stripMargin)
>> > > >> > >
>> > > >> > >     bsTableEnv.createTemporaryView("main_db", main_db)
>> > > >> > >     bsTableEnv.createTemporaryView("merchant_db", merchant_db)
>> > > >> > >
>> > > >> > >     val result = bsTableEnv.sqlQuery("""
>> > > >> > >        SELECT *
>> > > >> > >        FROM (
>> > > >> > >           SELECT t1.`table`, t1.`database`,
>> t1.transaction_type,
>> > > >> > > t1.transaction_id,
>> > > >> > >             t1.reference_id, t1.transaction_sn, t1.merchant_id,
>> > > >> > > t1.status, t1.event_time
>> > > >> > >           FROM main_db as t1
>> > > >> > >           LEFT JOIN merchant_db as t2
>> > > >> > >           ON t1.reference_id = t2.reference_id
>> > > >> > >           WHERE t1.event_time >= t2.event_time + INTERVAL '1'
>> HOUR
>> > > >> > >            AND t1.event_time <= t2.event_time - INTERVAL '1'
>> HOUR
>> > > >> > >        )
>> > > >> > >       """.stripMargin)
>> > > >> > >
>> > > >> > >
>> > > >> > >
>> > > >> > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?
>> > > >> > > -----
>> > > >> > > 你提到的这个问题,我估计我的 watermark 前进肯定是不正常的。但是我无法理解为什么 interval join 需要
>> > > >> watermark
>> > > >> > > 来驱动。
>> > > >> > > 我的理解是,他会把两边的数据都保留在 state 里,既然是 Left join,如果左边有数据查右边的state,如果可以
>> > > >> join上,就输出
>> > > >> > > join 的结果,如果没有 join上,那应该正常输出左边的数据,这才是 Left join 应有的逻辑把.
>> > > >> > >
>> > > >> > >
>> > > >> > >
>> > > >> > >
>> > > >> > >
>> > > >> > >
>> > > >> > > Benchao Li <[hidden email]> 于2020年12月8日周二 下午3:23写道:
>> > > >> > >
>> > > >> > > > hi macia,
>> > > >> > > >
>> > > >> > > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?
>> > > >> > > >
>> > > >> > > > macia kk <[hidden email]> 于2020年12月8日周二 上午1:15写道:
>> > > >> > > >
>> > > >> > > > > 抱歉,是 >-30 and <+30
>> > > >> > > > >
>> > > >> > > > > 贴的只是demo,我的疑问是,既然是 Left
>> Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有
>> > > >> > > > >
>> > > >> > > > > 赵一旦 <[hidden email]>于2020年12月7日 周一23:28写道:
>> > > >> > > > >
>> > > >> > > > > > 准确点,2个条件之间没and?2个都是>?
>> > > >> > > > > >
>> > > >> > > > > > macia kk <[hidden email]> 于2020年12月7日周一 下午10:30写道:
>> > > >> > > > > >
>> > > >> > > > > > > 不好意思,我上边贴错了
>> > > >> > > > > > >
>> > > >> > > > > > > SELECT *
>> > > >> > > > > > >  FROM A
>> > > >> > > > > > >  LEFT OUT JOIN B
>> > > >> > > > > > >  ON order_id
>> > > >> > > > > > >  Where A.event_time > B.event_time -  30 s
>> > > >> > > > > > >      A.event_time > B.event_time + 30 s
>> > > >> > > > > > >
>> > > >> > > > > > > event_time 是 Time Attributes 设置的 event_time
>> > > >> > > > > > >
>> > > >> > > > > > > 这样是没有输出的。
>> > > >> > > > > > >
>> > > >> > > > > > >
>> > > >> > > > > > >
>> > > >> > > > > > > interval join 左右表在 state 中是缓存多久的?
>> > > >> > > > > > >
>> > > >> > > > > > >
>> > > >> > > > > > >
>> > > >> > > > > > >
>> > > >> > > > > > >
>> > > >> > > > > > >
>> > > >> > > > > > > hailongwang <[hidden email]> 于2020年12月7日周一
>> 下午8:05写道:
>> > > >> > > > > > >
>> > > >> > > > > > > > Hi,
>> > > >> > > > > > > > 其中 条件是
>> > > >> > > > > > > > `Where A.event_time < B.event_time + 30 s and
>> > > A.event_time >
>> > > >> > > > > > B.event_time
>> > > >> > > > > > > > - 30 s ` 吧
>> > > >> > > > > > > > 可以参考以下例子[1],看下有木有写错。
>> > > >> > > > > > > > [1]
>> > > >> > > > > > > >
>> > > >> > > > > > >
>> > > >> > > > > >
>> > > >> > > > >
>> > > >> > > >
>> > > >> > >
>> > > >> >
>> > > >>
>> > >
>> >
>> https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183
>> > > >> > > > > > > >
>> > > >> > > > > > > >
>> > > >> > > > > > > > Best,
>> > > >> > > > > > > > Hailong
>> > > >> > > > > > > > 在 2020-12-07 13:10:02,"macia kk" <[hidden email]>
>> 写道:
>> > > >> > > > > > > > >Hi, 各位大佬
>> > > >> > > > > > > > >
>> > > >> > > > > > > > >  我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog
>> > 打进去了。我的
>> > > >> > > > > > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是
>> > order事件
>> > > >> ,表B 是
>> > > >> > > > order
>> > > >> > > > > > > item
>> > > >> > > > > > > > >信息,所以 我用:
>> > > >> > > > > > > > >
>> > > >> > > > > > > > > SELECT *
>> > > >> > > > > > > > > FROM A
>> > > >> > > > > > > > > LEFT OUT JOIN B
>> > > >> > > > > > > > > ON order_id
>> > > >> > > > > > > > > Where A.event_time > B.event_time + 30 s
>> > > >> > > > > > > > >     A.event_time > B.event_time - 30 s
>> > > >> > > > > > > > >
>> > > >> > > > > > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join
>> > > 之后就没有输出数据了,可以确认的是我用
>> > > >> > Spark
>> > > >> > > > > > > > Structural
>> > > >> > > > > > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join,
>> > > >> > > > > > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join
>> > > >> > 在具体实现的逻辑是什么,我在处理上哪里有问题?
>> > > >> > > > > > > >
>> > > >> > > > > > >
>> > > >> > > > > >
>> > > >> > > > >
>> > > >> > > >
>> > > >> > > >
>> > > >> > > > --
>> > > >> > > >
>> > > >> > > > Best,
>> > > >> > > > Benchao Li
>> > > >> > > >
>> > > >> > >
>> > > >> >
>> > > >>
>> > > >>
>> > > >> --
>> > > >>
>> > > >> Best,
>> > > >> Benchao Li
>> > > >>
>> > > >
>> > >
>> >
>> >
>> > --
>> >
>> > Best,
>> > Benchao Li
>> >
>>
>
Reply | Threaded
Open this post in threaded view
|

Re: 关于 stream-stream Interval Join 的问题

nobleyd
补充,实际FROM_UNIXTIME应该返回 TIMESTAMP WITH LOCAL TIME ZONE
这个类型。(然后FlinkSQL可以自己转为TIMESTAMP)。

此外,关于分窗,除了offset这种显示的由用户来解决时区分窗以外。还可以通过支持 TIMESTAMP WITH LOCAL TIME ZONE
类型作为 event time 实现,当然内部当然还是通过offset实现,只是FlinkSQL语法层可以基于支持 TIMESTAMP WITH
LOCAL TIME ZONE 作为eventtime来实现这种效果。

如上是个人观点哈。。。

赵一旦 <[hidden email]> 于2020年12月15日周二 上午11:29写道:

> 这个问题很早前我提过,没人在意,或者说大家没觉得这是个问题。但实际上如果和DataStream
> API去对比的话,FlinkSQL的这种表现肯定是有问题的。
>
> 换种说法,FlinkSQL通过更改ts方式实现了UTC+8时区下的分窗的合理性,但其“实现方式”本身就是“代价”,即使用了不合理的ts,ui上当然就展示不合理的ts。
>
> 这本来是应该在window分窗处基于offset实现的功能。
>
> 赵一旦 <[hidden email]> 于2020年12月15日周二 上午11:22写道:
>
>> 这个问题是存在的,只不过不清楚算不算bug,可能只算是FlinkSQL在这部分处理的不足感觉。
>>
>> 之所以出现这个问题,是因为你用了FROM_UNIXTIME等函数,这些函数会自动根据当前时区将ts转换为TIMESTAMP(这导致了在time
>> attribute部分不应该使用这种函数,否则会导致watermark显示超前8小时的问题)。
>>
>> 但是呢,目前不这么做好像也还不行。因为分窗必须基于time
>> attribute,同时flinkSQL目前不支持offset的指定,因此只能基于这种方式去间接实现分窗的正确性。
>>
>>
>> ————————
>> 比如:ts=0,代表的是 1970-1-1 00:00:00,FROM_UNIXTIME(0) 返回的是 1970-1-1 08:00:00
>> UTC+8
>> 这个时间,而这个时间作为TIMESTAMP类型的eventtime被处理了,但TIMESTAMP本身是无时区含义的,你却给了它一个带时区含义的日期。这导致分窗可以正常按照中国人习惯分,但从底层考虑却不对,因为这个时间点,它的ts变为了28800s。因为flinkSQL将你那个eventtime按照UTC+0转换出来的ts就是ts=28800(s)。
>>
>> ————————
>> 按我说的话,要么继续这么用,你忽略ui上的watermark,这并不影响你的代码逻辑和业务逻辑。
>> 要么就是调整不使用那些函数,这样可以保证watermark那个ui展示正确,但是却导致小时/天窗口出问题,并暂时无解(因为不支持offset窗口)。
>>
>>
>>
>> macia kk <[hidden email]> 于2020年12月11日周五 下午3:04写道:
>>
>>> 你用的是哪个版本的Flink呢?
>>> -----
>>> 1.11.2
>>>
>>> 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。
>>> 所以你的binlog是怎么读进来的呢?自定义的format?
>>> -----
>>> ts 就是时间戳
>>>
>>>     bsTableEnv.executeSql("""
>>>       CREATE TABLE input_database (
>>>         `table` STRING,
>>>         `database` STRING,
>>>         `data` ROW(
>>>           reference_id STRING,
>>>           transaction_sn STRING,
>>>           transaction_type BIGINT,
>>>           merchant_id BIGINT,
>>>           transaction_id BIGINT,
>>>           status BIGINT
>>>          ),
>>>         ts BIGINT,
>>>         event_time AS TO_TIMESTAMP(FROM_UNIXTIME(ts)),
>>>         WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR
>>>      ) WITH (
>>>        'connector.type' = 'kafka',
>>>        'connector.version' = '0.11',
>>>        'connector.topic' = 'mytopic',
>>>        'connector.properties.bootstrap.servers' = 'xxxx',
>>>        'format.type' = 'json'
>>>      )
>>>     )
>>>
>>>
>>>
>>> ```
>>>
>>>
>>>
>>> Benchao Li <[hidden email]> 于2020年12月10日周四 下午6:14写道:
>>>
>>> > 你用的是哪个版本的Flink呢?
>>> >
>>> > 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。
>>> > 所以你的binlog是怎么读进来的呢?自定义的format?
>>> >
>>> > macia kk <[hidden email]> 于2020年12月10日周四 上午1:06写道:
>>> >
>>> > > 我刚才跑了很多任务,设置不同的 maxOutOfOrderness WATERMARK FOR event_time AS
>>> event_time
>>> > -
>>> > > INTERVAL 'x' HOUR
>>> > >
>>> > >  发现一个很奇怪的问题 ,按理说 watermark = currentMaxTimestamp - maxOutOfOrderness
>>> > >
>>> > > 但是 我通过 页面上的 watermark 时间,和我设置 maxOutOfOrderness x,
>>> > > 能够反推出来数据的 currentMaxTimestamp
>>> > >
>>> > > currentMaxTimestamp = watermark + maxOutOfOrderness
>>> > >
>>> > > 但是我无论设置多少的 maxOutOfOrderness, 反推出来的 currentMaxTimestamp 比现在此时此刻的时间快
>>> > > 8个小时,也就是说 currentMaxTimestamp 在未来后的 8个小时,这个数字一直是固定的8。
>>> > >
>>> > >
>>> > > 但是,我进行 Join, 直接输出任意一张表,得到的 evet time 都是对的,比如现在 00:55
>>> > >
>>> > >
>>> >
>>> {"table":"transaction_tab_00000122","database":"main_db","transaction_type":1,"transaction_id":111111,"reference_id":"111111","transaction_sn":"11111","merchant_id":1,"status":1,"event_time":"
>>> > > *2020-12-10T01:02:24Z*"}
>>> > >
>>> > > UI 上显示的 watermark 是 1607555031000(Your time zone:
>>> 2020年12月10日星期四早上7点02分
>>> > > GMT+08:00)
>>> > >
>>> > > 这个 watermark 是未来的时间 😂
>>> > >
>>> > >
>>> > >
>>> > >
>>> > >
>>> > > macia kk <[hidden email]> 于2020年12月9日周三 下午11:36写道:
>>> > >
>>> > > > 感谢 一旦 和 Benchao
>>> > > >
>>> > > >   1. 如果是我的 watermark 设置过长,导致无法输出的话,是有点疑问的,因为我可以确定的是,一定会有 Join
>>> 上的数据,但是我
>>> > > Job
>>> > > > 跑了几天也没有一条输出。我还试了如下的SQL,自己 Join 自己,所以理论肯定是直接输出的,实际也是一条也没有。
>>> > > >
>>> > > >     val result = bsTableEnv.sqlQuery("""
>>> > > >        SELECT *
>>> > > >        FROM (
>>> > > >           SELECT t1.`table`, t1.`database`, t1.transaction_type,
>>> > > t1.transaction_id,
>>> > > >             t1.reference_id, t1.transaction_sn, t1.merchant_id,
>>> > > t1.status, t1.event_time
>>> > > >           FROM main_db as t1
>>> > > >           LEFT JOIN main_db as t2
>>> > > >           ON t1.reference_id = t2.reference_id
>>> > > >           WHERE t1.event_time >= t2.event_time + INTERVAL '5'
>>> MINUTES
>>> > > >            AND t1.event_time <= t2.event_time - INTERVAL '5'
>>> MINUTES
>>> > > >        )
>>> > > >       """.stripMargin)
>>> > > >
>>> > > > 2. 一旦提到的 watermark 传递的问题,我可以确认的是,会传递下去,这可以在 UI 上看到
>>> > > >
>>> > > > 3. 这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source
>>> > > > subtask的watermark。
>>> > > >     -------------------------------------------------------
>>> > > >     这里应该是使用 source subtask 最小的 watermark 传递过去,因为我可以看到的是,我的
>>> watermark
>>> > > > 永远和现在相差8个小时,所以怀疑是有一张表,总是会迟8个小时才会有 BinLog.
>>> > > >
>>> > > > 4. Flink SQL 有没有方法在定义 schema 的时候,如果一个字段不存在,就是 null,我现在想换另外一个时间字段作为
>>> > event
>>> > > > time,但是有的表又没有这个字段,会导致解析的时候直接报错.
>>> > > >
>>> > > > 5. 我能不能不在 input_table 上注册 water mark,在 filter 出两张表后,再把 watermark
>>> > > > 加载两张表上,这样可以避免因为别的表,导致 watermark 停止不前,混乱的行为.
>>> > > >
>>> > > >
>>> > > > Thanks and best regards
>>> > > >
>>> > > >
>>> > > > Benchao Li <[hidden email]> 于2020年12月9日周三 上午10:24写道:
>>> > > >
>>> > > >> Hi macia,
>>> > > >>
>>> > > >> 一旦回答的基本比较完整了。
>>> > > >> watermark影响的主要是left join没有join到的情况下,+(left, null)这样的数据输出的时机。
>>> > > >> 如果是两侧都有数据,watermark不前进,也都可以正常输出。
>>> > > >>
>>> > > >> 关于watermark,如果你的事件时间忽高忽低,这个底层的watermark只会取当前source
>>> > > subtask见到的最大的watermark
>>> > > >> 作为这个source subtask的watermark。但是你的watermark计算逻辑本身就是事件时间delay
>>> > > 10个小时,这个已经会导致
>>> > > >> 你的没有join到的数据下发会延迟很多了。
>>> > > >>
>>> > > >> 你也可以尝试下用处理时间来做一下interval join,看看能不能达到预期。
>>> > > >>
>>> > > >> 赵一旦 <[hidden email]> 于2020年12月9日周三 上午10:15写道:
>>> > > >>
>>> > > >> > 重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。
>>> > > >> >
>>> > > >> >
>>> > > >> >
>>> > > >>
>>> > >
>>> >
>>> (1)你这个的话我看到一个问题,就是watermark你定义10小时的maxOutOfOrderness,确定这么长嘛要,这么大的maxOutOfOrderness,会导致join到的则会及时输出,join不到的需要等10小时才能输出“仅左边”数据,即left
>>> > > >> > join。
>>> > > >> >
>>> > > >> > (2)此外,还有一个点,这个我也不确认。如果是datastream
>>> > > >> > api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。
>>> > > >> >
>>> > > >> >
>>> > > >>
>>> > >
>>> >
>>> input_database中定义了watermark,从input_database到2个filter后的表不清楚是否还存在watermark(我感觉是存在的),只要存在那就没问题,唯一需要注意的是第1点。
>>> > > >> >
>>> > > >> > macia kk <[hidden email]> 于2020年12月9日周三 上午1:17写道:
>>> > > >> >
>>> > > >> > > @Benchao Li <[hidden email]>  感谢回复,这个问题困扰我半年了,导致我一直不能迁移到
>>> > > >> > > FLink,可能我的Case 太特殊了.
>>> > > >> > >
>>> > > >> > > 我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的
>>> > > >> Binlog,我需要
>>> > > >> > > filter 出来 main_db__tansaction_tab,
>>> merchant_db__transaction_tab,
>>> > 两个
>>> > > DB
>>> > > >> > > 中的两个表。所以这里的字段我定义的是 两张表的字段的并集.
>>> > > >> > >
>>> > > >> > > 还要注意的是 even time 是 create_time, 这里问题非常大:
>>> > > >> > >  1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱
>>> > > >> > >  2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create
>>> time进来,可能会影响
>>> > > >> > watermark
>>> > > >> > > forward on.
>>> > > >> > >
>>> > > >> > >     bsTableEnv.executeSql("""
>>> > > >> > >       CREATE TABLE input_database (
>>> > > >> > >         `table` STRING,
>>> > > >> > >         `database` STRING,
>>> > > >> > >         `data` ROW(
>>> > > >> > >           reference_id STRING,
>>> > > >> > >           transaction_sn STRING,
>>> > > >> > >           transaction_type BIGINT,
>>> > > >> > >           merchant_id BIGINT,
>>> > > >> > >           transaction_id BIGINT,
>>> > > >> > >           status BIGINT
>>> > > >> > >          ),
>>> > > >> > >         ts BIGINT,
>>> > > >> > >         event_time AS
>>> TO_TIMESTAMP(FROM_UNIXTIME(create_time)),
>>> > > >> > >         WATERMARK FOR event_time AS event_time - INTERVAL '10'
>>> > HOUR
>>> > > >> > >      ) WITH (
>>> > > >> > >        'connector.type' = 'kafka',
>>> > > >> > >        'connector.version' = '0.11',
>>> > > >> > >        'connector.topic' = 'mytopic',
>>> > > >> > >        'connector.properties.bootstrap.servers' = 'xxxx',
>>> > > >> > >        'format.type' = 'json'
>>> > > >> > >      )
>>> > > >> > >     """)
>>> > > >> > >
>>> > > >> > >
>>> > > >> > > 分别 filter 出来 两张表,进行 interval Join,这个是一直没有输出的,我两张表输出试过,没有任何问题。
>>> > > >> > >
>>> > > >> > >     val main_db = bsTableEnv.sqlQuery("""
>>> > > >> > >       | SELECT *
>>> > > >> > >       | FROM input_database
>>> > > >> > >       | WHERE `database` = 'main_db'
>>> > > >> > >       |  AND `table` LIKE 'transaction_tab%'
>>> > > >> > >       | """.stripMargin)
>>> > > >> > >
>>> > > >> > >     val merchant_db = bsTableEnv.sqlQuery("""
>>> > > >> > >       | SELECT *
>>> > > >> > >       | FROM input_database
>>> > > >> > >       | WHERE `database` = 'merchant_db'
>>> > > >> > >       |   AND `table` LIKE 'transaction_tab%'
>>> > > >> > >       | """.stripMargin)
>>> > > >> > >
>>> > > >> > >     bsTableEnv.createTemporaryView("main_db", main_db)
>>> > > >> > >     bsTableEnv.createTemporaryView("merchant_db", merchant_db)
>>> > > >> > >
>>> > > >> > >     val result = bsTableEnv.sqlQuery("""
>>> > > >> > >        SELECT *
>>> > > >> > >        FROM (
>>> > > >> > >           SELECT t1.`table`, t1.`database`,
>>> t1.transaction_type,
>>> > > >> > > t1.transaction_id,
>>> > > >> > >             t1.reference_id, t1.transaction_sn,
>>> t1.merchant_id,
>>> > > >> > > t1.status, t1.event_time
>>> > > >> > >           FROM main_db as t1
>>> > > >> > >           LEFT JOIN merchant_db as t2
>>> > > >> > >           ON t1.reference_id = t2.reference_id
>>> > > >> > >           WHERE t1.event_time >= t2.event_time + INTERVAL '1'
>>> HOUR
>>> > > >> > >            AND t1.event_time <= t2.event_time - INTERVAL '1'
>>> HOUR
>>> > > >> > >        )
>>> > > >> > >       """.stripMargin)
>>> > > >> > >
>>> > > >> > >
>>> > > >> > >
>>> > > >> > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?
>>> > > >> > > -----
>>> > > >> > > 你提到的这个问题,我估计我的 watermark 前进肯定是不正常的。但是我无法理解为什么 interval join 需要
>>> > > >> watermark
>>> > > >> > > 来驱动。
>>> > > >> > > 我的理解是,他会把两边的数据都保留在 state 里,既然是 Left join,如果左边有数据查右边的state,如果可以
>>> > > >> join上,就输出
>>> > > >> > > join 的结果,如果没有 join上,那应该正常输出左边的数据,这才是 Left join 应有的逻辑把.
>>> > > >> > >
>>> > > >> > >
>>> > > >> > >
>>> > > >> > >
>>> > > >> > >
>>> > > >> > >
>>> > > >> > > Benchao Li <[hidden email]> 于2020年12月8日周二 下午3:23写道:
>>> > > >> > >
>>> > > >> > > > hi macia,
>>> > > >> > > >
>>> > > >> > > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?
>>> > > >> > > >
>>> > > >> > > > macia kk <[hidden email]> 于2020年12月8日周二 上午1:15写道:
>>> > > >> > > >
>>> > > >> > > > > 抱歉,是 >-30 and <+30
>>> > > >> > > > >
>>> > > >> > > > > 贴的只是demo,我的疑问是,既然是 Left
>>> Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有
>>> > > >> > > > >
>>> > > >> > > > > 赵一旦 <[hidden email]>于2020年12月7日 周一23:28写道:
>>> > > >> > > > >
>>> > > >> > > > > > 准确点,2个条件之间没and?2个都是>?
>>> > > >> > > > > >
>>> > > >> > > > > > macia kk <[hidden email]> 于2020年12月7日周一 下午10:30写道:
>>> > > >> > > > > >
>>> > > >> > > > > > > 不好意思,我上边贴错了
>>> > > >> > > > > > >
>>> > > >> > > > > > > SELECT *
>>> > > >> > > > > > >  FROM A
>>> > > >> > > > > > >  LEFT OUT JOIN B
>>> > > >> > > > > > >  ON order_id
>>> > > >> > > > > > >  Where A.event_time > B.event_time -  30 s
>>> > > >> > > > > > >      A.event_time > B.event_time + 30 s
>>> > > >> > > > > > >
>>> > > >> > > > > > > event_time 是 Time Attributes 设置的 event_time
>>> > > >> > > > > > >
>>> > > >> > > > > > > 这样是没有输出的。
>>> > > >> > > > > > >
>>> > > >> > > > > > >
>>> > > >> > > > > > >
>>> > > >> > > > > > > interval join 左右表在 state 中是缓存多久的?
>>> > > >> > > > > > >
>>> > > >> > > > > > >
>>> > > >> > > > > > >
>>> > > >> > > > > > >
>>> > > >> > > > > > >
>>> > > >> > > > > > >
>>> > > >> > > > > > > hailongwang <[hidden email]> 于2020年12月7日周一
>>> 下午8:05写道:
>>> > > >> > > > > > >
>>> > > >> > > > > > > > Hi,
>>> > > >> > > > > > > > 其中 条件是
>>> > > >> > > > > > > > `Where A.event_time < B.event_time + 30 s and
>>> > > A.event_time >
>>> > > >> > > > > > B.event_time
>>> > > >> > > > > > > > - 30 s ` 吧
>>> > > >> > > > > > > > 可以参考以下例子[1],看下有木有写错。
>>> > > >> > > > > > > > [1]
>>> > > >> > > > > > > >
>>> > > >> > > > > > >
>>> > > >> > > > > >
>>> > > >> > > > >
>>> > > >> > > >
>>> > > >> > >
>>> > > >> >
>>> > > >>
>>> > >
>>> >
>>> https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183
>>> > > >> > > > > > > >
>>> > > >> > > > > > > >
>>> > > >> > > > > > > > Best,
>>> > > >> > > > > > > > Hailong
>>> > > >> > > > > > > > 在 2020-12-07 13:10:02,"macia kk" <[hidden email]>
>>> 写道:
>>> > > >> > > > > > > > >Hi, 各位大佬
>>> > > >> > > > > > > > >
>>> > > >> > > > > > > > >  我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog
>>> > 打进去了。我的
>>> > > >> > > > > > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是
>>> > order事件
>>> > > >> ,表B 是
>>> > > >> > > > order
>>> > > >> > > > > > > item
>>> > > >> > > > > > > > >信息,所以 我用:
>>> > > >> > > > > > > > >
>>> > > >> > > > > > > > > SELECT *
>>> > > >> > > > > > > > > FROM A
>>> > > >> > > > > > > > > LEFT OUT JOIN B
>>> > > >> > > > > > > > > ON order_id
>>> > > >> > > > > > > > > Where A.event_time > B.event_time + 30 s
>>> > > >> > > > > > > > >     A.event_time > B.event_time - 30 s
>>> > > >> > > > > > > > >
>>> > > >> > > > > > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join
>>> > > 之后就没有输出数据了,可以确认的是我用
>>> > > >> > Spark
>>> > > >> > > > > > > > Structural
>>> > > >> > > > > > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join,
>>> > > >> > > > > > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join
>>> > > >> > 在具体实现的逻辑是什么,我在处理上哪里有问题?
>>> > > >> > > > > > > >
>>> > > >> > > > > > >
>>> > > >> > > > > >
>>> > > >> > > > >
>>> > > >> > > >
>>> > > >> > > >
>>> > > >> > > > --
>>> > > >> > > >
>>> > > >> > > > Best,
>>> > > >> > > > Benchao Li
>>> > > >> > > >
>>> > > >> > >
>>> > > >> >
>>> > > >>
>>> > > >>
>>> > > >> --
>>> > > >>
>>> > > >> Best,
>>> > > >> Benchao Li
>>> > > >>
>>> > > >
>>> > >
>>> >
>>> >
>>> > --
>>> >
>>> > Best,
>>> > Benchao Li
>>> >
>>>
>>
Reply | Threaded
Open this post in threaded view
|

Re:Re: 关于 stream-stream Interval Join 的问题

hailongwang
Hi,


可以关注:
https://issues.apache.org/jira/browse/FLINK-20162
https://issues.apache.org/jira/browse/FLINK-20387


Best,
Hailong
在 2020-12-15 10:40:19,"赵一旦" <[hidden email]> 写道:

>补充,实际FROM_UNIXTIME应该返回 TIMESTAMP WITH LOCAL TIME ZONE
>这个类型。(然后FlinkSQL可以自己转为TIMESTAMP)。
>
>此外,关于分窗,除了offset这种显示的由用户来解决时区分窗以外。还可以通过支持 TIMESTAMP WITH LOCAL TIME ZONE
>类型作为 event time 实现,当然内部当然还是通过offset实现,只是FlinkSQL语法层可以基于支持 TIMESTAMP WITH
>LOCAL TIME ZONE 作为eventtime来实现这种效果。
>
>如上是个人观点哈。。。
>
>赵一旦 <[hidden email]> 于2020年12月15日周二 上午11:29写道:
>
>> 这个问题很早前我提过,没人在意,或者说大家没觉得这是个问题。但实际上如果和DataStream
>> API去对比的话,FlinkSQL的这种表现肯定是有问题的。
>>
>> 换种说法,FlinkSQL通过更改ts方式实现了UTC+8时区下的分窗的合理性,但其“实现方式”本身就是“代价”,即使用了不合理的ts,ui上当然就展示不合理的ts。
>>
>> 这本来是应该在window分窗处基于offset实现的功能。
>>
>> 赵一旦 <[hidden email]> 于2020年12月15日周二 上午11:22写道:
>>
>>> 这个问题是存在的,只不过不清楚算不算bug,可能只算是FlinkSQL在这部分处理的不足感觉。
>>>
>>> 之所以出现这个问题,是因为你用了FROM_UNIXTIME等函数,这些函数会自动根据当前时区将ts转换为TIMESTAMP(这导致了在time
>>> attribute部分不应该使用这种函数,否则会导致watermark显示超前8小时的问题)。
>>>
>>> 但是呢,目前不这么做好像也还不行。因为分窗必须基于time
>>> attribute,同时flinkSQL目前不支持offset的指定,因此只能基于这种方式去间接实现分窗的正确性。
>>>
>>>
>>> ————————
>>> 比如:ts=0,代表的是 1970-1-1 00:00:00,FROM_UNIXTIME(0) 返回的是 1970-1-1 08:00:00
>>> UTC+8
>>> 这个时间,而这个时间作为TIMESTAMP类型的eventtime被处理了,但TIMESTAMP本身是无时区含义的,你却给了它一个带时区含义的日期。这导致分窗可以正常按照中国人习惯分,但从底层考虑却不对,因为这个时间点,它的ts变为了28800s。因为flinkSQL将你那个eventtime按照UTC+0转换出来的ts就是ts=28800(s)。
>>>
>>> ————————
>>> 按我说的话,要么继续这么用,你忽略ui上的watermark,这并不影响你的代码逻辑和业务逻辑。
>>> 要么就是调整不使用那些函数,这样可以保证watermark那个ui展示正确,但是却导致小时/天窗口出问题,并暂时无解(因为不支持offset窗口)。
>>>
>>>
>>>
>>> macia kk <[hidden email]> 于2020年12月11日周五 下午3:04写道:
>>>
>>>> 你用的是哪个版本的Flink呢?
>>>> -----
>>>> 1.11.2
>>>>
>>>> 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。
>>>> 所以你的binlog是怎么读进来的呢?自定义的format?
>>>> -----
>>>> ts 就是时间戳
>>>>
>>>>     bsTableEnv.executeSql("""
>>>>       CREATE TABLE input_database (
>>>>         `table` STRING,
>>>>         `database` STRING,
>>>>         `data` ROW(
>>>>           reference_id STRING,
>>>>           transaction_sn STRING,
>>>>           transaction_type BIGINT,
>>>>           merchant_id BIGINT,
>>>>           transaction_id BIGINT,
>>>>           status BIGINT
>>>>          ),
>>>>         ts BIGINT,
>>>>         event_time AS TO_TIMESTAMP(FROM_UNIXTIME(ts)),
>>>>         WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR
>>>>      ) WITH (
>>>>        'connector.type' = 'kafka',
>>>>        'connector.version' = '0.11',
>>>>        'connector.topic' = 'mytopic',
>>>>        'connector.properties.bootstrap.servers' = 'xxxx',
>>>>        'format.type' = 'json'
>>>>      )
>>>>     )
>>>>
>>>>
>>>>
>>>> ```
>>>>
>>>>
>>>>
>>>> Benchao Li <[hidden email]> 于2020年12月10日周四 下午6:14写道:
>>>>
>>>> > 你用的是哪个版本的Flink呢?
>>>> >
>>>> > 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。
>>>> > 所以你的binlog是怎么读进来的呢?自定义的format?
>>>> >
>>>> > macia kk <[hidden email]> 于2020年12月10日周四 上午1:06写道:
>>>> >
>>>> > > 我刚才跑了很多任务,设置不同的 maxOutOfOrderness WATERMARK FOR event_time AS
>>>> event_time
>>>> > -
>>>> > > INTERVAL 'x' HOUR
>>>> > >
>>>> > >  发现一个很奇怪的问题 ,按理说 watermark = currentMaxTimestamp - maxOutOfOrderness
>>>> > >
>>>> > > 但是 我通过 页面上的 watermark 时间,和我设置 maxOutOfOrderness x,
>>>> > > 能够反推出来数据的 currentMaxTimestamp
>>>> > >
>>>> > > currentMaxTimestamp = watermark + maxOutOfOrderness
>>>> > >
>>>> > > 但是我无论设置多少的 maxOutOfOrderness, 反推出来的 currentMaxTimestamp 比现在此时此刻的时间快
>>>> > > 8个小时,也就是说 currentMaxTimestamp 在未来后的 8个小时,这个数字一直是固定的8。
>>>> > >
>>>> > >
>>>> > > 但是,我进行 Join, 直接输出任意一张表,得到的 evet time 都是对的,比如现在 00:55
>>>> > >
>>>> > >
>>>> >
>>>> {"table":"transaction_tab_00000122","database":"main_db","transaction_type":1,"transaction_id":111111,"reference_id":"111111","transaction_sn":"11111","merchant_id":1,"status":1,"event_time":"
>>>> > > *2020-12-10T01:02:24Z*"}
>>>> > >
>>>> > > UI 上显示的 watermark 是 1607555031000(Your time zone:
>>>> 2020年12月10日星期四早上7点02分
>>>> > > GMT+08:00)
>>>> > >
>>>> > > 这个 watermark 是未来的时间
>>>> > >
>>>> > >
>>>> > >
>>>> > >
>>>> > >
>>>> > > macia kk <[hidden email]> 于2020年12月9日周三 下午11:36写道:
>>>> > >
>>>> > > > 感谢 一旦 和 Benchao
>>>> > > >
>>>> > > >   1. 如果是我的 watermark 设置过长,导致无法输出的话,是有点疑问的,因为我可以确定的是,一定会有 Join
>>>> 上的数据,但是我
>>>> > > Job
>>>> > > > 跑了几天也没有一条输出。我还试了如下的SQL,自己 Join 自己,所以理论肯定是直接输出的,实际也是一条也没有。
>>>> > > >
>>>> > > >     val result = bsTableEnv.sqlQuery("""
>>>> > > >        SELECT *
>>>> > > >        FROM (
>>>> > > >           SELECT t1.`table`, t1.`database`, t1.transaction_type,
>>>> > > t1.transaction_id,
>>>> > > >             t1.reference_id, t1.transaction_sn, t1.merchant_id,
>>>> > > t1.status, t1.event_time
>>>> > > >           FROM main_db as t1
>>>> > > >           LEFT JOIN main_db as t2
>>>> > > >           ON t1.reference_id = t2.reference_id
>>>> > > >           WHERE t1.event_time >= t2.event_time + INTERVAL '5'
>>>> MINUTES
>>>> > > >            AND t1.event_time <= t2.event_time - INTERVAL '5'
>>>> MINUTES
>>>> > > >        )
>>>> > > >       """.stripMargin)
>>>> > > >
>>>> > > > 2. 一旦提到的 watermark 传递的问题,我可以确认的是,会传递下去,这可以在 UI 上看到
>>>> > > >
>>>> > > > 3. 这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source
>>>> > > > subtask的watermark。
>>>> > > >     -------------------------------------------------------
>>>> > > >     这里应该是使用 source subtask 最小的 watermark 传递过去,因为我可以看到的是,我的
>>>> watermark
>>>> > > > 永远和现在相差8个小时,所以怀疑是有一张表,总是会迟8个小时才会有 BinLog.
>>>> > > >
>>>> > > > 4. Flink SQL 有没有方法在定义 schema 的时候,如果一个字段不存在,就是 null,我现在想换另外一个时间字段作为
>>>> > event
>>>> > > > time,但是有的表又没有这个字段,会导致解析的时候直接报错.
>>>> > > >
>>>> > > > 5. 我能不能不在 input_table 上注册 water mark,在 filter 出两张表后,再把 watermark
>>>> > > > 加载两张表上,这样可以避免因为别的表,导致 watermark 停止不前,混乱的行为.
>>>> > > >
>>>> > > >
>>>> > > > Thanks and best regards
>>>> > > >
>>>> > > >
>>>> > > > Benchao Li <[hidden email]> 于2020年12月9日周三 上午10:24写道:
>>>> > > >
>>>> > > >> Hi macia,
>>>> > > >>
>>>> > > >> 一旦回答的基本比较完整了。
>>>> > > >> watermark影响的主要是left join没有join到的情况下,+(left, null)这样的数据输出的时机。
>>>> > > >> 如果是两侧都有数据,watermark不前进,也都可以正常输出。
>>>> > > >>
>>>> > > >> 关于watermark,如果你的事件时间忽高忽低,这个底层的watermark只会取当前source
>>>> > > subtask见到的最大的watermark
>>>> > > >> 作为这个source subtask的watermark。但是你的watermark计算逻辑本身就是事件时间delay
>>>> > > 10个小时,这个已经会导致
>>>> > > >> 你的没有join到的数据下发会延迟很多了。
>>>> > > >>
>>>> > > >> 你也可以尝试下用处理时间来做一下interval join,看看能不能达到预期。
>>>> > > >>
>>>> > > >> 赵一旦 <[hidden email]> 于2020年12月9日周三 上午10:15写道:
>>>> > > >>
>>>> > > >> > 重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。
>>>> > > >> >
>>>> > > >> >
>>>> > > >> >
>>>> > > >>
>>>> > >
>>>> >
>>>> (1)你这个的话我看到一个问题,就是watermark你定义10小时的maxOutOfOrderness,确定这么长嘛要,这么大的maxOutOfOrderness,会导致join到的则会及时输出,join不到的需要等10小时才能输出“仅左边”数据,即left
>>>> > > >> > join。
>>>> > > >> >
>>>> > > >> > (2)此外,还有一个点,这个我也不确认。如果是datastream
>>>> > > >> > api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。
>>>> > > >> >
>>>> > > >> >
>>>> > > >>
>>>> > >
>>>> >
>>>> input_database中定义了watermark,从input_database到2个filter后的表不清楚是否还存在watermark(我感觉是存在的),只要存在那就没问题,唯一需要注意的是第1点。
>>>> > > >> >
>>>> > > >> > macia kk <[hidden email]> 于2020年12月9日周三 上午1:17写道:
>>>> > > >> >
>>>> > > >> > > @Benchao Li <[hidden email]>  感谢回复,这个问题困扰我半年了,导致我一直不能迁移到
>>>> > > >> > > FLink,可能我的Case 太特殊了.
>>>> > > >> > >
>>>> > > >> > > 我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的
>>>> > > >> Binlog,我需要
>>>> > > >> > > filter 出来 main_db__tansaction_tab,
>>>> merchant_db__transaction_tab,
>>>> > 两个
>>>> > > DB
>>>> > > >> > > 中的两个表。所以这里的字段我定义的是 两张表的字段的并集.
>>>> > > >> > >
>>>> > > >> > > 还要注意的是 even time 是 create_time, 这里问题非常大:
>>>> > > >> > >  1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱
>>>> > > >> > >  2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create
>>>> time进来,可能会影响
>>>> > > >> > watermark
>>>> > > >> > > forward on.
>>>> > > >> > >
>>>> > > >> > >     bsTableEnv.executeSql("""
>>>> > > >> > >       CREATE TABLE input_database (
>>>> > > >> > >         `table` STRING,
>>>> > > >> > >         `database` STRING,
>>>> > > >> > >         `data` ROW(
>>>> > > >> > >           reference_id STRING,
>>>> > > >> > >           transaction_sn STRING,
>>>> > > >> > >           transaction_type BIGINT,
>>>> > > >> > >           merchant_id BIGINT,
>>>> > > >> > >           transaction_id BIGINT,
>>>> > > >> > >           status BIGINT
>>>> > > >> > >          ),
>>>> > > >> > >         ts BIGINT,
>>>> > > >> > >         event_time AS
>>>> TO_TIMESTAMP(FROM_UNIXTIME(create_time)),
>>>> > > >> > >         WATERMARK FOR event_time AS event_time - INTERVAL '10'
>>>> > HOUR
>>>> > > >> > >      ) WITH (
>>>> > > >> > >        'connector.type' = 'kafka',
>>>> > > >> > >        'connector.version' = '0.11',
>>>> > > >> > >        'connector.topic' = 'mytopic',
>>>> > > >> > >        'connector.properties.bootstrap.servers' = 'xxxx',
>>>> > > >> > >        'format.type' = 'json'
>>>> > > >> > >      )
>>>> > > >> > >     """)
>>>> > > >> > >
>>>> > > >> > >
>>>> > > >> > > 分别 filter 出来 两张表,进行 interval Join,这个是一直没有输出的,我两张表输出试过,没有任何问题。
>>>> > > >> > >
>>>> > > >> > >     val main_db = bsTableEnv.sqlQuery("""
>>>> > > >> > >       | SELECT *
>>>> > > >> > >       | FROM input_database
>>>> > > >> > >       | WHERE `database` = 'main_db'
>>>> > > >> > >       |  AND `table` LIKE 'transaction_tab%'
>>>> > > >> > >       | """.stripMargin)
>>>> > > >> > >
>>>> > > >> > >     val merchant_db = bsTableEnv.sqlQuery("""
>>>> > > >> > >       | SELECT *
>>>> > > >> > >       | FROM input_database
>>>> > > >> > >       | WHERE `database` = 'merchant_db'
>>>> > > >> > >       |   AND `table` LIKE 'transaction_tab%'
>>>> > > >> > >       | """.stripMargin)
>>>> > > >> > >
>>>> > > >> > >     bsTableEnv.createTemporaryView("main_db", main_db)
>>>> > > >> > >     bsTableEnv.createTemporaryView("merchant_db", merchant_db)
>>>> > > >> > >
>>>> > > >> > >     val result = bsTableEnv.sqlQuery("""
>>>> > > >> > >        SELECT *
>>>> > > >> > >        FROM (
>>>> > > >> > >           SELECT t1.`table`, t1.`database`,
>>>> t1.transaction_type,
>>>> > > >> > > t1.transaction_id,
>>>> > > >> > >             t1.reference_id, t1.transaction_sn,
>>>> t1.merchant_id,
>>>> > > >> > > t1.status, t1.event_time
>>>> > > >> > >           FROM main_db as t1
>>>> > > >> > >           LEFT JOIN merchant_db as t2
>>>> > > >> > >           ON t1.reference_id = t2.reference_id
>>>> > > >> > >           WHERE t1.event_time >= t2.event_time + INTERVAL '1'
>>>> HOUR
>>>> > > >> > >            AND t1.event_time <= t2.event_time - INTERVAL '1'
>>>> HOUR
>>>> > > >> > >        )
>>>> > > >> > >       """.stripMargin)
>>>> > > >> > >
>>>> > > >> > >
>>>> > > >> > >
>>>> > > >> > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?
>>>> > > >> > > -----
>>>> > > >> > > 你提到的这个问题,我估计我的 watermark 前进肯定是不正常的。但是我无法理解为什么 interval join 需要
>>>> > > >> watermark
>>>> > > >> > > 来驱动。
>>>> > > >> > > 我的理解是,他会把两边的数据都保留在 state 里,既然是 Left join,如果左边有数据查右边的state,如果可以
>>>> > > >> join上,就输出
>>>> > > >> > > join 的结果,如果没有 join上,那应该正常输出左边的数据,这才是 Left join 应有的逻辑把.
>>>> > > >> > >
>>>> > > >> > >
>>>> > > >> > >
>>>> > > >> > >
>>>> > > >> > >
>>>> > > >> > >
>>>> > > >> > > Benchao Li <[hidden email]> 于2020年12月8日周二 下午3:23写道:
>>>> > > >> > >
>>>> > > >> > > > hi macia,
>>>> > > >> > > >
>>>> > > >> > > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?
>>>> > > >> > > >
>>>> > > >> > > > macia kk <[hidden email]> 于2020年12月8日周二 上午1:15写道:
>>>> > > >> > > >
>>>> > > >> > > > > 抱歉,是 >-30 and <+30
>>>> > > >> > > > >
>>>> > > >> > > > > 贴的只是demo,我的疑问是,既然是 Left
>>>> Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有
>>>> > > >> > > > >
>>>> > > >> > > > > 赵一旦 <[hidden email]>于2020年12月7日 周一23:28写道:
>>>> > > >> > > > >
>>>> > > >> > > > > > 准确点,2个条件之间没and?2个都是>?
>>>> > > >> > > > > >
>>>> > > >> > > > > > macia kk <[hidden email]> 于2020年12月7日周一 下午10:30写道:
>>>> > > >> > > > > >
>>>> > > >> > > > > > > 不好意思,我上边贴错了
>>>> > > >> > > > > > >
>>>> > > >> > > > > > > SELECT *
>>>> > > >> > > > > > >  FROM A
>>>> > > >> > > > > > >  LEFT OUT JOIN B
>>>> > > >> > > > > > >  ON order_id
>>>> > > >> > > > > > >  Where A.event_time > B.event_time -  30 s
>>>> > > >> > > > > > >      A.event_time > B.event_time + 30 s
>>>> > > >> > > > > > >
>>>> > > >> > > > > > > event_time 是 Time Attributes 设置的 event_time
>>>> > > >> > > > > > >
>>>> > > >> > > > > > > 这样是没有输出的。
>>>> > > >> > > > > > >
>>>> > > >> > > > > > >
>>>> > > >> > > > > > >
>>>> > > >> > > > > > > interval join 左右表在 state 中是缓存多久的?
>>>> > > >> > > > > > >
>>>> > > >> > > > > > >
>>>> > > >> > > > > > >
>>>> > > >> > > > > > >
>>>> > > >> > > > > > >
>>>> > > >> > > > > > >
>>>> > > >> > > > > > > hailongwang <[hidden email]> 于2020年12月7日周一
>>>> 下午8:05写道:
>>>> > > >> > > > > > >
>>>> > > >> > > > > > > > Hi,
>>>> > > >> > > > > > > > 其中 条件是
>>>> > > >> > > > > > > > `Where A.event_time < B.event_time + 30 s and
>>>> > > A.event_time >
>>>> > > >> > > > > > B.event_time
>>>> > > >> > > > > > > > - 30 s ` 吧
>>>> > > >> > > > > > > > 可以参考以下例子[1],看下有木有写错。
>>>> > > >> > > > > > > > [1]
>>>> > > >> > > > > > > >
>>>> > > >> > > > > > >
>>>> > > >> > > > > >
>>>> > > >> > > > >
>>>> > > >> > > >
>>>> > > >> > >
>>>> > > >> >
>>>> > > >>
>>>> > >
>>>> >
>>>> https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183
>>>> > > >> > > > > > > >
>>>> > > >> > > > > > > >
>>>> > > >> > > > > > > > Best,
>>>> > > >> > > > > > > > Hailong
>>>> > > >> > > > > > > > 在 2020-12-07 13:10:02,"macia kk" <[hidden email]>
>>>> 写道:
>>>> > > >> > > > > > > > >Hi, 各位大佬
>>>> > > >> > > > > > > > >
>>>> > > >> > > > > > > > >  我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog
>>>> > 打进去了。我的
>>>> > > >> > > > > > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是
>>>> > order事件
>>>> > > >> ,表B 是
>>>> > > >> > > > order
>>>> > > >> > > > > > > item
>>>> > > >> > > > > > > > >信息,所以 我用:
>>>> > > >> > > > > > > > >
>>>> > > >> > > > > > > > > SELECT *
>>>> > > >> > > > > > > > > FROM A
>>>> > > >> > > > > > > > > LEFT OUT JOIN B
>>>> > > >> > > > > > > > > ON order_id
>>>> > > >> > > > > > > > > Where A.event_time > B.event_time + 30 s
>>>> > > >> > > > > > > > >     A.event_time > B.event_time - 30 s
>>>> > > >> > > > > > > > >
>>>> > > >> > > > > > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join
>>>> > > 之后就没有输出数据了,可以确认的是我用
>>>> > > >> > Spark
>>>> > > >> > > > > > > > Structural
>>>> > > >> > > > > > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join,
>>>> > > >> > > > > > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join
>>>> > > >> > 在具体实现的逻辑是什么,我在处理上哪里有问题?
>>>> > > >> > > > > > > >
>>>> > > >> > > > > > >
>>>> > > >> > > > > >
>>>> > > >> > > > >
>>>> > > >> > > >
>>>> > > >> > > >
>>>> > > >> > > > --
>>>> > > >> > > >
>>>> > > >> > > > Best,
>>>> > > >> > > > Benchao Li
>>>> > > >> > > >
>>>> > > >> > >
>>>> > > >> >
>>>> > > >>
>>>> > > >>
>>>> > > >> --
>>>> > > >>
>>>> > > >> Best,
>>>> > > >> Benchao Li
>>>> > > >>
>>>> > > >
>>>> > >
>>>> >
>>>> >
>>>> > --
>>>> >
>>>> > Best,
>>>> > Benchao Li
>>>> >
>>>>
>>>