procesElement中每天数据都put 进入map,但下一条数据来时map都是空的咨询

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

procesElement中每天数据都put 进入map,但下一条数据来时map都是空的咨询

谌祖安
您好!

重载procesElement方法,每条stream数据处理时,put数据进入map,后面每条数据处理时先判断是否在map中有相同的key值,无则新增后put进map,有则加工后put进map。
 在实际代码中发现,每次写入put后,map有数据,但处理下一条数据时,先从map读取数据,发现map已经为空。
请问是哪里写错了吗?   和   flink官网中     state.update(current);有什么不同吗?

以下为代码:
 private MapState<String, MicsTrainPractical> map;  //定义map
  @Override
public void processElement(MicsTrainPracticalDetail value, Context ctx, Collector<MicsTrainPractical> out) throws Exception {
       
        MicsTrainPractical current = map.get(value.getTrainNumber());        
         System.out.println(map.isEmpty());  // 每次数据进来都发现map已经为空,不能保存前面数据进来时put的数据
        Long departTime = DateTimeUtils.convertToTimestampSecond(value.getDepartTime());
        Long arrivalTime = DateTimeUtils.convertToTimestampSecond(value.getArrivalTime());
        if (current == null) {   //如果map中没有获取到string相同的数据,则新建一条数据put进去
            MicsTrainPractical actual = new MicsTrainPractical();
            actual.setTrainId(value.getTrainNumber());
            actual.setCarId(value.getCarId());
            actual.setStartStationId(value.getStationId());
            actual.setStartPracticalArrivalTime(arrivalTime);
            actual.setEndPracticalArrivalTime(arrivalTime);
            actual.setStartPracticalDepartTime(departTime);
            actual.setEndPracticalDepartTime(departTime);
            actual.setEndStationId(value.getStationId());
            actual.setStopTime(0L);
            actual.setDs(value.getDs());
            actual.setIsInsert(true);
            actual.setTargetStationId(value.getTargetStationId());
            out.collect(actual);
            map.put(value.getTrainNumber(), actual);    //向map中写入数据
            return;
        } else {                   //如果map有获取到string相同的数据,则转换数据后写入map
           
            MicsTrainPractical actual = new MicsTrainPractical();
            actual.setTrainId(value.getTrainNumber());
            actual.setCarId(value.getCarId());
            actual.setStartStationId(current.getStartStationId());
            actual.setEndStationId(value.getStationId());
            actual.setStartPracticalArrivalTime(current.getStartPracticalArrivalTime());
            actual.setStartPracticalDepartTime(current.getStartPracticalDepartTime());
            actual.setEndPracticalArrivalTime(arrivalTime);
            actual.setEndPracticalDepartTime(departTime);
            actual.setStopTime(current.getStopTime() + Math.abs(departTime - arrivalTime));
            actual.setDs(value.getDs());
            actual.setIsInsert(false);
            actual.setTargetStationId(value.getTargetStationId());
            current.setEndStationId(actual.getEndStationId());
            current.setEndPracticalDepartTime(actual.getEndPracticalDepartTime());
            current.setEndPracticalArrivalTime(actual.getEndPracticalArrivalTime());
            current.setStopTime(actual.getStopTime());
            current.setIsInsert(actual.getIsInsert());

            MicsTrainSectionInfo trainSectionInfo = new MicsTrainSectionInfo();
            trainSectionInfo.setTrainId(actual.getTrainId());
            trainSectionInfo.setCarId(actual.getCarId());
            trainSectionInfo.setStartStationId(current.getEndStationId());
            trainSectionInfo.setEndStationId(actual.getEndStationId());
            trainSectionInfo.setDepartTime(current.getEndPracticalDepartTime());
            trainSectionInfo.setArrivalTime(actual.getEndPracticalArrivalTime());

            map.put(value.getTrainNumber(), actual);
            out.collect(actual);
            ctx.output(outputTagSectionFlow, trainSectionInfo);
        }
    }




