Flink SQL No Watermark

classic Classic list List threaded Threaded
11 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink SQL No Watermark

forideal
大家好,请教一个问题


           我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成 watermark。消费大量的数据的时候,就无法生成watermark。
           一直是    No Watermark。 暂时找不到排查问题的思路。
          Flink 版本号是 1.10,kafka 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 EventTime mode 模式,Blink Planner。
|
No Watermark |
           SQL如下


          DDL:
           create table test(
                       user_id varchar,
                       action varchar,
                       event_time TIMESTAMP(3),
                       WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
           ) with();


          DML:
insert into
  console
select
  user_id,
  f_get_str(bind_id) as id_list
from
  (
    select
      action as bind_id,
      user_id,
      event_time
    from
      (
        SELECT
          user_id,
          action,
          PROCTIME() as proc_time,
          event_time
        FROM
          test
      ) T
    where
      user_id is not null
      and user_id <> ''
      and CHARACTER_LENGTH(user_id) = 24
  ) T
group by
  SESSION(event_time, INTERVAL '10' SECOND),
  user_id
 
Best forideal
Reply | Threaded
Open this post in threaded view
|

Re:Flink SQL No Watermark

forideal
大家好


        关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是 StreamExecWatermarkAssigner
        在translateToPlanInternal 中生成了如下一个 class 代码,
public final class WatermarkGenerator$2 extends org.apache.flink.table.runtime.generated.WatermarkGenerator { public WatermarkGenerator$2(Object[] references) throws Exception { } @Override public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { } @Override public Long currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) { field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5 = null; if (!isNull$4) { result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond()); } if (isNull$4) { return null; } else { return result$5.getMillisecond(); } } @Override public void close() throws Exception { } }
         


       其中关键的信息是 result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond());
确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND 的定义获取的 watermark。
在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark 这样的结果。因为这部分codegen的代码确实无法进一步debug了。
如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢

  Best forideal











在 2020-08-11 17:13:01,"forideal" <[hidden email]> 写道:

>大家好,请教一个问题
>
>
>           我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成 watermark。消费大量的数据的时候,就无法生成watermark。
>           一直是    No Watermark。 暂时找不到排查问题的思路。
>          Flink 版本号是 1.10,kafka 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 EventTime mode 模式,Blink Planner。
>|
>No Watermark |
>           SQL如下
>
>
>          DDL:
>           create table test(
>                       user_id varchar,
>                       action varchar,
>                       event_time TIMESTAMP(3),
>                       WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
>           ) with();
>
>
>          DML:
>insert into
>  console
>select
>  user_id,
>  f_get_str(bind_id) as id_list
>from
>  (
>    select
>      action as bind_id,
>      user_id,
>      event_time
>    from
>      (
>        SELECT
>          user_id,
>          action,
>          PROCTIME() as proc_time,
>          event_time
>        FROM
>          test
>      ) T
>    where
>      user_id is not null
>      and user_id <> ''
>      and CHARACTER_LENGTH(user_id) = 24
>  ) T
>group by
>  SESSION(event_time, INTERVAL '10' SECOND),
>  user_id
>
>Best forideal
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL No Watermark

Shengkai Fang
hi, 你的意思是没有办法在codegen出来的代码上加断点的意思吗?
这里倒是有一个比较hack的方法:
将生成的类放在一个java文件之中,然后修改改下GeneratedClass下的newInstance方法,如果classname ==
“WatermarkGenerator$2” 则将刚才的类则返回 new WatermarkGenerator$2 这个类。

我个人对于问题的猜测是有一条数据的rowtime远远晚于其他数据,从而将整体的watermark提得很高,导致后面的“晚到”的数据一直无法触发watermark的生成。


forideal <[hidden email]> 于2020年8月13日周四 下午12:57写道:

> 大家好
>
>
>         关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是
> StreamExecWatermarkAssigner
>         在translateToPlanInternal 中生成了如下一个 class 代码,
> public final class WatermarkGenerator$2 extends
> org.apache.flink.table.runtime.generated.WatermarkGenerator { public
> WatermarkGenerator$2(Object[] references) throws Exception { } @Override
> public void open(org.apache.flink.configuration.Configuration parameters)
> throws Exception { } @Override public Long
> currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws
> Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean
> isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp
> result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) {
> field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5
> = null; if (!isNull$4) { result$5 =
> org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
> - ((long) 10000L), field$3.getNanoOfMillisecond()); } if (isNull$4) {
> return null; } else { return result$5.getMillisecond(); } } @Override
> public void close() throws Exception { } }
>
>
>
>        其中关键的信息是 result$5 =
> org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
> - ((long) 10000L), field$3.getNanoOfMillisecond());
> 确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND 的定义获取的
> watermark。
> 在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark
> 这样的结果。因为这部分codegen的代码确实无法进一步debug了。
> 如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢
>
>   Best forideal
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-08-11 17:13:01,"forideal" <[hidden email]> 写道:
> >大家好,请教一个问题
> >
> >
> >           我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成
> watermark。消费大量的数据的时候,就无法生成watermark。
> >           一直是    No Watermark。 暂时找不到排查问题的思路。
> >          Flink 版本号是 1.10,kafka 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了
> EventTime mode 模式,Blink Planner。
> >|
> >No Watermark |
> >           SQL如下
> >
> >
> >          DDL:
> >           create table test(
> >                       user_id varchar,
> >                       action varchar,
> >                       event_time TIMESTAMP(3),
> >                       WATERMARK FOR event_time AS event_time - INTERVAL
> '10' SECOND
> >           ) with();
> >
> >
> >          DML:
> >insert into
> >  console
> >select
> >  user_id,
> >  f_get_str(bind_id) as id_list
> >from
> >  (
> >    select
> >      action as bind_id,
> >      user_id,
> >      event_time
> >    from
> >      (
> >        SELECT
> >          user_id,
> >          action,
> >          PROCTIME() as proc_time,
> >          event_time
> >        FROM
> >          test
> >      ) T
> >    where
> >      user_id is not null
> >      and user_id <> ''
> >      and CHARACTER_LENGTH(user_id) = 24
> >  ) T
> >group by
> >  SESSION(event_time, INTERVAL '10' SECOND),
> >  user_id
> >
> >Best forideal
>
Reply | Threaded
Open this post in threaded view
|

Re:Re:Flink SQL No Watermark

