StreamingFileWriter stress-test performance


StreamingFileWriter stress-test performance

kandy.wang
In our stress tests, writing to Hive in streaming mode through StreamingFileWriter, with kafka partition=40 and source/writer operator parallelism=40, consuming Kafka from the earliest offset, TPS only reaches about 70k.
I would like to know: has streaming write to Hive been stress-tested on your side? What throughput can it reach?

Re: StreamingFileWriter stress-test performance

Jingsong Li
Hi,

Could you share the concrete test scenario? Do you have a comparison, for example against a hand-written DataStream job, to see how big the performance gap is?

Also, could you capture a jstack while the stress test is running?

Best,
Jingsong




--
Best, Jingsong Lee

Re: Re: StreamingFileWriter stress-test performance

kandy.wang
The scenario is very simple: kafka2hive.
-- ingest into Hive every 5 minutes

INSERT INTO hive.temp_.hive_5min
SELECT
  arg_service,
  time_local,
  .....
  FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300), 'yyyyMMdd'),
  FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300), 'HHmm')   -- one partition every 5 minutes
FROM hive.temp_.kafka_source_pageview /*+ OPTIONS('properties.group.id'='kafka_hive_test', 'scan.startup.mode'='earliest-offset') */;



-- Kafka source table definition

CREATE TABLE hive.temp_vipflink.kafka_source_pageview (
  arg_service string COMMENT 'arg_service',
  ....
) WITH (
  'connector' = 'kafka',
  'topic' = '...',
  'properties.bootstrap.servers' = '...',
  'properties.group.id' = 'flink_etl_kafka_hive',
  'scan.startup.mode' = 'group-offsets',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
);
-- Hive sink table definition
CREATE TABLE temp_vipflink.vipflink_dm_log_app_pageview_5min (
....
)
PARTITIONED BY (dt string , hm string) stored as orc location 'hdfs://ssdcluster/....._5min' TBLPROPERTIES(
  'sink.partition-commit.trigger'='process-time',
  'sink.partition-commit.delay'='0 min',
  'sink.partition-commit.policy.class'='...CustomCommitPolicy',
  'sink.partition-commit.policy.kind'='metastore,success-file,custom',
  'sink.rolling-policy.check-interval' ='30s',
  'sink.rolling-policy.rollover-interval'='10min',
  'sink.rolling-policy.file-size'='128MB'
);
From a first look, the bottleneck seems to be writing to HDFS, and the HDFS cluster already uses SSDs. With 40 Kafka partitions and operator parallelism of 40, TPS only reaches about 60-70k, and increasing the parallelism brings no improvement.
Also, can Flink SQL change the parallelism of one specific operator? I would like to change the parallelism of just the StreamingFileWriter operator; is there a good way to do that? And are there any tuning parameters for StreamingFileWriter that could improve performance?





Re: Re: StreamingFileWriter stress-test performance

Jingsong Li
Sink parallelism:
As I understand it, you want to configure the sink parallelism separately. This has been under discussion for a while, but there is no conclusion yet.

HDFS performance:
It is worth checking what exactly the HDFS bottleneck is: the network, the request rate, the number of connections, or disk IO.


--
Best, Jingsong Lee

Re: Re: Re: StreamingFileWriter stress-test performance

kandy.wang
@Jingsong Li

public TableSink createTableSink(TableSinkFactory.Context context) {
   CatalogTable table = checkNotNull(context.getTable());
   Preconditions.checkArgument(table instanceof CatalogTableImpl);

   boolean isGeneric = Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));

   if (!isGeneric) {
      return new HiveTableSink(
            context.getConfiguration().get(
                  HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
            context.isBounded(),
            new JobConf(hiveConf),
            context.getObjectIdentifier(),
            table);
   } else {
      return TableFactoryUtil.findAndCreateTableSink(context);
   }
}

In HiveTableFactory there is a config option, table.exec.hive.fallback-mapred-writer, which defaults to true and controls whether Hadoop's own MR writer or Flink's native writer is used to write the ORC/Parquet files:

If it is false, the Flink native writer is used to write Parquet and ORC files;

If it is true, the Hadoop mapred record writer is used to write Parquet and ORC files.

After changing this option to false, with the same resource configuration, TPS reaches 300k.

I suppose the two ORC implementations may simply differ in performance? Also, our storage format is ORC; are there any ORC parameters worth tuning, for example async-flush-related settings?
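For reference, a minimal sketch of how this option can be toggled from a Table API program (class and variable names are just for illustration; in the SQL client the equivalent would be SET table.exec.hive.fallback-mapred-writer=false;):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HiveNativeWriterConfig {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Prefer Flink's native ORC/Parquet writer over the Hadoop mapred record writer.
        tEnv.getConfig().getConfiguration()
                .setBoolean("table.exec.hive.fallback-mapred-writer", false);

        // ... register the Hive catalog and submit the INSERT INTO job as usual ...
    }
}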






Re: Re: Re: StreamingFileWriter stress-test performance

Jingsong Li
Is this the latest code?
1.11.2 fixed a bug: https://issues.apache.org/jira/browse/FLINK-19121
It affects performance. The 1.11.2 release vote has passed and it will be published soon.



--
Best, Jingsong Lee

Re: Re: Re: Re: StreamingFileWriter stress-test performance

kandy.wang
It is master-branch code.
The situation you describe is exactly what happens when table.exec.hive.fallback-mapred-writer is left at its default of true; after changing it to false the code takes the else branch below, so for now the problem no longer shows up.
if (userMrWriter) {
   builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
   LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
} else {
   Optional<BulkWriter.Factory<RowData>> bulkFactory = createBulkWriterFactory(partitionColumns, sd);
   if (bulkFactory.isPresent()) {
      builder = StreamingFileSink.forBulkFormat(
            new org.apache.flink.core.fs.Path(sd.getLocation()),
            new FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer))
            .withBucketAssigner(assigner)
            .withRollingPolicy(rollingPolicy)
            .withOutputFileConfig(outputFileConfig);
      LOG.info("Hive streaming sink: Use native parquet&orc writer.");
   } else {
      builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
      LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer because BulkWriter Factory not available.");
   }
}

Re: Re: Re: Re: StreamingFileWriter stress-test performance

Jingsong Li
Could you try the newly released 1.11.2?

https://flink.apache.org/downloads.html



--
Best, Jingsong Lee

Re: Re: Re: Re: Re: StreamingFileWriter stress-test performance

kandy.wang



OK. So it comes down to a performance comparison between the Hadoop MR writer and Flink's own native writer. For now at least, setting table.exec.hive.fallback-mapred-writer to false meets our Hive-write requirements.
One more question that I asked earlier and you have not answered yet:
Why does HiveRollingPolicy return true from shouldRollOnCheckpoint, forcing the file to roll? Could this be extracted into a config option? With forced rolling, the sink.rolling-policy.rollover-interval and sink.rolling-policy.file-size settings effectively stop working: with a 5-minute partition and a checkpoint every 2 minutes, files are rolled before they even reach a few tens of MB, so the configured values become meaningless.

Re: Re: Re: Re: Re: StreamingFileWriter stress-test performance

Jingsong Li
Yes, it is worth testing; in theory the MR writer should not show such a large performance gap.

> Why force the file to roll

Because we need to guarantee exactly-once: a format like ORC or Parquet cannot write one file in several separate passes, so the in-progress file has to be rolled at every checkpoint.
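To make the behaviour concrete, here is a hedged sketch of the shape of such a policy (illustrative only, not the actual HiveRollingPolicy source): the size and interval checks only apply between checkpoints, while shouldRollOnCheckpoint always returns true, which is why frequent checkpoints end up capping the file size.

import java.io.IOException;

import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;

// Illustrative sketch; the thresholds correspond to sink.rolling-policy.file-size
// and sink.rolling-policy.rollover-interval.
public class CheckpointBoundRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {

    private final long maxPartSize;       // e.g. 128 MB, in bytes
    private final long rolloverInterval;  // e.g. 10 minutes, in milliseconds

    public CheckpointBoundRollingPolicy(long maxPartSize, long rolloverInterval) {
        this.maxPartSize = maxPartSize;
        this.rolloverInterval = rolloverInterval;
    }

    @Override
    public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) {
        // Bulk formats (ORC/Parquet) cannot resume a half-written file after a
        // checkpoint, so the in-progress file is always rolled here; with a 2-minute
        // checkpoint interval this, not the settings below, determines the file size.
        return true;
    }

    @Override
    public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException {
        return partFileState.getSize() > maxPartSize;
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) {
        return currentTime - partFileState.getCreationTime() >= rolloverInterval;
    }
}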



--
Best, Jingsong Lee

Re: Re: Re: Re: Re: Re: StreamingFileWriter stress-test performance

kandy.wang






@Jingsong Li
I tested it: 1.11.2 behaves the same as before; table.exec.hive.fallback-mapred-writer=false is still what makes the clear difference.
Our Flink environment is a jar we build ourselves from the flink 1.11 branch source. The StreamingFileWriter changes on your side should all have been committed to the flink 1.11 branch, right?
By the way, will 1.12 bring any improvements for small-file compaction?



Re: Re: Re: Re: Re: Re: StreamingFileWriter stress-test performance

Jingsong Li
Hi,

Sorry about that. Please try the Hive dependency built from the latest release-1.11 branch (the relevant changes are in flink-connector-hive).

> By the way, will 1.12 bring any improvements for small-file compaction?

That is a goal for 1.12; the JIRA and design doc should come out in the next couple of days. Roughly, an "auto-compaction" option will be added so that the sink merges small files automatically.
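Just to sketch what that might look like at the table level (purely illustrative: "auto-compaction" is the option named above; the 'compaction.file-size' key and the exact DDL shape are assumptions, not a released API; assumes a TableEnvironment tEnv with the Hive catalog registered and set as current):

// Hedged sketch of a possible sink-side compaction switch once 1.12 ships.
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tEnv.executeSql(
    "CREATE TABLE temp_vipflink.pageview_5min_compacted (" +   // hypothetical table name
    "  arg_service string" +
    ") PARTITIONED BY (dt string, hm string) STORED AS orc TBLPROPERTIES (" +
    "  'auto-compaction' = 'true'," +
    "  'compaction.file-size' = '128MB'" +                     // assumed option name
    ")");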

Best,
Jingsong




--
Best, Jingsong Lee

Re: Re: Re: StreamingFileWriter stress-test performance

wangenbao
In reply to this post by Jingsong Li
I have also run into this bug. With table.exec.hive.fallback-mapred-writer left unset, the default path uses the Hadoop MR writer;
when my data is fairly scattered (a Hive Parquet table with three partition levels: date, hour, hashtid), many writers are created and the TaskManager goes OOM after running for a while;
with the same data volume and table.exec.hive.fallback-mapred-writer set to false, using the Flink native writer, there is no problem.
Jingsong Li wrote
> 是最新的代码吗?
> 1.11.2解了一个bug:https://issues.apache.org/jira/browse/FLINK-19121
> 它是影响性能的,1.11.2已经投票通过,即将发布
>
> On Thu, Sep 17, 2020 at 12:46 PM kandy.wang &lt;

> kandy1203@

> &gt; wrote:
>
>> @Jingsong Li
>>
>> public TableSink createTableSink(TableSinkFactory.Context context) {
>>    CatalogTable table = checkNotNull(context.getTable());
>> Preconditions.checkArgument(table instanceof CatalogTableImpl);
>>
>>    boolean isGeneric =
>> Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
>>
>>    if (!isGeneric) {
>> return new HiveTableSink(
>>             context.getConfiguration().get(
>>                   HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
>> context.isBounded(),
>>             new JobConf(hiveConf),
>> context.getObjectIdentifier(),
>> table);
>> } else {
>> return TableFactoryUtil.findAndCreateTableSink(context);
>> }
>> }
>>
>> HiveTableFactory中,有个配置table.exec.hive.fallback-mapred-writer默认是true,控制是否使用Hadoop
>> 自带的mr writer还是用flink native 实现的 writer去写orc parquet格式。
>>
>> If it is false, using flink native writer to write parquet and orc files;
>>
>> If it is true, using hadoop mapred record writer to write parquet and orc
>> files
>>
>> 将此参数调整成false后,同样的资源配置下,tps达到30W
>>
>> 这个不同的ORC实现,可能性能本身就存在差异吧? 另外我们的存储格式是orc,orc有没有一些可以优化的参数,async  flush
>> 一些相关的参数 ?


Reply | Threaded
Open this post in threaded view
|

Re: Re: Re: Re: StreamingFileWriter load-test performance

kandy.wang



hi wangenbao:
  I have not run into the OOM here; I would expect that increasing the TM memory and CPU resources should handle it.
  The problem on my side is that throughput will not go up. That said, table.exec.hive.fallback-mapred-writer=false did bring a noticeable improvement.
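
For reference, "increase the TM resources" would usually mean raising the TaskManager memory settings in flink-conf.yaml. The keys below are standard Flink 1.11 options; the values are purely illustrative:

# total memory budget of each TaskManager process
taskmanager.memory.process.size: 8g
# number of slots sharing that budget
taskmanager.numberOfTaskSlots: 4

Whether this actually avoids the OOM with many concurrent writers would still need to be verified against the specific job.
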
On 2020-09-18 16:45:29, "wangenbao" <[hidden email]> wrote:

>I ran into this bug as well. With table.exec.hive.fallback-mapred-writer left unset, writes default to the Hadoop mapred writer.
>When the data is fairly scattered (my Hive table is a Parquet table with three partition levels: date, hour, hashtid), a large number of writers get created and the TM OOMs after running for a while.
>With the same data volume and table.exec.hive.fallback-mapred-writer set to false, i.e. using the Flink native writer, there is no problem.