Login  Register

Flink 的 log 文件夹下产生了 44G 日志

classic Classic list List threaded Threaded
10 messages Options Options
Embed post
Permalink
Reply | Threaded
Open this post in threaded view
| More
Print post
Permalink

Flink 的 log 文件夹下产生了 44G 日志

Henry
22 posts
 大家好,之前那个报错图片大家没看到,重新弄一下。
报错图片链接:
https://img-blog.csdnimg.cn/20190719092540880.png
https://img-blog.csdnimg.cn/20190719092848500.png


我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。
Reply | Threaded
Open this post in threaded view
| More
Print post
Permalink

Re: Flink 的 log 文件夹下产生了 44G 日志

Caizhi Weng
21 posts
Hi Henry,

这个 source 看起来不像是 Flink 提供的 source,应该是 source 本身实现的问题。你可能需要修改 source
的源码让它出错后关闭或者进行其它处理...

Henry <[hidden email]> 于2019年7月19日周五 上午9:31写道:

>  大家好,之前那个报错图片大家没看到,重新弄一下。
> 报错图片链接:
> https://img-blog.csdnimg.cn/20190719092540880.png
> https://img-blog.csdnimg.cn/20190719092848500.png
>
>
> 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。
> 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。
Reply | Threaded
Open this post in threaded view
| More
Print post
Permalink

Re:Re: Flink 的 log 文件夹下产生了 44G 日志

Henry
22 posts


你好,谢谢!是的,这个Source是用JMS实现的自定义Source。目前还在查原因,但是怎么能够让Flink不这样爆炸写log日志呢?20分钟就能写满磁盘,写了40G多。





在 2019-07-19 11:11:37,"Caizhi Weng" <[hidden email]> 写道:

>Hi Henry,
>
>这个 source 看起来不像是 Flink 提供的 source,应该是 source 本身实现的问题。你可能需要修改 source
>的源码让它出错后关闭或者进行其它处理...
>
>Henry <[hidden email]> 于2019年7月19日周五 上午9:31写道:
>
>>  大家好,之前那个报错图片大家没看到,重新弄一下。
>> 报错图片链接:
>> https://img-blog.csdnimg.cn/20190719092540880.png
>> https://img-blog.csdnimg.cn/20190719092848500.png
>>
>>
>> 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。
>> 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。
Reply | Threaded
Open this post in threaded view
| More
Print post
Permalink

Re: Re: Flink 的 log 文件夹下产生了 44G 日志

Caizhi Weng
21 posts
Hi Henry

你的意思是不想让 Flink 写 log 吗?那只能通过 `log4j.rootLogger=OFF` (log4j) 或者 `<root
level="OFF"> <appender-ref ref="file"/> </root>` (logback) 把 log 关掉,或者把 log
等级设成更高的 FATAL... 但我感觉问题还是自定义的 source 里写 log 的时候死循环了...

Henry <[hidden email]> 于2019年7月19日周五 下午2:20写道:

>
>
>
> 你好,谢谢!是的,这个Source是用JMS实现的自定义Source。目前还在查原因,但是怎么能够让Flink不这样爆炸写log日志呢?20分钟就能写满磁盘,写了40G多。
>
>
>
>
>
> 在 2019-07-19 11:11:37,"Caizhi Weng" <[hidden email]> 写道:
> >Hi Henry,
> >
> >这个 source 看起来不像是 Flink 提供的 source,应该是 source 本身实现的问题。你可能需要修改 source
> >的源码让它出错后关闭或者进行其它处理...
> >
> >Henry <[hidden email]> 于2019年7月19日周五 上午9:31写道:
> >
> >>  大家好,之前那个报错图片大家没看到,重新弄一下。
> >> 报错图片链接:
> >> https://img-blog.csdnimg.cn/20190719092540880.png
> >> https://img-blog.csdnimg.cn/20190719092848500.png
> >>
> >>
> >>
> 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。
> >>
> 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。
>
Reply | Threaded
Open this post in threaded view
| More
Print post
Permalink

Re: Re: Flink 的 log 文件夹下产生了 44G 日志