forideal
In reply to this post by forideal
大家好

       问题的原因定位到了。
       由于无法 debug codegen 生成的代码,即使我拿到线上的数据,开启了debug环境依然无法得到进展。
       这个时候,我进行了 disable chain,观察 watermark 的生成情况,看看到底在那个环节没有继续往下传递。(因为多个 op chain 在一起,不能确定到底是那个环节存在问题)
       发现在  WatermarkAssigner(rowtime=[event_time], watermark=[(event_ti...)这个 op 中部分 task 为 No watermark,由于这个op和source chain在一起,导致这个vertex 对应的watermark无法显示只能是 no data。因为存在 group by 下游的 watermark 为 min(parent task output watermark),所以下游是 No watermark。导致在查问题的时候,比较困难。
       定位到由于 kafka 部分 partition 无数据导致 No watermark 加上  table.exec.source.idle-timeout = 10s 参数即可。
       当然,如果能直接 debug codegen 生成的代码,那么这个问题的分析路径会更简单。我应该直接可以发现大部分 task 可以生成 watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用  disable chain 观察每个 op 的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。


Best forideal








在 2020-08-13 12:56:57,"forideal" <[hidden email]> 写道:

>大家好
>
>
>        关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是 StreamExecWatermarkAssigner
>        在translateToPlanInternal 中生成了如下一个 class 代码,
>public final class WatermarkGenerator$2 extends org.apache.flink.table.runtime.generated.WatermarkGenerator { public WatermarkGenerator$2(Object[] references) throws Exception { } @Override public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { } @Override public Long currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) { field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5 = null; if (!isNull$4) { result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond()); } if (isNull$4) { return null; } else { return result$5.getMillisecond(); } } @Override public void close() throws Exception { } }
>        
>
>
>       其中关键的信息是 result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond());
>确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND 的定义获取的 watermark。
>在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark 这样的结果。因为这部分codegen的代码确实无法进一步debug了。
>如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢
>
>  Best forideal
>
>
>
>
>
>
>
>
>
>
>
>在 2020-08-11 17:13:01,"forideal" <[hidden email]> 写道:
>>大家好,请教一个问题
>>
>>
>>           我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成 watermark。消费大量的数据的时候,就无法生成watermark。
>>           一直是    No Watermark。 暂时找不到排查问题的思路。
>>          Flink 版本号是 1.10,kafka 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 EventTime mode 模式,Blink Planner。
>>|
>>No Watermark |
>>           SQL如下
>>
>>
>>          DDL:
>>           create table test(
>>                       user_id varchar,
>>                       action varchar,
>>                       event_time TIMESTAMP(3),
>>                       WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
>>           ) with();
>>
>>
>>          DML:
>>insert into
>>  console
>>select
>>  user_id,
>>  f_get_str(bind_id) as id_list
>>from
>>  (
>>    select
>>      action as bind_id,
>>      user_id,
>>      event_time
>>    from
>>      (
>>        SELECT
>>          user_id,
>>          action,
>>          PROCTIME() as proc_time,
>>          event_time
>>        FROM
>>          test
>>      ) T
>>    where
>>      user_id is not null
>>      and user_id <> ''
>>      and CHARACTER_LENGTH(user_id) = 24
>>  ) T
>>group by
>>  SESSION(event_time, INTERVAL '10' SECOND),
>>  user_id
>>
>>Best forideal
Reply | Threaded
Open this post in threaded view
|

Re:Re:Re:Flink SQL No Watermark

Zhou Zach



Hi forideal,
我也遇到了No Watermark问题,我也设置了table.exec.source.idle-timeout 参数,如下:


    val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    streamExecutionEnv.setStateBackend(new RocksDBStateBackend("hdfs://nameservice1/flink/checkpoints"))

    val blinkEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings)
    streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,CheckpointingMode.EXACTLY_ONCE)
    streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,Duration.ofSeconds(20))
    streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT,Duration.ofSeconds(900))

    streamTableEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT,"5s")


并且,任务的并行度设置了1(这样是不是就不会存在flink consumer不消费kafka数据的情况,kafka一直生产数据的前提下)
在flink ui上,仍然显示Watermark No data,问下,你是怎么设置参数的














在 2020-08-13 14:02:58,"forideal" <[hidden email]> 写道:

>大家好
>
>       问题的原因定位到了。
>       由于无法 debug codegen 生成的代码,即使我拿到线上的数据,开启了debug环境依然无法得到进展。
>       这个时候,我进行了 disable chain,观察 watermark 的生成情况,看看到底在那个环节没有继续往下传递。(因为多个 op chain 在一起,不能确定到底是那个环节存在问题)
>       发现在  WatermarkAssigner(rowtime=[event_time], watermark=[(event_ti...)这个 op 中部分 task 为 No watermark,由于这个op和source chain在一起,导致这个vertex 对应的watermark无法显示只能是 no data。因为存在 group by 下游的 watermark 为 min(parent task output watermark),所以下游是 No watermark。导致在查问题的时候,比较困难。
>       定位到由于 kafka 部分 partition 无数据导致 No watermark 加上  table.exec.source.idle-timeout = 10s 参数即可。
>       当然,如果能直接 debug codegen 生成的代码,那么这个问题的分析路径会更简单。我应该直接可以发现大部分 task 可以生成 watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用  disable chain 观察每个 op 的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。
>
>
>Best forideal
>
>
>
>
>
>
>
>
>在 2020-08-13 12:56:57,"forideal" <[hidden email]> 写道:
>>大家好
>>
>>
>>        关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是 StreamExecWatermarkAssigner
>>        在translateToPlanInternal 中生成了如下一个 class 代码,
>>public final class WatermarkGenerator$2 extends org.apache.flink.table.runtime.generated.WatermarkGenerator { public WatermarkGenerator$2(Object[] references) throws Exception { } @Override public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { } @Override public Long currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) { field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5 = null; if (!isNull$4) { result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond()); } if (isNull$4) { return null; } else { return result$5.getMillisecond(); } } @Override public void close() throws Exception { } }
>>        
>>
>>
>>       其中关键的信息是 result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond());
>>确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND 的定义获取的 watermark。
>>在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark 这样的结果。因为这部分codegen的代码确实无法进一步debug了。
>>如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢
>>
>>  Best forideal
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>在 2020-08-11 17:13:01,"forideal" <[hidden email]> 写道:
>>>大家好,请教一个问题
>>>
>>>
>>>           我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成 watermark。消费大量的数据的时候,就无法生成watermark。
>>>           一直是    No Watermark。 暂时找不到排查问题的思路。
>>>          Flink 版本号是 1.10,kafka 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 EventTime mode 模式,Blink Planner。
>>>|
>>>No Watermark |
>>>           SQL如下
>>>
>>>
>>>          DDL:
>>>           create table test(
>>>                       user_id varchar,
>>>                       action varchar,
>>>                       event_time TIMESTAMP(3),
>>>                       WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
>>>           ) with();
>>>
>>>
>>>          DML:
>>>insert into
>>>  console
>>>select
>>>  user_id,
>>>  f_get_str(bind_id) as id_list
>>>from
>>>  (
>>>    select
>>>      action as bind_id,
>>>      user_id,
>>>      event_time
>>>    from
>>>      (
>>>        SELECT
>>>          user_id,
>>>          action,
>>>          PROCTIME() as proc_time,
>>>          event_time
>>>        FROM
>>>          test
>>>      ) T
>>>    where
>>>      user_id is not null
>>>      and user_id <> ''
>>>      and CHARACTER_LENGTH(user_id) = 24
>>>  ) T
>>>group by
>>>  SESSION(event_time, INTERVAL '10' SECOND),
>>>  user_id
>>>
>>>Best forideal
Reply | Threaded
Open this post in threaded view
|

