Hi All !
flink1.12,本地idea运行以下代码结果正确,但是将代码打包成jar发布到yarn上运行结果就不正确了,数据少了本来应该有325条结果只有一百多条,希望各位给点排查建议 以下是代码:

/**
 * Flink SQL job that reads two MySQL CDC tables ({@code tm_dealers} and
 * {@code new_clue_list}), aggregates leads per dealer, and writes the results
 * into two HBase tables through a single {@code StatementSet}.
 *
 * <p>NOTE(review): database host names, user names and passwords are hard-coded
 * in the DDL strings below. They should come from job parameters or a secrets
 * store, and the credentials shown here should be rotated.
 *
 * <p>NOTE(review): {@code ROWKEY} and {@code SUM_TIME} are derived from
 * {@code UNIX_TIMESTAMP()}, i.e. from the wall clock at evaluation time, so two
 * changelog messages for the same dealer can carry different values. Whether
 * this contributes to rows missing from the sink is not established by this
 * file — verify against the sink's upsert/delete handling.
 */
public class FlinkTestDemo {

    /**
     * Row-key column shared by both INSERT statements: a prefix of the MD5 of the
     * dealer code, the dealer code itself, and the current date.
     *
     * <p>The date pattern is {@code yyyyMMdd}. The previous pattern
     * {@code YYYYMMDD} uses week-year ({@code Y}) and day-of-year ({@code D}) in
     * Java date patterns, which only looks like a calendar date during January.
     */
    private static final String ROWKEY_COLUMN =
            "CONCAT_WS('',SUBSTRING(MD5(t1.dealer_code) FROM 0 FOR 6 ),t1.dealer_code,"
                    + "FROM_UNIXTIME(UNIX_TIMESTAMP(),'yyyyMMdd')) AS ROWKEY,";

    /** Dealer dimension table (mysql-cdc source). */
    private static final String CREATE_TM_DEALERS =
            "CREATE TABLE tm_dealers (dealer_code STRING,is_valid DECIMAL(10,0),proctime AS PROCTIME(),PRIMARY KEY (dealer_code) NOT ENFORCED\n" +
            ") WITH (\n" +
            " 'connector' = 'mysql-cdc',\n" +
            " 'hostname' = '10.0.15.83',\n" +
            " 'port' = '3306',\n" +
            " 'username' = 'root',\n" +
            " 'password' = 'Cdh2020:1',\n" +
            " 'database-name' = 'flink-test',\n" +
            " 'table-name' = 'tm_dealers'\n" +
            ")";

    /**
     * Customer-acquisition management metrics — start.
     * Online-sales leads table (mysql-cdc source).
     */
    private static final String CREATE_NEW_CLUE_LIST_CDC =
            "CREATE TABLE new_clue_list_cdc (\n" +
            "entity_code STRING,data_source STRING,clue_level STRING,customer_no STRING,fail_apply_time timestamp,clue_fail_type INT,\n" +
            "PRIMARY KEY (customer_no) NOT ENFORCED\n" +
            ") WITH (\n" +
            " 'connector' = 'mysql-cdc',\n" +
            " 'hostname' = '10.0.17.224',\n" +
            " 'port' = '3306',\n" +
            " 'username' = 'liuhai',\n" +
            " 'password' = 'liuhai_2020',\n" +
            " 'database-name' = 'wk_mall_service',\n" +
            " 'table-name' = 'new_clue_list'\n" +
            ")";

    /**
     * Summary-layer HBase table (lightly aggregated middle layer, online-sales
     * domain): invalid online-sales leads summarized by source channel.
     */
    private static final String CREATE_INVALID_LEADS_HBASE =
            "CREATE TABLE DWM_NETSALES_WXXSALYQDWDWXXSHZB_SS_I_HBASE (" +
            "ROWKEY VARCHAR," +
            "F1 ROW(ENTITY_CODE VARCHAR," +
            "ZWXXS VARCHAR," +
            "QCZJWXZBXS VARCHAR," +
            "YCWXZBXS VARCHAR," +
            "TPYWXZBXS VARCHAR," +
            "AKWXZBXS VARCHAR," +
            "DCDWXZBXS VARCHAR," +
            "SUM_TIME VARCHAR)," +
            "PRIMARY KEY (ROWKEY) NOT ENFORCED" +
            ") WITH (" +
            " 'connector' = 'hbase-2.2'," +
            " 'table-name' = 'DWM:DWM_NETSALES_WXXSALYQDWDWXXSHZB_SS_I_HBASE'," +
            " 'zookeeper.quorum' = '10.0.15.83:2181'," +
            " 'zookeeper.znode.parent' = '/hbase'" +
            // Sink buffering options, currently disabled:
            // " 'sink.buffer-flush.max-size' = '3mb'," +
            // " 'sink.buffer-flush.max-rows' = '1000'," +
            // " 'sink.buffer-flush.interval' = '3s'" +
            ")";

