大家好,请教一个问题
我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成 watermark。消费大量的数据的时候,就无法生成watermark。 一直是 No Watermark。 暂时找不到排查问题的思路。 Flink 版本号是 1.10,kafka 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 EventTime mode 模式,Blink Planner。 | No Watermark | SQL如下 DDL: create table test( user_id varchar, action varchar, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND ) with(); DML: insert into console select user_id, f_get_str(bind_id) as id_list from ( select action as bind_id, user_id, event_time from ( SELECT user_id, action, PROCTIME() as proc_time, event_time FROM test ) T where user_id is not null and user_id <> '' and CHARACTER_LENGTH(user_id) = 24 ) T group by SESSION(event_time, INTERVAL '10' SECOND), user_id Best forideal |
大家好
关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是 StreamExecWatermarkAssigner 在translateToPlanInternal 中生成了如下一个 class 代码, public final class WatermarkGenerator$2 extends org.apache.flink.table.runtime.generated.WatermarkGenerator { public WatermarkGenerator$2(Object[] references) throws Exception { } @Override public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { } @Override public Long currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) { field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5 = null; if (!isNull$4) { result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond()); } if (isNull$4) { return null; } else { return result$5.getMillisecond(); } } @Override public void close() throws Exception { } } 其中关键的信息是 result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond()); 确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND 的定义获取的 watermark。 在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark 这样的结果。因为这部分codegen的代码确实无法进一步debug了。 如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢 Best forideal 在 2020-08-11 17:13:01,"forideal" <[hidden email]> 写道: >大家好,请教一个问题 > > > 我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成 watermark。消费大量的数据的时候,就无法生成watermark。 > 一直是 No Watermark。 暂时找不到排查问题的思路。 > Flink 版本号是 1.10,kafka 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 EventTime mode 模式,Blink Planner。 >| >No Watermark | > SQL如下 > > > DDL: > create table test( > user_id varchar, > action varchar, > event_time TIMESTAMP(3), > WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND > ) with(); > > > DML: >insert into > console >select > user_id, > f_get_str(bind_id) as id_list >from > ( > select > action as bind_id, > user_id, > event_time > from > ( > SELECT > user_id, > action, > PROCTIME() as proc_time, > event_time > FROM > test > ) T > where > user_id is not null > and user_id <> '' > and CHARACTER_LENGTH(user_id) = 24 > ) T >group by > SESSION(event_time, INTERVAL '10' SECOND), > user_id > >Best forideal |
hi, 你的意思是没有办法在codegen出来的代码上加断点的意思吗?
这里倒是有一个比较hack的方法: 将生成的类放在一个java文件之中,然后修改改下GeneratedClass下的newInstance方法,如果classname == “WatermarkGenerator$2” 则将刚才的类则返回 new WatermarkGenerator$2 这个类。 我个人对于问题的猜测是有一条数据的rowtime远远晚于其他数据,从而将整体的watermark提得很高,导致后面的“晚到”的数据一直无法触发watermark的生成。 forideal <[hidden email]> 于2020年8月13日周四 下午12:57写道: > 大家好 > > > 关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是 > StreamExecWatermarkAssigner > 在translateToPlanInternal 中生成了如下一个 class 代码, > public final class WatermarkGenerator$2 extends > org.apache.flink.table.runtime.generated.WatermarkGenerator { public > WatermarkGenerator$2(Object[] references) throws Exception { } @Override > public void open(org.apache.flink.configuration.Configuration parameters) > throws Exception { } @Override public Long > currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws > Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean > isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp > result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) { > field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5 > = null; if (!isNull$4) { result$5 = > org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() > - ((long) 10000L), field$3.getNanoOfMillisecond()); } if (isNull$4) { > return null; } else { return result$5.getMillisecond(); } } @Override > public void close() throws Exception { } } > > > > 其中关键的信息是 result$5 = > org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() > - ((long) 10000L), field$3.getNanoOfMillisecond()); > 确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND 的定义获取的 > watermark。 > 在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark > 这样的结果。因为这部分codegen的代码确实无法进一步debug了。 > 如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢 > > Best forideal > > > > > > > > > > > > 在 2020-08-11 17:13:01,"forideal" <[hidden email]> 写道: > >大家好,请教一个问题 > > > > > > 我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成 > watermark。消费大量的数据的时候,就无法生成watermark。 > > 一直是 No Watermark。 暂时找不到排查问题的思路。 > > Flink 版本号是 1.10,kafka 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 > EventTime mode 模式,Blink Planner。 > >| > >No Watermark | > > SQL如下 > > > > > > DDL: > > create table test( > > user_id varchar, > > action varchar, > > event_time TIMESTAMP(3), > > WATERMARK FOR event_time AS event_time - INTERVAL > '10' SECOND > > ) with(); > > > > > > DML: > >insert into > > console > >select > > user_id, > > f_get_str(bind_id) as id_list > >from > > ( > > select > > action as bind_id, > > user_id, > > event_time > > from > > ( > > SELECT > > user_id, > > action, > > PROCTIME() as proc_time, > > event_time > > FROM > > test > > ) T > > where > > user_id is not null > > and user_id <> '' > > and CHARACTER_LENGTH(user_id) = 24 > > ) T > >group by > > SESSION(event_time, INTERVAL '10' SECOND), > > user_id > > > >Best forideal > |
In reply to this post by forideal
大家好
问题的原因定位到了。 由于无法 debug codegen 生成的代码,即使我拿到线上的数据,开启了debug环境依然无法得到进展。 这个时候,我进行了 disable chain,观察 watermark 的生成情况,看看到底在那个环节没有继续往下传递。(因为多个 op chain 在一起,不能确定到底是那个环节存在问题) 发现在 WatermarkAssigner(rowtime=[event_time], watermark=[(event_ti...)这个 op 中部分 task 为 No watermark,由于这个op和source chain在一起,导致这个vertex 对应的watermark无法显示只能是 no data。因为存在 group by 下游的 watermark 为 min(parent task output watermark),所以下游是 No watermark。导致在查问题的时候,比较困难。 定位到由于 kafka 部分 partition 无数据导致 No watermark 加上 table.exec.source.idle-timeout = 10s 参数即可。 当然,如果能直接 debug codegen 生成的代码,那么这个问题的分析路径会更简单。我应该直接可以发现大部分 task 可以生成 watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用 disable chain 观察每个 op 的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。 Best forideal 在 2020-08-13 12:56:57,"forideal" <[hidden email]> 写道: >大家好 > > > 关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是 StreamExecWatermarkAssigner > 在translateToPlanInternal 中生成了如下一个 class 代码, >public final class WatermarkGenerator$2 extends org.apache.flink.table.runtime.generated.WatermarkGenerator { public WatermarkGenerator$2(Object[] references) throws Exception { } @Override public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { } @Override public Long currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) { field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5 = null; if (!isNull$4) { result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond()); } if (isNull$4) { return null; } else { return result$5.getMillisecond(); } } @Override public void close() throws Exception { } } > > > > 其中关键的信息是 result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond()); >确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND 的定义获取的 watermark。 >在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark 这样的结果。因为这部分codegen的代码确实无法进一步debug了。 >如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢 > > Best forideal > > > > > > > > > > > >在 2020-08-11 17:13:01,"forideal" <[hidden email]> 写道: >>大家好,请教一个问题 >> >> >> 我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成 watermark。消费大量的数据的时候,就无法生成watermark。 >> 一直是 No Watermark。 暂时找不到排查问题的思路。 >> Flink 版本号是 1.10,kafka 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 EventTime mode 模式,Blink Planner。 >>| >>No Watermark | >> SQL如下 >> >> >> DDL: >> create table test( >> user_id varchar, >> action varchar, >> event_time TIMESTAMP(3), >> WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND >> ) with(); >> >> >> DML: >>insert into >> console >>select >> user_id, >> f_get_str(bind_id) as id_list >>from >> ( >> select >> action as bind_id, >> user_id, >> event_time >> from >> ( >> SELECT >> user_id, >> action, >> PROCTIME() as proc_time, >> event_time >> FROM >> test >> ) T >> where >> user_id is not null >> and user_id <> '' >> and CHARACTER_LENGTH(user_id) = 24 >> ) T >>group by >> SESSION(event_time, INTERVAL '10' SECOND), >> user_id >> >>Best forideal |
Hi forideal, 我也遇到了No Watermark问题,我也设置了table.exec.source.idle-timeout 参数,如下: val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) streamExecutionEnv.setStateBackend(new RocksDBStateBackend("hdfs://nameservice1/flink/checkpoints")) val blinkEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings) streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,CheckpointingMode.EXACTLY_ONCE) streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,Duration.ofSeconds(20)) streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT,Duration.ofSeconds(900)) streamTableEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT,"5s") 并且,任务的并行度设置了1(这样是不是就不会存在flink consumer不消费kafka数据的情况,kafka一直生产数据的前提下) 在flink ui上,仍然显示Watermark No data,问下,你是怎么设置参数的 在 2020-08-13 14:02:58,"forideal" <[hidden email]> 写道: >大家好 > > 问题的原因定位到了。 > 由于无法 debug codegen 生成的代码,即使我拿到线上的数据,开启了debug环境依然无法得到进展。 > 这个时候,我进行了 disable chain,观察 watermark 的生成情况,看看到底在那个环节没有继续往下传递。(因为多个 op chain 在一起,不能确定到底是那个环节存在问题) > 发现在 WatermarkAssigner(rowtime=[event_time], watermark=[(event_ti...)这个 op 中部分 task 为 No watermark,由于这个op和source chain在一起,导致这个vertex 对应的watermark无法显示只能是 no data。因为存在 group by 下游的 watermark 为 min(parent task output watermark),所以下游是 No watermark。导致在查问题的时候,比较困难。 > 定位到由于 kafka 部分 partition 无数据导致 No watermark 加上 table.exec.source.idle-timeout = 10s 参数即可。 > 当然,如果能直接 debug codegen 生成的代码,那么这个问题的分析路径会更简单。我应该直接可以发现大部分 task 可以生成 watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用 disable chain 观察每个 op 的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。 > > >Best forideal > > > > > > > > >在 2020-08-13 12:56:57,"forideal" <[hidden email]> 写道: >>大家好 >> >> >> 关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是 StreamExecWatermarkAssigner >> 在translateToPlanInternal 中生成了如下一个 class 代码, >>public final class WatermarkGenerator$2 extends org.apache.flink.table.runtime.generated.WatermarkGenerator { public WatermarkGenerator$2(Object[] references) throws Exception { } @Override public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { } @Override public Long currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) { field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5 = null; if (!isNull$4) { result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond()); } if (isNull$4) { return null; } else { return result$5.getMillisecond(); } } @Override public void close() throws Exception { } } >> >> >> >> 其中关键的信息是 result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond()); >>确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND 的定义获取的 watermark。 >>在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark 这样的结果。因为这部分codegen的代码确实无法进一步debug了。 >>如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢 >> >> Best forideal >> >> >> >> >> >> >> >> >> >> >> >>在 2020-08-11 17:13:01,"forideal" <[hidden email]> 写道: >>>大家好,请教一个问题 >>> >>> >>> 我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成 watermark。消费大量的数据的时候,就无法生成watermark。 >>> 一直是 No Watermark。 暂时找不到排查问题的思路。 >>> Flink 版本号是 1.10,kafka 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 EventTime mode 模式,Blink Planner。 >>>| >>>No Watermark | >>> SQL如下 >>> >>> >>> DDL: >>> create table test( >>> user_id varchar, >>> action varchar, >>> event_time TIMESTAMP(3), >>> WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND >>> ) with(); >>> >>> >>> DML: >>>insert into >>> console >>>select >>> user_id, >>> f_get_str(bind_id) as id_list >>>from >>> ( >>> select >>> action as bind_id, >>> user_id, >>> event_time >>> from >>> ( >>> SELECT >>> user_id, >>> action, >>> PROCTIME() as proc_time, >>> event_time >>> FROM >>> test >>> ) T >>> where >>> user_id is not null >>> and user_id <> '' >>> and CHARACTER_LENGTH(user_id) = 24 >>> ) T >>>group by >>> SESSION(event_time, INTERVAL '10' SECOND), >>> user_id >>> >>>Best forideal |
hi 那你有没有试过将并行度设置为partition的数量
Zhou Zach <[hidden email]>于2020年8月13日 周四下午3:21写道: > > > > Hi forideal, > 我也遇到了No Watermark问题,我也设置了table.exec.source.idle-timeout 参数,如下: > > > val streamExecutionEnv = > StreamExecutionEnvironment.getExecutionEnvironment > > streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > streamExecutionEnv.setStateBackend(new > RocksDBStateBackend("hdfs://nameservice1/flink/checkpoints")) > > val blinkEnvSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, > blinkEnvSettings) > > streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,CheckpointingMode.EXACTLY_ONCE) > > streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,Duration.ofSeconds(20)) > > streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT,Duration.ofSeconds(900)) > > > streamTableEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT,"5s") > > > 并且,任务的并行度设置了1(这样是不是就不会存在flink consumer不消费kafka数据的情况,kafka一直生产数据的前提下) > 在flink ui上,仍然显示Watermark No data,问下,你是怎么设置参数的 > > > > > > > > > > > > > > > 在 2020-08-13 14:02:58,"forideal" <[hidden email]> 写道: > >大家好 > > > > 问题的原因定位到了。 > > 由于无法 debug codegen 生成的代码,即使我拿到线上的数据,开启了debug环境依然无法得到进展。 > > 这个时候,我进行了 disable chain,观察 watermark 的生成情况,看看到底在那个环节没有继续往下传递。(因为多个 > op chain 在一起,不能确定到底是那个环节存在问题) > > 发现在 WatermarkAssigner(rowtime=[event_time], > watermark=[(event_ti...)这个 op 中部分 task 为 No watermark,由于这个op和source > chain在一起,导致这个vertex 对应的watermark无法显示只能是 no data。因为存在 group by 下游的 watermark > 为 min(parent task output watermark),所以下游是 No watermark。导致在查问题的时候,比较困难。 > > 定位到由于 kafka 部分 partition 无数据导致 No watermark 加上 > table.exec.source.idle-timeout = 10s 参数即可。 > > 当然,如果能直接 debug codegen 生成的代码,那么这个问题的分析路径会更简单。我应该直接可以发现大部分 task > 可以生成 watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用 disable chain > 观察每个 op 的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。 > > > > > >Best forideal > > > > > > > > > > > > > > > > > >在 2020-08-13 12:56:57,"forideal" <[hidden email]> 写道: > >>大家好 > >> > >> > >> 关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是 > StreamExecWatermarkAssigner > >> 在translateToPlanInternal 中生成了如下一个 class 代码, > >>public final class WatermarkGenerator$2 extends > org.apache.flink.table.runtime.generated.WatermarkGenerator { public > WatermarkGenerator$2(Object[] references) throws Exception { } @Override > public void open(org.apache.flink.configuration.Configuration parameters) > throws Exception { } @Override public Long > currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws > Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean > isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp > result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) { > field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5 > = null; if (!isNull$4) { result$5 = > org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() > - ((long) 10000L), field$3.getNanoOfMillisecond()); } if (isNull$4) { > return null; } else { return result$5.getMillisecond(); } } @Override > public void close() throws Exception { } } > >> > >> > >> > >> 其中关键的信息是 result$5 = > org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() > - ((long) 10000L), field$3.getNanoOfMillisecond()); > >>确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND > 的定义获取的 watermark。 > >>在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark > 这样的结果。因为这部分codegen的代码确实无法进一步debug了。 > >>如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢 > >> > >> Best forideal > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >>在 2020-08-11 17:13:01,"forideal" <[hidden email]> 写道: > >>>大家好,请教一个问题 > >>> > >>> > >>> 我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成 > watermark。消费大量的数据的时候,就无法生成watermark。 > >>> 一直是 No Watermark。 暂时找不到排查问题的思路。 > >>> Flink 版本号是 1.10,kafka > 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 EventTime mode 模式,Blink Planner。 > >>>| > >>>No Watermark | > >>> SQL如下 > >>> > >>> > >>> DDL: > >>> create table test( > >>> user_id varchar, > >>> action varchar, > >>> event_time TIMESTAMP(3), > >>> WATERMARK FOR event_time AS event_time - > INTERVAL '10' SECOND > >>> ) with(); > >>> > >>> > >>> DML: > >>>insert into > >>> console > >>>select > >>> user_id, > >>> f_get_str(bind_id) as id_list > >>>from > >>> ( > >>> select > >>> action as bind_id, > >>> user_id, > >>> event_time > >>> from > >>> ( > >>> SELECT > >>> user_id, > >>> action, > >>> PROCTIME() as proc_time, > >>> event_time > >>> FROM > >>> test > >>> ) T > >>> where > >>> user_id is not null > >>> and user_id <> '' > >>> and CHARACTER_LENGTH(user_id) = 24 > >>> ) T > >>>group by > >>> SESSION(event_time, INTERVAL '10' SECOND), > >>> user_id > >>> > >>>Best forideal > |
In reply to this post by Zhou Zach
Hi Zhou Zach:
你可以试试 env.disableOperatorChaining(); 然后观察每个 op 的 watermark 情况。这样能够简单的看下具体的情况。 > 我是怎么设置参数的 我使用的是 Flink SQL Blink Planner,采用的设置方式和你一样 tableEnv.getConfig().getConfiguration() .setString(key, configs.getString(key, null)); 同时我在 source table 中定义了 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND Best forideal 在 2020-08-13 15:20:13,"Zhou Zach" <[hidden email]> 写道: > > > >Hi forideal, >我也遇到了No Watermark问题,我也设置了table.exec.source.idle-timeout 参数,如下: > > > val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment > streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > streamExecutionEnv.setStateBackend(new RocksDBStateBackend("hdfs://nameservice1/flink/checkpoints")) > > val blinkEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings) > streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,CheckpointingMode.EXACTLY_ONCE) > streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,Duration.ofSeconds(20)) > streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT,Duration.ofSeconds(900)) > > streamTableEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT,"5s") > > >并且,任务的并行度设置了1(这样是不是就不会存在flink consumer不消费kafka数据的情况,kafka一直生产数据的前提下) >在flink ui上,仍然显示Watermark No data,问下,你是怎么设置参数的 > > > > > > > > > > > > > > >在 2020-08-13 14:02:58,"forideal" <[hidden email]> 写道: >>大家好 >> >> 问题的原因定位到了。 >> 由于无法 debug codegen 生成的代码,即使我拿到线上的数据,开启了debug环境依然无法得到进展。 >> 这个时候,我进行了 disable chain,观察 watermark 的生成情况,看看到底在那个环节没有继续往下传递。(因为多个 op chain 在一起,不能确定到底是那个环节存在问题) >> 发现在 WatermarkAssigner(rowtime=[event_time], watermark=[(event_ti...)这个 op 中部分 task 为 No watermark,由于这个op和source chain在一起,导致这个vertex 对应的watermark无法显示只能是 no data。因为存在 group by 下游的 watermark 为 min(parent task output watermark),所以下游是 No watermark。导致在查问题的时候,比较困难。 >> 定位到由于 kafka 部分 partition 无数据导致 No watermark 加上 table.exec.source.idle-timeout = 10s 参数即可。 >> 当然,如果能直接 debug codegen 生成的代码,那么这个问题的分析路径会更简单。我应该直接可以发现大部分 task 可以生成 watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用 disable chain 观察每个 op 的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。 >> >> >>Best forideal >> >> >> >> >> >> >> >> >>在 2020-08-13 12:56:57,"forideal" <[hidden email]> 写道: >>>大家好 >>> >>> >>> 关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是 StreamExecWatermarkAssigner >>> 在translateToPlanInternal 中生成了如下一个 class 代码, >>>public final class WatermarkGenerator$2 extends org.apache.flink.table.runtime.generated.WatermarkGenerator { public WatermarkGenerator$2(Object[] references) throws Exception { } @Override public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { } @Override public Long currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) { field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5 = null; if (!isNull$4) { result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond()); } if (isNull$4) { return null; } else { return result$5.getMillisecond(); } } @Override public void close() throws Exception { } } >>> >>> >>> >>> 其中关键的信息是 result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond()); >>>确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND 的定义获取的 watermark。 >>>在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark 这样的结果。因为这部分codegen的代码确实无法进一步debug了。 >>>如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢 >>> >>> Best forideal >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>>在 2020-08-11 17:13:01,"forideal" <[hidden email]> 写道: >>>>大家好,请教一个问题 >>>> >>>> >>>> 我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成 watermark。消费大量的数据的时候,就无法生成watermark。 >>>> 一直是 No Watermark。 暂时找不到排查问题的思路。 >>>> Flink 版本号是 1.10,kafka 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 EventTime mode 模式,Blink Planner。 >>>>| >>>>No Watermark | >>>> SQL如下 >>>> >>>> >>>> DDL: >>>> create table test( >>>> user_id varchar, >>>> action varchar, >>>> event_time TIMESTAMP(3), >>>> WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND >>>> ) with(); >>>> >>>> >>>> DML: >>>>insert into >>>> console >>>>select >>>> user_id, >>>> f_get_str(bind_id) as id_list >>>>from >>>> ( >>>> select >>>> action as bind_id, >>>> user_id, >>>> event_time >>>> from >>>> ( >>>> SELECT >>>> user_id, >>>> action, >>>> PROCTIME() as proc_time, >>>> event_time >>>> FROM >>>> test >>>> ) T >>>> where >>>> user_id is not null >>>> and user_id <> '' >>>> and CHARACTER_LENGTH(user_id) = 24 >>>> ) T >>>>group by >>>> SESSION(event_time, INTERVAL '10' SECOND), >>>> user_id >>>> >>>>Best forideal |
Hi forideal, Shengkai Fang, 加上env.disableOperatorChaining()之后,发现5个算子, Source: TableSourceScan(table=[[default_catalog, default_database, user]], fields=[uid, sex, age, created_time]) -> Calc(select=[uid, sex, age, created_time, () AS procTime, TO_TIMESTAMP(((created_time / 1000) FROM_UNIXTIME _UTF-16LE'yyyy-MM-dd HH:mm:ss')) AS eventTime]) -> WatermarkAssigner(rowtime=[eventTime], watermark=[(eventTime - 3000:INTERVAL SECOND)]) -> Calc(select=[uid, sex, age, created_time]) -> Sink: Sink(table=[default_catalog.default_database.user_mysql], fields=[uid, sex, age, created_time]) 但是,只有最后面两个算子有watermark,所以开启OperatorChaining后,因为前面3个没有watermark,整个chain的算子都没有watermark了,那么是不是就不能通过flink ui来监控watermark了,就依赖第三方监控工具来看watermark?因为上生产,肯定要开OperatorChaining的 在 2020-08-13 15:39:44,"forideal" <[hidden email]> 写道: >Hi Zhou Zach: >你可以试试 env.disableOperatorChaining(); >然后观察每个 op 的 watermark 情况。这样能够简单的看下具体的情况。 >> 我是怎么设置参数的 >我使用的是 Flink SQL Blink Planner,采用的设置方式和你一样 >tableEnv.getConfig().getConfiguration() .setString(key, configs.getString(key, null)); >同时我在 source table 中定义了 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND > >Best forideal > > >在 2020-08-13 15:20:13,"Zhou Zach" <[hidden email]> 写道: >> >> >> >>Hi forideal, >>我也遇到了No Watermark问题,我也设置了table.exec.source.idle-timeout 参数,如下: >> >> >> val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment >> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) >> streamExecutionEnv.setStateBackend(new RocksDBStateBackend("hdfs://nameservice1/flink/checkpoints")) >> >> val blinkEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() >> val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings) >> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,CheckpointingMode.EXACTLY_ONCE) >> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,Duration.ofSeconds(20)) >> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT,Duration.ofSeconds(900)) >> >> streamTableEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT,"5s") >> >> >>并且,任务的并行度设置了1(这样是不是就不会存在flink consumer不消费kafka数据的情况,kafka一直生产数据的前提下) >>在flink ui上,仍然显示Watermark No data,问下,你是怎么设置参数的 >> >> >> >> >> >> >> >> >> >> >> >> >> >> >>在 2020-08-13 14:02:58,"forideal" <[hidden email]> 写道: >>>大家好 >>> >>> 问题的原因定位到了。 >>> 由于无法 debug codegen 生成的代码,即使我拿到线上的数据,开启了debug环境依然无法得到进展。 >>> 这个时候,我进行了 disable chain,观察 watermark 的生成情况,看看到底在那个环节没有继续往下传递。(因为多个 op chain 在一起,不能确定到底是那个环节存在问题) >>> 发现在 WatermarkAssigner(rowtime=[event_time], watermark=[(event_ti...)这个 op 中部分 task 为 No watermark,由于这个op和source chain在一起,导致这个vertex 对应的watermark无法显示只能是 no data。因为存在 group by 下游的 watermark 为 min(parent task output watermark),所以下游是 No watermark。导致在查问题的时候,比较困难。 >>> 定位到由于 kafka 部分 partition 无数据导致 No watermark 加上 table.exec.source.idle-timeout = 10s 参数即可。 >>> 当然,如果能直接 debug codegen 生成的代码,那么这个问题的分析路径会更简单。我应该直接可以发现大部分 task 可以生成 watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用 disable chain 观察每个 op 的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。 >>> >>> >>>Best forideal >>> >>> >>> >>> >>> >>> >>> >>> >>>在 2020-08-13 12:56:57,"forideal" <[hidden email]> 写道: >>>>大家好 >>>> >>>> >>>> 关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是 StreamExecWatermarkAssigner >>>> 在translateToPlanInternal 中生成了如下一个 class 代码, >>>>public final class WatermarkGenerator$2 extends org.apache.flink.table.runtime.generated.WatermarkGenerator { public WatermarkGenerator$2(Object[] references) throws Exception { } @Override public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { } @Override public Long currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) { field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5 = null; if (!isNull$4) { result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond()); } if (isNull$4) { return null; } else { return result$5.getMillisecond(); } } @Override public void close() throws Exception { } } >>>> >>>> >>>> >>>> 其中关键的信息是 result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond()); >>>>确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND 的定义获取的 watermark。 >>>>在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark 这样的结果。因为这部分codegen的代码确实无法进一步debug了。 >>>>如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢 >>>> >>>> Best forideal >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>>在 2020-08-11 17:13:01,"forideal" <[hidden email]> 写道: >>>>>大家好,请教一个问题 >>>>> >>>>> >>>>> 我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成 watermark。消费大量的数据的时候,就无法生成watermark。 >>>>> 一直是 No Watermark。 暂时找不到排查问题的思路。 >>>>> Flink 版本号是 1.10,kafka 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 EventTime mode 模式,Blink Planner。 >>>>>| >>>>>No Watermark | >>>>> SQL如下 >>>>> >>>>> >>>>> DDL: >>>>> create table test( >>>>> user_id varchar, >>>>> action varchar, >>>>> event_time TIMESTAMP(3), >>>>> WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND >>>>> ) with(); >>>>> >>>>> >>>>> DML: >>>>>insert into >>>>> console >>>>>select >>>>> user_id, >>>>> f_get_str(bind_id) as id_list >>>>>from >>>>> ( >>>>> select >>>>> action as bind_id, >>>>> user_id, >>>>> event_time >>>>> from >>>>> ( >>>>> SELECT >>>>> user_id, >>>>> action, >>>>> PROCTIME() as proc_time, >>>>> event_time >>>>> FROM >>>>> test >>>>> ) T >>>>> where >>>>> user_id is not null >>>>> and user_id <> '' >>>>> and CHARACTER_LENGTH(user_id) = 24 >>>>> ) T >>>>>group by >>>>> SESSION(event_time, INTERVAL '10' SECOND), >>>>> user_id >>>>> >>>>>Best forideal |
hi, watermark本来就是通过watermark assigner生成的。这是正常现象。
我想问问 你有没有试过调大并行度来解决这个问题?因为不同partition的数据可能存在时间上的差异。 Zhou Zach <[hidden email]> 于2020年8月13日周四 下午4:33写道: > > > > Hi forideal, Shengkai Fang, > > 加上env.disableOperatorChaining()之后,发现5个算子, > > > > > Source: TableSourceScan(table=[[default_catalog, default_database, user]], > fields=[uid, sex, age, created_time]) -> > > Calc(select=[uid, sex, age, created_time, () AS procTime, > TO_TIMESTAMP(((created_time / 1000) FROM_UNIXTIME _UTF-16LE'yyyy-MM-dd > HH:mm:ss')) AS eventTime]) -> > > WatermarkAssigner(rowtime=[eventTime], watermark=[(eventTime - > 3000:INTERVAL SECOND)]) -> > > Calc(select=[uid, sex, age, created_time]) -> > > Sink: Sink(table=[default_catalog.default_database.user_mysql], > fields=[uid, sex, age, created_time]) > 但是,只有最后面两个算子有watermark,所以开启OperatorChaining后,因为前面3个没有watermark,整个chain的算子都没有watermark了,那么是不是就不能通过flink > ui来监控watermark了,就依赖第三方监控工具来看watermark?因为上生产,肯定要开OperatorChaining的 > > > > > > > > > > > > > > > 在 2020-08-13 15:39:44,"forideal" <[hidden email]> 写道: > >Hi Zhou Zach: > >你可以试试 env.disableOperatorChaining(); > >然后观察每个 op 的 watermark 情况。这样能够简单的看下具体的情况。 > >> 我是怎么设置参数的 > >我使用的是 Flink SQL Blink Planner,采用的设置方式和你一样 > >tableEnv.getConfig().getConfiguration() .setString(key, > configs.getString(key, null)); > >同时我在 source table 中定义了 WATERMARK FOR event_time AS event_time - INTERVAL > '10' SECOND > > > >Best forideal > > > > > >在 2020-08-13 15:20:13,"Zhou Zach" <[hidden email]> 写道: > >> > >> > >> > >>Hi forideal, > >>我也遇到了No Watermark问题,我也设置了table.exec.source.idle-timeout 参数,如下: > >> > >> > >> val streamExecutionEnv = > StreamExecutionEnvironment.getExecutionEnvironment > >> > streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > >> streamExecutionEnv.setStateBackend(new > RocksDBStateBackend("hdfs://nameservice1/flink/checkpoints")) > >> > >> val blinkEnvSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > >> val streamTableEnv = > StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings) > >> > streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,CheckpointingMode.EXACTLY_ONCE) > >> > streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,Duration.ofSeconds(20)) > >> > streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT,Duration.ofSeconds(900)) > >> > >> > streamTableEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT,"5s") > >> > >> > >>并且,任务的并行度设置了1(这样是不是就不会存在flink consumer不消费kafka数据的情况,kafka一直生产数据的前提下) > >>在flink ui上,仍然显示Watermark No data,问下,你是怎么设置参数的 > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >>在 2020-08-13 14:02:58,"forideal" <[hidden email]> 写道: > >>>大家好 > >>> > >>> 问题的原因定位到了。 > >>> 由于无法 debug codegen 生成的代码,即使我拿到线上的数据,开启了debug环境依然无法得到进展。 > >>> 这个时候,我进行了 disable chain,观察 watermark > 的生成情况,看看到底在那个环节没有继续往下传递。(因为多个 op chain 在一起,不能确定到底是那个环节存在问题) > >>> 发现在 WatermarkAssigner(rowtime=[event_time], > watermark=[(event_ti...)这个 op 中部分 task 为 No watermark,由于这个op和source > chain在一起,导致这个vertex 对应的watermark无法显示只能是 no data。因为存在 group by 下游的 watermark > 为 min(parent task output watermark),所以下游是 No watermark。导致在查问题的时候,比较困难。 > >>> 定位到由于 kafka 部分 partition 无数据导致 No watermark 加上 > table.exec.source.idle-timeout = 10s 参数即可。 > >>> 当然,如果能直接 debug codegen 生成的代码,那么这个问题的分析路径会更简单。我应该直接可以发现大部分 task > 可以生成 watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用 disable chain > 观察每个 op 的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。 > >>> > >>> > >>>Best forideal > >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>>在 2020-08-13 12:56:57,"forideal" <[hidden email]> 写道: > >>>>大家好 > >>>> > >>>> > >>>> 关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是 > StreamExecWatermarkAssigner > >>>> 在translateToPlanInternal 中生成了如下一个 class 代码, > >>>>public final class WatermarkGenerator$2 extends > org.apache.flink.table.runtime.generated.WatermarkGenerator { public > WatermarkGenerator$2(Object[] references) throws Exception { } @Override > public void open(org.apache.flink.configuration.Configuration parameters) > throws Exception { } @Override public Long > currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws > Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean > isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp > result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) { > field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5 > = null; if (!isNull$4) { result$5 = > org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() > - ((long) 10000L), field$3.getNanoOfMillisecond()); } if (isNull$4) { > return null; } else { return result$5.getMillisecond(); } } @Override > public void close() throws Exception { } } > >>>> > >>>> > >>>> > >>>> 其中关键的信息是 result$5 = > org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() > - ((long) 10000L), field$3.getNanoOfMillisecond()); > >>>>确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND > 的定义获取的 watermark。 > >>>>在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark > 这样的结果。因为这部分codegen的代码确实无法进一步debug了。 > >>>>如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢 > >>>> > >>>> Best forideal > >>>> > >>>> > >>>> > >>>> > >>>> > >>>> > >>>> > >>>> > >>>> > >>>> > >>>> > >>>>在 2020-08-11 17:13:01,"forideal" <[hidden email]> 写道: > >>>>>大家好,请教一个问题 > >>>>> > >>>>> > >>>>> 我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic > 的时候,是可以生成 watermark。消费大量的数据的时候,就无法生成watermark。 > >>>>> 一直是 No Watermark。 暂时找不到排查问题的思路。 > >>>>> Flink 版本号是 1.10,kafka > 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 EventTime mode 模式,Blink Planner。 > >>>>>| > >>>>>No Watermark | > >>>>> SQL如下 > >>>>> > >>>>> > >>>>> DDL: > >>>>> create table test( > >>>>> user_id varchar, > >>>>> action varchar, > >>>>> event_time TIMESTAMP(3), > >>>>> WATERMARK FOR event_time AS event_time - > INTERVAL '10' SECOND > >>>>> ) with(); > >>>>> > >>>>> > >>>>> DML: > >>>>>insert into > >>>>> console > >>>>>select > >>>>> user_id, > >>>>> f_get_str(bind_id) as id_list > >>>>>from > >>>>> ( > >>>>> select > >>>>> action as bind_id, > >>>>> user_id, > >>>>> event_time > >>>>> from > >>>>> ( > >>>>> SELECT > >>>>> user_id, > >>>>> action, > >>>>> PROCTIME() as proc_time, > >>>>> event_time > >>>>> FROM > >>>>> test > >>>>> ) T > >>>>> where > >>>>> user_id is not null > >>>>> and user_id <> '' > >>>>> and CHARACTER_LENGTH(user_id) = 24 > >>>>> ) T > >>>>>group by > >>>>> SESSION(event_time, INTERVAL '10' SECOND), > >>>>> user_id > >>>>> > >>>>>Best forideal > |
Hi,试了,将并行度设置为2和kafka分区数9,都试了,都只有一个consumer有watermark,可能是因为我开了一个producer吧 在 2020-08-13 16:57:25,"Shengkai Fang" <[hidden email]> 写道: >hi, watermark本来就是通过watermark assigner生成的。这是正常现象。 >我想问问 你有没有试过调大并行度来解决这个问题?因为不同partition的数据可能存在时间上的差异。 > >Zhou Zach <[hidden email]> 于2020年8月13日周四 下午4:33写道: > >> >> >> >> Hi forideal, Shengkai Fang, >> >> 加上env.disableOperatorChaining()之后,发现5个算子, >> >> >> >> >> Source: TableSourceScan(table=[[default_catalog, default_database, user]], >> fields=[uid, sex, age, created_time]) -> >> >> Calc(select=[uid, sex, age, created_time, () AS procTime, >> TO_TIMESTAMP(((created_time / 1000) FROM_UNIXTIME _UTF-16LE'yyyy-MM-dd >> HH:mm:ss')) AS eventTime]) -> >> >> WatermarkAssigner(rowtime=[eventTime], watermark=[(eventTime - >> 3000:INTERVAL SECOND)]) -> >> >> Calc(select=[uid, sex, age, created_time]) -> >> >> Sink: Sink(table=[default_catalog.default_database.user_mysql], >> fields=[uid, sex, age, created_time]) >> 但是,只有最后面两个算子有watermark,所以开启OperatorChaining后,因为前面3个没有watermark,整个chain的算子都没有watermark了,那么是不是就不能通过flink >> ui来监控watermark了,就依赖第三方监控工具来看watermark?因为上生产,肯定要开OperatorChaining的 >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-08-13 15:39:44,"forideal" <[hidden email]> 写道: >> >Hi Zhou Zach: >> >你可以试试 env.disableOperatorChaining(); >> >然后观察每个 op 的 watermark 情况。这样能够简单的看下具体的情况。 >> >> 我是怎么设置参数的 >> >我使用的是 Flink SQL Blink Planner,采用的设置方式和你一样 >> >tableEnv.getConfig().getConfiguration() .setString(key, >> configs.getString(key, null)); >> >同时我在 source table 中定义了 WATERMARK FOR event_time AS event_time - INTERVAL >> '10' SECOND >> > >> >Best forideal >> > >> > >> >在 2020-08-13 15:20:13,"Zhou Zach" <[hidden email]> 写道: >> >> >> >> >> >> >> >>Hi forideal, >> >>我也遇到了No Watermark问题,我也设置了table.exec.source.idle-timeout 参数,如下: >> >> >> >> >> >> val streamExecutionEnv = >> StreamExecutionEnvironment.getExecutionEnvironment >> >> >> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) >> >> streamExecutionEnv.setStateBackend(new >> RocksDBStateBackend("hdfs://nameservice1/flink/checkpoints")) >> >> >> >> val blinkEnvSettings = >> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() >> >> val streamTableEnv = >> StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings) >> >> >> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,CheckpointingMode.EXACTLY_ONCE) >> >> >> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,Duration.ofSeconds(20)) >> >> >> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT,Duration.ofSeconds(900)) >> >> >> >> >> streamTableEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT,"5s") >> >> >> >> >> >>并且,任务的并行度设置了1(这样是不是就不会存在flink consumer不消费kafka数据的情况,kafka一直生产数据的前提下) >> >>在flink ui上,仍然显示Watermark No data,问下,你是怎么设置参数的 >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >>在 2020-08-13 14:02:58,"forideal" <[hidden email]> 写道: >> >>>大家好 >> >>> >> >>> 问题的原因定位到了。 >> >>> 由于无法 debug codegen 生成的代码,即使我拿到线上的数据,开启了debug环境依然无法得到进展。 >> >>> 这个时候,我进行了 disable chain,观察 watermark >> 的生成情况,看看到底在那个环节没有继续往下传递。(因为多个 op chain 在一起,不能确定到底是那个环节存在问题) >> >>> 发现在 WatermarkAssigner(rowtime=[event_time], >> watermark=[(event_ti...)这个 op 中部分 task 为 No watermark,由于这个op和source >> chain在一起,导致这个vertex 对应的watermark无法显示只能是 no data。因为存在 group by 下游的 watermark >> 为 min(parent task output watermark),所以下游是 No watermark。导致在查问题的时候,比较困难。 >> >>> 定位到由于 kafka 部分 partition 无数据导致 No watermark 加上 >> table.exec.source.idle-timeout = 10s 参数即可。 >> >>> 当然,如果能直接 debug codegen 生成的代码,那么这个问题的分析路径会更简单。我应该直接可以发现大部分 task >> 可以生成 watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用 disable chain >> 观察每个 op 的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。 >> >>> >> >>> >> >>>Best forideal >> >>> >> >>> >> >>> >> >>> >> >>> >> >>> >> >>> >> >>> >> >>>在 2020-08-13 12:56:57,"forideal" <[hidden email]> 写道: >> >>>>大家好 >> >>>> >> >>>> >> >>>> 关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是 >> StreamExecWatermarkAssigner >> >>>> 在translateToPlanInternal 中生成了如下一个 class 代码, >> >>>>public final class WatermarkGenerator$2 extends >> org.apache.flink.table.runtime.generated.WatermarkGenerator { public >> WatermarkGenerator$2(Object[] references) throws Exception { } @Override >> public void open(org.apache.flink.configuration.Configuration parameters) >> throws Exception { } @Override public Long >> currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws >> Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean >> isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp >> result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) { >> field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5 >> = null; if (!isNull$4) { result$5 = >> org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() >> - ((long) 10000L), field$3.getNanoOfMillisecond()); } if (isNull$4) { >> return null; } else { return result$5.getMillisecond(); } } @Override >> public void close() throws Exception { } } >> >>>> >> >>>> >> >>>> >> >>>> 其中关键的信息是 result$5 = >> org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() >> - ((long) 10000L), field$3.getNanoOfMillisecond()); >> >>>>确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND >> 的定义获取的 watermark。 >> >>>>在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark >> 这样的结果。因为这部分codegen的代码确实无法进一步debug了。 >> >>>>如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢 >> >>>> >> >>>> Best forideal >> >>>> >> >>>> >> >>>> >> >>>> >> >>>> >> >>>> >> >>>> >> >>>> >> >>>> >> >>>> >> >>>> >> >>>>在 2020-08-11 17:13:01,"forideal" <[hidden email]> 写道: >> >>>>>大家好,请教一个问题 >> >>>>> >> >>>>> >> >>>>> 我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic >> 的时候,是可以生成 watermark。消费大量的数据的时候,就无法生成watermark。 >> >>>>> 一直是 No Watermark。 暂时找不到排查问题的思路。 >> >>>>> Flink 版本号是 1.10,kafka >> 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 EventTime mode 模式,Blink Planner。 >> >>>>>| >> >>>>>No Watermark | >> >>>>> SQL如下 >> >>>>> >> >>>>> >> >>>>> DDL: >> >>>>> create table test( >> >>>>> user_id varchar, >> >>>>> action varchar, >> >>>>> event_time TIMESTAMP(3), >> >>>>> WATERMARK FOR event_time AS event_time - >> INTERVAL '10' SECOND >> >>>>> ) with(); >> >>>>> >> >>>>> >> >>>>> DML: >> >>>>>insert into >> >>>>> console >> >>>>>select >> >>>>> user_id, >> >>>>> f_get_str(bind_id) as id_list >> >>>>>from >> >>>>> ( >> >>>>> select >> >>>>> action as bind_id, >> >>>>> user_id, >> >>>>> event_time >> >>>>> from >> >>>>> ( >> >>>>> SELECT >> >>>>> user_id, >> >>>>> action, >> >>>>> PROCTIME() as proc_time, >> >>>>> event_time >> >>>>> FROM >> >>>>> test >> >>>>> ) T >> >>>>> where >> >>>>> user_id is not null >> >>>>> and user_id <> '' >> >>>>> and CHARACTER_LENGTH(user_id) = 24 >> >>>>> ) T >> >>>>>group by >> >>>>> SESSION(event_time, INTERVAL '10' SECOND), >> >>>>> user_id >> >>>>> >> >>>>>Best forideal >> |
In reply to this post by Zhou Zach
Hi Zhou Zach:
“但是,只有最后面两个算子有watermark,所以开启OperatorChaining后,因为前面3个没有watermark,整个chain的算子都没有watermark了,那么是不是就不能通过flink ui来监控watermark了,就依赖第三方监控工具来看watermark?因为上生产,肯定要开OperatorChaining的” 关于这个问题,我昨天也和李本超进行了线下沟通,大概的结论是: >1.如果不直接看每个operator的metrics,只看 flink ui 那个 graph 图,不进行 disable chain 的话,是看不出来问题的。如果打开了disable chain 有可能能看出来问题。在我们这个场景下能看出来问题。其他场景可能会不那么直接。 >2.我们打开了disable chain 看 flink ui,其实也是看的相关的 metric。 总体来看,就是要用 metrics 来诊断问题。不过,在某些场景下,一个用户开发了一个 Flink SQL 然后,去看监控,也增加了对应的 cost。一个是说这个用户之前不怎么看metrics,一个是说平台metrics也做的不好。所以如果能在 Flink ui 上面解决一定量的问题,将能减少用户的成本。 Best forideal 在 2020-08-13 16:33:29,"Zhou Zach" <[hidden email]> 写道: > > > >Hi forideal, Shengkai Fang, > >加上env.disableOperatorChaining()之后,发现5个算子, > > > > >Source: TableSourceScan(table=[[default_catalog, default_database, user]], fields=[uid, sex, age, created_time]) -> > >Calc(select=[uid, sex, age, created_time, () AS procTime, TO_TIMESTAMP(((created_time / 1000) FROM_UNIXTIME _UTF-16LE'yyyy-MM-dd HH:mm:ss')) AS eventTime]) -> > >WatermarkAssigner(rowtime=[eventTime], watermark=[(eventTime - 3000:INTERVAL SECOND)]) -> > >Calc(select=[uid, sex, age, created_time]) -> > >Sink: Sink(table=[default_catalog.default_database.user_mysql], fields=[uid, sex, age, created_time]) >但是,只有最后面两个算子有watermark,所以开启OperatorChaining后,因为前面3个没有watermark,整个chain的算子都没有watermark了,那么是不是就不能通过flink ui来监控watermark了,就依赖第三方监控工具来看watermark?因为上生产,肯定要开OperatorChaining的 > > > > > > > > > > > > > > >在 2020-08-13 15:39:44,"forideal" <[hidden email]> 写道: >>Hi Zhou Zach: >>你可以试试 env.disableOperatorChaining(); >>然后观察每个 op 的 watermark 情况。这样能够简单的看下具体的情况。 >>> 我是怎么设置参数的 >>我使用的是 Flink SQL Blink Planner,采用的设置方式和你一样 >>tableEnv.getConfig().getConfiguration() .setString(key, configs.getString(key, null)); >>同时我在 source table 中定义了 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND >> >>Best forideal >> >> >>在 2020-08-13 15:20:13,"Zhou Zach" <[hidden email]> 写道: >>> >>> >>> >>>Hi forideal, >>>我也遇到了No Watermark问题,我也设置了table.exec.source.idle-timeout 参数,如下: >>> >>> >>> val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment >>> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) >>> streamExecutionEnv.setStateBackend(new RocksDBStateBackend("hdfs://nameservice1/flink/checkpoints")) >>> >>> val blinkEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() >>> val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings) >>> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,CheckpointingMode.EXACTLY_ONCE) >>> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,Duration.ofSeconds(20)) >>> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT,Duration.ofSeconds(900)) >>> >>> streamTableEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT,"5s") >>> >>> >>>并且,任务的并行度设置了1(这样是不是就不会存在flink consumer不消费kafka数据的情况,kafka一直生产数据的前提下) >>>在flink ui上,仍然显示Watermark No data,问下,你是怎么设置参数的 >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>>在 2020-08-13 14:02:58,"forideal" <[hidden email]> 写道: >>>>大家好 >>>> >>>> 问题的原因定位到了。 >>>> 由于无法 debug codegen 生成的代码,即使我拿到线上的数据,开启了debug环境依然无法得到进展。 >>>> 这个时候,我进行了 disable chain,观察 watermark 的生成情况,看看到底在那个环节没有继续往下传递。(因为多个 op chain 在一起,不能确定到底是那个环节存在问题) >>>> 发现在 WatermarkAssigner(rowtime=[event_time], watermark=[(event_ti...)这个 op 中部分 task 为 No watermark,由于这个op和source chain在一起,导致这个vertex 对应的watermark无法显示只能是 no data。因为存在 group by 下游的 watermark 为 min(parent task output watermark),所以下游是 No watermark。导致在查问题的时候,比较困难。 >>>> 定位到由于 kafka 部分 partition 无数据导致 No watermark 加上 table.exec.source.idle-timeout = 10s 参数即可。 >>>> 当然,如果能直接 debug codegen 生成的代码,那么这个问题的分析路径会更简单。我应该直接可以发现大部分 task 可以生成 watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用 disable chain 观察每个 op 的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。 >>>> >>>> >>>>Best forideal >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>>在 2020-08-13 12:56:57,"forideal" <[hidden email]> 写道: >>>>>大家好 >>>>> >>>>> >>>>> 关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是 StreamExecWatermarkAssigner >>>>> 在translateToPlanInternal 中生成了如下一个 class 代码, >>>>>public final class WatermarkGenerator$2 extends org.apache.flink.table.runtime.generated.WatermarkGenerator { public WatermarkGenerator$2(Object[] references) throws Exception { } @Override public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { } @Override public Long currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) { field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5 = null; if (!isNull$4) { result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond()); } if (isNull$4) { return null; } else { return result$5.getMillisecond(); } } @Override public void close() throws Exception { } } >>>>> >>>>> >>>>> >>>>> 其中关键的信息是 result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond()); >>>>>确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND 的定义获取的 watermark。 >>>>>在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark 这样的结果。因为这部分codegen的代码确实无法进一步debug了。 >>>>>如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢 >>>>> >>>>> Best forideal >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>>在 2020-08-11 17:13:01,"forideal" <[hidden email]> 写道: >>>>>>大家好,请教一个问题 >>>>>> >>>>>> >>>>>> 我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成 watermark。消费大量的数据的时候,就无法生成watermark。 >>>>>> 一直是 No Watermark。 暂时找不到排查问题的思路。 >>>>>> Flink 版本号是 1.10,kafka 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 EventTime mode 模式,Blink Planner。 >>>>>>| >>>>>>No Watermark | >>>>>> SQL如下 >>>>>> >>>>>> >>>>>> DDL: >>>>>> create table test( >>>>>> user_id varchar, >>>>>> action varchar, >>>>>> event_time TIMESTAMP(3), >>>>>> WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND >>>>>> ) with(); >>>>>> >>>>>> >>>>>> DML: >>>>>>insert into >>>>>> console >>>>>>select >>>>>> user_id, >>>>>> f_get_str(bind_id) as id_list >>>>>>from >>>>>> ( >>>>>> select >>>>>> action as bind_id, >>>>>> user_id, >>>>>> event_time >>>>>> from >>>>>> ( >>>>>> SELECT >>>>>> user_id, >>>>>> action, >>>>>> PROCTIME() as proc_time, >>>>>> event_time >>>>>> FROM >>>>>> test >>>>>> ) T >>>>>> where >>>>>> user_id is not null >>>>>> and user_id <> '' >>>>>> and CHARACTER_LENGTH(user_id) = 24 >>>>>> ) T >>>>>>group by >>>>>> SESSION(event_time, INTERVAL '10' SECOND), >>>>>> user_id >>>>>> >>>>>>Best forideal |
Free forum by Nabble | Edit this page |