Flink 的 log 文件夹下产生了 44G 日志

classic Classic list List threaded Threaded
10 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink 的 log 文件夹下产生了 44G 日志

Henry
 大家好,之前那个报错图片大家没看到,重新弄一下。
报错图片链接:
https://img-blog.csdnimg.cn/20190719092540880.png
https://img-blog.csdnimg.cn/20190719092848500.png


我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。
Reply | Threaded
Open this post in threaded view
|

Re: Flink 的 log 文件夹下产生了 44G 日志

Caizhi Weng
Hi Henry,

这个 source 看起来不像是 Flink 提供的 source,应该是 source 本身实现的问题。你可能需要修改 source
的源码让它出错后关闭或者进行其它处理...

Henry <[hidden email]> 于2019年7月19日周五 上午9:31写道:

>  大家好,之前那个报错图片大家没看到,重新弄一下。
> 报错图片链接:
> https://img-blog.csdnimg.cn/20190719092540880.png
> https://img-blog.csdnimg.cn/20190719092848500.png
>
>
> 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。
> 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。
Reply | Threaded
Open this post in threaded view
|

Re:Re: Flink 的 log 文件夹下产生了 44G 日志

Henry


你好,谢谢!是的,这个Source是用JMS实现的自定义Source。目前还在查原因,但是怎么能够让Flink不这样爆炸写log日志呢?20分钟就能写满磁盘,写了40G多。





在 2019-07-19 11:11:37,"Caizhi Weng" <[hidden email]> 写道:

>Hi Henry,
>
>这个 source 看起来不像是 Flink 提供的 source,应该是 source 本身实现的问题。你可能需要修改 source
>的源码让它出错后关闭或者进行其它处理...
>
>Henry <[hidden email]> 于2019年7月19日周五 上午9:31写道:
>
>>  大家好,之前那个报错图片大家没看到,重新弄一下。
>> 报错图片链接:
>> https://img-blog.csdnimg.cn/20190719092540880.png
>> https://img-blog.csdnimg.cn/20190719092848500.png
>>
>>
>> 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。
>> 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。
Reply | Threaded
Open this post in threaded view
|

Re: Re: Flink 的 log 文件夹下产生了 44G 日志

Caizhi Weng
Hi Henry

你的意思是不想让 Flink 写 log 吗?那只能通过 `log4j.rootLogger=OFF` (log4j) 或者 `<root
level="OFF"> <appender-ref ref="file"/> </root>` (logback) 把 log 关掉,或者把 log
等级设成更高的 FATAL... 但我感觉问题还是自定义的 source 里写 log 的时候死循环了...

Henry <[hidden email]> 于2019年7月19日周五 下午2:20写道:

>
>
>
> 你好,谢谢!是的,这个Source是用JMS实现的自定义Source。目前还在查原因,但是怎么能够让Flink不这样爆炸写log日志呢?20分钟就能写满磁盘,写了40G多。
>
>
>
>
>
> 在 2019-07-19 11:11:37,"Caizhi Weng" <[hidden email]> 写道:
> >Hi Henry,
> >
> >这个 source 看起来不像是 Flink 提供的 source,应该是 source 本身实现的问题。你可能需要修改 source
> >的源码让它出错后关闭或者进行其它处理...
> >
> >Henry <[hidden email]> 于2019年7月19日周五 上午9:31写道:
> >
> >>  大家好,之前那个报错图片大家没看到,重新弄一下。
> >> 报错图片链接:
> >> https://img-blog.csdnimg.cn/20190719092540880.png
> >> https://img-blog.csdnimg.cn/20190719092848500.png
> >>
> >>
> >>
> 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。
> >>
> 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: Flink 的 log 文件夹下产生了 44G 日志

Biao Liu
In reply to this post by Henry
最根本的解法当然是去掉打日志的地方,这 source 不是 Flink 内置的,Flink 当然不能控制你们自定义 source 的行为。

你可以考虑自己改一下 log4j.properties,手动关掉这个 logger, Flink 内置的 log4j.properties 里有
example,参考着改一下

