flink1.10.1 flink sql消费kafka当parallelism大于1时不产生watermark

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

flink1.10.1 flink sql消费kafka当parallelism大于1时不产生watermark

junbaozhang
Hi, all:
本人使用的flink版本为flink 1.10.1, flink sql消费kafka, 当parallelism为1时正常运行,但将parallelism修改为2时,在yarn-session web页面看不到watermark的指标信息了,也没有计算结果输出,sql如下:
-- Writes one row per (fence_id, 5-minute tumbling window on dt) into the report
-- table: the number of matching source rows, labelled 'finishedOrderCnt'.
-- The window is defined on dt, which the source table's DDL declares as the
-- watermarked time column, so this is an event-time window whose progress
-- depends on the source watermark.
insert into
  x.report.bi_report_fence_common_indicators
select
  fence_id,
  'finishedOrderCnt' as indicator_name,
  -- ts is the end timestamp of the 5-minute window the row belongs to.
  TUMBLE_END(dt, INTERVAL '5' MINUTE) as ts,
  count(1) as indicator_val
from
  (
    -- b: one row per (source row, fence produced by fence_split).
    select
      dt,
      fence_id,
      fence_coordinates_array,
      c.driver_location
    from
      (
        -- c: rows of a that actually carry a fence_info value. Because both
        -- joins below are LEFT JOINs, rows that found no fence_info come
        -- through as NULL and are discarded by this filter.
        select
          *
        from
          (
            -- a: source rows enriched with fence_info read from x.dim.fence_info.
            select
              dt,
              driver_location,
              r1.f1.fence_info as fence_info
            from
              (
                -- o1: source rows plus the join key k = MD5(city_code), where
                -- city_code comes from x.dim.saic_trip_create_t_order.
                select
                  o.dt,
                  o.driver_location,
                  MD5(r.city_code) as k,
                  -- Fresh processing-time attribute for the second temporal join.
                  PROCTIME() as proctime
                from
                  (
                    -- o: source rows restricted to _type = 'insert' and
                    -- event_code = 'arriveAndSettlement', with a processing-time
                    -- attribute added for the first temporal join.
                    select
                      order_no,
                      dt,
                      driver_location,
                      PROCTIME() as proctime
                    from
                      x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner
                    where
                      _type = 'insert'
                      and event_code = 'arriveAndSettlement'
                  ) o
                  -- Processing-time temporal table join on order_no.
                  LEFT JOIN x.dim.saic_trip_create_t_order FOR SYSTEM_TIME AS OF o.proctime AS r ON r.order_no = o.order_no
              ) o1
              -- Processing-time temporal table join on the MD5 key k.
              LEFT JOIN x.dim.fence_info FOR SYSTEM_TIME AS OF o1.proctime AS r1 ON r1.k = o1.k
          ) a
        where
          fence_info is not null
      ) c
      -- fence_split is a table function defined outside this snippet; its output
      -- columns are exposed as (fence_id, fence_coordinates_array).
      -- NOTE(review): presumably it emits one row per fence contained in
      -- fence_info - confirm against the UDTF implementation.
      LEFT JOIN LATERAL TABLE(fence_split(c.fence_info)) as T(fence_id, fence_coordinates_array) ON TRUE
  ) as b
where
  -- in_fence is a function defined outside this snippet.
  -- NOTE(review): presumably true when driver_location lies inside the fence
  -- described by fence_coordinates_array - confirm against the UDF.
  in_fence(fence_coordinates_array, driver_location)
