RichMapFunction的问题

classic Classic list List threaded Threaded
37 messages Options
12
Reply | Threaded
Open this post in threaded view
|

RichMapFunction的问题

xuefli@outlook.com
遇到两个问题:
  背景:flink v1.10集群,几十台主机,每台CPU 16,内存 50G,整个job的并发是200
  比如我的一个RichMapFunction在open中会加载存量数据。
  因维度数据和主数据是非常离散的,因此这些维度数据都需要加载到内存

1、这个RichMapFunction的并发度上不去,只能整到4,并发度上去后各种问题,但从主机内存以及分配给taskmanager的内存足够;

2、这个RichMapFunction的所有slot都分配到同一个taskmanager上,即同一个主机。没有找到接口可以分散到不同的taskmanager上;

说简单点:

1、 对于RichMapFunction的open中需要加载大量维度数据,并发度上不去受什么影响;

2、 对于一个算子如何干预使其分散到不同的taskmanager上;




发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用

Reply | Threaded
Open this post in threaded view
|

Re: RichMapFunction的问题

tison
关于第一个问题,最好细化一下【各种问题】是什么问题。

关于第二个问题,我印象中目前 Flink 不支持按并发(SubTask)级别指定调度的位置,绕过方案可以是设置每个 TM 仅持有一个
Slot。这方面我抄送 Xintong,或许他的工作能帮到你。

Best,
tison.


[hidden email] <[hidden email]> 于2020年5月25日周一 上午11:29写道:

> 遇到两个问题:
>   背景:flink v1.10集群,几十台主机,每台CPU 16,内存 50G,整个job的并发是200
>   比如我的一个RichMapFunction在open中会加载存量数据。
>   因维度数据和主数据是非常离散的,因此这些维度数据都需要加载到内存
>
> 1、这个RichMapFunction的并发度上不去,只能整到4,并发度上去后各种问题,但从主机内存以及分配给taskmanager的内存足够;
>
>
> 2、这个RichMapFunction的所有slot都分配到同一个taskmanager上,即同一个主机。没有找到接口可以分散到不同的taskmanager上;
>
> 说简单点:
>
> 1、 对于RichMapFunction的open中需要加载大量维度数据,并发度上不去受什么影响;
>
> 2、 对于一个算子如何干预使其分散到不同的taskmanager上;
>
>
>
>
> 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用
>
>
Reply | Threaded
Open this post in threaded view
|

回复:RichMapFunction的问题

蒋佳成(Jiacheng Jiang)
In reply to this post by xuefli@outlook.com
flink 1.10支持将slot在tm上平均分配。cluster.evenly-spread-out-slots: true






------------------&nbsp;原始邮件&nbsp;------------------
发件人: "[hidden email]"<[hidden email]&gt;;
发送时间: 2020年5月25日(星期一) 中午11:28
收件人: "user-zh"<[hidden email]&gt;;
主题: RichMapFunction的问题



遇到两个问题:
&nbsp; 背景:flink v1.10集群,几十台主机,每台CPU 16,内存 50G,整个job的并发是200
&nbsp; 比如我的一个RichMapFunction在open中会加载存量数据。
&nbsp; 因维度数据和主数据是非常离散的,因此这些维度数据都需要加载到内存

1、这个RichMapFunction的并发度上不去,只能整到4,并发度上去后各种问题,但从主机内存以及分配给taskmanager的内存足够;

2、这个RichMapFunction的所有slot都分配到同一个taskmanager上,即同一个主机。没有找到接口可以分散到不同的taskmanager上;

说简单点:

1、 对于RichMapFunction的open中需要加载大量维度数据,并发度上不去受什么影响;

2、 对于一个算子如何干预使其分散到不同的taskmanager上;




发送自 Windows 10 版邮件<" rel="noopener" target="_blank">https://go.microsoft.com/fwlink/?LinkId=550986&amp;gt;应用
Reply | Threaded
Open this post in threaded view
|

