Hi, 各位大佬
我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的 Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 ,表B 是 order item 信息,所以 我用: SELECT * FROM A LEFT OUT JOIN B ON order_id Where A.event_time > B.event_time + 30 s A.event_time > B.event_time - 30 s 我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用 Spark Structural Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join, 所以无论如何,左边是一定会输出的,不知道Flink Interval Join 在具体实现的逻辑是什么,我在处理上哪里有问题? |
Hi,
其中 条件是 `Where A.event_time < B.event_time + 30 s and A.event_time > B.event_time - 30 s ` 吧 可以参考以下例子[1],看下有木有写错。 [1] https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183 Best, Hailong 在 2020-12-07 13:10:02,"macia kk" <[hidden email]> 写道: >Hi, 各位大佬 > > 我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的 >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 ,表B 是 order item >信息,所以 我用: > > SELECT * > FROM A > LEFT OUT JOIN B > ON order_id > Where A.event_time > B.event_time + 30 s > A.event_time > B.event_time - 30 s > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用 Spark Structural >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join, >所以无论如何,左边是一定会输出的,不知道Flink Interval Join 在具体实现的逻辑是什么,我在处理上哪里有问题? |
不好意思,我上边贴错了
SELECT * FROM A LEFT OUT JOIN B ON order_id Where A.event_time > B.event_time - 30 s A.event_time > B.event_time + 30 s event_time 是 Time Attributes 设置的 event_time 这样是没有输出的。 interval join 左右表在 state 中是缓存多久的? hailongwang <[hidden email]> 于2020年12月7日周一 下午8:05写道: > Hi, > 其中 条件是 > `Where A.event_time < B.event_time + 30 s and A.event_time > B.event_time > - 30 s ` 吧 > 可以参考以下例子[1],看下有木有写错。 > [1] > https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183 > > > Best, > Hailong > 在 2020-12-07 13:10:02,"macia kk" <[hidden email]> 写道: > >Hi, 各位大佬 > > > > 我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的 > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 ,表B 是 order item > >信息,所以 我用: > > > > SELECT * > > FROM A > > LEFT OUT JOIN B > > ON order_id > > Where A.event_time > B.event_time + 30 s > > A.event_time > B.event_time - 30 s > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用 Spark > Structural > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join, > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join 在具体实现的逻辑是什么,我在处理上哪里有问题? > |
准确点,2个条件之间没and?2个都是>?
macia kk <[hidden email]> 于2020年12月7日周一 下午10:30写道: > 不好意思,我上边贴错了 > > SELECT * > FROM A > LEFT OUT JOIN B > ON order_id > Where A.event_time > B.event_time - 30 s > A.event_time > B.event_time + 30 s > > event_time 是 Time Attributes 设置的 event_time > > 这样是没有输出的。 > > > > interval join 左右表在 state 中是缓存多久的? > > > > > > > hailongwang <[hidden email]> 于2020年12月7日周一 下午8:05写道: > > > Hi, > > 其中 条件是 > > `Where A.event_time < B.event_time + 30 s and A.event_time > B.event_time > > - 30 s ` 吧 > > 可以参考以下例子[1],看下有木有写错。 > > [1] > > > https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183 > > > > > > Best, > > Hailong > > 在 2020-12-07 13:10:02,"macia kk" <[hidden email]> 写道: > > >Hi, 各位大佬 > > > > > > 我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的 > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 ,表B 是 order > item > > >信息,所以 我用: > > > > > > SELECT * > > > FROM A > > > LEFT OUT JOIN B > > > ON order_id > > > Where A.event_time > B.event_time + 30 s > > > A.event_time > B.event_time - 30 s > > > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用 Spark > > Structural > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join, > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join 在具体实现的逻辑是什么,我在处理上哪里有问题? > > > |
抱歉,是 >-30 and <+30
贴的只是demo,我的疑问是,既然是 Left Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有 赵一旦 <[hidden email]>于2020年12月7日 周一23:28写道: > 准确点,2个条件之间没and?2个都是>? > > macia kk <[hidden email]> 于2020年12月7日周一 下午10:30写道: > > > 不好意思,我上边贴错了 > > > > SELECT * > > FROM A > > LEFT OUT JOIN B > > ON order_id > > Where A.event_time > B.event_time - 30 s > > A.event_time > B.event_time + 30 s > > > > event_time 是 Time Attributes 设置的 event_time > > > > 这样是没有输出的。 > > > > > > > > interval join 左右表在 state 中是缓存多久的? > > > > > > > > > > > > > > hailongwang <[hidden email]> 于2020年12月7日周一 下午8:05写道: > > > > > Hi, > > > 其中 条件是 > > > `Where A.event_time < B.event_time + 30 s and A.event_time > > B.event_time > > > - 30 s ` 吧 > > > 可以参考以下例子[1],看下有木有写错。 > > > [1] > > > > > > https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183 > > > > > > > > > Best, > > > Hailong > > > 在 2020-12-07 13:10:02,"macia kk" <[hidden email]> 写道: > > > >Hi, 各位大佬 > > > > > > > > 我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的 > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 ,表B 是 order > > item > > > >信息,所以 我用: > > > > > > > > SELECT * > > > > FROM A > > > > LEFT OUT JOIN B > > > > ON order_id > > > > Where A.event_time > B.event_time + 30 s > > > > A.event_time > B.event_time - 30 s > > > > > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用 Spark > > > Structural > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join, > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join 在具体实现的逻辑是什么,我在处理上哪里有问题? > > > > > > |
hi macia,
事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么? macia kk <[hidden email]> 于2020年12月8日周二 上午1:15写道: > 抱歉,是 >-30 and <+30 > > 贴的只是demo,我的疑问是,既然是 Left Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有 > > 赵一旦 <[hidden email]>于2020年12月7日 周一23:28写道: > > > 准确点,2个条件之间没and?2个都是>? > > > > macia kk <[hidden email]> 于2020年12月7日周一 下午10:30写道: > > > > > 不好意思,我上边贴错了 > > > > > > SELECT * > > > FROM A > > > LEFT OUT JOIN B > > > ON order_id > > > Where A.event_time > B.event_time - 30 s > > > A.event_time > B.event_time + 30 s > > > > > > event_time 是 Time Attributes 设置的 event_time > > > > > > 这样是没有输出的。 > > > > > > > > > > > > interval join 左右表在 state 中是缓存多久的? > > > > > > > > > > > > > > > > > > > > > hailongwang <[hidden email]> 于2020年12月7日周一 下午8:05写道: > > > > > > > Hi, > > > > 其中 条件是 > > > > `Where A.event_time < B.event_time + 30 s and A.event_time > > > B.event_time > > > > - 30 s ` 吧 > > > > 可以参考以下例子[1],看下有木有写错。 > > > > [1] > > > > > > > > > > https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183 > > > > > > > > > > > > Best, > > > > Hailong > > > > 在 2020-12-07 13:10:02,"macia kk" <[hidden email]> 写道: > > > > >Hi, 各位大佬 > > > > > > > > > > 我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的 > > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 ,表B 是 order > > > item > > > > >信息,所以 我用: > > > > > > > > > > SELECT * > > > > > FROM A > > > > > LEFT OUT JOIN B > > > > > ON order_id > > > > > Where A.event_time > B.event_time + 30 s > > > > > A.event_time > B.event_time - 30 s > > > > > > > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用 Spark > > > > Structural > > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join, > > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join 在具体实现的逻辑是什么,我在处理上哪里有问题? > > > > > > > > > > -- Best, Benchao Li |
@Benchao Li <[hidden email]> 感谢回复,这个问题困扰我半年了,导致我一直不能迁移到
FLink,可能我的Case 太特殊了. 我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的 Binlog,我需要 filter 出来 main_db__tansaction_tab, merchant_db__transaction_tab, 两个 DB 中的两个表。所以这里的字段我定义的是 两张表的字段的并集. 还要注意的是 even time 是 create_time, 这里问题非常大: 1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱 2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create time进来,可能会影响 watermark forward on. bsTableEnv.executeSql(""" CREATE TABLE input_database ( `table` STRING, `database` STRING, `data` ROW( reference_id STRING, transaction_sn STRING, transaction_type BIGINT, merchant_id BIGINT, transaction_id BIGINT, status BIGINT ), ts BIGINT, event_time AS TO_TIMESTAMP(FROM_UNIXTIME(create_time)), WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR ) WITH ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'mytopic', 'connector.properties.bootstrap.servers' = 'xxxx', 'format.type' = 'json' ) """) 分别 filter 出来 两张表,进行 interval Join,这个是一直没有输出的,我两张表输出试过,没有任何问题。 val main_db = bsTableEnv.sqlQuery(""" | SELECT * | FROM input_database | WHERE `database` = 'main_db' | AND `table` LIKE 'transaction_tab%' | """.stripMargin) val merchant_db = bsTableEnv.sqlQuery(""" | SELECT * | FROM input_database | WHERE `database` = 'merchant_db' | AND `table` LIKE 'transaction_tab%' | """.stripMargin) bsTableEnv.createTemporaryView("main_db", main_db) bsTableEnv.createTemporaryView("merchant_db", merchant_db) val result = bsTableEnv.sqlQuery(""" SELECT * FROM ( SELECT t1.`table`, t1.`database`, t1.transaction_type, t1.transaction_id, t1.reference_id, t1.transaction_sn, t1.merchant_id, t1.status, t1.event_time FROM main_db as t1 LEFT JOIN merchant_db as t2 ON t1.reference_id = t2.reference_id WHERE t1.event_time >= t2.event_time + INTERVAL '1' HOUR AND t1.event_time <= t2.event_time - INTERVAL '1' HOUR ) """.stripMargin) 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么? ----- 你提到的这个问题,我估计我的 watermark 前进肯定是不正常的。但是我无法理解为什么 interval join 需要 watermark 来驱动。 我的理解是,他会把两边的数据都保留在 state 里,既然是 Left join,如果左边有数据查右边的state,如果可以 join上,就输出 join 的结果,如果没有 join上,那应该正常输出左边的数据,这才是 Left join 应有的逻辑把. Benchao Li <[hidden email]> 于2020年12月8日周二 下午3:23写道: > hi macia, > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么? > > macia kk <[hidden email]> 于2020年12月8日周二 上午1:15写道: > > > 抱歉,是 >-30 and <+30 > > > > 贴的只是demo,我的疑问是,既然是 Left Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有 > > > > 赵一旦 <[hidden email]>于2020年12月7日 周一23:28写道: > > > > > 准确点,2个条件之间没and?2个都是>? > > > > > > macia kk <[hidden email]> 于2020年12月7日周一 下午10:30写道: > > > > > > > 不好意思,我上边贴错了 > > > > > > > > SELECT * > > > > FROM A > > > > LEFT OUT JOIN B > > > > ON order_id > > > > Where A.event_time > B.event_time - 30 s > > > > A.event_time > B.event_time + 30 s > > > > > > > > event_time 是 Time Attributes 设置的 event_time > > > > > > > > 这样是没有输出的。 > > > > > > > > > > > > > > > > interval join 左右表在 state 中是缓存多久的? > > > > > > > > > > > > > > > > > > > > > > > > > > > > hailongwang <[hidden email]> 于2020年12月7日周一 下午8:05写道: > > > > > > > > > Hi, > > > > > 其中 条件是 > > > > > `Where A.event_time < B.event_time + 30 s and A.event_time > > > > B.event_time > > > > > - 30 s ` 吧 > > > > > 可以参考以下例子[1],看下有木有写错。 > > > > > [1] > > > > > > > > > > > > > > > https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183 > > > > > > > > > > > > > > > Best, > > > > > Hailong > > > > > 在 2020-12-07 13:10:02,"macia kk" <[hidden email]> 写道: > > > > > >Hi, 各位大佬 > > > > > > > > > > > > 我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的 > > > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 ,表B 是 > order > > > > item > > > > > >信息,所以 我用: > > > > > > > > > > > > SELECT * > > > > > > FROM A > > > > > > LEFT OUT JOIN B > > > > > > ON order_id > > > > > > Where A.event_time > B.event_time + 30 s > > > > > > A.event_time > B.event_time - 30 s > > > > > > > > > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用 Spark > > > > > Structural > > > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join, > > > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join 在具体实现的逻辑是什么,我在处理上哪里有问题? > > > > > > > > > > > > > > > > > -- > > Best, > Benchao Li > |
重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。
(1)你这个的话我看到一个问题,就是watermark你定义10小时的maxOutOfOrderness,确定这么长嘛要,这么大的maxOutOfOrderness,会导致join到的则会及时输出,join不到的需要等10小时才能输出“仅左边”数据,即left join。 (2)此外,还有一个点,这个我也不确认。如果是datastream api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。 input_database中定义了watermark,从input_database到2个filter后的表不清楚是否还存在watermark(我感觉是存在的),只要存在那就没问题,唯一需要注意的是第1点。 macia kk <[hidden email]> 于2020年12月9日周三 上午1:17写道: > @Benchao Li <[hidden email]> 感谢回复,这个问题困扰我半年了,导致我一直不能迁移到 > FLink,可能我的Case 太特殊了. > > 我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的 Binlog,我需要 > filter 出来 main_db__tansaction_tab, merchant_db__transaction_tab, 两个 DB > 中的两个表。所以这里的字段我定义的是 两张表的字段的并集. > > 还要注意的是 even time 是 create_time, 这里问题非常大: > 1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱 > 2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create time进来,可能会影响 watermark > forward on. > > bsTableEnv.executeSql(""" > CREATE TABLE input_database ( > `table` STRING, > `database` STRING, > `data` ROW( > reference_id STRING, > transaction_sn STRING, > transaction_type BIGINT, > merchant_id BIGINT, > transaction_id BIGINT, > status BIGINT > ), > ts BIGINT, > event_time AS TO_TIMESTAMP(FROM_UNIXTIME(create_time)), > WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR > ) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = '0.11', > 'connector.topic' = 'mytopic', > 'connector.properties.bootstrap.servers' = 'xxxx', > 'format.type' = 'json' > ) > """) > > > 分别 filter 出来 两张表,进行 interval Join,这个是一直没有输出的,我两张表输出试过,没有任何问题。 > > val main_db = bsTableEnv.sqlQuery(""" > | SELECT * > | FROM input_database > | WHERE `database` = 'main_db' > | AND `table` LIKE 'transaction_tab%' > | """.stripMargin) > > val merchant_db = bsTableEnv.sqlQuery(""" > | SELECT * > | FROM input_database > | WHERE `database` = 'merchant_db' > | AND `table` LIKE 'transaction_tab%' > | """.stripMargin) > > bsTableEnv.createTemporaryView("main_db", main_db) > bsTableEnv.createTemporaryView("merchant_db", merchant_db) > > val result = bsTableEnv.sqlQuery(""" > SELECT * > FROM ( > SELECT t1.`table`, t1.`database`, t1.transaction_type, > t1.transaction_id, > t1.reference_id, t1.transaction_sn, t1.merchant_id, > t1.status, t1.event_time > FROM main_db as t1 > LEFT JOIN merchant_db as t2 > ON t1.reference_id = t2.reference_id > WHERE t1.event_time >= t2.event_time + INTERVAL '1' HOUR > AND t1.event_time <= t2.event_time - INTERVAL '1' HOUR > ) > """.stripMargin) > > > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么? > ----- > 你提到的这个问题,我估计我的 watermark 前进肯定是不正常的。但是我无法理解为什么 interval join 需要 watermark > 来驱动。 > 我的理解是,他会把两边的数据都保留在 state 里,既然是 Left join,如果左边有数据查右边的state,如果可以 join上,就输出 > join 的结果,如果没有 join上,那应该正常输出左边的数据,这才是 Left join 应有的逻辑把. > > > > > > > Benchao Li <[hidden email]> 于2020年12月8日周二 下午3:23写道: > > > hi macia, > > > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么? > > > > macia kk <[hidden email]> 于2020年12月8日周二 上午1:15写道: > > > > > 抱歉,是 >-30 and <+30 > > > > > > 贴的只是demo,我的疑问是,既然是 Left Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有 > > > > > > 赵一旦 <[hidden email]>于2020年12月7日 周一23:28写道: > > > > > > > 准确点,2个条件之间没and?2个都是>? > > > > > > > > macia kk <[hidden email]> 于2020年12月7日周一 下午10:30写道: > > > > > > > > > 不好意思,我上边贴错了 > > > > > > > > > > SELECT * > > > > > FROM A > > > > > LEFT OUT JOIN B > > > > > ON order_id > > > > > Where A.event_time > B.event_time - 30 s > > > > > A.event_time > B.event_time + 30 s > > > > > > > > > > event_time 是 Time Attributes 设置的 event_time > > > > > > > > > > 这样是没有输出的。 > > > > > > > > > > > > > > > > > > > > interval join 左右表在 state 中是缓存多久的? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > hailongwang <[hidden email]> 于2020年12月7日周一 下午8:05写道: > > > > > > > > > > > Hi, > > > > > > 其中 条件是 > > > > > > `Where A.event_time < B.event_time + 30 s and A.event_time > > > > > B.event_time > > > > > > - 30 s ` 吧 > > > > > > 可以参考以下例子[1],看下有木有写错。 > > > > > > [1] > > > > > > > > > > > > > > > > > > > > > https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183 > > > > > > > > > > > > > > > > > > Best, > > > > > > Hailong > > > > > > 在 2020-12-07 13:10:02,"macia kk" <[hidden email]> 写道: > > > > > > >Hi, 各位大佬 > > > > > > > > > > > > > > 我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的 > > > > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 ,表B 是 > > order > > > > > item > > > > > > >信息,所以 我用: > > > > > > > > > > > > > > SELECT * > > > > > > > FROM A > > > > > > > LEFT OUT JOIN B > > > > > > > ON order_id > > > > > > > Where A.event_time > B.event_time + 30 s > > > > > > > A.event_time > B.event_time - 30 s > > > > > > > > > > > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用 Spark > > > > > > Structural > > > > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join, > > > > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join 在具体实现的逻辑是什么,我在处理上哪里有问题? > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > Best, > > Benchao Li > > > |
Hi macia,
一旦回答的基本比较完整了。 watermark影响的主要是left join没有join到的情况下,+(left, null)这样的数据输出的时机。 如果是两侧都有数据,watermark不前进,也都可以正常输出。 关于watermark,如果你的事件时间忽高忽低,这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source subtask的watermark。但是你的watermark计算逻辑本身就是事件时间delay 10个小时,这个已经会导致 你的没有join到的数据下发会延迟很多了。 你也可以尝试下用处理时间来做一下interval join,看看能不能达到预期。 赵一旦 <[hidden email]> 于2020年12月9日周三 上午10:15写道: > 重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。 > > > (1)你这个的话我看到一个问题,就是watermark你定义10小时的maxOutOfOrderness,确定这么长嘛要,这么大的maxOutOfOrderness,会导致join到的则会及时输出,join不到的需要等10小时才能输出“仅左边”数据,即left > join。 > > (2)此外,还有一个点,这个我也不确认。如果是datastream > api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。 > > input_database中定义了watermark,从input_database到2个filter后的表不清楚是否还存在watermark(我感觉是存在的),只要存在那就没问题,唯一需要注意的是第1点。 > > macia kk <[hidden email]> 于2020年12月9日周三 上午1:17写道: > > > @Benchao Li <[hidden email]> 感谢回复,这个问题困扰我半年了,导致我一直不能迁移到 > > FLink,可能我的Case 太特殊了. > > > > 我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的 Binlog,我需要 > > filter 出来 main_db__tansaction_tab, merchant_db__transaction_tab, 两个 DB > > 中的两个表。所以这里的字段我定义的是 两张表的字段的并集. > > > > 还要注意的是 even time 是 create_time, 这里问题非常大: > > 1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱 > > 2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create time进来,可能会影响 > watermark > > forward on. > > > > bsTableEnv.executeSql(""" > > CREATE TABLE input_database ( > > `table` STRING, > > `database` STRING, > > `data` ROW( > > reference_id STRING, > > transaction_sn STRING, > > transaction_type BIGINT, > > merchant_id BIGINT, > > transaction_id BIGINT, > > status BIGINT > > ), > > ts BIGINT, > > event_time AS TO_TIMESTAMP(FROM_UNIXTIME(create_time)), > > WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR > > ) WITH ( > > 'connector.type' = 'kafka', > > 'connector.version' = '0.11', > > 'connector.topic' = 'mytopic', > > 'connector.properties.bootstrap.servers' = 'xxxx', > > 'format.type' = 'json' > > ) > > """) > > > > > > 分别 filter 出来 两张表,进行 interval Join,这个是一直没有输出的,我两张表输出试过,没有任何问题。 > > > > val main_db = bsTableEnv.sqlQuery(""" > > | SELECT * > > | FROM input_database > > | WHERE `database` = 'main_db' > > | AND `table` LIKE 'transaction_tab%' > > | """.stripMargin) > > > > val merchant_db = bsTableEnv.sqlQuery(""" > > | SELECT * > > | FROM input_database > > | WHERE `database` = 'merchant_db' > > | AND `table` LIKE 'transaction_tab%' > > | """.stripMargin) > > > > bsTableEnv.createTemporaryView("main_db", main_db) > > bsTableEnv.createTemporaryView("merchant_db", merchant_db) > > > > val result = bsTableEnv.sqlQuery(""" > > SELECT * > > FROM ( > > SELECT t1.`table`, t1.`database`, t1.transaction_type, > > t1.transaction_id, > > t1.reference_id, t1.transaction_sn, t1.merchant_id, > > t1.status, t1.event_time > > FROM main_db as t1 > > LEFT JOIN merchant_db as t2 > > ON t1.reference_id = t2.reference_id > > WHERE t1.event_time >= t2.event_time + INTERVAL '1' HOUR > > AND t1.event_time <= t2.event_time - INTERVAL '1' HOUR > > ) > > """.stripMargin) > > > > > > > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么? > > ----- > > 你提到的这个问题,我估计我的 watermark 前进肯定是不正常的。但是我无法理解为什么 interval join 需要 watermark > > 来驱动。 > > 我的理解是,他会把两边的数据都保留在 state 里,既然是 Left join,如果左边有数据查右边的state,如果可以 join上,就输出 > > join 的结果,如果没有 join上,那应该正常输出左边的数据,这才是 Left join 应有的逻辑把. > > > > > > > > > > > > > > Benchao Li <[hidden email]> 于2020年12月8日周二 下午3:23写道: > > > > > hi macia, > > > > > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么? > > > > > > macia kk <[hidden email]> 于2020年12月8日周二 上午1:15写道: > > > > > > > 抱歉,是 >-30 and <+30 > > > > > > > > 贴的只是demo,我的疑问是,既然是 Left Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有 > > > > > > > > 赵一旦 <[hidden email]>于2020年12月7日 周一23:28写道: > > > > > > > > > 准确点,2个条件之间没and?2个都是>? > > > > > > > > > > macia kk <[hidden email]> 于2020年12月7日周一 下午10:30写道: > > > > > > > > > > > 不好意思,我上边贴错了 > > > > > > > > > > > > SELECT * > > > > > > FROM A > > > > > > LEFT OUT JOIN B > > > > > > ON order_id > > > > > > Where A.event_time > B.event_time - 30 s > > > > > > A.event_time > B.event_time + 30 s > > > > > > > > > > > > event_time 是 Time Attributes 设置的 event_time > > > > > > > > > > > > 这样是没有输出的。 > > > > > > > > > > > > > > > > > > > > > > > > interval join 左右表在 state 中是缓存多久的? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > hailongwang <[hidden email]> 于2020年12月7日周一 下午8:05写道: > > > > > > > > > > > > > Hi, > > > > > > > 其中 条件是 > > > > > > > `Where A.event_time < B.event_time + 30 s and A.event_time > > > > > > B.event_time > > > > > > > - 30 s ` 吧 > > > > > > > 可以参考以下例子[1],看下有木有写错。 > > > > > > > [1] > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183 > > > > > > > > > > > > > > > > > > > > > Best, > > > > > > > Hailong > > > > > > > 在 2020-12-07 13:10:02,"macia kk" <[hidden email]> 写道: > > > > > > > >Hi, 各位大佬 > > > > > > > > > > > > > > > > 我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的 > > > > > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 ,表B 是 > > > order > > > > > > item > > > > > > > >信息,所以 我用: > > > > > > > > > > > > > > > > SELECT * > > > > > > > > FROM A > > > > > > > > LEFT OUT JOIN B > > > > > > > > ON order_id > > > > > > > > Where A.event_time > B.event_time + 30 s > > > > > > > > A.event_time > B.event_time - 30 s > > > > > > > > > > > > > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用 > Spark > > > > > > > Structural > > > > > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join, > > > > > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join > 在具体实现的逻辑是什么,我在处理上哪里有问题? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > Best, > > > Benchao Li > > > > > > -- Best, Benchao Li |
感谢 一旦 和 Benchao
1. 如果是我的 watermark 设置过长,导致无法输出的话,是有点疑问的,因为我可以确定的是,一定会有 Join 上的数据,但是我 Job 跑了几天也没有一条输出。我还试了如下的SQL,自己 Join 自己,所以理论肯定是直接输出的,实际也是一条也没有。 val result = bsTableEnv.sqlQuery(""" SELECT * FROM ( SELECT t1.`table`, t1.`database`, t1.transaction_type, t1.transaction_id, t1.reference_id, t1.transaction_sn, t1.merchant_id, t1.status, t1.event_time FROM main_db as t1 LEFT JOIN main_db as t2 ON t1.reference_id = t2.reference_id WHERE t1.event_time >= t2.event_time + INTERVAL '5' MINUTES AND t1.event_time <= t2.event_time - INTERVAL '5' MINUTES ) """.stripMargin) 2. 一旦提到的 watermark 传递的问题,我可以确认的是,会传递下去,这可以在 UI 上看到 3. 这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source subtask的watermark。 ------------------------------------------------------- 这里应该是使用 source subtask 最小的 watermark 传递过去,因为我可以看到的是,我的 watermark 永远和现在相差8个小时,所以怀疑是有一张表,总是会迟8个小时才会有 BinLog. 4. Flink SQL 有没有方法在定义 schema 的时候,如果一个字段不存在,就是 null,我现在想换另外一个时间字段作为 event time,但是有的表又没有这个字段,会导致解析的时候直接报错. 5. 我能不能不在 input_table 上注册 water mark,在 filter 出两张表后,再把 watermark 加载两张表上,这样可以避免因为别的表,导致 watermark 停止不前,混乱的行为. Thanks and best regards Benchao Li <[hidden email]> 于2020年12月9日周三 上午10:24写道: > Hi macia, > > 一旦回答的基本比较完整了。 > watermark影响的主要是left join没有join到的情况下,+(left, null)这样的数据输出的时机。 > 如果是两侧都有数据,watermark不前进,也都可以正常输出。 > > 关于watermark,如果你的事件时间忽高忽低,这个底层的watermark只会取当前source subtask见到的最大的watermark > 作为这个source subtask的watermark。但是你的watermark计算逻辑本身就是事件时间delay 10个小时,这个已经会导致 > 你的没有join到的数据下发会延迟很多了。 > > 你也可以尝试下用处理时间来做一下interval join,看看能不能达到预期。 > > 赵一旦 <[hidden email]> 于2020年12月9日周三 上午10:15写道: > > > 重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。 > > > > > > > (1)你这个的话我看到一个问题,就是watermark你定义10小时的maxOutOfOrderness,确定这么长嘛要,这么大的maxOutOfOrderness,会导致join到的则会及时输出,join不到的需要等10小时才能输出“仅左边”数据,即left > > join。 > > > > (2)此外,还有一个点,这个我也不确认。如果是datastream > > api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。 > > > > > input_database中定义了watermark,从input_database到2个filter后的表不清楚是否还存在watermark(我感觉是存在的),只要存在那就没问题,唯一需要注意的是第1点。 > > > > macia kk <[hidden email]> 于2020年12月9日周三 上午1:17写道: > > > > > @Benchao Li <[hidden email]> 感谢回复,这个问题困扰我半年了,导致我一直不能迁移到 > > > FLink,可能我的Case 太特殊了. > > > > > > 我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的 > Binlog,我需要 > > > filter 出来 main_db__tansaction_tab, merchant_db__transaction_tab, 两个 DB > > > 中的两个表。所以这里的字段我定义的是 两张表的字段的并集. > > > > > > 还要注意的是 even time 是 create_time, 这里问题非常大: > > > 1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱 > > > 2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create time进来,可能会影响 > > watermark > > > forward on. > > > > > > bsTableEnv.executeSql(""" > > > CREATE TABLE input_database ( > > > `table` STRING, > > > `database` STRING, > > > `data` ROW( > > > reference_id STRING, > > > transaction_sn STRING, > > > transaction_type BIGINT, > > > merchant_id BIGINT, > > > transaction_id BIGINT, > > > status BIGINT > > > ), > > > ts BIGINT, > > > event_time AS TO_TIMESTAMP(FROM_UNIXTIME(create_time)), > > > WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR > > > ) WITH ( > > > 'connector.type' = 'kafka', > > > 'connector.version' = '0.11', > > > 'connector.topic' = 'mytopic', > > > 'connector.properties.bootstrap.servers' = 'xxxx', > > > 'format.type' = 'json' > > > ) > > > """) > > > > > > > > > 分别 filter 出来 两张表,进行 interval Join,这个是一直没有输出的,我两张表输出试过,没有任何问题。 > > > > > > val main_db = bsTableEnv.sqlQuery(""" > > > | SELECT * > > > | FROM input_database > > > | WHERE `database` = 'main_db' > > > | AND `table` LIKE 'transaction_tab%' > > > | """.stripMargin) > > > > > > val merchant_db = bsTableEnv.sqlQuery(""" > > > | SELECT * > > > | FROM input_database > > > | WHERE `database` = 'merchant_db' > > > | AND `table` LIKE 'transaction_tab%' > > > | """.stripMargin) > > > > > > bsTableEnv.createTemporaryView("main_db", main_db) > > > bsTableEnv.createTemporaryView("merchant_db", merchant_db) > > > > > > val result = bsTableEnv.sqlQuery(""" > > > SELECT * > > > FROM ( > > > SELECT t1.`table`, t1.`database`, t1.transaction_type, > > > t1.transaction_id, > > > t1.reference_id, t1.transaction_sn, t1.merchant_id, > > > t1.status, t1.event_time > > > FROM main_db as t1 > > > LEFT JOIN merchant_db as t2 > > > ON t1.reference_id = t2.reference_id > > > WHERE t1.event_time >= t2.event_time + INTERVAL '1' HOUR > > > AND t1.event_time <= t2.event_time - INTERVAL '1' HOUR > > > ) > > > """.stripMargin) > > > > > > > > > > > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么? > > > ----- > > > 你提到的这个问题,我估计我的 watermark 前进肯定是不正常的。但是我无法理解为什么 interval join 需要 > watermark > > > 来驱动。 > > > 我的理解是,他会把两边的数据都保留在 state 里,既然是 Left join,如果左边有数据查右边的state,如果可以 > join上,就输出 > > > join 的结果,如果没有 join上,那应该正常输出左边的数据,这才是 Left join 应有的逻辑把. > > > > > > > > > > > > > > > > > > > > > Benchao Li <[hidden email]> 于2020年12月8日周二 下午3:23写道: > > > > > > > hi macia, > > > > > > > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么? > > > > > > > > macia kk <[hidden email]> 于2020年12月8日周二 上午1:15写道: > > > > > > > > > 抱歉,是 >-30 and <+30 > > > > > > > > > > 贴的只是demo,我的疑问是,既然是 Left Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有 > > > > > > > > > > 赵一旦 <[hidden email]>于2020年12月7日 周一23:28写道: > > > > > > > > > > > 准确点,2个条件之间没and?2个都是>? > > > > > > > > > > > > macia kk <[hidden email]> 于2020年12月7日周一 下午10:30写道: > > > > > > > > > > > > > 不好意思,我上边贴错了 > > > > > > > > > > > > > > SELECT * > > > > > > > FROM A > > > > > > > LEFT OUT JOIN B > > > > > > > ON order_id > > > > > > > Where A.event_time > B.event_time - 30 s > > > > > > > A.event_time > B.event_time + 30 s > > > > > > > > > > > > > > event_time 是 Time Attributes 设置的 event_time > > > > > > > > > > > > > > 这样是没有输出的。 > > > > > > > > > > > > > > > > > > > > > > > > > > > > interval join 左右表在 state 中是缓存多久的? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > hailongwang <[hidden email]> 于2020年12月7日周一 下午8:05写道: > > > > > > > > > > > > > > > Hi, > > > > > > > > 其中 条件是 > > > > > > > > `Where A.event_time < B.event_time + 30 s and A.event_time > > > > > > > B.event_time > > > > > > > > - 30 s ` 吧 > > > > > > > > 可以参考以下例子[1],看下有木有写错。 > > > > > > > > [1] > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183 > > > > > > > > > > > > > > > > > > > > > > > > Best, > > > > > > > > Hailong > > > > > > > > 在 2020-12-07 13:10:02,"macia kk" <[hidden email]> 写道: > > > > > > > > >Hi, 各位大佬 > > > > > > > > > > > > > > > > > > 我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的 > > > > > > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 > ,表B 是 > > > > order > > > > > > > item > > > > > > > > >信息,所以 我用: > > > > > > > > > > > > > > > > > > SELECT * > > > > > > > > > FROM A > > > > > > > > > LEFT OUT JOIN B > > > > > > > > > ON order_id > > > > > > > > > Where A.event_time > B.event_time + 30 s > > > > > > > > > A.event_time > B.event_time - 30 s > > > > > > > > > > > > > > > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用 > > Spark > > > > > > > > Structural > > > > > > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join, > > > > > > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join > > 在具体实现的逻辑是什么,我在处理上哪里有问题? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > Best, > > > > Benchao Li > > > > > > > > > > > > -- > > Best, > Benchao Li > |
我刚才跑了很多任务,设置不同的 maxOutOfOrderness WATERMARK FOR event_time AS event_time -
INTERVAL 'x' HOUR 发现一个很奇怪的问题 ,按理说 watermark = currentMaxTimestamp - maxOutOfOrderness 但是 我通过 页面上的 watermark 时间,和我设置 maxOutOfOrderness x, 能够反推出来数据的 currentMaxTimestamp currentMaxTimestamp = watermark + maxOutOfOrderness 但是我无论设置多少的 maxOutOfOrderness, 反推出来的 currentMaxTimestamp 比现在此时此刻的时间快 8个小时,也就是说 currentMaxTimestamp 在未来后的 8个小时,这个数字一直是固定的8。 但是,我进行 Join, 直接输出任意一张表,得到的 evet time 都是对的,比如现在 00:55 {"table":"transaction_tab_00000122","database":"main_db","transaction_type":1,"transaction_id":111111,"reference_id":"111111","transaction_sn":"11111","merchant_id":1,"status":1,"event_time":" *2020-12-10T01:02:24Z*"} UI 上显示的 watermark 是 1607555031000(Your time zone: 2020年12月10日星期四早上7点02分 GMT+08:00) 这个 watermark 是未来的时间 😂 macia kk <[hidden email]> 于2020年12月9日周三 下午11:36写道: > 感谢 一旦 和 Benchao > > 1. 如果是我的 watermark 设置过长,导致无法输出的话,是有点疑问的,因为我可以确定的是,一定会有 Join 上的数据,但是我 Job > 跑了几天也没有一条输出。我还试了如下的SQL,自己 Join 自己,所以理论肯定是直接输出的,实际也是一条也没有。 > > val result = bsTableEnv.sqlQuery(""" > SELECT * > FROM ( > SELECT t1.`table`, t1.`database`, t1.transaction_type, t1.transaction_id, > t1.reference_id, t1.transaction_sn, t1.merchant_id, t1.status, t1.event_time > FROM main_db as t1 > LEFT JOIN main_db as t2 > ON t1.reference_id = t2.reference_id > WHERE t1.event_time >= t2.event_time + INTERVAL '5' MINUTES > AND t1.event_time <= t2.event_time - INTERVAL '5' MINUTES > ) > """.stripMargin) > > 2. 一旦提到的 watermark 传递的问题,我可以确认的是,会传递下去,这可以在 UI 上看到 > > 3. 这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source > subtask的watermark。 > ------------------------------------------------------- > 这里应该是使用 source subtask 最小的 watermark 传递过去,因为我可以看到的是,我的 watermark > 永远和现在相差8个小时,所以怀疑是有一张表,总是会迟8个小时才会有 BinLog. > > 4. Flink SQL 有没有方法在定义 schema 的时候,如果一个字段不存在,就是 null,我现在想换另外一个时间字段作为 event > time,但是有的表又没有这个字段,会导致解析的时候直接报错. > > 5. 我能不能不在 input_table 上注册 water mark,在 filter 出两张表后,再把 watermark > 加载两张表上,这样可以避免因为别的表,导致 watermark 停止不前,混乱的行为. > > > Thanks and best regards > > > Benchao Li <[hidden email]> 于2020年12月9日周三 上午10:24写道: > >> Hi macia, >> >> 一旦回答的基本比较完整了。 >> watermark影响的主要是left join没有join到的情况下,+(left, null)这样的数据输出的时机。 >> 如果是两侧都有数据,watermark不前进,也都可以正常输出。 >> >> 关于watermark,如果你的事件时间忽高忽低,这个底层的watermark只会取当前source subtask见到的最大的watermark >> 作为这个source subtask的watermark。但是你的watermark计算逻辑本身就是事件时间delay 10个小时,这个已经会导致 >> 你的没有join到的数据下发会延迟很多了。 >> >> 你也可以尝试下用处理时间来做一下interval join,看看能不能达到预期。 >> >> 赵一旦 <[hidden email]> 于2020年12月9日周三 上午10:15写道: >> >> > 重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。 >> > >> > >> > >> (1)你这个的话我看到一个问题,就是watermark你定义10小时的maxOutOfOrderness,确定这么长嘛要,这么大的maxOutOfOrderness,会导致join到的则会及时输出,join不到的需要等10小时才能输出“仅左边”数据,即left >> > join。 >> > >> > (2)此外,还有一个点,这个我也不确认。如果是datastream >> > api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。 >> > >> > >> input_database中定义了watermark,从input_database到2个filter后的表不清楚是否还存在watermark(我感觉是存在的),只要存在那就没问题,唯一需要注意的是第1点。 >> > >> > macia kk <[hidden email]> 于2020年12月9日周三 上午1:17写道: >> > >> > > @Benchao Li <[hidden email]> 感谢回复,这个问题困扰我半年了,导致我一直不能迁移到 >> > > FLink,可能我的Case 太特殊了. >> > > >> > > 我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的 >> Binlog,我需要 >> > > filter 出来 main_db__tansaction_tab, merchant_db__transaction_tab, 两个 DB >> > > 中的两个表。所以这里的字段我定义的是 两张表的字段的并集. >> > > >> > > 还要注意的是 even time 是 create_time, 这里问题非常大: >> > > 1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱 >> > > 2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create time进来,可能会影响 >> > watermark >> > > forward on. >> > > >> > > bsTableEnv.executeSql(""" >> > > CREATE TABLE input_database ( >> > > `table` STRING, >> > > `database` STRING, >> > > `data` ROW( >> > > reference_id STRING, >> > > transaction_sn STRING, >> > > transaction_type BIGINT, >> > > merchant_id BIGINT, >> > > transaction_id BIGINT, >> > > status BIGINT >> > > ), >> > > ts BIGINT, >> > > event_time AS TO_TIMESTAMP(FROM_UNIXTIME(create_time)), >> > > WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR >> > > ) WITH ( >> > > 'connector.type' = 'kafka', >> > > 'connector.version' = '0.11', >> > > 'connector.topic' = 'mytopic', >> > > 'connector.properties.bootstrap.servers' = 'xxxx', >> > > 'format.type' = 'json' >> > > ) >> > > """) >> > > >> > > >> > > 分别 filter 出来 两张表,进行 interval Join,这个是一直没有输出的,我两张表输出试过,没有任何问题。 >> > > >> > > val main_db = bsTableEnv.sqlQuery(""" >> > > | SELECT * >> > > | FROM input_database >> > > | WHERE `database` = 'main_db' >> > > | AND `table` LIKE 'transaction_tab%' >> > > | """.stripMargin) >> > > >> > > val merchant_db = bsTableEnv.sqlQuery(""" >> > > | SELECT * >> > > | FROM input_database >> > > | WHERE `database` = 'merchant_db' >> > > | AND `table` LIKE 'transaction_tab%' >> > > | """.stripMargin) >> > > >> > > bsTableEnv.createTemporaryView("main_db", main_db) >> > > bsTableEnv.createTemporaryView("merchant_db", merchant_db) >> > > >> > > val result = bsTableEnv.sqlQuery(""" >> > > SELECT * >> > > FROM ( >> > > SELECT t1.`table`, t1.`database`, t1.transaction_type, >> > > t1.transaction_id, >> > > t1.reference_id, t1.transaction_sn, t1.merchant_id, >> > > t1.status, t1.event_time >> > > FROM main_db as t1 >> > > LEFT JOIN merchant_db as t2 >> > > ON t1.reference_id = t2.reference_id >> > > WHERE t1.event_time >= t2.event_time + INTERVAL '1' HOUR >> > > AND t1.event_time <= t2.event_time - INTERVAL '1' HOUR >> > > ) >> > > """.stripMargin) >> > > >> > > >> > > >> > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么? >> > > ----- >> > > 你提到的这个问题,我估计我的 watermark 前进肯定是不正常的。但是我无法理解为什么 interval join 需要 >> watermark >> > > 来驱动。 >> > > 我的理解是,他会把两边的数据都保留在 state 里,既然是 Left join,如果左边有数据查右边的state,如果可以 >> join上,就输出 >> > > join 的结果,如果没有 join上,那应该正常输出左边的数据,这才是 Left join 应有的逻辑把. >> > > >> > > >> > > >> > > >> > > >> > > >> > > Benchao Li <[hidden email]> 于2020年12月8日周二 下午3:23写道: >> > > >> > > > hi macia, >> > > > >> > > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么? >> > > > >> > > > macia kk <[hidden email]> 于2020年12月8日周二 上午1:15写道: >> > > > >> > > > > 抱歉,是 >-30 and <+30 >> > > > > >> > > > > 贴的只是demo,我的疑问是,既然是 Left Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有 >> > > > > >> > > > > 赵一旦 <[hidden email]>于2020年12月7日 周一23:28写道: >> > > > > >> > > > > > 准确点,2个条件之间没and?2个都是>? >> > > > > > >> > > > > > macia kk <[hidden email]> 于2020年12月7日周一 下午10:30写道: >> > > > > > >> > > > > > > 不好意思,我上边贴错了 >> > > > > > > >> > > > > > > SELECT * >> > > > > > > FROM A >> > > > > > > LEFT OUT JOIN B >> > > > > > > ON order_id >> > > > > > > Where A.event_time > B.event_time - 30 s >> > > > > > > A.event_time > B.event_time + 30 s >> > > > > > > >> > > > > > > event_time 是 Time Attributes 设置的 event_time >> > > > > > > >> > > > > > > 这样是没有输出的。 >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > > > interval join 左右表在 state 中是缓存多久的? >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > > > hailongwang <[hidden email]> 于2020年12月7日周一 下午8:05写道: >> > > > > > > >> > > > > > > > Hi, >> > > > > > > > 其中 条件是 >> > > > > > > > `Where A.event_time < B.event_time + 30 s and A.event_time > >> > > > > > B.event_time >> > > > > > > > - 30 s ` 吧 >> > > > > > > > 可以参考以下例子[1],看下有木有写错。 >> > > > > > > > [1] >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > >> https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183 >> > > > > > > > >> > > > > > > > >> > > > > > > > Best, >> > > > > > > > Hailong >> > > > > > > > 在 2020-12-07 13:10:02,"macia kk" <[hidden email]> 写道: >> > > > > > > > >Hi, 各位大佬 >> > > > > > > > > >> > > > > > > > > 我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的 >> > > > > > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 >> ,表B 是 >> > > > order >> > > > > > > item >> > > > > > > > >信息,所以 我用: >> > > > > > > > > >> > > > > > > > > SELECT * >> > > > > > > > > FROM A >> > > > > > > > > LEFT OUT JOIN B >> > > > > > > > > ON order_id >> > > > > > > > > Where A.event_time > B.event_time + 30 s >> > > > > > > > > A.event_time > B.event_time - 30 s >> > > > > > > > > >> > > > > > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用 >> > Spark >> > > > > > > > Structural >> > > > > > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join, >> > > > > > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join >> > 在具体实现的逻辑是什么,我在处理上哪里有问题? >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > > >> > > > -- >> > > > >> > > > Best, >> > > > Benchao Li >> > > > >> > > >> > >> >> >> -- >> >> Best, >> Benchao Li >> > |
你用的是哪个版本的Flink呢?
看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。 所以你的binlog是怎么读进来的呢?自定义的format? macia kk <[hidden email]> 于2020年12月10日周四 上午1:06写道: > 我刚才跑了很多任务,设置不同的 maxOutOfOrderness WATERMARK FOR event_time AS event_time - > INTERVAL 'x' HOUR > > 发现一个很奇怪的问题 ,按理说 watermark = currentMaxTimestamp - maxOutOfOrderness > > 但是 我通过 页面上的 watermark 时间,和我设置 maxOutOfOrderness x, > 能够反推出来数据的 currentMaxTimestamp > > currentMaxTimestamp = watermark + maxOutOfOrderness > > 但是我无论设置多少的 maxOutOfOrderness, 反推出来的 currentMaxTimestamp 比现在此时此刻的时间快 > 8个小时,也就是说 currentMaxTimestamp 在未来后的 8个小时,这个数字一直是固定的8。 > > > 但是,我进行 Join, 直接输出任意一张表,得到的 evet time 都是对的,比如现在 00:55 > > {"table":"transaction_tab_00000122","database":"main_db","transaction_type":1,"transaction_id":111111,"reference_id":"111111","transaction_sn":"11111","merchant_id":1,"status":1,"event_time":" > *2020-12-10T01:02:24Z*"} > > UI 上显示的 watermark 是 1607555031000(Your time zone: 2020年12月10日星期四早上7点02分 > GMT+08:00) > > 这个 watermark 是未来的时间 😂 > > > > > > macia kk <[hidden email]> 于2020年12月9日周三 下午11:36写道: > > > 感谢 一旦 和 Benchao > > > > 1. 如果是我的 watermark 设置过长,导致无法输出的话,是有点疑问的,因为我可以确定的是,一定会有 Join 上的数据,但是我 > Job > > 跑了几天也没有一条输出。我还试了如下的SQL,自己 Join 自己,所以理论肯定是直接输出的,实际也是一条也没有。 > > > > val result = bsTableEnv.sqlQuery(""" > > SELECT * > > FROM ( > > SELECT t1.`table`, t1.`database`, t1.transaction_type, > t1.transaction_id, > > t1.reference_id, t1.transaction_sn, t1.merchant_id, > t1.status, t1.event_time > > FROM main_db as t1 > > LEFT JOIN main_db as t2 > > ON t1.reference_id = t2.reference_id > > WHERE t1.event_time >= t2.event_time + INTERVAL '5' MINUTES > > AND t1.event_time <= t2.event_time - INTERVAL '5' MINUTES > > ) > > """.stripMargin) > > > > 2. 一旦提到的 watermark 传递的问题,我可以确认的是,会传递下去,这可以在 UI 上看到 > > > > 3. 这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source > > subtask的watermark。 > > ------------------------------------------------------- > > 这里应该是使用 source subtask 最小的 watermark 传递过去,因为我可以看到的是,我的 watermark > > 永远和现在相差8个小时,所以怀疑是有一张表,总是会迟8个小时才会有 BinLog. > > > > 4. Flink SQL 有没有方法在定义 schema 的时候,如果一个字段不存在,就是 null,我现在想换另外一个时间字段作为 event > > time,但是有的表又没有这个字段,会导致解析的时候直接报错. > > > > 5. 我能不能不在 input_table 上注册 water mark,在 filter 出两张表后,再把 watermark > > 加载两张表上,这样可以避免因为别的表,导致 watermark 停止不前,混乱的行为. > > > > > > Thanks and best regards > > > > > > Benchao Li <[hidden email]> 于2020年12月9日周三 上午10:24写道: > > > >> Hi macia, > >> > >> 一旦回答的基本比较完整了。 > >> watermark影响的主要是left join没有join到的情况下,+(left, null)这样的数据输出的时机。 > >> 如果是两侧都有数据,watermark不前进,也都可以正常输出。 > >> > >> 关于watermark,如果你的事件时间忽高忽低,这个底层的watermark只会取当前source > subtask见到的最大的watermark > >> 作为这个source subtask的watermark。但是你的watermark计算逻辑本身就是事件时间delay > 10个小时,这个已经会导致 > >> 你的没有join到的数据下发会延迟很多了。 > >> > >> 你也可以尝试下用处理时间来做一下interval join,看看能不能达到预期。 > >> > >> 赵一旦 <[hidden email]> 于2020年12月9日周三 上午10:15写道: > >> > >> > 重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。 > >> > > >> > > >> > > >> > (1)你这个的话我看到一个问题,就是watermark你定义10小时的maxOutOfOrderness,确定这么长嘛要,这么大的maxOutOfOrderness,会导致join到的则会及时输出,join不到的需要等10小时才能输出“仅左边”数据,即left > >> > join。 > >> > > >> > (2)此外,还有一个点,这个我也不确认。如果是datastream > >> > api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。 > >> > > >> > > >> > input_database中定义了watermark,从input_database到2个filter后的表不清楚是否还存在watermark(我感觉是存在的),只要存在那就没问题,唯一需要注意的是第1点。 > >> > > >> > macia kk <[hidden email]> 于2020年12月9日周三 上午1:17写道: > >> > > >> > > @Benchao Li <[hidden email]> 感谢回复,这个问题困扰我半年了,导致我一直不能迁移到 > >> > > FLink,可能我的Case 太特殊了. > >> > > > >> > > 我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的 > >> Binlog,我需要 > >> > > filter 出来 main_db__tansaction_tab, merchant_db__transaction_tab, 两个 > DB > >> > > 中的两个表。所以这里的字段我定义的是 两张表的字段的并集. > >> > > > >> > > 还要注意的是 even time 是 create_time, 这里问题非常大: > >> > > 1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱 > >> > > 2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create time进来,可能会影响 > >> > watermark > >> > > forward on. > >> > > > >> > > bsTableEnv.executeSql(""" > >> > > CREATE TABLE input_database ( > >> > > `table` STRING, > >> > > `database` STRING, > >> > > `data` ROW( > >> > > reference_id STRING, > >> > > transaction_sn STRING, > >> > > transaction_type BIGINT, > >> > > merchant_id BIGINT, > >> > > transaction_id BIGINT, > >> > > status BIGINT > >> > > ), > >> > > ts BIGINT, > >> > > event_time AS TO_TIMESTAMP(FROM_UNIXTIME(create_time)), > >> > > WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR > >> > > ) WITH ( > >> > > 'connector.type' = 'kafka', > >> > > 'connector.version' = '0.11', > >> > > 'connector.topic' = 'mytopic', > >> > > 'connector.properties.bootstrap.servers' = 'xxxx', > >> > > 'format.type' = 'json' > >> > > ) > >> > > """) > >> > > > >> > > > >> > > 分别 filter 出来 两张表,进行 interval Join,这个是一直没有输出的,我两张表输出试过,没有任何问题。 > >> > > > >> > > val main_db = bsTableEnv.sqlQuery(""" > >> > > | SELECT * > >> > > | FROM input_database > >> > > | WHERE `database` = 'main_db' > >> > > | AND `table` LIKE 'transaction_tab%' > >> > > | """.stripMargin) > >> > > > >> > > val merchant_db = bsTableEnv.sqlQuery(""" > >> > > | SELECT * > >> > > | FROM input_database > >> > > | WHERE `database` = 'merchant_db' > >> > > | AND `table` LIKE 'transaction_tab%' > >> > > | """.stripMargin) > >> > > > >> > > bsTableEnv.createTemporaryView("main_db", main_db) > >> > > bsTableEnv.createTemporaryView("merchant_db", merchant_db) > >> > > > >> > > val result = bsTableEnv.sqlQuery(""" > >> > > SELECT * > >> > > FROM ( > >> > > SELECT t1.`table`, t1.`database`, t1.transaction_type, > >> > > t1.transaction_id, > >> > > t1.reference_id, t1.transaction_sn, t1.merchant_id, > >> > > t1.status, t1.event_time > >> > > FROM main_db as t1 > >> > > LEFT JOIN merchant_db as t2 > >> > > ON t1.reference_id = t2.reference_id > >> > > WHERE t1.event_time >= t2.event_time + INTERVAL '1' HOUR > >> > > AND t1.event_time <= t2.event_time - INTERVAL '1' HOUR > >> > > ) > >> > > """.stripMargin) > >> > > > >> > > > >> > > > >> > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么? > >> > > ----- > >> > > 你提到的这个问题,我估计我的 watermark 前进肯定是不正常的。但是我无法理解为什么 interval join 需要 > >> watermark > >> > > 来驱动。 > >> > > 我的理解是,他会把两边的数据都保留在 state 里,既然是 Left join,如果左边有数据查右边的state,如果可以 > >> join上,就输出 > >> > > join 的结果,如果没有 join上,那应该正常输出左边的数据,这才是 Left join 应有的逻辑把. > >> > > > >> > > > >> > > > >> > > > >> > > > >> > > > >> > > Benchao Li <[hidden email]> 于2020年12月8日周二 下午3:23写道: > >> > > > >> > > > hi macia, > >> > > > > >> > > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么? > >> > > > > >> > > > macia kk <[hidden email]> 于2020年12月8日周二 上午1:15写道: > >> > > > > >> > > > > 抱歉,是 >-30 and <+30 > >> > > > > > >> > > > > 贴的只是demo,我的疑问是,既然是 Left Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有 > >> > > > > > >> > > > > 赵一旦 <[hidden email]>于2020年12月7日 周一23:28写道: > >> > > > > > >> > > > > > 准确点,2个条件之间没and?2个都是>? > >> > > > > > > >> > > > > > macia kk <[hidden email]> 于2020年12月7日周一 下午10:30写道: > >> > > > > > > >> > > > > > > 不好意思,我上边贴错了 > >> > > > > > > > >> > > > > > > SELECT * > >> > > > > > > FROM A > >> > > > > > > LEFT OUT JOIN B > >> > > > > > > ON order_id > >> > > > > > > Where A.event_time > B.event_time - 30 s > >> > > > > > > A.event_time > B.event_time + 30 s > >> > > > > > > > >> > > > > > > event_time 是 Time Attributes 设置的 event_time > >> > > > > > > > >> > > > > > > 这样是没有输出的。 > >> > > > > > > > >> > > > > > > > >> > > > > > > > >> > > > > > > interval join 左右表在 state 中是缓存多久的? > >> > > > > > > > >> > > > > > > > >> > > > > > > > >> > > > > > > > >> > > > > > > > >> > > > > > > > >> > > > > > > hailongwang <[hidden email]> 于2020年12月7日周一 下午8:05写道: > >> > > > > > > > >> > > > > > > > Hi, > >> > > > > > > > 其中 条件是 > >> > > > > > > > `Where A.event_time < B.event_time + 30 s and > A.event_time > > >> > > > > > B.event_time > >> > > > > > > > - 30 s ` 吧 > >> > > > > > > > 可以参考以下例子[1],看下有木有写错。 > >> > > > > > > > [1] > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183 > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > Best, > >> > > > > > > > Hailong > >> > > > > > > > 在 2020-12-07 13:10:02,"macia kk" <[hidden email]> 写道: > >> > > > > > > > >Hi, 各位大佬 > >> > > > > > > > > > >> > > > > > > > > 我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的 > >> > > > > > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 > >> ,表B 是 > >> > > > order > >> > > > > > > item > >> > > > > > > > >信息,所以 我用: > >> > > > > > > > > > >> > > > > > > > > SELECT * > >> > > > > > > > > FROM A > >> > > > > > > > > LEFT OUT JOIN B > >> > > > > > > > > ON order_id > >> > > > > > > > > Where A.event_time > B.event_time + 30 s > >> > > > > > > > > A.event_time > B.event_time - 30 s > >> > > > > > > > > > >> > > > > > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join > 之后就没有输出数据了,可以确认的是我用 > >> > Spark > >> > > > > > > > Structural > >> > > > > > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join, > >> > > > > > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join > >> > 在具体实现的逻辑是什么,我在处理上哪里有问题? > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > > >> > > > -- > >> > > > > >> > > > Best, > >> > > > Benchao Li > >> > > > > >> > > > >> > > >> > >> > >> -- > >> > >> Best, > >> Benchao Li > >> > > > -- Best, Benchao Li |
你用的是哪个版本的Flink呢?
----- 1.11.2 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。 所以你的binlog是怎么读进来的呢?自定义的format? ----- ts 就是时间戳 bsTableEnv.executeSql(""" CREATE TABLE input_database ( `table` STRING, `database` STRING, `data` ROW( reference_id STRING, transaction_sn STRING, transaction_type BIGINT, merchant_id BIGINT, transaction_id BIGINT, status BIGINT ), ts BIGINT, event_time AS TO_TIMESTAMP(FROM_UNIXTIME(ts)), WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR ) WITH ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'mytopic', 'connector.properties.bootstrap.servers' = 'xxxx', 'format.type' = 'json' ) ) ``` Benchao Li <[hidden email]> 于2020年12月10日周四 下午6:14写道: > 你用的是哪个版本的Flink呢? > > 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。 > 所以你的binlog是怎么读进来的呢?自定义的format? > > macia kk <[hidden email]> 于2020年12月10日周四 上午1:06写道: > > > 我刚才跑了很多任务,设置不同的 maxOutOfOrderness WATERMARK FOR event_time AS event_time > - > > INTERVAL 'x' HOUR > > > > 发现一个很奇怪的问题 ,按理说 watermark = currentMaxTimestamp - maxOutOfOrderness > > > > 但是 我通过 页面上的 watermark 时间,和我设置 maxOutOfOrderness x, > > 能够反推出来数据的 currentMaxTimestamp > > > > currentMaxTimestamp = watermark + maxOutOfOrderness > > > > 但是我无论设置多少的 maxOutOfOrderness, 反推出来的 currentMaxTimestamp 比现在此时此刻的时间快 > > 8个小时,也就是说 currentMaxTimestamp 在未来后的 8个小时,这个数字一直是固定的8。 > > > > > > 但是,我进行 Join, 直接输出任意一张表,得到的 evet time 都是对的,比如现在 00:55 > > > > > {"table":"transaction_tab_00000122","database":"main_db","transaction_type":1,"transaction_id":111111,"reference_id":"111111","transaction_sn":"11111","merchant_id":1,"status":1,"event_time":" > > *2020-12-10T01:02:24Z*"} > > > > UI 上显示的 watermark 是 1607555031000(Your time zone: 2020年12月10日星期四早上7点02分 > > GMT+08:00) > > > > 这个 watermark 是未来的时间 😂 > > > > > > > > > > > > macia kk <[hidden email]> 于2020年12月9日周三 下午11:36写道: > > > > > 感谢 一旦 和 Benchao > > > > > > 1. 如果是我的 watermark 设置过长,导致无法输出的话,是有点疑问的,因为我可以确定的是,一定会有 Join 上的数据,但是我 > > Job > > > 跑了几天也没有一条输出。我还试了如下的SQL,自己 Join 自己,所以理论肯定是直接输出的,实际也是一条也没有。 > > > > > > val result = bsTableEnv.sqlQuery(""" > > > SELECT * > > > FROM ( > > > SELECT t1.`table`, t1.`database`, t1.transaction_type, > > t1.transaction_id, > > > t1.reference_id, t1.transaction_sn, t1.merchant_id, > > t1.status, t1.event_time > > > FROM main_db as t1 > > > LEFT JOIN main_db as t2 > > > ON t1.reference_id = t2.reference_id > > > WHERE t1.event_time >= t2.event_time + INTERVAL '5' MINUTES > > > AND t1.event_time <= t2.event_time - INTERVAL '5' MINUTES > > > ) > > > """.stripMargin) > > > > > > 2. 一旦提到的 watermark 传递的问题,我可以确认的是,会传递下去,这可以在 UI 上看到 > > > > > > 3. 这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source > > > subtask的watermark。 > > > ------------------------------------------------------- > > > 这里应该是使用 source subtask 最小的 watermark 传递过去,因为我可以看到的是,我的 watermark > > > 永远和现在相差8个小时,所以怀疑是有一张表,总是会迟8个小时才会有 BinLog. > > > > > > 4. Flink SQL 有没有方法在定义 schema 的时候,如果一个字段不存在,就是 null,我现在想换另外一个时间字段作为 > event > > > time,但是有的表又没有这个字段,会导致解析的时候直接报错. > > > > > > 5. 我能不能不在 input_table 上注册 water mark,在 filter 出两张表后,再把 watermark > > > 加载两张表上,这样可以避免因为别的表,导致 watermark 停止不前,混乱的行为. > > > > > > > > > Thanks and best regards > > > > > > > > > Benchao Li <[hidden email]> 于2020年12月9日周三 上午10:24写道: > > > > > >> Hi macia, > > >> > > >> 一旦回答的基本比较完整了。 > > >> watermark影响的主要是left join没有join到的情况下,+(left, null)这样的数据输出的时机。 > > >> 如果是两侧都有数据,watermark不前进,也都可以正常输出。 > > >> > > >> 关于watermark,如果你的事件时间忽高忽低,这个底层的watermark只会取当前source > > subtask见到的最大的watermark > > >> 作为这个source subtask的watermark。但是你的watermark计算逻辑本身就是事件时间delay > > 10个小时,这个已经会导致 > > >> 你的没有join到的数据下发会延迟很多了。 > > >> > > >> 你也可以尝试下用处理时间来做一下interval join,看看能不能达到预期。 > > >> > > >> 赵一旦 <[hidden email]> 于2020年12月9日周三 上午10:15写道: > > >> > > >> > 重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。 > > >> > > > >> > > > >> > > > >> > > > (1)你这个的话我看到一个问题,就是watermark你定义10小时的maxOutOfOrderness,确定这么长嘛要,这么大的maxOutOfOrderness,会导致join到的则会及时输出,join不到的需要等10小时才能输出“仅左边”数据,即left > > >> > join。 > > >> > > > >> > (2)此外,还有一个点,这个我也不确认。如果是datastream > > >> > api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。 > > >> > > > >> > > > >> > > > input_database中定义了watermark,从input_database到2个filter后的表不清楚是否还存在watermark(我感觉是存在的),只要存在那就没问题,唯一需要注意的是第1点。 > > >> > > > >> > macia kk <[hidden email]> 于2020年12月9日周三 上午1:17写道: > > >> > > > >> > > @Benchao Li <[hidden email]> 感谢回复,这个问题困扰我半年了,导致我一直不能迁移到 > > >> > > FLink,可能我的Case 太特殊了. > > >> > > > > >> > > 我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的 > > >> Binlog,我需要 > > >> > > filter 出来 main_db__tansaction_tab, merchant_db__transaction_tab, > 两个 > > DB > > >> > > 中的两个表。所以这里的字段我定义的是 两张表的字段的并集. > > >> > > > > >> > > 还要注意的是 even time 是 create_time, 这里问题非常大: > > >> > > 1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱 > > >> > > 2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create time进来,可能会影响 > > >> > watermark > > >> > > forward on. > > >> > > > > >> > > bsTableEnv.executeSql(""" > > >> > > CREATE TABLE input_database ( > > >> > > `table` STRING, > > >> > > `database` STRING, > > >> > > `data` ROW( > > >> > > reference_id STRING, > > >> > > transaction_sn STRING, > > >> > > transaction_type BIGINT, > > >> > > merchant_id BIGINT, > > >> > > transaction_id BIGINT, > > >> > > status BIGINT > > >> > > ), > > >> > > ts BIGINT, > > >> > > event_time AS TO_TIMESTAMP(FROM_UNIXTIME(create_time)), > > >> > > WATERMARK FOR event_time AS event_time - INTERVAL '10' > HOUR > > >> > > ) WITH ( > > >> > > 'connector.type' = 'kafka', > > >> > > 'connector.version' = '0.11', > > >> > > 'connector.topic' = 'mytopic', > > >> > > 'connector.properties.bootstrap.servers' = 'xxxx', > > >> > > 'format.type' = 'json' > > >> > > ) > > >> > > """) > > >> > > > > >> > > > > >> > > 分别 filter 出来 两张表,进行 interval Join,这个是一直没有输出的,我两张表输出试过,没有任何问题。 > > >> > > > > >> > > val main_db = bsTableEnv.sqlQuery(""" > > >> > > | SELECT * > > >> > > | FROM input_database > > >> > > | WHERE `database` = 'main_db' > > >> > > | AND `table` LIKE 'transaction_tab%' > > >> > > | """.stripMargin) > > >> > > > > >> > > val merchant_db = bsTableEnv.sqlQuery(""" > > >> > > | SELECT * > > >> > > | FROM input_database > > >> > > | WHERE `database` = 'merchant_db' > > >> > > | AND `table` LIKE 'transaction_tab%' > > >> > > | """.stripMargin) > > >> > > > > >> > > bsTableEnv.createTemporaryView("main_db", main_db) > > >> > > bsTableEnv.createTemporaryView("merchant_db", merchant_db) > > >> > > > > >> > > val result = bsTableEnv.sqlQuery(""" > > >> > > SELECT * > > >> > > FROM ( > > >> > > SELECT t1.`table`, t1.`database`, t1.transaction_type, > > >> > > t1.transaction_id, > > >> > > t1.reference_id, t1.transaction_sn, t1.merchant_id, > > >> > > t1.status, t1.event_time > > >> > > FROM main_db as t1 > > >> > > LEFT JOIN merchant_db as t2 > > >> > > ON t1.reference_id = t2.reference_id > > >> > > WHERE t1.event_time >= t2.event_time + INTERVAL '1' HOUR > > >> > > AND t1.event_time <= t2.event_time - INTERVAL '1' HOUR > > >> > > ) > > >> > > """.stripMargin) > > >> > > > > >> > > > > >> > > > > >> > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么? > > >> > > ----- > > >> > > 你提到的这个问题,我估计我的 watermark 前进肯定是不正常的。但是我无法理解为什么 interval join 需要 > > >> watermark > > >> > > 来驱动。 > > >> > > 我的理解是,他会把两边的数据都保留在 state 里,既然是 Left join,如果左边有数据查右边的state,如果可以 > > >> join上,就输出 > > >> > > join 的结果,如果没有 join上,那应该正常输出左边的数据,这才是 Left join 应有的逻辑把. > > >> > > > > >> > > > > >> > > > > >> > > > > >> > > > > >> > > > > >> > > Benchao Li <[hidden email]> 于2020年12月8日周二 下午3:23写道: > > >> > > > > >> > > > hi macia, > > >> > > > > > >> > > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么? > > >> > > > > > >> > > > macia kk <[hidden email]> 于2020年12月8日周二 上午1:15写道: > > >> > > > > > >> > > > > 抱歉,是 >-30 and <+30 > > >> > > > > > > >> > > > > 贴的只是demo,我的疑问是,既然是 Left Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有 > > >> > > > > > > >> > > > > 赵一旦 <[hidden email]>于2020年12月7日 周一23:28写道: > > >> > > > > > > >> > > > > > 准确点,2个条件之间没and?2个都是>? > > >> > > > > > > > >> > > > > > macia kk <[hidden email]> 于2020年12月7日周一 下午10:30写道: > > >> > > > > > > > >> > > > > > > 不好意思,我上边贴错了 > > >> > > > > > > > > >> > > > > > > SELECT * > > >> > > > > > > FROM A > > >> > > > > > > LEFT OUT JOIN B > > >> > > > > > > ON order_id > > >> > > > > > > Where A.event_time > B.event_time - 30 s > > >> > > > > > > A.event_time > B.event_time + 30 s > > >> > > > > > > > > >> > > > > > > event_time 是 Time Attributes 设置的 event_time > > >> > > > > > > > > >> > > > > > > 这样是没有输出的。 > > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > >> > > > > > > interval join 左右表在 state 中是缓存多久的? > > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > >> > > > > > > hailongwang <[hidden email]> 于2020年12月7日周一 下午8:05写道: > > >> > > > > > > > > >> > > > > > > > Hi, > > >> > > > > > > > 其中 条件是 > > >> > > > > > > > `Where A.event_time < B.event_time + 30 s and > > A.event_time > > > >> > > > > > B.event_time > > >> > > > > > > > - 30 s ` 吧 > > >> > > > > > > > 可以参考以下例子[1],看下有木有写错。 > > >> > > > > > > > [1] > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > > https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183 > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > Best, > > >> > > > > > > > Hailong > > >> > > > > > > > 在 2020-12-07 13:10:02,"macia kk" <[hidden email]> 写道: > > >> > > > > > > > >Hi, 各位大佬 > > >> > > > > > > > > > > >> > > > > > > > > 我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog > 打进去了。我的 > > >> > > > > > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 > order事件 > > >> ,表B 是 > > >> > > > order > > >> > > > > > > item > > >> > > > > > > > >信息,所以 我用: > > >> > > > > > > > > > > >> > > > > > > > > SELECT * > > >> > > > > > > > > FROM A > > >> > > > > > > > > LEFT OUT JOIN B > > >> > > > > > > > > ON order_id > > >> > > > > > > > > Where A.event_time > B.event_time + 30 s > > >> > > > > > > > > A.event_time > B.event_time - 30 s > > >> > > > > > > > > > > >> > > > > > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join > > 之后就没有输出数据了,可以确认的是我用 > > >> > Spark > > >> > > > > > > > Structural > > >> > > > > > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join, > > >> > > > > > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join > > >> > 在具体实现的逻辑是什么,我在处理上哪里有问题? > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > > >> > > > -- > > >> > > > > > >> > > > Best, > > >> > > > Benchao Li > > >> > > > > > >> > > > > >> > > > >> > > >> > > >> -- > > >> > > >> Best, > > >> Benchao Li > > >> > > > > > > > > -- > > Best, > Benchao Li > |
这个问题是存在的,只不过不清楚算不算bug,可能只算是FlinkSQL在这部分处理的不足感觉。
之所以出现这个问题,是因为你用了FROM_UNIXTIME等函数,这些函数会自动根据当前时区将ts转换为TIMESTAMP(这导致了在time attribute部分不应该使用这种函数,否则会导致watermark显示超前8小时的问题)。 但是呢,目前不这么做好像也还不行。因为分窗必须基于time attribute,同时flinkSQL目前不支持offset的指定,因此只能基于这种方式去间接实现分窗的正确性。 ———————— 比如:ts=0,代表的是 1970-1-1 00:00:00,FROM_UNIXTIME(0) 返回的是 1970-1-1 08:00:00 UTC+8 这个时间,而这个时间作为TIMESTAMP类型的eventtime被处理了,但TIMESTAMP本身是无时区含义的,你却给了它一个带时区含义的日期。这导致分窗可以正常按照中国人习惯分,但从底层考虑却不对,因为这个时间点,它的ts变为了28800s。因为flinkSQL将你那个eventtime按照UTC+0转换出来的ts就是ts=28800(s)。 ———————— 按我说的话,要么继续这么用,你忽略ui上的watermark,这并不影响你的代码逻辑和业务逻辑。 要么就是调整不使用那些函数,这样可以保证watermark那个ui展示正确,但是却导致小时/天窗口出问题,并暂时无解(因为不支持offset窗口)。 macia kk <[hidden email]> 于2020年12月11日周五 下午3:04写道: > 你用的是哪个版本的Flink呢? > ----- > 1.11.2 > > 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。 > 所以你的binlog是怎么读进来的呢?自定义的format? > ----- > ts 就是时间戳 > > bsTableEnv.executeSql(""" > CREATE TABLE input_database ( > `table` STRING, > `database` STRING, > `data` ROW( > reference_id STRING, > transaction_sn STRING, > transaction_type BIGINT, > merchant_id BIGINT, > transaction_id BIGINT, > status BIGINT > ), > ts BIGINT, > event_time AS TO_TIMESTAMP(FROM_UNIXTIME(ts)), > WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR > ) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = '0.11', > 'connector.topic' = 'mytopic', > 'connector.properties.bootstrap.servers' = 'xxxx', > 'format.type' = 'json' > ) > ) > > > > ``` > > > > Benchao Li <[hidden email]> 于2020年12月10日周四 下午6:14写道: > > > 你用的是哪个版本的Flink呢? > > > > 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。 > > 所以你的binlog是怎么读进来的呢?自定义的format? > > > > macia kk <[hidden email]> 于2020年12月10日周四 上午1:06写道: > > > > > 我刚才跑了很多任务,设置不同的 maxOutOfOrderness WATERMARK FOR event_time AS > event_time > > - > > > INTERVAL 'x' HOUR > > > > > > 发现一个很奇怪的问题 ,按理说 watermark = currentMaxTimestamp - maxOutOfOrderness > > > > > > 但是 我通过 页面上的 watermark 时间,和我设置 maxOutOfOrderness x, > > > 能够反推出来数据的 currentMaxTimestamp > > > > > > currentMaxTimestamp = watermark + maxOutOfOrderness > > > > > > 但是我无论设置多少的 maxOutOfOrderness, 反推出来的 currentMaxTimestamp 比现在此时此刻的时间快 > > > 8个小时,也就是说 currentMaxTimestamp 在未来后的 8个小时,这个数字一直是固定的8。 > > > > > > > > > 但是,我进行 Join, 直接输出任意一张表,得到的 evet time 都是对的,比如现在 00:55 > > > > > > > > > {"table":"transaction_tab_00000122","database":"main_db","transaction_type":1,"transaction_id":111111,"reference_id":"111111","transaction_sn":"11111","merchant_id":1,"status":1,"event_time":" > > > *2020-12-10T01:02:24Z*"} > > > > > > UI 上显示的 watermark 是 1607555031000(Your time zone: 2020年12月10日星期四早上7点02分 > > > GMT+08:00) > > > > > > 这个 watermark 是未来的时间 😂 > > > > > > > > > > > > > > > > > > macia kk <[hidden email]> 于2020年12月9日周三 下午11:36写道: > > > > > > > 感谢 一旦 和 Benchao > > > > > > > > 1. 如果是我的 watermark 设置过长,导致无法输出的话,是有点疑问的,因为我可以确定的是,一定会有 Join > 上的数据,但是我 > > > Job > > > > 跑了几天也没有一条输出。我还试了如下的SQL,自己 Join 自己,所以理论肯定是直接输出的,实际也是一条也没有。 > > > > > > > > val result = bsTableEnv.sqlQuery(""" > > > > SELECT * > > > > FROM ( > > > > SELECT t1.`table`, t1.`database`, t1.transaction_type, > > > t1.transaction_id, > > > > t1.reference_id, t1.transaction_sn, t1.merchant_id, > > > t1.status, t1.event_time > > > > FROM main_db as t1 > > > > LEFT JOIN main_db as t2 > > > > ON t1.reference_id = t2.reference_id > > > > WHERE t1.event_time >= t2.event_time + INTERVAL '5' MINUTES > > > > AND t1.event_time <= t2.event_time - INTERVAL '5' MINUTES > > > > ) > > > > """.stripMargin) > > > > > > > > 2. 一旦提到的 watermark 传递的问题,我可以确认的是,会传递下去,这可以在 UI 上看到 > > > > > > > > 3. 这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source > > > > subtask的watermark。 > > > > ------------------------------------------------------- > > > > 这里应该是使用 source subtask 最小的 watermark 传递过去,因为我可以看到的是,我的 watermark > > > > 永远和现在相差8个小时,所以怀疑是有一张表,总是会迟8个小时才会有 BinLog. > > > > > > > > 4. Flink SQL 有没有方法在定义 schema 的时候,如果一个字段不存在,就是 null,我现在想换另外一个时间字段作为 > > event > > > > time,但是有的表又没有这个字段,会导致解析的时候直接报错. > > > > > > > > 5. 我能不能不在 input_table 上注册 water mark,在 filter 出两张表后,再把 watermark > > > > 加载两张表上,这样可以避免因为别的表,导致 watermark 停止不前,混乱的行为. > > > > > > > > > > > > Thanks and best regards > > > > > > > > > > > > Benchao Li <[hidden email]> 于2020年12月9日周三 上午10:24写道: > > > > > > > >> Hi macia, > > > >> > > > >> 一旦回答的基本比较完整了。 > > > >> watermark影响的主要是left join没有join到的情况下,+(left, null)这样的数据输出的时机。 > > > >> 如果是两侧都有数据,watermark不前进,也都可以正常输出。 > > > >> > > > >> 关于watermark,如果你的事件时间忽高忽低,这个底层的watermark只会取当前source > > > subtask见到的最大的watermark > > > >> 作为这个source subtask的watermark。但是你的watermark计算逻辑本身就是事件时间delay > > > 10个小时,这个已经会导致 > > > >> 你的没有join到的数据下发会延迟很多了。 > > > >> > > > >> 你也可以尝试下用处理时间来做一下interval join,看看能不能达到预期。 > > > >> > > > >> 赵一旦 <[hidden email]> 于2020年12月9日周三 上午10:15写道: > > > >> > > > >> > 重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。 > > > >> > > > > >> > > > > >> > > > > >> > > > > > > (1)你这个的话我看到一个问题,就是watermark你定义10小时的maxOutOfOrderness,确定这么长嘛要,这么大的maxOutOfOrderness,会导致join到的则会及时输出,join不到的需要等10小时才能输出“仅左边”数据,即left > > > >> > join。 > > > >> > > > > >> > (2)此外,还有一个点,这个我也不确认。如果是datastream > > > >> > api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。 > > > >> > > > > >> > > > > >> > > > > > > input_database中定义了watermark,从input_database到2个filter后的表不清楚是否还存在watermark(我感觉是存在的),只要存在那就没问题,唯一需要注意的是第1点。 > > > >> > > > > >> > macia kk <[hidden email]> 于2020年12月9日周三 上午1:17写道: > > > >> > > > > >> > > @Benchao Li <[hidden email]> 感谢回复,这个问题困扰我半年了,导致我一直不能迁移到 > > > >> > > FLink,可能我的Case 太特殊了. > > > >> > > > > > >> > > 我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的 > > > >> Binlog,我需要 > > > >> > > filter 出来 main_db__tansaction_tab, merchant_db__transaction_tab, > > 两个 > > > DB > > > >> > > 中的两个表。所以这里的字段我定义的是 两张表的字段的并集. > > > >> > > > > > >> > > 还要注意的是 even time 是 create_time, 这里问题非常大: > > > >> > > 1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱 > > > >> > > 2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create > time进来,可能会影响 > > > >> > watermark > > > >> > > forward on. > > > >> > > > > > >> > > bsTableEnv.executeSql(""" > > > >> > > CREATE TABLE input_database ( > > > >> > > `table` STRING, > > > >> > > `database` STRING, > > > >> > > `data` ROW( > > > >> > > reference_id STRING, > > > >> > > transaction_sn STRING, > > > >> > > transaction_type BIGINT, > > > >> > > merchant_id BIGINT, > > > >> > > transaction_id BIGINT, > > > >> > > status BIGINT > > > >> > > ), > > > >> > > ts BIGINT, > > > >> > > event_time AS TO_TIMESTAMP(FROM_UNIXTIME(create_time)), > > > >> > > WATERMARK FOR event_time AS event_time - INTERVAL '10' > > HOUR > > > >> > > ) WITH ( > > > >> > > 'connector.type' = 'kafka', > > > >> > > 'connector.version' = '0.11', > > > >> > > 'connector.topic' = 'mytopic', > > > >> > > 'connector.properties.bootstrap.servers' = 'xxxx', > > > >> > > 'format.type' = 'json' > > > >> > > ) > > > >> > > """) > > > >> > > > > > >> > > > > > >> > > 分别 filter 出来 两张表,进行 interval Join,这个是一直没有输出的,我两张表输出试过,没有任何问题。 > > > >> > > > > > >> > > val main_db = bsTableEnv.sqlQuery(""" > > > >> > > | SELECT * > > > >> > > | FROM input_database > > > >> > > | WHERE `database` = 'main_db' > > > >> > > | AND `table` LIKE 'transaction_tab%' > > > >> > > | """.stripMargin) > > > >> > > > > > >> > > val merchant_db = bsTableEnv.sqlQuery(""" > > > >> > > | SELECT * > > > >> > > | FROM input_database > > > >> > > | WHERE `database` = 'merchant_db' > > > >> > > | AND `table` LIKE 'transaction_tab%' > > > >> > > | """.stripMargin) > > > >> > > > > > >> > > bsTableEnv.createTemporaryView("main_db", main_db) > > > >> > > bsTableEnv.createTemporaryView("merchant_db", merchant_db) > > > >> > > > > > >> > > val result = bsTableEnv.sqlQuery(""" > > > >> > > SELECT * > > > >> > > FROM ( > > > >> > > SELECT t1.`table`, t1.`database`, t1.transaction_type, > > > >> > > t1.transaction_id, > > > >> > > t1.reference_id, t1.transaction_sn, t1.merchant_id, > > > >> > > t1.status, t1.event_time > > > >> > > FROM main_db as t1 > > > >> > > LEFT JOIN merchant_db as t2 > > > >> > > ON t1.reference_id = t2.reference_id > > > >> > > WHERE t1.event_time >= t2.event_time + INTERVAL '1' > HOUR > > > >> > > AND t1.event_time <= t2.event_time - INTERVAL '1' > HOUR > > > >> > > ) > > > >> > > """.stripMargin) > > > >> > > > > > >> > > > > > >> > > > > > >> > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么? > > > >> > > ----- > > > >> > > 你提到的这个问题,我估计我的 watermark 前进肯定是不正常的。但是我无法理解为什么 interval join 需要 > > > >> watermark > > > >> > > 来驱动。 > > > >> > > 我的理解是,他会把两边的数据都保留在 state 里,既然是 Left join,如果左边有数据查右边的state,如果可以 > > > >> join上,就输出 > > > >> > > join 的结果,如果没有 join上,那应该正常输出左边的数据,这才是 Left join 应有的逻辑把. > > > >> > > > > > >> > > > > > >> > > > > > >> > > > > > >> > > > > > >> > > > > > >> > > Benchao Li <[hidden email]> 于2020年12月8日周二 下午3:23写道: > > > >> > > > > > >> > > > hi macia, > > > >> > > > > > > >> > > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么? > > > >> > > > > > > >> > > > macia kk <[hidden email]> 于2020年12月8日周二 上午1:15写道: > > > >> > > > > > > >> > > > > 抱歉,是 >-30 and <+30 > > > >> > > > > > > > >> > > > > 贴的只是demo,我的疑问是,既然是 Left > Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有 > > > >> > > > > > > > >> > > > > 赵一旦 <[hidden email]>于2020年12月7日 周一23:28写道: > > > >> > > > > > > > >> > > > > > 准确点,2个条件之间没and?2个都是>? > > > >> > > > > > > > > >> > > > > > macia kk <[hidden email]> 于2020年12月7日周一 下午10:30写道: > > > >> > > > > > > > > >> > > > > > > 不好意思,我上边贴错了 > > > >> > > > > > > > > > >> > > > > > > SELECT * > > > >> > > > > > > FROM A > > > >> > > > > > > LEFT OUT JOIN B > > > >> > > > > > > ON order_id > > > >> > > > > > > Where A.event_time > B.event_time - 30 s > > > >> > > > > > > A.event_time > B.event_time + 30 s > > > >> > > > > > > > > > >> > > > > > > event_time 是 Time Attributes 设置的 event_time > > > >> > > > > > > > > > >> > > > > > > 这样是没有输出的。 > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > interval join 左右表在 state 中是缓存多久的? > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > hailongwang <[hidden email]> 于2020年12月7日周一 > 下午8:05写道: > > > >> > > > > > > > > > >> > > > > > > > Hi, > > > >> > > > > > > > 其中 条件是 > > > >> > > > > > > > `Where A.event_time < B.event_time + 30 s and > > > A.event_time > > > > >> > > > > > B.event_time > > > >> > > > > > > > - 30 s ` 吧 > > > >> > > > > > > > 可以参考以下例子[1],看下有木有写错。 > > > >> > > > > > > > [1] > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > > > > https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183 > > > >> > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > Best, > > > >> > > > > > > > Hailong > > > >> > > > > > > > 在 2020-12-07 13:10:02,"macia kk" <[hidden email]> > 写道: > > > >> > > > > > > > >Hi, 各位大佬 > > > >> > > > > > > > > > > > >> > > > > > > > > 我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog > > 打进去了。我的 > > > >> > > > > > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 > > order事件 > > > >> ,表B 是 > > > >> > > > order > > > >> > > > > > > item > > > >> > > > > > > > >信息,所以 我用: > > > >> > > > > > > > > > > > >> > > > > > > > > SELECT * > > > >> > > > > > > > > FROM A > > > >> > > > > > > > > LEFT OUT JOIN B > > > >> > > > > > > > > ON order_id > > > >> > > > > > > > > Where A.event_time > B.event_time + 30 s > > > >> > > > > > > > > A.event_time > B.event_time - 30 s > > > >> > > > > > > > > > > > >> > > > > > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join > > > 之后就没有输出数据了,可以确认的是我用 > > > >> > Spark > > > >> > > > > > > > Structural > > > >> > > > > > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join, > > > >> > > > > > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join > > > >> > 在具体实现的逻辑是什么,我在处理上哪里有问题? > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > > >> > > > -- > > > >> > > > > > > >> > > > Best, > > > >> > > > Benchao Li > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > > >> -- > > > >> > > > >> Best, > > > >> Benchao Li > > > >> > > > > > > > > > > > > > -- > > > > Best, > > Benchao Li > > > |
这个问题很早前我提过,没人在意,或者说大家没觉得这是个问题。但实际上如果和DataStream
API去对比的话,FlinkSQL的这种表现肯定是有问题的。 换种说法,FlinkSQL通过更改ts方式实现了UTC+8时区下的分窗的合理性,但其“实现方式”本身就是“代价”,即使用了不合理的ts,ui上当然就展示不合理的ts。 这本来是应该在window分窗处基于offset实现的功能。 赵一旦 <[hidden email]> 于2020年12月15日周二 上午11:22写道: > 这个问题是存在的,只不过不清楚算不算bug,可能只算是FlinkSQL在这部分处理的不足感觉。 > > 之所以出现这个问题,是因为你用了FROM_UNIXTIME等函数,这些函数会自动根据当前时区将ts转换为TIMESTAMP(这导致了在time > attribute部分不应该使用这种函数,否则会导致watermark显示超前8小时的问题)。 > > 但是呢,目前不这么做好像也还不行。因为分窗必须基于time > attribute,同时flinkSQL目前不支持offset的指定,因此只能基于这种方式去间接实现分窗的正确性。 > > > ———————— > 比如:ts=0,代表的是 1970-1-1 00:00:00,FROM_UNIXTIME(0) 返回的是 1970-1-1 08:00:00 > UTC+8 > 这个时间,而这个时间作为TIMESTAMP类型的eventtime被处理了,但TIMESTAMP本身是无时区含义的,你却给了它一个带时区含义的日期。这导致分窗可以正常按照中国人习惯分,但从底层考虑却不对,因为这个时间点,它的ts变为了28800s。因为flinkSQL将你那个eventtime按照UTC+0转换出来的ts就是ts=28800(s)。 > > ———————— > 按我说的话,要么继续这么用,你忽略ui上的watermark,这并不影响你的代码逻辑和业务逻辑。 > 要么就是调整不使用那些函数,这样可以保证watermark那个ui展示正确,但是却导致小时/天窗口出问题,并暂时无解(因为不支持offset窗口)。 > > > > macia kk <[hidden email]> 于2020年12月11日周五 下午3:04写道: > >> 你用的是哪个版本的Flink呢? >> ----- >> 1.11.2 >> >> 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。 >> 所以你的binlog是怎么读进来的呢?自定义的format? >> ----- >> ts 就是时间戳 >> >> bsTableEnv.executeSql(""" >> CREATE TABLE input_database ( >> `table` STRING, >> `database` STRING, >> `data` ROW( >> reference_id STRING, >> transaction_sn STRING, >> transaction_type BIGINT, >> merchant_id BIGINT, >> transaction_id BIGINT, >> status BIGINT >> ), >> ts BIGINT, >> event_time AS TO_TIMESTAMP(FROM_UNIXTIME(ts)), >> WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR >> ) WITH ( >> 'connector.type' = 'kafka', >> 'connector.version' = '0.11', >> 'connector.topic' = 'mytopic', >> 'connector.properties.bootstrap.servers' = 'xxxx', >> 'format.type' = 'json' >> ) >> ) >> >> >> >> ``` >> >> >> >> Benchao Li <[hidden email]> 于2020年12月10日周四 下午6:14写道: >> >> > 你用的是哪个版本的Flink呢? >> > >> > 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。 >> > 所以你的binlog是怎么读进来的呢?自定义的format? >> > >> > macia kk <[hidden email]> 于2020年12月10日周四 上午1:06写道: >> > >> > > 我刚才跑了很多任务,设置不同的 maxOutOfOrderness WATERMARK FOR event_time AS >> event_time >> > - >> > > INTERVAL 'x' HOUR >> > > >> > > 发现一个很奇怪的问题 ,按理说 watermark = currentMaxTimestamp - maxOutOfOrderness >> > > >> > > 但是 我通过 页面上的 watermark 时间,和我设置 maxOutOfOrderness x, >> > > 能够反推出来数据的 currentMaxTimestamp >> > > >> > > currentMaxTimestamp = watermark + maxOutOfOrderness >> > > >> > > 但是我无论设置多少的 maxOutOfOrderness, 反推出来的 currentMaxTimestamp 比现在此时此刻的时间快 >> > > 8个小时,也就是说 currentMaxTimestamp 在未来后的 8个小时,这个数字一直是固定的8。 >> > > >> > > >> > > 但是,我进行 Join, 直接输出任意一张表,得到的 evet time 都是对的,比如现在 00:55 >> > > >> > > >> > >> {"table":"transaction_tab_00000122","database":"main_db","transaction_type":1,"transaction_id":111111,"reference_id":"111111","transaction_sn":"11111","merchant_id":1,"status":1,"event_time":" >> > > *2020-12-10T01:02:24Z*"} >> > > >> > > UI 上显示的 watermark 是 1607555031000(Your time zone: >> 2020年12月10日星期四早上7点02分 >> > > GMT+08:00) >> > > >> > > 这个 watermark 是未来的时间 😂 >> > > >> > > >> > > >> > > >> > > >> > > macia kk <[hidden email]> 于2020年12月9日周三 下午11:36写道: >> > > >> > > > 感谢 一旦 和 Benchao >> > > > >> > > > 1. 如果是我的 watermark 设置过长,导致无法输出的话,是有点疑问的,因为我可以确定的是,一定会有 Join >> 上的数据,但是我 >> > > Job >> > > > 跑了几天也没有一条输出。我还试了如下的SQL,自己 Join 自己,所以理论肯定是直接输出的,实际也是一条也没有。 >> > > > >> > > > val result = bsTableEnv.sqlQuery(""" >> > > > SELECT * >> > > > FROM ( >> > > > SELECT t1.`table`, t1.`database`, t1.transaction_type, >> > > t1.transaction_id, >> > > > t1.reference_id, t1.transaction_sn, t1.merchant_id, >> > > t1.status, t1.event_time >> > > > FROM main_db as t1 >> > > > LEFT JOIN main_db as t2 >> > > > ON t1.reference_id = t2.reference_id >> > > > WHERE t1.event_time >= t2.event_time + INTERVAL '5' >> MINUTES >> > > > AND t1.event_time <= t2.event_time - INTERVAL '5' MINUTES >> > > > ) >> > > > """.stripMargin) >> > > > >> > > > 2. 一旦提到的 watermark 传递的问题,我可以确认的是,会传递下去,这可以在 UI 上看到 >> > > > >> > > > 3. 这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source >> > > > subtask的watermark。 >> > > > ------------------------------------------------------- >> > > > 这里应该是使用 source subtask 最小的 watermark 传递过去,因为我可以看到的是,我的 watermark >> > > > 永远和现在相差8个小时,所以怀疑是有一张表,总是会迟8个小时才会有 BinLog. >> > > > >> > > > 4. Flink SQL 有没有方法在定义 schema 的时候,如果一个字段不存在,就是 null,我现在想换另外一个时间字段作为 >> > event >> > > > time,但是有的表又没有这个字段,会导致解析的时候直接报错. >> > > > >> > > > 5. 我能不能不在 input_table 上注册 water mark,在 filter 出两张表后,再把 watermark >> > > > 加载两张表上,这样可以避免因为别的表,导致 watermark 停止不前,混乱的行为. >> > > > >> > > > >> > > > Thanks and best regards >> > > > >> > > > >> > > > Benchao Li <[hidden email]> 于2020年12月9日周三 上午10:24写道: >> > > > >> > > >> Hi macia, >> > > >> >> > > >> 一旦回答的基本比较完整了。 >> > > >> watermark影响的主要是left join没有join到的情况下,+(left, null)这样的数据输出的时机。 >> > > >> 如果是两侧都有数据,watermark不前进,也都可以正常输出。 >> > > >> >> > > >> 关于watermark,如果你的事件时间忽高忽低,这个底层的watermark只会取当前source >> > > subtask见到的最大的watermark >> > > >> 作为这个source subtask的watermark。但是你的watermark计算逻辑本身就是事件时间delay >> > > 10个小时,这个已经会导致 >> > > >> 你的没有join到的数据下发会延迟很多了。 >> > > >> >> > > >> 你也可以尝试下用处理时间来做一下interval join,看看能不能达到预期。 >> > > >> >> > > >> 赵一旦 <[hidden email]> 于2020年12月9日周三 上午10:15写道: >> > > >> >> > > >> > 重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。 >> > > >> > >> > > >> > >> > > >> > >> > > >> >> > > >> > >> (1)你这个的话我看到一个问题,就是watermark你定义10小时的maxOutOfOrderness,确定这么长嘛要,这么大的maxOutOfOrderness,会导致join到的则会及时输出,join不到的需要等10小时才能输出“仅左边”数据,即left >> > > >> > join。 >> > > >> > >> > > >> > (2)此外,还有一个点,这个我也不确认。如果是datastream >> > > >> > api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。 >> > > >> > >> > > >> > >> > > >> >> > > >> > >> input_database中定义了watermark,从input_database到2个filter后的表不清楚是否还存在watermark(我感觉是存在的),只要存在那就没问题,唯一需要注意的是第1点。 >> > > >> > >> > > >> > macia kk <[hidden email]> 于2020年12月9日周三 上午1:17写道: >> > > >> > >> > > >> > > @Benchao Li <[hidden email]> 感谢回复,这个问题困扰我半年了,导致我一直不能迁移到 >> > > >> > > FLink,可能我的Case 太特殊了. >> > > >> > > >> > > >> > > 我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的 >> > > >> Binlog,我需要 >> > > >> > > filter 出来 main_db__tansaction_tab, >> merchant_db__transaction_tab, >> > 两个 >> > > DB >> > > >> > > 中的两个表。所以这里的字段我定义的是 两张表的字段的并集. >> > > >> > > >> > > >> > > 还要注意的是 even time 是 create_time, 这里问题非常大: >> > > >> > > 1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱 >> > > >> > > 2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create >> time进来,可能会影响 >> > > >> > watermark >> > > >> > > forward on. >> > > >> > > >> > > >> > > bsTableEnv.executeSql(""" >> > > >> > > CREATE TABLE input_database ( >> > > >> > > `table` STRING, >> > > >> > > `database` STRING, >> > > >> > > `data` ROW( >> > > >> > > reference_id STRING, >> > > >> > > transaction_sn STRING, >> > > >> > > transaction_type BIGINT, >> > > >> > > merchant_id BIGINT, >> > > >> > > transaction_id BIGINT, >> > > >> > > status BIGINT >> > > >> > > ), >> > > >> > > ts BIGINT, >> > > >> > > event_time AS TO_TIMESTAMP(FROM_UNIXTIME(create_time)), >> > > >> > > WATERMARK FOR event_time AS event_time - INTERVAL '10' >> > HOUR >> > > >> > > ) WITH ( >> > > >> > > 'connector.type' = 'kafka', >> > > >> > > 'connector.version' = '0.11', >> > > >> > > 'connector.topic' = 'mytopic', >> > > >> > > 'connector.properties.bootstrap.servers' = 'xxxx', >> > > >> > > 'format.type' = 'json' >> > > >> > > ) >> > > >> > > """) >> > > >> > > >> > > >> > > >> > > >> > > 分别 filter 出来 两张表,进行 interval Join,这个是一直没有输出的,我两张表输出试过,没有任何问题。 >> > > >> > > >> > > >> > > val main_db = bsTableEnv.sqlQuery(""" >> > > >> > > | SELECT * >> > > >> > > | FROM input_database >> > > >> > > | WHERE `database` = 'main_db' >> > > >> > > | AND `table` LIKE 'transaction_tab%' >> > > >> > > | """.stripMargin) >> > > >> > > >> > > >> > > val merchant_db = bsTableEnv.sqlQuery(""" >> > > >> > > | SELECT * >> > > >> > > | FROM input_database >> > > >> > > | WHERE `database` = 'merchant_db' >> > > >> > > | AND `table` LIKE 'transaction_tab%' >> > > >> > > | """.stripMargin) >> > > >> > > >> > > >> > > bsTableEnv.createTemporaryView("main_db", main_db) >> > > >> > > bsTableEnv.createTemporaryView("merchant_db", merchant_db) >> > > >> > > >> > > >> > > val result = bsTableEnv.sqlQuery(""" >> > > >> > > SELECT * >> > > >> > > FROM ( >> > > >> > > SELECT t1.`table`, t1.`database`, >> t1.transaction_type, >> > > >> > > t1.transaction_id, >> > > >> > > t1.reference_id, t1.transaction_sn, t1.merchant_id, >> > > >> > > t1.status, t1.event_time >> > > >> > > FROM main_db as t1 >> > > >> > > LEFT JOIN merchant_db as t2 >> > > >> > > ON t1.reference_id = t2.reference_id >> > > >> > > WHERE t1.event_time >= t2.event_time + INTERVAL '1' >> HOUR >> > > >> > > AND t1.event_time <= t2.event_time - INTERVAL '1' >> HOUR >> > > >> > > ) >> > > >> > > """.stripMargin) >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么? >> > > >> > > ----- >> > > >> > > 你提到的这个问题,我估计我的 watermark 前进肯定是不正常的。但是我无法理解为什么 interval join 需要 >> > > >> watermark >> > > >> > > 来驱动。 >> > > >> > > 我的理解是,他会把两边的数据都保留在 state 里,既然是 Left join,如果左边有数据查右边的state,如果可以 >> > > >> join上,就输出 >> > > >> > > join 的结果,如果没有 join上,那应该正常输出左边的数据,这才是 Left join 应有的逻辑把. >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > Benchao Li <[hidden email]> 于2020年12月8日周二 下午3:23写道: >> > > >> > > >> > > >> > > > hi macia, >> > > >> > > > >> > > >> > > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么? >> > > >> > > > >> > > >> > > > macia kk <[hidden email]> 于2020年12月8日周二 上午1:15写道: >> > > >> > > > >> > > >> > > > > 抱歉,是 >-30 and <+30 >> > > >> > > > > >> > > >> > > > > 贴的只是demo,我的疑问是,既然是 Left >> Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有 >> > > >> > > > > >> > > >> > > > > 赵一旦 <[hidden email]>于2020年12月7日 周一23:28写道: >> > > >> > > > > >> > > >> > > > > > 准确点,2个条件之间没and?2个都是>? >> > > >> > > > > > >> > > >> > > > > > macia kk <[hidden email]> 于2020年12月7日周一 下午10:30写道: >> > > >> > > > > > >> > > >> > > > > > > 不好意思,我上边贴错了 >> > > >> > > > > > > >> > > >> > > > > > > SELECT * >> > > >> > > > > > > FROM A >> > > >> > > > > > > LEFT OUT JOIN B >> > > >> > > > > > > ON order_id >> > > >> > > > > > > Where A.event_time > B.event_time - 30 s >> > > >> > > > > > > A.event_time > B.event_time + 30 s >> > > >> > > > > > > >> > > >> > > > > > > event_time 是 Time Attributes 设置的 event_time >> > > >> > > > > > > >> > > >> > > > > > > 这样是没有输出的。 >> > > >> > > > > > > >> > > >> > > > > > > >> > > >> > > > > > > >> > > >> > > > > > > interval join 左右表在 state 中是缓存多久的? >> > > >> > > > > > > >> > > >> > > > > > > >> > > >> > > > > > > >> > > >> > > > > > > >> > > >> > > > > > > >> > > >> > > > > > > >> > > >> > > > > > > hailongwang <[hidden email]> 于2020年12月7日周一 >> 下午8:05写道: >> > > >> > > > > > > >> > > >> > > > > > > > Hi, >> > > >> > > > > > > > 其中 条件是 >> > > >> > > > > > > > `Where A.event_time < B.event_time + 30 s and >> > > A.event_time > >> > > >> > > > > > B.event_time >> > > >> > > > > > > > - 30 s ` 吧 >> > > >> > > > > > > > 可以参考以下例子[1],看下有木有写错。 >> > > >> > > > > > > > [1] >> > > >> > > > > > > > >> > > >> > > > > > > >> > > >> > > > > > >> > > >> > > > > >> > > >> > > > >> > > >> > > >> > > >> > >> > > >> >> > > >> > >> https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183 >> > > >> > > > > > > > >> > > >> > > > > > > > >> > > >> > > > > > > > Best, >> > > >> > > > > > > > Hailong >> > > >> > > > > > > > 在 2020-12-07 13:10:02,"macia kk" <[hidden email]> >> 写道: >> > > >> > > > > > > > >Hi, 各位大佬 >> > > >> > > > > > > > > >> > > >> > > > > > > > > 我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog >> > 打进去了。我的 >> > > >> > > > > > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 >> > order事件 >> > > >> ,表B 是 >> > > >> > > > order >> > > >> > > > > > > item >> > > >> > > > > > > > >信息,所以 我用: >> > > >> > > > > > > > > >> > > >> > > > > > > > > SELECT * >> > > >> > > > > > > > > FROM A >> > > >> > > > > > > > > LEFT OUT JOIN B >> > > >> > > > > > > > > ON order_id >> > > >> > > > > > > > > Where A.event_time > B.event_time + 30 s >> > > >> > > > > > > > > A.event_time > B.event_time - 30 s >> > > >> > > > > > > > > >> > > >> > > > > > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join >> > > 之后就没有输出数据了,可以确认的是我用 >> > > >> > Spark >> > > >> > > > > > > > Structural >> > > >> > > > > > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join, >> > > >> > > > > > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join >> > > >> > 在具体实现的逻辑是什么,我在处理上哪里有问题? >> > > >> > > > > > > > >> > > >> > > > > > > >> > > >> > > > > > >> > > >> > > > > >> > > >> > > > >> > > >> > > > >> > > >> > > > -- >> > > >> > > > >> > > >> > > > Best, >> > > >> > > > Benchao Li >> > > >> > > > >> > > >> > > >> > > >> > >> > > >> >> > > >> >> > > >> -- >> > > >> >> > > >> Best, >> > > >> Benchao Li >> > > >> >> > > > >> > > >> > >> > >> > -- >> > >> > Best, >> > Benchao Li >> > >> > |
补充,实际FROM_UNIXTIME应该返回 TIMESTAMP WITH LOCAL TIME ZONE
这个类型。(然后FlinkSQL可以自己转为TIMESTAMP)。 此外,关于分窗,除了offset这种显示的由用户来解决时区分窗以外。还可以通过支持 TIMESTAMP WITH LOCAL TIME ZONE 类型作为 event time 实现,当然内部当然还是通过offset实现,只是FlinkSQL语法层可以基于支持 TIMESTAMP WITH LOCAL TIME ZONE 作为eventtime来实现这种效果。 如上是个人观点哈。。。 赵一旦 <[hidden email]> 于2020年12月15日周二 上午11:29写道: > 这个问题很早前我提过,没人在意,或者说大家没觉得这是个问题。但实际上如果和DataStream > API去对比的话,FlinkSQL的这种表现肯定是有问题的。 > > 换种说法,FlinkSQL通过更改ts方式实现了UTC+8时区下的分窗的合理性,但其“实现方式”本身就是“代价”,即使用了不合理的ts,ui上当然就展示不合理的ts。 > > 这本来是应该在window分窗处基于offset实现的功能。 > > 赵一旦 <[hidden email]> 于2020年12月15日周二 上午11:22写道: > >> 这个问题是存在的,只不过不清楚算不算bug,可能只算是FlinkSQL在这部分处理的不足感觉。 >> >> 之所以出现这个问题,是因为你用了FROM_UNIXTIME等函数,这些函数会自动根据当前时区将ts转换为TIMESTAMP(这导致了在time >> attribute部分不应该使用这种函数,否则会导致watermark显示超前8小时的问题)。 >> >> 但是呢,目前不这么做好像也还不行。因为分窗必须基于time >> attribute,同时flinkSQL目前不支持offset的指定,因此只能基于这种方式去间接实现分窗的正确性。 >> >> >> ———————— >> 比如:ts=0,代表的是 1970-1-1 00:00:00,FROM_UNIXTIME(0) 返回的是 1970-1-1 08:00:00 >> UTC+8 >> 这个时间,而这个时间作为TIMESTAMP类型的eventtime被处理了,但TIMESTAMP本身是无时区含义的,你却给了它一个带时区含义的日期。这导致分窗可以正常按照中国人习惯分,但从底层考虑却不对,因为这个时间点,它的ts变为了28800s。因为flinkSQL将你那个eventtime按照UTC+0转换出来的ts就是ts=28800(s)。 >> >> ———————— >> 按我说的话,要么继续这么用,你忽略ui上的watermark,这并不影响你的代码逻辑和业务逻辑。 >> 要么就是调整不使用那些函数,这样可以保证watermark那个ui展示正确,但是却导致小时/天窗口出问题,并暂时无解(因为不支持offset窗口)。 >> >> >> >> macia kk <[hidden email]> 于2020年12月11日周五 下午3:04写道: >> >>> 你用的是哪个版本的Flink呢? >>> ----- >>> 1.11.2 >>> >>> 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。 >>> 所以你的binlog是怎么读进来的呢?自定义的format? >>> ----- >>> ts 就是时间戳 >>> >>> bsTableEnv.executeSql(""" >>> CREATE TABLE input_database ( >>> `table` STRING, >>> `database` STRING, >>> `data` ROW( >>> reference_id STRING, >>> transaction_sn STRING, >>> transaction_type BIGINT, >>> merchant_id BIGINT, >>> transaction_id BIGINT, >>> status BIGINT >>> ), >>> ts BIGINT, >>> event_time AS TO_TIMESTAMP(FROM_UNIXTIME(ts)), >>> WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR >>> ) WITH ( >>> 'connector.type' = 'kafka', >>> 'connector.version' = '0.11', >>> 'connector.topic' = 'mytopic', >>> 'connector.properties.bootstrap.servers' = 'xxxx', >>> 'format.type' = 'json' >>> ) >>> ) >>> >>> >>> >>> ``` >>> >>> >>> >>> Benchao Li <[hidden email]> 于2020年12月10日周四 下午6:14写道: >>> >>> > 你用的是哪个版本的Flink呢? >>> > >>> > 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。 >>> > 所以你的binlog是怎么读进来的呢?自定义的format? >>> > >>> > macia kk <[hidden email]> 于2020年12月10日周四 上午1:06写道: >>> > >>> > > 我刚才跑了很多任务,设置不同的 maxOutOfOrderness WATERMARK FOR event_time AS >>> event_time >>> > - >>> > > INTERVAL 'x' HOUR >>> > > >>> > > 发现一个很奇怪的问题 ,按理说 watermark = currentMaxTimestamp - maxOutOfOrderness >>> > > >>> > > 但是 我通过 页面上的 watermark 时间,和我设置 maxOutOfOrderness x, >>> > > 能够反推出来数据的 currentMaxTimestamp >>> > > >>> > > currentMaxTimestamp = watermark + maxOutOfOrderness >>> > > >>> > > 但是我无论设置多少的 maxOutOfOrderness, 反推出来的 currentMaxTimestamp 比现在此时此刻的时间快 >>> > > 8个小时,也就是说 currentMaxTimestamp 在未来后的 8个小时,这个数字一直是固定的8。 >>> > > >>> > > >>> > > 但是,我进行 Join, 直接输出任意一张表,得到的 evet time 都是对的,比如现在 00:55 >>> > > >>> > > >>> > >>> {"table":"transaction_tab_00000122","database":"main_db","transaction_type":1,"transaction_id":111111,"reference_id":"111111","transaction_sn":"11111","merchant_id":1,"status":1,"event_time":" >>> > > *2020-12-10T01:02:24Z*"} >>> > > >>> > > UI 上显示的 watermark 是 1607555031000(Your time zone: >>> 2020年12月10日星期四早上7点02分 >>> > > GMT+08:00) >>> > > >>> > > 这个 watermark 是未来的时间 😂 >>> > > >>> > > >>> > > >>> > > >>> > > >>> > > macia kk <[hidden email]> 于2020年12月9日周三 下午11:36写道: >>> > > >>> > > > 感谢 一旦 和 Benchao >>> > > > >>> > > > 1. 如果是我的 watermark 设置过长,导致无法输出的话,是有点疑问的,因为我可以确定的是,一定会有 Join >>> 上的数据,但是我 >>> > > Job >>> > > > 跑了几天也没有一条输出。我还试了如下的SQL,自己 Join 自己,所以理论肯定是直接输出的,实际也是一条也没有。 >>> > > > >>> > > > val result = bsTableEnv.sqlQuery(""" >>> > > > SELECT * >>> > > > FROM ( >>> > > > SELECT t1.`table`, t1.`database`, t1.transaction_type, >>> > > t1.transaction_id, >>> > > > t1.reference_id, t1.transaction_sn, t1.merchant_id, >>> > > t1.status, t1.event_time >>> > > > FROM main_db as t1 >>> > > > LEFT JOIN main_db as t2 >>> > > > ON t1.reference_id = t2.reference_id >>> > > > WHERE t1.event_time >= t2.event_time + INTERVAL '5' >>> MINUTES >>> > > > AND t1.event_time <= t2.event_time - INTERVAL '5' >>> MINUTES >>> > > > ) >>> > > > """.stripMargin) >>> > > > >>> > > > 2. 一旦提到的 watermark 传递的问题,我可以确认的是,会传递下去,这可以在 UI 上看到 >>> > > > >>> > > > 3. 这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source >>> > > > subtask的watermark。 >>> > > > ------------------------------------------------------- >>> > > > 这里应该是使用 source subtask 最小的 watermark 传递过去,因为我可以看到的是,我的 >>> watermark >>> > > > 永远和现在相差8个小时,所以怀疑是有一张表,总是会迟8个小时才会有 BinLog. >>> > > > >>> > > > 4. Flink SQL 有没有方法在定义 schema 的时候,如果一个字段不存在,就是 null,我现在想换另外一个时间字段作为 >>> > event >>> > > > time,但是有的表又没有这个字段,会导致解析的时候直接报错. >>> > > > >>> > > > 5. 我能不能不在 input_table 上注册 water mark,在 filter 出两张表后,再把 watermark >>> > > > 加载两张表上,这样可以避免因为别的表,导致 watermark 停止不前,混乱的行为. >>> > > > >>> > > > >>> > > > Thanks and best regards >>> > > > >>> > > > >>> > > > Benchao Li <[hidden email]> 于2020年12月9日周三 上午10:24写道: >>> > > > >>> > > >> Hi macia, >>> > > >> >>> > > >> 一旦回答的基本比较完整了。 >>> > > >> watermark影响的主要是left join没有join到的情况下,+(left, null)这样的数据输出的时机。 >>> > > >> 如果是两侧都有数据,watermark不前进,也都可以正常输出。 >>> > > >> >>> > > >> 关于watermark,如果你的事件时间忽高忽低,这个底层的watermark只会取当前source >>> > > subtask见到的最大的watermark >>> > > >> 作为这个source subtask的watermark。但是你的watermark计算逻辑本身就是事件时间delay >>> > > 10个小时,这个已经会导致 >>> > > >> 你的没有join到的数据下发会延迟很多了。 >>> > > >> >>> > > >> 你也可以尝试下用处理时间来做一下interval join,看看能不能达到预期。 >>> > > >> >>> > > >> 赵一旦 <[hidden email]> 于2020年12月9日周三 上午10:15写道: >>> > > >> >>> > > >> > 重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。 >>> > > >> > >>> > > >> > >>> > > >> > >>> > > >> >>> > > >>> > >>> (1)你这个的话我看到一个问题,就是watermark你定义10小时的maxOutOfOrderness,确定这么长嘛要,这么大的maxOutOfOrderness,会导致join到的则会及时输出,join不到的需要等10小时才能输出“仅左边”数据,即left >>> > > >> > join。 >>> > > >> > >>> > > >> > (2)此外,还有一个点,这个我也不确认。如果是datastream >>> > > >> > api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。 >>> > > >> > >>> > > >> > >>> > > >> >>> > > >>> > >>> input_database中定义了watermark,从input_database到2个filter后的表不清楚是否还存在watermark(我感觉是存在的),只要存在那就没问题,唯一需要注意的是第1点。 >>> > > >> > >>> > > >> > macia kk <[hidden email]> 于2020年12月9日周三 上午1:17写道: >>> > > >> > >>> > > >> > > @Benchao Li <[hidden email]> 感谢回复,这个问题困扰我半年了,导致我一直不能迁移到 >>> > > >> > > FLink,可能我的Case 太特殊了. >>> > > >> > > >>> > > >> > > 我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的 >>> > > >> Binlog,我需要 >>> > > >> > > filter 出来 main_db__tansaction_tab, >>> merchant_db__transaction_tab, >>> > 两个 >>> > > DB >>> > > >> > > 中的两个表。所以这里的字段我定义的是 两张表的字段的并集. >>> > > >> > > >>> > > >> > > 还要注意的是 even time 是 create_time, 这里问题非常大: >>> > > >> > > 1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱 >>> > > >> > > 2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create >>> time进来,可能会影响 >>> > > >> > watermark >>> > > >> > > forward on. >>> > > >> > > >>> > > >> > > bsTableEnv.executeSql(""" >>> > > >> > > CREATE TABLE input_database ( >>> > > >> > > `table` STRING, >>> > > >> > > `database` STRING, >>> > > >> > > `data` ROW( >>> > > >> > > reference_id STRING, >>> > > >> > > transaction_sn STRING, >>> > > >> > > transaction_type BIGINT, >>> > > >> > > merchant_id BIGINT, >>> > > >> > > transaction_id BIGINT, >>> > > >> > > status BIGINT >>> > > >> > > ), >>> > > >> > > ts BIGINT, >>> > > >> > > event_time AS >>> TO_TIMESTAMP(FROM_UNIXTIME(create_time)), >>> > > >> > > WATERMARK FOR event_time AS event_time - INTERVAL '10' >>> > HOUR >>> > > >> > > ) WITH ( >>> > > >> > > 'connector.type' = 'kafka', >>> > > >> > > 'connector.version' = '0.11', >>> > > >> > > 'connector.topic' = 'mytopic', >>> > > >> > > 'connector.properties.bootstrap.servers' = 'xxxx', >>> > > >> > > 'format.type' = 'json' >>> > > >> > > ) >>> > > >> > > """) >>> > > >> > > >>> > > >> > > >>> > > >> > > 分别 filter 出来 两张表,进行 interval Join,这个是一直没有输出的,我两张表输出试过,没有任何问题。 >>> > > >> > > >>> > > >> > > val main_db = bsTableEnv.sqlQuery(""" >>> > > >> > > | SELECT * >>> > > >> > > | FROM input_database >>> > > >> > > | WHERE `database` = 'main_db' >>> > > >> > > | AND `table` LIKE 'transaction_tab%' >>> > > >> > > | """.stripMargin) >>> > > >> > > >>> > > >> > > val merchant_db = bsTableEnv.sqlQuery(""" >>> > > >> > > | SELECT * >>> > > >> > > | FROM input_database >>> > > >> > > | WHERE `database` = 'merchant_db' >>> > > >> > > | AND `table` LIKE 'transaction_tab%' >>> > > >> > > | """.stripMargin) >>> > > >> > > >>> > > >> > > bsTableEnv.createTemporaryView("main_db", main_db) >>> > > >> > > bsTableEnv.createTemporaryView("merchant_db", merchant_db) >>> > > >> > > >>> > > >> > > val result = bsTableEnv.sqlQuery(""" >>> > > >> > > SELECT * >>> > > >> > > FROM ( >>> > > >> > > SELECT t1.`table`, t1.`database`, >>> t1.transaction_type, >>> > > >> > > t1.transaction_id, >>> > > >> > > t1.reference_id, t1.transaction_sn, >>> t1.merchant_id, >>> > > >> > > t1.status, t1.event_time >>> > > >> > > FROM main_db as t1 >>> > > >> > > LEFT JOIN merchant_db as t2 >>> > > >> > > ON t1.reference_id = t2.reference_id >>> > > >> > > WHERE t1.event_time >= t2.event_time + INTERVAL '1' >>> HOUR >>> > > >> > > AND t1.event_time <= t2.event_time - INTERVAL '1' >>> HOUR >>> > > >> > > ) >>> > > >> > > """.stripMargin) >>> > > >> > > >>> > > >> > > >>> > > >> > > >>> > > >> > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么? >>> > > >> > > ----- >>> > > >> > > 你提到的这个问题,我估计我的 watermark 前进肯定是不正常的。但是我无法理解为什么 interval join 需要 >>> > > >> watermark >>> > > >> > > 来驱动。 >>> > > >> > > 我的理解是,他会把两边的数据都保留在 state 里,既然是 Left join,如果左边有数据查右边的state,如果可以 >>> > > >> join上,就输出 >>> > > >> > > join 的结果,如果没有 join上,那应该正常输出左边的数据,这才是 Left join 应有的逻辑把. >>> > > >> > > >>> > > >> > > >>> > > >> > > >>> > > >> > > >>> > > >> > > >>> > > >> > > >>> > > >> > > Benchao Li <[hidden email]> 于2020年12月8日周二 下午3:23写道: >>> > > >> > > >>> > > >> > > > hi macia, >>> > > >> > > > >>> > > >> > > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么? >>> > > >> > > > >>> > > >> > > > macia kk <[hidden email]> 于2020年12月8日周二 上午1:15写道: >>> > > >> > > > >>> > > >> > > > > 抱歉,是 >-30 and <+30 >>> > > >> > > > > >>> > > >> > > > > 贴的只是demo,我的疑问是,既然是 Left >>> Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有 >>> > > >> > > > > >>> > > >> > > > > 赵一旦 <[hidden email]>于2020年12月7日 周一23:28写道: >>> > > >> > > > > >>> > > >> > > > > > 准确点,2个条件之间没and?2个都是>? >>> > > >> > > > > > >>> > > >> > > > > > macia kk <[hidden email]> 于2020年12月7日周一 下午10:30写道: >>> > > >> > > > > > >>> > > >> > > > > > > 不好意思,我上边贴错了 >>> > > >> > > > > > > >>> > > >> > > > > > > SELECT * >>> > > >> > > > > > > FROM A >>> > > >> > > > > > > LEFT OUT JOIN B >>> > > >> > > > > > > ON order_id >>> > > >> > > > > > > Where A.event_time > B.event_time - 30 s >>> > > >> > > > > > > A.event_time > B.event_time + 30 s >>> > > >> > > > > > > >>> > > >> > > > > > > event_time 是 Time Attributes 设置的 event_time >>> > > >> > > > > > > >>> > > >> > > > > > > 这样是没有输出的。 >>> > > >> > > > > > > >>> > > >> > > > > > > >>> > > >> > > > > > > >>> > > >> > > > > > > interval join 左右表在 state 中是缓存多久的? >>> > > >> > > > > > > >>> > > >> > > > > > > >>> > > >> > > > > > > >>> > > >> > > > > > > >>> > > >> > > > > > > >>> > > >> > > > > > > >>> > > >> > > > > > > hailongwang <[hidden email]> 于2020年12月7日周一 >>> 下午8:05写道: >>> > > >> > > > > > > >>> > > >> > > > > > > > Hi, >>> > > >> > > > > > > > 其中 条件是 >>> > > >> > > > > > > > `Where A.event_time < B.event_time + 30 s and >>> > > A.event_time > >>> > > >> > > > > > B.event_time >>> > > >> > > > > > > > - 30 s ` 吧 >>> > > >> > > > > > > > 可以参考以下例子[1],看下有木有写错。 >>> > > >> > > > > > > > [1] >>> > > >> > > > > > > > >>> > > >> > > > > > > >>> > > >> > > > > > >>> > > >> > > > > >>> > > >> > > > >>> > > >> > > >>> > > >> > >>> > > >> >>> > > >>> > >>> https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183 >>> > > >> > > > > > > > >>> > > >> > > > > > > > >>> > > >> > > > > > > > Best, >>> > > >> > > > > > > > Hailong >>> > > >> > > > > > > > 在 2020-12-07 13:10:02,"macia kk" <[hidden email]> >>> 写道: >>> > > >> > > > > > > > >Hi, 各位大佬 >>> > > >> > > > > > > > > >>> > > >> > > > > > > > > 我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog >>> > 打进去了。我的 >>> > > >> > > > > > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 >>> > order事件 >>> > > >> ,表B 是 >>> > > >> > > > order >>> > > >> > > > > > > item >>> > > >> > > > > > > > >信息,所以 我用: >>> > > >> > > > > > > > > >>> > > >> > > > > > > > > SELECT * >>> > > >> > > > > > > > > FROM A >>> > > >> > > > > > > > > LEFT OUT JOIN B >>> > > >> > > > > > > > > ON order_id >>> > > >> > > > > > > > > Where A.event_time > B.event_time + 30 s >>> > > >> > > > > > > > > A.event_time > B.event_time - 30 s >>> > > >> > > > > > > > > >>> > > >> > > > > > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join >>> > > 之后就没有输出数据了,可以确认的是我用 >>> > > >> > Spark >>> > > >> > > > > > > > Structural >>> > > >> > > > > > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join, >>> > > >> > > > > > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join >>> > > >> > 在具体实现的逻辑是什么,我在处理上哪里有问题? >>> > > >> > > > > > > > >>> > > >> > > > > > > >>> > > >> > > > > > >>> > > >> > > > > >>> > > >> > > > >>> > > >> > > > >>> > > >> > > > -- >>> > > >> > > > >>> > > >> > > > Best, >>> > > >> > > > Benchao Li >>> > > >> > > > >>> > > >> > > >>> > > >> > >>> > > >> >>> > > >> >>> > > >> -- >>> > > >> >>> > > >> Best, >>> > > >> Benchao Li >>> > > >> >>> > > > >>> > > >>> > >>> > >>> > -- >>> > >>> > Best, >>> > Benchao Li >>> > >>> >> |
Hi,
可以关注: https://issues.apache.org/jira/browse/FLINK-20162 https://issues.apache.org/jira/browse/FLINK-20387 Best, Hailong 在 2020-12-15 10:40:19,"赵一旦" <[hidden email]> 写道: >补充,实际FROM_UNIXTIME应该返回 TIMESTAMP WITH LOCAL TIME ZONE >这个类型。(然后FlinkSQL可以自己转为TIMESTAMP)。 > >此外,关于分窗,除了offset这种显示的由用户来解决时区分窗以外。还可以通过支持 TIMESTAMP WITH LOCAL TIME ZONE >类型作为 event time 实现,当然内部当然还是通过offset实现,只是FlinkSQL语法层可以基于支持 TIMESTAMP WITH >LOCAL TIME ZONE 作为eventtime来实现这种效果。 > >如上是个人观点哈。。。 > >赵一旦 <[hidden email]> 于2020年12月15日周二 上午11:29写道: > >> 这个问题很早前我提过,没人在意,或者说大家没觉得这是个问题。但实际上如果和DataStream >> API去对比的话,FlinkSQL的这种表现肯定是有问题的。 >> >> 换种说法,FlinkSQL通过更改ts方式实现了UTC+8时区下的分窗的合理性,但其“实现方式”本身就是“代价”,即使用了不合理的ts,ui上当然就展示不合理的ts。 >> >> 这本来是应该在window分窗处基于offset实现的功能。 >> >> 赵一旦 <[hidden email]> 于2020年12月15日周二 上午11:22写道: >> >>> 这个问题是存在的,只不过不清楚算不算bug,可能只算是FlinkSQL在这部分处理的不足感觉。 >>> >>> 之所以出现这个问题,是因为你用了FROM_UNIXTIME等函数,这些函数会自动根据当前时区将ts转换为TIMESTAMP(这导致了在time >>> attribute部分不应该使用这种函数,否则会导致watermark显示超前8小时的问题)。 >>> >>> 但是呢,目前不这么做好像也还不行。因为分窗必须基于time >>> attribute,同时flinkSQL目前不支持offset的指定,因此只能基于这种方式去间接实现分窗的正确性。 >>> >>> >>> ———————— >>> 比如:ts=0,代表的是 1970-1-1 00:00:00,FROM_UNIXTIME(0) 返回的是 1970-1-1 08:00:00 >>> UTC+8 >>> 这个时间,而这个时间作为TIMESTAMP类型的eventtime被处理了,但TIMESTAMP本身是无时区含义的,你却给了它一个带时区含义的日期。这导致分窗可以正常按照中国人习惯分,但从底层考虑却不对,因为这个时间点,它的ts变为了28800s。因为flinkSQL将你那个eventtime按照UTC+0转换出来的ts就是ts=28800(s)。 >>> >>> ———————— >>> 按我说的话,要么继续这么用,你忽略ui上的watermark,这并不影响你的代码逻辑和业务逻辑。 >>> 要么就是调整不使用那些函数,这样可以保证watermark那个ui展示正确,但是却导致小时/天窗口出问题,并暂时无解(因为不支持offset窗口)。 >>> >>> >>> >>> macia kk <[hidden email]> 于2020年12月11日周五 下午3:04写道: >>> >>>> 你用的是哪个版本的Flink呢? >>>> ----- >>>> 1.11.2 >>>> >>>> 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。 >>>> 所以你的binlog是怎么读进来的呢?自定义的format? >>>> ----- >>>> ts 就是时间戳 >>>> >>>> bsTableEnv.executeSql(""" >>>> CREATE TABLE input_database ( >>>> `table` STRING, >>>> `database` STRING, >>>> `data` ROW( >>>> reference_id STRING, >>>> transaction_sn STRING, >>>> transaction_type BIGINT, >>>> merchant_id BIGINT, >>>> transaction_id BIGINT, >>>> status BIGINT >>>> ), >>>> ts BIGINT, >>>> event_time AS TO_TIMESTAMP(FROM_UNIXTIME(ts)), >>>> WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR >>>> ) WITH ( >>>> 'connector.type' = 'kafka', >>>> 'connector.version' = '0.11', >>>> 'connector.topic' = 'mytopic', >>>> 'connector.properties.bootstrap.servers' = 'xxxx', >>>> 'format.type' = 'json' >>>> ) >>>> ) >>>> >>>> >>>> >>>> ``` >>>> >>>> >>>> >>>> Benchao Li <[hidden email]> 于2020年12月10日周四 下午6:14写道: >>>> >>>> > 你用的是哪个版本的Flink呢? >>>> > >>>> > 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。 >>>> > 所以你的binlog是怎么读进来的呢?自定义的format? >>>> > >>>> > macia kk <[hidden email]> 于2020年12月10日周四 上午1:06写道: >>>> > >>>> > > 我刚才跑了很多任务,设置不同的 maxOutOfOrderness WATERMARK FOR event_time AS >>>> event_time >>>> > - >>>> > > INTERVAL 'x' HOUR >>>> > > >>>> > > 发现一个很奇怪的问题 ,按理说 watermark = currentMaxTimestamp - maxOutOfOrderness >>>> > > >>>> > > 但是 我通过 页面上的 watermark 时间,和我设置 maxOutOfOrderness x, >>>> > > 能够反推出来数据的 currentMaxTimestamp >>>> > > >>>> > > currentMaxTimestamp = watermark + maxOutOfOrderness >>>> > > >>>> > > 但是我无论设置多少的 maxOutOfOrderness, 反推出来的 currentMaxTimestamp 比现在此时此刻的时间快 >>>> > > 8个小时,也就是说 currentMaxTimestamp 在未来后的 8个小时,这个数字一直是固定的8。 >>>> > > >>>> > > >>>> > > 但是,我进行 Join, 直接输出任意一张表,得到的 evet time 都是对的,比如现在 00:55 >>>> > > >>>> > > >>>> > >>>> {"table":"transaction_tab_00000122","database":"main_db","transaction_type":1,"transaction_id":111111,"reference_id":"111111","transaction_sn":"11111","merchant_id":1,"status":1,"event_time":" >>>> > > *2020-12-10T01:02:24Z*"} >>>> > > >>>> > > UI 上显示的 watermark 是 1607555031000(Your time zone: >>>> 2020年12月10日星期四早上7点02分 >>>> > > GMT+08:00) >>>> > > >>>> > > 这个 watermark 是未来的时间 >>>> > > >>>> > > >>>> > > >>>> > > >>>> > > >>>> > > macia kk <[hidden email]> 于2020年12月9日周三 下午11:36写道: >>>> > > >>>> > > > 感谢 一旦 和 Benchao >>>> > > > >>>> > > > 1. 如果是我的 watermark 设置过长,导致无法输出的话,是有点疑问的,因为我可以确定的是,一定会有 Join >>>> 上的数据,但是我 >>>> > > Job >>>> > > > 跑了几天也没有一条输出。我还试了如下的SQL,自己 Join 自己,所以理论肯定是直接输出的,实际也是一条也没有。 >>>> > > > >>>> > > > val result = bsTableEnv.sqlQuery(""" >>>> > > > SELECT * >>>> > > > FROM ( >>>> > > > SELECT t1.`table`, t1.`database`, t1.transaction_type, >>>> > > t1.transaction_id, >>>> > > > t1.reference_id, t1.transaction_sn, t1.merchant_id, >>>> > > t1.status, t1.event_time >>>> > > > FROM main_db as t1 >>>> > > > LEFT JOIN main_db as t2 >>>> > > > ON t1.reference_id = t2.reference_id >>>> > > > WHERE t1.event_time >= t2.event_time + INTERVAL '5' >>>> MINUTES >>>> > > > AND t1.event_time <= t2.event_time - INTERVAL '5' >>>> MINUTES >>>> > > > ) >>>> > > > """.stripMargin) >>>> > > > >>>> > > > 2. 一旦提到的 watermark 传递的问题,我可以确认的是,会传递下去,这可以在 UI 上看到 >>>> > > > >>>> > > > 3. 这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source >>>> > > > subtask的watermark。 >>>> > > > ------------------------------------------------------- >>>> > > > 这里应该是使用 source subtask 最小的 watermark 传递过去,因为我可以看到的是,我的 >>>> watermark >>>> > > > 永远和现在相差8个小时,所以怀疑是有一张表,总是会迟8个小时才会有 BinLog. >>>> > > > >>>> > > > 4. Flink SQL 有没有方法在定义 schema 的时候,如果一个字段不存在,就是 null,我现在想换另外一个时间字段作为 >>>> > event >>>> > > > time,但是有的表又没有这个字段,会导致解析的时候直接报错. >>>> > > > >>>> > > > 5. 我能不能不在 input_table 上注册 water mark,在 filter 出两张表后,再把 watermark >>>> > > > 加载两张表上,这样可以避免因为别的表,导致 watermark 停止不前,混乱的行为. >>>> > > > >>>> > > > >>>> > > > Thanks and best regards >>>> > > > >>>> > > > >>>> > > > Benchao Li <[hidden email]> 于2020年12月9日周三 上午10:24写道: >>>> > > > >>>> > > >> Hi macia, >>>> > > >> >>>> > > >> 一旦回答的基本比较完整了。 >>>> > > >> watermark影响的主要是left join没有join到的情况下,+(left, null)这样的数据输出的时机。 >>>> > > >> 如果是两侧都有数据,watermark不前进,也都可以正常输出。 >>>> > > >> >>>> > > >> 关于watermark,如果你的事件时间忽高忽低,这个底层的watermark只会取当前source >>>> > > subtask见到的最大的watermark >>>> > > >> 作为这个source subtask的watermark。但是你的watermark计算逻辑本身就是事件时间delay >>>> > > 10个小时,这个已经会导致 >>>> > > >> 你的没有join到的数据下发会延迟很多了。 >>>> > > >> >>>> > > >> 你也可以尝试下用处理时间来做一下interval join,看看能不能达到预期。 >>>> > > >> >>>> > > >> 赵一旦 <[hidden email]> 于2020年12月9日周三 上午10:15写道: >>>> > > >> >>>> > > >> > 重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。 >>>> > > >> > >>>> > > >> > >>>> > > >> > >>>> > > >> >>>> > > >>>> > >>>> (1)你这个的话我看到一个问题,就是watermark你定义10小时的maxOutOfOrderness,确定这么长嘛要,这么大的maxOutOfOrderness,会导致join到的则会及时输出,join不到的需要等10小时才能输出“仅左边”数据,即left >>>> > > >> > join。 >>>> > > >> > >>>> > > >> > (2)此外,还有一个点,这个我也不确认。如果是datastream >>>> > > >> > api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。 >>>> > > >> > >>>> > > >> > >>>> > > >> >>>> > > >>>> > >>>> input_database中定义了watermark,从input_database到2个filter后的表不清楚是否还存在watermark(我感觉是存在的),只要存在那就没问题,唯一需要注意的是第1点。 >>>> > > >> > >>>> > > >> > macia kk <[hidden email]> 于2020年12月9日周三 上午1:17写道: >>>> > > >> > >>>> > > >> > > @Benchao Li <[hidden email]> 感谢回复,这个问题困扰我半年了,导致我一直不能迁移到 >>>> > > >> > > FLink,可能我的Case 太特殊了. >>>> > > >> > > >>>> > > >> > > 我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的 >>>> > > >> Binlog,我需要 >>>> > > >> > > filter 出来 main_db__tansaction_tab, >>>> merchant_db__transaction_tab, >>>> > 两个 >>>> > > DB >>>> > > >> > > 中的两个表。所以这里的字段我定义的是 两张表的字段的并集. >>>> > > >> > > >>>> > > >> > > 还要注意的是 even time 是 create_time, 这里问题非常大: >>>> > > >> > > 1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱 >>>> > > >> > > 2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create >>>> time进来,可能会影响 >>>> > > >> > watermark >>>> > > >> > > forward on. >>>> > > >> > > >>>> > > >> > > bsTableEnv.executeSql(""" >>>> > > >> > > CREATE TABLE input_database ( >>>> > > >> > > `table` STRING, >>>> > > >> > > `database` STRING, >>>> > > >> > > `data` ROW( >>>> > > >> > > reference_id STRING, >>>> > > >> > > transaction_sn STRING, >>>> > > >> > > transaction_type BIGINT, >>>> > > >> > > merchant_id BIGINT, >>>> > > >> > > transaction_id BIGINT, >>>> > > >> > > status BIGINT >>>> > > >> > > ), >>>> > > >> > > ts BIGINT, >>>> > > >> > > event_time AS >>>> TO_TIMESTAMP(FROM_UNIXTIME(create_time)), >>>> > > >> > > WATERMARK FOR event_time AS event_time - INTERVAL '10' >>>> > HOUR >>>> > > >> > > ) WITH ( >>>> > > >> > > 'connector.type' = 'kafka', >>>> > > >> > > 'connector.version' = '0.11', >>>> > > >> > > 'connector.topic' = 'mytopic', >>>> > > >> > > 'connector.properties.bootstrap.servers' = 'xxxx', >>>> > > >> > > 'format.type' = 'json' >>>> > > >> > > ) >>>> > > >> > > """) >>>> > > >> > > >>>> > > >> > > >>>> > > >> > > 分别 filter 出来 两张表,进行 interval Join,这个是一直没有输出的,我两张表输出试过,没有任何问题。 >>>> > > >> > > >>>> > > >> > > val main_db = bsTableEnv.sqlQuery(""" >>>> > > >> > > | SELECT * >>>> > > >> > > | FROM input_database >>>> > > >> > > | WHERE `database` = 'main_db' >>>> > > >> > > | AND `table` LIKE 'transaction_tab%' >>>> > > >> > > | """.stripMargin) >>>> > > >> > > >>>> > > >> > > val merchant_db = bsTableEnv.sqlQuery(""" >>>> > > >> > > | SELECT * >>>> > > >> > > | FROM input_database >>>> > > >> > > | WHERE `database` = 'merchant_db' >>>> > > >> > > | AND `table` LIKE 'transaction_tab%' >>>> > > >> > > | """.stripMargin) >>>> > > >> > > >>>> > > >> > > bsTableEnv.createTemporaryView("main_db", main_db) >>>> > > >> > > bsTableEnv.createTemporaryView("merchant_db", merchant_db) >>>> > > >> > > >>>> > > >> > > val result = bsTableEnv.sqlQuery(""" >>>> > > >> > > SELECT * >>>> > > >> > > FROM ( >>>> > > >> > > SELECT t1.`table`, t1.`database`, >>>> t1.transaction_type, >>>> > > >> > > t1.transaction_id, >>>> > > >> > > t1.reference_id, t1.transaction_sn, >>>> t1.merchant_id, >>>> > > >> > > t1.status, t1.event_time >>>> > > >> > > FROM main_db as t1 >>>> > > >> > > LEFT JOIN merchant_db as t2 >>>> > > >> > > ON t1.reference_id = t2.reference_id >>>> > > >> > > WHERE t1.event_time >= t2.event_time + INTERVAL '1' >>>> HOUR >>>> > > >> > > AND t1.event_time <= t2.event_time - INTERVAL '1' >>>> HOUR >>>> > > >> > > ) >>>> > > >> > > """.stripMargin) >>>> > > >> > > >>>> > > >> > > >>>> > > >> > > >>>> > > >> > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么? >>>> > > >> > > ----- >>>> > > >> > > 你提到的这个问题,我估计我的 watermark 前进肯定是不正常的。但是我无法理解为什么 interval join 需要 >>>> > > >> watermark >>>> > > >> > > 来驱动。 >>>> > > >> > > 我的理解是,他会把两边的数据都保留在 state 里,既然是 Left join,如果左边有数据查右边的state,如果可以 >>>> > > >> join上,就输出 >>>> > > >> > > join 的结果,如果没有 join上,那应该正常输出左边的数据,这才是 Left join 应有的逻辑把. >>>> > > >> > > >>>> > > >> > > >>>> > > >> > > >>>> > > >> > > >>>> > > >> > > >>>> > > >> > > >>>> > > >> > > Benchao Li <[hidden email]> 于2020年12月8日周二 下午3:23写道: >>>> > > >> > > >>>> > > >> > > > hi macia, >>>> > > >> > > > >>>> > > >> > > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么? >>>> > > >> > > > >>>> > > >> > > > macia kk <[hidden email]> 于2020年12月8日周二 上午1:15写道: >>>> > > >> > > > >>>> > > >> > > > > 抱歉,是 >-30 and <+30 >>>> > > >> > > > > >>>> > > >> > > > > 贴的只是demo,我的疑问是,既然是 Left >>>> Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有 >>>> > > >> > > > > >>>> > > >> > > > > 赵一旦 <[hidden email]>于2020年12月7日 周一23:28写道: >>>> > > >> > > > > >>>> > > >> > > > > > 准确点,2个条件之间没and?2个都是>? >>>> > > >> > > > > > >>>> > > >> > > > > > macia kk <[hidden email]> 于2020年12月7日周一 下午10:30写道: >>>> > > >> > > > > > >>>> > > >> > > > > > > 不好意思,我上边贴错了 >>>> > > >> > > > > > > >>>> > > >> > > > > > > SELECT * >>>> > > >> > > > > > > FROM A >>>> > > >> > > > > > > LEFT OUT JOIN B >>>> > > >> > > > > > > ON order_id >>>> > > >> > > > > > > Where A.event_time > B.event_time - 30 s >>>> > > >> > > > > > > A.event_time > B.event_time + 30 s >>>> > > >> > > > > > > >>>> > > >> > > > > > > event_time 是 Time Attributes 设置的 event_time >>>> > > >> > > > > > > >>>> > > >> > > > > > > 这样是没有输出的。 >>>> > > >> > > > > > > >>>> > > >> > > > > > > >>>> > > >> > > > > > > >>>> > > >> > > > > > > interval join 左右表在 state 中是缓存多久的? >>>> > > >> > > > > > > >>>> > > >> > > > > > > >>>> > > >> > > > > > > >>>> > > >> > > > > > > >>>> > > >> > > > > > > >>>> > > >> > > > > > > >>>> > > >> > > > > > > hailongwang <[hidden email]> 于2020年12月7日周一 >>>> 下午8:05写道: >>>> > > >> > > > > > > >>>> > > >> > > > > > > > Hi, >>>> > > >> > > > > > > > 其中 条件是 >>>> > > >> > > > > > > > `Where A.event_time < B.event_time + 30 s and >>>> > > A.event_time > >>>> > > >> > > > > > B.event_time >>>> > > >> > > > > > > > - 30 s ` 吧 >>>> > > >> > > > > > > > 可以参考以下例子[1],看下有木有写错。 >>>> > > >> > > > > > > > [1] >>>> > > >> > > > > > > > >>>> > > >> > > > > > > >>>> > > >> > > > > > >>>> > > >> > > > > >>>> > > >> > > > >>>> > > >> > > >>>> > > >> > >>>> > > >> >>>> > > >>>> > >>>> https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183 >>>> > > >> > > > > > > > >>>> > > >> > > > > > > > >>>> > > >> > > > > > > > Best, >>>> > > >> > > > > > > > Hailong >>>> > > >> > > > > > > > 在 2020-12-07 13:10:02,"macia kk" <[hidden email]> >>>> 写道: >>>> > > >> > > > > > > > >Hi, 各位大佬 >>>> > > >> > > > > > > > > >>>> > > >> > > > > > > > > 我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog >>>> > 打进去了。我的 >>>> > > >> > > > > > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 >>>> > order事件 >>>> > > >> ,表B 是 >>>> > > >> > > > order >>>> > > >> > > > > > > item >>>> > > >> > > > > > > > >信息,所以 我用: >>>> > > >> > > > > > > > > >>>> > > >> > > > > > > > > SELECT * >>>> > > >> > > > > > > > > FROM A >>>> > > >> > > > > > > > > LEFT OUT JOIN B >>>> > > >> > > > > > > > > ON order_id >>>> > > >> > > > > > > > > Where A.event_time > B.event_time + 30 s >>>> > > >> > > > > > > > > A.event_time > B.event_time - 30 s >>>> > > >> > > > > > > > > >>>> > > >> > > > > > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join >>>> > > 之后就没有输出数据了,可以确认的是我用 >>>> > > >> > Spark >>>> > > >> > > > > > > > Structural >>>> > > >> > > > > > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join, >>>> > > >> > > > > > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join >>>> > > >> > 在具体实现的逻辑是什么,我在处理上哪里有问题? >>>> > > >> > > > > > > > >>>> > > >> > > > > > > >>>> > > >> > > > > > >>>> > > >> > > > > >>>> > > >> > > > >>>> > > >> > > > >>>> > > >> > > > -- >>>> > > >> > > > >>>> > > >> > > > Best, >>>> > > >> > > > Benchao Li >>>> > > >> > > > >>>> > > >> > > >>>> > > >> > >>>> > > >> >>>> > > >> >>>> > > >> -- >>>> > > >> >>>> > > >> Best, >>>> > > >> Benchao Li >>>> > > >> >>>> > > > >>>> > > >>>> > >>>> > >>>> > -- >>>> > >>>> > Best, >>>> > Benchao Li >>>> > >>>> >>> |
Free forum by Nabble | Edit this page |