log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,
file
改成 log4j.logger.com.JavaCustoms.FlinkJMSStreamSource=OFF, file

但是这明显是个 ERROR,最好还是解决一下,要不就是掩耳盗铃啊


Henry <[hidden email]> 于2019年7月19日周五 下午2:20写道:

>
>
>
> 你好,谢谢!是的,这个Source是用JMS实现的自定义Source。目前还在查原因,但是怎么能够让Flink不这样爆炸写log日志呢?20分钟就能写满磁盘,写了40G多。
>
>
>
>
>
> 在 2019-07-19 11:11:37,"Caizhi Weng" <[hidden email]> 写道:
> >Hi Henry,
> >
> >这个 source 看起来不像是 Flink 提供的 source,应该是 source 本身实现的问题。你可能需要修改 source
> >的源码让它出错后关闭或者进行其它处理...
> >
> >Henry <[hidden email]> 于2019年7月19日周五 上午9:31写道:
> >
> >>  大家好,之前那个报错图片大家没看到,重新弄一下。
> >> 报错图片链接:
> >> https://img-blog.csdnimg.cn/20190719092540880.png
> >> https://img-blog.csdnimg.cn/20190719092848500.png
> >>
> >>
> >>
> 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。
> >>
> 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。
>
Reply | Threaded
Open this post in threaded view
|

Re:Re: Re: Flink 的 log 文件夹下产生了 44G 日志

Henry
In reply to this post by Caizhi Weng


谢谢你的帮助哈! 我也是觉得 source 里的问题,但是呢,木有找到错误的地方。下面这个是我那个自定义的 source 代码,但是里面没有写log里报的哪个错的提示。
package com.JavaCustoms;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.*;

public class FlinkJMSStreamSource extends RichSourceFunction<String> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(FlinkJMSStreamSource.class);
private transient volatile boolean running;
private transient MessageConsumer consumer;
private transient Connection connection;

// topic name
private static final String topicName = "flink_mypay";
// tcp str
private static final String tcpStr = "tcp://server.mn:61616";
// 持久订阅的id标识
private static final String clientId = "flink_hz";
// Subscription name
private static final String subscriptionName = "flink_topic_mypay";

private void init() throws JMSException {
// Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(tcpStr);

// Create a Connection
connection = connectionFactory.createConnection();
connection.setClientID(clientId);
//    connection.start();

      // Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// Create a MessageConsumer from the Session to the Topic or Queue
Topic topic = session.createTopic(topicName);
consumer = session.createDurableSubscriber(topic, subscriptionName);
connection.start();
   }

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
running = true;
      init();
   }

@Override
public void run(SourceContext<String> ctx) {
// this source never completes

while (running) {
try {
            Message message = consumer.receive();
            BytesMessage bytesMessage = (BytesMessage) message;
byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
            bytesMessage.readBytes(bytes);

String text = new String(bytes);
            ctx.collect(text);

         } catch (JMSException e) {
LOG.error(e.getLocalizedMessage());
running = true;
         }
      }
try {
         close();
      } catch (Exception e) {
LOG.error(e.getMessage(), e);
      }
   }

@Override
public void cancel() {
running = false;
   }

@Override
public void close() throws Exception {
LOG.info("Closing");
try {
connection.close();
      } catch (JMSException e) {
throw new RuntimeException("Error while closing ActiveMQ connection ", e);
      }
   }
}










在 2019-07-19 14:43:17,"Caizhi Weng" <[hidden email]> 写道:

>Hi Henry
>
>你的意思是不想让 Flink 写 log 吗?那只能通过 `log4j.rootLogger=OFF` (log4j) 或者 `<root
>level="OFF"> <appender-ref ref="file"/> </root>` (logback) 把 log 关掉,或者把 log
>等级设成更高的 FATAL... 但我感觉问题还是自定义的 source 里写 log 的时候死循环了...
>
>Henry <[hidden email]> 于2019年7月19日周五 下午2:20写道:
>
>>
>>
>>
>> 你好,谢谢!是的,这个Source是用JMS实现的自定义Source。目前还在查原因,但是怎么能够让Flink不这样爆炸写log日志呢?20分钟就能写满磁盘,写了40G多。
>>
>>
>>
>>
>>
>> 在 2019-07-19 11:11:37,"Caizhi Weng" <[hidden email]> 写道:
>> >Hi Henry,
>> >
>> >这个 source 看起来不像是 Flink 提供的 source,应该是 source 本身实现的问题。你可能需要修改 source
>> >的源码让它出错后关闭或者进行其它处理...
>> >
>> >Henry <[hidden email]> 于2019年7月19日周五 上午9:31写道:
>> >
>> >>  大家好,之前那个报错图片大家没看到,重新弄一下。
>> >> 报错图片链接:
>> >> https://img-blog.csdnimg.cn/20190719092540880.png
>> >> https://img-blog.csdnimg.cn/20190719092848500.png
>> >>
>> >>
>> >>
>> 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。
>> >>
>> 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。
>>
Reply | Threaded
Open this post in threaded view
|

Re: Re: Re: Flink 的 log 文件夹下产生了 44G 日志

Caizhi Weng
Hi Henry,

LOG.error(e.getLocalizedMessage());
running = true;

这里写错了吧,应该是 running = false;

Henry <[hidden email]> 于2019年7月19日周五 下午4:04写道:

>
>
> 谢谢你的帮助哈! 我也是觉得 source 里的问题,但是呢,木有找到错误的地方。下面这个是我那个自定义的 source
> 代码,但是里面没有写log里报的哪个错的提示。
> package com.JavaCustoms;
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import javax.jms.*;
>
> public class FlinkJMSStreamSource extends RichSourceFunction<String> {
> private static final long serialVersionUID = 1L;
> private static final Logger LOG =
> LoggerFactory.getLogger(FlinkJMSStreamSource.class);
> private transient volatile boolean running;
> private transient MessageConsumer consumer;
> private transient Connection connection;
>
> // topic name
> private static final String topicName = "flink_mypay";
> // tcp str
> private static final String tcpStr = "tcp://server.mn:61616";
> // 持久订阅的id标识
> private static final String clientId = "flink_hz";
> // Subscription name
> private static final String subscriptionName = "flink_topic_mypay";
>
> private void init() throws JMSException {
> // Create a ConnectionFactory
> ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory(tcpStr);
>
> // Create a Connection
> connection = connectionFactory.createConnection();
> connection.setClientID(clientId);
> //    connection.start();
>
>       // Create a Session
> Session session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
>
> // Create a MessageConsumer from the Session to the Topic or Queue
> Topic topic = session.createTopic(topicName);
> consumer = session.createDurableSubscriber(topic, subscriptionName);
> connection.start();
>    }
>
> @Override
> public void open(Configuration parameters) throws Exception {
> super.open(parameters);
> running = true;
>       init();
>    }
>
> @Override
> public void run(SourceContext<String> ctx) {
> // this source never completes
>
> while (running) {
> try {
>             Message message = consumer.receive();
>             BytesMessage bytesMessage = (BytesMessage) message;
> byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
>             bytesMessage.readBytes(bytes);
>
> String text = new String(bytes);
>             ctx.collect(text);
>
>          } catch (JMSException e) {
> LOG.error(e.getLocalizedMessage());
> running = true;
>          }
>       }
> try {
>          close();
>       } catch (Exception e) {
> LOG.error(e.getMessage(), e);
>       }
>    }
>
> @Override
> public void cancel() {
> running = false;
>    }
>
> @Override
> public void close() throws Exception {
> LOG.info("Closing");
> try {
> connection.close();
>       } catch (JMSException e) {
> throw new RuntimeException("Error while closing ActiveMQ connection ", e);
>       }
>    }
> }
>
>
>
>
>
>
>
>
>
>
> 在 2019-07-19 14:43:17,"Caizhi Weng" <[hidden email]> 写道:
> >Hi Henry
> >
> >你的意思是不想让 Flink 写 log 吗?那只能通过 `log4j.rootLogger=OFF` (log4j) 或者 `<root
> >level="OFF"> <appender-ref ref="file"/> </root>` (logback) 把 log 关掉,或者把
> log
> >等级设成更高的 FATAL... 但我感觉问题还是自定义的 source 里写 log 的时候死循环了...
> >
> >Henry <[hidden email]> 于2019年7月19日周五 下午2:20写道:
> >
> >>
> >>
> >>
> >>
> 你好,谢谢!是的,这个Source是用JMS实现的自定义Source。目前还在查原因,但是怎么能够让Flink不这样爆炸写log日志呢?20分钟就能写满磁盘,写了40G多。
> >>
> >>
> >>
> >>
> >>
> >> 在 2019-07-19 11:11:37,"Caizhi Weng" <[hidden email]> 写道:
> >> >Hi Henry,
> >> >
> >> >这个 source 看起来不像是 Flink 提供的 source,应该是 source 本身实现的问题。你可能需要修改 source
> >> >的源码让它出错后关闭或者进行其它处理...
> >> >
> >> >Henry <[hidden email]> 于2019年7月19日周五 上午9:31写道:
> >> >
> >> >>  大家好,之前那个报错图片大家没看到,重新弄一下。
> >> >> 报错图片链接:
> >> >> https://img-blog.csdnimg.cn/20190719092540880.png
> >> >> https://img-blog.csdnimg.cn/20190719092848500.png
> >> >>
> >> >>
> >> >>
> >>
> 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。
> >> >>
> >>
> 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。
> >>
>
Reply | Threaded
Open this post in threaded view
|