Re:Re: RichMapFunction的问题

guanyq
In reply to this post by tison



>> 1、这个RichMapFunction的并发度上不去,只能整到4,并发度上去后各种问题,但从主机内存以及分配给taskmanager的内存足够;

-- 能粘贴下代码么
-- 还有提交的命令
>> 2、这个RichMapFunction的所有slot都分配到同一个taskmanager上,即同一个主机。没有找到接口可以分散到不同的taskmanager上;

-- 什么模式提交的job(yarn session,yarn,还是stand alone模式)


在 2020-05-25 11:47:48,"tison" <[hidden email]> 写道:

>关于第一个问题,最好细化一下【各种问题】是什么问题。
>
>关于第二个问题,我印象中目前 Flink 不支持按并发(SubTask)级别指定调度的位置,绕过方案可以是设置每个 TM 仅持有一个
>Slot。这方面我抄送 Xintong,或许他的工作能帮到你。
>
>Best,
>tison.
>
>
>[hidden email] <[hidden email]> 于2020年5月25日周一 上午11:29写道:
>
>> 遇到两个问题:
>>   背景:flink v1.10集群,几十台主机,每台CPU 16,内存 50G,整个job的并发是200
>>   比如我的一个RichMapFunction在open中会加载存量数据。
>>   因维度数据和主数据是非常离散的,因此这些维度数据都需要加载到内存
>>
>> 1、这个RichMapFunction的并发度上不去,只能整到4,并发度上去后各种问题,但从主机内存以及分配给taskmanager的内存足够;
>>
>>
>> 2、这个RichMapFunction的所有slot都分配到同一个taskmanager上,即同一个主机。没有找到接口可以分散到不同的taskmanager上;
>>
>> 说简单点:
>>
>> 1、 对于RichMapFunction的open中需要加载大量维度数据,并发度上不去受什么影响;
>>
>> 2、 对于一个算子如何干预使其分散到不同的taskmanager上;
>>
>>
>>
>>
>> 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用
>>
>>
Reply | Threaded
Open this post in threaded view
|

回复: RichMapFunction的问题

xuefli@outlook.com
In reply to this post by tison
关于第一个问题,最好细化一下【各种问题】是什么问题。
1、这个RichMapFunction的并发度上不去,只能整到4,并发度上去后各种问题,但从主机内存以及分配给taskmanager的内存足够;

现象就是: 某个task动不动就fail,然后整个job尝试重新create-->running,第二个问题的解决影响第一个问题,处理性能受第一个问题的影响,这个task的并发度死活都上不去,导致整体的Job的数据处理速度上不去。
Heartbeat超时,这个已经调大。

#配置超时

akka.ask.timeout: 60 s

因我的sink都是使用checkpoint定期出发,保存到hdfs,但flink有声明只保证集群内部的state是支持fail-over。对于sink为hdfs并没有很好的fail-over后接着hdfs上状态为ingress的文件接着处理,导致数据不准确,hdfs上也有脏数据。
我的job从业务上非常在意结果数据的精确度,每次遇到fail的,需要查看task和job的log,调整并行度和优化算法。

存在第三个问题:定期触发的checkpoint超越window的限制,提前触发
3、我的job是
A数据―>key by(key1)-->window1(30 minutes)-->reduce1+sink(hdfs)-->key by(key2)-->window2
(60 minutes) -->reduce2-->sink(hdfs)
Checkpoint采取5minutes定期触发,windows1还能基本遵循契约在每30分钟触发reduce1,但reduce2就不是了
基本上在windows1完成30分钟后,立刻触发,并且在reduce2上有看到输入数据量。
这个和window的初衷不吻合。


另外这种类似的两个阶段的级联带窗口聚合操作,对于官方的hdfs的sink类RolloverInterval、InactivityInterval

并不能独立遵守契约,受定期checkpoint的触发影响。