Biao Liu
34 posts
In reply to this post by Henry
最根本的解法当然是去掉打日志的地方,这 source 不是 Flink 内置的,Flink 当然不能控制你们自定义 source 的行为。

你可以考虑自己改一下 log4j.properties,手动关掉这个 logger, Flink 内置的 log4j.properties 里有
example,参考着改一下

log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,
file
改成 log4j.logger.com.JavaCustoms.FlinkJMSStreamSource=OFF, file

但是这明显是个 ERROR,最好还是解决一下,要不就是掩耳盗铃啊


Henry <[hidden email]> 于2019年7月19日周五 下午2:20写道:

>
>
>
> 你好,谢谢!是的,这个Source是用JMS实现的自定义Source。目前还在查原因,但是怎么能够让Flink不这样爆炸写log日志呢?20分钟就能写满磁盘,写了40G多。
>
>
>
>
>
> 在 2019-07-19 11:11:37,"Caizhi Weng" <[hidden email]> 写道:
> >Hi Henry,
> >
> >这个 source 看起来不像是 Flink 提供的 source,应该是 source 本身实现的问题。你可能需要修改 source
> >的源码让它出错后关闭或者进行其它处理...
> >
> >Henry <[hidden email]> 于2019年7月19日周五 上午9:31写道:
> >
> >>  大家好,之前那个报错图片大家没看到,重新弄一下。
> >> 报错图片链接:
> >> https://img-blog.csdnimg.cn/20190719092540880.png
> >> https://img-blog.csdnimg.cn/20190719092848500.png
> >>
> >>
> >>
> 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。
> >>
> 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。
>
Reply | Threaded
Open this post in threaded view
| More
Print post
Permalink

Re:Re: Re: Flink 的 log 文件夹下产生了 44G 日志

Henry
22 posts
In reply to this post by Caizhi Weng


谢谢你的帮助哈! 我也是觉得 source 里的问题,但是呢,木有找到错误的地方。下面这个是我那个自定义的 source 代码,但是里面没有写log里报的哪个错的提示。
package com.JavaCustoms;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.*;

public class FlinkJMSStreamSource extends RichSourceFunction<String> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(FlinkJMSStreamSource.class);
private transient volatile boolean running;
private transient MessageConsumer consumer;
private transient Connection connection;

// topic name
private static final String topicName = "flink_mypay";
// tcp str
private static final String tcpStr = "tcp://server.mn:61616";
// 持久订阅的id标识
private static final String clientId = "flink_hz";
// Subscription name
private static final String subscriptionName = "flink_topic_mypay";

private void init() throws JMSException {
// Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(tcpStr);

// Create a Connection
connection = connectionFactory.createConnection();
connection.setClientID(clientId);
//    connection.start();

      // Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// Create a MessageConsumer from the Session to the Topic or Queue
Topic topic = session.createTopic(topicName);
consumer = session.createDurableSubscriber(topic, subscriptionName);
connection.start();
   }

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
running = true;
      init();
   }

@Override
public void run(SourceContext<String> ctx) {
// this source never completes

while (running) {
try {
            Message message = consumer.receive();
            BytesMessage bytesMessage = (BytesMessage) message;
byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
            bytesMessage.readBytes(bytes);

String text = new String(bytes);
            ctx.collect(text);

         } catch (JMSException e) {
LOG.error(e.getLocalizedMessage());
running = true;
         }
      }
try {
         close();
      } catch (Exception e) {
LOG.error(e.getMessage(), e);
      }
   }

@Override
public void cancel() {
running = false;
   }

@Override
public void close() throws Exception {
LOG.info("Closing");
try {
connection.close();
      } catch (JMSException e) {
throw new RuntimeException("Error while closing ActiveMQ connection ", e);
      }
   }
}










在 2019-07-19 14:43:17,"Caizhi Weng" <[hidden email]> 写道:

>Hi Henry
>
>你的意思是不想让 Flink 写 log 吗?那只能通过 `log4j.rootLogger=OFF` (log4j) 或者 `<root
>level="OFF"> <appender-ref ref="file"/> </root>` (logback) 把 log 关掉,或者把 log
>等级设成更高的 FATAL... 但我感觉问题还是自定义的 source 里写 log 的时候死循环了...
>
>Henry <[hidden email]> 于2019年7月19日周五 下午2:20写道:
>
>>
>>
>>
>> 你好,谢谢!是的,这个Source是用JMS实现的自定义Source。目前还在查原因,但是怎么能够让Flink不这样爆炸写log日志呢?20分钟就能写满磁盘,写了40G多。
>>
>>
>>
>>
>>
>> 在 2019-07-19 11:11:37,"Caizhi Weng" <[hidden email]> 写道:
>> >Hi Henry,
>> >
>> >这个 source 看起来不像是 Flink 提供的 source,应该是 source 本身实现的问题。你可能需要修改 source
>> >的源码让它出错后关闭或者进行其它处理...
>> >
>> >Henry <[hidden email]> 于2019年7月19日周五 上午9:31写道:
>> >
>> >>  大家好,之前那个报错图片大家没看到,重新弄一下。
>> >> 报错图片链接:
>> >> https://img-blog.csdnimg.cn/20190719092540880.png
>> >> https://img-blog.csdnimg.cn/20190719092848500.png
>> >>
>> >>
>> >>
>> 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。
>> >>
>> 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。
>>
Reply | Threaded
Open this post in threaded view
| More
Print post
Permalink

Re: Re: Re: Flink 的 log 文件夹下产生了 44G 日志

Caizhi Weng
21 posts
Hi Henry,

LOG.error(e.getLocalizedMessage());
running = true;

这里写错了吧,应该是 running = false;

Henry <[hidden email]> 于2019年7月19日周五 下午4:04写道:

>
>
> 谢谢你的帮助哈! 我也是觉得 source 里的问题,但是呢,木有找到错误的地方。下面这个是我那个自定义的 source
> 代码,但是里面没有写log里报的哪个错的提示。
> package com.JavaCustoms;
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import javax.jms.*;
>
> public class FlinkJMSStreamSource extends RichSourceFunction<String> {
> private static final long serialVersionUID = 1L;
> private static final Logger LOG =
> LoggerFactory.getLogger(FlinkJMSStreamSource.class);
> private transient volatile boolean running;
> private transient MessageConsumer consumer;
> private transient Connection connection;
>
> // topic name
> private static final String topicName = "flink_mypay";
> // tcp str
> private static final String tcpStr = "tcp://server.mn:61616";
> // 持久订阅的id标识
> private static final String clientId = "flink_hz";
> // Subscription name
> private static final String subscriptionName = "flink_topic_mypay";
>
> private void init() throws JMSException {
> // Create a ConnectionFactory
> ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory(tcpStr);
>
> // Create a Connection
> connection = connectionFactory.createConnection();
> connection.setClientID(clientId);
> //    connection.start();
>
>       // Create a Session
> Session session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
>
> // Create a MessageConsumer from the Session to the Topic or Queue
> Topic topic = session.createTopic(topicName);
> consumer = session.createDurableSubscriber(topic, subscriptionName);
> connection.start();
>    }
>
> @Override
> public void open(Configuration parameters) throws Exception {
> super.open(parameters);
> running = true;
>       init();
>    }
>
> @Override
> public void run(SourceContext<String> ctx) {
> // this source never completes
>
> while (running) {
> try {
>             Message message = consumer.receive();
>             BytesMessage bytesMessage = (BytesMessage) message;
> byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
>             bytesMessage.readBytes(bytes);
>
> String text = new String(bytes);
>             ctx.collect(text);
>
>          } catch (JMSException e) {
> LOG.error(e.getLocalizedMessage());
> running = true;
>          }
>       }
> try {
>          close();
>       } catch (Exception e) {
> LOG.error(e.getMessage(), e);
>       }
>    }
>
> @Override
> public void cancel() {
> running = false;
>    }
>
> @Override
> public void close() throws Exception {
> LOG.info("Closing");
> try {
> connection.close();
>       } catch (JMSException e) {
> throw new RuntimeException("Error while closing ActiveMQ connection ", e);
>       }
>    }
> }
>
>
>
>
>
>
>
>
>
>
> 在 2019-07-19 14:43:17,"Caizhi Weng" <[hidden email]> 写道:
> >Hi Henry
> >
> >你的意思是不想让 Flink 写 log 吗?那只能通过 `log4j.rootLogger=OFF` (log4j) 或者 `<root
> >level="OFF"> <appender-ref ref="file"/> </root>` (logback) 把 log 关掉,或者把
> log
> >等级设成更高的 FATAL... 但我感觉问题还是自定义的 source 里写 log 的时候死循环了...
> >
> >Henry <[hidden email]> 于2019年7月19日周五 下午2:20写道:
> >
> >>
> >>
> >>
> >>
> 你好,谢谢!是的,这个Source是用JMS实现的自定义Source。目前还在查原因,但是怎么能够让Flink不这样爆炸写log日志呢?20分钟就能写满磁盘,写了40G多。
> >>
> >>
> >>
> >>
> >>
> >> 在 2019-07-19 11:11:37,"Caizhi Weng" <[hidden email]> 写道:
> >> >Hi Henry,
> >> >
> >> >这个 source 看起来不像是 Flink 提供的 source,应该是 source 本身实现的问题。你可能需要修改 source
> >> >的源码让它出错后关闭或者进行其它处理...
> >> >
> >> >Henry <[hidden email]> 于2019年7月19日周五 上午9:31写道:
> >> >
> >> >>  大家好,之前那个报错图片大家没看到,重新弄一下。
> >> >> 报错图片链接:
> >> >> https://img-blog.csdnimg.cn/20190719092540880.png
> >> >> https://img-blog.csdnimg.cn/20190719092848500.png
> >> >>
> >> >>
> >> >>
> >>
> 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。
> >> >>
> >>
> 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。
> >>
>
Reply | Threaded
Open this post in threaded view
| More
Print post
Permalink