Re: Re:Re:Flink SQL No Watermark

Shengkai Fang
hi 那你有没有试过将并行度设置为partition的数量

Zhou Zach <[hidden email]>于2020年8月13日 周四下午3:21写道:

>
>
>
> Hi forideal,
> 我也遇到了No Watermark问题,我也设置了table.exec.source.idle-timeout 参数,如下:
>
>
>     val streamExecutionEnv =
> StreamExecutionEnvironment.getExecutionEnvironment
>
> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     streamExecutionEnv.setStateBackend(new
> RocksDBStateBackend("hdfs://nameservice1/flink/checkpoints"))
>
>     val blinkEnvSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>     val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv,
> blinkEnvSettings)
>
> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,CheckpointingMode.EXACTLY_ONCE)
>
> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,Duration.ofSeconds(20))
>
> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT,Duration.ofSeconds(900))
>
>
> streamTableEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT,"5s")
>
>
> 并且,任务的并行度设置了1(这样是不是就不会存在flink consumer不消费kafka数据的情况,kafka一直生产数据的前提下)
> 在flink ui上,仍然显示Watermark No data,问下,你是怎么设置参数的
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-08-13 14:02:58,"forideal" <[hidden email]> 写道:
> >大家好
> >
> >       问题的原因定位到了。
> >       由于无法 debug codegen 生成的代码,即使我拿到线上的数据,开启了debug环境依然无法得到进展。
> >       这个时候,我进行了 disable chain,观察 watermark 的生成情况,看看到底在那个环节没有继续往下传递。(因为多个
> op chain 在一起,不能确定到底是那个环节存在问题)
> >       发现在  WatermarkAssigner(rowtime=[event_time],
> watermark=[(event_ti...)这个 op 中部分 task 为 No watermark,由于这个op和source
> chain在一起,导致这个vertex 对应的watermark无法显示只能是 no data。因为存在 group by 下游的 watermark
> 为 min(parent task output watermark),所以下游是 No watermark。导致在查问题的时候,比较困难。
> >       定位到由于 kafka 部分 partition 无数据导致 No watermark 加上
> table.exec.source.idle-timeout = 10s 参数即可。
> >       当然,如果能直接 debug codegen 生成的代码,那么这个问题的分析路径会更简单。我应该直接可以发现大部分 task
> 可以生成 watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用  disable chain
> 观察每个 op 的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。
> >
> >
> >Best forideal
> >
> >
> >
> >
> >
> >
> >
> >
> >在 2020-08-13 12:56:57,"forideal" <[hidden email]> 写道:
> >>大家好
> >>
> >>
> >>        关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是
> StreamExecWatermarkAssigner
> >>        在translateToPlanInternal 中生成了如下一个 class 代码,
> >>public final class WatermarkGenerator$2 extends
> org.apache.flink.table.runtime.generated.WatermarkGenerator { public
> WatermarkGenerator$2(Object[] references) throws Exception { } @Override
> public void open(org.apache.flink.configuration.Configuration parameters)
> throws Exception { } @Override public Long
> currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws
> Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean
> isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp
> result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) {
> field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5
> = null; if (!isNull$4) { result$5 =
> org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
> - ((long) 10000L), field$3.getNanoOfMillisecond()); } if (isNull$4) {
> return null; } else { return result$5.getMillisecond(); } } @Override
> public void close() throws Exception { } }
> >>
> >>
> >>
> >>       其中关键的信息是 result$5 =
> org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
> - ((long) 10000L), field$3.getNanoOfMillisecond());
> >>确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
> 的定义获取的 watermark。
> >>在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark
> 这样的结果。因为这部分codegen的代码确实无法进一步debug了。
> >>如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢
> >>
> >>  Best forideal
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>在 2020-08-11 17:13:01,"forideal" <[hidden email]> 写道:
> >>>大家好,请教一个问题
> >>>
> >>>
> >>>           我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成
> watermark。消费大量的数据的时候,就无法生成watermark。
> >>>           一直是    No Watermark。 暂时找不到排查问题的思路。
> >>>          Flink 版本号是 1.10,kafka
> 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 EventTime mode 模式,Blink Planner。
> >>>|
> >>>No Watermark |
> >>>           SQL如下
> >>>
> >>>
> >>>          DDL:
> >>>           create table test(
> >>>                       user_id varchar,
> >>>                       action varchar,
> >>>                       event_time TIMESTAMP(3),
> >>>                       WATERMARK FOR event_time AS event_time -
> INTERVAL '10' SECOND
> >>>           ) with();
> >>>
> >>>
> >>>          DML:
> >>>insert into
> >>>  console
> >>>select
> >>>  user_id,
> >>>  f_get_str(bind_id) as id_list
> >>>from
> >>>  (
> >>>    select
> >>>      action as bind_id,
> >>>      user_id,
> >>>      event_time
> >>>    from
> >>>      (
> >>>        SELECT
> >>>          user_id,
> >>>          action,
> >>>          PROCTIME() as proc_time,
> >>>          event_time
> >>>        FROM
> >>>          test
> >>>      ) T
> >>>    where
> >>>      user_id is not null
> >>>      and user_id <> ''
> >>>      and CHARACTER_LENGTH(user_id) = 24
> >>>  ) T
> >>>group by
> >>>  SESSION(event_time, INTERVAL '10' SECOND),
> >>>  user_id
> >>>
> >>>Best forideal
>
Reply | Threaded
Open this post in threaded view
|

Re:Re:Re:Re:Flink SQL No Watermark

forideal
In reply to this post by Zhou Zach
Hi Zhou Zach:
你可以试试 env.disableOperatorChaining();
然后观察每个 op 的 watermark 情况。这样能够简单的看下具体的情况。
> 我是怎么设置参数的
我使用的是 Flink SQL Blink Planner,采用的设置方式和你一样
tableEnv.getConfig().getConfiguration() .setString(key, configs.getString(key, null));
同时我在 source table 中定义了 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND

Best forideal


在 2020-08-13 15:20:13,"Zhou Zach" <[hidden email]> 写道:

>
>
>
>Hi forideal,
>我也遇到了No Watermark问题,我也设置了table.exec.source.idle-timeout 参数,如下:
>
>
>    val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
>    streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>    streamExecutionEnv.setStateBackend(new RocksDBStateBackend("hdfs://nameservice1/flink/checkpoints"))
>
>    val blinkEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>    val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings)
>    streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,CheckpointingMode.EXACTLY_ONCE)
>    streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,Duration.ofSeconds(20))
>    streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT,Duration.ofSeconds(900))
>
>    streamTableEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT,"5s")
>
>
>并且,任务的并行度设置了1(这样是不是就不会存在flink consumer不消费kafka数据的情况,kafka一直生产数据的前提下)
>在flink ui上,仍然显示Watermark No data,问下,你是怎么设置参数的
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-08-13 14:02:58,"forideal" <[hidden email]> 写道:
>>大家好
>>
>>       问题的原因定位到了。
>>       由于无法 debug codegen 生成的代码,即使我拿到线上的数据,开启了debug环境依然无法得到进展。
>>       这个时候,我进行了 disable chain,观察 watermark 的生成情况,看看到底在那个环节没有继续往下传递。(因为多个 op chain 在一起,不能确定到底是那个环节存在问题)
>>       发现在  WatermarkAssigner(rowtime=[event_time], watermark=[(event_ti...)这个 op 中部分 task 为 No watermark,由于这个op和source chain在一起,导致这个vertex 对应的watermark无法显示只能是 no data。因为存在 group by 下游的 watermark 为 min(parent task output watermark),所以下游是 No watermark。导致在查问题的时候,比较困难。
>>       定位到由于 kafka 部分 partition 无数据导致 No watermark 加上  table.exec.source.idle-timeout = 10s 参数即可。
>>       当然,如果能直接 debug codegen 生成的代码,那么这个问题的分析路径会更简单。我应该直接可以发现大部分 task 可以生成 watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用  disable chain 观察每个 op 的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。
>>
>>
>>Best forideal
>>
>>
>>
>>
>>
>>
>>
>>
>>在 2020-08-13 12:56:57,"forideal" <[hidden email]> 写道:
>>>大家好
>>>
>>>
>>>        关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是 StreamExecWatermarkAssigner
>>>        在translateToPlanInternal 中生成了如下一个 class 代码,
>>>public final class WatermarkGenerator$2 extends org.apache.flink.table.runtime.generated.WatermarkGenerator { public WatermarkGenerator$2(Object[] references) throws Exception { } @Override public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { } @Override public Long currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) { field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5 = null; if (!isNull$4) { result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond()); } if (isNull$4) { return null; } else { return result$5.getMillisecond(); } } @Override public void close() throws Exception { } }
>>>        
>>>
>>>
>>>       其中关键的信息是 result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond());
>>>确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND 的定义获取的 watermark。
>>>在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark 这样的结果。因为这部分codegen的代码确实无法进一步debug了。
>>>如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢
>>>
>>>  Best forideal
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>在 2020-08-11 17:13:01,"forideal" <[hidden email]> 写道:
>>>>大家好,请教一个问题
>>>>
>>>>
>>>>           我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成 watermark。消费大量的数据的时候,就无法生成watermark。
>>>>           一直是    No Watermark。 暂时找不到排查问题的思路。
>>>>          Flink 版本号是 1.10,kafka 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 EventTime mode 模式,Blink Planner。
>>>>|
>>>>No Watermark |
>>>>           SQL如下
>>>>
>>>>
>>>>          DDL:
>>>>           create table test(
>>>>                       user_id varchar,
>>>>                       action varchar,
>>>>                       event_time TIMESTAMP(3),
>>>>                       WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
>>>>           ) with();
>>>>
>>>>
>>>>          DML:
>>>>insert into
>>>>  console
>>>>select
>>>>  user_id,
>>>>  f_get_str(bind_id) as id_list
>>>>from
>>>>  (
>>>>    select
>>>>      action as bind_id,
>>>>      user_id,
>>>>      event_time
>>>>    from
>>>>      (
>>>>        SELECT
>>>>          user_id,
>>>>          action,
>>>>          PROCTIME() as proc_time,
>>>>          event_time
>>>>        FROM
>>>>          test
>>>>      ) T
>>>>    where
>>>>      user_id is not null
>>>>      and user_id <> ''
>>>>      and CHARACTER_LENGTH(user_id) = 24
>>>>  ) T
>>>>group by
>>>>  SESSION(event_time, INTERVAL '10' SECOND),
>>>>  user_id
>>>>
>>>>Best forideal
Reply | Threaded
Open this post in threaded view
|

Re:Re:Re:Re:Re:Flink SQL No Watermark

Zhou Zach



Hi forideal, Shengkai Fang,
   
加上env.disableOperatorChaining()之后,发现5个算子,




Source: TableSourceScan(table=[[default_catalog, default_database, user]], fields=[uid, sex, age, created_time]) ->

Calc(select=[uid, sex, age, created_time, () AS procTime, TO_TIMESTAMP(((created_time / 1000) FROM_UNIXTIME _UTF-16LE'yyyy-MM-dd HH:mm:ss')) AS eventTime]) ->

WatermarkAssigner(rowtime=[eventTime], watermark=[(eventTime - 3000:INTERVAL SECOND)]) ->

Calc(select=[uid, sex, age, created_time]) ->

Sink: Sink(table=[default_catalog.default_database.user_mysql], fields=[uid, sex, age, created_time])
但是,只有最后面两个算子有watermark,所以开启OperatorChaining后,因为前面3个没有watermark,整个chain的算子都没有watermark了,那么是不是就不能通过flink ui来监控watermark了,就依赖第三方监控工具来看watermark?因为上生产,肯定要开OperatorChaining的














在 2020-08-13 15:39:44,"forideal" <[hidden email]> 写道:

>Hi Zhou Zach:
>你可以试试 env.disableOperatorChaining();
>然后观察每个 op 的 watermark 情况。这样能够简单的看下具体的情况。
>> 我是怎么设置参数的
>我使用的是 Flink SQL Blink Planner,采用的设置方式和你一样
>tableEnv.getConfig().getConfiguration() .setString(key, configs.getString(key, null));
>同时我在 source table 中定义了 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
>
>Best forideal
>
>
>在 2020-08-13 15:20:13,"Zhou Zach" <[hidden email]> 写道:
>>
>>
>>
>>Hi forideal,
>>我也遇到了No Watermark问题,我也设置了table.exec.source.idle-timeout 参数,如下:
>>
>>
>>    val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>    streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>    streamExecutionEnv.setStateBackend(new RocksDBStateBackend("hdfs://nameservice1/flink/checkpoints"))
>>
>>    val blinkEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>    val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings)
>>    streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,CheckpointingMode.EXACTLY_ONCE)
>>    streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,Duration.ofSeconds(20))
>>    streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT,Duration.ofSeconds(900))
>>
>>    streamTableEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT,"5s")
>>
>>
>>并且,任务的并行度设置了1(这样是不是就不会存在flink consumer不消费kafka数据的情况,kafka一直生产数据的前提下)
>>在flink ui上,仍然显示Watermark No data,问下,你是怎么设置参数的
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>在 2020-08-13 14:02:58,"forideal" <[hidden email]> 写道:
>>>大家好
>>>
>>>       问题的原因定位到了。
>>>       由于无法 debug codegen 生成的代码,即使我拿到线上的数据,开启了debug环境依然无法得到进展。
>>>       这个时候,我进行了 disable chain,观察 watermark 的生成情况,看看到底在那个环节没有继续往下传递。(因为多个 op chain 在一起,不能确定到底是那个环节存在问题)
>>>       发现在  WatermarkAssigner(rowtime=[event_time], watermark=[(event_ti...)这个 op 中部分 task 为 No watermark,由于这个op和source chain在一起,导致这个vertex 对应的watermark无法显示只能是 no data。因为存在 group by 下游的 watermark 为 min(parent task output watermark),所以下游是 No watermark。导致在查问题的时候,比较困难。
>>>       定位到由于 kafka 部分 partition 无数据导致 No watermark 加上  table.exec.source.idle-timeout = 10s 参数即可。
>>>       当然,如果能直接 debug codegen 生成的代码,那么这个问题的分析路径会更简单。我应该直接可以发现大部分 task 可以生成 watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用  disable chain 观察每个 op 的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。
>>>
>>>
>>>Best forideal
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>在 2020-08-13 12:56:57,"forideal" <[hidden email]> 写道:
>>>>大家好
>>>>
>>>>
>>>>        关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是 StreamExecWatermarkAssigner
>>>>        在translateToPlanInternal 中生成了如下一个 class 代码,
>>>>public final class WatermarkGenerator$2 extends org.apache.flink.table.runtime.generated.WatermarkGenerator { public WatermarkGenerator$2(Object[] references) throws Exception { } @Override public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { } @Override public Long currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) { field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5 = null; if (!isNull$4) { result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond()); } if (isNull$4) { return null; } else { return result$5.getMillisecond(); } } @Override public void close() throws Exception { } }
>>>>        
>>>>
>>>>
>>>>       其中关键的信息是 result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond());
>>>>确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND 的定义获取的 watermark。
>>>>在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark 这样的结果。因为这部分codegen的代码确实无法进一步debug了。
>>>>如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢
>>>>
>>>>  Best forideal
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>在 2020-08-11 17:13:01,"forideal" <[hidden email]> 写道:
>>>>>大家好,请教一个问题
>>>>>
>>>>>
>>>>>           我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成 watermark。消费大量的数据的时候,就无法生成watermark。
>>>>>           一直是    No Watermark。 暂时找不到排查问题的思路。
>>>>>          Flink 版本号是 1.10,kafka 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 EventTime mode 模式,Blink Planner。
>>>>>|
>>>>>No Watermark |
>>>>>           SQL如下
>>>>>
>>>>>
>>>>>          DDL:
>>>>>           create table test(
>>>>>                       user_id varchar,
>>>>>                       action varchar,
>>>>>                       event_time TIMESTAMP(3),
>>>>>                       WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
>>>>>           ) with();
>>>>>
>>>>>
>>>>>          DML:
>>>>>insert into
>>>>>  console
>>>>>select
>>>>>  user_id,
>>>>>  f_get_str(bind_id) as id_list
>>>>>from
>>>>>  (
>>>>>    select
>>>>>      action as bind_id,
>>>>>      user_id,
>>>>>      event_time
>>>>>    from
>>>>>      (
>>>>>        SELECT
>>>>>          user_id,
>>>>>          action,
>>>>>          PROCTIME() as proc_time,
>>>>>          event_time
>>>>>        FROM
>>>>>          test
>>>>>      ) T
>>>>>    where
>>>>>      user_id is not null
>>>>>      and user_id <> ''
>>>>>      and CHARACTER_LENGTH(user_id) = 24
>>>>>  ) T
>>>>>group by
>>>>>  SESSION(event_time, INTERVAL '10' SECOND),
>>>>>  user_id
>>>>>
>>>>>Best forideal
Reply | Threaded
Open this post in threaded view
|

Re: Re:Re:Re:Re:Flink SQL No Watermark

Shengkai Fang
hi, watermark本来就是通过watermark assigner生成的。这是正常现象。
我想问问 你有没有试过调大并行度来解决这个问题?因为不同partition的数据可能存在时间上的差异。

Zhou Zach <[hidden email]> 于2020年8月13日周四 下午4:33写道:

>
>
>
> Hi forideal, Shengkai Fang,
>
> 加上env.disableOperatorChaining()之后,发现5个算子,
>
>
>
>
> Source: TableSourceScan(table=[[default_catalog, default_database, user]],
> fields=[uid, sex, age, created_time]) ->
>
> Calc(select=[uid, sex, age, created_time, () AS procTime,
> TO_TIMESTAMP(((created_time / 1000) FROM_UNIXTIME _UTF-16LE'yyyy-MM-dd
> HH:mm:ss')) AS eventTime]) ->
>
> WatermarkAssigner(rowtime=[eventTime], watermark=[(eventTime -
> 3000:INTERVAL SECOND)]) ->
>
> Calc(select=[uid, sex, age, created_time]) ->
>
> Sink: Sink(table=[default_catalog.default_database.user_mysql],
> fields=[uid, sex, age, created_time])
> 但是,只有最后面两个算子有watermark,所以开启OperatorChaining后,因为前面3个没有watermark,整个chain的算子都没有watermark了,那么是不是就不能通过flink
> ui来监控watermark了,就依赖第三方监控工具来看watermark?因为上生产,肯定要开OperatorChaining的
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-08-13 15:39:44,"forideal" <[hidden email]> 写道:
> >Hi Zhou Zach:
> >你可以试试 env.disableOperatorChaining();
> >然后观察每个 op 的 watermark 情况。这样能够简单的看下具体的情况。
> >> 我是怎么设置参数的
> >我使用的是 Flink SQL Blink Planner,采用的设置方式和你一样
> >tableEnv.getConfig().getConfiguration() .setString(key,
> configs.getString(key, null));
> >同时我在 source table 中定义了 WATERMARK FOR event_time AS event_time - INTERVAL
> '10' SECOND
> >
> >Best forideal
> >
> >
> >在 2020-08-13 15:20:13,"Zhou Zach" <[hidden email]> 写道:
> >>
> >>
> >>
> >>Hi forideal,
> >>我也遇到了No Watermark问题,我也设置了table.exec.source.idle-timeout 参数,如下:
> >>
> >>
> >>    val streamExecutionEnv =
> StreamExecutionEnvironment.getExecutionEnvironment
> >>
> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> >>    streamExecutionEnv.setStateBackend(new
> RocksDBStateBackend("hdfs://nameservice1/flink/checkpoints"))
> >>
> >>    val blinkEnvSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> >>    val streamTableEnv =
> StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings)
> >>
> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,CheckpointingMode.EXACTLY_ONCE)
> >>
> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,Duration.ofSeconds(20))
> >>
> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT,Duration.ofSeconds(900))
> >>
> >>
> streamTableEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT,"5s")
> >>
> >>
> >>并且,任务的并行度设置了1(这样是不是就不会存在flink consumer不消费kafka数据的情况,kafka一直生产数据的前提下)
> >>在flink ui上,仍然显示Watermark No data,问下,你是怎么设置参数的
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>在 2020-08-13 14:02:58,"forideal" <[hidden email]> 写道:
> >>>大家好
> >>>
> >>>       问题的原因定位到了。
> >>>       由于无法 debug codegen 生成的代码,即使我拿到线上的数据,开启了debug环境依然无法得到进展。
> >>>       这个时候,我进行了 disable chain,观察 watermark
> 的生成情况,看看到底在那个环节没有继续往下传递。(因为多个 op chain 在一起,不能确定到底是那个环节存在问题)
> >>>       发现在  WatermarkAssigner(rowtime=[event_time],
> watermark=[(event_ti...)这个 op 中部分 task 为 No watermark,由于这个op和source
> chain在一起,导致这个vertex 对应的watermark无法显示只能是 no data。因为存在 group by 下游的 watermark
> 为 min(parent task output watermark),所以下游是 No watermark。导致在查问题的时候,比较困难。
> >>>       定位到由于 kafka 部分 partition 无数据导致 No watermark 加上
> table.exec.source.idle-timeout = 10s 参数即可。
> >>>       当然,如果能直接 debug codegen 生成的代码,那么这个问题的分析路径会更简单。我应该直接可以发现大部分 task
> 可以生成 watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用  disable chain
> 观察每个 op 的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。
> >>>
> >>>
> >>>Best forideal
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>在 2020-08-13 12:56:57,"forideal" <[hidden email]> 写道:
> >>>>大家好
> >>>>
> >>>>
> >>>>        关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是
> StreamExecWatermarkAssigner
> >>>>        在translateToPlanInternal 中生成了如下一个 class 代码,
> >>>>public final class WatermarkGenerator$2 extends
> org.apache.flink.table.runtime.generated.WatermarkGenerator { public
> WatermarkGenerator$2(Object[] references) throws Exception { } @Override
> public void open(org.apache.flink.configuration.Configuration parameters)
> throws Exception { } @Override public Long
> currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws
> Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean
> isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp
> result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) {
> field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5
> = null; if (!isNull$4) { result$5 =
> org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
> - ((long) 10000L), field$3.getNanoOfMillisecond()); } if (isNull$4) {
> return null; } else { return result$5.getMillisecond(); } } @Override
> public void close() throws Exception { } }
> >>>>
> >>>>
> >>>>
> >>>>       其中关键的信息是 result$5 =
> org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
> - ((long) 10000L), field$3.getNanoOfMillisecond());
> >>>>确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
> 的定义获取的 watermark。
> >>>>在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark
> 这样的结果。因为这部分codegen的代码确实无法进一步debug了。
> >>>>如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢
> >>>>
> >>>>  Best forideal
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>在 2020-08-11 17:13:01,"forideal" <[hidden email]> 写道:
> >>>>>大家好,请教一个问题
> >>>>>
> >>>>>
> >>>>>           我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic
> 的时候,是可以生成 watermark。消费大量的数据的时候,就无法生成watermark。
> >>>>>           一直是    No Watermark。 暂时找不到排查问题的思路。
> >>>>>          Flink 版本号是 1.10,kafka
> 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 EventTime mode 模式,Blink Planner。
> >>>>>|
> >>>>>No Watermark |
> >>>>>           SQL如下
> >>>>>
> >>>>>
> >>>>>          DDL:
> >>>>>           create table test(
> >>>>>                       user_id varchar,
> >>>>>                       action varchar,
> >>>>>                       event_time TIMESTAMP(3),
> >>>>>                       WATERMARK FOR event_time AS event_time -
> INTERVAL '10' SECOND
> >>>>>           ) with();
> >>>>>
> >>>>>
> >>>>>          DML:
> >>>>>insert into
> >>>>>  console
> >>>>>select
> >>>>>  user_id,
> >>>>>  f_get_str(bind_id) as id_list
> >>>>>from
> >>>>>  (
> >>>>>    select
> >>>>>      action as bind_id,
> >>>>>      user_id,
> >>>>>      event_time
> >>>>>    from
> >>>>>      (
> >>>>>        SELECT
> >>>>>          user_id,
> >>>>>          action,
> >>>>>          PROCTIME() as proc_time,
> >>>>>          event_time
> >>>>>        FROM
> >>>>>          test
> >>>>>      ) T
> >>>>>    where
> >>>>>      user_id is not null
> >>>>>      and user_id <> ''
> >>>>>      and CHARACTER_LENGTH(user_id) = 24
> >>>>>  ) T
> >>>>>group by
> >>>>>  SESSION(event_time, INTERVAL '10' SECOND),
> >>>>>  user_id
> >>>>>
> >>>>>Best forideal
>
Reply | Threaded
Open this post in threaded view
|

Re:Re: Re:Re:Re:Re:Flink SQL No Watermark

Zhou Zach



Hi,试了,将并行度设置为2和kafka分区数9,都试了,都只有一个consumer有watermark,可能是因为我开了一个producer吧














在 2020-08-13 16:57:25,"Shengkai Fang" <[hidden email]> 写道:

>hi, watermark本来就是通过watermark assigner生成的。这是正常现象。
>我想问问 你有没有试过调大并行度来解决这个问题?因为不同partition的数据可能存在时间上的差异。
>
>Zhou Zach <[hidden email]> 于2020年8月13日周四 下午4:33写道:
>
>>
>>
>>
>> Hi forideal, Shengkai Fang,
>>
>> 加上env.disableOperatorChaining()之后,发现5个算子,
>>
>>
>>
>>
>> Source: TableSourceScan(table=[[default_catalog, default_database, user]],
>> fields=[uid, sex, age, created_time]) ->
>>
>> Calc(select=[uid, sex, age, created_time, () AS procTime,
>> TO_TIMESTAMP(((created_time / 1000) FROM_UNIXTIME _UTF-16LE'yyyy-MM-dd
>> HH:mm:ss')) AS eventTime]) ->
>>
>> WatermarkAssigner(rowtime=[eventTime], watermark=[(eventTime -
>> 3000:INTERVAL SECOND)]) ->
>>
>> Calc(select=[uid, sex, age, created_time]) ->
>>
>> Sink: Sink(table=[default_catalog.default_database.user_mysql],
>> fields=[uid, sex, age, created_time])
>> 但是,只有最后面两个算子有watermark,所以开启OperatorChaining后,因为前面3个没有watermark,整个chain的算子都没有watermark了,那么是不是就不能通过flink
>> ui来监控watermark了,就依赖第三方监控工具来看watermark?因为上生产,肯定要开OperatorChaining的
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-08-13 15:39:44,"forideal" <[hidden email]> 写道:
>> >Hi Zhou Zach:
>> >你可以试试 env.disableOperatorChaining();
>> >然后观察每个 op 的 watermark 情况。这样能够简单的看下具体的情况。
>> >> 我是怎么设置参数的
>> >我使用的是 Flink SQL Blink Planner,采用的设置方式和你一样
>> >tableEnv.getConfig().getConfiguration() .setString(key,
>> configs.getString(key, null));
>> >同时我在 source table 中定义了 WATERMARK FOR event_time AS event_time - INTERVAL
>> '10' SECOND
>> >
>> >Best forideal
>> >
>> >
>> >在 2020-08-13 15:20:13,"Zhou Zach" <[hidden email]> 写道:
>> >>
>> >>
>> >>
>> >>Hi forideal,
>> >>我也遇到了No Watermark问题,我也设置了table.exec.source.idle-timeout 参数,如下:
>> >>
>> >>
>> >>    val streamExecutionEnv =
>> StreamExecutionEnvironment.getExecutionEnvironment
>> >>
>> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> >>    streamExecutionEnv.setStateBackend(new
>> RocksDBStateBackend("hdfs://nameservice1/flink/checkpoints"))
>> >>
>> >>    val blinkEnvSettings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>> >>    val streamTableEnv =
>> StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings)
>> >>
>> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,CheckpointingMode.EXACTLY_ONCE)
>> >>
>> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,Duration.ofSeconds(20))
>> >>
>> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT,Duration.ofSeconds(900))
>> >>
>> >>
>> streamTableEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT,"5s")
>> >>
>> >>
>> >>并且,任务的并行度设置了1(这样是不是就不会存在flink consumer不消费kafka数据的情况,kafka一直生产数据的前提下)
>> >>在flink ui上,仍然显示Watermark No data,问下,你是怎么设置参数的
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>在 2020-08-13 14:02:58,"forideal" <[hidden email]> 写道:
>> >>>大家好
>> >>>
>> >>>       问题的原因定位到了。
>> >>>       由于无法 debug codegen 生成的代码,即使我拿到线上的数据,开启了debug环境依然无法得到进展。
>> >>>       这个时候,我进行了 disable chain,观察 watermark
>> 的生成情况,看看到底在那个环节没有继续往下传递。(因为多个 op chain 在一起,不能确定到底是那个环节存在问题)
>> >>>       发现在  WatermarkAssigner(rowtime=[event_time],
>> watermark=[(event_ti...)这个 op 中部分 task 为 No watermark,由于这个op和source
>> chain在一起,导致这个vertex 对应的watermark无法显示只能是 no data。因为存在 group by 下游的 watermark
>> 为 min(parent task output watermark),所以下游是 No watermark。导致在查问题的时候,比较困难。
>> >>>       定位到由于 kafka 部分 partition 无数据导致 No watermark 加上
>> table.exec.source.idle-timeout = 10s 参数即可。
>> >>>       当然,如果能直接 debug codegen 生成的代码,那么这个问题的分析路径会更简单。我应该直接可以发现大部分 task
>> 可以生成 watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用  disable chain
>> 观察每个 op 的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。
>> >>>
>> >>>
>> >>>Best forideal
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>在 2020-08-13 12:56:57,"forideal" <[hidden email]> 写道:
>> >>>>大家好
>> >>>>
>> >>>>
>> >>>>        关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是
>> StreamExecWatermarkAssigner
>> >>>>        在translateToPlanInternal 中生成了如下一个 class 代码,
>> >>>>public final class WatermarkGenerator$2 extends
>> org.apache.flink.table.runtime.generated.WatermarkGenerator { public
>> WatermarkGenerator$2(Object[] references) throws Exception { } @Override
>> public void open(org.apache.flink.configuration.Configuration parameters)
>> throws Exception { } @Override public Long
>> currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws
>> Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean
>> isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp
>> result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) {
>> field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5
>> = null; if (!isNull$4) { result$5 =
>> org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
>> - ((long) 10000L), field$3.getNanoOfMillisecond()); } if (isNull$4) {
>> return null; } else { return result$5.getMillisecond(); } } @Override
>> public void close() throws Exception { } }
>> >>>>
>> >>>>
>> >>>>
>> >>>>       其中关键的信息是 result$5 =
>> org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
>> - ((long) 10000L), field$3.getNanoOfMillisecond());
>> >>>>确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
>> 的定义获取的 watermark。
>> >>>>在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark
>> 这样的结果。因为这部分codegen的代码确实无法进一步debug了。
>> >>>>如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢
>> >>>>
>> >>>>  Best forideal
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>>在 2020-08-11 17:13:01,"forideal" <[hidden email]> 写道:
>> >>>>>大家好,请教一个问题
>> >>>>>
>> >>>>>
>> >>>>>           我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic
>> 的时候,是可以生成 watermark。消费大量的数据的时候,就无法生成watermark。
>> >>>>>           一直是    No Watermark。 暂时找不到排查问题的思路。
>> >>>>>          Flink 版本号是 1.10,kafka
>> 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 EventTime mode 模式,Blink Planner。
>> >>>>>|
>> >>>>>No Watermark |
>> >>>>>           SQL如下
>> >>>>>
>> >>>>>
>> >>>>>          DDL:
>> >>>>>           create table test(
>> >>>>>                       user_id varchar,
>> >>>>>                       action varchar,
>> >>>>>                       event_time TIMESTAMP(3),
>> >>>>>                       WATERMARK FOR event_time AS event_time -
>> INTERVAL '10' SECOND
>> >>>>>           ) with();
>> >>>>>
>> >>>>>
>> >>>>>          DML:
>> >>>>>insert into
>> >>>>>  console
>> >>>>>select
>> >>>>>  user_id,
>> >>>>>  f_get_str(bind_id) as id_list
>> >>>>>from
>> >>>>>  (
>> >>>>>    select
>> >>>>>      action as bind_id,
>> >>>>>      user_id,
>> >>>>>      event_time
>> >>>>>    from
>> >>>>>      (
>> >>>>>        SELECT
>> >>>>>          user_id,
>> >>>>>          action,
>> >>>>>          PROCTIME() as proc_time,
>> >>>>>          event_time
>> >>>>>        FROM
>> >>>>>          test
>> >>>>>      ) T
>> >>>>>    where
>> >>>>>      user_id is not null
>> >>>>>      and user_id <> ''
>> >>>>>      and CHARACTER_LENGTH(user_id) = 24
>> >>>>>  ) T
>> >>>>>group by
>> >>>>>  SESSION(event_time, INTERVAL '10' SECOND),
>> >>>>>  user_id
>> >>>>>
>> >>>>>Best forideal
>>
Reply | Threaded
Open this post in threaded view
|

Re:Re:Re:Re:Re:Re:Flink SQL No Watermark

forideal
In reply to this post by Zhou Zach
Hi Zhou Zach:
      “但是,只有最后面两个算子有watermark,所以开启OperatorChaining后,因为前面3个没有watermark,整个chain的算子都没有watermark了,那么是不是就不能通过flink
 ui来监控watermark了,就依赖第三方监控工具来看watermark?因为上生产,肯定要开OperatorChaining的”
 
      关于这个问题,我昨天也和李本超进行了线下沟通,大概的结论是:
       >1.如果不直接看每个operator的metrics,只看 flink ui 那个 graph 图,不进行 disable chain 的话,是看不出来问题的。如果打开了disable chain 有可能能看出来问题。在我们这个场景下能看出来问题。其他场景可能会不那么直接。
       >2.我们打开了disable chain 看 flink ui,其实也是看的相关的 metric。


       总体来看,就是要用 metrics 来诊断问题。不过,在某些场景下,一个用户开发了一个 Flink SQL 然后,去看监控,也增加了对应的 cost。一个是说这个用户之前不怎么看metrics,一个是说平台metrics也做的不好。所以如果能在 Flink ui 上面解决一定量的问题,将能减少用户的成本。