这个导致的问题是,hdfs的块大小是128M翻滚一个新的文件。如果要提高hdfs的block的有效存储负载,最好是加大window,但系统受不了经常task出现fail,并且处理时间会变长。如果变小处理时间会相对高效很多,但hdfs的存储的payload会变差

另外一些现象是资源先后抢占的问题
整个集群的slot的是足够的。相同的job运行参数不同,先后启动jobA和jobB,前后时隔不超过10秒,但jobA处理数据的速度几乎是jobB的10倍。前者一分钟是1千万,后者是1百万

总的是flink要把介入点和观察点(source、sink、checkpoint、job finish/stop)开放给业务逻辑去干预
简单来说数据是无限的没有边界,但业务是有边界的,业务的边界通过开放接入点和观察点让业务处理。
上帝的归上帝、凯撒的归凯撒

发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用

发件人: tison<mailto:[hidden email]>
发送时间: 2020年5月25日 11:48
收件人: user-zh<mailto:[hidden email]>
抄送: Xintong Song<mailto:[hidden email]>
主题: Re: RichMapFunction的问题

关于第一个问题,最好细化一下【各种问题】是什么问题。

关于第二个问题,我印象中目前 Flink 不支持按并发(SubTask)级别指定调度的位置,绕过方案可以是设置每个 TM 仅持有一个
Slot。这方面我抄送 Xintong,或许他的工作能帮到你。

Best,
tison.


[hidden email] <[hidden email]> 于2020年5月25日周一 上午11:29写道:

> 遇到两个问题:
>   背景:flink v1.10集群,几十台主机,每台CPU 16,内存 50G,整个job的并发是200
>   比如我的一个RichMapFunction在open中会加载存量数据。
>   因维度数据和主数据是非常离散的,因此这些维度数据都需要加载到内存
>
> 1、这个RichMapFunction的并发度上不去,只能整到4,并发度上去后各种问题,但从主机内存以及分配给taskmanager的内存足够;
>
>
> 2、这个RichMapFunction的所有slot都分配到同一个taskmanager上,即同一个主机。没有找到接口可以分散到不同的taskmanager上;
>
> 说简单点:
>
> 1、 对于RichMapFunction的open中需要加载大量维度数据,并发度上不去受什么影响;
>
> 2、 对于一个算子如何干预使其分散到不同的taskmanager上;
>
>
>
>
> 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用
>
>

Reply | Threaded
Open this post in threaded view
|

回复: Re:Re: RichMapFunction的问题

xuefli@outlook.com
In reply to this post by guanyq


发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用

发件人: guanyq<mailto:[hidden email]>
发送时间: 2020年5月25日 13:51
收件人: [hidden email]<mailto:[hidden email]>
主题: Re:Re: RichMapFunction的问题




>> 1、这个RichMapFunction的并发度上不去,只能整到4,并发度上去后各种问题,但从主机内存以及分配给taskmanager的内存足够;

-- 能粘贴下代码么
```

//xxxSink 为StreamingFileSink.withMaxPartSize.withMaxPartSize(128M).withInactivityInterval(1 minutes). withRolloverInterval(30minutes)



//并发度4死活挑上不去

xxxSteam.map(item -> item.record()).setParallelism(4).startNewChain().addSink(xxxSink).setParallelism(4).name(xxxx + "-Sink");

XXXRichMapFunction.java  //pre-load dimension policy
```
open() {
   //全量通过全部分布式文件加载多个维度数据

  //getRuntimeContext().getDistributedCache()

 //数据量不大,200W,单条记录的字段数量和数据类型,其实也很简单
}
```
```
-- 还有提交的命令
```

nohup ./bin/flink run -p 200 xxx1.0.0.jar
```
>> 2、这个RichMapFunction的所有slot都分配到同一个taskmanager上,即同一个主机。没有找到接口可以分散到不同的taskmanager上;