Re:Re: Re: Re: Flink 的 log 文件夹下产生了 44G 日志

Henry
22 posts


啊!我想起来了,之前忘了因为啥原因了,为了方便调试把  running = false; 改成了 running = true;
感谢感谢!  但是原因是为啥呢?这个 running = true; 是写在 cancel 中的,任务在执行没有取消它,
怎么会跳转这里呢?





在 2019-07-20 03:23:28,"Caizhi Weng" <[hidden email]> 写道:

>Hi Henry,
>
>LOG.error(e.getLocalizedMessage());
>running = true;
>
>这里写错了吧,应该是 running = false;
>
>Henry <[hidden email]> 于2019年7月19日周五 下午4:04写道:
>
>>
>>
>> 谢谢你的帮助哈! 我也是觉得 source 里的问题,但是呢,木有找到错误的地方。下面这个是我那个自定义的 source
>> 代码,但是里面没有写log里报的哪个错的提示。
>> package com.JavaCustoms;
>> import org.apache.activemq.ActiveMQConnectionFactory;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> import javax.jms.*;
>>
>> public class FlinkJMSStreamSource extends RichSourceFunction<String> {
>> private static final long serialVersionUID = 1L;
>> private static final Logger LOG =
>> LoggerFactory.getLogger(FlinkJMSStreamSource.class);
>> private transient volatile boolean running;
>> private transient MessageConsumer consumer;
>> private transient Connection connection;
>>
>> // topic name
>> private static final String topicName = "flink_mypay";
>> // tcp str
>> private static final String tcpStr = "tcp://server.mn:61616";
>> // 持久订阅的id标识
>> private static final String clientId = "flink_hz";
>> // Subscription name
>> private static final String subscriptionName = "flink_topic_mypay";
>>
>> private void init() throws JMSException {
>> // Create a ConnectionFactory
>> ActiveMQConnectionFactory connectionFactory = new
>> ActiveMQConnectionFactory(tcpStr);
>>
>> // Create a Connection
>> connection = connectionFactory.createConnection();
>> connection.setClientID(clientId);
>> //    connection.start();
>>
>>       // Create a Session
>> Session session = connection.createSession(false,
>> Session.AUTO_ACKNOWLEDGE);
>>
>> // Create a MessageConsumer from the Session to the Topic or Queue
>> Topic topic = session.createTopic(topicName);
>> consumer = session.createDurableSubscriber(topic, subscriptionName);
>> connection.start();
>>    }
>>
>> @Override
>> public void open(Configuration parameters) throws Exception {
>> super.open(parameters);
>> running = true;
>>       init();
>>    }
>>
>> @Override
>> public void run(SourceContext<String> ctx) {
>> // this source never completes
>>
>> while (running) {
>> try {
>>             Message message = consumer.receive();
>>             BytesMessage bytesMessage = (BytesMessage) message;
>> byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
>>             bytesMessage.readBytes(bytes);
>>
>> String text = new String(bytes);
>>             ctx.collect(text);
>>
>>          } catch (JMSException e) {
>> LOG.error(e.getLocalizedMessage());
>> running = true;
>>          }
>>       }
>> try {
>>          close();
>>       } catch (Exception e) {
>> LOG.error(e.getMessage(), e);
>>       }
>>    }
>>
>> @Override
>> public void cancel() {
>> running = false;
>>    }
>>
>> @Override
>> public void close() throws Exception {
>> LOG.info("Closing");
>> try {
>> connection.close();
>>       } catch (JMSException e) {
>> throw new RuntimeException("Error while closing ActiveMQ connection ", e);
>>       }
>>    }
>> }
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2019-07-19 14:43:17,"Caizhi Weng" <[hidden email]> 写道:
>> >Hi Henry
>> >
>> >你的意思是不想让 Flink 写 log 吗?那只能通过 `log4j.rootLogger=OFF` (log4j) 或者 `<root
>> >level="OFF"> <appender-ref ref="file"/> </root>` (logback) 把 log 关掉,或者把
>> log
>> >等级设成更高的 FATAL... 但我感觉问题还是自定义的 source 里写 log 的时候死循环了...
>> >
>> >Henry <[hidden email]> 于2019年7月19日周五 下午2:20写道:
>> >
>> >>
>> >>
>> >>
>> >>
>> 你好,谢谢!是的,这个Source是用JMS实现的自定义Source。目前还在查原因,但是怎么能够让Flink不这样爆炸写log日志呢?20分钟就能写满磁盘,写了40G多。
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2019-07-19 11:11:37,"Caizhi Weng" <[hidden email]> 写道:
>> >> >Hi Henry,
>> >> >
>> >> >这个 source 看起来不像是 Flink 提供的 source,应该是 source 本身实现的问题。你可能需要修改 source
>> >> >的源码让它出错后关闭或者进行其它处理...
>> >> >
>> >> >Henry <[hidden email]> 于2019年7月19日周五 上午9:31写道:
>> >> >
>> >> >>  大家好,之前那个报错图片大家没看到,重新弄一下。
>> >> >> 报错图片链接:
>> >> >> https://img-blog.csdnimg.cn/20190719092540880.png
>> >> >> https://img-blog.csdnimg.cn/20190719092848500.png
>> >> >>
>> >> >>
>> >> >>
>> >>
>> 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。
>> >> >>
>> >>
>> 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。
>> >>
>>
Reply | Threaded
Open this post in threaded view
| More
Print post
Permalink