Re:Re: Re: Re: Flink 的 log 文件夹下产生了 44G 日志

Henry


啊!我想起来了,之前忘了因为啥原因了,为了方便调试把  running = false; 改成了 running = true;
感谢感谢!  但是原因是为啥呢?这个 running = true; 是写在 cancel 中的,任务在执行没有取消它,
怎么会跳转这里呢?





在 2019-07-20 03:23:28,"Caizhi Weng" <[hidden email]> 写道:

>Hi Henry,
>
>LOG.error(e.getLocalizedMessage());
>running = true;
>
>这里写错了吧,应该是 running = false;
>
>Henry <[hidden email]> 于2019年7月19日周五 下午4:04写道:
>
>>
>>
>> 谢谢你的帮助哈! 我也是觉得 source 里的问题,但是呢,木有找到错误的地方。下面这个是我那个自定义的 source
>> 代码,但是里面没有写log里报的哪个错的提示。
>> package com.JavaCustoms;
>> import org.apache.activemq.ActiveMQConnectionFactory;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> import javax.jms.*;
>>
>> public class FlinkJMSStreamSource extends RichSourceFunction<String> {
>> private static final long serialVersionUID = 1L;
>> private static final Logger LOG =
>> LoggerFactory.getLogger(FlinkJMSStreamSource.class);
>> private transient volatile boolean running;
>> private transient MessageConsumer consumer;
>> private transient Connection connection;
>>
>> // topic name
>> private static final String topicName = "flink_mypay";
>> // tcp str
>> private static final String tcpStr = "tcp://server.mn:61616";
>> // 持久订阅的id标识
>> private static final String clientId = "flink_hz";
>> // Subscription name
>> private static final String subscriptionName = "flink_topic_mypay";
>>
>> private void init() throws JMSException {
>> // Create a ConnectionFactory
>> ActiveMQConnectionFactory connectionFactory = new
>> ActiveMQConnectionFactory(tcpStr);
>>
>> // Create a Connection
>> connection = connectionFactory.createConnection();
>> connection.setClientID(clientId);
>> //    connection.start();
>>
>>       // Create a Session
>> Session session = connection.createSession(false,
>> Session.AUTO_ACKNOWLEDGE);
>>
>> // Create a MessageConsumer from the Session to the Topic or Queue
>> Topic topic = session.createTopic(topicName);
>> consumer = session.createDurableSubscriber(topic, subscriptionName);
>> connection.start();
>>    }
>>
>> @Override
>> public void open(Configuration parameters) throws Exception {
>> super.open(parameters);
>> running = true;
>>       init();
>>    }
>>
>> @Override
>> public void run(SourceContext<String> ctx) {
>> // this source never completes
>>
>> while (running) {
>> try {
>>             Message message = consumer.receive();
>>             BytesMessage bytesMessage = (BytesMessage) message;
>> byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
>>             bytesMessage.readBytes(bytes);
>>
>> String text = new String(bytes);
>>             ctx.collect(text);
>>
>>          } catch (JMSException e) {
>> LOG.error(e.getLocalizedMessage());
>> running = true;
>>          }
>>       }
>> try {
>>          close();
>>       } catch (Exception e) {
>> LOG.error(e.getMessage(), e);
>>       }
>>    }
>>
>> @Override
>> public void cancel() {
>> running = false;
>>    }
>>
>> @Override
>> public void close() throws Exception {
>> LOG.info("Closing");
>> try {
>> connection.close();
>>       } catch (JMSException e) {
>> throw new RuntimeException("Error while closing ActiveMQ connection ", e);
>>       }
>>    }
>> }
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2019-07-19 14:43:17,"Caizhi Weng" <[hidden email]> 写道:
>> >Hi Henry
>> >
>> >你的意思是不想让 Flink 写 log 吗?那只能通过 `log4j.rootLogger=OFF` (log4j) 或者 `<root
>> >level="OFF"> <appender-ref ref="file"/> </root>` (logback) 把 log 关掉,或者把
>> log
>> >等级设成更高的 FATAL... 但我感觉问题还是自定义的 source 里写 log 的时候死循环了...
>> >
>> >Henry <[hidden email]> 于2019年7月19日周五 下午2:20写道:
>> >
>> >>
>> >>
>> >>
>> >>
>> 你好,谢谢!是的,这个Source是用JMS实现的自定义Source。目前还在查原因,但是怎么能够让Flink不这样爆炸写log日志呢?20分钟就能写满磁盘,写了40G多。
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2019-07-19 11:11:37,"Caizhi Weng" <[hidden email]> 写道:
>> >> >Hi Henry,
>> >> >
>> >> >这个 source 看起来不像是 Flink 提供的 source,应该是 source 本身实现的问题。你可能需要修改 source
>> >> >的源码让它出错后关闭或者进行其它处理...
>> >> >
>> >> >Henry <[hidden email]> 于2019年7月19日周五 上午9:31写道:
>> >> >
>> >> >>  大家好,之前那个报错图片大家没看到,重新弄一下。
>> >> >> 报错图片链接:
>> >> >> https://img-blog.csdnimg.cn/20190719092540880.png
>> >> >> https://img-blog.csdnimg.cn/20190719092848500.png
>> >> >>
>> >> >>
>> >> >>
>> >>
>> 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。
>> >> >>
>> >>
>> 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。
>> >>
>>
Reply | Threaded
Open this post in threaded view
|

