Hi all,
I started a yarn-session:

bin/yarn-session.sh -n 5 -jm 1g -tm 4g -s 4 -qu root.flink -nm fsql-cli > /dev/null 2>&1 &

and then submitted a SQL job through sql-client. The main logic joins a Kafka table with a Hive dimension table and writes the aggregated result to MySQL. The logic is a bit involved, so feel free to skip over the SQL below:

-- Provides: 5-minute window start time, vid, vid_group, display count (dv), click count, play count (vv), effective play count (effectivevv)
-- Every 5 minutes, write the last 5 minutes' aggregates to MySQL
insert into rt_app.app_video_cover_abtest_test
select
    begin_time,
    vid,
    vid_group,
    max(dv),
    max(click),
    max(vv),
    max(effectivevv)
from (
    select
        t1.begin_time begin_time,
        t1.u_vid vid,
        t1.u_vid_group vid_group,
        dv,
        click,
        vv,
        if(effectivevv is null, 0, effectivevv) effectivevv
    from (
        -- dv, click, vv
        select
            CAST(TUMBLE_START(bjdt, INTERVAL '5' MINUTE) AS STRING) begin_time,
            cast(u_vid as bigint) u_vid,
            u_vid_group,
            sum(if(concat(u_mod,'-',u_ac)='emptylog-video_display' and u_c_module='M011', 1, 0)) dv,
            sum(if(concat(u_mod,'-',u_ac)='emptylog-video_click' and u_c_module='M011', 1, 0)) click,
            sum(if(concat(u_mod,'-',u_ac)='top-hits' and u_f_module='M011', 1, 0)) vv
        FROM rt_ods.ods_applog_vidsplit
        where u_vid is not null and trim(u_vid) <> ''
          and u_vid_group is not null and trim(u_vid_group) not in ('', '-1')
          and (
                (concat(u_mod,'-',u_ac) in ('emptylog-video_display','emptylog-video_click') and u_c_module='M011')
             or (concat(u_mod,'-',u_ac)='top-hits' and u_f_module='M011')
          )
        group by
            TUMBLE(bjdt, INTERVAL '5' MINUTE),
            cast(u_vid as bigint),
            u_vid_group
    ) t1
    left join (
        -- effectivevv
        select
            begin_time,
            u_vid,
            u_vid_group,
            count(1) effectivevv
        from (
            select
                begin_time,
                u_vid,
                u_vid_group,
                u_diu,
                u_playid,
                m_pt,
                q70
            from dw.video_pic_title_q70 a
            join (
                select
                    CAST(TUMBLE_START(bjdt, INTERVAL '5' MINUTE) AS STRING) begin_time,
                    cast(u_vid as bigint) u_vid,
                    u_vid_group,
                    u_diu,
                    u_playid,
                    max(u_playtime) m_pt
                FROM rt_ods.ods_applog_vidsplit
                where u_vid is not null and trim(u_vid) <> ''
                  and u_vid_group is not null and trim(u_vid_group) not in ('', '-1')
                  and concat(u_mod,'-',u_ac)='emptylog-video_play_speed'
                  and u_f_module='M011'
                  and u_playtime > 0
                group by
                    TUMBLE(bjdt, INTERVAL '5' MINUTE),
                    cast(u_vid as bigint),
                    u_vid_group,
                    u_diu,
                    u_playid
            ) b
            on a.vid = b.u_vid
            group by
                begin_time, u_vid, u_vid_group, u_diu, u_playid, m_pt, q70
        ) temp
        where m_pt >= q70
        group by
            begin_time, u_vid, u_vid_group
    ) t2
    on t1.begin_time = t2.begin_time
       and t1.u_vid = t2.u_vid
       and t1.u_vid_group = t2.u_vid_group
) t3
group by
    begin_time, vid, vid_group;

While the job is running, it regularly happens that after anywhere from a few hours to a few dozen hours its status changes to SUCCEEDED, as shown here: https://s1.ax1x.com/2020/06/18/NnyX24.png

The logs show an INFO-level exception:

2020-06-17 21:27:07,968 INFO  org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - Interrupted while waiting for queue
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:287)

PS:
1. Kafka is continuously receiving new data the whole time.
2. The Flink version is 1.10.0.

Under what circumstances would the application state change to SUCCEEDED? Thanks, everyone!
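PS2: for reference, once the application has flipped to SUCCEEDED, the final status YARN recorded and the full aggregated container logs can be pulled with the standard YARN CLI; the application id below is a placeholder:

# Final-State and diagnostics as recorded by the ResourceManager
yarn application -status application_XXXXXXXXXXXXX_NNNN

# Full JobManager/TaskManager container logs after the application finishes
yarn logs -applicationId application_XXXXXXXXXXXXX_NNNN > fsql-cli.log

"yarn application -status" prints the same Final-State that the web UI shows, and "yarn logs" only works after the application has finished and requires YARN log aggregation to be enabled on the cluster.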