Flink 的 log 文件夹下产生了 44G 日志

Flink 的 log 文件夹下产生了 44G 日志


我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。
Re: Flink 的 log 文件夹下产生了 44G 日志

Caizhi Weng
Hi Henry,

这个 source 看起来不像是 Flink 提供的 source,应该是 source 本身实现的问题。你可能需要修改 source

Re:Re: Flink 的 log 文件夹下产生了 44G 日志



Re: Re: Flink 的 log 文件夹下产生了 44G 日志

Caizhi Weng
Hi Henry

你的意思是不想让 Flink 写 log 吗?那只能通过 `log4j.rootLogger=OFF` (log4j) 或者 `<root
level="OFF"> <appender-ref ref="file"/> </root>` (logback) 把 log 关掉,或者把 log
等级设成更高的 FATAL... 但我感觉问题还是自定义的 source 里写 log 的时候死循环了...

Re: Re: Flink 的 log 文件夹下产生了 44G 日志

Biao Liu
最根本的解法当然是去掉打日志的地方,这 source 不是 Flink 内置的,Flink 当然不能控制你们自定义 source 的行为。

你可以考虑自己改一下 log4j.properties,手动关掉这个 logger, Flink 内置的 log4j.properties 里有

改成 log4j.logger.com.JavaCustoms.FlinkJMSStreamSource=OFF, file

但是这明显是个 ERROR,最好还是解决一下,要不就是掩耳盗铃啊

Re:Re: Re: Flink 的 log 文件夹下产生了 44G 日志

谢谢你的帮助哈! 我也是觉得 source 里的问题,但是呢,木有找到错误的地方。下面这个是我那个自定义的 source 代码,但是里面没有写log里报的哪个错的提示。
package com.JavaCustoms;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.*;

public class FlinkJMSStreamSource extends RichSourceFunction<String> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(FlinkJMSStreamSource.class);
private transient volatile boolean running;
private transient MessageConsumer consumer;
private transient Connection connection;

// topic name
private static final String topicName = "flink_mypay";
// tcp str
private static final String tcpStr = "tcp://server.mn:61616";
// 持久订阅的id标识
private static final String clientId = "flink_hz";
// Subscription name
private static final String subscriptionName = "flink_topic_mypay";

private void init() throws JMSException {
// Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(tcpStr);

// Create a Connection
connection = connectionFactory.createConnection();
//    connection.start();

      // Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// Create a MessageConsumer from the Session to the Topic or Queue
Topic topic = session.createTopic(topicName);
consumer = session.createDurableSubscriber(topic, subscriptionName);

public void open(Configuration parameters) throws Exception {
running = true;

public void run(SourceContext<String> ctx) {
// this source never completes

while (running) {
try {
            Message message = consumer.receive();
            BytesMessage bytesMessage = (BytesMessage) message;
byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];

String text = new String(bytes);

         } catch (JMSException e) {
running = true;
try {
      } catch (Exception e) {
LOG.error(e.getMessage(), e);

public void cancel() {
running = false;

public void close() throws Exception {
try {
      } catch (JMSException e) {
throw new RuntimeException("Error while closing ActiveMQ connection ", e);

Re: Re: Re: Flink 的 log 文件夹下产生了 44G 日志

Caizhi Weng
Hi Henry,

running = true;

这里写错了吧,应该是 running = false;

> >>
Re:Re: Re: Re: Flink 的 log 文件夹下产生了 44G 日志


啊!我想起来了,之前忘了因为啥原因了,为了方便调试把  running = false; 改成了 running = true;
感谢感谢!  但是原因是为啥呢?这个 running = true; 是写在 cancel 中的,任务在执行没有取消它,

Re: Re: Re: Re: Flink 的 log 文件夹下产生了 44G 日志

Caizhi Weng
Hi Henry,

你可能看错了,仔细看你的 run 函数,里面有个 try catch 里有 running = true...

Re:Re: Re: Re: Re: Flink 的 log 文件夹下产生了 44G 日志