Best forideal


     

















在 2020-08-13 16:33:29,"Zhou Zach" <[hidden email]> 写道:

>
>
>
>Hi forideal, Shengkai Fang,
>  
>加上env.disableOperatorChaining()之后,发现5个算子,
>
>
>
>
>Source: TableSourceScan(table=[[default_catalog, default_database, user]], fields=[uid, sex, age, created_time]) ->
>
>Calc(select=[uid, sex, age, created_time, () AS procTime, TO_TIMESTAMP(((created_time / 1000) FROM_UNIXTIME _UTF-16LE'yyyy-MM-dd HH:mm:ss')) AS eventTime]) ->
>
>WatermarkAssigner(rowtime=[eventTime], watermark=[(eventTime - 3000:INTERVAL SECOND)]) ->
>
>Calc(select=[uid, sex, age, created_time]) ->
>
>Sink: Sink(table=[default_catalog.default_database.user_mysql], fields=[uid, sex, age, created_time])
>但是,只有最后面两个算子有watermark,所以开启OperatorChaining后,因为前面3个没有watermark,整个chain的算子都没有watermark了,那么是不是就不能通过flink ui来监控watermark了,就依赖第三方监控工具来看watermark?因为上生产,肯定要开OperatorChaining的
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-08-13 15:39:44,"forideal" <[hidden email]> 写道:
>>Hi Zhou Zach:
>>你可以试试 env.disableOperatorChaining();
>>然后观察每个 op 的 watermark 情况。这样能够简单的看下具体的情况。
>>> 我是怎么设置参数的
>>我使用的是 Flink SQL Blink Planner,采用的设置方式和你一样
>>tableEnv.getConfig().getConfiguration() .setString(key, configs.getString(key, null));
>>同时我在 source table 中定义了 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
>>
>>Best forideal
>>
>>
>>在 2020-08-13 15:20:13,"Zhou Zach" <[hidden email]> 写道:
>>>
>>>
>>>
>>>Hi forideal,
>>>我也遇到了No Watermark问题,我也设置了table.exec.source.idle-timeout 参数,如下:
>>>
>>>
>>>    val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>>    streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>    streamExecutionEnv.setStateBackend(new RocksDBStateBackend("hdfs://nameservice1/flink/checkpoints"))
>>>
>>>    val blinkEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>>    val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings)
>>>    streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,CheckpointingMode.EXACTLY_ONCE)
>>>    streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,Duration.ofSeconds(20))
>>>    streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT,Duration.ofSeconds(900))
>>>
>>>    streamTableEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT,"5s")
>>>
>>>
>>>并且,任务的并行度设置了1(这样是不是就不会存在flink consumer不消费kafka数据的情况,kafka一直生产数据的前提下)
>>>在flink ui上,仍然显示Watermark No data,问下,你是怎么设置参数的
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>在 2020-08-13 14:02:58,"forideal" <[hidden email]> 写道:
>>>>大家好
>>>>
>>>>       问题的原因定位到了。
>>>>       由于无法 debug codegen 生成的代码,即使我拿到线上的数据,开启了debug环境依然无法得到进展。
>>>>       这个时候,我进行了 disable chain,观察 watermark 的生成情况,看看到底在那个环节没有继续往下传递。(因为多个 op chain 在一起,不能确定到底是那个环节存在问题)
>>>>       发现在  WatermarkAssigner(rowtime=[event_time], watermark=[(event_ti...)这个 op 中部分 task 为 No watermark,由于这个op和source chain在一起,导致这个vertex 对应的watermark无法显示只能是 no data。因为存在 group by 下游的 watermark 为 min(parent task output watermark),所以下游是 No watermark。导致在查问题的时候,比较困难。
>>>>       定位到由于 kafka 部分 partition 无数据导致 No watermark 加上  table.exec.source.idle-timeout = 10s 参数即可。
>>>>       当然,如果能直接 debug codegen 生成的代码,那么这个问题的分析路径会更简单。我应该直接可以发现大部分 task 可以生成 watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用  disable chain 观察每个 op 的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。
>>>>
>>>>
>>>>Best forideal
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>在 2020-08-13 12:56:57,"forideal" <[hidden email]> 写道:
>>>>>大家好
>>>>>
>>>>>
>>>>>        关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是 StreamExecWatermarkAssigner
>>>>>        在translateToPlanInternal 中生成了如下一个 class 代码,
>>>>>public final class WatermarkGenerator$2 extends org.apache.flink.table.runtime.generated.WatermarkGenerator { public WatermarkGenerator$2(Object[] references) throws Exception { } @Override public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { } @Override public Long currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) { field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5 = null; if (!isNull$4) { result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond()); } if (isNull$4) { return null; } else { return result$5.getMillisecond(); } } @Override public void close() throws Exception { } }
>>>>>        
>>>>>
>>>>>
>>>>>       其中关键的信息是 result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond());
>>>>>确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND 的定义获取的 watermark。
>>>>>在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark 这样的结果。因为这部分codegen的代码确实无法进一步debug了。
>>>>>如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢
>>>>>
>>>>>  Best forideal
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>在 2020-08-11 17:13:01,"forideal" <[hidden email]> 写道:
>>>>>>大家好,请教一个问题
>>>>>>
>>>>>>
>>>>>>           我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成 watermark。消费大量的数据的时候,就无法生成watermark。
>>>>>>           一直是    No Watermark。 暂时找不到排查问题的思路。
>>>>>>          Flink 版本号是 1.10,kafka 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 EventTime mode 模式,Blink Planner。
>>>>>>|
>>>>>>No Watermark |
>>>>>>           SQL如下
>>>>>>
>>>>>>
>>>>>>          DDL:
>>>>>>           create table test(
>>>>>>                       user_id varchar,
>>>>>>                       action varchar,
>>>>>>                       event_time TIMESTAMP(3),
>>>>>>                       WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
>>>>>>           ) with();
>>>>>>
>>>>>>
>>>>>>          DML:
>>>>>>insert into
>>>>>>  console
>>>>>>select
>>>>>>  user_id,
>>>>>>  f_get_str(bind_id) as id_list
>>>>>>from
>>>>>>  (
>>>>>>    select
>>>>>>      action as bind_id,
>>>>>>      user_id,
>>>>>>      event_time
>>>>>>    from
>>>>>>      (
>>>>>>        SELECT
>>>>>>          user_id,
>>>>>>          action,
>>>>>>          PROCTIME() as proc_time,
>>>>>>          event_time
>>>>>>        FROM
>>>>>>          test
>>>>>>      ) T
>>>>>>    where
>>>>>>      user_id is not null
>>>>>>      and user_id <> ''
>>>>>>      and CHARACTER_LENGTH(user_id) = 24
>>>>>>  ) T
>>>>>>group by
>>>>>>  SESSION(event_time, INTERVAL '10' SECOND),
>>>>>>  user_id
>>>>>>
>>>>>>Best forideal