Re: Re: Re: Re: Flink 的 log 文件夹下产生了 44G 日志

Caizhi Weng
Hi Henry,

你可能看错了,仔细看你的 run 函数,里面有个 try catch 里有 running = true...

Henry <[hidden email]> 于2019年7月20日周六 下午9:32写道:

>
>
> 啊!我想起来了,之前忘了因为啥原因了,为了方便调试把  running = false; 改成了 running = true;
> 感谢感谢!  但是原因是为啥呢?这个 running = true; 是写在 cancel 中的,任务在执行没有取消它,
> 怎么会跳转这里呢?
>
>
>
>
>
> 在 2019-07-20 03:23:28,"Caizhi Weng" <[hidden email]> 写道:
> >Hi Henry,
> >
> >LOG.error(e.getLocalizedMessage());
> >running = true;
> >
> >这里写错了吧,应该是 running = false;
> >
> >Henry <[hidden email]> 于2019年7月19日周五 下午4:04写道:
> >
> >>
> >>
> >> 谢谢你的帮助哈! 我也是觉得 source 里的问题,但是呢,木有找到错误的地方。下面这个是我那个自定义的 source
> >> 代码,但是里面没有写log里报的哪个错的提示。
> >> package com.JavaCustoms;
> >> import org.apache.activemq.ActiveMQConnectionFactory;
> >> import org.apache.flink.configuration.Configuration;
> >> import
> org.apache.flink.streaming.api.functions.source.RichSourceFunction;
> >> import org.slf4j.Logger;
> >> import org.slf4j.LoggerFactory;
> >>
> >> import javax.jms.*;
> >>
> >> public class FlinkJMSStreamSource extends RichSourceFunction<String> {
> >> private static final long serialVersionUID = 1L;
> >> private static final Logger LOG =
> >> LoggerFactory.getLogger(FlinkJMSStreamSource.class);
> >> private transient volatile boolean running;
> >> private transient MessageConsumer consumer;
> >> private transient Connection connection;
> >>
> >> // topic name
> >> private static final String topicName = "flink_mypay";
> >> // tcp str
> >> private static final String tcpStr = "tcp://server.mn:61616";
> >> // 持久订阅的id标识
> >> private static final String clientId = "flink_hz";
> >> // Subscription name
> >> private static final String subscriptionName = "flink_topic_mypay";
> >>
> >> private void init() throws JMSException {
> >> // Create a ConnectionFactory
> >> ActiveMQConnectionFactory connectionFactory = new
> >> ActiveMQConnectionFactory(tcpStr);
> >>
> >> // Create a Connection
> >> connection = connectionFactory.createConnection();
> >> connection.setClientID(clientId);
> >> //    connection.start();
> >>
> >>       // Create a Session
> >> Session session = connection.createSession(false,
> >> Session.AUTO_ACKNOWLEDGE);
> >>
> >> // Create a MessageConsumer from the Session to the Topic or Queue
> >> Topic topic = session.createTopic(topicName);
> >> consumer = session.createDurableSubscriber(topic, subscriptionName);
> >> connection.start();
> >>    }
> >>
> >> @Override
> >> public void open(Configuration parameters) throws Exception {
> >> super.open(parameters);
> >> running = true;
> >>       init();
> >>    }
> >>
> >> @Override
> >> public void run(SourceContext<String> ctx) {
> >> // this source never completes
> >>
> >> while (running) {
> >> try {
> >>             Message message = consumer.receive();
> >>             BytesMessage bytesMessage = (BytesMessage) message;
> >> byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
> >>             bytesMessage.readBytes(bytes);
> >>
> >> String text = new String(bytes);
> >>             ctx.collect(text);
> >>
> >>          } catch (JMSException e) {
> >> LOG.error(e.getLocalizedMessage());
> >> running = true;
> >>          }
> >>       }
> >> try {
> >>          close();
> >>       } catch (Exception e) {
> >> LOG.error(e.getMessage(), e);
> >>       }
> >>    }
> >>
> >> @Override
> >> public void cancel() {
> >> running = false;
> >>    }
> >>
> >> @Override
> >> public void close() throws Exception {
> >> LOG.info("Closing");
> >> try {
> >> connection.close();
> >>       } catch (JMSException e) {
> >> throw new RuntimeException("Error while closing ActiveMQ connection ",
> e);
> >>       }
> >>    }
> >> }
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2019-07-19 14:43:17,"Caizhi Weng" <[hidden email]> 写道:
> >> >Hi Henry
> >> >
> >> >你的意思是不想让 Flink 写 log 吗?那只能通过 `log4j.rootLogger=OFF` (log4j) 或者 `<root
> >> >level="OFF"> <appender-ref ref="file"/> </root>` (logback) 把 log 关掉,或者把
> >> log
> >> >等级设成更高的 FATAL... 但我感觉问题还是自定义的 source 里写 log 的时候死循环了...
> >> >
> >> >Henry <[hidden email]> 于2019年7月19日周五 下午2:20写道:
> >> >
> >> >>
> >> >>
> >> >>
> >> >>
> >>
> 你好,谢谢!是的,这个Source是用JMS实现的自定义Source。目前还在查原因,但是怎么能够让Flink不这样爆炸写log日志呢?20分钟就能写满磁盘,写了40G多。
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> 在 2019-07-19 11:11:37,"Caizhi Weng" <[hidden email]> 写道:
> >> >> >Hi Henry,
> >> >> >
> >> >> >这个 source 看起来不像是 Flink 提供的 source,应该是 source 本身实现的问题。你可能需要修改 source
> >> >> >的源码让它出错后关闭或者进行其它处理...
> >> >> >
> >> >> >Henry <[hidden email]> 于2019年7月19日周五 上午9:31写道:
> >> >> >
> >> >> >>  大家好,之前那个报错图片大家没看到,重新弄一下。
> >> >> >> 报错图片链接:
> >> >> >> https://img-blog.csdnimg.cn/20190719092540880.png
> >> >> >> https://img-blog.csdnimg.cn/20190719092848500.png
> >> >> >>
> >> >> >>
> >> >> >>
> >> >>
> >>
> 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。
> >> >> >>
> >> >>
> >>
> 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。
> >> >>
> >>
>
Reply | Threaded
Open this post in threaded view
|