-- 什么模式提交的job(yarn session,yarn,还是stand alone模式)
```
集群是stand alone
还未尝试 蒋佳成(Jiacheng Jiang) 说的
flink 1.10支持将slot在tm上平均分配。cluster.evenly-spread-out-slots: true
flink 1.10缺省是分配在同一个task,这样效率高,避免了跨网络的数据交换,提高速度处理效率
```


在 2020-05-25 11:47:48,"tison" <[hidden email]> 写道:

>关于第一个问题,最好细化一下【各种问题】是什么问题。
>
>关于第二个问题,我印象中目前 Flink 不支持按并发(SubTask)级别指定调度的位置,绕过方案可以是设置每个 TM 仅持有一个
>Slot。这方面我抄送 Xintong,或许他的工作能帮到你。
>
>Best,
>tison.
>
>
>[hidden email] <[hidden email]> 于2020年5月25日周一 上午11:29写道:
>
>> 遇到两个问题:
>>   背景:flink v1.10集群,几十台主机,每台CPU 16,内存 50G,整个job的并发是200
>>   比如我的一个RichMapFunction在open中会加载存量数据。
>>   因维度数据和主数据是非常离散的,因此这些维度数据都需要加载到内存
>>
>> 1、这个RichMapFunction的并发度上不去,只能整到4,并发度上去后各种问题,但从主机内存以及分配给taskmanager的内存足够;
>>
>>
>> 2、这个RichMapFunction的所有slot都分配到同一个taskmanager上,即同一个主机。没有找到接口可以分散到不同的taskmanager上;
>>
>> 说简单点:
>>
>> 1、 对于RichMapFunction的open中需要加载大量维度数据,并发度上不去受什么影响;
>>
>> 2、 对于一个算子如何干预使其分散到不同的taskmanager上;
>>
>>
>>
>>
>> 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用
>>
>>

Reply | Threaded
Open this post in threaded view
|

flink1.9,如何实时查看kafka消费的挤压量

guanyq
In reply to this post by tison
请加个问题

1.消费kafka时,是如何实时查看kafka topic的挤压量的?
Reply | Threaded
Open this post in threaded view
|

回复:flink1.9,如何实时查看kafka消费的挤压量

13122260573@163.com
一般是kafka自带的查看消费组的命令工具可以看
./kafka-consumer-groups.sh --describe --group test-consumer-group  --bootstrap-server


| |
Zhonghan Tang
|
|
[hidden email]
|
签名由网易邮箱大师定制


在2020年06月3日 14:10,guanyq<[hidden email]> 写道:
请加个问题

1.消费kafka时,是如何实时查看kafka topic的挤压量的?
Reply | Threaded
Open this post in threaded view
|

回复:flink1.9,如何实时查看kafka消费的挤压量

zhiyezou
Hi:
&nbsp; &nbsp; 可以考虑用prometheus采集kafka的metrics,在grafana上展示




------------------&nbsp;原始邮件&nbsp;------------------
发件人: "Zhonghan Tang"<[hidden email]&gt;;
发送时间: 2020年6月3日(星期三) 下午2:29
收件人: "user-zh"<[hidden email]&gt;;
抄送: "user-zh"<[hidden email]&gt;;
主题: 回复:flink1.9,如何实时查看kafka消费的挤压量



一般是kafka自带的查看消费组的命令工具可以看
./kafka-consumer-groups.sh --describe --group test-consumer-group&nbsp; --bootstrap-server


| |
Zhonghan Tang
|
|
[hidden email]
|
签名由网易邮箱大师定制


在2020年06月3日 14:10,guanyq<[hidden email]&gt; 写道:
请加个问题

1.消费kafka时,是如何实时查看kafka topic的挤压量的?
Reply | Threaded
Open this post in threaded view
|

Re:回复:flink1.9,如何实时查看kafka消费的挤压量

guanyq
kafka挤压量的metrics的demo有么,或者参考资料








