Error when deploying a Flink jar to YARN

Error when deploying a Flink jar to YARN

zhangjunjie1130@163.com
Hello,
        My program reads data from Kafka, processes it with Flink, and writes the results back to Kafka. It runs fine locally, but fails after being submitted to the YARN cluster.
        Flink version: 1.7.2

The error is:
2019-10-21 09:52:30,054 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: Custom Source -> Flat Map -> Map -> Map -> Sink: flink-conncetors-kafka (1/1) of job 7d5dfa42776d679eb240fa833444bc22 is not in state RUNNING but DEPLOYING instead. Aborting checkpoint.
2019-10-21 09:52:30,389 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> Flat Map -> Map -> Map -> Sink: flink-conncetors-kafka (1/1) (05b67b88bf084a9c9884201d224768b4) switched from DEPLOYING to RUNNING.
2019-10-21 09:52:31,054 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1 @ 1571622751054 for job 7d5dfa42776d679eb240fa833444bc22.
2019-10-21 09:52:31,488 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler  - Implementation error: Unhandled exception.
org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException: No TaskExecutor registered under container_e12_1562833757356_1953_01_000002.
        at org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:563)
        at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
        at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2019-10-21 09:52:34,467 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler  - Implementation error: Unhandled exception.
org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException: No TaskExecutor registered under container_e12_1562833757356_1953_01_000002.
        at org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:563)
        at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
        at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

========================================
Relevant code:
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);

        Properties properties1 = new Properties();
        properties1.setProperty("bootstrap.servers", "xxx:9092");
        properties1.setProperty("group.id", "test");

        FlinkKafkaConsumer010<String> myConsumer =
                new FlinkKafkaConsumer010<String>("bigeyeservertopictest", new SimpleStringSchema(), properties1);

        DataStream<String> stream = env.addSource(myConsumer);
//        stream.print();
        DataStream<String> student = stream
                .flatMap(new StringToJsonObject())
                .map(value -> value.toJSONArraytoString())
                .map(new MapFunction<JSONArray, String>() {
                    public String map(JSONArray jsonArray) throws Exception {
                        JSONObject jsonObject1 = new JSONObject();
//                        readRedis redis = new readRedis();
                        for (int i = 0; i < jsonArray.size(); i++) {
                            // Parse each array element once and read all fields from it.
                            JSONObject record = JSONObject.parseObject(jsonArray.getString(i));
                            String timeValue = record.getString("time");
                            String valueValue = record.getString("value");
                            String streamIDValue = record.getString("streamID");
//                            Map<String, String> redisMap = redis.getInfo(record.getString("agentHostname"), streamIDValue, "bigeye");

                            // Build the OpenTSDB datapoint object.
                            Map<String, Object> opentsdbValue = new HashMap<>();
                            opentsdbValue.put("metric", streamIDValue);
                            opentsdbValue.put("timestamp", timeValue);
                            opentsdbValue.put("value", valueValue);
//                            opentsdbValue.put("tags", new Gson().fromJson(redisMap.get("tags"), Map.class));

                            jsonObject1.put("BigeyeValue", opentsdbValue);
//                            redis.close();
                        }
                        return jsonObject1.toString();
                    }
                });
        student.addSink(new FlinkKafkaProducer010<String>("xxxx:9092", "test", new SimpleStringSchema())).name("flink-kafka");
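One detail in the map function worth noting: `jsonObject1.put("BigeyeValue", opentsdbValue)` runs inside the loop, so each iteration overwrites the previous entry and only the last array element survives in the emitted string. If every element is meant to reach the sink, collecting them into a list avoids that. A minimal stdlib-only sketch (class and method names here are illustrative, not from the original code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OpentsdbBatch {
    // Build one OpenTSDB-style datapoint map (same keys as in the code above).
    static Map<String, Object> datapoint(String metric, String timestamp, String value) {
        Map<String, Object> m = new HashMap<>();
        m.put("metric", metric);
        m.put("timestamp", timestamp);
        m.put("value", value);
        return m;
    }

    public static void main(String[] args) {
        // Append every element to a list instead of re-putting the same key,
        // so no datapoint is silently dropped.
        List<Map<String, Object>> batch = new ArrayList<>();
        batch.add(datapoint("streamA", "1571622751", "42"));
        batch.add(datapoint("streamB", "1571622752", "43"));
        System.out.println(batch.size());
    }
}
```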

Many thanks!



[hidden email]
Re: Error when deploying a Flink jar to YARN

hery168@163.com
Reference: http://mail-archives.apache.org/mod_mbox/flink-user-zh/201905.mbox/%3C2019052911134683852017@...%3E



[hidden email]
 
Re: Error when deploying a Flink jar to YARN

Xintong Song
From the error it looks like the TaskManager went down. To find the root cause you need to inspect the TM logs; it may be the same problem as in the reply above, or it may be caused by something else entirely.


Thank you~

Xintong Song
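Since the ResourceManager reports the container as unregistered, the lost container's own log is the place to look. With YARN log aggregation enabled it can usually be fetched after the fact; the IDs below are copied from the error message above, and the exact flags may vary by Hadoop version:

```shell
# Fetch aggregated logs for the whole application
yarn logs -applicationId application_1562833757356_1953

# Or only the container the ResourceManager reported as unregistered
yarn logs -applicationId application_1562833757356_1953 \
    -containerId container_e12_1562833757356_1953_01_000002
```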



On Mon, Oct 21, 2019 at 11:36 AM [hidden email] <[hidden email]> wrote:
Re: Re: Error when deploying a Flink jar to YARN

zhangjunjie1130@163.com
Hello,
    Thank you all very much for the replies. I have now looked at the TM log (below), and the TaskManager never actually came up:
    2019-10-21 15:39:33,192 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerServices     - Temporary file directory '/opt/appdata/disk24/yarn/nm/usercache/flink/appcache/application_1562833757356_1956': total 3723 GB, usable 3722 GB (99.97% usable)
2019-10-21 15:39:33,389 INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 294 MB for network buffer pool (number of memory segments: 9425, bytes per segment: 32768).
2019-10-21 15:39:33,445 INFO  org.apache.flink.runtime.query.QueryableStateUtils            - Could not load Queryable State Client Proxy. Probable reason: flink-queryable-state-runtime is not in the classpath. To enable Queryable State, please move the flink-queryable-state-runtime jar from the opt to the lib folder.
2019-10-21 15:39:33,446 INFO  org.apache.flink.runtime.query.QueryableStateUtils            - Could not load Queryable State Server. Probable reason: flink-queryable-state-runtime is not in the classpath. To enable Queryable State, please move the flink-queryable-state-runtime jar from the opt to the lib folder.
2019-10-21 15:39:33,447 INFO  org.apache.flink.runtime.io.network.NetworkEnvironment        - Starting the network environment and its components.
2019-10-21 15:39:33,484 INFO  org.apache.flink.runtime.io.network.netty.NettyClient         - Successful initialization (took 35 ms).
2019-10-21 15:39:33,526 INFO  org.apache.flink.runtime.io.network.netty.NettyServer         - Successful initialization (took 41 ms). Listening on SocketAddress /10.221.124.46:43869.
2019-10-21 15:39:33,527 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerServices     - Limiting managed memory to 0.7 of the currently free heap space (1839 MB), memory will be allocated lazily.
2019-10-21 15:39:33,531 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /opt/appdata/disk01/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-io-1eed2f21-58ad-4f92-8e33-c87d2b9cc519 for spill files.
2019-10-21 15:39:33,531 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /opt/appdata/disk02/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-io-e917619a-2c43-403b-b2ad-c51098d3ac55 for spill files.
2019-10-21 15:39:33,531 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /opt/appdata/disk03/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-io-8f7ca801-647b-421e-a28a-00da5c62c213 for spill files.
2019-10-21 15:39:33,531 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /opt/appdata/disk04/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-io-d6b4a21c-5a8b-4f5f-856d-4dae41e0ad30 for spill files.
2019-10-21 15:39:33,532 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /opt/appdata/disk05/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-io-71e74e70-a582-47cb-8481-15b821ab51f8 for spill files.
2019-10-21 15:39:33,532 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /opt/appdata/disk06/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-io-78be3561-3d4a-46ce-ae71-41efde4c08d1 for spill files.
2019-10-21 15:39:33,532 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /opt/appdata/disk07/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-io-54d336b9-d697-4234-a5ba-a3ecf2236a5b for spill files.
2019-10-21 15:39:33,532 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /opt/appdata/disk08/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-io-0ee42053-dd58-431a-981e-2199ffe363bc for spill files.
2019-10-21 15:39:33,532 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /opt/appdata/disk09/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-io-1b18f7b3-059f-4c9a-92bc-8c55ac972da8 for spill files.
2019-10-21 15:39:33,532 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /opt/appdata/disk10/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-io-6d3b5a27-6733-4001-b5f5-3556b24a8929 for spill files.
2019-10-21 15:39:33,533 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /opt/appdata/disk11/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-io-2beee747-7161-44c3-aecf-fcf17fd85e97 for spill files.
2019-10-21 15:39:33,533 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /opt/appdata/disk12/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-io-6b769ce1-a9ab-47cc-a92f-d4e3602db491 for spill files.
2019-10-21 15:39:33,533 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /opt/appdata/disk13/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-io-48302e87-6887-4b92-9bbd-0e813878fe2e for spill files.
2019-10-21 15:39:33,533 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /opt/appdata/disk14/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-io-5fd30eb2-0854-462c-bff0-42fad5928ab7 for spill files.
2019-10-21 15:39:33,533 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /opt/appdata/disk15/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-io-ae3d47e4-9155-4d07-8f0e-a4798ca2281d for spill files.
2019-10-21 15:39:33,534 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /opt/appdata/disk16/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-io-a8bf30d0-c617-4c8a-8672-ab5636341fb3 for spill files.
2019-10-21 15:39:33,534 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /opt/appdata/disk17/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-io-1271499d-63c2-45fc-bbc4-c3f092b39ccd for spill files.
2019-10-21 15:39:33,534 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /opt/appdata/disk18/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-io-035c1b6c-933f-4b9a-a4d2-c884798b4df7 for spill files.
2019-10-21 15:39:33,534 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /opt/appdata/disk19/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-io-0627e795-a79c-4a5d-9a8f-6f3189323490 for spill files.
2019-10-21 15:39:33,534 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /opt/appdata/disk20/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-io-f5f71ab0-1362-4f06-b2b3-a27f9095cf7f for spill files.
2019-10-21 15:39:33,535 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /opt/appdata/disk21/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-io-7b2a6d98-bce5-4fab-b482-a238e64bd897 for spill files.
2019-10-21 15:39:33,535 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /opt/appdata/disk22/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-io-6a075ee4-a5e6-4950-8a57-90f8ca1f063f for spill files.
2019-10-21 15:39:33,535 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /opt/appdata/disk23/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-io-d323bf60-4129-475c-ae73-494e44570e1c for spill files.
2019-10-21 15:39:33,535 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /opt/appdata/disk24/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-io-e19de3fb-ae65-47d9-a4dd-4316151cb67a for spill files.
2019-10-21 15:39:33,598 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration  - Messages have a max timeout of 10000 ms
2019-10-21 15:39:33,609 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at akka://flink/user/taskmanager_0 .
2019-10-21 15:39:33,623 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Start job leader service.
2019-10-21 15:39:33,623 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Connecting to ResourceManager akka.tcp://[hidden email]:38079/user/resourcemanager(00000000000000000000000000000000).
2019-10-21 15:39:33,624 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /opt/appdata/disk01/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-dist-cache-5498311b-ec06-4806-99e3-ab4fc1a80b22
2019-10-21 15:39:33,624 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /opt/appdata/disk02/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-dist-cache-d4dd8e64-94f1-4994-8255-59c47f6e9f81
2019-10-21 15:39:33,624 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /opt/appdata/disk03/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-dist-cache-e82c4569-039b-4e9f-af99-676626233ef7
2019-10-21 15:39:33,624 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /opt/appdata/disk04/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-dist-cache-29bdc137-d509-4935-b02d-166c418f4389
2019-10-21 15:39:33,624 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /opt/appdata/disk05/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-dist-cache-b0f12bab-1241-422c-9927-21e3d31dc6e1
2019-10-21 15:39:33,624 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /opt/appdata/disk06/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-dist-cache-c9be4b9b-91be-4138-8117-d72370283804
2019-10-21 15:39:33,625 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /opt/appdata/disk07/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-dist-cache-bd836961-21b3-4f79-84a7-be392a9de752
2019-10-21 15:39:33,625 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /opt/appdata/disk08/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-dist-cache-a0c776dd-68da-4b25-9cf4-0530e464c9f3
2019-10-21 15:39:33,625 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /opt/appdata/disk09/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-dist-cache-de986724-8bcf-458c-aec5-012abd0e0b6f
2019-10-21 15:39:33,625 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /opt/appdata/disk10/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-dist-cache-9bcddf05-d8d8-4709-bef4-7483f62c9d03
2019-10-21 15:39:33,625 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /opt/appdata/disk11/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-dist-cache-7e03fafc-edaa-4ccc-b936-a3188642dc08
2019-10-21 15:39:33,625 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /opt/appdata/disk12/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-dist-cache-33ae11b3-3259-4e18-8db4-42dbedf3c8c6
2019-10-21 15:39:33,626 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /opt/appdata/disk13/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-dist-cache-57dce04a-402e-4840-bdde-164701995aec
2019-10-21 15:39:33,626 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /opt/appdata/disk14/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-dist-cache-05211842-51b3-447a-b586-5d1c4d03644a
2019-10-21 15:39:33,626 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /opt/appdata/disk15/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-dist-cache-7b8b2314-f2f4-4f0e-a49e-7eff9276886a
2019-10-21 15:39:33,626 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /opt/appdata/disk16/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-dist-cache-185841ad-5b1b-4601-a83f-55664cc27cbe
2019-10-21 15:39:33,626 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /opt/appdata/disk17/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-dist-cache-e71950e1-bf91-4d7e-96d7-7590547e3ec4
2019-10-21 15:39:33,626 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /opt/appdata/disk18/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-dist-cache-99f4927a-4285-4962-b919-041673b99995
2019-10-21 15:39:33,627 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /opt/appdata/disk19/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-dist-cache-25e3472a-b60a-4def-82d9-b75667ceb6f1
2019-10-21 15:39:33,627 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /opt/appdata/disk20/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-dist-cache-e586a3c3-0fe7-4e64-b94e-826bcf7ddff1
2019-10-21 15:39:33,627 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /opt/appdata/disk21/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-dist-cache-4957010f-4771-404c-be63-7ffb70ad020c
2019-10-21 15:39:33,627 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /opt/appdata/disk22/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-dist-cache-9137dfa2-c3fb-43ae-93a6-d4c51440495b
2019-10-21 15:39:33,627 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /opt/appdata/disk23/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-dist-cache-487e0ed7-77e5-49a0-a1cf-2900fa4f066a
2019-10-21 15:39:33,627 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /opt/appdata/disk24/yarn/nm/usercache/flink/appcache/application_1562833757356_1956/flink-dist-cache-49d2866b-6b8b-48b3-bf40-20d28ccab1a2
2019-10-21 15:39:33,812 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Resolved ResourceManager address, beginning registration
2019-10-21 15:39:33,812 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Registration at ResourceManager attempt 1 (timeout=100ms)
2019-10-21 15:39:33,859 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Successful registration at resource manager akka.tcp://[hidden email]:38079/user/resourcemanager under registration id 2f7bb48440ad95aeec8a05357b618733.
2019-10-21 15:39:33,876 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Receive slot request AllocationID{6203d66760d158ff8a22d86a4e37d309} for job 968350be49f51fee2e660184572ac618 from resource manager with leader id 00000000000000000000000000000000.
2019-10-21 15:39:33,877 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Allocated slot for AllocationID{6203d66760d158ff8a22d86a4e37d309}.
2019-10-21 15:39:33,877 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Add job 968350be49f51fee2e660184572ac618 for job leader monitoring.
2019-10-21 15:39:33,878 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Try to register at job manager akka.tcp://[hidden email]:38079/user/jobmanager_0 with leader id 00000000-0000-0000-0000-000000000000.
2019-10-21 15:39:33,891 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Resolved JobManager address, beginning registration
2019-10-21 15:39:33,892 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Registration at JobManager attempt 1 (timeout=100ms)
2019-10-21 15:39:33,906 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Successful registration at job manager akka.tcp://[hidden email]:38079/user/jobmanager_0 for job 968350be49f51fee2e660184572ac618.
2019-10-21 15:39:33,907 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Establish JobManager connection for job 968350be49f51fee2e660184572ac618.
2019-10-21 15:39:33,910 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Offer reserved slots to the leader of job 968350be49f51fee2e660184572ac618.
2019-10-21 15:39:33,921 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable      - Activate slot AllocationID{6203d66760d158ff8a22d86a4e37d309}.
2019-10-21 15:39:33,947 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received task Source: Custom Source -> Flat Map -> Map -> Map -> Sink: flink-conncetors-kafka (1/1).
2019-10-21 15:39:33,947 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> Flat Map -> Map -> Map -> Sink: flink-conncetors-kafka (1/1) (6ada18dcebd7025ec5013aeecef2c3c3) switched from CREATED to DEPLOYING.
2019-10-21 15:39:33,947 INFO  org.apache.flink.runtime.taskmanager.Task                     - Creating FileSystem stream leak safety net for task Source: Custom Source -> Flat Map -> Map -> Map -> Sink: flink-conncetors-kafka (1/1) (6ada18dcebd7025ec5013aeecef2c3c3) [DEPLOYING]
2019-10-21 15:39:33,950 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Source: Custom Source -> Flat Map -> Map -> Map -> Sink: flink-conncetors-kafka (1/1) (6ada18dcebd7025ec5013aeecef2c3c3) [DEPLOYING].
2019-10-21 15:39:33,953 INFO  org.apache.flink.runtime.blob.BlobClient                      - Downloading 968350be49f51fee2e660184572ac618/p-e063684ba81f692adfcda81a3e9c67dbfbd9a154-145a4657337e8272759a7eed0332c1e8 from p.com/10.221.124.46:33338
2019-10-21 15:39:35,256 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Source: Custom Source -> Flat Map -> Map -> Map -> Sink: flink-conncetors-kafka (1/1) (6ada18dcebd7025ec5013aeecef2c3c3) [DEPLOYING].
2019-10-21 15:39:35,268 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> Flat Map -> Map -> Map -> Sink: flink-conncetors-kafka (1/1) (6ada18dcebd7025ec5013aeecef2c3c3) switched from DEPLOYING to RUNNING.
2019-10-21 15:39:35,276 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
2019-10-21 15:39:35,441 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition does not contain a setter for field topic
2019-10-21 15:39:35,441 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - Class class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2019-10-21 15:39:35,444 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - No restore state for FlinkKafkaConsumer.
2019-10-21 15:39:35,456 INFO  org.apache.kafka.clients.producer.ProducerConfig              - ProducerConfig values:
   
===================
        Among these, I suspect the cause is ".KafkaTopicPartition cannot be used as a POJO type because not all fields are valid POJO fields", but I could not find a concrete solution on Google, and a question I posted in the DingTalk group got no reply.
        It may also be a serialization problem, so I added a serialVersionUID (note: it still runs fine locally, but fails as soon as it is submitted to the YARN cluster!):
public class KafkaFlinkKafkaTestFour implements Serializable{
    private static final long serialVersionUID = 5624696288943005947L;
    public static Log log = LogFactory.getLog(KafkaFlinkKafkaTestFour.class);
    public static void main(String[] args) throws Exception  {
        System.out.println("11111111111111111111111");
//        readRedis redis = new readRedis();
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);

        Properties properties1 = new Properties();
        properties1.setProperty("bootstrap.servers", "xxx:9092, xxx:9092, xxx:9092");
        properties1.setProperty("group.id", "test_bigdata");

        System.out.println("333333333333333333333333");

        final FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<String>("bigeyeservertopictest", new SimpleStringSchema(), properties1);
//        FlinkKafkaProducer010<String> SinkProducer = new FlinkKafkaProducer010<String>("test", new SimpleStringSchema(), properties2);

        DataStream<String> stream = env.addSource(myConsumer);
//        stream.print();
        DataStream<String> student = stream.flatMap(new StringToJsonObject()).map(value -> value.toJSONArraytoString())
                .map(new MapFunction<JSONArray, String>() {
                    public String map(JSONArray jsonArray) throws Exception{
                        String source= "bigeye";
                        JSONObject jsonObject1 = new JSONObject();
                        String result = null;
//                        readRedis redis = new readRedis();
                        for(int i=0;i<jsonArray.size();i++){
//                            log.info("1111111111111111");
                            String jsonObjectStr = jsonArray.getString(i);
                            String timeValue = JSONObject.parseObject(jsonObjectStr).getString("time");
                            String valueValue = JSONObject.parseObject(jsonObjectStr).getString("value");
                            String agentHostnameValue = JSONObject.parseObject(jsonObjectStr).getString("agentHostname");
                            String streamIDValue = JSONObject.parseObject(jsonObjectStr).getString("streamID");
//                            Map<String, String> redisMap = redis.getInfo(agentHostnameValue,streamIDValue,source);
                            // build the OpenTSDB data point
                            Map<String, Object> opentsdbValue = new HashMap<>();
                            opentsdbValue.put("metric", streamIDValue);
                            opentsdbValue.put("timestamp", timeValue);
                            opentsdbValue.put("value", valueValue);
                            Gson gson = new Gson();
//                            opentsdbValue.put("tags", gson.fromJson((String) redisMap.get("tags"), Map.class));

//                            jsonObject1.put(flag,timeValue+"+"+streamIDValue);
                            jsonObject1.put("BigeyeValue",opentsdbValue);
//                            redis.close();

                        }

                        result = jsonObject1.toString();
                        return result;
                    }
                })
                ;
        student.print();
        student.addSink(new FlinkKafkaProducer010<String>("xxxx:9092","test", new SimpleStringSchema())).name("flink-conncetors-kafka");
//                .addSink(SinkProducer);
//                .print();
        System.out.println("4444444444444444444444444");
        env.execute("WordCount from Kafka data");
    }
}
=================
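One thing worth checking when a job runs locally but dies after deployment to YARN is that every user function (such as the anonymous MapFunction above) is fully Java-serializable, since Flink must ship function objects to remote TaskManagers. The sketch below is not the poster's code; it is a minimal, stdlib-only illustration using a hypothetical `RedisClient` stand-in. It shows how a function that captures non-serializable enclosing state fails serialization, and how a static nested class with a transient, lazily created resource avoids the problem.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class SerializationCheck {

    // Stand-in for a non-serializable resource, e.g. a Redis connection.
    static class RedisClient {
        String lookup(String key) { return "tag-for-" + key; }
    }

    // Serializable function type, mimicking Flink's MapFunction contract.
    interface SerFn extends Function<String, String>, Serializable { }

    private final RedisClient client = new RedisClient();

    // BAD: the lambda reads the instance field 'client', so it captures the
    // whole enclosing object. This works inside one JVM, but a serialization
    // pass (as needed to ship the function to a TaskManager) fails because
    // RedisClient is not Serializable.
    SerFn capturingFn() {
        return value -> client.lookup(value);
    }

    // GOOD: a static nested class whose only heavy resource is transient and
    // created lazily on the worker side, so the function object serializes.
    static class StandaloneFn implements SerFn {
        private transient RedisClient client;

        @Override
        public String apply(String value) {
            if (client == null) {
                client = new RedisClient();
            }
            return client.lookup(value);
        }
    }

    // Attempts a full Java serialization pass, as Flink does when deploying.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        SerializationCheck outer = new SerializationCheck();
        System.out.println("capturing serializable?  " + isSerializable(outer.capturingFn()));  // false
        System.out.println("standalone serializable? " + isSerializable(new StandaloneFn()));   // true
    }
}
```

In Flink the same pattern is usually expressed with a RichMapFunction that opens the resource in open(); the serialization rule is the same either way.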
 Thanks!
       



[hidden email]
 
From: Xintong Song
Sent: 2019-10-21 12:14
To: user-zh
Subject: Re: Error deploying a Flink jar to YARN
From the error, the TaskManager went down; to pinpoint the cause you need to analyze the TM logs. It may be the same problem as in the reply above, or it may have another cause.
 
 
Thank you~
 
Xintong Song
 
 
 
On Mon, Oct 21, 2019 at 11:36 AM [hidden email] <[hidden email]> wrote:
 

> For reference:
> http://mail-archives.apache.org/mod_mbox/flink-user-zh/201905.mbox/%3C2019052911134683852017@...%3E
>
>
>
> [hidden email]
>
> From: [hidden email]
> Sent: 2019-10-21 11:05
> To: user-zh
> Cc: zhangjunjie1130
> Subject: Error deploying a Flink jar to YARN
> Hello:
>         My program reads from Kafka, processes the data in Flink, and writes back to Kafka. It runs fine locally but fails once submitted to the YARN cluster.
>        Flink version: 1.7.2
>
> The error is:
> 2019-10-21 09:52:30,054 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint
> triggering task Source: Custom Source -> Flat Map -> Map -> Map -> Sink:
> flink-conncetors-kafka (1/1) of job 7d5dfa42776d679eb240fa833444bc22 is not
> in state RUNNING but DEPLOYING instead. Aborting checkpoint.
> 2019-10-21 09:52:30,389 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
> Custom Source -> Flat Map -> Map -> Map -> Sink: flink-conncetors-kafka
> (1/1) (05b67b88bf084a9c9884201d224768b4) switched from DEPLOYING to RUNNING.
> 2019-10-21 09:52:31,054 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering
> checkpoint 1 @ 1571622751054 for job 7d5dfa42776d679eb240fa833444bc22.
> 2019-10-21 09:52:31,488 ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler
> - Implementation error: Unhandled exception.
> org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException:
> No TaskExecutor registered under container_e12_1562833757356_1953_01_000002.
>         at
> org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:563)
>         at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>         at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>         at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>         at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>         at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>         at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>         at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>         at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2019-10-21 09:52:34,467 ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler
> - Implementation error: Unhandled exception.
> org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException:
> No TaskExecutor registered under container_e12_1562833757356_1953_01_000002.
>         (identical stack trace as above)
>
> ========================================
> Relevant code:
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(1000);
> Properties properties1 = new Properties();
> properties1.setProperty("bootstrap.servers", "xxx:9092");
> properties1.setProperty("group.id", "test");
>
> System.out.println("333333333333333333333333");
>
> FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<String>("bigeyeservertopictest", new SimpleStringSchema(), properties1);
>
> DataStream<String> stream = env.addSource(myConsumer);
> //        stream.print();
> DataStream<String> student = stream.flatMap(new StringToJsonObject()).map(value -> value.toJSONArraytoString())
>         .map(new MapFunction<JSONArray, String>() {
>             public String map(JSONArray jsonArray) throws Exception {
>                 String source = "bigeye";
>                 JSONObject jsonObject1 = new JSONObject();
>                 String result = null;
> //                readRedis redis = new readRedis();
>                 for (int i = 0; i < jsonArray.size(); i++) {
> //                    log.info("1111111111111111");
>                     String jsonObjectStr = jsonArray.getString(i);
>                     String timeValue = JSONObject.parseObject(jsonObjectStr).getString("time");
>                     String valueValue = JSONObject.parseObject(jsonObjectStr).getString("value");
>                     String agentHostnameValue = JSONObject.parseObject(jsonObjectStr).getString("agentHostname");
>                     String streamIDValue = JSONObject.parseObject(jsonObjectStr).getString("streamID");
> //                    Map<String, String> redisMap = redis.getInfo(agentHostnameValue, streamIDValue, source);
>                     // build the OpenTSDB data point
>                     Map<String, Object> opentsdbValue = new HashMap<>();
>                     opentsdbValue.put("metric", streamIDValue);
>                     opentsdbValue.put("timestamp", timeValue);
>                     opentsdbValue.put("value", valueValue);
>                     Gson gson = new Gson();
> //                    opentsdbValue.put("tags", gson.fromJson((String) redisMap.get("tags"), Map.class));
>
> //                    jsonObject1.put(flag, timeValue + "+" + streamIDValue);
>                     jsonObject1.put("BigeyeValue", opentsdbValue);
> //                    redis.close();
>                 }
>
>                 result = jsonObject1.toString();
>                 return result;
>             }
>         });
> student.addSink(new FlinkKafkaProducer010<String>("xxxx:9092", "test", new SimpleStringSchema())).name("flink-kafka");
>
> Many thanks!
>
>
>
> [hidden email]
>