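On 2020-09-16 14:03, kandy.wang wrote:
In our load tests, writing to Hive in streaming mode through StreamingFileWriter, with 40 Kafka partitions and a source/writer parallelism of 40, consuming Kafka from the beginning, throughput only reaches about 70k TPS.
Has anyone load-tested streaming writes to Hive? What throughput can it achieve?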
On 2020-09-16 19:29:50, Jingsong Li wrote:
Hi,
Could you share the details of your test scenario? Do you have anything to compare against? For example, how large is the performance gap compared with a hand-written DataStream job?
Also, could you take a jstack while the load test is running?
Best,
Jingsong
On 2020-09-16 20:16, kandy.wang wrote:
The scenario is simple: Kafka to Hive (kafka2hive).

-- Load into Hive every 5 minutes
INSERT INTO hive.temp_.hive_5min
SELECT
    arg_service,
    time_local,
    .....
    FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300), 'yyyyMMdd'),
    FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300), 'HHmm') -- one partition every 5 minutes
FROM hive.temp_.kafka_source_pageview /*+ OPTIONS('properties.group.id'='kafka_hive_test', 'scan.startup.mode'='earliest-offset') */;

-- Kafka source table definition
CREATE TABLE hive.temp_vipflink.kafka_source_pageview (
    arg_service string COMMENT 'arg_service',
    ....
) WITH (
    'connector' = 'kafka',
    'topic' = '...',
    'properties.bootstrap.servers' = '...',
    'properties.group.id' = 'flink_etl_kafka_hive',
    'scan.startup.mode' = 'group-offsets',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);

-- Hive sink table definition
CREATE TABLE temp_vipflink.vipflink_dm_log_app_pageview_5min (
    ....
)
PARTITIONED BY (dt string, hm string)
stored as orc
location 'hdfs://ssdcluster/....._5min'
TBLPROPERTIES (
    'sink.partition-commit.trigger' = 'process-time',
    'sink.partition-commit.delay' = '0 min',
    'sink.partition-commit.policy.class' = '...CustomCommitPolicy',
    'sink.partition-commit.policy.kind' = 'metastore,success-file,custom',
    'sink.rolling-policy.check-interval' = '30s',
    'sink.rolling-policy.rollover-interval' = '10min',
    'sink.rolling-policy.file-size' = '128MB'
);

At first glance the bottleneck seems to be writing to HDFS, even though this HDFS cluster already runs on SSDs. With 40 Kafka partitions and an operator parallelism of 40, throughput only reaches about 60k to 70k TPS, and increasing the parallelism brings no improvement.
Can Flink SQL set the parallelism of an individual operator? I would like to change the parallelism of the StreamingFileWriter operator on its own; is there a good way to do that? Also, are there any tuning parameters that can improve StreamingFileWriter performance?
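The two partition columns floor the current time to a 5-minute boundary and format it twice. A minimal Java sketch of the same arithmetic, assuming UNIX_TIMESTAMP()/300 behaves as integer division and that FROM_UNIXTIME formats in the session time zone (passed explicitly here); the class and method names are hypothetical:

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class FiveMinutePartition {
    static final long BUCKET_SECONDS = 300; // 5 minutes

    // Mirrors UNIX_TIMESTAMP()/300 * 300: integer division drops the remainder,
    // so non-negative timestamps are floored to the start of their 5-minute bucket.
    static long bucketStart(long epochSeconds) {
        return epochSeconds / BUCKET_SECONDS * BUCKET_SECONDS;
    }

    // Returns {dt, hm}, matching the 'yyyyMMdd' and 'HHmm' patterns in the query.
    static String[] partition(long epochSeconds, ZoneId zone) {
        Instant start = Instant.ofEpochSecond(bucketStart(epochSeconds));
        String dt = DateTimeFormatter.ofPattern("yyyyMMdd").withZone(zone).format(start);
        String hm = DateTimeFormatter.ofPattern("HHmm").withZone(zone).format(start);
        return new String[] {dt, hm};
    }

    public static void main(String[] args) {
        // 2020-09-16 14:03:07 in Asia/Shanghai (UTC+8)
        String[] p = partition(1600236187L, ZoneId.of("Asia/Shanghai"));
        System.out.println("dt=" + p[0] + "/hm=" + p[1]); // dt=20200916/hm=1400
    }
}
```

Since UNIX_TIMESTAMP() returns the current time, each row lands in the partition of the moment it is processed rather than one derived from its own time_local; when replaying from earliest-offset, the whole backlog goes into whichever partitions are current during the replay.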
On 2020-09-17 11:21:43, Jingsong Li wrote:
Sink parallelism
I take it you mean configuring the sink parallelism separately. That has been under discussion for a while, and there is no conclusion yet.

HDFS performance
Check what exactly the HDFS bottleneck is: network, number of requests, number of connections, or disk I/O.

Best, Jingsong Lee
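On the sink-parallelism point, one plausible reason (an assumption, not confirmed in this thread) why raising the parallelism above 40 brought no gain: when the Kafka source, the projection and StreamingFileWriter are chained at the same parallelism, only subtasks that own a Kafka partition receive any records. The sketch below models partition-to-subtask assignment on the round-robin scheme of Flink's Kafka connector (KafkaTopicPartitionAssigner), reproduced from memory, so verify it against your Flink version; the topic name is a placeholder.

```java
public class KafkaPartitionSpread {
    // Round-robin assignment: a topic-dependent start index, then consecutive
    // partitions go to consecutive subtasks (modeled on KafkaTopicPartitionAssigner).
    static int assign(String topic, int partition, int numSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numSubtasks;
        return (startIndex + partition) % numSubtasks;
    }

    // Counts subtasks that own at least one partition and therefore receive records.
    static int busySubtasks(String topic, int numPartitions, int numSubtasks) {
        boolean[] busy = new boolean[numSubtasks];
        int count = 0;
        for (int p = 0; p < numPartitions; p++) {
            int s = assign(topic, p, numSubtasks);
            if (!busy[s]) {
                busy[s] = true;
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        for (int parallelism : new int[] {20, 40, 80}) {
            System.out.println(parallelism + " subtasks -> "
                    + busySubtasks("pageview_topic", 40, parallelism) + " receive data");
        }
    }
}
```

With 40 partitions, going from 40 to 80 subtasks leaves 40 of them idle, so a chained writer still effectively runs at 40. Decoupling the writer's parallelism from the partition count would require redistributing records before the sink.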
On 2020-09-17 12:46, kandy.wang wrote:
@Jingsong Li

public TableSink createTableSink(TableSinkFactory.Context context) {
    CatalogTable table = checkNotNull(context.getTable());
    Preconditions.checkArgument(table instanceof CatalogTableImpl);

    boolean isGeneric = Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));

    if (!isGeneric) {
        return new HiveTableSink(
                context.getConfiguration().get(
                        HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
                context.isBounded(),
                new JobConf(hiveConf),
                context.getObjectIdentifier(),
                table);
    } else {
        return TableFactoryUtil.findAndCreateTableSink(context);
    }
}

In HiveTableFactory there is an option, table.exec.hive.fallback-mapred-writer, which defaults to true. It controls whether ORC and Parquet files are written with Hadoop's built-in MapReduce writer or with Flink's native writer. Its description reads:

"If it is false, using flink native writer to write parquet and orc files;
If it is true, using hadoop mapred record writer to write parquet and orc files"

After setting this option to false, throughput reached 300k TPS with the same resource configuration.

Perhaps the two ORC implementations simply differ in performance? Also, our storage format is ORC: are there any ORC parameters we could tune, for example async flush or related settings?
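For scale, treating TPS as records per second and assuming "the same resource configuration" means the same 40 subtasks in both runs, the per-subtask rates work out as:

```latex
\frac{70\,000\ \text{records/s}}{40} = 1\,750\ \text{records/s per subtask (MR writer)},\qquad
\frac{300\,000\ \text{records/s}}{40} = 7\,500\ \text{records/s per subtask (native writer)},\qquad
\frac{300\,000}{70\,000} \approx 4.3
```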
On 2020-09-17 13:21:40, Jingsong Li wrote:
Are you running the latest code?
1.11.2 fixes a bug that affects performance: https://issues.apache.org/jira/browse/FLINK-19121
The 1.11.2 release vote has passed, and it will be released soon.

Best, Jingsong Lee
On 2020-09-17 13:33, kandy.wang wrote:
It is code from the master branch.
The issue you mention occurs exactly when table.exec.hive.fallback-mapred-writer is left at its default of true. Now that I have set it to false, execution takes the else branch below, so we no longer hit that issue for now.

if (userMrWriter) {
    builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
    LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
} else {
    Optional<BulkWriter.Factory<RowData>> bulkFactory = createBulkWriterFactory(partitionColumns, sd);
    if (bulkFactory.isPresent()) {
        builder = StreamingFileSink.forBulkFormat(
                        new org.apache.flink.core.fs.Path(sd.getLocation()),
                        new FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer))
                .withBucketAssigner(assigner)
                .withRollingPolicy(rollingPolicy)
                .withOutputFileConfig(outputFileConfig);
        LOG.info("Hive streaming sink: Use native parquet&orc writer.");
    } else {
        builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
        LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer because BulkWriter Factory not available.");
    }
}
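The branch quoted above reduces to a small decision table. A minimal, dependency-free Java model of it (the enum and method names are hypothetical; fallbackToMr stands for table.exec.hive.fallback-mapred-writer and bulkFactoryAvailable for bulkFactory.isPresent()) makes one consequence explicit: setting the option to false does not by itself guarantee the native writer.

```java
public class HiveWriterChoice {
    enum Writer { MAPRED_RECORD_WRITER, NATIVE_BULK_WRITER }

    // fallbackToMr models table.exec.hive.fallback-mapred-writer;
    // bulkFactoryAvailable models createBulkWriterFactory(...).isPresent().
    static Writer choose(boolean fallbackToMr, boolean bulkFactoryAvailable) {
        if (fallbackToMr) {
            return Writer.MAPRED_RECORD_WRITER; // default path (option = true)
        }
        if (bulkFactoryAvailable) {
            return Writer.NATIVE_BULK_WRITER;   // Flink native ORC/Parquet writer
        }
        return Writer.MAPRED_RECORD_WRITER;     // falls back when no BulkWriter factory exists
    }

    public static void main(String[] args) {
        for (boolean fallback : new boolean[] {true, false}) {
            for (boolean bulk : new boolean[] {true, false}) {
                System.out.println("fallback=" + fallback + ", bulkFactory=" + bulk
                        + " -> " + choose(fallback, bulk));
            }
        }
    }
}
```

In a running job, the INFO line printed by each branch ("Use native parquet&orc writer." versus "Use MapReduce RecordWriter writer ...") shows which writer was actually chosen.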
On 2020-09-17 13:43:04, Jingsong Li wrote:
Could you try again with the latest 1.11.2?
https://flink.apache.org/downloads.html

Best, Jingsong Lee
On 2020-09-17 14:05, kandy.wang wrote:
OK. So this comes down to a performance comparison between the Hadoop MR writer and Flink's own native writer. For now, at least, setting table.exec.hive.fallback-mapred-writer to false meets our Hive write requirements.

One more question, which I asked before but you have not answered yet:
Why does HiveRollingPolicy return true from shouldRollOnCheckpoint? Why force files to roll, and could this be made a configuration option? If rolling is forced, sink.rolling-policy.file-size and sink.rolling-policy.rollover-interval essentially stop working: with one partition every 5 minutes and a checkpoint every 2 minutes, files roll before they even reach a few tens of MB, so the configured values become meaningless.
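To make the small-file concern concrete, here is a simplified model that counts how many part files one subtask writes into a single 5-minute partition, with and without rolling on checkpoint. It assumes the subtask receives data continuously, checkpoints fire at fixed multiples of the interval aligned with the partition start, and the 128MB size limit is never reached; times are in minutes and all names are hypothetical.

```java
public class RollingFileCount {
    // Counts part files one subtask writes into a single partition (bucket) that
    // receives data during [bucketStart, bucketEnd). A file rolls at the next
    // checkpoint (if rollOnCheckpoint) or when it reaches rolloverInterval,
    // whichever comes first.
    static int filesPerBucket(long bucketStart, long bucketEnd, long checkpointInterval,
                              long rolloverInterval, boolean rollOnCheckpoint) {
        int files = 0;
        long opened = bucketStart;
        while (opened < bucketEnd) {
            files++;
            long nextCheckpoint = rollOnCheckpoint
                    ? (opened / checkpointInterval + 1) * checkpointInterval
                    : Long.MAX_VALUE;
            opened = Math.min(nextCheckpoint, opened + rolloverInterval);
        }
        return files;
    }

    public static void main(String[] args) {
        int subtasks = 40;
        int forced = filesPerBucket(0, 5, 2, 10, true);
        int timeOnly = filesPerBucket(0, 5, 2, 10, false);
        System.out.println("roll on checkpoint: " + forced + " files/subtask, "
                + forced * subtasks + " files/partition");
        System.out.println("rollover-interval only: " + timeOnly + " file/subtask, "
                + timeOnly * subtasks + " files/partition");
    }
}
```

Under these assumptions each subtask closes a file about every 2 minutes, so with 40 subtasks a 5-minute partition ends up with 3 × 40 = 120 files instead of 40, each holding at most about 2 minutes of one subtask's data.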
是的,可以测一下,理论上 mr writer不应该有较大性能差距。
> Why force the file to roll?

To guarantee exactly-once. Bulk formats such as ORC and Parquet cannot write one file in several separate pieces, so an in-progress file has to be finished at each checkpoint.

On Thu, Sep 17, 2020 at 2:05 PM kandy.wang <[hidden email]> wrote:

> OK. So it comes down to a performance comparison between the Hadoop MR writer and Flink's own native writer. From what I have seen so far, setting table.exec.hive.fallback-mapred-writer to false is enough to meet our Hive write requirements.
> There is one more question I asked earlier that you have not answered yet:
> Why does HiveRollingPolicy return true from shouldRollOnCheckpoint? Why force the file to roll, and could this be exposed as a configuration option?
> If rolling is forced, sink.rolling-policy.file-size and sink.rolling-policy.rollover-interval basically stop working: with one partition every 5 min and a checkpoint every 2 min, a file rolls before it even reaches a few tens of MB, so those settings become meaningless.

-- Best, Jingsong Lee |
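Jingsong's answer explains why bulk-format files roll at every checkpoint; kandy's complaint is about the resulting file size. The sketch below is a rough model under stated assumptions, not Flink code: a part file is closed at the earliest of the next checkpoint, `sink.rolling-policy.rollover-interval`, the end of the 5-minute partition window, or the moment it reaches `sink.rolling-policy.file-size`. It ignores `sink.rolling-policy.check-interval`, the class name is hypothetical, and the 200 KB/s per-subtask write rate is an invented figure, not a measurement from this thread.

```java
/**
 * Rough model (an assumption, not Flink's implementation) of how large a
 * bulk-format (ORC/Parquet) part file can get when the rolling policy also
 * rolls on every checkpoint. The check-interval granularity is ignored.
 */
class RollingEstimate {

    /** Seconds until the in-progress file is closed: the earliest trigger wins. */
    static double secondsUntilRoll(double bytesPerSecond,
                                   double checkpointIntervalSec,
                                   double rolloverIntervalSec,
                                   double partitionWindowSec,
                                   long fileSizeLimitBytes) {
        double secondsToSizeLimit = fileSizeLimitBytes / bytesPerSecond;
        return Math.min(Math.min(checkpointIntervalSec, rolloverIntervalSec),
                        Math.min(partitionWindowSec, secondsToSizeLimit));
    }

    /** Upper bound on a part file's size at the moment it is rolled. */
    static double maxFileSizeBytes(double bytesPerSecond,
                                   double checkpointIntervalSec,
                                   double rolloverIntervalSec,
                                   double partitionWindowSec,
                                   long fileSizeLimitBytes) {
        return bytesPerSecond * secondsUntilRoll(bytesPerSecond, checkpointIntervalSec,
                rolloverIntervalSec, partitionWindowSec, fileSizeLimitBytes);
    }

    public static void main(String[] args) {
        double rate = 200_000;            // assumed bytes/s per sink subtask (hypothetical)
        long fileSizeLimit = 128L << 20;  // 'sink.rolling-policy.file-size' = 128MB
        double rollover = 10 * 60;        // 'sink.rolling-policy.rollover-interval' = 10min
        double partitionWindow = 5 * 60;  // one partition every 5 minutes
        for (double checkpoint : new double[] {120, 600}) {
            double mib = maxFileSizeBytes(rate, checkpoint, rollover, partitionWindow, fileSizeLimit)
                    / (1 << 20);
            System.out.printf("checkpoint every %.0f s -> at most %.1f MiB per file%n", checkpoint, mib);
        }
    }
}
```

With the assumed rate, a 2-minute checkpoint caps each file at roughly 23 MiB, far below the 128 MB target; with a 10-minute checkpoint the 5-minute partition window becomes the limit instead.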
@Jingsong Li
I tested it, and 1.11.2 behaves the same as before. Setting table.exec.hive.fallback-mapred-writer=false is still what makes the clear difference.
Our Flink environment uses a jar we built ourselves from the Flink 1.11 branch source. I assume all of your StreamingFileWriter changes have been committed to the Flink 1.11 branch?
By the way, will your 1.12 release bring any improvements for small-file compaction?

On 2020-09-17 14:19:42, "Jingsong Li" <[hidden email]> wrote:
>Yes, it is worth testing. In theory the MR writer should not show a large performance gap.
>
>> Why force the file to roll?
>
>To guarantee exactly-once. Formats such as ORC and Parquet cannot write one file in several separate pieces.
>
>--
>Best, Jingsong Lee |
Hi,
Sorry about that. Could you try the Hive dependency (the flink-connector-hive changes) built from the latest release-1.11 branch?

> By the way, will your 1.12 release bring any improvements for small-file compaction?

That is a goal for 1.12. The JIRA and design proposal will be out in the next couple of days; the plan is to add something like an "auto-compaction" option so that the sink merges small files automatically.

Best,
Jingsong

On Fri, Sep 18, 2020 at 10:18 AM kandy.wang <[hidden email]> wrote:

> @Jingsong Li
> I tested it, and 1.11.2 behaves the same as before. Setting table.exec.hive.fallback-mapred-writer=false is still what makes the clear difference.
> Our Flink environment uses a jar we built ourselves from the Flink 1.11 branch source. I assume all of your StreamingFileWriter changes have been committed to the Flink 1.11 branch?
> By the way, will your 1.12 release bring any improvements for small-file compaction?

-- Best, Jingsong Lee |
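Jingsong describes the planned 1.12 "auto-compaction" only as the sink merging small files automatically. As a rough illustration of the general idea, and not of Flink's actual algorithm, small files could be grouped greedily, in order, into compaction units up to a target size. The class name and the 128 MB target in the usage below are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

/** Greedy grouping of small files into compaction units (illustrative sketch, not Flink code). */
class CompactionPlanner {

    /**
     * Walks the files in order and starts a new group whenever adding the next
     * file would push the current group past targetBytes. A file that is larger
     * than the target on its own ends up alone in its own group.
     */
    static List<List<Long>> plan(List<Long> fileSizes, long targetBytes) {
        List<List<Long>> groups = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentBytes = 0;
        for (long size : fileSizes) {
            if (!current.isEmpty() && currentBytes + size > targetBytes) {
                groups.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(size);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Hypothetical small-file sizes in MB, with an assumed 128 MB target.
        List<Long> sizes = new ArrayList<>(List.of(20L, 25L, 30L, 40L, 90L, 15L));
        System.out.println(plan(sizes, 128L));
    }
}
```

Each group would then be rewritten as one larger file; how and when Flink 1.12 actually triggers and commits compaction is defined by its own design, not by this sketch.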
In reply to this post by Jingsong Li
I ran into this bug too. When table.exec.hive.fallback-mapred-writer is not configured, the sink defaults to the Hadoop MR writer. My data is fairly scattered: for a Hive Parquet table with three partition levels (year-month-day, hour, hashtid), many writers are created, and after running for a while the TM goes OOM. With the same data volume and table.exec.hive.fallback-mapred-writer set to false, so that the Flink native writer is used, there is no problem.

Jingsong Li wrote
> Is this the latest code?
> 1.11.2 fixes a bug: https://issues.apache.org/jira/browse/FLINK-19121
> It affects performance. 1.11.2 has already passed the vote and will be released soon.

-- Sent from: http://apache-flink.147419.n8.nabble.com/ |
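wangenbao's OOM fits a simple counting argument: each partition that a sink subtask is currently writing to needs its own open writer, and with three partition levels and scattered data that number multiplies. The sketch below counts the distinct partition paths seen by one subtask and multiplies by a per-writer buffer size. The `dt`/`hour`/`hashtid` path layout, the `Row` type, and the 64 MiB per-writer figure used in the checks are assumptions for illustration, not values from the thread or from Flink or Parquet defaults.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Back-of-the-envelope estimate (an assumption, not Flink code): one sink
 * subtask keeps one open part file, and thus one writer, per partition it has
 * received data for and not yet rolled.
 */
class OpenWriterEstimate {

    /** Hypothetical record carrying the three partition values. */
    static final class Row {
        final String dt;
        final String hour;
        final String hashTid;

        Row(String dt, String hour, String hashTid) {
            this.dt = dt;
            this.hour = hour;
            this.hashTid = hashTid;
        }

        /** Hypothetical Hive-style partition path used as the bucket key. */
        String partitionPath() {
            return "dt=" + dt + "/hour=" + hour + "/hashtid=" + hashTid;
        }
    }

    /** Number of distinct partitions, i.e. concurrently open writers, for one subtask. */
    static int openWriters(List<Row> rowsSeenBySubtask) {
        Set<String> partitions = new HashSet<>();
        for (Row r : rowsSeenBySubtask) {
            partitions.add(r.partitionPath());
        }
        return partitions.size();
    }

    /** Memory held by writer buffers if each open writer buffers assumedBytesPerWriter. */
    static long estimatedBufferBytes(int openWriters, long assumedBytesPerWriter) {
        return openWriters * assumedBytesPerWriter;
    }
}
```

Under this model, writer memory grows linearly with the number of distinct partitions each subtask touches before its files roll, which is why scattered three-level partitioning hits the TaskManager heap limit sooner.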
hi wangenbao:
I have not run into an OOM so far. My understanding is that increasing the TM resource settings such as memory and CPU should be enough to handle it. The problem I have is that throughput will not go up, although table.exec.hive.fallback-mapred-writer=false does make a big improvement.

On 2020-09-18 16:45:29, "wangenbao" <[hidden email]> wrote:
>I ran into this bug too. When table.exec.hive.fallback-mapred-writer is not configured, the sink defaults to the Hadoop MR writer.
>My data is fairly scattered: for a Hive Parquet table with three partition levels (year-month-day, hour, hashtid), many writers are created, and after running for a while the TM goes OOM.
>With the same data volume and table.exec.hive.fallback-mapred-writer set to false, so that the Flink native writer is used, there is no problem.
>>> >>> >>> >>> >>> >>> 在 2020-09-17 11:21:43,"Jingsong Li" < > >> jingsonglee0@ > >> > 写道: >>> >Sink并行度 >>> >我理解是配置Sink并行度,这个一直在讨论,还没结论 >>> > >>> >HDFS性能 >>> >具体可以看HDFS到底什么瓶颈,是网络还是请求数还是连接数还是磁盘IO >>> > >>> >On Wed, Sep 16, 2020 at 8:16 PM kandy.wang < > >> kandy1203@ > >> > wrote: >>> > >>> >> 场景很简单,就是kafka2hive >>> >> --5min入仓Hive >>> >> >>> >> INSERT INTO hive.temp_.hive_5min >>> >> >>> >> SELECT >>> >> >>> >> arg_service, >>> >> >>> >> time_local >>> >> >>> >> ..... >>> >> >>> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'yyyyMMdd'), >>> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm') 5min产生一个分区 >>> >> >>> >> FROM hive.temp_.kafka_source_pageview/*+ OPTIONS('properties.group.id >>> '='kafka_hive_test', >>> >> 'scan.startup.mode'='earliest-offset') */; >>> >> >>> >> >>> >> >>> >> --kafka source表定义 >>> >> >>> >> CREATE TABLE hive.temp_vipflink.kafka_source_pageview ( >>> >> >>> >> arg_service string COMMENT 'arg_service', >>> >> >>> >> .... >>> >> >>> >> )WITH ( >>> >> >>> >> 'connector' = 'kafka', >>> >> >>> >> 'topic' = '...', >>> >> >>> >> 'properties.bootstrap.servers' = '...', >>> >> >>> >> 'properties.group.id' = 'flink_etl_kafka_hive', >>> >> >>> >> 'scan.startup.mode' = 'group-offsets', >>> >> >>> >> 'format' = 'json', >>> >> >>> >> 'json.fail-on-missing-field' = 'false', >>> >> >>> >> 'json.ignore-parse-errors' = 'true' >>> >> >>> >> ); >>> >> --sink hive表定义 >>> >> CREATE TABLE temp_vipflink.vipflink_dm_log_app_pageview_5min ( >>> >> .... 
>>> >> ) >>> >> PARTITIONED BY (dt string , hm string) stored as orc location >>> >> 'hdfs://ssdcluster/....._5min' TBLPROPERTIES( >>> >> 'sink.partition-commit.trigger'='process-time', >>> >> 'sink.partition-commit.delay'='0 min', >>> >> 'sink.partition-commit.policy.class'='...CustomCommitPolicy', >>> >> 'sink.partition-commit.policy.kind'='metastore,success-file,custom', >>> >> 'sink.rolling-policy.check-interval' ='30s', >>> >> 'sink.rolling-policy.rollover-interval'='10min', >>> >> 'sink.rolling-policy.file-size'='128MB' >>> >> ); >>> >> 初步看下来,感觉瓶颈在写hdfs,hdfs 这边已经是ssd hdfs了,kafka的分区数=40 >>> >> ,算子并行度=40,tps也就达到6-7万这样子,并行度放大,性能并无提升。 >>> >> 就是flink sql可以 >>> >> >>> 改局部某个算子的并行度,想单独改一下StreamingFileWriter算子的并行度,有什么好的办法么?然后StreamingFileWriter >>> >> 这块,有没有什么可以提升性能相关的优化参数? >>> >> >>> >> >>> >> >>> >> >>> >> 在 2020-09-16 19:29:50,"Jingsong Li" < > >> jingsonglee0@ > >> > 写道: >>> >> >Hi, >>> >> > >>> >> >可以分享下具体的测试场景吗?有对比吗?比如使用手写的DataStream作业来对比下,性能的差距? >>> >> > >>> >> >另外,压测时是否可以看下jstack? >>> >> > >>> >> >Best, >>> >> >Jingsong >>> >> > >>> >> >On Wed, Sep 16, 2020 at 2:03 PM kandy.wang < > >> kandy1203@ > >> > wrote: >>> >> > >>> >> >> 压测下来,发现streaming方式写入hive StreamingFileWriter ,在kafka partition=40 >>> >> ,source >>> >> >> writer算子并行度 =40的情况下,kafka从头消费,tps只能达到 7w >>> >> >> 想了解一下,streaming方式写Hive 这块有压测过么?性能能达到多少 >>> >> > >>> >> > >>> >> > >>> >> >-- >>> >> >Best, Jingsong Lee >>> >> >>> > >>> > >>> >-- >>> >Best, Jingsong Lee >>> >> >> >> -- >> Best, Jingsong Lee > > >Jingsong Li wrote >> 是最新的代码吗? 
> 1.11.2 fixes a bug that affects performance: https://issues.apache.org/jira/browse/FLINK-19121
> The 1.11.2 release has passed its vote and will be published soon.
>
> On Thu, Sep 17, 2020 at 12:46 PM kandy.wang <kandy1203@...> wrote:
>
>> @Jingsong Li
>>
>> public TableSink createTableSink(TableSinkFactory.Context context) {
>>     CatalogTable table = checkNotNull(context.getTable());
>>     Preconditions.checkArgument(table instanceof CatalogTableImpl);
>>
>>     boolean isGeneric =
>>         Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
>>
>>     if (!isGeneric) {
>>         return new HiveTableSink(
>>             context.getConfiguration().get(
>>                 HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
>>             context.isBounded(),
>>             new JobConf(hiveConf),
>>             context.getObjectIdentifier(),
>>             table);
>>     } else {
>>         return TableFactoryUtil.findAndCreateTableSink(context);
>>     }
>> }
>>
>> In HiveTableFactory there is an option, table.exec.hive.fallback-mapred-writer, which defaults to true. It controls whether ORC and Parquet files are written with Hadoop's built-in mapred writer or with Flink's native writer implementation:
>>
>> If it is false, using flink native writer to write parquet and orc files;
>> If it is true, using hadoop mapred record writer to write parquet and orc files
>>
>> After setting this option to false, with the same resource configuration, TPS reached 300,000 (30W).
>>
>> I suppose the two ORC implementations simply differ in performance? Also, our storage format is ORC: are there any ORC parameters we can tune, such as async flush or related settings?
>>
>> On 2020-09-17 11:21:43, "Jingsong Li" <jingsonglee0@...> wrote:
>>> Sink parallelism
>>> As I understand it, you want to configure the sink parallelism separately. This has been under discussion for a while, and there is no conclusion yet.
>>>
>>> HDFS performance
>>> Check what exactly the HDFS bottleneck is: network, number of requests, number of connections, or disk IO.
>
> --
> Best, Jingsong Lee
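A stdlib-only sketch of the selection rule described in the message above, to make the default explicit: when the option is absent it counts as true, so the Hadoop mapred writer is used unless the job sets it to false. `FallbackWriterChoice`, `Writer` and `choose` are hypothetical names for illustration, not Flink API; only the option key and its default come from the thread. In an actual Table API job the option would presumably be set on the table configuration, e.g. `tEnv.getConfig().getConfiguration().setBoolean("table.exec.hive.fallback-mapred-writer", false)`.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, stdlib-only model of how the option
 * table.exec.hive.fallback-mapred-writer picks the ORC/Parquet writer,
 * as described in the thread. This is NOT Flink code.
 */
public class FallbackWriterChoice {

    // Option key and default value as stated in the thread.
    static final String FALLBACK_KEY = "table.exec.hive.fallback-mapred-writer";
    static final boolean FALLBACK_DEFAULT = true;

    enum Writer { HADOOP_MAPRED_RECORD_WRITER, FLINK_NATIVE_WRITER }

    /** Returns which writer would be chosen for the given session options. */
    static Writer choose(Map<String, String> options) {
        String raw = options.get(FALLBACK_KEY);
        // Missing key falls back to the default (true). Simplification:
        // any value other than "true" (case-insensitive) is treated as false.
        boolean fallback = (raw == null) ? FALLBACK_DEFAULT : Boolean.parseBoolean(raw.trim());
        return fallback ? Writer.HADOOP_MAPRED_RECORD_WRITER : Writer.FLINK_NATIVE_WRITER;
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        System.out.println(choose(options)); // HADOOP_MAPRED_RECORD_WRITER (default)
        options.put(FALLBACK_KEY, "false");
        System.out.println(choose(options)); // FLINK_NATIVE_WRITER
    }
}
```

The point of the model is only that the faster native writer is opt-in: leaving the option unset keeps the mapred writer.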
--
Sent from: http://apache-flink.147419.n8.nabble.com/ |
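For a rough sense of scale, the throughput figures reported in this thread can be turned into per-subtask rates. This sketch assumes the load is spread evenly over the 40 parallel subtasks and that the ~70,000 TPS figure corresponds to the default (mapred) writer; neither is stated explicitly in the thread. `ThroughputMath` and `perSubtask` are hypothetical names.

```java
import java.util.Locale;

/**
 * Back-of-the-envelope per-subtask throughput for the numbers in this thread.
 * Assumes records are spread evenly across subtasks (an assumption, not measured).
 */
public class ThroughputMath {

    /** Average records per second handled by one subtask. */
    static double perSubtask(double totalTps, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        return totalTps / parallelism;
    }

    public static void main(String[] args) {
        int parallelism = 40;       // Kafka partitions = operator parallelism = 40
        double mapredTps = 70_000;  // ~7w TPS, presumably with the default mapred writer
        double nativeTps = 300_000; // 30W TPS after setting the option to false
        System.out.println(perSubtask(mapredTps, parallelism)); // 1750.0
        System.out.println(perSubtask(nativeTps, parallelism)); // 7500.0
        System.out.printf(Locale.ROOT, "speedup: %.2fx%n", nativeTps / mapredTps); // speedup: 4.29x
    }
}
```

Under those assumptions, each subtask went from roughly 1,750 to 7,500 records per second, about a 4.3x gain from the writer switch alone.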