Hi, all:

I am running Flink 1.10.1, using Flink SQL to consume from Kafka. The job works as expected with parallelism set to 1, but after changing parallelism to 2, the watermark metric no longer shows up on the yarn-session web UI and no results are produced. The SQL is as follows:

insert into x.report.bi_report_fence_common_indicators
select
    fence_id,
    'finishedOrderCnt' as indicator_name,
    TUMBLE_END(dt, INTERVAL '5' MINUTE) as ts,
    count(1) as indicator_val
from (
    select dt, fence_id, fence_coordinates_array, c.driver_location
    from (
        select *
        from (
            select dt, driver_location, r1.f1.fence_info as fence_info
            from (
                select o.dt, o.driver_location, MD5(r.city_code) as k, PROCTIME() as proctime
                from (
                    select order_no, dt, driver_location, PROCTIME() as proctime
                    from x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner
                    where _type = 'insert'
                      and event_code = 'arriveAndSettlement'
                ) o
                LEFT JOIN x.dim.saic_trip_create_t_order FOR SYSTEM_TIME AS OF o.proctime AS r
                    ON r.order_no = o.order_no
            ) o1
            LEFT JOIN x.dim.fence_info FOR SYSTEM_TIME AS OF o1.proctime AS r1
                ON r1.k = o1.k
        ) a
        where fence_info is not null
    ) c
    LEFT JOIN LATERAL TABLE(fence_split(c.fence_info)) as T(fence_id, fence_coordinates_array) ON TRUE
) as b
where in_fence(fence_coordinates_array, driver_location)
group by TUMBLE(dt, INTERVAL '5' MINUTE), fence_id;

In the table x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner, dt is the watermark column. The DDL is:

CREATE TABLE x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner (
    _type STRING,
    _old_id BIGINT,
    id BIGINT,
    _old_order_no STRING,
    order_no STRING,
    _old_event_code STRING,
    event_code STRING,
    _old_from_state TINYINT,
    from_state TINYINT,
    _old_to_state TINYINT,
    to_state TINYINT,
    _old_operator_type TINYINT,
    operator_type TINYINT,
    _old_passenger_location STRING,
    passenger_location STRING,
    _old_driver_location STRING,
    driver_location STRING,
    _old_trans_time STRING,
    trans_time STRING,
    _old_create_time STRING,
    create_time STRING,
    _old_update_time STRING,
    update_time STRING,
    _old_passenger_poi_address STRING,
    passenger_poi_address STRING,
    _old_passenger_detail_address STRING,
    passenger_detail_address STRING,
    _old_driver_poi_address STRING,
    driver_poi_address STRING,
    _old_driver_detail_address STRING,
    driver_detail_address STRING,
    _old_operator STRING,
    operator STRING,
    _old_partition_index TINYINT,
    partition_index TINYINT,
    dt as TO_TIMESTAMP(trans_time),
    WATERMARK FOR dt AS dt - INTERVAL '5' SECOND
) WITH (
    'connector.type' = 'kafka',
    'connector.properties.bootstrap.servers' = '*',
    'connector.properties.zookeeper.connect' = '*',
    'connector.version' = 'universal',
    'format.type' = 'json',
    'connector.properties.group.id' = 'testGroup',
    'connector.startup-mode' = 'group-offsets',
    'connector.topic' = 'xxxxx'
)
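For context on how the WATERMARK clause above behaves: a bounded-out-of-orderness watermark trails the maximum event time seen so far by the declared delay (5 seconds here). A minimal Python sketch of that arithmetic, as a plain simulation with made-up timestamps rather than any Flink API:

```python
from datetime import datetime, timedelta

# Matches WATERMARK FOR dt AS dt - INTERVAL '5' SECOND in the DDL above
DELAY = timedelta(seconds=5)

def watermark_after(event_times):
    """Watermark after processing a stream of event timestamps:
    the maximum event time seen so far, minus the allowed out-of-orderness."""
    return max(event_times) - DELAY

events = [
    datetime(2020, 7, 13, 11, 46, 0),
    datetime(2020, 7, 13, 11, 46, 10),
    datetime(2020, 7, 13, 11, 46, 7),  # out of order by 3s, within the 5s bound
]
wm = watermark_after(events)
print(wm)  # 2020-07-13 11:46:05
```

A TUMBLE window can only fire once the watermark passes its end time, which is why a stalled watermark also means no output.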
junbaozhang

How many partitions does the topic have? If it is a single-partition topic, you probably need to add a rebalance so that every parallel subtask receives data.
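The reasoning behind this suggestion: a downstream operator advances its watermark to the minimum over all input subtasks, and a source subtask that owns no Kafka partition (or only an empty one) never advances its own watermark, so the minimum stays at the initial low value and no window ever fires. A small Python sketch of that min semantics, a simulation with hypothetical values rather than Flink code:

```python
# A subtask with no partition assigned (or an empty partition) never emits
# a watermark, so it effectively stays at the initial low value.
LOW_WATERMARK = float("-inf")

def downstream_watermark(subtask_watermarks):
    """Downstream operators advance to the minimum of all input watermarks."""
    return min(subtask_watermarks)

# parallelism = 1: the single subtask reads the one non-empty partition
print(downstream_watermark([1_594_610_770_000]))  # advances normally

# parallelism = 2 over a topic where only one partition has data:
# the second subtask never sees a record, so the global watermark stalls
stalled = downstream_watermark([1_594_610_770_000, LOW_WATERMARK])
print(stalled)  # -inf: windows never close and no output appears
```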
In reply to this post by junbaozhang
Hi,
You could first check how many partitions the Kafka topic has, and whether every partition actually has data.

Best,
Leonard Xu
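This check — whether every partition actually has data — amounts to comparing each partition's beginning and end offsets. The helper below is a plain-Python sketch over two offset maps; in practice those maps could come from, for example, kafka-python's KafkaConsumer.beginning_offsets / end_offsets (the library choice and the sample numbers are assumptions, not from this thread):

```python
def partitions_with_data(beginning_offsets, end_offsets):
    """Return the partitions whose end offset is past the beginning offset,
    i.e. partitions that have received at least one record."""
    return sorted(p for p in end_offsets
                  if end_offsets[p] > beginning_offsets.get(p, 0))

# Hypothetical offsets for a 3-partition topic like the one in this thread:
beginning = {0: 0, 1: 0, 2: 0}
end = {0: 1500, 1: 0, 2: 0}  # only partition 0 ever received data
print(partitions_with_data(beginning, end))  # [0] -> partitions 1 and 2 are idle
```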
Hi,
It is indeed three partitions in total, and only one of them has data. The problem is solved now, thank you.

Best,
Junbao Zhang

________________________________
From: Leonard Xu <[hidden email]>
Sent: Monday, July 13, 2020 11:57 AM
To: user-zh <[hidden email]>
Subject: Re: flink 1.10.1 Flink SQL consuming Kafka produces no watermark when parallelism > 1
junbaozhang