Hi all, I've hit a problem testing Flink SQL on 1.11. I consume Kafka with the streaming API using event time, convert the stream to a table, and run a SQL aggregation. When the Kafka topic has multiple partitions, the Flink web UI shows "No Watermark" and the aggregation never fires; but when the topic has only a single partition, the aggregation triggers normally and the watermarks display correctly. Test code below:

package com.test.ops

import java.util.Properties

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.types.Row

import scala.collection.JavaConversions._
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

object FlinkSqlTest {

  // Case class for the parsed source JSON, plus the RhoLog (and PpoLog) case
  // classes split out from it for separate computations
  case class SrcLog(userid: String, guid: String, rho: JsonNode, ts: Long)
  case class RhoLog(userid: String, guid: String, ts: Long)

  def main(args: Array[String]): Unit = {
    val parallelism = 3
    val kafkaBrokers = "172.x.x.x:9092"
    val jobName = "flinksql-test"
    val topicNames = List("ops_nginx_logs")
    val groupName = "test-ops-100000"
    val properties = new Properties()

    // Build the stream execution environment
    val conf: Configuration = new Configuration()
    import org.apache.flink.configuration.ConfigConstants
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    val streamEnv = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    streamEnv.enableCheckpointing(10000)
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    streamEnv.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    streamEnv.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
    streamEnv.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    streamEnv.setParallelism(parallelism)

    // Build the table environment
    val blinkTableEnvSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(streamEnv, blinkTableEnvSettings)

    properties.setProperty("bootstrap.servers", kafkaBrokers)
    properties.setProperty("group.id", groupName)
    val myConsumer = new FlinkKafkaConsumer[String](topicNames, new SimpleStringSchema(), properties)
    // Start consuming from the latest Kafka offsets
    myConsumer.setStartFromLatest()

    val mapper = new ObjectMapper
    val srcStream = streamEnv.addSource(myConsumer)
      .filter(_.nonEmpty)
      .filter(_.contains("\"message\":{"))
      .map(line => {
        val rootNode = mapper.readTree(line)
        val rowTime = rootNode.get("@timestamp").asText()
        // val timeStamp = dateTimeToTimestampJdk8(rowTime)
        val timeStamp = rowTime.toLong
        val messageNode = mapper.readTree(rootNode.get("message").toString)
        val rho = messageNode.get("rho")
        val userid = if (messageNode.has("u")) messageNode.get("u").asText else "nodata"
        val guid = if (messageNode.has("g")) messageNode.get("g").asText else "nodata"
        SrcLog(userid, guid, rho, timeStamp)
      })

    // Watermark assignment
    val rhoStream = srcStream.map(src => {
      RhoLog(src.userid, src.guid, src.ts)
    }).assignAscendingTimestamps(row => {
      println(row.ts)
      row.ts
    })

    // Convert the stream to a table
    val table = tableEnv.fromDataStream(rhoStream, 'userid, 'guid, 'ts as 'ts1, 'ts.rowtime() as 'ts)

    // Windowed aggregation over the source table
    val resTableEventtime = table.window(Tumble over 10.seconds on 'ts as 'window)
      .groupBy('userid, 'window, 'ts, 'ts1)
      .select('userid, 'guid.count, 'window.start(), 'window.end(), 'ts, 'ts1)

    // Print the windowed aggregation result
    resTableEventtime.toAppendStream[Row].print("test")
    streamEnv.execute()
  }
}

Symptoms. With a single-partition topic:

kafka-topics --zookeeper 172.x.x.x:2181 --describe --topic ops_nginx_logs1
Topic:ops_nginx_logs1  PartitionCount:1  ReplicationFactor:3  Configs:
  Topic: ops_nginx_logs1  Partition: 0  Leader: 104  Replicas: 104,107,105  Isr: 104,107,105

Flink SQL aggregation results:

test:1> aaaaa,1,2020-08-09T13:19:10,2020-08-09T13:19:20,2020-08-09T13:19:17.932,1596979157932
test:2> aaaaa,1,2020-08-09T13:19:10,2020-08-09T13:19:20,2020-08-09T13:19:19.932,1596979159932
test:3> aaaaa,1,2020-08-09T13:19:10,2020-08-09T13:19:20,2020-08-09T13:19:11.930,1596979151930
test:3> aaaaa,1,2020-08-09T13:19:10,2020-08-09T13:19:20,2020-08-09T13:19:15.931,1596979155931
test:3> aaaaa,1,2020-08-09T13:19:10,2020-08-09T13:19:20,2020-08-09T13:19:13.931,1596979153931

<http://apache-flink.147419.n8.nabble.com/file/t837/3.png>

The web UI watermarks display correctly:

<http://apache-flink.147419.n8.nabble.com/file/t837/4.png>

With a multi-partition topic:

kafka-topics --zookeeper 172.x.x.x:2181 --describe --topic ops_nginx_logs
Topic:ops_nginx_logs  PartitionCount:3  ReplicationFactor:3  Configs:
  Topic: ops_nginx_logs  Partition: 0  Leader: 104  Replicas: 104,106,107  Isr: 104,106,107
  Topic: ops_nginx_logs  Partition: 1  Leader: 105  Replicas: 105,107,104  Isr: 105,107,104
  Topic: ops_nginx_logs  Partition: 2  Leader: 106  Replicas: 106,104,105  Isr: 106,104,105

The Flink SQL aggregation never fires and produces no output:

<http://apache-flink.147419.n8.nabble.com/file/t837/2.png>

The web UI shows "No Watermark":

<http://apache-flink.147419.n8.nabble.com/file/t837/1.png>

--
Sent from: http://apache-flink.147419.n8.nabble.com/
The font is too small and the code has been reflowed, so it's hard to read. But going by the description, my guess is that one of the Kafka partitions has no data: Flink's watermark mechanism takes the minimum over multiple inputs, so if any partition receives no data, the overall watermark may never be generated.
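A quick way to test that guess is to make sure every partition receives data. Below is a minimal sketch (not from the original thread) that writes sample records to each of the three partitions with an explicit partition number, so the producer's default partitioner cannot leave a partition empty. The broker address and topic name are taken from the original post; the payload is a made-up sample shaped to pass the job's parsing code. Run it after the job has started, since the consumer starts from the latest offsets:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Hypothetical helper: writes sample records to every partition of
// ops_nginx_logs so that no partition stays idle.
object FillAllPartitions {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "172.x.x.x:9092") // broker from the original post
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    // Two rounds, 15 s apart: with ascending watermarks, the second round
    // pushes the watermark past the end of the first round's 10-second
    // tumbling window, so the first window can actually fire.
    for (_ <- 0 until 2) {
      // @timestamp is epoch millis as a string, matching rowTime.toLong in the job
      val payload = s"""{"@timestamp":"${System.currentTimeMillis()}","message":{"rho":{},"u":"aaaaa","g":"g1"}}"""
      // Explicit partition number, so the default partitioner cannot skip a partition
      for (partition <- 0 until 3) {
        producer.send(new ProducerRecord[String, String]("ops_nginx_logs", Int.box(partition), null, payload))
      }
      producer.flush()
      Thread.sleep(15000)
    }
    producer.close()
  }
}

If the watermark starts advancing once all partitions carry data, idle partitions were the cause, and the idle-source handling mentioned later in this thread is the durable fix.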
On 2020/8/9, 9:38 PM, "zhanglachun" <[hidden email]> wrote:

> [...]
Hello,
Regarding this issue, see the official documentation's explanation of how to deal with idle inputs/sources:
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html#dealing-with-idle-sources

A related article, for reference:
https://mp.weixin.qq.com/s/108o9iwEZaHyMoRBGUKX8g

Hope this helps.

From: Zhao,Yi(SEC)
Sent: 2020-08-10 10:08
To: [hidden email]
Subject: Re: flinksql 1.11 — no watermark info and no aggregation results when consuming a multi-partition Kafka topic with event time

> [...]
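Applied to the code in this thread, a minimal sketch of the idle-source handling from the documentation linked above, assuming Flink 1.11's WatermarkStrategy API (forMonotonousTimestamps stands in for assignAscendingTimestamps, and the one-minute idle timeout is an arbitrary choice):

import java.time.Duration
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

// Generate watermarks inside the Kafka source, per partition, and mark a
// partition idle after one minute without data. Idle partitions are excluded
// from the minimum-watermark calculation, so empty partitions no longer pin
// the overall watermark at its initial value.
val watermarkStrategy = WatermarkStrategy
  .forMonotonousTimestamps[String]()
  .withTimestampAssigner(new SerializableTimestampAssigner[String] {
    // ObjectMapper is not serializable, hence transient + lazy
    @transient private lazy val mapper = new ObjectMapper
    // Assumes every record carries a numeric @timestamp in epoch millis,
    // matching rowTime.toLong in the original job; guard this in real code.
    override def extractTimestamp(line: String, recordTimestamp: Long): Long =
      mapper.readTree(line).get("@timestamp").asText().toLong
  })
  .withIdleness(Duration.ofMinutes(1))

// Attach the strategy to the consumer itself and drop the downstream
// assignAscendingTimestamps call.
myConsumer.assignTimestampsAndWatermarks(watermarkStrategy)

With per-partition watermarking on the source plus withIdleness, the window should fire even when some partitions of ops_nginx_logs stay empty.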