    /**
     * Per valid dealer, counts of invalid leads (fail_apply_time set and
     * clue_fail_type = 20) in total and per pair of data_source codes; dealers
     * without matching leads get '0' in every counter.
     */
    private static final String INSERT_INVALID_LEADS =
            "INSERT INTO DWM_NETSALES_WXXSALYQDWDWXXSHZB_SS_I_HBASE "+
            "SELECT " +
            "ROWKEY,ROW(dealer_code,ZWXXS,QCZJWXZBXS,YCWXZBXS,TPYWXZBXS,AKWXZBXS,DCDWXZBXS,SUM_TIME) as F1 " +
            "FROM " +
            "(SELECT " + ROWKEY_COLUMN +
            "t1.dealer_code," +
            "IF(t2.ZWXXS IS NULL, '0',t2.ZWXXS) AS ZWXXS,"+
            "IF(t2.QCZJWXZBXS IS NULL, '0',t2.QCZJWXZBXS) AS QCZJWXZBXS," +
            "IF(t2.YCWXZBXS IS NULL, '0',t2.YCWXZBXS) AS YCWXZBXS," +
            "IF(t2.TPYWXZBXS IS NULL, '0',t2.TPYWXZBXS) AS TPYWXZBXS," +
            "IF(t2.AKWXZBXS IS NULL, '0',t2.AKWXZBXS) AS AKWXZBXS," +
            "IF(t2.DCDWXZBXS IS NULL, '0',t2.DCDWXZBXS) AS DCDWXZBXS," +
            "FROM_UNIXTIME(UNIX_TIMESTAMP()) AS SUM_TIME " +
            "FROM " +
            "tm_dealers AS t1 " +
            "LEFT JOIN " +
            "(SELECT " +
            "C.entity_code, " +
            "CAST(COUNT(1) AS STRING) AS ZWXXS," +
            "CAST(COUNT(CASE WHEN C.data_source = '20' OR C.data_source = '100' THEN 1 END ) AS STRING) AS QCZJWXZBXS," +
            "CAST(COUNT( CASE WHEN C.data_source = '40' OR C.data_source = '30' THEN 1 END ) AS STRING) AS YCWXZBXS," +
            "CAST(COUNT( CASE WHEN C.data_source = '10' OR C.data_source = '80' THEN 1 END ) AS STRING) AS TPYWXZBXS," +
            "CAST(COUNT( CASE WHEN C.data_source = '60' OR C.data_source = '50' THEN 1 END ) AS STRING) AS AKWXZBXS," +
            "CAST(COUNT( CASE WHEN C.data_source = '130' OR C.data_source = '140' THEN 1 END ) AS STRING) AS DCDWXZBXS " +
            "FROM " +
            "new_clue_list_cdc AS C " +
            "WHERE " +
            "C.fail_apply_time IS NOT NULL AND C.clue_fail_type = 20 GROUP BY C.entity_code) AS t2 " +
            "ON t1.dealer_code = t2.entity_code WHERE t1.is_valid=12781001)";

    /** Summary-layer HBase table: online-sales leads summarized by source channel. */
    private static final String CREATE_LEADS_HBASE =
            "CREATE TABLE DWM_NETSALES_WXXSALYQDWDHZB_SS_I_HBASE (" +
            "ROWKEY VARCHAR," +
            "F1 ROW(ENTITY_CODE VARCHAR,ZXS VARCHAR, QCZJXS VARCHAR, QCZJSBXS VARCHAR, QCZJZXS VARCHAR, YCXS VARCHAR," +
            " YCSBXS VARCHAR, YCZXS VARCHAR, TPYXS VARCHAR, TPYSBXS VARCHAR, TPYZXS VARCHAR, AKXS VARCHAR, AKSBXS VARCHAR," +
            " AKZXS VARCHAR, DCDXS VARCHAR, DCDSBXS VARCHAR, DCDZXS VARCHAR, SUM_TIME VARCHAR)," +
            "PRIMARY KEY (ROWKEY) NOT ENFORCED " +
            ") WITH (" +
            " 'connector' = 'hbase-2.2'," +
            " 'table-name' = 'DWM:DWM_NETSALES_WXXSALYQDWDHZB_SS_I_HBASE'," +
            " 'zookeeper.quorum' = '10.0.15.83:2181'," +
            " 'zookeeper.znode.parent' = '/hbase'" +
            // Sink buffering options, currently disabled:
            // " 'sink.buffer-flush.max-size' = '3mb',"+
            // " 'sink.buffer-flush.max-rows' = '1000',"+
            // " 'sink.buffer-flush.interval' = '3s'"+
            ")";