谌祖安
智能轨道交通业务群 / 产品事业部 / 开发经理
Intelligent RailTransportation BG /Development Manager
广东省广州市天河区新岑四路2号佳都智慧大厦
PCI Intelligence Building, No. 2 Xincen Fourth Road, Tianhe District, Guangzhou, Guangdong
E [hidden email]
M 86-18680458868
www.pcitech.com
Reply | Threaded
Open this post in threaded view
|

Re: procesElement中每天数据都put 进入map,但下一条数据来时map都是空的咨询

nobleyd
keyedStream? key不同可能是。

谌祖安 <[hidden email]> 于2021年2月7日周日 下午6:00写道:

> 您好!
>
>
> 重载procesElement方法,每条stream数据处理时,put数据进入map,后面每条数据处理时先判断是否在map中有相同的key值,无则新增后put进map,有则加工后put进map。
>  在实际代码中发现,每次写入put后,map有数据,但处理下一条数据时,先从map读取数据,发现map已经为空。
> 请问是哪里写错了吗?   和   flink官网中     state.update(current);有什么不同吗?
>
> 以下为代码:
>  private MapState<String, MicsTrainPractical> map;  //定义map
>   @Override
> public void processElement(MicsTrainPracticalDetail value, Context ctx,
> Collector<MicsTrainPractical> out) throws Exception {
>
>         MicsTrainPractical current = map.get(value.getTrainNumber());
>
>          System.out.println(map.isEmpty());  //
> 每次数据进来都发现map已经为空,不能保存前面数据进来时put的数据
>         Long departTime =
> DateTimeUtils.convertToTimestampSecond(value.getDepartTime());
>         Long arrivalTime =
> DateTimeUtils.convertToTimestampSecond(value.getArrivalTime());
>         if (current == null) {   //如果map中没有获取到string相同的数据,则新建一条数据put进去
>             MicsTrainPractical actual = new MicsTrainPractical();
>             actual.setTrainId(value.getTrainNumber());
>             actual.setCarId(value.getCarId());
>             actual.setStartStationId(value.getStationId());
>             actual.setStartPracticalArrivalTime(arrivalTime);
>             actual.setEndPracticalArrivalTime(arrivalTime);
>             actual.setStartPracticalDepartTime(departTime);
>             actual.setEndPracticalDepartTime(departTime);
>             actual.setEndStationId(value.getStationId());
>             actual.setStopTime(0L);
>             actual.setDs(value.getDs());
>             actual.setIsInsert(true);
>             actual.setTargetStationId(value.getTargetStationId());
>             out.collect(actual);
>             map.put(value.getTrainNumber(), actual);    //向map中写入数据
>             return;
>         } else {                   //如果map有获取到string相同的数据,则转换数据后写入map
>
>             MicsTrainPractical actual = new MicsTrainPractical();
>             actual.setTrainId(value.getTrainNumber());
>             actual.setCarId(value.getCarId());
>             actual.setStartStationId(current.getStartStationId());
>             actual.setEndStationId(value.getStationId());
>
> actual.setStartPracticalArrivalTime(current.getStartPracticalArrivalTime());
>
> actual.setStartPracticalDepartTime(current.getStartPracticalDepartTime());
>             actual.setEndPracticalArrivalTime(arrivalTime);
>             actual.setEndPracticalDepartTime(departTime);
>             actual.setStopTime(current.getStopTime() + Math.abs(departTime
> - arrivalTime));
>             actual.setDs(value.getDs());
>             actual.setIsInsert(false);
>             actual.setTargetStationId(value.getTargetStationId());
>             current.setEndStationId(actual.getEndStationId());
>
> current.setEndPracticalDepartTime(actual.getEndPracticalDepartTime());
>
> current.setEndPracticalArrivalTime(actual.getEndPracticalArrivalTime());
>             current.setStopTime(actual.getStopTime());
>             current.setIsInsert(actual.getIsInsert());
>
>             MicsTrainSectionInfo trainSectionInfo = new
> MicsTrainSectionInfo();
>             trainSectionInfo.setTrainId(actual.getTrainId());
>             trainSectionInfo.setCarId(actual.getCarId());
>             trainSectionInfo.setStartStationId(current.getEndStationId());
>             trainSectionInfo.setEndStationId(actual.getEndStationId());
>
> trainSectionInfo.setDepartTime(current.getEndPracticalDepartTime());
>
> trainSectionInfo.setArrivalTime(actual.getEndPracticalArrivalTime());
>
>             map.put(value.getTrainNumber(), actual);
>             out.collect(actual);
>             ctx.output(outputTagSectionFlow, trainSectionInfo);
>         }
>     }
>
>
>
>
> 谌祖安
> 智能轨道交通业务群 / 产品事业部 / 开发经理
> Intelligent RailTransportation BG /Development Manager
> 广东省广州市天河区新岑四路2号佳都智慧大厦
> PCI Intelligence Building, No. 2 Xincen Fourth Road, Tianhe District,
> Guangzhou, Guangdong
> E [hidden email]
> M 86-18680458868
> www.pcitech.com
>
Reply | Threaded
Open this post in threaded view
|

