各位大佬，我用 Flink SQL 写了一些指标计算程序，消费 Kafka 数据写到 InfluxDB。发现夜间 Kafka 日志生产速度在十几 k/min 时程序没有问题，但白天涨到 100k/min 后就渐渐卡住、消费不动了。用的是 Flink 1.9，现在觉得是 Flink SQL 执行这一层比较慢。窗口是 5 分钟的滚动窗口，目前用的是两个 slot，调大并行度试了一下也没效果，请问有什么解决方案吗？代码如下: val windowWidth = 5 //stream config val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.enableCheckpointing(1000 * 60*5) env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500) env.getCheckpointConfig.setCheckpointTimeout(60000*10) env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION) // env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //table config val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) tEnv.getConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(10)) tEnv.registerFunction("TimeFormatJava", new TimeFormatJava()) tEnv.registerFunction("TimeFormatUDF", TimeFormatUDF) //Kafka Source val kafkaProperties: Properties = new Properties kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers) kafkaProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId) kafkaProperties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"5000") kafkaProperties.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,"500") var topics: util.List[String] = new util.ArrayList[String] for (topic <- kafkaTopics.split(SPLIT)) { topics.add(topic) } val kafkaConsumer: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](topics, new SimpleStringSchema, kafkaProperties) val driverSearchDstream: DataStream[DriverSearch] = env.addSource(kafkaConsumer.setStartFromLatest()).map(msg => { val info: String = msg.substring(msg.indexOf("{"), msg.length) val createTime = msg.substring(0, 19) val timeStamp = getLongTime(createTime) 
val json = JSON.parseObject(info) DriverSearch( json.getString("driverId") + "_" + timeStamp, json.getString("driverId"), json.getIntValue("searchType"), timeStamp ) }).setParallelism(2) val driverSearchDstreamWithEventTime: DataStream[DriverSearch] = driverSearchDstream.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[DriverSearch](org.apache.flink.streaming.api.windowing.time.Time.seconds(10L)) { override def extractTimestamp(element: DriverSearch): Long = element.timestamp } ) driverSearchDstream.map(info=>println(info+"time:"+System.currentTimeMillis())) val table: Table = tEnv.fromDataStream(driverSearchDstreamWithEventTime, 'rowKey, 'driverId, 'searchType, 'timestamp.rowtime as 'w) val sql1: String = s""" select TimeFormatJava(TUMBLE_END(w, INTERVAL '$windowWidth' MINUTE),8) as time_end, searchType, count(distinct driverId) as typeUv, count(distinct rowKey) as typePv from $table group by TUMBLE(w, INTERVAL '$windowWidth' MINUTE),searchType """.stripMargin val resultTable1: Table = tEnv.sqlQuery(sql1) val typeMap= immutable.Map(1->"1-goWorkSearch",2->"2-offWorkSearch",3->"3-nearbySearch",4->"4-temporarySearch",5->"5-commonSearch",6->"6-multiplySearch") val influxStream: DataStream[InfluxDBPoint] = tEnv.toAppendStream[Row](resultTable1).map { row => { val typeName: String= typeMap(row.getField(1).asInstanceOf[Int]) val point = new InfluxDBPoint("Carpool_Search_Pv_Uv", row.getField(0).asInstanceOf[Long]) //udf +8hour val fields = new util.HashMap[String,Object]() val tags = new util.HashMap[String,String]() fields.put("typeUv", row.getField(2)) fields.put("typePv",row.getField(3)) point.setFields(fields) tags.put("typeName",typeName) point.setTags(tags) point } } influxStream.map{ point=>{ println( println("influxPoint:"+point.getFields+"==" +point.getTags+"=="+point.getMeasurement +"=="+point.getTimestamp+"time:"+System.currentTimeMillis()) ) } } val influxDBConfig = InfluxDBConfig.builder("http://host:8086", "admin", "admin", 
"aimetric").build influxStream.addSink(new InfluxDBSink(influxDBConfig)) env.execute() } def getLongTime(str:String) ={ val format = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") val time: Long = format.parse(str).getTime time [hidden email] |
Free forum by Nabble | Edit this page |