    /**
     * Per valid dealer, lead counts per data_source code. The innermost query
     * groups by (entity_code, customer_no); the middle query sums those
     * per-customer counters per entity_code. Dealers without matching leads get
     * '0' in every counter.
     */
    private static final String INSERT_LEADS =
            "INSERT INTO DWM_NETSALES_WXXSALYQDWDHZB_SS_I_HBASE " +
            "SELECT " +
            "ROWKEY,ROW(dealer_code,ZXS,QCZJXS,QCZJSBXS,QCZJZXS,YCXS,YCSBXS,YCZXS,TPYXS,TPYSBXS," +
            "TPYZXS,AKXS,AKSBXS,AKZXS,DCDXS,DCDSBXS,DCDZXS,SUM_TIME) as F1 " +
            "FROM " +
            "(SELECT " + ROWKEY_COLUMN +
            "t1.dealer_code," +
            "IF(t2.ZXS IS NULL, '0',t2.ZXS) AS ZXS,"+
            "IF(t2.QCZJXS IS NULL, '0',t2.QCZJXS) AS QCZJXS,"+
            "IF(t2.QCZJSBXS IS NULL, '0',t2.QCZJSBXS) AS QCZJSBXS,"+
            "IF(t2.QCZJZXS IS NULL, '0',t2.QCZJZXS) AS QCZJZXS,"+
            "IF(t2.YCXS IS NULL, '0',t2.YCXS) AS YCXS,"+
            "IF(t2.YCSBXS IS NULL, '0',t2.YCSBXS) AS YCSBXS,"+
            "IF(t2.YCZXS IS NULL, '0',t2.YCZXS) AS YCZXS,"+
            "IF(t2.TPYXS IS NULL, '0',t2.TPYXS) AS TPYXS,"+
            "IF(t2.TPYSBXS IS NULL, '0',t2.TPYSBXS) AS TPYSBXS,"+
            "IF(t2.TPYZXS IS NULL, '0',t2.TPYZXS) AS TPYZXS,"+
            "IF(t2.AKXS IS NULL, '0',t2.AKXS) AS AKXS,"+
            "IF(t2.AKSBXS IS NULL, '0',t2.AKSBXS) AS AKSBXS,"+
            "IF(t2.AKZXS IS NULL, '0',t2.AKZXS) AS AKZXS,"+
            "IF(t2.DCDXS IS NULL, '0',t2.DCDXS) AS DCDXS,"+
            "IF(t2.DCDSBXS IS NULL, '0',t2.DCDSBXS) AS DCDSBXS,"+
            "IF(t2.DCDZXS IS NULL, '0',t2.DCDZXS) AS DCDZXS,"+
            "FROM_UNIXTIME(UNIX_TIMESTAMP()) AS SUM_TIME " +
            "FROM " +
            "tm_dealers AS t1 " +
            "LEFT JOIN " +
            "(SELECT " +
            "t.entity_code," +
            "CAST(count(*) AS STRING) AS ZXS," +
            "CAST(SUM(t.QCZJ_XS_LM) AS STRING) AS QCZJXS," +
            "CAST(SUM(t.QCZJ_SB_LM) AS STRING) AS QCZJSBXS," +
            "CAST(SUM(t.QCZJ_XS_LM)+SUM(t.QCZJ_SB_LM) AS STRING) AS QCZJZXS,"+
            "CAST(SUM(t.YC_XS_LM) AS STRING) AS YCXS," +
            "CAST(SUM(t.YC_SB_LM) AS STRING) AS YCSBXS," +
            "CAST(SUM(t.YC_XS_LM)+SUM(t.YC_SB_LM) AS STRING) AS YCZXS,"+
            "CAST(SUM(t.TPY_XS_LM) AS STRING) AS TPYXS," +
            "CAST(SUM(t.TPY_SB_LM) AS STRING) AS TPYSBXS," +
            "CAST(SUM(t.TPY_XS_LM)+SUM(t.TPY_SB_LM) AS STRING) AS TPYZXS,"+
            "CAST(SUM(t.AK_XS_LM) AS STRING) AS AKXS," +
            "CAST(SUM(t.AK_SB_LM) AS STRING) AS AKSBXS," +
            "CAST(SUM(t.AK_XS_LM)+SUM(t.AK_SB_LM) AS STRING) AS AKZXS,"+
            "CAST(SUM(t.DCD_XS_LM) AS STRING) AS DCDXS," +
            "CAST(SUM(t.DCD_SB_LM) AS STRING) AS DCDSBXS," +
            "CAST(SUM(t.DCD_XS_LM)+SUM(t.DCD_SB_LM) AS STRING) AS DCDZXS "+
            "FROM " +
            "(SELECT " +
            "C.entity_code," +
            "COUNT(*) AS NUM_2_LM," +
            "COUNT( CASE WHEN C.data_source = '20' THEN 1 END ) AS QCZJ_XS_LM," +
            "COUNT( CASE WHEN C.data_source = '100' THEN 1 END ) AS QCZJ_SB_LM," +
            "COUNT( CASE WHEN C.data_source = '40' THEN 1 END ) AS YC_XS_LM," +
            "COUNT( CASE WHEN C.data_source = '30' THEN 1 END ) AS YC_SB_LM," +
            "COUNT( CASE WHEN C.data_source = '10' THEN 1 END ) AS TPY_XS_LM," +
            "COUNT( CASE WHEN C.data_source = '80' THEN 1 END ) AS TPY_SB_LM," +
            "COUNT( CASE WHEN C.data_source = '60' THEN 1 END ) AS AK_XS_LM," +
            "COUNT( CASE WHEN C.data_source = '50' THEN 1 END ) AS AK_SB_LM," +
            "COUNT( CASE WHEN C.data_source = '130' THEN 1 END ) AS DCD_XS_LM," +
            "COUNT( CASE WHEN C.data_source = '140' THEN 1 END ) AS DCD_SB_LM " +
            "FROM " +
            "new_clue_list_cdc AS C " +
            "WHERE " +
            "((C.clue_level IN ('13101007','13101006')) OR C.clue_level IN ('13101001','13101002','13101003','13101004')) " +
            "GROUP BY " +
            "C.entity_code,C.customer_no " +
            ") AS t " +
            "GROUP BY t.entity_code ) AS t2 ON t1.dealer_code = t2.entity_code WHERE t1.is_valid=12781001)";

