Help with a ProcessWindowFunction problem

郭红科
Hi experts,
Error message:
Error:(73, 27) java: no suitable method found for process(com.linkedsee.aiops.CountErrorFunction)
    method org.apache.flink.streaming.api.datastream.DataStream.<R>process(org.apache.flink.streaming.api.functions.ProcessFunction<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode,R>) is not applicable
      (cannot infer type-variable(s) R
        (argument mismatch; com.linkedsee.aiops.CountErrorFunction cannot be converted to org.apache.flink.streaming.api.functions.ProcessFunction<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode,R>))
    method org.apache.flink.streaming.api.datastream.DataStream.<R>process(org.apache.flink.streaming.api.functions.ProcessFunction<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode,R>,org.apache.flink.api.common.typeinfo.TypeInformation<R>) is not applicable
      (cannot infer type-variable(s) R
        (actual and formal argument lists differ in length))

StreamingJob.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.linkedsee.aiops;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

import java.util.*;

/**
 * Skeleton for a Flink Streaming Job.
 *
 * <p>For a tutorial how to write a Flink streaming application, check the
 * tutorials and examples on the <a href="https://flink.apache.org/docs/stable/">Flink Website</a>.
 *
 * <p>To package your application into a JAR file for execution, run
 * 'mvn clean package' on the command line.
 *
 * <p>If you change the name of the main class (with the public static void main(String[] args))
 * method, change the respective entry in the POM.xml file (simply search for 'mainClass').
 */
public class StreamingJob {

   public static void main(String[] args) throws Exception {
      // set up the streaming execution environment
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      env.setParallelism(1);

      Properties properties = new Properties();
      properties.setProperty("bootstrap.servers", "xxx:9092");
      properties.setProperty("auto.offset.reset", "earliest");
      properties.setProperty("group.id", "test");

      FlinkKafkaConsumer<ObjectNode> fConsumer =
            new FlinkKafkaConsumer<>("aiops_log", new JSONKeyValueDeserializationSchema(false), properties);
      fConsumer.assignTimestampsAndWatermarks(new AssignerWatermarks());

      DataStream<ObjectNode> dataStream = env
            .addSource(fConsumer);

      dataStream
                .keyBy(new KeySelector<ObjectNode, Object>() {
                    @Override
                    public Object getKey(ObjectNode objectNode) throws Exception {
                        return objectNode.get("value").get("type");
                    }
                })
                .timeWindow(Time.seconds(60));

      dataStream.process(new CountErrorFunction());

      dataStream.print();


      /*
       * Here, you can start creating your execution plan for Flink.
       *
       * Start with getting some data from the environment, like
       *     env.readTextFile(textPath);
       *
       * then, transform the resulting DataStream<String> using operations
       * like
       *     .filter()
       *     .flatMap()
       *     .join()
       *     .coGroup()
       *
       * and many more.
       * Have a look at the programming guide for the Java API:
       *
       * https://flink.apache.org/docs/latest/apis/streaming/index.html
       *
       */

      // execute program
      env.execute("Error Detection");
   }


}
CountErrorFunction.java
package com.linkedsee.aiops;

import jdk.nashorn.internal.ir.ObjectNode;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;


public class CountErrorFunction extends ProcessWindowFunction<ObjectNode, String, String, TimeWindow> {

    @Override
    public void process(String s, Context context, Iterable<ObjectNode> iterable, Collector<String> collector) throws Exception {
        long count = 0;
        for (ObjectNode objN : iterable) {
            System.out.println(objN.getElements());
            count++;
        }
        collector.collect("Window: " + context.window() + " count: " + count);
    }


}

AssignerWatermarks.java
package com.linkedsee.aiops;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

import java.text.ParseException;
import java.text.SimpleDateFormat;

public class AssignerWatermarks implements AssignerWithPeriodicWatermarks<ObjectNode> {

    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");

    private Long currentMaxTimestamp = 0L;
    private Long maxOutOfOrderness = 1000L; // maximum allowed out-of-orderness: 1000 ms (1 s)

    @Override
    public long extractTimestamp(ObjectNode objectNode, long previousElementTimestamp) throws RuntimeException{
        long timestamp = 0L;
        try {
            System.out.println("msg:" + objectNode.toString());
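            // asText() returns the bare field text; toString() would keep the JSON quotes, which the pattern cannot parse
            String timestr = objectNode.get("value").get("timestamp").asText();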
            if (timestr == null){
                timestamp = System.currentTimeMillis();
            }else {
                timestamp = sdf.parse(timestr).getTime();
            }
        }catch (ParseException ex){
            timestamp = System.currentTimeMillis();
        }
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}
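
To make the watermark bookkeeping in AssignerWatermarks easier to follow, here is a minimal stand-alone sketch of the same arithmetic with no Flink dependency. The class name WatermarkSketch, its method names, and the sample timestamps are made up for illustration; only the formula (current maximum timestamp minus the allowed out-of-orderness) is taken from the class above.

```java
// Hypothetical stand-in for AssignerWatermarks; no Flink types are used.
final class WatermarkSketch {
    private long currentMaxTimestamp = 0L;
    private final long maxOutOfOrdernessMs;

    WatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Record an event timestamp; the tracked maximum never moves backwards.
    long observe(long eventTimestampMs) {
        currentMaxTimestamp = Math.max(eventTimestampMs, currentMaxTimestamp);
        return eventTimestampMs;
    }

    // Same formula as getCurrentWatermark() in AssignerWatermarks.
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        WatermarkSketch w = new WatermarkSketch(1000L);
        w.observe(10_000L);
        w.observe(9_500L); // out-of-order event: the watermark does not go down
        System.out.println(w.currentWatermark()); // prints 9000
    }
}
```

With maxOutOfOrderness set to 1000, events that arrive more than one second behind the newest timestamp seen so far would fall behind the watermark.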

Kafka data format:
{"level":"INFO","timestamp":"2020-03-18 13:58:40,938","path":"/data/linkedsee/linkedsee_story/installer/logs/bi_aiops/bi_aiops.log","@timestamp":"2020-03-18T05:58:41.487Z","host":"aiops-dev2","type":"runtimelog","project":"metric","@version":"1","trace_id":"2b1c3d15-f322-3337-809f-767749aa8b1c","raw":"granularity is 10m"}
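
As a quick check that the "timestamp" field of this record matches the pattern used in AssignerWatermarks, the following stand-alone sketch parses it with plain java.text.SimpleDateFormat. The class name TimestampParseSketch and the helper parseOrMinusOne are hypothetical; the pattern and the sample value come from the post. The second call shows that the same value still wrapped in JSON double quotes (which is what a JSON text node's toString() is understood to produce) does not parse.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

// Hypothetical helper, independent of Flink and Jackson.
final class TimestampParseSketch {
    // Same pattern as in AssignerWatermarks.
    private static final String PATTERN = "yyyy-MM-dd HH:mm:ss,SSS";

    // Returns epoch milliseconds, or -1 if the text does not match the pattern.
    static long parseOrMinusOne(String text) {
        try {
            return new SimpleDateFormat(PATTERN).parse(text).getTime();
        } catch (ParseException ex) {
            return -1L;
        }
    }

    public static void main(String[] args) {
        // Bare field text, as it appears inside the JSON record.
        System.out.println(parseOrMinusOne("2020-03-18 13:58:40,938") > 0); // prints true
        // The same value with its JSON double quotes still attached.
        System.out.println(parseOrMinusOne("\"2020-03-18 13:58:40,938\"")); // prints -1
    }
}
```

Note that SimpleDateFormat interprets the text in the JVM's default time zone unless one is set explicitly. Comparing "timestamp" (13:58:40) with "@timestamp" (05:58:41Z) suggests the log times are written in UTC+8, so the result depends on where the job runs.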

Problem: I cannot work out where the error comes from.

If ObjectNode cannot be used as the element type, how should this be written?
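
One possible reading of the compiler output, offered as a sketch rather than a confirmed diagnosis: ObjectNode itself does not appear to be the obstacle. Three things in the posted code look inconsistent. First, the result of keyBy(...).timeWindow(...) is discarded and process(...) is then called on the plain DataStream, whose process method only accepts a ProcessFunction; a ProcessWindowFunction is accepted by WindowedStream.process. Second, CountErrorFunction imports jdk.nashorn.internal.ir.ObjectNode, while the stream carries the shaded Jackson ObjectNode. Third, the KeySelector declares the key type as Object, while CountErrorFunction declares it as String. The first point can be reproduced without Flink using the stand-in types below; ProcessFn, WindowFn, MiniStream, MiniWindowed and CountFn are all hypothetical and only mimic the shape of the API.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins that mimic only the shape of the Flink API.
interface ProcessFn<IN, OUT> {
    OUT apply(IN element);
}

interface WindowFn<IN, OUT, KEY> {
    OUT apply(KEY key, List<IN> elements);
}

final class MiniWindowed<T, KEY> {
    private final KEY key;
    private final List<T> elements;

    MiniWindowed(KEY key, List<T> elements) {
        this.key = key;
        this.elements = elements;
    }

    // Only the windowed type accepts a window function.
    <R> R process(WindowFn<T, R, KEY> fn) {
        return fn.apply(key, elements);
    }
}

final class MiniStream<T> {
    private final List<T> elements;

    MiniStream(List<T> elements) {
        this.elements = elements;
    }

    // The plain stream accepts only an element-wise function.
    <R> void process(ProcessFn<T, R> fn) {
        for (T e : elements) {
            fn.apply(e);
        }
    }

    // Returns a NEW object; the receiver itself stays un-windowed.
    <KEY> MiniWindowed<T, KEY> window(KEY key) {
        return new MiniWindowed<>(key, elements);
    }
}

final class CountFn implements WindowFn<String, String, String> {
    @Override
    public String apply(String key, List<String> elements) {
        return key + " count: " + elements.size();
    }
}

final class TypeMismatchSketch {
    static String run() {
        MiniStream<String> stream = new MiniStream<>(Arrays.asList("a", "b", "c"));
        // stream.process(new CountFn()); // would not compile: CountFn is not a ProcessFn
        MiniWindowed<String, String> windowed = stream.window("runtimelog");
        return windowed.process(new CountFn()); // call process on the windowed result
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints runtimelog count: 3
    }
}
```

Carried over to the posted job, this would mean chaining .process(new CountErrorFunction()) directly after .timeWindow(Time.seconds(60)) and printing the stream that call returns, with the ObjectNode import and the key type made consistent between the KeySelector and CountErrorFunction.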


Thanks.