Flink Time Session Window job stops producing output


李佳宸
Hi everyone,
I have run into a problem and would like to ask for advice.
I have a DataStream job in which one step needs to merge several messages by a field (userId), because an earlier step split one record into several and they now have to be put back together. For this I use a Time Session Window keyed by userId. The job works fine when debugging and the submission reports no errors; for the first few minutes there is output, but after a few minutes the job stops producing output entirely.
Looking at the job graph I can see that data keeps flowing into the window task, so I have narrowed the problem down to the window step.
I tried changing some parameters without effect; there are no errors or exception messages at all, and I have no idea how to resolve it.

Does anyone have any ideas? Many thanks!
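For context, the merge step boils down to the shape below (a minimal sketch with a hypothetical Event type and made-up gap values, not the actual job code, which is pasted in full further down). The point of the pattern is that an event-time session window keyed by userId collects all records of one user that arrive within the gap, and it only fires once the watermark passes the end of that session.

// Sketch only: "Event", "events", "Event.merge" and the 1-second values are placeholders.
DataStream<Event> merged = events
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(1)) {
                    @Override
                    public long extractTimestamp(Event e) {
                        return e.getTimestamp();   // event time in epoch milliseconds
                    }
                })
        .keyBy("userId")
        .window(EventTimeSessionWindows.withGap(Time.seconds(1)))
        .process(new ProcessWindowFunction<Event, Event, Tuple, TimeWindow>() {
            @Override
            public void process(Tuple key, Context ctx, Iterable<Event> records, Collector<Event> out) {
                // re-assemble the split records of one user into a single record
                out.collect(Event.merge(records));   // hypothetical merge helper
            }
        });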

The main code is as follows:
public class Mainjob {

    public static void main(String[] args) throws Exception {

        String schema = "userId, serviceId, logon_type, operation_type_1, operation_type_2, service_count, service_hot, service_month_count, day_1_10, day_11_20, day_21_31, service_day_count, time_7_11, time_12_18, time_19_7, date_timestamp, search_item, consult_item, father_id, age_22, age_23_33, age_34_40, age_41_50, age_51, sex_1, sex_2, sex_9, zone1, zone2, zone3, zone4, zone5, zone6";
        String schema1 = "userId, serviceId, logon_type, operation_type_1, operation_type_2, service_count, service_hot, service_month_count, day_1_10, day_11_20, day_21_31, service_day_count, time_7_11, time_12_18, time_19_7, date_timestamp, search_item, consult_item, father_id, age_22, age_23_33, age_34_40, age_41_50, age_51, sex_1, sex_2, sex_9, zone1, zone2, zone3, zone4, zone5, zone6, prediction_result, prediction_detail";
        // Configuration
        Properties properties = new Properties();
        ClassPathResource config = new ClassPathResource("KafkaConfig.properties");
        InputStream in = config.getInputStream();
        properties.load(in);
        // Execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        /*
        env.enableCheckpointing(60000);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        */
        // Kafka source
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", properties.getProperty("bootstrap.servers"));
        consumerProperties.setProperty("group.id", properties.getProperty("group.id"));
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(properties.getProperty("consumer.topic"), new SimpleStringSchema(), consumerProperties);
        consumer.setStartFromLatest();
        DataStream<String> kafkaStream = env.addSource(consumer);
        kafkaStream.print(LocalDateTime.now() + " kafka:");

        // Data filtering
        DataStream<String> filterStream = kafkaStream
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String s) {
                        try {
                            return KafkaDataFilter.filter(JSONArray.parseArray(s).getJSONObject(0).getJSONObject("businessData"));
                        } catch (Exception e) {
                            return false;
                        }
                    }
                });
        filterStream.print(LocalDateTime.now() + " filter :");
        // Rule-based and collaborative-filtering recall
        DataStream<List<Order>> serviceListStream = filterStream.map(new MapFunction<String, List<Order>>() {
            @Override
            public List<Order> map(String s) throws Exception {
                List<Order> list = new ArrayList<>();
                try {
                    String userId = "", serviceId = "", loginType = "", operateType = "", timestamp = "", result = "";
                    JSONObject json = JSONArray.parseArray(s).getJSONObject(0).getJSONObject("businessData");
                    userId = json.getString("userId");
                    serviceId = json.getString("serviceId");
                    loginType = json.getString("loginType");
                    operateType = json.getString("operateType");
                    timestamp = json.getString("timestamp");
                    // Rules
                    List ruleList = Rule.getServiceList(userId, loginType, serviceId, operateType);
                    // Collaborative filtering
                    List coordinationList = Coordination.get_similark(serviceId, 3, loginType);
                    list = Rule.recallData(ruleList, coordinationList, loginType, userId, timestamp);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return list;
            }
        });
        DataStream<Order> flatSource = serviceListStream.flatMap(new FlatMapFunction<List<Order>, Order>() {
            @Override
            public void flatMap(List<Order> list, Collector<Order> collector) throws Exception {
                list.forEach(order -> collector.collect(order));
            }
        });
        // Ranking / scoring
        Table table = tEnv.fromDataStream(flatSource, schema);
        StreamOperator predictData = StreamOperator.fromTable(table);
        // Load the saved pipeline model (CSV file)
        PipelineModel loadedModel = PipelineModel.load(properties.getProperty("csvPath"));
        StreamOperator streamOperator = loadedModel.transform(predictData);
        DataStream dataStream = streamOperator.getDataStream();
        DataStream<Row> rowDataStream = dataStream.map(new MapFunction() {
            @Override
            public Object map(Object o) throws Exception {
                return o;
            }
        }).returns(new RowTypeInfo(Types.INT, Types.STRING, Types.STRING,
                Types.INT, Types.INT, Types.INT, Types.INT, Types.INT, Types.INT,
                Types.INT, Types.INT, Types.INT, Types.INT, Types.INT, Types.INT,
                Types.LONG, Types.INT, Types.INT, Types.INT, Types.INT, Types.INT,
                Types.INT, Types.INT, Types.INT, Types.INT, Types.INT, Types.INT,
                Types.INT, Types.INT, Types.INT, Types.INT, Types.INT, Types.INT,
                Types.INT, Types.STRING));
        tEnv.registerDataStream("OrderA", rowDataStream, schema1);
        tEnv.registerFunction("get_possibility", new get_possibility());
        Table get_posibility1 = tEnv.sqlQuery("SELECT userId,serviceId,logon_type,date_timestamp,get_possibility(prediction_detail) as probability FROM OrderA");
        get_posibility1.printSchema();
        tEnv.registerTable("t1", get_posibility1);
        DataStream resultDataStream = tEnv.toAppendStream(get_posibility1, PredictResult.class);
        DataStream itemList = resultDataStream
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<PredictResult>(Time.milliseconds(10)) {
                            @Override
                            public long extractTimestamp(PredictResult predictResult) {
                                return predictResult.getDate_timestamp();
                            }
                        }
                        // new AscendingTimestampExtractor<PredictResult>() {
                        //     @Override
                        //     public long extractAscendingTimestamp(PredictResult predictResult) {
                        //         return predictResult.getDate_timestamp();
                        //     }
                        // }
                )
                .keyBy("userId")
                .window(EventTimeSessionWindows.withGap(Time.milliseconds(10)))
                .process(new TopNService(11));
        itemList.print("IRS_RESULT: ");
        // Sink: write results back to Kafka
        Properties propertiesProducer = new Properties();
        propertiesProducer.setProperty("bootstrap.servers", properties.getProperty("bootstrap.servers"));
        FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<>(properties.getProperty("producer.topic"),
                new ProducerStringSerializationSchema(properties.getProperty("producer.topic")),
                propertiesProducer, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
        itemList.addSink(flinkKafkaProducer);

        env.execute("Flink Streaming Java API Skeleton");
    }
}
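A pass-through ProcessFunction between assignTimestampsAndWatermarks(...) and keyBy("userId") could show whether the watermark keeps advancing into the window. This is only a debugging sketch; "withTimestamps" stands for the stream returned by assignTimestampsAndWatermarks above and is not a variable in the job code.

// Debugging sketch, not part of the job above: forwards every element unchanged
// and prints its event timestamp next to the operator's current watermark.
// If the watermark stops moving while elements keep arriving, event-time session
// windows never close and nothing is emitted downstream.
DataStream<PredictResult> probed = withTimestamps
        .process(new ProcessFunction<PredictResult, PredictResult>() {
            @Override
            public void processElement(PredictResult value, Context ctx, Collector<PredictResult> out) {
                System.out.println("event ts=" + value.getDate_timestamp()
                        + ", watermark=" + ctx.timerService().currentWatermark());
                out.collect(value);
            }
        });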