大神:
报错信息: Error:(73, 27) java: 对于process(com.linkedsee.aiops.CountErrorFunction), 找不到合适的方法 方法 org.apache.flink.streaming.api.datastream.DataStream.<R>process(org.apache.flink.streaming.api.functions.ProcessFunction<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode,R>)不适用 (无法推断类型变量 R (参数不匹配; com.linkedsee.aiops.CountErrorFunction无法转换为org.apache.flink.streaming.api.functions.ProcessFunction<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode,R>)) 方法 org.apache.flink.streaming.api.datastream.DataStream.<R>process(org.apache.flink.streaming.api.functions.ProcessFunction<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode,R>,org.apache.flink.api.common.typeinfo.TypeInformation<R>)不适用 (无法推断类型变量 R (实际参数列表和形式参数列表长度不同)) StreamingJob.java /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.linkedsee.aiops; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema; import java.util.*; /** * Skeleton for a Flink Streaming Job. * * <p>For a tutorial how to write a Flink streaming application, check the * tutorials and examples on the <a href="https://flink.apache.org/docs/stable/">Flink Website</a>. * * <p>To package your application into a JAR file for execution, run * 'mvn clean package' on the command line. * * <p>If you change the name of the main class (with the public static void main(String[] args)) * method, change the respective entry in the POM.xml file (simply search for 'mainClass'). */ public class StreamingJob { public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "xxx:9092"); properties.setProperty("auto.offset.reset", "earliest"); properties.setProperty("group.id", "test"); FlinkKafkaConsumer<ObjectNode> fConsumer = new FlinkKafkaConsumer<>("aiops_log", new JSONKeyValueDeserializationSchema(false), properties); fConsumer.assignTimestampsAndWatermarks(new AssignerWatermarks()); DataStream<ObjectNode> dataStream = env .addSource(fConsumer); dataStream .keyBy(new KeySelector<ObjectNode, Object>() { @Override public Object getKey(ObjectNode objectNode) throws Exception { return objectNode.get("value").get("type"); } }) .timeWindow(Time.seconds(60)); dataStream.process(new CountErrorFunction()); dataStream.print(); /* * Here, you can start creating your execution plan for Flink. * * Start with getting some data from the environment, like * env.readTextFile(textPath); * * then, transform the resulting DataStream<String> using operations * like * .filter() * .flatMap() * .join() * .coGroup() * * and many more. * Have a look at the programming guide for the Java API: * * https://flink.apache.org/docs/latest/apis/streaming/index.html * */ // execute program env.execute("Error Detection"); } } CountErrorFunction.java package com.linkedsee.aiops; import jdk.nashorn.internal.ir.ObjectNode; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; public class CountErrorFunction extends ProcessWindowFunction<ObjectNode, String, String, TimeWindow> { @Override public void process(String s, Context context, Iterable<ObjectNode> iterable, Collector<String> collector) throws Exception { long count = 0; for (ObjectNode objN : iterable) { System.out.println(objN.getElements()); count++; } collector.collect("Window:" + context.window() + "count: " + count); } } AssignerWatermarks.java package com.linkedsee.aiops; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.watermark.Watermark; import java.text.ParseException; import java.text.SimpleDateFormat; public class AssignerWatermarks implements AssignerWithPeriodicWatermarks<ObjectNode> { private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS"); private Long currentMaxTimestamp = 0L; private Long maxOutOfOrderness = 1000L; // 最大允许的乱序时间是10s @Override public long extractTimestamp(ObjectNode objectNode, long previousElementTimestamp) throws RuntimeException{ long timestamp = 0L; try { System.out.println("msg:" + objectNode.toString()); String timestr = objectNode.get("value").get("timestamp").toString(); if (timestr == null){ timestamp = System.currentTimeMillis(); }else { timestamp = sdf.parse(timestr).getTime(); } }catch (ParseException ex){ timestamp = System.currentTimeMillis(); } currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp); return timestamp; } @Override public Watermark getCurrentWatermark() { return new Watermark(currentMaxTimestamp - maxOutOfOrderness); } } kafka数据格式: {"level":"INFO","timestamp":"2020-03-18 13:58:40,938","path":"/data/linkedsee/linkedsee_story/installer/logs/bi_aiops/bi_aiops.log","@timestamp":"2020-03-18T05:58:41.487Z","host":"aiops-dev2","type":"runtimelog","project":"metric","@version":"1","trace_id":"2b1c3d15-f322-3337-809f-767749aa8b1c","raw":"granularity is 10m"} 问题:无法定位问题出在哪里 如果 ObjectNode不能作为操作单元那应该怎么写? 谢谢 |
Free forum by Nabble | Edit this page |