group by
  TUMBLE(dt, INTERVAL '5' MINUTE),
  fence_id;
           其中 x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner表中dt为watermark字段,建表语句如下:
           -- Kafka-backed JSON source table read by the windowed query in this thread.
           -- Every business column has an `_old_`-prefixed twin of the same type.
           -- NOTE(review): presumably `_old_*` holds the pre-change value and `_type`
           -- the change kind of a binlog record - confirm against the producer.
           CREATE TABLE x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner(
  _type STRING,
  _old_id BIGINT,
  id BIGINT,
  _old_order_no STRING,
  order_no STRING,
  _old_event_code STRING,
  event_code STRING,
  _old_from_state TINYINT,
  from_state TINYINT,
  _old_to_state TINYINT,
  to_state TINYINT,
  _old_operator_type TINYINT,
  operator_type TINYINT,
  _old_passenger_location STRING,
  passenger_location STRING,
  _old_driver_location STRING,
  driver_location STRING,
  _old_trans_time STRING,
  trans_time STRING,
  _old_create_time STRING,
  create_time STRING,
  _old_update_time STRING,
  update_time STRING,
  _old_passenger_poi_address STRING,
  passenger_poi_address STRING,
  _old_passenger_detail_address STRING,
  passenger_detail_address STRING,
  _old_driver_poi_address STRING,
  driver_poi_address STRING,
  _old_driver_detail_address STRING,
  driver_detail_address STRING,
  _old_operator STRING,
  operator STRING,
  _old_partition_index TINYINT,
  partition_index TINYINT,
  -- Computed column: the string trans_time converted to a timestamp.
  dt as TO_TIMESTAMP(trans_time),
  -- dt is the watermarked time column; the watermark trails dt by 5 seconds.
  -- NOTE(review): per the resolution at the end of this thread, the topic has
  -- three partitions and only one carried data. With parallelism > 1 the
  -- subtasks reading empty partitions presumably never advance their
  -- watermark, which would explain the missing watermark and window output.
  WATERMARK FOR dt AS dt - INTERVAL '5' SECOND
) WITH (
  -- Connector properties; broker/zookeeper addresses and topic are redacted
  -- in the post.
  'connector.type' = 'kafka',
  'connector.properties.bootstrap.servers' = '*',
  'connector.properties.zookeeper.connect' = '*',
  'connector.version' = 'universal',
  'format.type' = 'json',
  'connector.properties.group.id' = 'testGroup',
  -- Start reading from the offsets committed for the consumer group above.
  'connector.startup-mode' = 'group-offsets',
  'connector.topic' = 'xxxxx'
)
junbaozhang
Reply | Threaded
Open this post in threaded view
|

Re: flink1.10.1 flink sql消费kafka当parallelism大于1时不产生watermark

zilong xiao
topic是几个分区呢?如果是一个分区,要加一个rebalance参数吧?

[hidden email] <[hidden email]> 于2020年7月13日周一 上午11:46写道:

> Hi, all:
> 本人使用的flink版本为flink 1.10.1, flink sql消费kafka,
> 当parallelism为1时正常运行,但将parallelism修改为2时,在yarn-session
> web页面看不到watermark的指标信息了,也没有计算结果输出,sql如下:
> insert into
>   x.report.bi_report_fence_common_indicators
> select
>   fence_id,
>   'finishedOrderCnt' as indicator_name,
>   TUMBLE_END(dt, INTERVAL '5' MINUTE) as ts,
>   count(1) as indicator_val
> from
>   (
>     select
>       dt,
>       fence_id,
>       fence_coordinates_array,
>       c.driver_location
>     from
>       (
>         select
>           *
>         from
>           (
>             select
>               dt,
>               driver_location,
>               r1.f1.fence_info as fence_info
>             from
>               (
>                 select
>                   o.dt,
>                   o.driver_location,
>                   MD5(r.city_code) as k,
>                   PROCTIME() as proctime
>                 from
>                   (
>                     select
>                       order_no,
>                       dt,
>                       driver_location,
>                       PROCTIME() as proctime
>                     from
>                       x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner
>                     where
>                       _type = 'insert'
>                       and event_code = 'arriveAndSettlement'
>                   ) o
>                   LEFT JOIN x.dim.saic_trip_create_t_order FOR SYSTEM_TIME
> AS OF o.proctime AS r ON r.order_no = o.order_no
>               ) o1
>               LEFT JOIN x.dim.fence_info FOR SYSTEM_TIME AS OF o1.proctime
> AS r1 ON r1.k = o1.k
>           ) a
>         where
>           fence_info is not null
>       ) c
>       LEFT JOIN LATERAL TABLE(fence_split(c.fence_info)) as T(fence_id,
> fence_coordinates_array) ON TRUE
>   ) as b
> where
>   in_fence(fence_coordinates_array, driver_location)
> group by
>   TUMBLE(dt, INTERVAL '5' MINUTE),
>   fence_id;
>            其中
> x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner表中dt为watermark字段,建表语句如下:
>            CREATE TABLE
> x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner(
>   _type STRING,
>   _old_id BIGINT,
>   id BIGINT,
>   _old_order_no STRING,
>   order_no STRING,
>   _old_event_code STRING,
>   event_code STRING,
>   _old_from_state TINYINT,
>   from_state TINYINT,
>   _old_to_state TINYINT,
>   to_state TINYINT,
>   _old_operator_type TINYINT,
>   operator_type TINYINT,
>   _old_passenger_location STRING,
>   passenger_location STRING,
>   _old_driver_location STRING,
>   driver_location STRING,
>   _old_trans_time STRING,
>   trans_time STRING,
>   _old_create_time STRING,
>   create_time STRING,
>   _old_update_time STRING,
>   update_time STRING,
>   _old_passenger_poi_address STRING,
>   passenger_poi_address STRING,
>   _old_passenger_detail_address STRING,
>   passenger_detail_address STRING,
>   _old_driver_poi_address STRING,
>   driver_poi_address STRING,
>   _old_driver_detail_address STRING,
>   driver_detail_address STRING,
>   _old_operator STRING,
>   operator STRING,
>   _old_partition_index TINYINT,
>   partition_index TINYINT,
>   dt as TO_TIMESTAMP(trans_time),
>   WATERMARK FOR dt AS dt - INTERVAL '5' SECOND
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.properties.bootstrap.servers' = '*',
>   'connector.properties.zookeeper.connect' = '*',
>   'connector.version' = 'universal',
>   'format.type' = 'json',
>   'connector.properties.group.id' = 'testGroup',
>   'connector.startup-mode' = 'group-offsets',
>   'connector.topic' = 'xxxxx'
> )
>
Reply | Threaded
Open this post in threaded view
|

