大家好,之前那个报错图片大家没看到,重新弄一下。
报错图片链接: https://img-blog.csdnimg.cn/20190719092540880.png https://img-blog.csdnimg.cn/20190719092848500.png 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。 |
Hi Henry,
这个 source 看起来不像是 Flink 提供的 source,应该是 source 本身实现的问题。你可能需要修改 source 的源码让它出错后关闭或者进行其它处理... Henry <[hidden email]> 于2019年7月19日周五 上午9:31写道: > 大家好,之前那个报错图片大家没看到,重新弄一下。 > 报错图片链接: > https://img-blog.csdnimg.cn/20190719092540880.png > https://img-blog.csdnimg.cn/20190719092848500.png > > > 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。 > 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。 |
你好,谢谢!是的,这个Source是用JMS实现的自定义Source。目前还在查原因,但是怎么能够让Flink不这样爆炸写log日志呢?20分钟就能写满磁盘,写了40G多。 在 2019-07-19 11:11:37,"Caizhi Weng" <[hidden email]> 写道: >Hi Henry, > >这个 source 看起来不像是 Flink 提供的 source,应该是 source 本身实现的问题。你可能需要修改 source >的源码让它出错后关闭或者进行其它处理... > >Henry <[hidden email]> 于2019年7月19日周五 上午9:31写道: > >> 大家好,之前那个报错图片大家没看到,重新弄一下。 >> 报错图片链接: >> https://img-blog.csdnimg.cn/20190719092540880.png >> https://img-blog.csdnimg.cn/20190719092848500.png >> >> >> 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。 >> 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。 |
Hi Henry
你的意思是不想让 Flink 写 log 吗?那只能通过 `log4j.rootLogger=OFF` (log4j) 或者 `<root level="OFF"> <appender-ref ref="file"/> </root>` (logback) 把 log 关掉,或者把 log 等级设成更高的 FATAL... 但我感觉问题还是自定义的 source 里写 log 的时候死循环了... Henry <[hidden email]> 于2019年7月19日周五 下午2:20写道: > > > > 你好,谢谢!是的,这个Source是用JMS实现的自定义Source。目前还在查原因,但是怎么能够让Flink不这样爆炸写log日志呢?20分钟就能写满磁盘,写了40G多。 > > > > > > 在 2019-07-19 11:11:37,"Caizhi Weng" <[hidden email]> 写道: > >Hi Henry, > > > >这个 source 看起来不像是 Flink 提供的 source,应该是 source 本身实现的问题。你可能需要修改 source > >的源码让它出错后关闭或者进行其它处理... > > > >Henry <[hidden email]> 于2019年7月19日周五 上午9:31写道: > > > >> 大家好,之前那个报错图片大家没看到,重新弄一下。 > >> 报错图片链接: > >> https://img-blog.csdnimg.cn/20190719092540880.png > >> https://img-blog.csdnimg.cn/20190719092848500.png > >> > >> > >> > 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。 > >> > 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。 > |
In reply to this post by Henry
最根本的解法当然是去掉打日志的地方,这 source 不是 Flink 内置的,Flink 当然不能控制你们自定义 source 的行为。
你可以考虑自己改一下 log4j.properties,手动关掉这个 logger, Flink 内置的 log4j.properties 里有 example,参考着改一下 log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file 改成 log4j.logger.com.JavaCustoms.FlinkJMSStreamSource=OFF, file 但是这明显是个 ERROR,最好还是解决一下,要不就是掩耳盗铃啊 Henry <[hidden email]> 于2019年7月19日周五 下午2:20写道: > > > > 你好,谢谢!是的,这个Source是用JMS实现的自定义Source。目前还在查原因,但是怎么能够让Flink不这样爆炸写log日志呢?20分钟就能写满磁盘,写了40G多。 > > > > > > 在 2019-07-19 11:11:37,"Caizhi Weng" <[hidden email]> 写道: > >Hi Henry, > > > >这个 source 看起来不像是 Flink 提供的 source,应该是 source 本身实现的问题。你可能需要修改 source > >的源码让它出错后关闭或者进行其它处理... > > > >Henry <[hidden email]> 于2019年7月19日周五 上午9:31写道: > > > >> 大家好,之前那个报错图片大家没看到,重新弄一下。 > >> 报错图片链接: > >> https://img-blog.csdnimg.cn/20190719092540880.png > >> https://img-blog.csdnimg.cn/20190719092848500.png > >> > >> > >> > 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。 > >> > 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。 > |
In reply to this post by Caizhi Weng
谢谢你的帮助哈! 我也是觉得 source 里的问题,但是呢,木有找到错误的地方。下面这个是我那个自定义的 source 代码,但是里面没有写log里报的哪个错的提示。 package com.JavaCustoms; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jms.*; public class FlinkJMSStreamSource extends RichSourceFunction<String> { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(FlinkJMSStreamSource.class); private transient volatile boolean running; private transient MessageConsumer consumer; private transient Connection connection; // topic name private static final String topicName = "flink_mypay"; // tcp str private static final String tcpStr = "tcp://server.mn:61616"; // 持久订阅的id标识 private static final String clientId = "flink_hz"; // Subscription name private static final String subscriptionName = "flink_topic_mypay"; private void init() throws JMSException { // Create a ConnectionFactory ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(tcpStr); // Create a Connection connection = connectionFactory.createConnection(); connection.setClientID(clientId); // connection.start(); // Create a Session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a MessageConsumer from the Session to the Topic or Queue Topic topic = session.createTopic(topicName); consumer = session.createDurableSubscriber(topic, subscriptionName); connection.start(); } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); running = true; init(); } @Override public void run(SourceContext<String> ctx) { // this source never completes while (running) { try { Message message = consumer.receive(); BytesMessage bytesMessage = (BytesMessage) message; byte[] bytes = new byte[(int) bytesMessage.getBodyLength()]; bytesMessage.readBytes(bytes); String text = new String(bytes); ctx.collect(text); } catch (JMSException e) { LOG.error(e.getLocalizedMessage()); running = true; } } try { close(); } catch (Exception e) { LOG.error(e.getMessage(), e); } } @Override public void cancel() { running = false; } @Override public void close() throws Exception { LOG.info("Closing"); try { connection.close(); } catch (JMSException e) { throw new RuntimeException("Error while closing ActiveMQ connection ", e); } } } 在 2019-07-19 14:43:17,"Caizhi Weng" <[hidden email]> 写道: >Hi Henry > >你的意思是不想让 Flink 写 log 吗?那只能通过 `log4j.rootLogger=OFF` (log4j) 或者 `<root >level="OFF"> <appender-ref ref="file"/> </root>` (logback) 把 log 关掉,或者把 log >等级设成更高的 FATAL... 但我感觉问题还是自定义的 source 里写 log 的时候死循环了... > >Henry <[hidden email]> 于2019年7月19日周五 下午2:20写道: > >> >> >> >> 你好,谢谢!是的,这个Source是用JMS实现的自定义Source。目前还在查原因,但是怎么能够让Flink不这样爆炸写log日志呢?20分钟就能写满磁盘,写了40G多。 >> >> >> >> >> >> 在 2019-07-19 11:11:37,"Caizhi Weng" <[hidden email]> 写道: >> >Hi Henry, >> > >> >这个 source 看起来不像是 Flink 提供的 source,应该是 source 本身实现的问题。你可能需要修改 source >> >的源码让它出错后关闭或者进行其它处理... >> > >> >Henry <[hidden email]> 于2019年7月19日周五 上午9:31写道: >> > >> >> 大家好,之前那个报错图片大家没看到,重新弄一下。 >> >> 报错图片链接: >> >> https://img-blog.csdnimg.cn/20190719092540880.png >> >> https://img-blog.csdnimg.cn/20190719092848500.png >> >> >> >> >> >> >> 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。 >> >> >> 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。 >> |
Hi Henry,
LOG.error(e.getLocalizedMessage()); running = true; 这里写错了吧,应该是 running = false; Henry <[hidden email]> 于2019年7月19日周五 下午4:04写道: > > > 谢谢你的帮助哈! 我也是觉得 source 里的问题,但是呢,木有找到错误的地方。下面这个是我那个自定义的 source > 代码,但是里面没有写log里报的哪个错的提示。 > package com.JavaCustoms; > import org.apache.activemq.ActiveMQConnectionFactory; > import org.apache.flink.configuration.Configuration; > import org.apache.flink.streaming.api.functions.source.RichSourceFunction; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > > import javax.jms.*; > > public class FlinkJMSStreamSource extends RichSourceFunction<String> { > private static final long serialVersionUID = 1L; > private static final Logger LOG = > LoggerFactory.getLogger(FlinkJMSStreamSource.class); > private transient volatile boolean running; > private transient MessageConsumer consumer; > private transient Connection connection; > > // topic name > private static final String topicName = "flink_mypay"; > // tcp str > private static final String tcpStr = "tcp://server.mn:61616"; > // 持久订阅的id标识 > private static final String clientId = "flink_hz"; > // Subscription name > private static final String subscriptionName = "flink_topic_mypay"; > > private void init() throws JMSException { > // Create a ConnectionFactory > ActiveMQConnectionFactory connectionFactory = new > ActiveMQConnectionFactory(tcpStr); > > // Create a Connection > connection = connectionFactory.createConnection(); > connection.setClientID(clientId); > // connection.start(); > > // Create a Session > Session session = connection.createSession(false, > Session.AUTO_ACKNOWLEDGE); > > // Create a MessageConsumer from the Session to the Topic or Queue > Topic topic = session.createTopic(topicName); > consumer = session.createDurableSubscriber(topic, subscriptionName); > connection.start(); > } > > @Override > public void open(Configuration parameters) throws Exception { > super.open(parameters); > running = true; > init(); > } > > @Override > public void run(SourceContext<String> ctx) { > // this source never completes > > while (running) { > try { > Message message = consumer.receive(); > BytesMessage bytesMessage = (BytesMessage) message; > byte[] bytes = new byte[(int) bytesMessage.getBodyLength()]; > bytesMessage.readBytes(bytes); > > String text = new String(bytes); > ctx.collect(text); > > } catch (JMSException e) { > LOG.error(e.getLocalizedMessage()); > running = true; > } > } > try { > close(); > } catch (Exception e) { > LOG.error(e.getMessage(), e); > } > } > > @Override > public void cancel() { > running = false; > } > > @Override > public void close() throws Exception { > LOG.info("Closing"); > try { > connection.close(); > } catch (JMSException e) { > throw new RuntimeException("Error while closing ActiveMQ connection ", e); > } > } > } > > > > > > > > > > > 在 2019-07-19 14:43:17,"Caizhi Weng" <[hidden email]> 写道: > >Hi Henry > > > >你的意思是不想让 Flink 写 log 吗?那只能通过 `log4j.rootLogger=OFF` (log4j) 或者 `<root > >level="OFF"> <appender-ref ref="file"/> </root>` (logback) 把 log 关掉,或者把 > log > >等级设成更高的 FATAL... 但我感觉问题还是自定义的 source 里写 log 的时候死循环了... > > > >Henry <[hidden email]> 于2019年7月19日周五 下午2:20写道: > > > >> > >> > >> > >> > 你好,谢谢!是的,这个Source是用JMS实现的自定义Source。目前还在查原因,但是怎么能够让Flink不这样爆炸写log日志呢?20分钟就能写满磁盘,写了40G多。 > >> > >> > >> > >> > >> > >> 在 2019-07-19 11:11:37,"Caizhi Weng" <[hidden email]> 写道: > >> >Hi Henry, > >> > > >> >这个 source 看起来不像是 Flink 提供的 source,应该是 source 本身实现的问题。你可能需要修改 source > >> >的源码让它出错后关闭或者进行其它处理... > >> > > >> >Henry <[hidden email]> 于2019年7月19日周五 上午9:31写道: > >> > > >> >> 大家好,之前那个报错图片大家没看到,重新弄一下。 > >> >> 报错图片链接: > >> >> https://img-blog.csdnimg.cn/20190719092540880.png > >> >> https://img-blog.csdnimg.cn/20190719092848500.png > >> >> > >> >> > >> >> > >> > 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。 > >> >> > >> > 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。 > >> > |
啊!我想起来了,之前忘了因为啥原因了,为了方便调试把 running = false; 改成了 running = true; 感谢感谢! 但是原因是为啥呢?这个 running = true; 是写在 cancel 中的,任务在执行没有取消它, 怎么会跳转这里呢? 在 2019-07-20 03:23:28,"Caizhi Weng" <[hidden email]> 写道: >Hi Henry, > >LOG.error(e.getLocalizedMessage()); >running = true; > >这里写错了吧,应该是 running = false; > >Henry <[hidden email]> 于2019年7月19日周五 下午4:04写道: > >> >> >> 谢谢你的帮助哈! 我也是觉得 source 里的问题,但是呢,木有找到错误的地方。下面这个是我那个自定义的 source >> 代码,但是里面没有写log里报的哪个错的提示。 >> package com.JavaCustoms; >> import org.apache.activemq.ActiveMQConnectionFactory; >> import org.apache.flink.configuration.Configuration; >> import org.apache.flink.streaming.api.functions.source.RichSourceFunction; >> import org.slf4j.Logger; >> import org.slf4j.LoggerFactory; >> >> import javax.jms.*; >> >> public class FlinkJMSStreamSource extends RichSourceFunction<String> { >> private static final long serialVersionUID = 1L; >> private static final Logger LOG = >> LoggerFactory.getLogger(FlinkJMSStreamSource.class); >> private transient volatile boolean running; >> private transient MessageConsumer consumer; >> private transient Connection connection; >> >> // topic name >> private static final String topicName = "flink_mypay"; >> // tcp str >> private static final String tcpStr = "tcp://server.mn:61616"; >> // 持久订阅的id标识 >> private static final String clientId = "flink_hz"; >> // Subscription name >> private static final String subscriptionName = "flink_topic_mypay"; >> >> private void init() throws JMSException { >> // Create a ConnectionFactory >> ActiveMQConnectionFactory connectionFactory = new >> ActiveMQConnectionFactory(tcpStr); >> >> // Create a Connection >> connection = connectionFactory.createConnection(); >> connection.setClientID(clientId); >> // connection.start(); >> >> // Create a Session >> Session session = connection.createSession(false, >> Session.AUTO_ACKNOWLEDGE); >> >> // Create a MessageConsumer from the Session to the Topic or Queue >> Topic topic = session.createTopic(topicName); >> consumer = session.createDurableSubscriber(topic, subscriptionName); >> connection.start(); >> } >> >> @Override >> public void open(Configuration parameters) throws Exception { >> super.open(parameters); >> running = true; >> init(); >> } >> >> @Override >> public void run(SourceContext<String> ctx) { >> // this source never completes >> >> while (running) { >> try { >> Message message = consumer.receive(); >> BytesMessage bytesMessage = (BytesMessage) message; >> byte[] bytes = new byte[(int) bytesMessage.getBodyLength()]; >> bytesMessage.readBytes(bytes); >> >> String text = new String(bytes); >> ctx.collect(text); >> >> } catch (JMSException e) { >> LOG.error(e.getLocalizedMessage()); >> running = true; >> } >> } >> try { >> close(); >> } catch (Exception e) { >> LOG.error(e.getMessage(), e); >> } >> } >> >> @Override >> public void cancel() { >> running = false; >> } >> >> @Override >> public void close() throws Exception { >> LOG.info("Closing"); >> try { >> connection.close(); >> } catch (JMSException e) { >> throw new RuntimeException("Error while closing ActiveMQ connection ", e); >> } >> } >> } >> >> >> >> >> >> >> >> >> >> >> 在 2019-07-19 14:43:17,"Caizhi Weng" <[hidden email]> 写道: >> >Hi Henry >> > >> >你的意思是不想让 Flink 写 log 吗?那只能通过 `log4j.rootLogger=OFF` (log4j) 或者 `<root >> >level="OFF"> <appender-ref ref="file"/> </root>` (logback) 把 log 关掉,或者把 >> log >> >等级设成更高的 FATAL... 但我感觉问题还是自定义的 source 里写 log 的时候死循环了... >> > >> >Henry <[hidden email]> 于2019年7月19日周五 下午2:20写道: >> > >> >> >> >> >> >> >> >> >> 你好,谢谢!是的,这个Source是用JMS实现的自定义Source。目前还在查原因,但是怎么能够让Flink不这样爆炸写log日志呢?20分钟就能写满磁盘,写了40G多。 >> >> >> >> >> >> >> >> >> >> >> >> 在 2019-07-19 11:11:37,"Caizhi Weng" <[hidden email]> 写道: >> >> >Hi Henry, >> >> > >> >> >这个 source 看起来不像是 Flink 提供的 source,应该是 source 本身实现的问题。你可能需要修改 source >> >> >的源码让它出错后关闭或者进行其它处理... >> >> > >> >> >Henry <[hidden email]> 于2019年7月19日周五 上午9:31写道: >> >> > >> >> >> 大家好,之前那个报错图片大家没看到,重新弄一下。 >> >> >> 报错图片链接: >> >> >> https://img-blog.csdnimg.cn/20190719092540880.png >> >> >> https://img-blog.csdnimg.cn/20190719092848500.png >> >> >> >> >> >> >> >> >> >> >> >> 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。 >> >> >> >> >> >> 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。 >> >> >> |
Hi Henry,
你可能看错了,仔细看你的 run 函数,里面有个 try catch 里有 running = true... Henry <[hidden email]> 于2019年7月20日周六 下午9:32写道: > > > 啊!我想起来了,之前忘了因为啥原因了,为了方便调试把 running = false; 改成了 running = true; > 感谢感谢! 但是原因是为啥呢?这个 running = true; 是写在 cancel 中的,任务在执行没有取消它, > 怎么会跳转这里呢? > > > > > > 在 2019-07-20 03:23:28,"Caizhi Weng" <[hidden email]> 写道: > >Hi Henry, > > > >LOG.error(e.getLocalizedMessage()); > >running = true; > > > >这里写错了吧,应该是 running = false; > > > >Henry <[hidden email]> 于2019年7月19日周五 下午4:04写道: > > > >> > >> > >> 谢谢你的帮助哈! 我也是觉得 source 里的问题,但是呢,木有找到错误的地方。下面这个是我那个自定义的 source > >> 代码,但是里面没有写log里报的哪个错的提示。 > >> package com.JavaCustoms; > >> import org.apache.activemq.ActiveMQConnectionFactory; > >> import org.apache.flink.configuration.Configuration; > >> import > org.apache.flink.streaming.api.functions.source.RichSourceFunction; > >> import org.slf4j.Logger; > >> import org.slf4j.LoggerFactory; > >> > >> import javax.jms.*; > >> > >> public class FlinkJMSStreamSource extends RichSourceFunction<String> { > >> private static final long serialVersionUID = 1L; > >> private static final Logger LOG = > >> LoggerFactory.getLogger(FlinkJMSStreamSource.class); > >> private transient volatile boolean running; > >> private transient MessageConsumer consumer; > >> private transient Connection connection; > >> > >> // topic name > >> private static final String topicName = "flink_mypay"; > >> // tcp str > >> private static final String tcpStr = "tcp://server.mn:61616"; > >> // 持久订阅的id标识 > >> private static final String clientId = "flink_hz"; > >> // Subscription name > >> private static final String subscriptionName = "flink_topic_mypay"; > >> > >> private void init() throws JMSException { > >> // Create a ConnectionFactory > >> ActiveMQConnectionFactory connectionFactory = new > >> ActiveMQConnectionFactory(tcpStr); > >> > >> // Create a Connection > >> connection = connectionFactory.createConnection(); > >> connection.setClientID(clientId); > >> // connection.start(); > >> > >> // Create a Session > >> Session session = connection.createSession(false, > >> Session.AUTO_ACKNOWLEDGE); > >> > >> // Create a MessageConsumer from the Session to the Topic or Queue > >> Topic topic = session.createTopic(topicName); > >> consumer = session.createDurableSubscriber(topic, subscriptionName); > >> connection.start(); > >> } > >> > >> @Override > >> public void open(Configuration parameters) throws Exception { > >> super.open(parameters); > >> running = true; > >> init(); > >> } > >> > >> @Override > >> public void run(SourceContext<String> ctx) { > >> // this source never completes > >> > >> while (running) { > >> try { > >> Message message = consumer.receive(); > >> BytesMessage bytesMessage = (BytesMessage) message; > >> byte[] bytes = new byte[(int) bytesMessage.getBodyLength()]; > >> bytesMessage.readBytes(bytes); > >> > >> String text = new String(bytes); > >> ctx.collect(text); > >> > >> } catch (JMSException e) { > >> LOG.error(e.getLocalizedMessage()); > >> running = true; > >> } > >> } > >> try { > >> close(); > >> } catch (Exception e) { > >> LOG.error(e.getMessage(), e); > >> } > >> } > >> > >> @Override > >> public void cancel() { > >> running = false; > >> } > >> > >> @Override > >> public void close() throws Exception { > >> LOG.info("Closing"); > >> try { > >> connection.close(); > >> } catch (JMSException e) { > >> throw new RuntimeException("Error while closing ActiveMQ connection ", > e); > >> } > >> } > >> } > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> 在 2019-07-19 14:43:17,"Caizhi Weng" <[hidden email]> 写道: > >> >Hi Henry > >> > > >> >你的意思是不想让 Flink 写 log 吗?那只能通过 `log4j.rootLogger=OFF` (log4j) 或者 `<root > >> >level="OFF"> <appender-ref ref="file"/> </root>` (logback) 把 log 关掉,或者把 > >> log > >> >等级设成更高的 FATAL... 但我感觉问题还是自定义的 source 里写 log 的时候死循环了... > >> > > >> >Henry <[hidden email]> 于2019年7月19日周五 下午2:20写道: > >> > > >> >> > >> >> > >> >> > >> >> > >> > 你好,谢谢!是的,这个Source是用JMS实现的自定义Source。目前还在查原因,但是怎么能够让Flink不这样爆炸写log日志呢?20分钟就能写满磁盘,写了40G多。 > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> 在 2019-07-19 11:11:37,"Caizhi Weng" <[hidden email]> 写道: > >> >> >Hi Henry, > >> >> > > >> >> >这个 source 看起来不像是 Flink 提供的 source,应该是 source 本身实现的问题。你可能需要修改 source > >> >> >的源码让它出错后关闭或者进行其它处理... > >> >> > > >> >> >Henry <[hidden email]> 于2019年7月19日周五 上午9:31写道: > >> >> > > >> >> >> 大家好,之前那个报错图片大家没看到,重新弄一下。 > >> >> >> 报错图片链接: > >> >> >> https://img-blog.csdnimg.cn/20190719092540880.png > >> >> >> https://img-blog.csdnimg.cn/20190719092848500.png > >> >> >> > >> >> >> > >> >> >> > >> >> > >> > 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。 > >> >> >> > >> >> > >> > 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。 > >> >> > >> > |
看到啦,谢谢啦。 在 2019-07-21 19:16:36,"Caizhi Weng" <[hidden email]> 写道: >Hi Henry, > >你可能看错了,仔细看你的 run 函数,里面有个 try catch 里有 running = true... > >Henry <[hidden email]> 于2019年7月20日周六 下午9:32写道: > >> >> >> 啊!我想起来了,之前忘了因为啥原因了,为了方便调试把 running = false; 改成了 running = true; >> 感谢感谢! 但是原因是为啥呢?这个 running = true; 是写在 cancel 中的,任务在执行没有取消它, >> 怎么会跳转这里呢? >> >> >> >> >> >> 在 2019-07-20 03:23:28,"Caizhi Weng" <[hidden email]> 写道: >> >Hi Henry, >> > >> >LOG.error(e.getLocalizedMessage()); >> >running = true; >> > >> >这里写错了吧,应该是 running = false; >> > >> >Henry <[hidden email]> 于2019年7月19日周五 下午4:04写道: >> > >> >> >> >> >> >> 谢谢你的帮助哈! 我也是觉得 source 里的问题,但是呢,木有找到错误的地方。下面这个是我那个自定义的 source >> >> 代码,但是里面没有写log里报的哪个错的提示。 >> >> package com.JavaCustoms; >> >> import org.apache.activemq.ActiveMQConnectionFactory; >> >> import org.apache.flink.configuration.Configuration; >> >> import >> org.apache.flink.streaming.api.functions.source.RichSourceFunction; >> >> import org.slf4j.Logger; >> >> import org.slf4j.LoggerFactory; >> >> >> >> import javax.jms.*; >> >> >> >> public class FlinkJMSStreamSource extends RichSourceFunction<String> { >> >> private static final long serialVersionUID = 1L; >> >> private static final Logger LOG = >> >> LoggerFactory.getLogger(FlinkJMSStreamSource.class); >> >> private transient volatile boolean running; >> >> private transient MessageConsumer consumer; >> >> private transient Connection connection; >> >> >> >> // topic name >> >> private static final String topicName = "flink_mypay"; >> >> // tcp str >> >> private static final String tcpStr = "tcp://server.mn:61616"; >> >> // 持久订阅的id标识 >> >> private static final String clientId = "flink_hz"; >> >> // Subscription name >> >> private static final String subscriptionName = "flink_topic_mypay"; >> >> >> >> private void init() throws JMSException { >> >> // Create a ConnectionFactory >> >> ActiveMQConnectionFactory connectionFactory = new >> >> ActiveMQConnectionFactory(tcpStr); >> >> >> >> // Create a Connection >> >> connection = connectionFactory.createConnection(); >> >> connection.setClientID(clientId); >> >> // connection.start(); >> >> >> >> // Create a Session >> >> Session session = connection.createSession(false, >> >> Session.AUTO_ACKNOWLEDGE); >> >> >> >> // Create a MessageConsumer from the Session to the Topic or Queue >> >> Topic topic = session.createTopic(topicName); >> >> consumer = session.createDurableSubscriber(topic, subscriptionName); >> >> connection.start(); >> >> } >> >> >> >> @Override >> >> public void open(Configuration parameters) throws Exception { >> >> super.open(parameters); >> >> running = true; >> >> init(); >> >> } >> >> >> >> @Override >> >> public void run(SourceContext<String> ctx) { >> >> // this source never completes >> >> >> >> while (running) { >> >> try { >> >> Message message = consumer.receive(); >> >> BytesMessage bytesMessage = (BytesMessage) message; >> >> byte[] bytes = new byte[(int) bytesMessage.getBodyLength()]; >> >> bytesMessage.readBytes(bytes); >> >> >> >> String text = new String(bytes); >> >> ctx.collect(text); >> >> >> >> } catch (JMSException e) { >> >> LOG.error(e.getLocalizedMessage()); >> >> running = true; >> >> } >> >> } >> >> try { >> >> close(); >> >> } catch (Exception e) { >> >> LOG.error(e.getMessage(), e); >> >> } >> >> } >> >> >> >> @Override >> >> public void cancel() { >> >> running = false; >> >> } >> >> >> >> @Override >> >> public void close() throws Exception { >> >> LOG.info("Closing"); >> >> try { >> >> connection.close(); >> >> } catch (JMSException e) { >> >> throw new RuntimeException("Error while closing ActiveMQ connection ", >> e); >> >> } >> >> } >> >> } >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2019-07-19 14:43:17,"Caizhi Weng" <[hidden email]> 写道: >> >> >Hi Henry >> >> > >> >> >你的意思是不想让 Flink 写 log 吗?那只能通过 `log4j.rootLogger=OFF` (log4j) 或者 `<root >> >> >level="OFF"> <appender-ref ref="file"/> </root>` (logback) 把 log 关掉,或者把 >> >> log >> >> >等级设成更高的 FATAL... 但我感觉问题还是自定义的 source 里写 log 的时候死循环了... >> >> > >> >> >Henry <[hidden email]> 于2019年7月19日周五 下午2:20写道: >> >> > >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 你好,谢谢!是的,这个Source是用JMS实现的自定义Source。目前还在查原因,但是怎么能够让Flink不这样爆炸写log日志呢?20分钟就能写满磁盘,写了40G多。 >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2019-07-19 11:11:37,"Caizhi Weng" <[hidden email]> 写道: >> >> >> >Hi Henry, >> >> >> > >> >> >> >这个 source 看起来不像是 Flink 提供的 source,应该是 source 本身实现的问题。你可能需要修改 source >> >> >> >的源码让它出错后关闭或者进行其它处理... >> >> >> > >> >> >> >Henry <[hidden email]> 于2019年7月19日周五 上午9:31写道: >> >> >> > >> >> >> >> 大家好,之前那个报错图片大家没看到,重新弄一下。 >> >> >> >> 报错图片链接: >> >> >> >> https://img-blog.csdnimg.cn/20190719092540880.png >> >> >> >> https://img-blog.csdnimg.cn/20190719092848500.png >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。 >> >> >> >> >> >> >> >> >> >> 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。 >> >> >> >> >> >> |
Free forum by Nabble | Edit this page |