在 2020-06-03 14:31:56,"1530130567" <[hidden email]> 写道:

>Hi:
>&nbsp; &nbsp; 可以考虑用prometheus采集kafka的metrics,在grafana上展示
>
>
>
>
>------------------&nbsp;原始邮件&nbsp;------------------
>发件人: "Zhonghan Tang"<[hidden email]&gt;;
>发送时间: 2020年6月3日(星期三) 下午2:29
>收件人: "user-zh"<[hidden email]&gt;;
>抄送: "user-zh"<[hidden email]&gt;;
>主题: 回复:flink1.9,如何实时查看kafka消费的挤压量
>
>
>
>一般是kafka自带的查看消费组的命令工具可以看
>./kafka-consumer-groups.sh --describe --group test-consumer-group&nbsp; --bootstrap-server
>
>
>| |
>Zhonghan Tang
>|
>|
>[hidden email]
>|
>签名由网易邮箱大师定制
>
>
>在2020年06月3日 14:10,guanyq<[hidden email]&gt; 写道:
>请加个问题
>
>1.消费kafka时,是如何实时查看kafka topic的挤压量的?
Reply | Threaded
Open this post in threaded view
|

Re:Re:回复:flink1.9,如何实时查看kafka消费的挤压量

guanyq



找到了,原生就有的committedOffsets-currentOffsets
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#reporter
Connectors
Kafka Connectors
| Scope | Metrics | User Variables | Description | Type |
| Operator | commitsSucceeded | n/a | The total number of successful offset commits to Kafka, if offset committing is turned on and checkpointing is enabled. | Counter |
| Operator | commitsFailed | n/a | The total number of offset commit failures to Kafka, if offset committing is turned on and checkpointing is enabled. Note that committing offsets back to Kafka is only a means to expose consumer progress, so a commit failure does not affect the integrity of Flink's checkpointed partition offsets. | Counter |
| Operator | committedOffsets | topic, partition | The last successfully committed offsets to Kafka, for each partition. A particular partition's metric can be specified by topic name and partition id. | Gauge |
| Operator | currentOffsets | topic, partition | The consumer's current read offset, for each partition. A particular partition's metric can be specified by topic name and partition id. | Gauge |











在 2020-06-03 15:02:24,"guanyq" <[hidden email]> 写道:

>kafka挤压量的metrics的demo有么,或者参考资料
>
>
>
>
>
>
>
>
>在 2020-06-03 14:31:56,"1530130567" <[hidden email]> 写道:
>>Hi:
>>&nbsp; &nbsp; 可以考虑用prometheus采集kafka的metrics,在grafana上展示
>>
>>
>>
>>
>>------------------&nbsp;原始邮件&nbsp;------------------
>>发件人: "Zhonghan Tang"<[hidden email]&gt;;
>>发送时间: 2020年6月3日(星期三) 下午2:29
>>收件人: "user-zh"<[hidden email]&gt;;
>>抄送: "user-zh"<[hidden email]&gt;;
>>主题: 回复:flink1.9,如何实时查看kafka消费的挤压量
>>
>>
>>
>>一般是kafka自带的查看消费组的命令工具可以看
>>./kafka-consumer-groups.sh --describe --group test-consumer-group&nbsp; --bootstrap-server
>>
>>
>>| |
>>Zhonghan Tang
>>|
>>|
>>[hidden email]
>>|
>>签名由网易邮箱大师定制
>>
>>
>>在2020年06月3日 14:10,guanyq<[hidden email]&gt; 写道:
>>请加个问题
>>
>>1.消费kafka时,是如何实时查看kafka topic的挤压量的?
Reply | Threaded
Open this post in threaded view
|

Re: Re:回复:flink1.9,如何实时查看kafka消费的挤压量

LakeShen
或者可以通过 Kafka-Manager 来查看

guanyq <[hidden email]> 于2020年6月3日周三 下午4:45写道:

>
>
>
> 找到了,原生就有的committedOffsets-currentOffsets
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#reporter
> Connectors
> Kafka Connectors
> | Scope | Metrics | User Variables | Description | Type |
> | Operator | commitsSucceeded | n/a | The total number of successful
> offset commits to Kafka, if offset committing is turned on and
> checkpointing is enabled. | Counter |
> | Operator | commitsFailed | n/a | The total number of offset commit
> failures to Kafka, if offset committing is turned on and checkpointing is
> enabled. Note that committing offsets back to Kafka is only a means to
> expose consumer progress, so a commit failure does not affect the integrity
> of Flink's checkpointed partition offsets. | Counter |
> | Operator | committedOffsets | topic, partition | The last successfully
> committed offsets to Kafka, for each partition. A particular partition's
> metric can be specified by topic name and partition id. | Gauge |
> | Operator | currentOffsets | topic, partition | The consumer's current
> read offset, for each partition. A particular partition's metric can be
> specified by topic name and partition id. | Gauge |
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-06-03 15:02:24,"guanyq" <[hidden email]> 写道:
> >kafka挤压量的metrics的demo有么,或者参考资料
> >
> >
> >
> >
> >
> >
> >
> >
> >在 2020-06-03 14:31:56,"1530130567" <[hidden email]> 写道:
> >>Hi:
> >>&nbsp; &nbsp; 可以考虑用prometheus采集kafka的metrics,在grafana上展示
> >>
> >>
> >>
> >>
> >>------------------&nbsp;原始邮件&nbsp;------------------
> >>发件人: "Zhonghan Tang"<[hidden email]&gt;;
> >>发送时间: 2020年6月3日(星期三) 下午2:29
> >>收件人: "user-zh"<[hidden email]&gt;;
> >>抄送: "user-zh"<[hidden email]&gt;;
> >>主题: 回复:flink1.9,如何实时查看kafka消费的挤压量
> >>
> >>
> >>
> >>一般是kafka自带的查看消费组的命令工具可以看
> >>./kafka-consumer-groups.sh --describe --group test-consumer-group&nbsp;
> --bootstrap-server
> >>
> >>
> >>| |
> >>Zhonghan Tang
> >>|
> >>|
> >>[hidden email]
> >>|
> >>签名由网易邮箱大师定制
> >>
> >>
> >>在2020年06月3日 14:10,guanyq<[hidden email]&gt; 写道:
> >>请加个问题
> >>
> >>1.消费kafka时,是如何实时查看kafka topic的挤压量的?
>
Reply | Threaded
Open this post in threaded view
|

flink1.9 stream job的异常日志

guanyq
In reply to this post by guanyq

附件是错误日志

我感觉看到错误日志之后,没有什么调查方向,应该怎么调查呢。



 


