您好!
重载procesElement方法,每条stream数据处理时,put数据进入map,后面每条数据处理时先判断是否在map中有相同的key值,无则新增后put进map,有则加工后put进map。 在实际代码中发现,每次写入put后,map有数据,但处理下一条数据时,先从map读取数据,发现map已经为空。 请问是哪里写错了吗? 和 flink官网中 state.update(current);有什么不同吗? 以下为代码: private MapState<String, MicsTrainPractical> map; //定义map @Override public void processElement(MicsTrainPracticalDetail value, Context ctx, Collector<MicsTrainPractical> out) throws Exception { MicsTrainPractical current = map.get(value.getTrainNumber()); System.out.println(map.isEmpty()); // 每次数据进来都发现map已经为空,不能保存前面数据进来时put的数据 Long departTime = DateTimeUtils.convertToTimestampSecond(value.getDepartTime()); Long arrivalTime = DateTimeUtils.convertToTimestampSecond(value.getArrivalTime()); if (current == null) { //如果map中没有获取到string相同的数据,则新建一条数据put进去 MicsTrainPractical actual = new MicsTrainPractical(); actual.setTrainId(value.getTrainNumber()); actual.setCarId(value.getCarId()); actual.setStartStationId(value.getStationId()); actual.setStartPracticalArrivalTime(arrivalTime); actual.setEndPracticalArrivalTime(arrivalTime); actual.setStartPracticalDepartTime(departTime); actual.setEndPracticalDepartTime(departTime); actual.setEndStationId(value.getStationId()); actual.setStopTime(0L); actual.setDs(value.getDs()); actual.setIsInsert(true); actual.setTargetStationId(value.getTargetStationId()); out.collect(actual); map.put(value.getTrainNumber(), actual); //向map中写入数据 return; } else { //如果map有获取到string相同的数据,则转换数据后写入map MicsTrainPractical actual = new MicsTrainPractical(); actual.setTrainId(value.getTrainNumber()); actual.setCarId(value.getCarId()); actual.setStartStationId(current.getStartStationId()); actual.setEndStationId(value.getStationId()); actual.setStartPracticalArrivalTime(current.getStartPracticalArrivalTime()); actual.setStartPracticalDepartTime(current.getStartPracticalDepartTime()); actual.setEndPracticalArrivalTime(arrivalTime); actual.setEndPracticalDepartTime(departTime); actual.setStopTime(current.getStopTime() + Math.abs(departTime - 
arrivalTime)); actual.setDs(value.getDs()); actual.setIsInsert(false); actual.setTargetStationId(value.getTargetStationId()); current.setEndStationId(actual.getEndStationId()); current.setEndPracticalDepartTime(actual.getEndPracticalDepartTime()); current.setEndPracticalArrivalTime(actual.getEndPracticalArrivalTime()); current.setStopTime(actual.getStopTime()); current.setIsInsert(actual.getIsInsert()); MicsTrainSectionInfo trainSectionInfo = new MicsTrainSectionInfo(); trainSectionInfo.setTrainId(actual.getTrainId()); trainSectionInfo.setCarId(actual.getCarId()); trainSectionInfo.setStartStationId(current.getEndStationId()); trainSectionInfo.setEndStationId(actual.getEndStationId()); trainSectionInfo.setDepartTime(current.getEndPracticalDepartTime()); trainSectionInfo.setArrivalTime(actual.getEndPracticalArrivalTime()); map.put(value.getTrainNumber(), actual); out.collect(actual); ctx.output(outputTagSectionFlow, trainSectionInfo); } } 谌祖安 智能轨道交通业务群 / 产品事业部 / 开发经理 Intelligent RailTransportation BG /Development Manager 广东省广州市天河区新岑四路2号佳都智慧大厦 PCI Intelligence Building, No. 2 Xincen Fourth Road, Tianhe District, Guangzhou, Guangdong E [hidden email] M 86-18680458868 www.pcitech.com |
是在 keyedStream 上使用的吗?有可能是每条数据的 key 都不相同导致的。
谌祖安 <[hidden email]> 于2021年2月7日周日 下午6:00写道: > 您好! > > > 重载procesElement方法,每条stream数据处理时,put数据进入map,后面每条数据处理时先判断是否在map中有相同的key值,无则新增后put进map,有则加工后put进map。 > 在实际代码中发现,每次写入put后,map有数据,但处理下一条数据时,先从map读取数据,发现map已经为空。 > 请问是哪里写错了吗? 和 flink官网中 state.update(current);有什么不同吗? > > 以下为代码: > private MapState<String, MicsTrainPractical> map; //定义map > @Override > public void processElement(MicsTrainPracticalDetail value, Context ctx, > Collector<MicsTrainPractical> out) throws Exception { > > MicsTrainPractical current = map.get(value.getTrainNumber()); > > System.out.println(map.isEmpty()); // > 每次数据进来都发现map已经为空,不能保存前面数据进来时put的数据 > Long departTime = > DateTimeUtils.convertToTimestampSecond(value.getDepartTime()); > Long arrivalTime = > DateTimeUtils.convertToTimestampSecond(value.getArrivalTime()); > if (current == null) { //如果map中没有获取到string相同的数据,则新建一条数据put进去 > MicsTrainPractical actual = new MicsTrainPractical(); > actual.setTrainId(value.getTrainNumber()); > actual.setCarId(value.getCarId()); > actual.setStartStationId(value.getStationId()); > actual.setStartPracticalArrivalTime(arrivalTime); > actual.setEndPracticalArrivalTime(arrivalTime); > actual.setStartPracticalDepartTime(departTime); > actual.setEndPracticalDepartTime(departTime); > actual.setEndStationId(value.getStationId()); > actual.setStopTime(0L); > actual.setDs(value.getDs()); > actual.setIsInsert(true); > actual.setTargetStationId(value.getTargetStationId()); > out.collect(actual); > map.put(value.getTrainNumber(), actual); //向map中写入数据 > return; > } else { //如果map有获取到string相同的数据,则转换数据后写入map > > MicsTrainPractical actual = new MicsTrainPractical(); > actual.setTrainId(value.getTrainNumber()); > actual.setCarId(value.getCarId()); > actual.setStartStationId(current.getStartStationId()); > actual.setEndStationId(value.getStationId()); > > actual.setStartPracticalArrivalTime(current.getStartPracticalArrivalTime()); > > actual.setStartPracticalDepartTime(current.getStartPracticalDepartTime()); > 
actual.setEndPracticalArrivalTime(arrivalTime); > actual.setEndPracticalDepartTime(departTime); > actual.setStopTime(current.getStopTime() + Math.abs(departTime > - arrivalTime)); > actual.setDs(value.getDs()); > actual.setIsInsert(false); > actual.setTargetStationId(value.getTargetStationId()); > current.setEndStationId(actual.getEndStationId()); > > current.setEndPracticalDepartTime(actual.getEndPracticalDepartTime()); > > current.setEndPracticalArrivalTime(actual.getEndPracticalArrivalTime()); > current.setStopTime(actual.getStopTime()); > current.setIsInsert(actual.getIsInsert()); > > MicsTrainSectionInfo trainSectionInfo = new > MicsTrainSectionInfo(); > trainSectionInfo.setTrainId(actual.getTrainId()); > trainSectionInfo.setCarId(actual.getCarId()); > trainSectionInfo.setStartStationId(current.getEndStationId()); > trainSectionInfo.setEndStationId(actual.getEndStationId()); > > trainSectionInfo.setDepartTime(current.getEndPracticalDepartTime()); > > trainSectionInfo.setArrivalTime(actual.getEndPracticalArrivalTime()); > > map.put(value.getTrainNumber(), actual); > out.collect(actual); > ctx.output(outputTagSectionFlow, trainSectionInfo); > } > } > > > > > 谌祖安 > 智能轨道交通业务群 / 产品事业部 / 开发经理 > Intelligent RailTransportation BG /Development Manager > 广东省广州市天河区新岑四路2号佳都智慧大厦 > PCI Intelligence Building, No. 2 Xincen Fourth Road, Tianhe District, > Guangzhou, Guangdong > E [hidden email] > M 86-18680458868 > www.pcitech.com > |
Hi 祖安,
state抽象的数据结构,无论是value state,list state还是map state,其都是对应流计算处理中的当前key对应的数据结构。以map state具体来说对于每个正在处理的current key (由key selector选择出来 [1]),都有一个对应 的map存储相关的数据,如果你每次都发现对应的map为空,很有可能是因为你的key selector选择出来的key每次都不相同,很大概率是当前处理的record不同导致。 另外,map.isEmpty() 的调用是需要额外开销的(尤其对于RocksDB state backend),如果只是需要处理,仅需要根据 map.get(value.getTrainNumber()) 是否为null即可。 [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#keyed-datastream 祝好 唐云 ________________________________ From: 赵一旦 <[hidden email]> Sent: Monday, February 8, 2021 10:00 To: [hidden email] <[hidden email]> Subject: Re: procesElement中每天数据都put 进入map,但下一条数据来时map都是空的咨询 keyedStream? key不同可能是。 谌祖安 <[hidden email]> 于2021年2月7日周日 下午6:00写道: > 您好! > > > 重载procesElement方法,每条stream数据处理时,put数据进入map,后面每条数据处理时先判断是否在map中有相同的key值,无则新增后put进map,有则加工后put进map。 > 在实际代码中发现,每次写入put后,map有数据,但处理下一条数据时,先从map读取数据,发现map已经为空。 > 请问是哪里写错了吗? 和 flink官网中 state.update(current);有什么不同吗? > > 以下为代码: > private MapState<String, MicsTrainPractical> map; //定义map > @Override > public void processElement(MicsTrainPracticalDetail value, Context ctx, > Collector<MicsTrainPractical> out) throws Exception { > > MicsTrainPractical current = map.get(value.getTrainNumber()); > > System.out.println(map.isEmpty()); // > 每次数据进来都发现map已经为空,不能保存前面数据进来时put的数据 > Long departTime = > DateTimeUtils.convertToTimestampSecond(value.getDepartTime()); > Long arrivalTime = > DateTimeUtils.convertToTimestampSecond(value.getArrivalTime()); > if (current == null) { //如果map中没有获取到string相同的数据,则新建一条数据put进去 > MicsTrainPractical actual = new MicsTrainPractical(); > actual.setTrainId(value.getTrainNumber()); > actual.setCarId(value.getCarId()); > actual.setStartStationId(value.getStationId()); > actual.setStartPracticalArrivalTime(arrivalTime); > actual.setEndPracticalArrivalTime(arrivalTime); > actual.setStartPracticalDepartTime(departTime); > actual.setEndPracticalDepartTime(departTime); > actual.setEndStationId(value.getStationId()); > actual.setStopTime(0L); > 
actual.setDs(value.getDs()); > actual.setIsInsert(true); > actual.setTargetStationId(value.getTargetStationId()); > out.collect(actual); > map.put(value.getTrainNumber(), actual); //向map中写入数据 > return; > } else { //如果map有获取到string相同的数据,则转换数据后写入map > > MicsTrainPractical actual = new MicsTrainPractical(); > actual.setTrainId(value.getTrainNumber()); > actual.setCarId(value.getCarId()); > actual.setStartStationId(current.getStartStationId()); > actual.setEndStationId(value.getStationId()); > > actual.setStartPracticalArrivalTime(current.getStartPracticalArrivalTime()); > > actual.setStartPracticalDepartTime(current.getStartPracticalDepartTime()); > actual.setEndPracticalArrivalTime(arrivalTime); > actual.setEndPracticalDepartTime(departTime); > actual.setStopTime(current.getStopTime() + Math.abs(departTime > - arrivalTime)); > actual.setDs(value.getDs()); > actual.setIsInsert(false); > actual.setTargetStationId(value.getTargetStationId()); > current.setEndStationId(actual.getEndStationId()); > > current.setEndPracticalDepartTime(actual.getEndPracticalDepartTime()); > > current.setEndPracticalArrivalTime(actual.getEndPracticalArrivalTime()); > current.setStopTime(actual.getStopTime()); > current.setIsInsert(actual.getIsInsert()); > > MicsTrainSectionInfo trainSectionInfo = new > MicsTrainSectionInfo(); > trainSectionInfo.setTrainId(actual.getTrainId()); > trainSectionInfo.setCarId(actual.getCarId()); > trainSectionInfo.setStartStationId(current.getEndStationId()); > trainSectionInfo.setEndStationId(actual.getEndStationId()); > > trainSectionInfo.setDepartTime(current.getEndPracticalDepartTime()); > > trainSectionInfo.setArrivalTime(actual.getEndPracticalArrivalTime()); > > map.put(value.getTrainNumber(), actual); > out.collect(actual); > ctx.output(outputTagSectionFlow, trainSectionInfo); > } > } > > > > > 谌祖安 > 智能轨道交通业务群 / 产品事业部 / 开发经理 > Intelligent RailTransportation BG /Development Manager > 广东省广州市天河区新岑四路2号佳都智慧大厦 > PCI Intelligence Building, No. 
2 Xincen Fourth Road, Tianhe District, > Guangzhou, Guangdong > E [hidden email] > M 86-18680458868 > www.pcitech.com<http://www.pcitech.com> > |
In reply to this post by 谌祖安
可能是你没有在 open 方法里通过运行时上下文(RuntimeContext)获取 MapState
e.g.: map = getRuntimeContext().getMapState(new MapStateDescriptor<xx,xx>(mapname, xx.class, xx.class))吗 -----邮件原件----- 发件人: 谌祖安 [mailto:[hidden email]] 发送时间: 2021年2月7日 星期日 16:25 收件人: user-zh <[hidden email]> 主题: procesElement中每天数据都put 进入map,但下一条数据来时map都是空的咨询 您好! 重载procesElement方法,每条stream数据处理时,put数据进入map,后面每条数据处理时先判断是否在map中有相同的key值,无则新增后put进map,有则加工后put进map。 在实际代码中发现,每次写入put后,map有数据,但处理下一条数据时,先从map读取数据,发现map已经为空。 请问是哪里写错了吗? 和 flink官网中 state.update(current);有什么不同吗? 以下为代码: private MapState<String, MicsTrainPractical> map; //定义map @Override public void processElement(MicsTrainPracticalDetail value, Context ctx, Collector<MicsTrainPractical> out) throws Exception { MicsTrainPractical current = map.get(value.getTrainNumber()); System.out.println(map.isEmpty()); // 每次数据进来都发现map已经为空,不能保存前面数据进来时put的数据 Long departTime = DateTimeUtils.convertToTimestampSecond(value.getDepartTime()); Long arrivalTime = DateTimeUtils.convertToTimestampSecond(value.getArrivalTime()); if (current == null) { //如果map中没有获取到string相同的数据,则新建一条数据put进去 MicsTrainPractical actual = new MicsTrainPractical(); actual.setTrainId(value.getTrainNumber()); actual.setCarId(value.getCarId()); actual.setStartStationId(value.getStationId()); actual.setStartPracticalArrivalTime(arrivalTime); actual.setEndPracticalArrivalTime(arrivalTime); actual.setStartPracticalDepartTime(departTime); actual.setEndPracticalDepartTime(departTime); actual.setEndStationId(value.getStationId()); actual.setStopTime(0L); actual.setDs(value.getDs()); actual.setIsInsert(true); actual.setTargetStationId(value.getTargetStationId()); out.collect(actual); map.put(value.getTrainNumber(), actual); //向map中写入数据 return; } else { //如果map有获取到string相同的数据,则转换数据后写入map MicsTrainPractical actual = new MicsTrainPractical(); actual.setTrainId(value.getTrainNumber()); actual.setCarId(value.getCarId()); actual.setStartStationId(current.getStartStationId()); actual.setEndStationId(value.getStationId()); 
actual.setStartPracticalArrivalTime(current.getStartPracticalArrivalTime()); actual.setStartPracticalDepartTime(current.getStartPracticalDepartTime()); actual.setEndPracticalArrivalTime(arrivalTime); actual.setEndPracticalDepartTime(departTime); actual.setStopTime(current.getStopTime() + Math.abs(departTime - arrivalTime)); actual.setDs(value.getDs()); actual.setIsInsert(false); actual.setTargetStationId(value.getTargetStationId()); current.setEndStationId(actual.getEndStationId()); current.setEndPracticalDepartTime(actual.getEndPracticalDepartTime()); current.setEndPracticalArrivalTime(actual.getEndPracticalArrivalTime()); current.setStopTime(actual.getStopTime()); current.setIsInsert(actual.getIsInsert()); MicsTrainSectionInfo trainSectionInfo = new MicsTrainSectionInfo(); trainSectionInfo.setTrainId(actual.getTrainId()); trainSectionInfo.setCarId(actual.getCarId()); trainSectionInfo.setStartStationId(current.getEndStationId()); trainSectionInfo.setEndStationId(actual.getEndStationId()); trainSectionInfo.setDepartTime(current.getEndPracticalDepartTime()); trainSectionInfo.setArrivalTime(actual.getEndPracticalArrivalTime()); map.put(value.getTrainNumber(), actual); out.collect(actual); ctx.output(outputTagSectionFlow, trainSectionInfo); } } 谌祖安 智能轨道交通业务群 / 产品事业部 / 开发经理 Intelligent RailTransportation BG /Development Manager 广东省广州市天河区新岑四路2号佳都智慧大厦 PCI Intelligence Building, No. 2 Xincen Fourth Road, Tianhe District, Guangzhou, Guangdong E [hidden email] M 86-18680458868 www.pcitech.com |
Free forum by Nabble | Edit this page |