Re: procesElement中每天数据都put 进入map,但下一条数据来时map都是空的咨询

Yun Tang
Hi 祖安,

state抽象的数据结构,无论是value state,list state还是map state,其都是对应流计算处理中的当前key对应的数据结构。以map state具体来说对于每个正在处理的current key (由key selector选择出来 [1]),都有一个对应 的map存储相关的数据,如果你每次都发现对应的map为空,很有可能是因为你的key selector选择出来的key每次都不相同,很大概率是当前处理的record不同导致。

另外,map.isEmpty() 的调用是需要额外开销的(尤其对于RocksDB state backend),如果只是需要处理,仅需要根据 map.get(value.getTrainNumber()) 是否为null即可。


[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#keyed-datastream

祝好
唐云
________________________________
From: 赵一旦 <[hidden email]>
Sent: Monday, February 8, 2021 10:00
To: [hidden email] <[hidden email]>
Subject: Re: procesElement中每天数据都put 进入map,但下一条数据来时map都是空的咨询

keyedStream? key不同可能是。

谌祖安 <[hidden email]> 于2021年2月7日周日 下午6:00写道:

> 您好!
>
>
> 重载procesElement方法,每条stream数据处理时,put数据进入map,后面每条数据处理时先判断是否在map中有相同的key值,无则新增后put进map,有则加工后put进map。
>  在实际代码中发现,每次写入put后,map有数据,但处理下一条数据时,先从map读取数据,发现map已经为空。
> 请问是哪里写错了吗?   和   flink官网中     state.update(current);有什么不同吗?
>
> 以下为代码:
>  private MapState<String, MicsTrainPractical> map;  //定义map
>   @Override
> public void processElement(MicsTrainPracticalDetail value, Context ctx,
> Collector<MicsTrainPractical> out) throws Exception {
>
>         MicsTrainPractical current = map.get(value.getTrainNumber());
>
>          System.out.println(map.isEmpty());  //
> 每次数据进来都发现map已经为空,不能保存前面数据进来时put的数据
>         Long departTime =
> DateTimeUtils.convertToTimestampSecond(value.getDepartTime());
>         Long arrivalTime =
> DateTimeUtils.convertToTimestampSecond(value.getArrivalTime());
>         if (current == null) {   //如果map中没有获取到string相同的数据,则新建一条数据put进去
>             MicsTrainPractical actual = new MicsTrainPractical();
>             actual.setTrainId(value.getTrainNumber());
>             actual.setCarId(value.getCarId());
>             actual.setStartStationId(value.getStationId());
>             actual.setStartPracticalArrivalTime(arrivalTime);
>             actual.setEndPracticalArrivalTime(arrivalTime);
>             actual.setStartPracticalDepartTime(departTime);
>             actual.setEndPracticalDepartTime(departTime);
>             actual.setEndStationId(value.getStationId());
>             actual.setStopTime(0L);
>             actual.setDs(value.getDs());
>             actual.setIsInsert(true);
>             actual.setTargetStationId(value.getTargetStationId());
>             out.collect(actual);
>             map.put(value.getTrainNumber(), actual);    //向map中写入数据
>             return;
>         } else {                   //如果map有获取到string相同的数据,则转换数据后写入map
>
>             MicsTrainPractical actual = new MicsTrainPractical();
>             actual.setTrainId(value.getTrainNumber());
>             actual.setCarId(value.getCarId());
>             actual.setStartStationId(current.getStartStationId());
>             actual.setEndStationId(value.getStationId());
>
> actual.setStartPracticalArrivalTime(current.getStartPracticalArrivalTime());
>
> actual.setStartPracticalDepartTime(current.getStartPracticalDepartTime());
>             actual.setEndPracticalArrivalTime(arrivalTime);
>             actual.setEndPracticalDepartTime(departTime);
>             actual.setStopTime(current.getStopTime() + Math.abs(departTime
> - arrivalTime));
>             actual.setDs(value.getDs());
>             actual.setIsInsert(false);
>             actual.setTargetStationId(value.getTargetStationId());
>             current.setEndStationId(actual.getEndStationId());
>
> current.setEndPracticalDepartTime(actual.getEndPracticalDepartTime());
>
> current.setEndPracticalArrivalTime(actual.getEndPracticalArrivalTime());
>             current.setStopTime(actual.getStopTime());
>             current.setIsInsert(actual.getIsInsert());
>
>             MicsTrainSectionInfo trainSectionInfo = new
> MicsTrainSectionInfo();
>             trainSectionInfo.setTrainId(actual.getTrainId());
>             trainSectionInfo.setCarId(actual.getCarId());
>             trainSectionInfo.setStartStationId(current.getEndStationId());
>             trainSectionInfo.setEndStationId(actual.getEndStationId());
>
> trainSectionInfo.setDepartTime(current.getEndPracticalDepartTime());
>
> trainSectionInfo.setArrivalTime(actual.getEndPracticalArrivalTime());
>
>             map.put(value.getTrainNumber(), actual);
>             out.collect(actual);
>             ctx.output(outputTagSectionFlow, trainSectionInfo);
>         }
>     }
>
>
>
>
> 谌祖安
> 智能轨道交通业务群 / 产品事业部 / 开发经理
> Intelligent RailTransportation BG /Development Manager
> 广东省广州市天河区新岑四路2号佳都智慧大厦
> PCI Intelligence Building, No. 2 Xincen Fourth Road, Tianhe District,
> Guangzhou, Guangdong
> E [hidden email]
> M 86-18680458868
> www.pcitech.com<http://www.pcitech.com>
>
Reply | Threaded
Open this post in threaded view
|

答复: procesElement中每天数据都put 进入map,但下一条数据来时map都是空的咨询

范超
In reply to this post by 谌祖安
可能是你没有在open方法里从上下文获取mapstate
e.g.:
map = getRuntimeContext().getMapState(new MapStateDescriptor<xx,xx>(mapname, xx.class, xx.class))吗


-----邮件原件-----
发件人: 谌祖安 [mailto:[hidden email]]
发送时间: 2021年2月7日 星期日 16:25
收件人: user-zh <[hidden email]>
主题: procesElement中每天数据都put 进入map,但下一条数据来时map都是空的咨询

您好!

重载procesElement方法,每条stream数据处理时,put数据进入map,后面每条数据处理时先判断是否在map中有相同的key值,无则新增后put进map,有则加工后put进map。
 在实际代码中发现,每次写入put后,map有数据,但处理下一条数据时,先从map读取数据,发现map已经为空。
请问是哪里写错了吗?   和   flink官网中     state.update(current);有什么不同吗?

以下为代码:
 private MapState<String, MicsTrainPractical> map;  //定义map
  @Override
public void processElement(MicsTrainPracticalDetail value, Context ctx, Collector<MicsTrainPractical> out) throws Exception {
       
        MicsTrainPractical current = map.get(value.getTrainNumber());        
         System.out.println(map.isEmpty());  // 每次数据进来都发现map已经为空,不能保存前面数据进来时put的数据
        Long departTime = DateTimeUtils.convertToTimestampSecond(value.getDepartTime());
        Long arrivalTime = DateTimeUtils.convertToTimestampSecond(value.getArrivalTime());
        if (current == null) {   //如果map中没有获取到string相同的数据,则新建一条数据put进去
            MicsTrainPractical actual = new MicsTrainPractical();
            actual.setTrainId(value.getTrainNumber());
            actual.setCarId(value.getCarId());
            actual.setStartStationId(value.getStationId());
            actual.setStartPracticalArrivalTime(arrivalTime);
            actual.setEndPracticalArrivalTime(arrivalTime);
            actual.setStartPracticalDepartTime(departTime);
            actual.setEndPracticalDepartTime(departTime);
            actual.setEndStationId(value.getStationId());
            actual.setStopTime(0L);
            actual.setDs(value.getDs());
            actual.setIsInsert(true);
            actual.setTargetStationId(value.getTargetStationId());
            out.collect(actual);
            map.put(value.getTrainNumber(), actual);    //向map中写入数据
            return;
        } else {                   //如果map有获取到string相同的数据,则转换数据后写入map
           
            MicsTrainPractical actual = new MicsTrainPractical();
            actual.setTrainId(value.getTrainNumber());
            actual.setCarId(value.getCarId());
            actual.setStartStationId(current.getStartStationId());
            actual.setEndStationId(value.getStationId());
            actual.setStartPracticalArrivalTime(current.getStartPracticalArrivalTime());
            actual.setStartPracticalDepartTime(current.getStartPracticalDepartTime());
            actual.setEndPracticalArrivalTime(arrivalTime);
            actual.setEndPracticalDepartTime(departTime);
            actual.setStopTime(current.getStopTime() + Math.abs(departTime - arrivalTime));
            actual.setDs(value.getDs());
            actual.setIsInsert(false);
            actual.setTargetStationId(value.getTargetStationId());
            current.setEndStationId(actual.getEndStationId());
            current.setEndPracticalDepartTime(actual.getEndPracticalDepartTime());
            current.setEndPracticalArrivalTime(actual.getEndPracticalArrivalTime());
            current.setStopTime(actual.getStopTime());
            current.setIsInsert(actual.getIsInsert());

            MicsTrainSectionInfo trainSectionInfo = new MicsTrainSectionInfo();
            trainSectionInfo.setTrainId(actual.getTrainId());
            trainSectionInfo.setCarId(actual.getCarId());
            trainSectionInfo.setStartStationId(current.getEndStationId());
            trainSectionInfo.setEndStationId(actual.getEndStationId());
            trainSectionInfo.setDepartTime(current.getEndPracticalDepartTime());
            trainSectionInfo.setArrivalTime(actual.getEndPracticalArrivalTime());

            map.put(value.getTrainNumber(), actual);
            out.collect(actual);
            ctx.output(outputTagSectionFlow, trainSectionInfo);
        }
    }




谌祖安
智能轨道交通业务群 / 产品事业部 / 开发经理
Intelligent RailTransportation BG /Development Manager
广东省广州市天河区新岑四路2号佳都智慧大厦
PCI Intelligence Building, No. 2 Xincen Fourth Road, Tianhe District, Guangzhou, Guangdong E [hidden email] M 86-18680458868 www.pcitech.com