Re: Re: Re: Re: Flink 的 log 文件夹下产生了 44G 日志

Caizhi Weng
21 posts
Hi Henry,

你可能看错了,仔细看你的 run 函数,里面有个 try catch 里有 running = true...

Henry <[hidden email]> 于2019年7月20日周六 下午9:32写道:

>
>
> 啊!我想起来了,之前忘了因为啥原因了,为了方便调试把  running = false; 改成了 running = true;
> 感谢感谢!  但是原因是为啥呢?这个 running = true; 是写在 cancel 中的,任务在执行没有取消它,
> 怎么会跳转这里呢?
>
>
>
>
>
> 在 2019-07-20 03:23:28,"Caizhi Weng" <[hidden email]> 写道:
> >Hi Henry,
> >
> >LOG.error(e.getLocalizedMessage());
> >running = true;
> >
> >这里写错了吧,应该是 running = false;
> >
> >Henry <[hidden email]> 于2019年7月19日周五 下午4:04写道:
> >
> >>
> >>
> >> 谢谢你的帮助哈! 我也是觉得 source 里的问题,但是呢,木有找到错误的地方。下面这个是我那个自定义的 source
> >> 代码,但是里面没有写log里报的哪个错的提示。
> >> package com.JavaCustoms;
> >> import org.apache.activemq.ActiveMQConnectionFactory;
> >> import org.apache.flink.configuration.Configuration;
> >> import
> org.apache.flink.streaming.api.functions.source.RichSourceFunction;
> >> import org.slf4j.Logger;
> >> import org.slf4j.LoggerFactory;
> >>
> >> import javax.jms.*;
> >>
> >> public class FlinkJMSStreamSource extends RichSourceFunction<String> {
> >> private static final long serialVersionUID = 1L;
> >> private static final Logger LOG =
> >> LoggerFactory.getLogger(FlinkJMSStreamSource.class);
> >> private transient volatile boolean running;
> >> private transient MessageConsumer consumer;
> >> private transient Connection connection;
> >>
> >> // topic name
> >> private static final String topicName = "flink_mypay";
> >> // tcp str
> >> private static final String tcpStr = "tcp://server.mn:61616";
> >> // 持久订阅的id标识
> >> private static final String clientId = "flink_hz";
> >> // Subscription name
> >> private static final String subscriptionName = "flink_topic_mypay";
> >>
> >> private void init() throws JMSException {
> >> // Create a ConnectionFactory
> >> ActiveMQConnectionFactory connectionFactory = new
> >> ActiveMQConnectionFactory(tcpStr);
> >>
> >> // Create a Connection
> >> connection = connectionFactory.createConnection();
> >> connection.setClientID(clientId);
> >> //    connection.start();
> >>
> >>       // Create a Session
> >> Session session = connection.createSession(false,
> >> Session.AUTO_ACKNOWLEDGE);
> >>
> >> // Create a MessageConsumer from the Session to the Topic or Queue
> >> Topic topic = session.createTopic(topicName);
> >> consumer = session.createDurableSubscriber(topic, subscriptionName);
> >> connection.start();
> >>    }
> >>
> >> @Override
> >> public void open(Configuration parameters) throws Exception {
> >> super.open(parameters);
> >> running = true;
> >>       init();
> >>    }
> >>
> >> @Override
> >> public void run(SourceContext<String> ctx) {
> >> // this source never completes
> >>
> >> while (running) {
> >> try {
> >>             Message message = consumer.receive();
> >>             BytesMessage bytesMessage = (BytesMessage) message;
> >> byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
> >>             bytesMessage.readBytes(bytes);
> >>
> >> String text = new String(bytes);
> >>             ctx.collect(text);
> >>
> >>          } catch (JMSException e) {
> >> LOG.error(e.getLocalizedMessage());
> >> running = true;
> >>          }
> >>       }
> >> try {
> >>          close();
> >>       } catch (Exception e) {
> >> LOG.error(e.getMessage(), e);
> >>       }
> >>    }
> >>
> >> @Override
> >> public void cancel() {
> >> running = false;
> >>    }
> >>
> >> @Override
> >> public void close() throws Exception {
> >> LOG.info("Closing");
> >> try {
> >> connection.close();
> >>       } catch (JMSException e) {
> >> throw new RuntimeException("Error while closing ActiveMQ connection ",
> e);
> >>       }
> >>    }
> >> }
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2019-07-19 14:43:17,"Caizhi Weng" <[hidden email]> 写道:
> >> >Hi Henry
> >> >
> >> >你的意思是不想让 Flink 写 log 吗?那只能通过 `log4j.rootLogger=OFF` (log4j) 或者 `<root
> >> >level="OFF"> <appender-ref ref="file"/> </root>` (logback) 把 log 关掉,或者把
> >> log
> >> >等级设成更高的 FATAL... 但我感觉问题还是自定义的 source 里写 log 的时候死循环了...
> >> >
> >> >Henry <[hidden email]> 于2019年7月19日周五 下午2:20写道:
> >> >
> >> >>
> >> >>
> >> >>
> >> >>
> >>
> 你好,谢谢!是的,这个Source是用JMS实现的自定义Source。目前还在查原因,但是怎么能够让Flink不这样爆炸写log日志呢?20分钟就能写满磁盘,写了40G多。
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> 在 2019-07-19 11:11:37,"Caizhi Weng" <[hidden email]> 写道:
> >> >> >Hi Henry,
> >> >> >
> >> >> >这个 source 看起来不像是 Flink 提供的 source,应该是 source 本身实现的问题。你可能需要修改 source
> >> >> >的源码让它出错后关闭或者进行其它处理...
> >> >> >
> >> >> >Henry <[hidden email]> 于2019年7月19日周五 上午9:31写道:
> >> >> >
> >> >> >>  大家好,之前那个报错图片大家没看到,重新弄一下。
> >> >> >> 报错图片链接:
> >> >> >> https://img-blog.csdnimg.cn/20190719092540880.png
> >> >> >> https://img-blog.csdnimg.cn/20190719092848500.png
> >> >> >>
> >> >> >>
> >> >> >>
> >> >>
> >>
> 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。
> >> >> >>
> >> >>
> >>
> 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。
> >> >>
> >>
>
Reply | Threaded
Open this post in threaded view
| More
Print post
Permalink

