Hi,all:
flink算子在多个并行度的job中,每个算子(假如1个算子都有一个对应的java类)在多个subtask中会共享1个java类实例吗?还是每个subtask都会各自的实例? 我做了一个简单测试,写了一个flatmap类,在构造方法和open方法中打印类的toString方法,发现输出都不同,是否可以证明每个subtask都初始化了自己的类实例? 希望有朋友能解释下算子在job运行中初始化的过程。 测试相关代码如下: // flink 1.10.2版本,并行度为3 @Slf4j public class PersonFlatMap extends RichFlatMapFunction<Tuple2<String, String>, Person> { private transient ValueState<Integer> state; public PersonFlatMap(){ log.info(String.format("PersonFlatMap【%s】: 创建实例",this.toString())); } @Override public void open(Configuration parameters) throws IOException { //略去无关代码... log.info(String.format("PersonFlatMap【%s】:初始化状态!", this.toString())); } @Override public void flatMap(Tuple2<String, String> t, Collector<Person> collector) throws Exception { Person p = JSONUtil.toObject(t.f1,Person.class); collector.collect(p); if(state.value() == null){state.update(0);} state.update(state.value() + 1); log.info("state: "+state.value()); } } //测试日志输出 ... flink-10.2 - [2020-11-16 13:41:54.360] - INFO [main] com.toonyoo.operator.PersonFlatMap - PersonFlatMap【com.toonyoo.operator.PersonFlatMap@ba8d91c】: 创建实例 //此处略去无关日志... flink-10.2 - [2020-11-16 13:42:00.326] - INFO [Flat Map -> Sink: Print to Std. Out (1/3)] org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing heap keyed state backend with stream factory. flink-10.2 - [2020-11-16 13:42:00.351] - INFO [Flat Map -> Sink: Print to Std. Out (1/3)] com.toonyoo.operator.PersonFlatMap - PersonFlatMap【com.toonyoo.operator.PersonFlatMap@5c9d895d】:初始化状态! flink-10.2 - [2020-11-16 13:42:00.354] - INFO [Flat Map -> Sink: Print to Std. Out (3/3)] com.toonyoo.operator.PersonFlatMap - PersonFlatMap【com.toonyoo.operator.PersonFlatMap@5c489c40】:初始化状态! flink-10.2 - [2020-11-16 13:42:00.356] - INFO [Flat Map -> Sink: Print to Std. Out (2/3)] com.toonyoo.operator.PersonFlatMap - PersonFlatMap【com.toonyoo.operator.PersonFlatMap@1cd4f5c9】:初始化状态! ... [hidden email] |
可以这么认为,大体上你可以认为每个并发有自己的环境。
技术上,算子对象是每个并发会实例化一个,而 static 变量的【共享】程度跟你设置的 slot per TM 值还有其他一些调度相关的参数有关,但是最好不要依赖这种实现层面的东西。 一种常见的误解是我创建一个 static HashMap 就神奇地拥有了全局的键值存储,这当然是不对的,只有在同一个 JVM 实例上也就是同一个 TM 上的任务才会看到同一个 HashMap 对象,而这几乎是不可控的。 可以看一下这篇文档[1]对物理部署的实际情况有一个基本的认知。 Best, tison. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/flink-architecture.html [hidden email] <[hidden email]> 于2020年11月16日周一 下午1:55写道: > Hi,all: > > flink算子在多个并行度的job中,每个算子(假如1个算子都有一个对应的java类)在多个subtask中会共享1个java类实例吗?还是每个subtask都会各自的实例? > > 我做了一个简单测试,写了一个flatmap类,在构造方法和open方法中打印类的toString方法,发现输出都不同,是否可以证明每个subtask都初始化了自己的类实例? > 希望有朋友能解释下算子在job运行中初始化的过程。 > > 测试相关代码如下: > // flink 1.10.2版本,并行度为3 > @Slf4j > public class PersonFlatMap extends RichFlatMapFunction<Tuple2<String, > String>, Person> { > private transient ValueState<Integer> state; > > public PersonFlatMap(){ > log.info(String.format("PersonFlatMap【%s】: > 创建实例",this.toString())); > } > > @Override > public void open(Configuration parameters) throws IOException { > //略去无关代码... > log.info(String.format("PersonFlatMap【%s】:初始化状态!", > this.toString())); > } > > @Override > public void flatMap(Tuple2<String, String> t, Collector<Person> > collector) throws Exception { > Person p = JSONUtil.toObject(t.f1,Person.class); > collector.collect(p); > if(state.value() == null){state.update(0);} > state.update(state.value() + 1); > log.info("state: "+state.value()); > } > } > > //测试日志输出 > ... > flink-10.2 - [2020-11-16 13:41:54.360] - INFO [main] > com.toonyoo.operator.PersonFlatMap - > PersonFlatMap【com.toonyoo.operator.PersonFlatMap@ba8d91c】: 创建实例 > //此处略去无关日志... > flink-10.2 - [2020-11-16 13:42:00.326] - INFO [Flat Map -> Sink: Print to > Std. Out (1/3)] org.apache.flink.runtime.state.heap.HeapKeyedStateBackend > - Initializing heap keyed state backend with stream factory. > flink-10.2 - [2020-11-16 13:42:00.351] - INFO [Flat Map -> Sink: Print to > Std. Out (1/3)] com.toonyoo.operator.PersonFlatMap - > PersonFlatMap【com.toonyoo.operator.PersonFlatMap@5c9d895d】:初始化状态! > flink-10.2 - [2020-11-16 13:42:00.354] - INFO [Flat Map -> Sink: Print to > Std. Out (3/3)] com.toonyoo.operator.PersonFlatMap - > PersonFlatMap【com.toonyoo.operator.PersonFlatMap@5c489c40】:初始化状态! > flink-10.2 - [2020-11-16 13:42:00.356] - INFO [Flat Map -> Sink: Print to > Std. Out (2/3)] com.toonyoo.operator.PersonFlatMap - > PersonFlatMap【com.toonyoo.operator.PersonFlatMap@1cd4f5c9】:初始化状态! > ... > > > > > [hidden email] > |
Free forum by Nabble | Edit this page |