|
hi,
@Override
public UV get(UK userKey) throws IOException, RocksDBException {
    byte[] rawKeyBytes = serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
    byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);

    return (rawValueBytes == null ? null : deserializeUserValue(dataInputView, rawValueBytes, userValueSerializer));
}

@Override
public void put(UK userKey, UV userValue) throws IOException, RocksDBException {
    byte[] rawKeyBytes = serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
    byte[] rawValueBytes = serializeValueNullSensitive(userValue, userValueSerializer);

    backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
}
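To get a feel for what per-access serialization costs when the map value is a list that keeps growing, here is a minimal, self-contained sketch. It uses neither Flink nor RocksDB: SerializingMapSketch is a hypothetical stand-in that stores values as byte[] in a HashMap and only counts how many bytes pass through serialization. The read-modify-write shape of its bufferEvent mirrors the CepOperator code, but the encoding (an int length followed by longs) is an assumption made for illustration, not Flink's serializer format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a byte-oriented map state; NOT Flink's RocksDBMapState. */
final class SerializingMapSketch {
    // Plays the role of the RocksDB column family: values are kept only as raw bytes.
    private final Map<Long, byte[]> store = new HashMap<>();
    long bytesSerialized = 0;
    long bytesDeserialized = 0;

    /** Deserializes the whole list on every read. */
    List<Long> get(long key) throws IOException {
        byte[] raw = store.get(key);
        if (raw == null) {
            return null;
        }
        bytesDeserialized += raw.length;
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw));
        int size = in.readInt();
        List<Long> values = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            values.add(in.readLong());
        }
        return values;
    }

    /** Serializes the whole list on every write. */
    void put(long key, List<Long> values) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        out.writeInt(values.size());
        for (long v : values) {
            out.writeLong(v);
        }
        out.flush();
        byte[] raw = buffer.toByteArray();
        bytesSerialized += raw.length;
        store.put(key, raw);
    }

    /** Same read-modify-write shape as bufferEvent: get the list, append, put it back. */
    void bufferEvent(long timestamp, long event) throws IOException {
        List<Long> elements = get(timestamp);
        if (elements == null) {
            elements = new ArrayList<>();
        }
        elements.add(event);
        put(timestamp, elements);
    }

    public static void main(String[] args) throws IOException {
        SerializingMapSketch state = new SerializingMapSketch();
        // 1000 events that all share one timestamp, so one list keeps growing.
        for (long i = 0; i < 1000; i++) {
            state.bufferEvent(42L, i);
        }
        System.out.println("bytes serialized:   " + state.bytesSerialized);
        System.out.println("bytes deserialized: " + state.bytesDeserialized);
    }
}
```

In this sketch the work grows quadratically with the number of events that share a timestamp: every append rewrites the entire list, so buffering 1000 events of 8 bytes each pushes about 4 MB through the serializer. Whether this is what dominates in the real job depends on how many events share a timestamp and on the cost of the actual event serializer.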
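Tracing through the source code, I found that RocksDBMapState has to serialize and deserialize on every get and put. That is probably why these calls are so slow.

Hi,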
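I am using Flink 1.11.2 CEP for some pattern matching, and I found that as soon as RocksDB is enabled as the state backend, backpressure appears. CPU usage is 10 times what it was before.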
private void bufferEvent(IN event, long currentTime) throws Exception {
    long currentTs = System.currentTimeMillis();
    List<IN> elementsForTimestamp = elementQueueState.get(currentTime);
    if (elementsForTimestamp == null) {
        this.bufferEventGetNullhistogram.update(System.currentTimeMillis() - currentTs);
        elementsForTimestamp = new ArrayList<>();
    } else {
        this.bufferEventGethistogram.update(System.currentTimeMillis() - currentTs);
    }
    elementsForTimestamp.add(event);
    long secondCurrentTs = System.currentTimeMillis();
    elementQueueState.put(currentTime, elementsForTimestamp);
    this.bufferEventPuthistogram.update(System.currentTimeMillis() - secondCurrentTs);
    this.bufferEventhistogram.update(System.currentTimeMillis() - currentTs);
}
By overriding CepOperator and adding some metrics:

this.bufferEventhistogram = metrics.histogram("buffer_event_delay", new DescriptiveStatisticsHistogram(1000));
this.bufferEventGethistogram = metrics.histogram("buffer_event_get_delay", new DescriptiveStatisticsHistogram(1000));
this.bufferEventGetNullhistogram = metrics.histogram("buffer_event_get_null_delay", new DescriptiveStatisticsHistogram(1000));
this.bufferEventPuthistogram = metrics.histogram("buffer_event_put_delay", new DescriptiveStatisticsHistogram(1000));

I found that get and put are the slow parts; a whole bufferEvent call can take up to 200 ms. Judging by the RocksDB metrics, there is not much flush or compaction activity.
Could anyone explain why get and put are so slow? Is there a good way to optimize this? Thanks.
Best Wishes.
|