Re: flink1.10.1 flink sql消费kafka当parallelism大于1时不产生watermark

Leonard Xu
In reply to this post by junbaozhang
Hi,

可以先看下 Kafka topic 对应的partition有几个?是否每个分区都有数据。

祝好,
Leonard Xu

> 在 2020年7月13日,11:46,[hidden email] 写道:
>
> Hi, all:
> 本人使用的flink版本为flink 1.10.1, flink sql消费kafka, 当parallelism为1时正常运行,但将parallelism修改为2时,在yarn-session web页面看不到watermark的指标信息了,也没有计算结果输出,sql如下:
> insert into
>  x.report.bi_report_fence_common_indicators
> select
>  fence_id,
>  'finishedOrderCnt' as indicator_name,
>  TUMBLE_END(dt, INTERVAL '5' MINUTE) as ts,
>  count(1) as indicator_val
> from
>  (
>    select
>      dt,
>      fence_id,
>      fence_coordinates_array,
>      c.driver_location
>    from
>      (
>        select
>          *
>        from
>          (
>            select
>              dt,
>              driver_location,
>              r1.f1.fence_info as fence_info
>            from
>              (
>                select
>                  o.dt,
>                  o.driver_location,
>                  MD5(r.city_code) as k,
>                  PROCTIME() as proctime
>                from
>                  (
>                    select
>                      order_no,
>                      dt,
>                      driver_location,
>                      PROCTIME() as proctime
>                    from
>                      x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner
>                    where
>                      _type = 'insert'
>                      and event_code = 'arriveAndSettlement'
>                  ) o
>                  LEFT JOIN x.dim.saic_trip_create_t_order FOR SYSTEM_TIME AS OF o.proctime AS r ON r.order_no = o.order_no
>              ) o1
>              LEFT JOIN x.dim.fence_info FOR SYSTEM_TIME AS OF o1.proctime AS r1 ON r1.k = o1.k
>          ) a
>        where
>          fence_info is not null
>      ) c
>      LEFT JOIN LATERAL TABLE(fence_split(c.fence_info)) as T(fence_id, fence_coordinates_array) ON TRUE
>  ) as b
> where
>  in_fence(fence_coordinates_array, driver_location)
> group by
>  TUMBLE(dt, INTERVAL '5' MINUTE),
>  fence_id;
>           其中 x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner表中dt为watermark字段,建表语句如下:
>           CREATE TABLE x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner(
>  _type STRING,
>  _old_id BIGINT,
>  id BIGINT,
>  _old_order_no STRING,
>  order_no STRING,
>  _old_event_code STRING,
>  event_code STRING,
>  _old_from_state TINYINT,
>  from_state TINYINT,
>  _old_to_state TINYINT,
>  to_state TINYINT,
>  _old_operator_type TINYINT,
>  operator_type TINYINT,
>  _old_passenger_location STRING,
>  passenger_location STRING,
>  _old_driver_location STRING,
>  driver_location STRING,
>  _old_trans_time STRING,
>  trans_time STRING,
>  _old_create_time STRING,
>  create_time STRING,
>  _old_update_time STRING,
>  update_time STRING,
>  _old_passenger_poi_address STRING,
>  passenger_poi_address STRING,
>  _old_passenger_detail_address STRING,
>  passenger_detail_address STRING,
>  _old_driver_poi_address STRING,
>  driver_poi_address STRING,
>  _old_driver_detail_address STRING,
>  driver_detail_address STRING,
>  _old_operator STRING,
>  operator STRING,
>  _old_partition_index TINYINT,
>  partition_index TINYINT,
>  dt as TO_TIMESTAMP(trans_time),
>  WATERMARK FOR dt AS dt - INTERVAL '5' SECOND
> ) WITH (
>  'connector.type' = 'kafka',
>  'connector.properties.bootstrap.servers' = '*',
>  'connector.properties.zookeeper.connect' = '*',
>  'connector.version' = 'universal',
>  'format.type' = 'json',
>  'connector.properties.group.id' = 'testGroup',
>  'connector.startup-mode' = 'group-offsets',
>  'connector.topic' = 'xxxxx'
> )