Re:Re: Re: Re: Re: Flink 的 log 文件夹下产生了 44G 日志

Henry



看到啦,谢谢啦。





在 2019-07-21 19:16:36,"Caizhi Weng" <[hidden email]> 写道:

>Hi Henry,
>
>你可能看错了,仔细看你的 run 函数,里面有个 try catch 里有 running = true...
>
>Henry <[hidden email]> 于2019年7月20日周六 下午9:32写道:
>
>>
>>
>> 啊!我想起来了,之前忘了因为啥原因了,为了方便调试把  running = false; 改成了 running = true;
>> 感谢感谢!  但是原因是为啥呢?这个 running = true; 是写在 cancel 中的,任务在执行没有取消它,
>> 怎么会跳转这里呢?
>>
>>
>>
>>
>>
>> 在 2019-07-20 03:23:28,"Caizhi Weng" <[hidden email]> 写道:
>> >Hi Henry,
>> >
>> >LOG.error(e.getLocalizedMessage());
>> >running = true;
>> >
>> >这里写错了吧,应该是 running = false;
>> >
>> >Henry <[hidden email]> 于2019年7月19日周五 下午4:04写道:
>> >
>> >>
>> >>
>> >> 谢谢你的帮助哈! 我也是觉得 source 里的问题,但是呢,木有找到错误的地方。下面这个是我那个自定义的 source
>> >> 代码,但是里面没有写log里报的哪个错的提示。
>> >> package com.JavaCustoms;
>> >> import org.apache.activemq.ActiveMQConnectionFactory;
>> >> import org.apache.flink.configuration.Configuration;
>> >> import
>> org.apache.flink.streaming.api.functions.source.RichSourceFunction;
>> >> import org.slf4j.Logger;
>> >> import org.slf4j.LoggerFactory;
>> >>
>> >> import javax.jms.*;
>> >>
>> >> public class FlinkJMSStreamSource extends RichSourceFunction<String> {
>> >> private static final long serialVersionUID = 1L;
>> >> private static final Logger LOG =
>> >> LoggerFactory.getLogger(FlinkJMSStreamSource.class);
>> >> private transient volatile boolean running;
>> >> private transient MessageConsumer consumer;
>> >> private transient Connection connection;
>> >>
>> >> // topic name
>> >> private static final String topicName = "flink_mypay";
>> >> // tcp str
>> >> private static final String tcpStr = "tcp://server.mn:61616";
>> >> // 持久订阅的id标识
>> >> private static final String clientId = "flink_hz";
>> >> // Subscription name
>> >> private static final String subscriptionName = "flink_topic_mypay";
>> >>
>> >> private void init() throws JMSException {
>> >> // Create a ConnectionFactory
>> >> ActiveMQConnectionFactory connectionFactory = new
>> >> ActiveMQConnectionFactory(tcpStr);
>> >>
>> >> // Create a Connection
>> >> connection = connectionFactory.createConnection();
>> >> connection.setClientID(clientId);
>> >> //    connection.start();
>> >>
>> >>       // Create a Session
>> >> Session session = connection.createSession(false,
>> >> Session.AUTO_ACKNOWLEDGE);
>> >>
>> >> // Create a MessageConsumer from the Session to the Topic or Queue
>> >> Topic topic = session.createTopic(topicName);
>> >> consumer = session.createDurableSubscriber(topic, subscriptionName);
>> >> connection.start();
>> >>    }
>> >>
>> >> @Override
>> >> public void open(Configuration parameters) throws Exception {
>> >> super.open(parameters);
>> >> running = true;
>> >>       init();
>> >>    }
>> >>
>> >> @Override
>> >> public void run(SourceContext<String> ctx) {
>> >> // this source never completes
>> >>
>> >> while (running) {
>> >> try {
>> >>             Message message = consumer.receive();
>> >>             BytesMessage bytesMessage = (BytesMessage) message;
>> >> byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
>> >>             bytesMessage.readBytes(bytes);
>> >>
>> >> String text = new String(bytes);
>> >>             ctx.collect(text);
>> >>
>> >>          } catch (JMSException e) {
>> >> LOG.error(e.getLocalizedMessage());
>> >> running = true;
>> >>          }
>> >>       }
>> >> try {
>> >>          close();
>> >>       } catch (Exception e) {
>> >> LOG.error(e.getMessage(), e);
>> >>       }
>> >>    }
>> >>
>> >> @Override
>> >> public void cancel() {
>> >> running = false;
>> >>    }
>> >>
>> >> @Override
>> >> public void close() throws Exception {
>> >> LOG.info("Closing");
>> >> try {
>> >> connection.close();
>> >>       } catch (JMSException e) {
>> >> throw new RuntimeException("Error while closing ActiveMQ connection ",
>> e);
>> >>       }
>> >>    }
>> >> }
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2019-07-19 14:43:17,"Caizhi Weng" <[hidden email]> 写道:
>> >> >Hi Henry
>> >> >
>> >> >你的意思是不想让 Flink 写 log 吗?那只能通过 `log4j.rootLogger=OFF` (log4j) 或者 `<root
>> >> >level="OFF"> <appender-ref ref="file"/> </root>` (logback) 把 log 关掉,或者把
>> >> log
>> >> >等级设成更高的 FATAL... 但我感觉问题还是自定义的 source 里写 log 的时候死循环了...
>> >> >
>> >> >Henry <[hidden email]> 于2019年7月19日周五 下午2:20写道:
>> >> >
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >>
>> 你好,谢谢!是的,这个Source是用JMS实现的自定义Source。目前还在查原因,但是怎么能够让Flink不这样爆炸写log日志呢?20分钟就能写满磁盘,写了40G多。
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >> 在 2019-07-19 11:11:37,"Caizhi Weng" <[hidden email]> 写道:
>> >> >> >Hi Henry,
>> >> >> >
>> >> >> >这个 source 看起来不像是 Flink 提供的 source,应该是 source 本身实现的问题。你可能需要修改 source
>> >> >> >的源码让它出错后关闭或者进行其它处理...
>> >> >> >
>> >> >> >Henry <[hidden email]> 于2019年7月19日周五 上午9:31写道:
>> >> >> >
>> >> >> >>  大家好,之前那个报错图片大家没看到,重新弄一下。
>> >> >> >> 报错图片链接:
>> >> >> >> https://img-blog.csdnimg.cn/20190719092540880.png
>> >> >> >> https://img-blog.csdnimg.cn/20190719092848500.png
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >>
>> >>
>> 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。
>> >> >> >>
>> >> >>
>> >>
>> 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。
>> >> >>
>> >>
>>