    /**
     * Builds the table environment, registers the source and sink tables, and
     * submits both INSERT statements as one job.
     *
     * @param args command-line arguments (not read)
     * @throws Exception if environment setup or job submission fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);

        // NOTE(review): only the externalized-checkpoint cleanup mode is set here;
        // env.enableCheckpointing(...) is never called in this class, so checkpoints
        // are taken only if an interval is configured elsewhere — confirm.
        CheckpointConfig config = env.getCheckpointConfig();
        config.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Low-level key-value options: enable mini-batch aggregation.
        Configuration configuration = bsTableEnv.getConfig().getConfiguration();
        configuration.setString("table.exec.mini-batch.enabled", "true");
        configuration.setString("table.exec.mini-batch.allow-latency", "6000");
        configuration.setString("table.exec.mini-batch.size", "5000");

        // Multiple INSERT statements must be executed through a StatementSet.
        StatementSet stmtSet = bsTableEnv.createStatementSet();

        bsTableEnv.executeSql(CREATE_TM_DEALERS);
        bsTableEnv.executeSql(CREATE_NEW_CLUE_LIST_CDC);

        bsTableEnv.executeSql(CREATE_INVALID_LEADS_HBASE);
        stmtSet.addInsertSql(INSERT_INVALID_LEADS);

        bsTableEnv.executeSql(CREATE_LEADS_HBASE);
        stmtSet.addInsertSql(INSERT_LEADS);

        stmtSet.execute();
    }
}

| | 刘海 | | [hidden email] | 签名由网易邮箱大师定制 |
Free forum by Nabble | Edit this page |