Reply | Threaded
Open this post in threaded view
|

回复: flink1.10.1 flink sql消费kafka当parallelism大于1时不产生watermark

junbaozhang
Hi,
    确实是一共三个分区,只有一个分区有数据,已经解决,谢谢。

Best,
Junbao Zhang
________________________________
发件人: Leonard Xu <[hidden email]>
发送时间: 2020年7月13日 11:57
收件人: user-zh <[hidden email]>
主题: Re: flink1.10.1 flink sql消费kafka当parallelism大于1时不产生watermark

Hi,

可以先看下 Kafka topic 对应的partition有几个?是否每个分区都有数据。

祝好,
Leonard Xu

> 在 2020年7月13日,11:46,[hidden email] 写道:
>
> Hi, all:
> 本人使用的flink版本为flink 1.10.1, flink sql消费kafka, 当parallelism为1时正常运行,但将parallelism修改为2时,在yarn-session web页面看不到watermark的指标信息了,也没有计算结果输出,sql如下:
> insert into
>  x.report.bi_report_fence_common_indicators
> select
>  fence_id,
>  'finishedOrderCnt' as indicator_name,
>  TUMBLE_END(dt, INTERVAL '5' MINUTE) as ts,
>  count(1) as indicator_val
> from
>  (
>    select
>      dt,
>      fence_id,
>      fence_coordinates_array,
>      c.driver_location
>    from
>      (
>        select
>          *
>        from
>          (
>            select
>              dt,
>              driver_location,
>              r1.f1.fence_info as fence_info
>            from
>              (
>                select
>                  o.dt,
>                  o.driver_location,
>                  MD5(r.city_code) as k,
>                  PROCTIME() as proctime
>                from
>                  (
>                    select
>                      order_no,
>                      dt,
>                      driver_location,
>                      PROCTIME() as proctime
>                    from
>                      x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner
>                    where
>                      _type = 'insert'
>                      and event_code = 'arriveAndSettlement'
>                  ) o
>                  LEFT JOIN x.dim.saic_trip_create_t_order FOR SYSTEM_TIME AS OF o.proctime AS r ON r.order_no = o.order_no
>              ) o1
>              LEFT JOIN x.dim.fence_info FOR SYSTEM_TIME AS OF o1.proctime AS r1 ON r1.k = o1.k
>          ) a
>        where
>          fence_info is not null
>      ) c
>      LEFT JOIN LATERAL TABLE(fence_split(c.fence_info)) as T(fence_id, fence_coordinates_array) ON TRUE
>  ) as b
> where
>  in_fence(fence_coordinates_array, driver_location)
> group by
>  TUMBLE(dt, INTERVAL '5' MINUTE),
>  fence_id;
>           其中 x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner表中dt为watermark字段,建表语句如下:
>           CREATE TABLE x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner(
>  _type STRING,
>  _old_id BIGINT,
>  id BIGINT,
>  _old_order_no STRING,
>  order_no STRING,
>  _old_event_code STRING,
>  event_code STRING,
>  _old_from_state TINYINT,
>  from_state TINYINT,
>  _old_to_state TINYINT,
>  to_state TINYINT,
>  _old_operator_type TINYINT,
>  operator_type TINYINT,
>  _old_passenger_location STRING,
>  passenger_location STRING,
>  _old_driver_location STRING,
>  driver_location STRING,
>  _old_trans_time STRING,
>  trans_time STRING,
>  _old_create_time STRING,
>  create_time STRING,
>  _old_update_time STRING,
>  update_time STRING,
>  _old_passenger_poi_address STRING,
>  passenger_poi_address STRING,
>  _old_passenger_detail_address STRING,
>  passenger_detail_address STRING,
>  _old_driver_poi_address STRING,
>  driver_poi_address STRING,
>  _old_driver_detail_address STRING,
>  driver_detail_address STRING,
>  _old_operator STRING,
>  operator STRING,
>  _old_partition_index TINYINT,
>  partition_index TINYINT,
>  dt as TO_TIMESTAMP(trans_time),
>  WATERMARK FOR dt AS dt - INTERVAL '5' SECOND
> ) WITH (
>  'connector.type' = 'kafka',
>  'connector.properties.bootstrap.servers' = '*',
>  'connector.properties.zookeeper.connect' = '*',
>  'connector.version' = 'universal',
>  'format.type' = 'json',
>  'connector.properties.group.id' = 'testGroup',
>  'connector.startup-mode' = 'group-offsets',
>  'connector.topic' = 'xxxxx'
> )

junbaozhang