Hi everyone,
I've run into a problem and would appreciate some advice. I have a DataStream job in which one step needs to merge several messages by a field (userId): an earlier step split one record into several, and they now have to be merged back together. I use an event-time session window keyed by userId. The job worked in debugging and submitting it raised no errors; there is output for the first few minutes, but after a few more minutes the job stops producing output entirely. Looking at the job graph, data is in fact still flowing continuously into the window task, so I have narrowed the problem down to the window operator. I tried changing some parameters with no effect, and there are no errors or exception messages at all, so I have no idea how to proceed. Does anyone have any suggestions? Many thanks.

The main code is as follows:

public class Mainjob {
    public static void main(String[] args) throws Exception {
        String schema = "userId, serviceId, logon_type, operation_type_1, operation_type_2, service_count, service_hot, service_month_count, day_1_10, day_11_20, day_21_31, service_day_count, time_7_11, time_12_18, time_19_7, date_timestamp, search_item, consult_item, father_id, age_22, age_23_33, age_34_40, age_41_50, age_51, sex_1, sex_2, sex_9, zone1, zone2, zone3, zone4, zone5, zone6";
        String schema1 = "userId, serviceId, logon_type, operation_type_1, operation_type_2, service_count, service_hot, service_month_count, day_1_10, day_11_20, day_21_31, service_day_count, time_7_11, time_12_18, time_19_7, date_timestamp, search_item, consult_item, father_id, age_22, age_23_33, age_34_40, age_41_50, age_51, sex_1, sex_2, sex_9, zone1, zone2, zone3, zone4, zone5, zone6, prediction_result, prediction_detail";

        // Configuration
        Properties properties = new Properties();
        ClassPathResource config = new ClassPathResource("KafkaConfig.properties");
        InputStream in = config.getInputStream();
        properties.load(in);

        // Execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        /*
        env.enableCheckpointing(60000);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        */

        // Kafka source
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", properties.getProperty("bootstrap.servers"));
        consumerProperties.setProperty("group.id", properties.getProperty("group.id"));
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                properties.getProperty("consumer.topic"), new SimpleStringSchema(), consumerProperties);
        consumer.setStartFromLatest();
        DataStream<String> kafkaStream = env.addSource(consumer);
        kafkaStream.print(LocalDateTime.now() + " kafka:");

        // Filtering
        DataStream<String> filterStream = kafkaStream
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String s) {
                        try {
                            return KafkaDataFilter.filter(JSONArray.parseArray(s).getJSONObject(0).getJSONObject("businessData"));
                        } catch (Exception e) {
                            return false;
                        }
                    }
                });
        filterStream.print(LocalDateTime.now() + " filter :");

        // Rule-based / collaborative-filtering computation
        DataStream<List<Order>> serviceListStream = filterStream.map(new MapFunction<String, List<Order>>() {
            @Override
            public List<Order> map(String s) throws Exception {
                List<Order> list = new ArrayList<>();
                try {
                    String userId = "", serviceId = "", loginType = "", operateType = "", timestamp = "", result = "";
                    JSONObject json = JSONArray.parseArray(s).getJSONObject(0).getJSONObject("businessData");
                    userId = json.getString("userId");
                    serviceId = json.getString("serviceId");
                    loginType = json.getString("loginType");
                    operateType = json.getString("operateType");
                    timestamp = json.getString("timestamp");
                    // Rules
                    List ruleList = Rule.getServiceList(userId, loginType, serviceId, operateType);
                    // Collaborative filtering
                    List coordinationList = Coordination.get_similark(serviceId, 3, loginType);
                    list = Rule.recallData(ruleList, coordinationList, loginType, userId, timestamp);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return list;
            }
        });
        DataStream<Order> flatSource = serviceListStream.flatMap(new FlatMapFunction<List<Order>, Order>() {
            @Override
            public void flatMap(List<Order> list, Collector<Order> collector) throws Exception {
                list.forEach(order -> collector.collect(order));
            }
        });

        // Scoring / ranking
        Table table = tEnv.fromDataStream(flatSource, schema);
        StreamOperator predictData = StreamOperator.fromTable(table);
        // Load the model from the CSV path
        PipelineModel loadedModel = PipelineModel.load(properties.getProperty("csvPath"));
        StreamOperator streamOperator = loadedModel.transform(predictData);
        DataStream dataStream = streamOperator.getDataStream();
        DataStream<Row> rowDataStream = dataStream.map(new MapFunction() {
            @Override
            public Object map(Object o) throws Exception {
                return o;
            }
        }).returns(new RowTypeInfo(Types.INT, Types.STRING, Types.STRING, Types.INT, Types.INT, Types.INT,
                Types.INT, Types.INT, Types.INT, Types.INT, Types.INT, Types.INT, Types.INT, Types.INT,
                Types.INT, Types.LONG, Types.INT, Types.INT, Types.INT, Types.INT, Types.INT, Types.INT,
                Types.INT, Types.INT, Types.INT, Types.INT, Types.INT, Types.INT, Types.INT, Types.INT,
                Types.INT, Types.INT, Types.INT, Types.INT, Types.STRING));

        tEnv.registerDataStream("OrderA", rowDataStream, schema1);
        tEnv.registerFunction("get_possibility", new get_possibility());
        Table get_posibility1 = tEnv.sqlQuery(
                "SELECT userId, serviceId, logon_type, date_timestamp, get_possibility(prediction_detail) as probability FROM OrderA");
        get_posibility1.printSchema();
        tEnv.registerTable("t1", get_posibility1);
        DataStream resultDataStream = tEnv.toAppendStream(get_posibility1, PredictResult.class);

        // Session window: merge the records of one user back together
        DataStream itemList = resultDataStream
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<PredictResult>(Time.milliseconds(10)) {
                            @Override
                            public long extractTimestamp(PredictResult predictResult) {
                                return predictResult.getDate_timestamp();
                            }
                        }
                        // new AscendingTimestampExtractor<PredictResult>() {
                        //     @Override
                        //     public long extractAscendingTimestamp(PredictResult predictResult) {
                        //         return predictResult.getDate_timestamp();
                        //     }
                        // }
                )
                .keyBy("userId")
                .window(EventTimeSessionWindows.withGap(Time.milliseconds(10)))
                .process(new TopNService(11));
        itemList.print("IRS_RESULT: ");

        // Sink
        Properties propertiesProducer = new Properties();
        propertiesProducer.setProperty("bootstrap.servers", properties.getProperty("bootstrap.servers"));
        FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer(
                properties.getProperty("producer.topic"),
                new ProducerStringSerializationSchema(properties.getProperty("producer.topic")),
                propertiesProducer,
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
        itemList.addSink(flinkKafkaProducer);

        env.execute("Flink Streaming Java API Skeleton");
    }
}
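In case it helps others look at the suspect step in isolation, below is a stripped-down sketch of only the timestamp-assignment and event-time session-window part described above. The record type (a Tuple2 of userId and timestamp), the sample elements, and the 10 ms gap/out-of-orderness values here are simplified placeholders for illustration, not the real job:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class SessionWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Placeholder records: (userId, event timestamp in ms)
        DataStream<Tuple2<String, Long>> events = env.fromElements(
                Tuple2.of("u1", 1_000L), Tuple2.of("u1", 1_005L), Tuple2.of("u2", 1_050L));

        events
                // Event-time timestamps with a small out-of-orderness bound, as in the job above
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.milliseconds(10)) {
                            @Override
                            public long extractTimestamp(Tuple2<String, Long> e) {
                                return e.f1;
                            }
                        })
                // Key by userId and merge all records of one session into a single output
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> e) {
                        return e.f0;
                    }
                })
                .window(EventTimeSessionWindows.withGap(Time.milliseconds(10)))
                .process(new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context ctx,
                                        Iterable<Tuple2<String, Long>> elements, Collector<String> out) {
                        int count = 0;
                        for (Tuple2<String, Long> ignored : elements) {
                            count++;
                        }
                        out.collect(key + " -> " + count + " merged records");
                    }
                })
                .print();

        env.execute("session-window-sketch");
    }
}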