Re:Re: Re: Re: Re: Flink 的 log 文件夹下产生了 44G 日志

Henry
22 posts



看到啦,谢谢啦。





在 2019-07-21 19:16:36,"Caizhi Weng" <[hidden email]> 写道:

>Hi Henry,
>
>你可能看错了,仔细看你的 run 函数,里面有个 try catch 里有 running = true...
>
>Henry <[hidden email]> 于2019年7月20日周六 下午9:32写道:
>
>>
>>
>> 啊!我想起来了,之前忘了因为啥原因了,为了方便调试把  running = false; 改成了 running = true;
>> 感谢感谢!  但是原因是为啥呢?这个 running = true; 是写在 cancel 中的,任务在执行没有取消它,
>> 怎么会跳转这里呢?
>>
>>
>>
>>
>>
>> 在 2019-07-20 03:23:28,"Caizhi Weng" <[hidden email]> 写道:
>> >Hi Henry,
>> >
>> >LOG.error(e.getLocalizedMessage());
>> >running = true;
>> >
>> >这里写错了吧,应该是 running = false;
>> >
>> >Henry <[hidden email]> 于2019年7月19日周五 下午4:04写道:
>> >
>> >>
>> >>
>> >> 谢谢你的帮助哈! 我也是觉得 source 里的问题,但是呢,木有找到错误的地方。下面这个是我那个自定义的 source
>> >> 代码,但是里面没有写log里报的哪个错的提示。
>> >> package com.JavaCustoms;
>> >> import org.apache.activemq.ActiveMQConnectionFactory;
>> >> import org.apache.flink.configuration.Configuration;
>> >> import
>> org.apache.flink.streaming.api.functions.source.RichSourceFunction;
>> >> import org.slf4j.Logger;
>> >> import org.slf4j.LoggerFactory;
>> >>
>> >> import javax.jms.*;
>> >>
>> >> public class FlinkJMSStreamSource extends RichSourceFunction<String> {
>> >> private static final long serialVersionUID = 1L;
>> >> private static final Logger LOG =
>> >> LoggerFactory.getLogger(FlinkJMSStreamSource.class);
>> >> private transient volatile boolean running;
>> >> private transient MessageConsumer consumer;
>> >> private transient Connection connection;
>> >>
>> >> // topic name
>> >> private static final String topicName = "flink_mypay";
>> >> // tcp str
>> >> private static final String tcpStr = "tcp://server.mn:61616";
>> >> // 持久订阅的id标识
>> >> private static final String clientId = "flink_hz";
>> >> // Subscription name
>> >> private static final String subscriptionName = "flink_topic_mypay";
>> >>
>> >> private void init() throws JMSException {
>> >> // Create a ConnectionFactory
>> >> ActiveMQConnectionFactory connectionFactory = new
>> >> ActiveMQConnectionFactory(tcpStr);
>> >>
>> >> // Create a Connection
>> >> connection = connectionFactory.createConnection();
>> >> connection.setClientID(clientId);
>> >> //    connection.start();
>> >>
>> >>       // Create a Session
>> >> Session session = connection.createSession(false,
>> >> Session.AUTO_ACKNOWLEDGE);
>> >>
>> >> // Create a MessageConsumer from the Session to the Topic or Queue
>> >> Topic topic = session.createTopic(topicName);
>> >> consumer = session.createDurableSubscriber(topic, subscriptionName);
>> >> connection.start();
>> >>    }
>> >>
>> >> @Override
>> >> public void open(Configuration parameters) throws Exception {
>> >> super.open(parameters);
>> >> running = true;
>> >>       init();
>> >>    }
>> >>
>> >> @Override
>> >> public void run(SourceContext<String> ctx) {
>> >> // this source never completes
>> >>
>> >> while (running) {
>> >> try {
>> >>             Message message = consumer.receive();
>> >>             BytesMessage bytesMessage = (BytesMessage) message;
>> >> byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
>> >>             bytesMessage.readBytes(bytes);
>> >>
>> >> String text = new String(bytes);
>> >>             ctx.collect(text);
>> >>
>> >>          } catch (JMSException e) {
>> >> LOG.error(e.getLocalizedMessage());
>> >> running = true;
>> >>          }
>> >>       }
>> >> try {
>> >>          close();
>> >>       } catch (Exception e) {
>> >> LOG.error(e.getMessage(), e);
>> >>       }
>> >>    }
>> >>
>> >> @Override
>> >> public void cancel() {
>> >> running = false;
>> >>    }
>> >>
>> >> @Override
>> >> public void close() throws Exception {
>> >> LOG.info("Closing");
>> >> try {
>> >> connection.close();
>> >>       } catch (JMSException e) {
>> >> throw new RuntimeException("Error while closing ActiveMQ connection ",
>> e);
>> >>       }
>> >>    }
>> >> }
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2019-07-19 14:43:17,"Caizhi Weng" <[hidden email]> 写道:
>> >> >Hi Henry
>> >> >
>> >> >你的意思是不想让 Flink 写 log 吗?那只能通过 `log4j.rootLogger=OFF` (log4j) 或者 `<root
>> >> >level="OFF"> <appender-ref ref="file"/> </root>` (logback) 把 log 关掉,或者把
>> >> log
>> >> >等级设成更高的 FATAL... 但我感觉问题还是自定义的 source 里写 log 的时候死循环了...
>> >> >
>> >> >Henry <[hidden email]> 于2019年7月19日周五 下午2:20写道:
>> >> >
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >>
>> 你好,谢谢!是的,这个Source是用JMS实现的自定义Source。目前还在查原因,但是怎么能够让Flink不这样爆炸写log日志呢?20分钟就能写满磁盘,写了40G多。
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >> 在 2019-07-19 11:11:37,"Caizhi Weng" <[hidden email]> 写道:
>> >> >> >Hi Henry,
>> >> >> >
>> >> >> >这个 source 看起来不像是 Flink 提供的 source,应该是 source 本身实现的问题。你可能需要修改 source
>> >> >> >的源码让它出错后关闭或者进行其它处理...
>> >> >> >
>> >> >> >Henry <[hidden email]> 于2019年7月19日周五 上午9:31写道:
>> >> >> >
>> >> >> >>  大家好,之前那个报错图片大家没看到,重新弄一下。
>> >> >> >> 报错图片链接:
>> >> >> >> https://img-blog.csdnimg.cn/20190719092540880.png
>> >> >> >> https://img-blog.csdnimg.cn/20190719092848500.png
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >>
>> >>
>> 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。
>> >> >> >>
>> >> >>
>> >>
>> 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。
>> >> >>
>> >>
>>