Reading a Hive table as a stream and writing to Oracle with JDBC two-phase commit: no exception, but checkpoints fail

4 messages
Reading a Hive table as a stream and writing to Oracle with JDBC two-phase commit: no exception, but checkpoints fail

Bruce
Hello, I'd like to report an issue.

I am reading a Hive table's data as a stream and writing it to an Oracle database with a JDBC two-phase commit sink.
No exception is thrown, but checkpoints fail with:
job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
Attachments:
1. flink.log: sanitized log printed by the YARN JobManager
2. Job.txt: pseudocode of the job
3. pseudocode of the JDBC two-phase commit sink

Re: Reading a Hive table as a stream and writing to Oracle with JDBC two-phase commit: no exception, but checkpoints fail

shizk233
Hi, the attachments didn't come through; I can't see them. Consider putting them in a gist and posting the link.

From the message alone, though, the checkpoint failure is caused by a job state change: the job most likely restarted, which is why its state went from RUNNING back to SCHEDULED.
I'd suggest investigating why the job restarted.
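One practical way to make the restart reason visible (a sketch, not from the original thread; values are illustrative): with an unbounded or default restart strategy a checkpointed job can keep restarting quietly, and the only visible symptom is checkpoints aborting while tasks cycle through SCHEDULED. Bounding the restart strategy in flink-conf.yaml (Flink 1.11 keys) makes the root-cause exception surface in the JobManager log after the last attempt:

```yaml
# Illustrative flink-conf.yaml fragment (Flink 1.11).
# After the configured attempts are exhausted, the job fails for good
# and the causing exception appears in the JobManager log.
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
```

With this in place, the JobManager log should show the job switching from RUNNING to RESTARTING together with the exception that triggered the restart, which is the thing to look for here.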

Bruce <[hidden email]> wrote on Mon, Aug 10, 2020 at 17:01:

> Hello, I'd like to report an issue.
>
> I am reading a Hive table's data as a stream and writing it to an Oracle
> database with a JDBC two-phase commit sink.
> No exception is thrown, but checkpoints fail with:
> job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED
> instead. Aborting checkpoint.
> Attachments:
> 1. flink.log: sanitized log printed by the YARN JobManager
> 2. Job.txt: pseudocode of the job
> 3. pseudocode of the JDBC two-phase commit sink
> ------------------------------
> Sent from my iPhone
>

Reply: Reading a Hive table as a stream and writing to Oracle with JDBC two-phase commit: no exception, but checkpoints fail

Bruce
Below is the content of the attachments. What could be causing the restart?


Two-phase commit demo:


@Slf4j
public class CommonOracleSink
        extends TwoPhaseCommitSinkFunction<LinkedList<Object>, CommonOracleSink.ConnectionState, Void> {

    private transient String sinkSQL;

    public CommonOracleSink() {
        super(new KryoSerializer<>(ConnectionState.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ParameterTool params =
                (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        sinkSQL = params.getRequired("sinkSQL");
    }

    @Override
    protected void invoke(ConnectionState connectionState, LinkedList<Object> colList, Context context) {
        try {
            System.err.println("start invoke.......");
            Connection connection = connectionState.connection;
            log.info("colList----------------------> {}", JSON.toJSONString(colList));
            TKQueryRunner runner = new TKQueryRunner();
            Object[] params = colList.toArray();
            System.err.println("params size-----> " + params.length);
            runner.update(connection, sinkSQL, params);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            System.err.println(e.getMessage());
        }
    }

    /**
     * Obtain a connection and start a manually committed transaction.
     */
    @Override
    protected ConnectionState beginTransaction() throws Exception {
        Connection connection = HikariOUtils.getConnection();
        log.info("start beginTransaction......." + connection);
        return new ConnectionState(connection);
    }

    /**
     * Pre-commit; the actual pre-commit logic lives in invoke().
     */
    @Override
    protected void preCommit(ConnectionState connectionState) throws Exception {
        log.info("start preCommit......." + connectionState);
    }

    /**
     * Commit the transaction if invoke() completed normally.
     */
    @Override
    protected void commit(ConnectionState connectionState) {
        log.info("start commit......." + connectionState);
        Connection connection = connectionState.connection;
        try {
            connection.commit();
            connection.close();
        } catch (SQLException e) {
            throw new RuntimeException("Failed to commit transaction", e);
        }
    }

    /**
     * Roll back the transaction if invoke() failed; the next checkpoint will not run either.
     */
    @Override
    protected void abort(ConnectionState connectionState) {
        log.error("start abort rollback......." + connectionState);
        Connection connection = connectionState.connection;
        try {
            connection.rollback();
            connection.close();
        } catch (SQLException e) {
            throw new RuntimeException("Failed to roll back transaction", e);
        }
    }

    static class ConnectionState {

        private final transient Connection connection;

        ConnectionState(Connection connection) {
            this.connection = connection;
        }
    }
}
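For readers unfamiliar with the callback ordering that TwoPhaseCommitSinkFunction drives on a sink like the one above, here is a minimal, Flink-free sketch (all names are hypothetical; a string stands in for the JDBC Connection). The point relevant to this thread is that commit() only runs after a checkpoint completes, so a job that never finishes a checkpoint never commits, even when invoke() raises no exception:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the call order TwoPhaseCommitSinkFunction imposes;
// not Flink code, just the protocol it drives on the sink's callbacks.
public class TwoPhaseCommitSketch {
    static final List<String> calls = new ArrayList<>();

    static String beginTransaction() { calls.add("begin"); return "txn"; }
    static void invoke(String txn, String record) { calls.add("invoke:" + record); }
    static void preCommit(String txn) { calls.add("preCommit"); } // at the checkpoint barrier
    static void commit(String txn) { calls.add("commit"); }       // when the checkpoint completes

    public static void main(String[] args) {
        String txn = beginTransaction();  // one transaction per checkpoint interval
        invoke(txn, "row-1");             // records written inside the open transaction
        preCommit(txn);                   // checkpoint triggered: flush, keep txn open
        String next = beginTransaction(); // fresh transaction for the next interval
        commit(txn);                      // checkpoint-complete notification commits the old txn
        System.out.println(String.join(",", calls));
    }
}
```

Running the sketch prints begin,invoke:row-1,preCommit,begin,commit. In the reported failure the checkpoint aborts before completion, so the final commit() step is never reached.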

JobManager log:

2020-08-10 16:37:31,892 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - --------------------------------------------------------------------------------
2020-08-10 16:37:31,897 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -  Starting YarnJobClusterEntrypoint (Version: 1.11.1, Scala: 2.11, Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)
2020-08-10 16:37:31,898 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -  OS current user: root
2020-08-10 16:37:32,295 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -  Current Hadoop/Kerberos user: root
2020-08-10 16:37:32,295 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.121-b13
2020-08-10 16:37:32,295 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -  Maximum heap size: 3166 MiBytes
2020-08-10 16:37:32,295 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -  JAVA_HOME: /home/xxx/app/jdk1.8.0_121
2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -  Hadoop version: 2.7.7
2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -  JVM Options:
2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -     -Xmx3462817376
2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -     -Xms3462817376
2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -     -XX:MaxMetaspaceSize=268435456
2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -     -Dlog.file=/home/xxx/app/hadoop-2.6.0-cdh5.15.1/logs/userlogs/application_1591335931326_0024/container_1591335931326_0024_01_000001/jobmanager.log
2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -     -Dlog4j.configuration=file:log4j.properties
2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -     -Dlog4j.configurationFile=file:log4j.properties
2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -  Program Arguments: (none)
2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -  Classpath: :UnifyCompFlink-1.0.jar:lib/flink-csv-1.11.1.jar:lib/flink-json-1.11.1.jar:lib/flink-shaded-hadoop-2-uber-2.6.5-10.0.jar:lib/flink-shaded-zookeeper-3.4.14.jar:lib/flink-table-blink_2.11-1.11.1.jar:lib/flink-table_2.11-1.11.1.jar:lib/log4j-1.2-api-2.12.1.jar:lib/log4j-api-2.12.1.jar:lib/log4j-core-2.12.1.jar:lib/log4j-slf4j-impl-2.12.1.jar:flink-dist_2.11-1.11.1.jar:job.graph:flink-conf.yaml:[Hadoop common/hdfs/yarn jars under /home/xxx/app/hadoop-2.6.0-cdh5.15.1 omitted]
2020-08-10 16:37:32,299 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - --------------------------------------------------------------------------------
2020-08-10 16:37:32,301 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Registered UNIX signal handlers for [TERM, HUP, INT]
2020-08-10 16:37:32,306 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - YARN daemon is running as: root Yarn client user obtainer: root
2020-08-10 16:37:32,311 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 4 gb
2020-08-10 16:37:32,311 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: internal.jobgraph-path, job.graph
2020-08-10 16:37:32,311 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.execution.failover-strategy, region
2020-08-10 16:37:32,312 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: high-availability.cluster-id, application_1591335931326_0024
2020-08-10 16:37:32,312 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, localhost
2020-08-10 16:37:32,312 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: execution.target, yarn-per-job
2020-08-10 16:37:32,312 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.memory.process.size, 4 gb
2020-08-10 16:37:32,312 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123
2020-08-10 16:37:32,312 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: execution.savepoint.ignore-unclaimed-state, false
2020-08-10 16:37:32,313 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: execution.attached, true
2020-08-10 16:37:32,313 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: internal.cluster.execution-mode, NORMAL
2020-08-10 16:37:32,313 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: execution.shutdown-on-attached-exit, false
2020-08-10 16:37:32,313 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: pipeline.jars, file:/home/xxx/app/flink-1.11.1/UnifyCompFlink-1.0.jar
2020-08-10 16:37:32,313 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: parallelism.default, 8
2020-08-10 16:37:32,313 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2020-08-10 16:37:32,313 WARN  org.apache.flink.configuration.GlobalConfiguration [] - Error while trying to split key and value in configuration file /tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1591335931326_0024/container_1591335931326_0024_01_000001/flink-conf.yaml:16: "pipeline.classpaths: "
2020-08-10 16:37:32,314 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: $internal.deployment.config-dir, /home/xxx/app/flink-1.11.1/conf
2020-08-10 16:37:32,314 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: $internal.yarn.log-config-file, /home/xxx/app/flink-1.11.1/conf/log4j.properties
2020-08-10 16:37:32,347 WARN  org.apache.flink.configuration.Configuration [] - Config uses deprecated configuration key 'web.port' instead of proper key 'rest.bind-port'
2020-08-10 16:37:32,362 INFO  org.apache.flink.runtime.clusterframework.BootstrapTools [] - Setting directories for temporary files to: /tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1591335931326_0024
2020-08-10 16:37:32,368 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Starting YarnJobClusterEntrypoint.
2020-08-10 16:37:32,413 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Install default filesystem.
2020-08-10 16:37:32,461 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Install security context.
2020-08-10 16:37:32,520 INFO  org.apache.flink.runtime.security.modules.HadoopModule [] - Hadoop user set to root (auth:SIMPLE)
2020-08-10 16:37:32,529 INFO  org.apache.flink.runtime.security.modules.JaasModule [] - Jaas file will be created as /tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1591335931326_0024/jaas-1114046375892877617.conf.
2020-08-10 16:37:32,539 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Initializing cluster services.
2020-08-10 16:37:32,556 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils [] - Trying to start actor system, external address node3:0, bind address 0.0.0.0:0.
2020-08-10 16:37:33,191 INFO  akka.event.slf4j.Slf4jLogger [] - Slf4jLogger started
2020-08-10 16:37:33,218 INFO  akka.remote.Remoting [] - Starting remoting
2020-08-10 16:37:33,378 INFO  akka.remote.Remoting [] - Remoting started; listening on addresses :[akka.tcp://flink@node3:40657]
2020-08-10 16:37:33,506 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils [] - Actor system started at akka.tcp://flink@node3:40657
2020-08-10 16:37:33,539 WARN  org.apache.flink.configuration.Configuration [] - Config uses deprecated configuration key 'web.port' instead of proper key 'rest.port'
2020-08-10 16:37:33,551 INFO  org.apache.flink.runtime.blob.BlobServer [] - Created BLOB server storage directory /tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1591335931326_0024/blobStore-15a573e2-a671-4eb9-975b-b5229cec6bde
2020-08-10 16:37:33,555 INFO  org.apache.flink.runtime.blob.BlobServer [] - Started BLOB server at 0.0.0.0:34380 - max concurrent requests: 50 - max backlog: 1000
2020-08-10 16:37:33,570 INFO  org.apache.flink.runtime.metrics.MetricRegistryImpl [] - No metrics reporter configured, no metrics will be exposed/reported.
2020-08-10 16:37:33,574 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils [] - Trying to start actor system, external address node3:0, bind address 0.0.0.0:0.
2020-08-10 16:37:33,591 INFO  akka.event.slf4j.Slf4jLogger [] - Slf4jLogger started
2020-08-10 16:37:33,597 INFO  akka.remote.Remoting [] - Starting remoting
2020-08-10 16:37:33,606 INFO  akka.remote.Remoting [] - Remoting started; listening on addresses :[akka.tcp://flink-metrics@node3:43096]
2020-08-10 16:37:33,642 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils [] - Actor system started at akka.tcp://flink-metrics@node3:43096
2020-08-10 16:37:33,659 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.metrics.dump.MetricQueryService at akka://flink-metrics/user/rpc/MetricQueryService .
2020-08-10 16:37:33,721 WARN  org.apache.flink.configuration.Configuration [] - Config uses deprecated configuration key 'web.port' instead of proper key 'rest.bind-port'
2020-08-10 16:37:33,723 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Upload directory /tmp/flink-web-d25a365e-b6cb-45e6-bb6e-3068b7412f06/flink-web-upload does not exist.
2020-08-10 16:37:33,724 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Created directory /tmp/flink-web-d25a365e-b6cb-45e6-bb6e-3068b7412f06/flink-web-upload for file uploads.
2020-08-10 16:37:33,748 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Starting rest endpoint.
2020-08-10 16:37:34,110 INFO  org.apache.flink.runtime.webmonitor.WebMonitorUtils [] - Determined location of main cluster component log file: /home/xxx/app/hadoop-2.6.0-cdh5.15.1/logs/userlogs/application_1591335931326_0024/container_1591335931326_0024_01_000001/jobmanager.log
2020-08-10 16:37:34,111 INFO  org.apache.flink.runtime.webmonitor.WebMonitorUtils [] - Determined location of main cluster component stdout file: /home/xxx/app/hadoop-2.6.0-cdh5.15.1/logs/userlogs/application_1591335931326_0024/container_1591335931326_0024_01_000001/jobmanager.out
2020-08-10 16:37:34,309 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Rest endpoint listening at node3:39469
2020-08-10 16:37:34,311 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - http://node3:39469 was granted leadership with leaderSessionID=00000000-0000-0000-0000-000000000000
2020-08-10 16:37:34,312 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Web frontend listening at http://node3:39469.
2020-08-10 16:37:34,403 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.yarn.YarnResourceManager at akka://flink/user/rpc/resourcemanager_0 .
2020-08-10 16:37:34,417 INFO  org.apache.flink.configuration.GlobalConfiguration [] - [configuration properties reloaded at 16:37:34,417 through 16:37:34,419; identical to the entries logged at 16:37:32 above, including the pipeline.classpaths WARN]
2020-08-10 16:37:34,450 INFO  org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - Enabled external resources: []
2020-08-10 16:37:34,519 INFO  org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess [] - Start JobDispatcherLeaderProcess.
2020-08-10 16:37:34,527 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.MiniDispatcher at akka://flink/user/rpc/dispatcher_1 .
2020-08-10 16:37:34,572 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_2 .
2020-08-10 16:37:34,582 INFO  org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job [log truncated]
empJOB (eb447d27efb8134da40c0c1dd19fffdf).2020-08-10 16:37:34,615 INFO &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Using restart back off time strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, backoffTimeMS=1000) for empJOB (eb447d27efb8134da40c0c1dd19fffdf).2020-08-10 16:37:34,667 INFO &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Running initialization on master for job empJOB (eb447d27efb8134da40c0c1dd19fffdf).2020-08-10 16:37:34,806 INFO &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Successfully ran initialization on master in 139 ms.2020-08-10 16:37:34,876 INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Recovered 0 containers from previous attempts ([]).2020-08-10 16:37:34,877 INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Register application master response contains scheduler resource types: [MEMORY, CPU].2020-08-10 16:37:34,877 INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Container matching strategy: MATCH_VCORE.2020-08-10 16:37:34,887 INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - ResourceManager akka.tcp://flink@node3:40657/user/rpc/resourcemanager_0 was granted leadership with fencing token 000000000000000000000000000000002020-08-10 16:37:34,891 INFO &nbsp;org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl [] - Starting the SlotManager.2020-08-10 16:37:35,466 INFO &nbsp;org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 pipelined regions in 2 ms2020-08-10 
16:37:35,483 INFO &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)2020-08-10 16:37:35,503 INFO &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@3915bc20 for empJOB (eb447d27efb8134da40c0c1dd19fffdf).2020-08-10 16:37:35,509 INFO &nbsp;org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl &nbsp; &nbsp; &nbsp;[] - JobManager runner for job empJOB (eb447d27efb8134da40c0c1dd19fffdf) was granted leadership with session id 00000000-0000-0000-0000-000000000000 at akka.tcp://flink@node3:40657/user/rpc/jobmanager_2.2020-08-10 16:37:35,514 INFO &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Starting execution of job empJOB (eb447d27efb8134da40c0c1dd19fffdf) under job master id 00000000000000000000000000000000.2020-08-10 16:37:35,517 INFO &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]2020-08-10 16:37:35,518 INFO &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; &nbsp; [] - Job empJOB (eb447d27efb8134da40c0c1dd19fffdf) switched from state CREATED to RUNNING.2020-08-10 16:37:35,535 INFO &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; &nbsp; [] - Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6) 
(5a6410258857c02ebd1b5ec03a78be4b) switched from CREATED to SCHEDULED.2020-08-10 16:37:35,536 INFO &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; &nbsp; [] - Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (2/6) (299de0d4a8affe02a999edeb84957c41) switched from CREATED to SCHEDULED.2020-08-10 16:37:35,536 INFO &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; &nbsp; [] - Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (3/6) (1b98df27c9019f64835b55fa3de3f363) switched from CREATED to SCHEDULED.2020-08-10 16:37:35,536 INFO &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; &nbsp; [] - Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (4/6) (a7612608772c018d819741ce4d9320bd) switched from CREATED to SCHEDULED.2020-08-10 16:37:35,536 INFO &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; &nbsp; [] - Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (5/6) (b19828c85fc0e92e62f2a7241b610f5b) switched from CREATED to SCHEDULED.2020-08-10 16:37:35,536 INFO &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; &nbsp; [] - Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (6/6) (c2178a51eda2db900d3212e4f488d00f) switched from CREATED to SCHEDULED.2020-08-10 16:37:35,536 INFO &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; 
&nbsp; [] - Process -&gt; Sink: Unnamed (1/8) (2a8db3a2b4cd65fd7cd3e6bac031a971) switched from CREATED to SCHEDULED.2020-08-10 16:37:35,536 INFO &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; &nbsp; [] - Process -&gt; Sink: Unnamed (2/8) (7aa8dd779d4ff75e4c985be75a52c427) switched from CREATED to SCHEDULED.2020-08-10 16:37:35,536 INFO &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; &nbsp; [] - Process -&gt; Sink: Unnamed (3/8) (867c814978ea302537065f51516ed766) switched from CREATED to SCHEDULED.2020-08-10 16:37:35,536 INFO &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; &nbsp; [] - Process -&gt; Sink: Unnamed (4/8) (4e186575ab42cc6c1d599ae027bf99b8) switched from CREATED to SCHEDULED.2020-08-10 16:37:35,537 INFO &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; &nbsp; [] - Process -&gt; Sink: Unnamed (5/8) (b107b8bfb0a08c5e7937400c43a0f9ff) switched from CREATED to SCHEDULED.2020-08-10 16:37:35,537 INFO &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; &nbsp; [] - Process -&gt; Sink: Unnamed (6/8) (28e1f0fa1b9ebed59e4c67b0598864b9) switched from CREATED to SCHEDULED.2020-08-10 16:37:35,537 INFO &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; &nbsp; [] - Process -&gt; Sink: Unnamed (7/8) (e27e60ff7dcd5245dfd21b23bbd49985) switched from CREATED to SCHEDULED.2020-08-10 16:37:35,537 INFO &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; &nbsp; [] - Process -&gt; Sink: Unnamed (8/8) (c0f6b9e623c68fd7e9205a8ad686d4e5) switched from CREATED to SCHEDULED.2020-08-10 16:37:35,558 INFO &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp; &nbsp; [] - Cannot serve slot request, no ResourceManager connected. 
Adding as pending request [SlotRequestId{7c8417d17f555e546cd6427733db6a3b}]2020-08-10 16:37:35,565 INFO &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp; &nbsp; [] - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{eb7c6ff479c977fdb6312f8e61dfdfba}]2020-08-10 16:37:35,565 INFO &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp; &nbsp; [] - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{a1d23d7e17b9369e80833de148f54bac}]2020-08-10 16:37:35,566 INFO &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp; &nbsp; [] - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{2447496efd24d542bce06de1b69ec70d}]2020-08-10 16:37:35,566 INFO &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp; &nbsp; [] - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{2ab761d21cd4368751f3187f122705fa}]2020-08-10 16:37:35,566 INFO &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp; &nbsp; [] - Cannot serve slot request, no ResourceManager connected. 
Adding as pending request [SlotRequestId{08a52f113cce64d1309509da5d25a1bb}]2020-08-10 16:37:35,574 INFO &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Connecting to ResourceManager akka.tcp://flink@node3:40657/user/rpc/resourcemanager_*(00000000000000000000000000000000)2020-08-10 16:37:35,579 INFO &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Resolved ResourceManager address, beginning registration2020-08-10 16:37:35,584 INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Registering job manager [hidden email]://flink@node3:40657/user/rpc/jobmanager_2 for job eb447d27efb8134da40c0c1dd19fffdf.2020-08-10 16:37:35,589 INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Registered job manager [hidden email]://flink@node3:40657/user/rpc/jobmanager_2 for job eb447d27efb8134da40c0c1dd19fffdf.2020-08-10 16:37:35,593 INFO &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - JobManager successfully registered at ResourceManager, leader id: 00000000000000000000000000000000.2020-08-10 16:37:35,594 INFO &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp; &nbsp; [] - Requesting new slot [SlotRequestId{7c8417d17f555e546cd6427733db6a3b}] and profile ResourceProfile{UNKNOWN} from resource manager.2020-08-10 16:37:35,595 INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Request slot with profile ResourceProfile{UNKNOWN} for job eb447d27efb8134da40c0c1dd19fffdf with allocation id e490d3208119fe28d97f4f0fe94cab28.2020-08-10 16:37:35,595 INFO &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp; &nbsp; [] - Requesting new slot 
[SlotRequestId{eb7c6ff479c977fdb6312f8e61dfdfba}] and profile ResourceProfile{UNKNOWN} from resource manager.2020-08-10 16:37:35,595 INFO &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp; &nbsp; [] - Requesting new slot [SlotRequestId{a1d23d7e17b9369e80833de148f54bac}] and profile ResourceProfile{UNKNOWN} from resource manager.2020-08-10 16:37:35,595 INFO &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp; &nbsp; [] - Requesting new slot [SlotRequestId{2447496efd24d542bce06de1b69ec70d}] and profile ResourceProfile{UNKNOWN} from resource manager.2020-08-10 16:37:35,596 INFO &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp; &nbsp; [] - Requesting new slot [SlotRequestId{2ab761d21cd4368751f3187f122705fa}] and profile ResourceProfile{UNKNOWN} from resource manager.2020-08-10 16:37:35,596 INFO &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp; &nbsp; [] - Requesting new slot [SlotRequestId{08a52f113cce64d1309509da5d25a1bb}] and profile ResourceProfile{UNKNOWN} from resource manager.2020-08-10 16:37:35,612 INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Requesting new TaskExecutor container with resource WorkerResourceSpec {cpuCores=1.0, taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes, networkMemSize=343.040mb (359703515 bytes), managedMemSize=1.340gb (1438814063 bytes)}. 
Number pending workers of this resource is 1.2020-08-10 16:37:35,614 INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Request slot with profile ResourceProfile{UNKNOWN} for job eb447d27efb8134da40c0c1dd19fffdf with allocation id b74299e8619de93adec5869d1fa79d73.2020-08-10 16:37:35,615 INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Requesting new TaskExecutor container with resource WorkerResourceSpec {cpuCores=1.0, taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes, networkMemSize=343.040mb (359703515 bytes), managedMemSize=1.340gb (1438814063 bytes)}. Number pending workers of this resource is 2.2020-08-10 16:37:35,615 INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Request slot with profile ResourceProfile{UNKNOWN} for job eb447d27efb8134da40c0c1dd19fffdf with allocation id 046a3dcf1af40e0539f15fcddfbddf77.2020-08-10 16:37:35,615 INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Requesting new TaskExecutor container with resource WorkerResourceSpec {cpuCores=1.0, taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes, networkMemSize=343.040mb (359703515 bytes), managedMemSize=1.340gb (1438814063 bytes)}. 
Number pending workers of this resource is 3.2020-08-10 16:37:35,615 INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Request slot with profile ResourceProfile{UNKNOWN} for job eb447d27efb8134da40c0c1dd19fffdf with allocation id 90870250ae0f3bef44cbdd675dede57b.2020-08-10 16:37:35,616 INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Requesting new TaskExecutor container with resource WorkerResourceSpec {cpuCores=1.0, taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes, networkMemSize=343.040mb (359703515 bytes), managedMemSize=1.340gb (1438814063 bytes)}. Number pending workers of this resource is 4.2020-08-10 16:37:35,616 INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Request slot with profile ResourceProfile{UNKNOWN} for job eb447d27efb8134da40c0c1dd19fffdf with allocation id f8063a6fc86162712215a92533532b65.2020-08-10 16:37:35,616 INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Requesting new TaskExecutor container with resource WorkerResourceSpec {cpuCores=1.0, taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes, networkMemSize=343.040mb (359703515 bytes), managedMemSize=1.340gb (1438814063 bytes)}. 
Number pending workers of this resource is 5.2020-08-10 16:37:35,616 INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Request slot with profile ResourceProfile{UNKNOWN} for job eb447d27efb8134da40c0c1dd19fffdf with allocation id cfe671ec5448d440838f02145cb6267f.2020-08-10 16:37:35,617 INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Requesting new TaskExecutor container with resource WorkerResourceSpec {cpuCores=1.0, taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes, networkMemSize=343.040mb (359703515 bytes), managedMemSize=1.340gb (1438814063 bytes)}. Number pending workers of this resource is 6.2020-08-10 16:37:38,391 INFO &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6) of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. 
Aborting checkpoint.2020-08-10 16:37:40,933 INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Received 1 containers.2020-08-10 16:37:40,940 INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Received 1 containers with resource <memory:4096, vCores:1&gt;, 6 pending container requests.2020-08-10 16:37:40,953 INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - TaskExecutor container_1591335931326_0024_01_000003 will be started on node1 with TaskExecutorProcessSpec {cpuCores=1.0, frameworkHeapSize=128.000mb (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes, networkMemSize=343.040mb (359703515 bytes), managedMemorySize=1.340gb (1438814063 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), jvmOverheadSize=409.600mb (429496736 bytes)}.2020-08-10 16:37:40,976 INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Creating container launch context for TaskManagers2020-08-10 16:37:40,978 INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Starting TaskManagers2020-08-10 16:37:40,995 INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Removing container request Capability[<memory:4096, vCores:1&gt;]Priority[1].2020-08-10 16:37:40,995 INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Accepted 1 requested containers, returned 0 excess containers, 5 pending container requests of resource <memory:4096, vCores:1&gt;.2020-08-10 16:37:46,389 INFO 
&nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6) of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.2020-08-10 16:37:47,712 INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Registering TaskManager with ResourceID container_1591335931326_0024_01_000003 (akka.tcp://flink@node1:40857/user/rpc/taskmanager_0) at ResourceManager2020-08-10 16:37:54,389 INFO &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6) of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.2020-08-10 16:38:02,389 INFO &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6) of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.2020-08-10 16:38:10,389 INFO &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6) of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. 
Aborting checkpoint.2020-08-10 16:38:18,389 INFO &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6) of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.2020-08-10 16:38:26,389 INFO &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6) of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.2020-08-10 16:38:34,389 INFO &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6) of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.2020-08-10 16:38:42,389 INFO &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6) of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. 
Aborting checkpoint.2020-08-10 16:38:50,389 INFO &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6) of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.2020-08-10 16:38:58,389 INFO &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6) of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.2020-08-10 16:39:06,389 INFO &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6) of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.2020-08-10 16:39:14,389 INFO &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6) of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. 
Aborting checkpoint.2020-08-10 16:39:22,389 INFO &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6) of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.2020-08-10 16:39:30,389 INFO &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6) of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.2020-08-10 16:39:38,389 INFO &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6) of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.2020-08-10 16:39:46,389 INFO &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6) of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. 
Aborting checkpoint.2020-08-10 16:39:54,389 INFO &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6) of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
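The repeated abort above is not an error from the sink: the CheckpointCoordinator declines to trigger a checkpoint while any of its "trigger tasks" (here the Hive source subtasks) is still SCHEDULED, i.e. waiting for a slot. A minimal, self-contained sketch of that gate (hypothetical class and method names; the real check lives inside Flink's CheckpointCoordinator):

```java
import java.util.List;

public class CheckpointGate {
    // Subset of Flink's execution states, for illustration only.
    public enum ExecutionState { CREATED, SCHEDULED, DEPLOYING, RUNNING, FINISHED }

    /**
     * Returns null when all trigger tasks are RUNNING and the checkpoint may
     * proceed; otherwise returns the first offending state, which the
     * coordinator reports as "... is not in state RUNNING but <state> instead.
     * Aborting checkpoint."
     */
    public static ExecutionState findNotRunning(List<ExecutionState> triggerTasks) {
        for (ExecutionState s : triggerTasks) {
            if (s != ExecutionState.RUNNING) {
                return s;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // While slots are still pending, source subtasks sit in SCHEDULED:
        System.out.println(findNotRunning(
                List.of(ExecutionState.SCHEDULED, ExecutionState.RUNNING))); // prints SCHEDULED
        // Once every subtask is RUNNING, the periodic trigger succeeds:
        System.out.println(findNotRunning(
                List.of(ExecutionState.RUNNING, ExecutionState.RUNNING))); // prints null
    }
}
```

If subtasks keep falling back from RUNNING to SCHEDULED, the job is being restarted, which points at the restart itself rather than at the two-phase-commit sink.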



Job code

public class PlatJobExecution {
    private volatile ParameterTool parameters;

    public PlatJobExecution(ParameterTool parameters) {
        this.parameters = parameters;
    }

    public void execute() throws Exception {
        // Target data source / target table:
        // 1. Read the data (Kafka / Oracle) and register the stream as a table
        //    [this step can also be done manually] -- here the source is Hive
        // 2. Execute the SQL and get the result
        // 3. Write the result to the target table / Redis / Kafka
        InputStream is = ReadKafkaPrint.class.getClassLoader().getResourceAsStream("config.properties");
        ParameterTool parameters2 = ParameterTool.fromPropertiesFile(is);

        String targetDatabase = parameters.get("sourceDatabase");
        String executiveSql   = parameters.get("executiveSql");
        String sinkSQL        = parameters.get("sinkSQL");
        String jobName        = parameters.get("jobName");

        Map<String, String> pMap = Maps.newHashMap();
        pMap.putAll(parameters2.toMap());
        pMap.put("sinkSQL", sinkSQL);
        parameters2 = ParameterTool.fromMap(pMap);

        // 1. Create the execution environment
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the global job parameters
        streamEnv.getConfig().setGlobalJobParameters(parameters2);
        streamEnv.enableCheckpointing(8000, CheckpointingMode.EXACTLY_ONCE); // checkpoint every 8 s

        EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

        // 2. Streaming TableEnvironment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings);
        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(8));

        // 3. Register the HiveCatalog
        String name            = targetDatabase;
        String defaultDatabase = targetDatabase;
        String hiveConfDir     = "/home/xxx/app/flink-1.11.1/jobcfg";
        String version         = "1.1.0";
        HiveCatalog catalog = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
        tableEnv.registerCatalog(name, catalog);
        tableEnv.useCatalog(name);

        // 4. Read Hive as a stream
        tableEnv.getConfig().getConfiguration().setBoolean(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);

        // Query
        Table table = tableEnv.sqlQuery(executiveSql);
        // CREATE/INSERT
        // tableEnv.executeSql()
        // tableEnv.toRetractStream(table, Row.class).print().setParallelism(1);
        SingleOutputStreamOperator<LinkedList<Object>> colList = tableEnv.toAppendStream(table, Row.class)
                .process(new ProcessFunction<Row, LinkedList<Object>>() {
                    @Override
                    public void processElement(Row row, Context context, Collector<LinkedList<Object>> collector) throws Exception {
                        LinkedList<Object> linkedList = Lists.newLinkedList();
                        for (int i = 0; i < row.getArity(); i++) {
                            linkedList.add(row.getField(i));
                        }
                        collector.collect(linkedList);
                    }
                });
&nbsp; &nbsp; &nbsp; &nbsp; colList.addSink(new CommonOracleSink());

&nbsp; &nbsp; &nbsp; &nbsp; //sink to Oracle&nbsp; &nbsp; &nbsp; &nbsp; streamEnv.execute(jobName);

&nbsp; &nbsp; }}


Sent from my iPhone


------------------ Original message ------------------
From: shizk233 <[hidden email]>
Date: Mon, Aug 10, 2020 18:04
To: [hidden email] <[hidden email]>
Subject: Re: Reading a Hive table as a stream and writing to Oracle via JDBC two-phase commit; no exception, but checkpoints fail



Hi, the attachments didn't come through; I can't see them. Consider putting them in a gist and posting the link.

That said, judging from the message itself, the checkpoint failure is caused by a job state change (most likely the job restarted, which is why RUNNING became SCHEDULED).
I'd suggest investigating why the job restarts.

Bruce <[hidden email]> wrote on Mon, Aug 10, 2020 at 5:01 PM:

> Hello, reporting an issue here!
>
> Reading a Hive table as a stream and writing to Oracle via JDBC two-phase commit;
> no exception is thrown, but checkpoints fail:
> job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED
> instead. Aborting checkpoint.
> Attachments
> 1. flink.log: the (redacted) yarn jobmanager log
> 2. Job.txt: the job pseudocode
> 3. the JDBC two-phase commit pseudocode
> ------------------------------
> Sent from my iPhone
>
Re: Reading a Hive table as a stream and writing to Oracle via JDBC two-phase commit; no exception, but checkpoints fail

shizk233
Hi, this log is full of &nbsp;... a bit of a headache...

It just occurred to me that besides a task restart, another possibility is that the tasks were never scheduled successfully.
Can you check the task states in the Flink web UI -- are they all RUNNING?
If they stay in SCHEDULED, there are probably not enough resources to schedule them; compare the slots offered by the task managers with what the job requires.
If they keep switching between RUNNING, FAILED, and SCHEDULED, check the task manager logs; there should be warnings there.
Bruce <[hidden email]> wrote on Mon, Aug 10, 2020 at 6:12 PM:

> Below are the attachment contents. What could be causing the restart?
>
>
> Two-phase commit demo:
>
>
> @Slf4j
> public class CommonOracleSink extends TwoPhaseCommitSinkFunction<LinkedList<Object>, CommonOracleSink.ConnectionState, Void> {
>
>     private transient String sinkSQL;
>
>     public CommonOracleSink() {
>         super(new KryoSerializer<>(ConnectionState.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
>     }
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         ParameterTool params = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
>         sinkSQL = params.getRequired("sinkSQL");
>     }
>
>     @Override
>     protected void invoke(ConnectionState connectionState, LinkedList<Object> colList, Context context) {
>         try {
>             System.err.println("start invoke.......");
>             Connection connection = connectionState.connection;
>             log.info("colList---------------------->", JSON.toJSONString(colList));
>             TKQueryRunner runner = new TKQueryRunner();
>             Object[] params = colList.toArray();
>             System.err.println("params size----->" + params.length);
>             runner.update(connection, sinkSQL, params);
>         } catch (Exception e) {
>             log.error(e.getMessage(), e);
>             System.err.println(e.getMessage());
>         }
>     }
>
>     /**
>      * Get a connection and start a manually committed transaction.
>      */
>     @Override
>     protected ConnectionState beginTransaction() throws Exception {
>         Connection connection = HikariOUtils.getConnection();
>         log.info("start beginTransaction......." + connection);
>         return new ConnectionState(connection);
>     }
>
>     /**
>      * Pre-commit; here the pre-commit work happens in invoke().
>      */
>     @Override
>     protected void preCommit(ConnectionState connectionState) throws Exception {
>         log.info("start preCommit......." + connectionState);
>     }
>
>     /**
>      * Commit the transaction if invoke() completed normally.
>      */
>     @Override
>     protected void commit(ConnectionState connectionState) {
>         log.info("start commit......." + connectionState);
>         Connection connection = connectionState.connection;
>         try {
>             connection.commit();
>             connection.close();
>         } catch (SQLException e) {
>             throw new RuntimeException("failed to commit transaction");
>         }
>     }
>
>     /**
>      * Roll back the transaction if invoke() threw; the next checkpoint will not run either.
>      */
>     @Override
>     protected void abort(ConnectionState connectionState) {
>         log.error("start abort rollback......." + connectionState);
>         Connection connection = connectionState.connection;
>         try {
>             connection.rollback();
>             connection.close();
>         } catch (SQLException e) {
>             throw new RuntimeException("failed to roll back transaction");
>         }
>     }
>
>     static class ConnectionState {
>
>         private final transient Connection connection;
>
>         ConnectionState(Connection connection) {
>             this.connection = connection;
>         }
>     }
> }
>
> jobmanager log
>
> 2020-08-10 16:37:31,892 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - --------------------------------------------------------------------------------
> 2020-08-10 16:37:31,897 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -  Starting YarnJobClusterEntrypoint (Version: 1.11.1, Scala: 2.11, Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)
> 2020-08-10 16:37:31,898 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -  OS current user: root
> 2020-08-10 16:37:32,295 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -  Current Hadoop/Kerberos user: root
> 2020-08-10 16:37:32,295 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.121-b13
> 2020-08-10 16:37:32,295 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -  Maximum heap size: 3166 MiBytes
> 2020-08-10 16:37:32,295 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -  JAVA_HOME: /home/xxx/app/jdk1.8.0_121
> 2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -  Hadoop version: 2.7.7
> 2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -  JVM Options:
> 2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -     -Xmx3462817376
> 2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -     -Xms3462817376
> 2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -     -XX:MaxMetaspaceSize=268435456
> 2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -     -Dlog.file=/home/xxx/app/hadoop-2.6.0-cdh5.15.1/logs/userlogs/application_1591335931326_0024/container_1591335931326_0024_01_000001/jobmanager.log
> 2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -     -Dlog4j.configuration=file:log4j.properties
> 2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -     -Dlog4j.configurationFile=file:log4j.properties
> 2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -  Program Arguments: (none)
> 2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -  Classpath:
> :UnifyCompFlink-1.0.jar:lib/flink-csv-1.11.1.jar:lib/flink-json-1.11.1.jar:lib/flink-shaded-hadoop-2-uber-2.6.5-10.0.jar:lib/flink-shaded-zookeeper-3.4.14.jar:lib/flink-table-blink_2.11-1.11.1.jar:lib/flink-table_2.11-1.11.1.jar:lib/log4j-1.2-api-2.12.1.jar:lib/log4j-api-2.12.1.jar:lib/log4j-core-2.12.1.jar:lib/log4j-slf4j-impl-2.12.1.jar:flink-dist_2.11-1.11.1.jar:job.graph:flink-conf.yaml::/home/xxx/app/hadoop-2.6.0-cdh5.15.1/etc/hadoop:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/hadoop-nfs-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/hadoop-common-2.6.0-cdh5.15.1-tests.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/hadoop-common-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/api-util-1.0.0-M20.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/zookeeper-3.4.5-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/logredactor-1.0.3.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/activation-1.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-codec-1.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/junit-4.11.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jsr305-3.0.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-configuration-1.6.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/netty-3.10.5.Final.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-beanutils-core-1.8.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/slf4j-api-1.7.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/httpclient-4.2.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/api-asn1-api-1.0.0-M20.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/avro-1.7.6-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jetty-6.1.26.cloudera.4.jar:/home/xxx
/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jaxb-api-2.2.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-cli-1.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/mockito-all-1.8.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jackson-jaxrs-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/hadoop-auth-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-el-1.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-net-3.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jersey-json-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-logging-1.1.3.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jsch-0.1.42.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-lang-2.6.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jettison-1.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/hadoop-annotations-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/curator-recipes-2.7.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/snappy-java-1.0.4.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/gson-2.2.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-compress-1.4.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/guava-11.0.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/java-xmlbuilder-0.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/protobuf-java-2.5.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/paranamer-2.3.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jersey-server-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/hamcrest-core-1.3.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-io-2.4.jar:/home/xxx/ap
p/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jackson-xc-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jsp-api-2.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jersey-core-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-math3-3.1.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/xmlenc-0.52.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-digester-1.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/servlet-api-2.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/log4j-1.2.17.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/xz-1.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jasper-compiler-5.5.23.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/stax-api-1.0-2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jets3t-0.9.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jetty-util-6.1.26.cloudera.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jasper-runtime-5.5.23.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/htrace-core4-4.0.1-incubating.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/httpcore-4.2.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-beanutils-1.9.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-collections-3.2.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jaxb-impl-2.2.3-1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/apacheds-kerberos-codec-2.0.0-M15.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/curator-client-2.7.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-httpclient-3.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/apacheds-
i18n-2.0.0-M15.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/curator-framework-2.7.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/asm-3.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/hadoop-hdfs-2.6.0-cdh5.15.1-tests.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/hadoop-hdfs-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/hadoop-hdfs-nfs-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/commons-codec-1.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/leveldbjni-all-1.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jsr305-3.0.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/netty-3.10.5.Final.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jetty-6.1.26.cloudera.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/commons-cli-1.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/commons-el-1.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/commons-logging-1.1.3.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/commons-lang-2.6.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/guava-11.0.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/protobuf-java-2.5.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jersey-server-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/commons-io-2.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jsp-api-2.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jersey-core-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/commons-daemon-1.0.13.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/xmlenc-0.52.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/servlet-api-2.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/log4j-1.2.17.jar:/home/xxx/app/hadoop-2.6.0-cdh5
.15.1/share/hadoop/hdfs/lib/xml-apis-1.3.04.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jackson-core-asl-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jetty-util-6.1.26.cloudera.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/xercesImpl-2.9.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jasper-runtime-5.5.23.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/htrace-core4-4.0.1-incubating.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jackson-mapper-asl-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/asm-3.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-applications-unmanaged-am-launcher-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-common-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-registry-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-server-nodemanager-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-server-web-proxy-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-server-common-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-server-tests-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-api-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-server-applicationhistoryservice-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-server-resourcemanager-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-client-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/zookeeper-3.4.5-cdh5.15.1.jar:/home/xxx/app
/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/activation-1.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/commons-codec-1.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/aopalliance-1.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/leveldbjni-all-1.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/javax.inject-1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jsr305-3.0.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jersey-client-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jline-2.11.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jetty-6.1.26.cloudera.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jaxb-api-2.2.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/commons-cli-1.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jackson-jaxrs-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/guice-servlet-3.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jersey-json-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/commons-logging-1.1.3.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/commons-lang-2.6.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jettison-1.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/commons-compress-1.4.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/guava-11.0.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/protobuf-java-2.5.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jersey-guice-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jersey-server-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/commons-io-2.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jackson-xc-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jersey-core-1.9.jar:/home/xxx/app
/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/guice-3.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/servlet-api-2.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/log4j-1.2.17.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/xz-1.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jackson-core-asl-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/stax-api-1.0-2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jetty-util-6.1.26.cloudera.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/commons-collections-3.2.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jackson-mapper-asl-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jaxb-impl-2.2.3-1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/asm-3.2.jar2020-08-10
> 16:37:32,299 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - --------------------------------------------------------------------------------
> 2020-08-10 16:37:32,301 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Registered UNIX signal handlers for [TERM, HUP, INT]
> 2020-08-10 16:37:32,306 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - YARN daemon is running as: root Yarn client user obtainer: root
> 2020-08-10 16:37:32,311 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 4 gb
> 2020-08-10 16:37:32,311 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: internal.jobgraph-path, job.graph
> 2020-08-10 16:37:32,311 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.execution.failover-strategy, region
> 2020-08-10 16:37:32,312 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: high-availability.cluster-id, application_1591335931326_0024
> 2020-08-10 16:37:32,312 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, localhost
> 2020-08-10 16:37:32,312 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: execution.target, yarn-per-job
> 2020-08-10 16:37:32,312 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.memory.process.size, 4 gb
> 2020-08-10 16:37:32,312 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123
> 2020-08-10 16:37:32,312 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: execution.savepoint.ignore-unclaimed-state, false
> 2020-08-10 16:37:32,313 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: execution.attached, true
> 2020-08-10 16:37:32,313 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: internal.cluster.execution-mode, NORMAL
> 2020-08-10 16:37:32,313 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: execution.shutdown-on-attached-exit, false
> 2020-08-10 16:37:32,313 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: pipeline.jars, file:/home/xxx/app/flink-1.11.1/UnifyCompFlink-1.0.jar
> 2020-08-10 16:37:32,313 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: parallelism.default, 8
> 2020-08-10 16:37:32,313 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
> 2020-08-10 16:37:32,313 WARN  org.apache.flink.configuration.GlobalConfiguration [] - Error while trying to split key and value in configuration file /tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1591335931326_0024/container_1591335931326_0024_01_000001/flink-conf.yaml:16: "pipeline.classpaths: "
> 2020-08-10 16:37:32,314 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: $internal.deployment.config-dir, /home/xxx/app/flink-1.11.1/conf
> 2020-08-10 16:37:32,314 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: $internal.yarn.log-config-file, /home/xxx/app/flink-1.11.1/conf/log4j.properties
> 2020-08-10 16:37:32,347 WARN  org.apache.flink.configuration.Configuration [] - Config uses deprecated configuration key 'web.port' instead of proper key 'rest.bind-port'
> 2020-08-10 16:37:32,362 INFO  org.apache.flink.runtime.clusterframework.BootstrapTools [] - Setting directories for temporary files to: /tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1591335931326_0024
> 2020-08-10 16:37:32,368 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Starting YarnJobClusterEntrypoint.
> 2020-08-10 16:37:32,413 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Install default filesystem.
> 2020-08-10 16:37:32,461 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Install security context.
> 2020-08-10 16:37:32,520 INFO  org.apache.flink.runtime.security.modules.HadoopModule [] - Hadoop user set to root (auth:SIMPLE)
> 2020-08-10 16:37:32,529 INFO  org.apache.flink.runtime.security.modules.JaasModule [] - Jaas file will be created as /tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1591335931326_0024/jaas-1114046375892877617.conf.
> 2020-08-10 16:37:32,539 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Initializing cluster services.
> 2020-08-10 16:37:32,556 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils [] - Trying to start actor system, external address node3:0, bind address 0.0.0.0:0.
> 2020-08-10 16:37:33,191 INFO  akka.event.slf4j.Slf4jLogger [] - Slf4jLogger started
> 2020-08-10 16:37:33,218 INFO  akka.remote.Remoting [] - Starting remoting
> 2020-08-10 16:37:33,378 INFO  akka.remote.Remoting [] - Remoting started; listening on addresses :[akka.tcp://flink@node3:40657]
> 2020-08-10 16:37:33,506 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils [] - Actor system started at akka.tcp://flink@node3:40657
> 2020-08-10 16:37:33,539 WARN  org.apache.flink.configuration.Configuration [] - Config uses deprecated configuration key 'web.port' instead of proper key 'rest.port'
> 2020-08-10 16:37:33,551 INFO  org.apache.flink.runtime.blob.BlobServer [] - Created BLOB server storage directory /tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1591335931326_0024/blobStore-15a573e2-a671-4eb9-975b-b5229cec6bde
> 2020-08-10 16:37:33,555 INFO  org.apache.flink.runtime.blob.BlobServer [] - Started BLOB server at 0.0.0.0:34380 - max concurrent requests: 50 - max backlog: 1000
> 2020-08-10 16:37:33,570 INFO  org.apache.flink.runtime.metrics.MetricRegistryImpl [] - No metrics reporter configured, no metrics will be exposed/reported.
> 2020-08-10 16:37:33,574 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils [] - Trying to start actor system, external address node3:0, bind address 0.0.0.0:0.
> 2020-08-10 16:37:33,591 INFO  akka.event.slf4j.Slf4jLogger [] - Slf4jLogger started
> 2020-08-10 16:37:33,597 INFO  akka.remote.Remoting [] - Starting remoting
> 2020-08-10 16:37:33,606 INFO  akka.remote.Remoting [] - Remoting started; listening on addresses :[akka.tcp://flink-metrics@node3:43096]
> 2020-08-10 16:37:33,642 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils [] - Actor system started at akka.tcp://flink-metrics@node3:43096
> 2020-08-10 16:37:33,659 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.metrics.dump.MetricQueryService at akka://flink-metrics/user/rpc/MetricQueryService .
> 2020-08-10 16:37:33,721 WARN  org.apache.flink.configuration.Configuration [] - Config uses deprecated configuration key 'web.port' instead of proper key 'rest.bind-port'
> 2020-08-10 16:37:33,723 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] -
> Upload directory
> /tmp/flink-web-d25a365e-b6cb-45e6-bb6e-3068b7412f06/flink-web-upload does
> not exist.&nbsp;2020-08-10 16:37:33,724 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] -
> Created directory
> /tmp/flink-web-d25a365e-b6cb-45e6-bb6e-3068b7412f06/flink-web-upload for
> file uploads.2020-08-10 16:37:33,748 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] -
> Starting rest endpoint.2020-08-10 16:37:34,110 INFO
> &nbsp;org.apache.flink.runtime.webmonitor.WebMonitorUtils &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp;[] - Determined location of main cluster component log
> file:
> /home/xxx/app/hadoop-2.6.0-cdh5.15.1/logs/userlogs/application_1591335931326_0024/container_1591335931326_0024_01_000001/jobmanager.log2020-08-10
> 16:37:34,111 INFO &nbsp;org.apache.flink.runtime.webmonitor.WebMonitorUtils
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Determined location of main cluster
> component stdout file:
> /home/xxx/app/hadoop-2.6.0-cdh5.15.1/logs/userlogs/application_1591335931326_0024/container_1591335931326_0024_01_000001/jobmanager.out2020-08-10
> 16:37:34,309 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] -
> Rest endpoint listening at node3:394692020-08-10 16:37:34,311 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] -
> http://node3:39469 was granted leadership with
> leaderSessionID=00000000-0000-0000-0000-0000000000002020-08-10 16:37:34,312
> INFO &nbsp;org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint []
> - Web frontend listening at <a href="http://node3:39469.2020-08-10">http://node3:39469.2020-08-10 16:37:34,403
> INFO &nbsp;org.apache.flink.runtime.rpc.akka.AkkaRpcService &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; [] - Starting RPC endpoint for
> org.apache.flink.yarn.YarnResourceManager at
> akka://flink/user/rpc/resourcemanager_0 .2020-08-10 16:37:34,417 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> taskmanager.memory.process.size, 4 gb2020-08-10 16:37:34,417 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> internal.jobgraph-path, job.graph2020-08-10 16:37:34,417 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> jobmanager.execution.failover-strategy, region2020-08-10 16:37:34,417 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> high-availability.cluster-id, application_1591335931326_00242020-08-10
> 16:37:34,417 INFO &nbsp;org.apache.flink.configuration.GlobalConfiguration
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> jobmanager.rpc.address, localhost2020-08-10 16:37:34,417 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property: execution.target,
> yarn-per-job2020-08-10 16:37:34,417 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> jobmanager.memory.process.size, 4 gb2020-08-10 16:37:34,418 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> jobmanager.rpc.port, 61232020-08-10 16:37:34,418 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> execution.savepoint.ignore-unclaimed-state, false2020-08-10 16:37:34,418
> INFO &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> execution.attached, true2020-08-10 16:37:34,418 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> internal.cluster.execution-mode, NORMAL2020-08-10 16:37:34,418 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> execution.shutdown-on-attached-exit, false2020-08-10 16:37:34,418 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property: pipeline.jars,
> file:/home/xxx/app/flink-1.11.1/UnifyCompFlink-1.0.jar2020-08-10
> 16:37:34,418 INFO &nbsp;org.apache.flink.configuration.GlobalConfiguration
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> parallelism.default, 82020-08-10 16:37:34,418 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> taskmanager.numberOfTaskSlots, 12020-08-10 16:37:34,418 WARN
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Error while trying to split key and value in
> configuration file
> /tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1591335931326_0024/container_1591335931326_0024_01_000001/flink-conf.yaml:16:
> "pipeline.classpaths: "2020-08-10 16:37:34,419 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> $internal.deployment.config-dir, /home/xxx/app/flink-1.11.1/conf2020-08-10
> 16:37:34,419 INFO &nbsp;org.apache.flink.configuration.GlobalConfiguration
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> $internal.yarn.log-config-file,
> /home/xxx/app/flink-1.11.1/conf/log4j.properties2020-08-10 16:37:34,450
> INFO &nbsp;org.apache.flink.runtime.externalresource.ExternalResourceUtils
> [] - Enabled external resources: []2020-08-10 16:37:34,519 INFO
> &nbsp;org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess
> [] - Start JobDispatcherLeaderProcess.2020-08-10 16:37:34,527 INFO
> &nbsp;org.apache.flink.runtime.rpc.akka.AkkaRpcService &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Starting RPC endpoint for
> org.apache.flink.runtime.dispatcher.MiniDispatcher at
> akka://flink/user/rpc/dispatcher_1 .2020-08-10 16:37:34,572 INFO
> &nbsp;org.apache.flink.runtime.rpc.akka.AkkaRpcService &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Starting RPC endpoint for
> org.apache.flink.runtime.jobmaster.JobMaster at
> akka://flink/user/rpc/jobmanager_2 .2020-08-10 16:37:34,582 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Initializing job empJOB
> (eb447d27efb8134da40c0c1dd19fffdf).2020-08-10 16:37:34,615 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Using restart back off time
> strategy
> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647,
> backoffTimeMS=1000) for empJOB
> (eb447d27efb8134da40c0c1dd19fffdf).2020-08-10 16:37:34,667 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Running initialization on master
> for job empJOB (eb447d27efb8134da40c0c1dd19fffdf).2020-08-10 16:37:34,806
> INFO &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Successfully ran
> initialization on master in 139 ms.2020-08-10 16:37:34,876 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Recovered 0 containers from
> previous attempts ([]).2020-08-10 16:37:34,877 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Register application master
> response contains scheduler resource types: [MEMORY, CPU].2020-08-10
> 16:37:34,877 INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] -
> Container matching strategy: MATCH_VCORE.2020-08-10 16:37:34,887 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - ResourceManager
> akka.tcp://flink@node3:40657/user/rpc/resourcemanager_0 was granted
> leadership with fencing token 000000000000000000000000000000002020-08-10
> 16:37:34,891 INFO
> &nbsp;org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl
> [] - Starting the SlotManager.2020-08-10 16:37:35,466 INFO
> &nbsp;org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology
> [] - Built 1 pipelined regions in 2 ms2020-08-10 16:37:35,483 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - No state backend has been
> configured, using default (Memory / JobManager) MemoryStateBackend (data in
> heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints:
> 'null', asynchronous: TRUE, maxStateSize: 5242880)2020-08-10 16:37:35,503
> INFO &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Using failover strategy
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@3915bc20
> for empJOB (eb447d27efb8134da40c0c1dd19fffdf).2020-08-10 16:37:35,509 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl &nbsp; &nbsp;
> &nbsp;[] - JobManager runner for job empJOB
> (eb447d27efb8134da40c0c1dd19fffdf) was granted leadership with session id
> 00000000-0000-0000-0000-000000000000 at akka.tcp://flink@node3:40657/user/rpc/jobmanager_2.2020-08-10
> 16:37:35,514 INFO &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Starting execution of
> job empJOB (eb447d27efb8134da40c0c1dd19fffdf) under job master id
> 00000000000000000000000000000000.2020-08-10 16:37:35,517 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Starting scheduling with scheduling
> strategy
> [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]2020-08-10
> 16:37:35,518 INFO
> &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp;
> &nbsp; [] - Job empJOB (eb447d27efb8134da40c0c1dd19fffdf) switched from
> state CREATED to RUNNING.2020-08-10 16:37:35,535 INFO
> &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp;
> &nbsp; [] - Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal,
> comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
> PartitionNums: null -&gt; SinkConversionToRow (1/6)
> (5a6410258857c02ebd1b5ec03a78be4b) switched from CREATED to
> SCHEDULED.2020-08-10 16:37:35,536 INFO
> &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp;
> &nbsp; [] - Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal,
> comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
> PartitionNums: null -&gt; SinkConversionToRow (2/6)
> (299de0d4a8affe02a999edeb84957c41) switched from CREATED to
> SCHEDULED.2020-08-10 16:37:35,536 INFO
> &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp;
> &nbsp; [] - Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal,
> comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
> PartitionNums: null -&gt; SinkConversionToRow (3/6)
> (1b98df27c9019f64835b55fa3de3f363) switched from CREATED to
> SCHEDULED.2020-08-10 16:37:35,536 INFO
> &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp;
> &nbsp; [] - Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal,
> comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
> PartitionNums: null -&gt; SinkConversionToRow (4/6)
> (a7612608772c018d819741ce4d9320bd) switched from CREATED to
> SCHEDULED.2020-08-10 16:37:35,536 INFO
> &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp;
> &nbsp; [] - Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal,
> comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
> PartitionNums: null -&gt; SinkConversionToRow (5/6)
> (b19828c85fc0e92e62f2a7241b610f5b) switched from CREATED to
> SCHEDULED.2020-08-10 16:37:35,536 INFO
> &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp;
> &nbsp; [] - Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal,
> comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
> PartitionNums: null -&gt; SinkConversionToRow (6/6)
> (c2178a51eda2db900d3212e4f488d00f) switched from CREATED to
> SCHEDULED.2020-08-10 16:37:35,536 INFO
> &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp;
> &nbsp; [] - Process -&gt; Sink: Unnamed (1/8)
> (2a8db3a2b4cd65fd7cd3e6bac031a971) switched from CREATED to
> SCHEDULED.2020-08-10 16:37:35,536 INFO
> &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp;
> &nbsp; [] - Process -&gt; Sink: Unnamed (2/8)
> (7aa8dd779d4ff75e4c985be75a52c427) switched from CREATED to
> SCHEDULED.2020-08-10 16:37:35,536 INFO
> &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp;
> &nbsp; [] - Process -&gt; Sink: Unnamed (3/8)
> (867c814978ea302537065f51516ed766) switched from CREATED to
> SCHEDULED.2020-08-10 16:37:35,536 INFO
> &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp;
> &nbsp; [] - Process -&gt; Sink: Unnamed (4/8)
> (4e186575ab42cc6c1d599ae027bf99b8) switched from CREATED to
> SCHEDULED.2020-08-10 16:37:35,537 INFO
> &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp;
> &nbsp; [] - Process -&gt; Sink: Unnamed (5/8)
> (b107b8bfb0a08c5e7937400c43a0f9ff) switched from CREATED to
> SCHEDULED.2020-08-10 16:37:35,537 INFO
> &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp;
> &nbsp; [] - Process -&gt; Sink: Unnamed (6/8)
> (28e1f0fa1b9ebed59e4c67b0598864b9) switched from CREATED to
> SCHEDULED.2020-08-10 16:37:35,537 INFO
> &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp;
> &nbsp; [] - Process -&gt; Sink: Unnamed (7/8)
> (e27e60ff7dcd5245dfd21b23bbd49985) switched from CREATED to
> SCHEDULED.2020-08-10 16:37:35,537 INFO
> &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp;
> &nbsp; [] - Process -&gt; Sink: Unnamed (8/8)
> (c0f6b9e623c68fd7e9205a8ad686d4e5) switched from CREATED to
> SCHEDULED.2020-08-10 16:37:35,558 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp;
> &nbsp; [] - Cannot serve slot request, no ResourceManager connected. Adding
> as pending request
> [SlotRequestId{7c8417d17f555e546cd6427733db6a3b}]2020-08-10 16:37:35,565
> INFO &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp;
> &nbsp; [] - Cannot serve slot request, no ResourceManager connected. Adding
> as pending request
> [SlotRequestId{eb7c6ff479c977fdb6312f8e61dfdfba}]2020-08-10 16:37:35,565
> INFO &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp;
> &nbsp; [] - Cannot serve slot request, no ResourceManager connected. Adding
> as pending request
> [SlotRequestId{a1d23d7e17b9369e80833de148f54bac}]2020-08-10 16:37:35,566
> INFO &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp;
> &nbsp; [] - Cannot serve slot request, no ResourceManager connected. Adding
> as pending request
> [SlotRequestId{2447496efd24d542bce06de1b69ec70d}]2020-08-10 16:37:35,566
> INFO &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp;
> &nbsp; [] - Cannot serve slot request, no ResourceManager connected. Adding
> as pending request
> [SlotRequestId{2ab761d21cd4368751f3187f122705fa}]2020-08-10 16:37:35,566
> INFO &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp;
> &nbsp; [] - Cannot serve slot request, no ResourceManager connected. Adding
> as pending request
> [SlotRequestId{08a52f113cce64d1309509da5d25a1bb}]2020-08-10 16:37:35,574
> INFO &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Connecting to
> ResourceManager akka.tcp://flink@node3:40657/user/rpc/resourcemanager_*(00000000000000000000000000000000)2020-08-10
> 16:37:35,579 INFO &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Resolved
> ResourceManager address, beginning registration2020-08-10 16:37:35,584 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Registering job manager
> [hidden email]://flink@node3:40657/user/rpc/jobmanager_2
> for job eb447d27efb8134da40c0c1dd19fffdf.2020-08-10 16:37:35,589 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Registered job manager
> [hidden email]://flink@node3:40657/user/rpc/jobmanager_2
> for job eb447d27efb8134da40c0c1dd19fffdf.2020-08-10 16:37:35,593 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - JobManager successfully registered
> at ResourceManager, leader id: 00000000000000000000000000000000.2020-08-10
> 16:37:35,594 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp;
> &nbsp; [] - Requesting new slot
> [SlotRequestId{7c8417d17f555e546cd6427733db6a3b}] and profile
> ResourceProfile{UNKNOWN} from resource manager.2020-08-10 16:37:35,595 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Request slot with profile
> ResourceProfile{UNKNOWN} for job eb447d27efb8134da40c0c1dd19fffdf with
> allocation id e490d3208119fe28d97f4f0fe94cab28.2020-08-10 16:37:35,595 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp;
> &nbsp; [] - Requesting new slot
> [SlotRequestId{eb7c6ff479c977fdb6312f8e61dfdfba}] and profile
> ResourceProfile{UNKNOWN} from resource manager.2020-08-10 16:37:35,595 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp;
> &nbsp; [] - Requesting new slot
> [SlotRequestId{a1d23d7e17b9369e80833de148f54bac}] and profile
> ResourceProfile{UNKNOWN} from resource manager.2020-08-10 16:37:35,595 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp;
> &nbsp; [] - Requesting new slot
> [SlotRequestId{2447496efd24d542bce06de1b69ec70d}] and profile
> ResourceProfile{UNKNOWN} from resource manager.2020-08-10 16:37:35,596 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp;
> &nbsp; [] - Requesting new slot
> [SlotRequestId{2ab761d21cd4368751f3187f122705fa}] and profile
> ResourceProfile{UNKNOWN} from resource manager.2020-08-10 16:37:35,596 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp;
> &nbsp; [] - Requesting new slot
> [SlotRequestId{08a52f113cce64d1309509da5d25a1bb}] and profile
> ResourceProfile{UNKNOWN} from resource manager.2020-08-10 16:37:35,612 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Requesting new TaskExecutor
> container with resource WorkerResourceSpec {cpuCores=1.0,
> taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes,
> networkMemSize=343.040mb (359703515 bytes), managedMemSize=1.340gb
> (1438814063 bytes)}. Number pending workers of this resource is
> 1.2020-08-10 16:37:35,614 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Request slot with profile
> ResourceProfile{UNKNOWN} for job eb447d27efb8134da40c0c1dd19fffdf with
> allocation id b74299e8619de93adec5869d1fa79d73.2020-08-10 16:37:35,615 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Requesting new TaskExecutor
> container with resource WorkerResourceSpec {cpuCores=1.0,
> taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes,
> networkMemSize=343.040mb (359703515 bytes), managedMemSize=1.340gb
> (1438814063 bytes)}. Number pending workers of this resource is
> 2.2020-08-10 16:37:35,615 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Request slot with profile
> ResourceProfile{UNKNOWN} for job eb447d27efb8134da40c0c1dd19fffdf with
> allocation id 046a3dcf1af40e0539f15fcddfbddf77.2020-08-10 16:37:35,615 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Requesting new TaskExecutor
> container with resource WorkerResourceSpec {cpuCores=1.0,
> taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes,
> networkMemSize=343.040mb (359703515 bytes), managedMemSize=1.340gb
> (1438814063 bytes)}. Number pending workers of this resource is
> 3.2020-08-10 16:37:35,615 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Request slot with profile
> ResourceProfile{UNKNOWN} for job eb447d27efb8134da40c0c1dd19fffdf with
> allocation id 90870250ae0f3bef44cbdd675dede57b.2020-08-10 16:37:35,616 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Requesting new TaskExecutor
> container with resource WorkerResourceSpec {cpuCores=1.0,
> taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes,
> networkMemSize=343.040mb (359703515 bytes), managedMemSize=1.340gb
> (1438814063 bytes)}. Number pending workers of this resource is
> 4.2020-08-10 16:37:35,616 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Request slot with profile
> ResourceProfile{UNKNOWN} for job eb447d27efb8134da40c0c1dd19fffdf with
> allocation id f8063a6fc86162712215a92533532b65.2020-08-10 16:37:35,616 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Requesting new TaskExecutor
> container with resource WorkerResourceSpec {cpuCores=1.0,
> taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes,
> networkMemSize=343.040mb (359703515 bytes), managedMemSize=1.340gb
> (1438814063 bytes)}. Number pending workers of this resource is
> 5.2020-08-10 16:37:35,616 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Request slot with profile
> ResourceProfile{UNKNOWN} for job eb447d27efb8134da40c0c1dd19fffdf with
> allocation id cfe671ec5448d440838f02145cb6267f.2020-08-10 16:37:35,617 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Requesting new TaskExecutor
> container with resource WorkerResourceSpec {cpuCores=1.0,
> taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes,
> networkMemSize=343.040mb (359703515 bytes), managedMemSize=1.340gb
> (1438814063 bytes)}. Number pending workers of this resource is
> 6.2020-08-10 16:37:38,391 INFO
> &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp;
> &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename,
> job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp,
> PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6)
> of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but
> SCHEDULED instead. Aborting checkpoint.2020-08-10 16:37:40,933 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Received 1
> containers.2020-08-10 16:37:40,940 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Received 1 containers with
> resource <memory:4096, vCores:1&gt;, 6 pending container
> requests.2020-08-10 16:37:40,953 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - TaskExecutor
> container_1591335931326_0024_01_000003 will be started on node1 with
> TaskExecutorProcessSpec {cpuCores=1.0, frameworkHeapSize=128.000mb
> (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes),
> taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes,
> networkMemSize=343.040mb (359703515 bytes), managedMemorySize=1.340gb
> (1438814063 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes),
> jvmOverheadSize=409.600mb (429496736 bytes)}.2020-08-10 16:37:40,976 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Creating container launch
> context for TaskManagers2020-08-10 16:37:40,978 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Starting
> TaskManagers2020-08-10 16:37:40,995 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Removing container request
> Capability[<memory:4096, vCores:1&gt;]Priority[1].2020-08-10 16:37:40,995
> INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Accepted 1 requested
> containers, returned 0 excess containers, 5 pending container requests of
> resource <memory:4096, vCores:1&gt;.2020-08-10 16:37:46,389 INFO
> &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp;
> &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename,
> job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp,
> PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6)
> of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but
> SCHEDULED instead. Aborting checkpoint.2020-08-10 16:37:47,712 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Registering TaskManager with
> ResourceID container_1591335931326_0024_01_000003 (akka.tcp://flink@node1:40857/user/rpc/taskmanager_0)
> at ResourceManager2020-08-10 16:37:54,389 INFO
> &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp;
> &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename,
> job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp,
> PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6)
> of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but
> SCHEDULED instead. Aborting checkpoint.2020-08-10 16:38:02,389 INFO
> &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp;
> &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename,
> job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp,
> PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6)
> of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but
> SCHEDULED instead. Aborting checkpoint.2020-08-10 16:38:10,389 INFO
> &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp;
> &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename,
> job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp,
> PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6)
> of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but
> SCHEDULED instead. Aborting checkpoint.2020-08-10 16:38:18,389 INFO
> &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp;
>  [] - Checkpoint triggering task Source: HiveTableSource(empno, ename,
> job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp,
> PartitionPruned: false, PartitionNums: null -> SinkConversionToRow (1/6)
> of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but
> SCHEDULED instead. Aborting checkpoint.
> [the identical INFO entry from
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator repeats every
> 8 seconds, from 2020-08-10 16:38:26,389 through 2020-08-10 16:39:54,389]
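
A note on the log above: the coordinator tries to trigger a checkpoint every 8 s (matching the `enableCheckpointing(8000, ...)` in the job code below), and every attempt finds the source task in SCHEDULED rather than RUNNING. That pattern is consistent with the job failing and being rescheduled in a tight restart loop, with each checkpoint attempt landing between restarts. A configuration sketch (an assumption on my part, using the Flink 1.11 API; the attempt counts and delays are arbitrary) that bounds the restarts so the underlying failure surfaces in the JobManager log instead of looping silently:

```java
// Hedged sketch, not the poster's code: bound restarts so the root-cause
// exception surfaces instead of an endless RUNNING -> SCHEDULED loop.
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
streamEnv.enableCheckpointing(8000, CheckpointingMode.EXACTLY_ONCE);

// Fail the job after 3 restart attempts, 10 s apart, instead of retrying forever.
streamEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));

// Tolerate a few checkpoint failures before failing the job outright.
streamEnv.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
```

With a bounded strategy the job transitions to FAILED once the attempts are exhausted, and the JobManager log should then contain the exception that caused each restart.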
>
>
>
> Job code (Job.txt):
>
> public class PlatJobExecution {
>
>     private volatile ParameterTool parameters;
>
>     public PlatJobExecution(ParameterTool parameters) {
>         this.parameters = parameters;
>     }
>
>     public void execute() throws Exception {
>         // 1. Read data (Kafka / Oracle) and register the stream as a
>         //    table [this step can be done manually]  -- Hive
>         // 2. Execute the SQL and collect the result
>         // 3. Write the result to the target table / Redis / Kafka
>         InputStream is = ReadKafkaPrint.class.getClassLoader()
>                 .getResourceAsStream("config.properties");
>         ParameterTool parameters2 = ParameterTool.fromPropertiesFile(is);
>
>         String targetDatabase = parameters.get("sourceDatabase");
>         String executiveSql   = parameters.get("executiveSql");
>         String sinkSQL        = parameters.get("sinkSQL");
>         String jobName        = parameters.get("jobName");
>
>         Map<String, String> pMap = Maps.newHashMap();
>         pMap.putAll(parameters2.toMap());
>         pMap.put("sinkSQL", sinkSQL);
>         parameters2 = ParameterTool.fromMap(pMap);
>
>         // 1. Create the execution environment
>         StreamExecutionEnvironment streamEnv =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         // Global job parameters
>         streamEnv.getConfig().setGlobalJobParameters(parameters2);
>         // Checkpoint every 8 s
>         streamEnv.enableCheckpointing(8000, CheckpointingMode.EXACTLY_ONCE);
>
>         EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
>                 .useBlinkPlanner().inStreamingMode().build();
>
>         // 2. Streaming TableEnvironment
>         StreamTableEnvironment tableEnv =
>                 StreamTableEnvironment.create(streamEnv, tableEnvSettings);
>         tableEnv.getConfig().getConfiguration().set(
>                 ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
>                 CheckpointingMode.EXACTLY_ONCE);
>         tableEnv.getConfig().getConfiguration().set(
>                 ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
>                 Duration.ofSeconds(8));
>
>         // 3. Register the HiveCatalog
>         String name            = targetDatabase;
>         String defaultDatabase = targetDatabase;
>         String hiveConfDir     = "/home/xxx/app/flink-1.11.1/jobcfg";
>         String version         = "1.1.0";
>         HiveCatalog catalog =
>                 new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
>         tableEnv.registerCatalog(name, catalog);
>         tableEnv.useCatalog(name);
>
>         // 4. Read Hive as a stream
>         tableEnv.getConfig().getConfiguration().setBoolean(
>                 TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
>
>         // Query
>         Table table = tableEnv.sqlQuery(executiveSql);
>
>         // Convert each Row into a LinkedList of its column values
>         SingleOutputStreamOperator<LinkedList<Object>> colList =
>                 tableEnv.toAppendStream(table, Row.class)
>                         .process(new ProcessFunction<Row, LinkedList<Object>>() {
>                             @Override
>                             public void processElement(Row row, Context context,
>                                     Collector<LinkedList<Object>> collector) {
>                                 LinkedList<Object> linkedList = Lists.newLinkedList();
>                                 for (int i = 0; i < row.getArity(); i++) {
>                                     linkedList.add(row.getField(i));
>                                 }
>                                 collector.collect(linkedList);
>                             }
>                         });
>
>         // Sink to Oracle
>         colList.addSink(new CommonOracleSink());
>
>         streamEnv.execute(jobName);
>     }
> }
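
A note on the sink used above: a `TwoPhaseCommitSinkFunction` only delivers exactly-once if `beginTransaction`/`preCommit`/`commit`/`abort` are all implemented around the JDBC connection, while the `CommonOracleSink` demo earlier in this thread only shows `invoke`. Below is a minimal sketch of the protocol itself, stripped of both Flink and JDBC so it can run standalone; every name in it (`TwoPhaseCommitSketch`, `PendingTransaction`, `InMemoryDb`) is a hypothetical stand-in, not Flink API:

```java
import java.util.ArrayList;
import java.util.List;

// Flink-free sketch of the two-phase-commit lifecycle that a
// TwoPhaseCommitSinkFunction implementation must provide.
public class TwoPhaseCommitSketch {

    // Stand-in for the external database.
    static class InMemoryDb {
        final List<String> committedRows = new ArrayList<>();
    }

    // Stand-in for the per-checkpoint transaction handle
    // (the TXN type parameter of TwoPhaseCommitSinkFunction).
    static class PendingTransaction {
        final List<String> bufferedRows = new ArrayList<>();
        boolean prepared = false;
    }

    final InMemoryDb db;
    PendingTransaction current;

    TwoPhaseCommitSketch(InMemoryDb db) {
        this.db = db;
        this.current = beginTransaction();
    }

    // Called when a new checkpoint interval starts.
    PendingTransaction beginTransaction() {
        return new PendingTransaction();
    }

    // Called per record: write into the open transaction, never commit here.
    void invoke(String row) {
        current.bufferedRows.add(row);
    }

    // Phase 1: on the checkpoint barrier, flush and mark prepared.
    // After this point the data must survive a failure, but must not yet
    // be visible to readers.
    PendingTransaction preCommit() {
        current.prepared = true;
        PendingTransaction prepared = current;
        current = beginTransaction();
        return prepared;
    }

    // Phase 2: when the checkpoint completes, make the data visible.
    void commit(PendingTransaction txn) {
        if (!txn.prepared) throw new IllegalStateException("commit before preCommit");
        db.committedRows.addAll(txn.bufferedRows);
    }

    // If the checkpoint is aborted, discard the buffered data.
    void abort(PendingTransaction txn) {
        txn.bufferedRows.clear();
    }

    public static void main(String[] args) {
        InMemoryDb db = new InMemoryDb();
        TwoPhaseCommitSketch sink = new TwoPhaseCommitSketch(db);
        sink.invoke("row-1");
        sink.invoke("row-2");
        PendingTransaction txn = sink.preCommit(); // checkpoint barrier
        System.out.println(db.committedRows.size()); // 0: prepared, not yet visible
        sink.commit(txn);                            // checkpoint complete
        System.out.println(db.committedRows.size()); // 2: now visible
    }
}
```

In the real sink, `PendingTransaction` would wrap a JDBC `Connection` with `autoCommit` disabled; `preCommit()` would execute the buffered statements without committing, `commit()` would call `connection.commit()`, and `abort()` would call `connection.rollback()`.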
>
>
> Sent from my iPhone
>
>
> ------------------ Original message ------------------
> From: shizk233 <[hidden email]>
> Sent: Monday, August 10, 2020, 18:04
> To: [hidden email] <[hidden email]>
> Subject: Re: Reading a Hive table as a stream and writing to Oracle with
> JDBC two-phase commit: no exception, but checkpoints fail
>
>
>
> Hi, the attachments didn't come through; I can't see them. Consider
> posting them as a gist and sharing the link.
>
> From the message itself, the checkpoint failure is caused by a job state
> change (the job probably restarted, which is why it went from RUNNING
> back to SCHEDULED).
> I'd suggest investigating why the job restarts.
>
> Bruce <[hidden email]> wrote on Mon, Aug 10, 2020, at 5:01 PM:
>
> > Hello, I'd like to report a problem!
> >
> > I read a Hive table, convert it to a stream, and write it to an Oracle
> > database with a JDBC two-phase-commit sink.
> > No exception is thrown, but checkpoints fail with:
> > job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED
> > instead. Aborting checkpoint.
> > Attachments:
> > 1. flink.log: the sanitized YARN JobManager log
> > 2. Job.txt: the job pseudocode
> > 3. The JDBC two-phase-commit pseudocode
> > ------------------------------
> > Sent from my iPhone
> >