Flink SQL 1.11: no watermark information and no aggregation output when consuming a multi-partition Kafka topic with event time


Flink SQL 1.11: no watermark information and no aggregation output when consuming a multi-partition Kafka topic with event time

zhanglachun
Hi all, while testing Flink SQL on 1.11 I ran into a problem. I consume Kafka with the streaming API using event time, convert the stream to a table, and run a SQL aggregation on it. When the Kafka topic has multiple partitions, the Flink web UI shows "No Watermark" and the aggregation never fires; when the topic has only one partition, the aggregation triggers normally and the watermarks also display correctly. Here is the test code:

package com.test.ops

import java.util.Properties

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.types.Row

import scala.collection.JavaConversions._
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

object FlinkSqlTest {

  // Case class for the parsed source JSON; RhoLog (and PpoLog) are split out of it and aggregated separately
  case class SrcLog(userid: String, guid: String, rho: JsonNode, ts: Long)
  case class RhoLog(userid: String, guid: String, ts: Long)

  def main(args: Array[String]): Unit = {
    val parallelism = 3
    val kafkaBrokers = "172.x.x.x:9092"
    val jobName = "flinksql-test"
    val topicNames = List("ops_nginx_logs")
    val groupName = "test-ops-100000"
    val properties = new Properties()

    // Build the streaming environment
    val conf: Configuration = new Configuration()
    import org.apache.flink.configuration.ConfigConstants
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    val streamEnv = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    streamEnv.enableCheckpointing(10000)
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    streamEnv.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    streamEnv.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
    streamEnv.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    streamEnv.setParallelism(parallelism)

    // Build the table environment
    val blinkTableEnvSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(streamEnv, blinkTableEnvSettings)

    properties.setProperty("bootstrap.servers", kafkaBrokers)
    properties.setProperty("group.id", groupName)
    val myConsumer = new FlinkKafkaConsumer[String](topicNames, new SimpleStringSchema(), properties)
    // Start from the latest Kafka offsets
    myConsumer.setStartFromLatest()

    val mapper = new ObjectMapper
    val srcStream = streamEnv.addSource(myConsumer)
      .filter(_.nonEmpty)
      .filter(_.contains("\"message\":{"))
      .map(line => {
        val rootNode = mapper.readTree(line)
        val rowTime = rootNode.get("@timestamp").asText()
        // val timeStamp = dateTimeToTimestampJdk8(rowTime)
        val timeStamp = rowTime.toLong
        val messageNode = mapper.readTree(rootNode.get("message").toString)
        val rho = messageNode.get("rho")
        val userid = if (messageNode.has("u")) messageNode.get("u").asText else "nodata"
        val guid = if (messageNode.has("g")) messageNode.get("g").asText else "nodata"
        SrcLog(userid, guid, rho, timeStamp)
      })

    // Assign timestamps / watermarks
    val rhoStream = srcStream.map(src => {
      RhoLog(src.userid, src.guid, src.ts)
    }).assignAscendingTimestamps(row => {
      println(row.ts)
      row.ts
    })

    // Convert the stream to a table
    val table = tableEnv.fromDataStream(rhoStream, 'userid, 'guid, 'ts as 'ts1, 'ts.rowtime() as 'ts)

    // Windowed aggregation on the source table
    val resTableEventtime = table
      .window(Tumble over 10.seconds on 'ts as 'window)
      .groupBy('userid, 'window, 'ts, 'ts1)
      .select('userid, 'guid.count, 'window.start(), 'window.end(), 'ts, 'ts1)

    // Print the windowed aggregation result
    resTableEventtime.toAppendStream[Row].print("test")
    streamEnv.execute()
  }
}

Symptoms. When the topic has a single partition:

kafka-topics --zookeeper 172.x.x.x:2181 --describe --topic ops_nginx_logs1
Topic:ops_nginx_logs1  PartitionCount:1  ReplicationFactor:3  Configs:
  Topic: ops_nginx_logs1  Partition: 0  Leader: 104  Replicas: 104,107,105  Isr: 104,107,105

Flink SQL aggregation output:

test:1> aaaaa,1,2020-08-09T13:19:10,2020-08-09T13:19:20,2020-08-09T13:19:17.932,1596979157932
test:2> aaaaa,1,2020-08-09T13:19:10,2020-08-09T13:19:20,2020-08-09T13:19:19.932,1596979159932
test:3> aaaaa,1,2020-08-09T13:19:10,2020-08-09T13:19:20,2020-08-09T13:19:11.930,1596979151930
test:3> aaaaa,1,2020-08-09T13:19:10,2020-08-09T13:19:20,2020-08-09T13:19:15.931,1596979155931
test:3> aaaaa,1,2020-08-09T13:19:10,2020-08-09T13:19:20,2020-08-09T13:19:13.931,1596979153931

<http://apache-flink.147419.n8.nabble.com/file/t837/3.png> The web UI shows the watermarks normally <http://apache-flink.147419.n8.nabble.com/file/t837/4.png>

When the Kafka topic has multiple partitions:

kafka-topics --zookeeper 172.x.x.x:2181 --describe --topic ops_nginx_logs
Topic:ops_nginx_logs  PartitionCount:3  ReplicationFactor:3  Configs:
  Topic: ops_nginx_logs  Partition: 0  Leader: 104  Replicas: 104,106,107  Isr: 104,106,107
  Topic: ops_nginx_logs  Partition: 1  Leader: 105  Replicas: 105,107,104  Isr: 105,107,104
  Topic: ops_nginx_logs  Partition: 2  Leader: 106  Replicas: 106,104,105  Isr: 106,104,105

The Flink SQL aggregation never fires and produces no output:
<http://apache-flink.147419.n8.nabble.com/file/t837/2.png> The web UI shows "No Watermark" <http://apache-flink.147419.n8.nabble.com/file/t837/1.png>



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink SQL 1.11: no watermark information and no aggregation output when consuming a multi-partition Kafka topic with event time

Zhao,Yi(SEC)
The font is tiny and the code has been flattened by the mailer, so it's hard to read. But going by the description, my guess is that one of the Kafka partitions has no data. Flink's watermark mechanism takes the minimum over multiple inputs, so if one partition never receives data, the watermark may never advance.
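A rough, untested sketch of what a fix for that could look like (swapping assignAscendingTimestamps for the 1.11 WatermarkStrategy API; the 30-second idle timeout is an arbitrary example value): declaring idleness lets a source subtask whose partition receives no data stop holding back the combined watermark.

import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

// Same mapping as before, but the watermark strategy declares an idleness timeout, so a
// subtask that reads an empty Kafka partition is marked idle and is ignored when the
// downstream window operator takes the minimum watermark over its inputs.
val rhoStream = srcStream
  .map(src => RhoLog(src.userid, src.guid, src.ts))
  .assignTimestampsAndWatermarks(
    WatermarkStrategy
      .forMonotonousTimestamps[RhoLog]()   // timestamps assumed ascending, as with assignAscendingTimestamps
      .withTimestampAssigner(new SerializableTimestampAssigner[RhoLog] {
        override def extractTimestamp(element: RhoLog, recordTimestamp: Long): Long = element.ts
      })
      .withIdleness(Duration.ofSeconds(30))   // arbitrary example timeout
  )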



Re: Re: Flink SQL 1.11: no watermark information and no aggregation output when consuming a multi-partition Kafka topic with event time

Evan
Hello,
Regarding this problem, have a look at the official documentation's explanation of how to handle idle inputs / idle sources:
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
Here is also a related article:
https://mp.weixin.qq.com/s/108o9iwEZaHyMoRBGUKX8g
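For reference, a minimal, untested sketch of the approach that doc section describes, applied directly to the FlinkKafkaConsumer from the original job so watermarks are generated per Kafka partition inside the source (the "@timestamp" field name comes from the original code; the 1-minute idle timeout is an arbitrary example, and the downstream assignAscendingTimestamps call would then be removed):

import java.time.Duration

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

// Assigning the watermark strategy on the Kafka consumer itself makes the source emit
// per-partition watermarks; withIdleness keeps partitions that receive no data from
// blocking the overall watermark. Assumes every record carries a numeric "@timestamp"
// field, since the timestamp is extracted before the downstream filters run.
myConsumer.assignTimestampsAndWatermarks(
  WatermarkStrategy
    .forMonotonousTimestamps[String]()
    .withTimestampAssigner(new SerializableTimestampAssigner[String] {
      @transient private lazy val mapper = new ObjectMapper
      override def extractTimestamp(line: String, recordTimestamp: Long): Long =
        mapper.readTree(line).get("@timestamp").asText().toLong
    })
    .withIdleness(Duration.ofMinutes(1))   // arbitrary example timeout
)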

Hope this helps.




 