Hi everyone,
I started a yarn-session: bin/yarn-session.sh -jm 1g -tm 4g -s 4 -qu root.flink -nm fsql-cli 2>&1 &

Then I submitted a SQL statement through sql-client. The main logic joins a Kafka table with a Hive dimension table and writes the aggregated results to MySQL.

While running, the job keeps ending with status SUCCEEDED after anywhere from a few hours to a few dozen hours, as shown here: https://s1.ax1x.com/2020/06/29/Nf2dIA.png

INFO-level exceptions can be seen in the logs. The log around 15:34, when the job ended, looks like this:

2020-06-29 14:53:20,260 INFO  org.apache.flink.api.common.io.LocatableInputSplitAssigner - Assigning remote split to host uhadoop-op3raf-core12
2020-06-29 14:53:22,845 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: HiveTableSource(vid, q70) TablePath: dw.video_pic_title_q70, PartitionPruned: false, PartitionNums: null (1/1) (68c24aa59c898cefbb20fbc929ddbafd) switched from RUNNING to FINISHED.
2020-06-29 15:34:52,982 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Shutting YarnSessionClusterEntrypoint down with application status SUCCEEDED. Diagnostics null.
2020-06-29 15:34:52,984 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Shutting down rest endpoint.
2020-06-29 15:34:53,072 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Removing cache directory /tmp/flink-web-cdb67193-05ee-4a83-b957-9b7a9d85c23f/flink-web-ui
2020-06-29 15:34:53,073 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - http://uhadoop-op3raf-core1:44664 lost leadership
2020-06-29 15:34:53,074 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Shut down complete.
2020-06-29 15:34:53,074 INFO  org.apache.flink.yarn.YarnResourceManager - Shut down cluster because application is in SUCCEEDED, diagnostics null.
2020-06-29 15:34:53,076 INFO  org.apache.flink.yarn.YarnResourceManager - Unregister application from the YARN Resource Manager with final status SUCCEEDED.
2020-06-29 15:34:53,088 INFO  org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl - Waiting for application to be successfully unregistered.
2020-06-29 15:34:53,306 INFO  org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent - Closing components.
2020-06-29 15:34:53,308 INFO  org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess - Stopping SessionDispatcherLeaderProcess.
2020-06-29 15:34:53,309 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Stopping dispatcher akka.tcp://flink@uhadoop-op3raf-core1:38817/user/dispatcher.
2020-06-29 15:34:53,310 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Stopping all currently running jobs of dispatcher akka.tcp://flink@uhadoop-op3raf-core1:38817/user/dispatcher.
2020-06-29 15:34:53,311 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job default: insert into rt_app.app_video_cover_abtest_test ...
2020-06-29 15:34:53,322 INFO  org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl - Interrupted while waiting for queue
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:287)
2020-06-29 15:34:53,324 INFO  org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Opening proxy : uhadoop-op3raf-core12:23333

ps:
1. Kafka is continuously receiving new data.
2. The Flink version is 1.10.0.

Why would the job status change to SUCCEEDED?

Thanks, everyone!
The logic is a bit complex; feel free to skip the SQL below:

-- Produces: 5-minute window start time, vid, vid_group, display count, click count, play count, effective play count
-- Every 5 minutes, write the stats for the last 5 minutes to MySQL
insert into rt_app.app_video_cover_abtest_test
select
  begin_time,
  vid,
  vid_group,
  max(dv),
  max(click),
  max(vv),
  max(effectivevv)
from (
  select
    t1.begin_time begin_time,
    t1.u_vid vid,
    t1.u_vid_group vid_group,
    dv,
    click,
    vv,
    if(effectivevv is null, 0, effectivevv) effectivevv
  from (
    -- dv, click, vv
    select
      CAST(TUMBLE_START(bjdt, INTERVAL '5' MINUTE) AS STRING) begin_time,
      cast(u_vid as bigint) u_vid,
      u_vid_group,
      sum(if(concat(u_mod,'-',u_ac)='emptylog-video_display' and u_c_module='M011',1,0)) dv,
      sum(if(concat(u_mod,'-',u_ac)='emptylog-video_click' and u_c_module='M011',1,0)) click,
      sum(if(concat(u_mod,'-',u_ac)='top-hits' and u_f_module='M011',1,0)) vv
    FROM rt_ods.ods_applog_vidsplit
    where u_vid is not null and trim(u_vid)<>''
      and u_vid_group is not null and trim(u_vid_group) not in ('','-1')
      and (
        (concat(u_mod,'-',u_ac) in ('emptylog-video_display','emptylog-video_click') and u_c_module='M011')
        or (concat(u_mod,'-',u_ac)='top-hits' and u_f_module='M011')
      )
    group by
      TUMBLE(bjdt, INTERVAL '5' MINUTE),
      cast(u_vid as bigint),
      u_vid_group
  ) t1
  left join (
    -- effectivevv
    select
      begin_time,
      u_vid,
      u_vid_group,
      count(1) effectivevv
    from (
      select
        begin_time,
        u_vid,
        u_vid_group,
        u_diu,
        u_playid,
        m_pt,
        q70
      from dw.video_pic_title_q70 a
      join (
        select
          CAST(TUMBLE_START(bjdt, INTERVAL '5' MINUTE) AS STRING) begin_time,
          cast(u_vid as bigint) u_vid,
          u_vid_group,
          u_diu,
          u_playid,
          max(u_playtime) m_pt
        FROM rt_ods.ods_applog_vidsplit
        where u_vid is not null and trim(u_vid)<>''
          and u_vid_group is not null and trim(u_vid_group) not in ('','-1')
          and concat(u_mod,'-',u_ac)='emptylog-video_play_speed'
          and u_f_module='M011'
          and u_playtime>0
        group by
          TUMBLE(bjdt, INTERVAL '5' MINUTE),
          cast(u_vid as bigint),
          u_vid_group,
          u_diu,
          u_playid
      ) b
      on a.vid=b.u_vid
      group by
        begin_time,
        u_vid,
        u_vid_group,
        u_diu,
        u_playid,
        m_pt,
        q70
    ) temp
    where m_pt>=q70
    group by
      begin_time,
      u_vid,
      u_vid_group
  ) t2
  on t1.begin_time=t2.begin_time
    and t1.u_vid=t2.u_vid
    and t1.u_vid_group=t2.u_vid_group
) t3
group by
  begin_time,
  vid,
  vid_group;
Hi
From your logs, your data source is a Hive table? Check whether the job is running in batch mode rather than streaming mode.

Best,
Yichao Yang

------------------ Original message ------------------
From: "MuChen" <[hidden email]>
Sent: Monday, June 29, 2020, 4:53 PM
To: "user-zh" <[hidden email]>
Subject: Flink SQL streaming job ends abnormally
Is this job a batch job?
Yichao Yang <[hidden email]> wrote on Mon, Jun 29, 2020 at 6:58 PM:

> Hi
>
> From your logs, your data source is a Hive table? Check whether the job
> is running in batch mode rather than streaming mode.
>
> Best,
> Yichao Yang
I checked the config file; it is a streaming job.
$ grep -v \# sql-client-defaults.yaml | grep -v ^$
catalogs:
  - name: myhive
    type: hive
    hive-conf-dir: /home/fsql/hive/conf
    default-database: default
execution:
  planner: blink
  type: streaming
  time-characteristic: event-time
  periodic-watermarks-interval: 200
  result-mode: table
  max-table-result-rows: 1000000
  parallelism: 4
  max-parallelism: 128
  min-idle-state-retention: 0
  max-idle-state-retention: 0
  current-catalog: myhive
  current-database: default
  restart-strategy:
    type: fixed-delay
deployment:
  response-timeout: 5000
  gateway-address: ""
  gateway-port: 0

------------------ Original message ------------------
From: "zhisheng" <[hidden email]>
Sent: Tuesday, June 30, 2020, 9:05 AM
To: "user-zh" <[hidden email]>
Subject: Re: Flink SQL streaming job ends abnormally

> Is this job a batch job?
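(As a side note, the execution properties can also be listed and overridden per session from the SQL CLI itself; a minimal sketch, assuming the 1.10-era client where a bare SET lists the current session properties:

Flink SQL> SET;                           -- list all session properties, including execution.type
Flink SQL> SET execution.type=streaming;  -- force streaming mode for this session
)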
Did the job end in a successful state? A Hive table source is a bounded stream, so the stream finishes once the Hive table's data has been fully read. I'm not sure whether that is what affects the job.

On Tue, Jun 30, 2020 at 10:39 AM MuChen <[hidden email]> wrote:

> I checked the config file; it is a streaming job.
--
Best regards!
Rui Li
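For reference, the usual way to express a dimension-table join in Flink SQL is a temporal (lookup) join with FOR SYSTEM_TIME AS OF, which probes the dimension table per record instead of scanning it once as a bounded input. A minimal sketch using the table names from the SQL above; it assumes a processing-time attribute proctime AS PROCTIME() is declared on rt_ods.ods_applog_vidsplit, and note that Hive tables only gained lookup-join support in releases newer than the 1.10.0 used in this thread:

-- hedged sketch, not the thread's confirmed fix
SELECT
  b.u_vid,
  b.u_vid_group,
  b.u_playtime,
  a.q70
FROM rt_ods.ods_applog_vidsplit AS b
JOIN dw.video_pic_title_q70 FOR SYSTEM_TIME AS OF b.proctime AS a
  ON a.vid = CAST(b.u_vid AS BIGINT);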
Yes, the job ends in a SUCCEEDED state.
The job uses the Hive table as a dimension table. Judging from the job DAG, the Hive table is queried only once at job startup; reading it took just 3 seconds, after which that subtask is already in FINISHED state, as shown here: https://s1.ax1x.com/2020/06/30/N4qxNq.png

The other subtasks, however, stay in RUNNING state. In the screenshot the job has been running for 19 hours, yet it may stop with SUCCEEDED status at any moment.

------------------ Original message ------------------
From: "Rui Li" <[hidden email]>
Sent: Tuesday, June 30, 2020, 11:01 AM
To: "user-zh" <[hidden email]>
Subject: Re: Flink SQL streaming job ends abnormally

> Did the job end in a successful state? A Hive table source is a bounded
> stream, so the stream finishes once the Hive table's data has been fully
> read. I'm not sure whether that is what affects the job.
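Another direction that may be worth testing, though it also requires a release newer than the 1.10.0 used here: newer Flink versions can read a Hive table as an unbounded, periodically re-scanned stream through dynamic table options, so the Hive source task never switches to FINISHED. A hedged sketch; the option names come from the newer Hive connector and are an assumption relative to this thread:

-- hedged sketch: re-scan the Hive table on an interval instead of reading it once
SELECT vid, q70
FROM dw.video_pic_title_q70
  /*+ OPTIONS('streaming-source.enable'='true',
              'streaming-source.monitor-interval'='10 min') */ ;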