遇到两个问题:
背景:flink v1.10集群,几十台主机,每台CPU 16,内存 50G,整个job的并发是200 比如我的一个RichMapFunction在open中会加载存量数据。 因维度数据和主数据是非常离散的,因此这些维度数据都需要加载到内存 1、这个RichMapFunction的并发度上不去,只能整到4,并发度上去后各种问题,但从主机内存以及分配给taskmanager的内存足够; 2、这个RichMapFunction的所有slot都分配到同一个taskmanager上,即同一个主机。没有找到接口可以分散到不同的taskmanager上; 说简单点: 1、 对于RichMapFunction的open中需要加载大量维度数据,并发度上不去受什么影响; 2、 对于一个算子如何干预使其分散到不同的taskmanager上; 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用 |
关于第一个问题,最好细化一下【各种问题】是什么问题。
关于第二个问题,我印象中目前 Flink 不支持按并发(SubTask)级别指定调度的位置,绕过方案可以是设置每个 TM 仅持有一个 Slot。这方面我抄送 Xintong,或许他的工作能帮到你。 Best, tison. [hidden email] <[hidden email]> 于2020年5月25日周一 上午11:29写道: > 遇到两个问题: > 背景:flink v1.10集群,几十台主机,每台CPU 16,内存 50G,整个job的并发是200 > 比如我的一个RichMapFunction在open中会加载存量数据。 > 因维度数据和主数据是非常离散的,因此这些维度数据都需要加载到内存 > > 1、这个RichMapFunction的并发度上不去,只能整到4,并发度上去后各种问题,但从主机内存以及分配给taskmanager的内存足够; > > > 2、这个RichMapFunction的所有slot都分配到同一个taskmanager上,即同一个主机。没有找到接口可以分散到不同的taskmanager上; > > 说简单点: > > 1、 对于RichMapFunction的open中需要加载大量维度数据,并发度上不去受什么影响; > > 2、 对于一个算子如何干预使其分散到不同的taskmanager上; > > > > > 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用 > > |
In reply to this post by xuefli@outlook.com
flink 1.10支持将slot在tm上平均分配。cluster.evenly-spread-out-slots: true
------------------ 原始邮件 ------------------ 发件人: "[hidden email]"<[hidden email]>; 发送时间: 2020年5月25日(星期一) 中午11:28 收件人: "user-zh"<[hidden email]>; 主题: RichMapFunction的问题 遇到两个问题: 背景:flink v1.10集群,几十台主机,每台CPU 16,内存 50G,整个job的并发是200 比如我的一个RichMapFunction在open中会加载存量数据。 因维度数据和主数据是非常离散的,因此这些维度数据都需要加载到内存 1、这个RichMapFunction的并发度上不去,只能整到4,并发度上去后各种问题,但从主机内存以及分配给taskmanager的内存足够; 2、这个RichMapFunction的所有slot都分配到同一个taskmanager上,即同一个主机。没有找到接口可以分散到不同的taskmanager上; 说简单点: 1、 对于RichMapFunction的open中需要加载大量维度数据,并发度上不去受什么影响; 2、 对于一个算子如何干预使其分散到不同的taskmanager上; 发送自 Windows 10 版邮件<" rel="noopener" target="_blank">https://go.microsoft.com/fwlink/?LinkId=550986&gt;应用 |
In reply to this post by tison
>> 1、这个RichMapFunction的并发度上不去,只能整到4,并发度上去后各种问题,但从主机内存以及分配给taskmanager的内存足够; -- 能粘贴下代码么 -- 还有提交的命令 >> 2、这个RichMapFunction的所有slot都分配到同一个taskmanager上,即同一个主机。没有找到接口可以分散到不同的taskmanager上; -- 什么模式提交的job(yarn session,yarn,还是stand alone模式) 在 2020-05-25 11:47:48,"tison" <[hidden email]> 写道: >关于第一个问题,最好细化一下【各种问题】是什么问题。 > >关于第二个问题,我印象中目前 Flink 不支持按并发(SubTask)级别指定调度的位置,绕过方案可以是设置每个 TM 仅持有一个 >Slot。这方面我抄送 Xintong,或许他的工作能帮到你。 > >Best, >tison. > > >[hidden email] <[hidden email]> 于2020年5月25日周一 上午11:29写道: > >> 遇到两个问题: >> 背景:flink v1.10集群,几十台主机,每台CPU 16,内存 50G,整个job的并发是200 >> 比如我的一个RichMapFunction在open中会加载存量数据。 >> 因维度数据和主数据是非常离散的,因此这些维度数据都需要加载到内存 >> >> 1、这个RichMapFunction的并发度上不去,只能整到4,并发度上去后各种问题,但从主机内存以及分配给taskmanager的内存足够; >> >> >> 2、这个RichMapFunction的所有slot都分配到同一个taskmanager上,即同一个主机。没有找到接口可以分散到不同的taskmanager上; >> >> 说简单点: >> >> 1、 对于RichMapFunction的open中需要加载大量维度数据,并发度上不去受什么影响; >> >> 2、 对于一个算子如何干预使其分散到不同的taskmanager上; >> >> >> >> >> 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用 >> >> |
In reply to this post by tison
关于第一个问题,最好细化一下【各种问题】是什么问题。
1、这个RichMapFunction的并发度上不去,只能整到4,并发度上去后各种问题,但从主机内存以及分配给taskmanager的内存足够; 现象就是: 某个task动不动就fail,然后整个job尝试重新create-->running,第二个问题的解决影响第一个问题,处理性能受第一个问题的影响,这个task的并发度死活都上不去,导致整体的Job的数据处理速度上不去。 Heartbeat超时,这个已经调大。 #配置超时 akka.ask.timeout: 60 s 因我的sink都是使用checkpoint定期出发,保存到hdfs,但flink有声明只保证集群内部的state是支持fail-over。对于sink为hdfs并没有很好的fail-over后接着hdfs上状态为ingress的文件接着处理,导致数据不准确,hdfs上也有脏数据。 我的job从业务上非常在意结果数据的精确度,每次遇到fail的,需要查看task和job的log,调整并行度和优化算法。 存在第三个问题:定期触发的checkpoint超越window的限制,提前触发 3、我的job是 A数据―>key by(key1)-->window1(30 minutes)-->reduce1+sink(hdfs)-->key by(key2)-->window2 (60 minutes) -->reduce2-->sink(hdfs) Checkpoint采取5minutes定期触发,windows1还能基本遵循契约在每30分钟触发reduce1,但reduce2就不是了 基本上在windows1完成30分钟后,立刻触发,并且在reduce2上有看到输入数据量。 这个和window的初衷不吻合。 另外这种类似的两个阶段的级联带窗口聚合操作,对于官方的hdfs的sink类RolloverInterval、InactivityInterval 并不能独立遵守契约,受定期checkpoint的触发影响。 这个导致的问题是,hdfs的块大小是128M翻滚一个新的文件。如果要提高hdfs的block的有效存储负载,最好是加大window,但系统受不了经常task出现fail,并且处理时间会变长。如果变小处理时间会相对高效很多,但hdfs的存储的payload会变差 另外一些现象是资源先后抢占的问题 整个集群的slot的是足够的。相同的job运行参数不同,先后启动jobA和jobB,前后时隔不超过10秒,但jobA处理数据的速度几乎是jobB的10倍。前者一分钟是1千万,后者是1百万 总的是flink要把介入点和观察点(source、sink、checkpoint、job finish/stop)开放给业务逻辑去干预 简单来说数据是无限的没有边界,但业务是有边界的,业务的边界通过开放接入点和观察点让业务处理。 上帝的归上帝、凯撒的归凯撒 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用 发件人: tison<mailto:[hidden email]> 发送时间: 2020年5月25日 11:48 收件人: user-zh<mailto:[hidden email]> 抄送: Xintong Song<mailto:[hidden email]> 主题: Re: RichMapFunction的问题 关于第一个问题,最好细化一下【各种问题】是什么问题。 关于第二个问题,我印象中目前 Flink 不支持按并发(SubTask)级别指定调度的位置,绕过方案可以是设置每个 TM 仅持有一个 Slot。这方面我抄送 Xintong,或许他的工作能帮到你。 Best, tison. [hidden email] <[hidden email]> 于2020年5月25日周一 上午11:29写道: > 遇到两个问题: > 背景:flink v1.10集群,几十台主机,每台CPU 16,内存 50G,整个job的并发是200 > 比如我的一个RichMapFunction在open中会加载存量数据。 > 因维度数据和主数据是非常离散的,因此这些维度数据都需要加载到内存 > > 1、这个RichMapFunction的并发度上不去,只能整到4,并发度上去后各种问题,但从主机内存以及分配给taskmanager的内存足够; > > > 2、这个RichMapFunction的所有slot都分配到同一个taskmanager上,即同一个主机。没有找到接口可以分散到不同的taskmanager上; > > 说简单点: > > 1、 对于RichMapFunction的open中需要加载大量维度数据,并发度上不去受什么影响; > > 2、 对于一个算子如何干预使其分散到不同的taskmanager上; > > > > > 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用 > > |
In reply to this post by guanyq
发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用 发件人: guanyq<mailto:[hidden email]> 发送时间: 2020年5月25日 13:51 收件人: [hidden email]<mailto:[hidden email]> 主题: Re:Re: RichMapFunction的问题 >> 1、这个RichMapFunction的并发度上不去,只能整到4,并发度上去后各种问题,但从主机内存以及分配给taskmanager的内存足够; -- 能粘贴下代码么 ``` //xxxSink 为StreamingFileSink.withMaxPartSize.withMaxPartSize(128M).withInactivityInterval(1 minutes). withRolloverInterval(30minutes) //并发度4死活挑上不去 xxxSteam.map(item -> item.record()).setParallelism(4).startNewChain().addSink(xxxSink).setParallelism(4).name(xxxx + "-Sink"); XXXRichMapFunction.java //pre-load dimension policy ``` open() { //全量通过全部分布式文件加载多个维度数据 //getRuntimeContext().getDistributedCache() //数据量不大,200W,单条记录的字段数量和数据类型,其实也很简单 } ``` ``` -- 还有提交的命令 ``` nohup ./bin/flink run -p 200 xxx1.0.0.jar ``` >> 2、这个RichMapFunction的所有slot都分配到同一个taskmanager上,即同一个主机。没有找到接口可以分散到不同的taskmanager上; -- 什么模式提交的job(yarn session,yarn,还是stand alone模式) ``` 集群是stand alone 还未尝试 蒋佳成(Jiacheng Jiang) 说的 flink 1.10支持将slot在tm上平均分配。cluster.evenly-spread-out-slots: true flink 1.10缺省是分配在同一个task,这样效率高,避免了跨网络的数据交换,提高速度处理效率 ``` 在 2020-05-25 11:47:48,"tison" <[hidden email]> 写道: >关于第一个问题,最好细化一下【各种问题】是什么问题。 > >关于第二个问题,我印象中目前 Flink 不支持按并发(SubTask)级别指定调度的位置,绕过方案可以是设置每个 TM 仅持有一个 >Slot。这方面我抄送 Xintong,或许他的工作能帮到你。 > >Best, >tison. > > >[hidden email] <[hidden email]> 于2020年5月25日周一 上午11:29写道: > >> 遇到两个问题: >> 背景:flink v1.10集群,几十台主机,每台CPU 16,内存 50G,整个job的并发是200 >> 比如我的一个RichMapFunction在open中会加载存量数据。 >> 因维度数据和主数据是非常离散的,因此这些维度数据都需要加载到内存 >> >> 1、这个RichMapFunction的并发度上不去,只能整到4,并发度上去后各种问题,但从主机内存以及分配给taskmanager的内存足够; >> >> >> 2、这个RichMapFunction的所有slot都分配到同一个taskmanager上,即同一个主机。没有找到接口可以分散到不同的taskmanager上; >> >> 说简单点: >> >> 1、 对于RichMapFunction的open中需要加载大量维度数据,并发度上不去受什么影响; >> >> 2、 对于一个算子如何干预使其分散到不同的taskmanager上; >> >> >> >> >> 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用 >> >> |
In reply to this post by tison
请加个问题
1.消费kafka时,是如何实时查看kafka topic的挤压量的? |
一般是kafka自带的查看消费组的命令工具可以看
./kafka-consumer-groups.sh --describe --group test-consumer-group --bootstrap-server | | Zhonghan Tang | | [hidden email] | 签名由网易邮箱大师定制 在2020年06月3日 14:10,guanyq<[hidden email]> 写道: 请加个问题 1.消费kafka时,是如何实时查看kafka topic的挤压量的? |
Hi:
可以考虑用prometheus采集kafka的metrics,在grafana上展示 ------------------ 原始邮件 ------------------ 发件人: "Zhonghan Tang"<[hidden email]>; 发送时间: 2020年6月3日(星期三) 下午2:29 收件人: "user-zh"<[hidden email]>; 抄送: "user-zh"<[hidden email]>; 主题: 回复:flink1.9,如何实时查看kafka消费的挤压量 一般是kafka自带的查看消费组的命令工具可以看 ./kafka-consumer-groups.sh --describe --group test-consumer-group --bootstrap-server | | Zhonghan Tang | | [hidden email] | 签名由网易邮箱大师定制 在2020年06月3日 14:10,guanyq<[hidden email]> 写道: 请加个问题 1.消费kafka时,是如何实时查看kafka topic的挤压量的? |
kafka挤压量的metrics的demo有么,或者参考资料
在 2020-06-03 14:31:56,"1530130567" <[hidden email]> 写道: >Hi: > 可以考虑用prometheus采集kafka的metrics,在grafana上展示 > > > > >------------------ 原始邮件 ------------------ >发件人: "Zhonghan Tang"<[hidden email]>; >发送时间: 2020年6月3日(星期三) 下午2:29 >收件人: "user-zh"<[hidden email]>; >抄送: "user-zh"<[hidden email]>; >主题: 回复:flink1.9,如何实时查看kafka消费的挤压量 > > > >一般是kafka自带的查看消费组的命令工具可以看 >./kafka-consumer-groups.sh --describe --group test-consumer-group --bootstrap-server > > >| | >Zhonghan Tang >| >| >[hidden email] >| >签名由网易邮箱大师定制 > > >在2020年06月3日 14:10,guanyq<[hidden email]> 写道: >请加个问题 > >1.消费kafka时,是如何实时查看kafka topic的挤压量的? |
找到了,原生就有的committedOffsets-currentOffsets https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#reporter Connectors Kafka Connectors | Scope | Metrics | User Variables | Description | Type | | Operator | commitsSucceeded | n/a | The total number of successful offset commits to Kafka, if offset committing is turned on and checkpointing is enabled. | Counter | | Operator | commitsFailed | n/a | The total number of offset commit failures to Kafka, if offset committing is turned on and checkpointing is enabled. Note that committing offsets back to Kafka is only a means to expose consumer progress, so a commit failure does not affect the integrity of Flink's checkpointed partition offsets. | Counter | | Operator | committedOffsets | topic, partition | The last successfully committed offsets to Kafka, for each partition. A particular partition's metric can be specified by topic name and partition id. | Gauge | | Operator | currentOffsets | topic, partition | The consumer's current read offset, for each partition. A particular partition's metric can be specified by topic name and partition id. | Gauge | 在 2020-06-03 15:02:24,"guanyq" <[hidden email]> 写道: >kafka挤压量的metrics的demo有么,或者参考资料 > > > > > > > > >在 2020-06-03 14:31:56,"1530130567" <[hidden email]> 写道: >>Hi: >> 可以考虑用prometheus采集kafka的metrics,在grafana上展示 >> >> >> >> >>------------------ 原始邮件 ------------------ >>发件人: "Zhonghan Tang"<[hidden email]>; >>发送时间: 2020年6月3日(星期三) 下午2:29 >>收件人: "user-zh"<[hidden email]>; >>抄送: "user-zh"<[hidden email]>; >>主题: 回复:flink1.9,如何实时查看kafka消费的挤压量 >> >> >> >>一般是kafka自带的查看消费组的命令工具可以看 >>./kafka-consumer-groups.sh --describe --group test-consumer-group --bootstrap-server >> >> >>| | >>Zhonghan Tang >>| >>| >>[hidden email] >>| >>签名由网易邮箱大师定制 >> >> >>在2020年06月3日 14:10,guanyq<[hidden email]> 写道: >>请加个问题 >> >>1.消费kafka时,是如何实时查看kafka topic的挤压量的? |
或者可以通过 Kafka-Manager 来查看
guanyq <[hidden email]> 于2020年6月3日周三 下午4:45写道: > > > > 找到了,原生就有的committedOffsets-currentOffsets > > https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#reporter > Connectors > Kafka Connectors > | Scope | Metrics | User Variables | Description | Type | > | Operator | commitsSucceeded | n/a | The total number of successful > offset commits to Kafka, if offset committing is turned on and > checkpointing is enabled. | Counter | > | Operator | commitsFailed | n/a | The total number of offset commit > failures to Kafka, if offset committing is turned on and checkpointing is > enabled. Note that committing offsets back to Kafka is only a means to > expose consumer progress, so a commit failure does not affect the integrity > of Flink's checkpointed partition offsets. | Counter | > | Operator | committedOffsets | topic, partition | The last successfully > committed offsets to Kafka, for each partition. A particular partition's > metric can be specified by topic name and partition id. | Gauge | > | Operator | currentOffsets | topic, partition | The consumer's current > read offset, for each partition. A particular partition's metric can be > specified by topic name and partition id. | Gauge | > > > > > > > > > > > > 在 2020-06-03 15:02:24,"guanyq" <[hidden email]> 写道: > >kafka挤压量的metrics的demo有么,或者参考资料 > > > > > > > > > > > > > > > > > >在 2020-06-03 14:31:56,"1530130567" <[hidden email]> 写道: > >>Hi: > >> 可以考虑用prometheus采集kafka的metrics,在grafana上展示 > >> > >> > >> > >> > >>------------------ 原始邮件 ------------------ > >>发件人: "Zhonghan Tang"<[hidden email]>; > >>发送时间: 2020年6月3日(星期三) 下午2:29 > >>收件人: "user-zh"<[hidden email]>; > >>抄送: "user-zh"<[hidden email]>; > >>主题: 回复:flink1.9,如何实时查看kafka消费的挤压量 > >> > >> > >> > >>一般是kafka自带的查看消费组的命令工具可以看 > >>./kafka-consumer-groups.sh --describe --group test-consumer-group > --bootstrap-server > >> > >> > >>| | > >>Zhonghan Tang > >>| > >>| > >>[hidden email] > >>| > >>签名由网易邮箱大师定制 > >> > >> > >>在2020年06月3日 14:10,guanyq<[hidden email]> 写道: > >>请加个问题 > >> > >>1.消费kafka时,是如何实时查看kafka topic的挤压量的? > |
In reply to this post by guanyq
|
请教下大佬们,想知道flink1.9.0版本对应pushgateway和prometheus的版本号分别都是多少。
|
附件图片,job已经跑起来了,但是flink的ui页面卡住了,一直出不来。 又遇到过这个问题的么。是什么原因。
|
Hi 可以看下集群资源是否充足 ------------------ 原始邮件 ------------------ 发件人: "guanyq"<[hidden email]>; 发送时间: 2020年6月8日(星期一) 下午2:54 收件人: "user-zh"<[hidden email]>; 主题: flink1.9 提交job到yarn后 flink的ui页面出来的问题 附件图片,job已经跑起来了,但是flink的ui页面卡住了,一直出不来。 又遇到过这个问题的么。是什么原因。
|
看一下Application的attempt页面是不是也无法显示,如果Flink的JobManager
向Yarn注册成功的话,Tracking URL应该会自动更新为proxy的地址的 Best, Yang zhiyezou <[hidden email]> 于2020年6月8日周一 下午3:25写道: > Hi > 可以看下集群资源是否充足 > > > ------------------ 原始邮件 ------------------ > *发件人:* "guanyq"<[hidden email]>; > *发送时间:* 2020年6月8日(星期一) 下午2:54 > *收件人:* "user-zh"<[hidden email]>; > *主题:* flink1.9 提交job到yarn后 flink的ui页面出来的问题 > > > 附件图片,job已经跑起来了,但是flink的ui页面卡住了,一直出不来。 > 又遇到过这个问题的么。是什么原因。 > > > > |
In reply to this post by guanyq
kafka 0.11版本
首先kafka source topic数据是正常的,kafka客户端消费出来无中文乱码问题 1.本地idea debug运行,无中文乱码问题 2.服务器Standalone模式运行,无中文乱码问题 3.服务器on yarn提交方式,就出现中文乱码问题 flink 消费kafka的api用的是这个 new FlinkKafkaConsumer<>(topicList, new SimpleStringSchema(), props); 根据1,2,3分析问题可能和yarn有关系。请教一下大佬们,还需要怎么调查,才能解决这个问题。 |
我们也遇到过这个问题,我们当时遇到的问题是YARN NM上的默认charset是ascii。 通过在flink-conf.yaml文件里添加如下配置解决了该问题: env.java.opts.taskmanager: "-Dfile.encoding=UTF-8" 在 2020-06-08 21:48:33,"guanyq" <[hidden email]> 写道: >kafka 0.11版本 >首先kafka source topic数据是正常的,kafka客户端消费出来无中文乱码问题 >1.本地idea debug运行,无中文乱码问题 >2.服务器Standalone模式运行,无中文乱码问题 >3.服务器on yarn提交方式,就出现中文乱码问题 > > >flink 消费kafka的api用的是这个 >new FlinkKafkaConsumer<>(topicList, new SimpleStringSchema(), props); > > >根据1,2,3分析问题可能和yarn有关系。请教一下大佬们,还需要怎么调查,才能解决这个问题。 |
非常感谢,问题解决了!
在 2020-06-09 08:27:47,"马阳阳" <[hidden email]> 写道: > > > >我们也遇到过这个问题,我们当时遇到的问题是YARN NM上的默认charset是ascii。 >通过在flink-conf.yaml文件里添加如下配置解决了该问题: >env.java.opts.taskmanager: "-Dfile.encoding=UTF-8" > > > > > > > > > > > > > > >在 2020-06-08 21:48:33,"guanyq" <[hidden email]> 写道: >>kafka 0.11版本 >>首先kafka source topic数据是正常的,kafka客户端消费出来无中文乱码问题 >>1.本地idea debug运行,无中文乱码问题 >>2.服务器Standalone模式运行,无中文乱码问题 >>3.服务器on yarn提交方式,就出现中文乱码问题 >> >> >>flink 消费kafka的api用的是这个 >>new FlinkKafkaConsumer<>(topicList, new SimpleStringSchema(), props); >> >> >>根据1,2,3分析问题可能和yarn有关系。请教一下大佬们,还需要怎么调查,才能解决这个问题。 |
Free forum by Nabble | Edit this page |