err.log (16K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

flink1.9集成pushgateway和prometheus版本问题

guanyq
请教下大佬们,想知道flink1.9.0版本对应pushgateway和prometheus的版本号分别都是多少。
Reply | Threaded
Open this post in threaded view
|

flink1.9 提交job到yarn后 flink的ui页面出来的问题

guanyq

附件图片,job已经跑起来了,但是flink的ui页面卡住了,一直出不来。
又遇到过这个问题的么。是什么原因。


 

Reply | Threaded
Open this post in threaded view
|

回复:flink1.9 提交job到yarn后 flink的ui页面出来的问题

zhiyezou
Hi
可以看下集群资源是否充足


------------------ 原始邮件 ------------------
发件人: "guanyq"<[hidden email]>;
发送时间: 2020年6月8日(星期一) 下午2:54
收件人: "user-zh"<[hidden email]>;
主题: flink1.9 提交job到yarn后 flink的ui页面出来的问题


附件图片,job已经跑起来了,但是flink的ui页面卡住了,一直出不来。
又遇到过这个问题的么。是什么原因。


 

Reply | Threaded
Open this post in threaded view
|

Re: flink1.9 提交job到yarn后 flink的ui页面出来的问题

Yang Wang
看一下Application的attempt页面是不是也无法显示,如果Flink的JobManager
向Yarn注册成功的话,Tracking URL应该会自动更新为proxy的地址的


Best,
Yang

zhiyezou <[hidden email]> 于2020年6月8日周一 下午3:25写道:

> Hi
> 可以看下集群资源是否充足
>
>
> ------------------ 原始邮件 ------------------
> *发件人:* "guanyq"<[hidden email]>;
> *发送时间:* 2020年6月8日(星期一) 下午2:54
> *收件人:* "user-zh"<[hidden email]>;
> *主题:* flink1.9 提交job到yarn后 flink的ui页面出来的问题
>
>
> 附件图片,job已经跑起来了,但是flink的ui页面卡住了,一直出不来。
> 又遇到过这个问题的么。是什么原因。
>
>
>
>
Reply | Threaded
Open this post in threaded view
|

flink1.9 on yarn 消费kafka数据中文乱码

guanyq
In reply to this post by guanyq
kafka 0.11版本
首先kafka source topic数据是正常的,kafka客户端消费出来无中文乱码问题
1.本地idea debug运行,无中文乱码问题
2.服务器Standalone模式运行,无中文乱码问题
3.服务器on yarn提交方式,就出现中文乱码问题


flink 消费kafka的api用的是这个
new FlinkKafkaConsumer<>(topicList, new SimpleStringSchema(), props);


根据1,2,3分析问题可能和yarn有关系。请教一下大佬们,还需要怎么调查,才能解决这个问题。
Reply | Threaded
Open this post in threaded view
|

Re:flink1.9 on yarn 消费kafka数据中文乱码

马阳阳



我们也遇到过这个问题,我们当时遇到的问题是YARN NM上的默认charset是ascii。
通过在flink-conf.yaml文件里添加如下配置解决了该问题:
env.java.opts.taskmanager: "-Dfile.encoding=UTF-8"














在 2020-06-08 21:48:33,"guanyq" <[hidden email]> 写道:

>kafka 0.11版本
>首先kafka source topic数据是正常的,kafka客户端消费出来无中文乱码问题
>1.本地idea debug运行,无中文乱码问题
>2.服务器Standalone模式运行,无中文乱码问题
>3.服务器on yarn提交方式,就出现中文乱码问题
>
>
>flink 消费kafka的api用的是这个
>new FlinkKafkaConsumer<>(topicList, new SimpleStringSchema(), props);
>
>
>根据1,2,3分析问题可能和yarn有关系。请教一下大佬们,还需要怎么调查,才能解决这个问题。
Reply | Threaded
Open this post in threaded view
|

Re:Re:flink1.9 on yarn 消费kafka数据中文乱码

guanyq
非常感谢,问题解决了!

















在 2020-06-09 08:27:47,"马阳阳" <[hidden email]> 写道:

>
>
>
>我们也遇到过这个问题,我们当时遇到的问题是YARN NM上的默认charset是ascii。
>通过在flink-conf.yaml文件里添加如下配置解决了该问题:
>env.java.opts.taskmanager: "-Dfile.encoding=UTF-8"
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-06-08 21:48:33,"guanyq" <[hidden email]> 写道:
>>kafka 0.11版本
>>首先kafka source topic数据是正常的,kafka客户端消费出来无中文乱码问题
>>1.本地idea debug运行,无中文乱码问题
>>2.服务器Standalone模式运行,无中文乱码问题
>>3.服务器on yarn提交方式,就出现中文乱码问题
>>
>>
>>flink 消费kafka的api用的是这个
>>new FlinkKafkaConsumer<>(topicList, new SimpleStringSchema(), props);
>>
>>
>>根据1,2,3分析问题可能和yarn有关系。请教一下大佬们,还需要怎么调查,才能解决这个问题。
12