Question about a problem running HiveCatalog on Flink 1.11.1


Question about a problem running HiveCatalog on Flink 1.11.1

zhongbaoluo

Hello!

     I'd like to ask about a problem with Flink 1.11.1: I run a job that uses HiveCatalog in application mode, with the job jar stored on HDFS. Metadata queries (such as show tables) all succeed, but querying and inserting data fails, and I cannot find any exception. The YARN application simply ends in a failed state and no data is written. Running locally, or submitting with the jar on the local filesystem, works fine. I'd like to ask about this problem.
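     For context, with application mode and the jar on HDFS the job would presumably be submitted with something along these lines; the paths and job arguments below are placeholders, not taken from this report:

         ./bin/flink run-application -t yarn-application \
             -Dyarn.provided.lib.dirs="hdfs:///flink/libs" \
             hdfs:///path/to/the-job.jar <job arguments>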

  

   Thank you


**********************************************************************
Thanks & Best Regards!

杉欣集团 Technology Research Institute – Cloud Platform
钟保罗 (zhongbaoluo)

23F, Tower B, Zhenhua Plaza, 3261 Dongfang Road, Pudong New District, Shanghai (杉欣集团)
Mobile: 18157855633
 



Re: Question about a problem running HiveCatalog on Flink 1.11.1

china_tao
Please paste your code.

On 2020/9/8 14:09, zhongbaoluo wrote:
> ...inserting data fails, and no exception can be found. yarn

Re: Question about a problem running HiveCatalog on Flink 1.11.1

zhongbaoluo
In reply to this post by zhongbaoluo

Options options = null;
try {
    OptionParser optionParser = new OptionParser(args);
    options = optionParser.getOptions();
} catch (Exception e) {
    e.printStackTrace();
    return;
}

String name            = options.getName();
String defaultDatabase = options.getDatabase();    // "dc_yunpingtai"
String hiveConfDir     = options.getHiveConfDir(); // "/Users/zhongbaoluo/Applications/app/apache-hive-3.1.2/conf" (a local path)
String version         = "3.1.2";
String sql             = options.getSql();
HiveUtils.hiveConfDir(hiveConfDir);

HiveConf hiveConf = new HiveConf();
hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://dcmaster01:9083");
hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, "/user/hive/warehouse"); // hdfs://datacloud-hadoop-cluster
hiveConf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM, "dcmaster02:2181");
hiveConf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT, "2181");
hiveConf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, "10000");
hiveConf.set("hive.server2.zookeeper.namespace", "hiveserver2");
hiveConf.set("hive.server2.zookeeper.publish.configs", "true");
hiveConf.set("hive.server2.support.dynamic.service.discovery", "true");
hiveConf.set("hive.metastore.warehouse.dir", "/user/hive/warehouse");

try {
    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
    System.out.println("settings created");

    TableEnvironment tableEnv = TableEnvironment.create(settings);
    System.out.println("tableEnv created");

    MyHiveCatalog hive = new MyHiveCatalog(name, defaultDatabase, hiveConf, version);
    tableEnv.registerCatalog(name, hive);
    System.out.println("hive catalog created");

    // set the HiveCatalog as the current catalog of the session
    tableEnv.useCatalog(name);
    tableEnv.useDatabase(defaultDatabase);
    tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

    tableEnv.executeSql("show tables").print();
    System.out.println("sql:" + sql);
    //tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

    tableEnv.executeSql("DROP TABLE print_table");
    tableEnv.executeSql("CREATE TABLE print_table(f0 BIGINT) WITH ('connector' = 'print')");
//  tableEnv.executeSql("CREATE TABLE print_table_mysql (\n" +
//      "f0 BIGINT\n" +
//      ") WITH ('connector' = 'jdbc',\n" +
//      "'url' = 'jdbc:mysql://192.168.50.120:3306/datacloud?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&useTimezone=true',\n" +
//      "'table-name' = 't_dc_test',\n" +
//      "'username' = 'dcuser',\n" +
//      "'password' = 'datacloud37')");

    // here sql = INSERT INTO print_table select count(1) from t_mpos_integral_sign_water
    tableEnv.executeSql(sql);
    //Table result = tableEnv.sqlQuery(sql); // "select count(1) from t_mpos_integral_sign_water"
    System.out.println("tableResult created");
    //result.execute().print();
} catch (Exception e) {
    e.printStackTrace();
}



The code of the MyHiveCatalog class:


public class MyHiveCatalog extends HiveCatalog {

    private static final Logger LOG = LoggerFactory.getLogger(MyHiveCatalog.class);

    public MyHiveCatalog(String catalogName, @Nullable String defaultDatabase, @Nullable HiveConf hiveConf, String hiveVersion) {
        this(catalogName, defaultDatabase == null ? DEFAULT_DB : defaultDatabase, createHiveConf(hiveConf), hiveVersion, false);
    }

    protected MyHiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf, String hiveVersion,
            boolean allowEmbedded) {
        super(catalogName, defaultDatabase, hiveConf, hiveVersion, allowEmbedded);
    }

    private static HiveConf createHiveConf(@Nullable HiveConf hiveConf) {
        //LOG.info("Setting hive conf dir as {}", hiveConfDir);

        // try {
        //     HiveConf.setHiveSiteLocation(
        //             hiveConfDir == null ?
        //                     null : Paths.get(hiveConfDir, "hive-site.xml").toUri().toURL());
        // } catch (MalformedURLException e) {
        //     throw new CatalogException(
        //             String.format("Failed to get hive-site.xml from %s", hiveConfDir), e);
        // }

        // create HiveConf from hadoop configuration
        Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(new org.apache.flink.configuration.Configuration());

        // Add mapred-site.xml. We need to read configurations like compression codec.
        for (String possibleHadoopConfPath : HadoopUtils.possibleHadoopConfPaths(new org.apache.flink.configuration.Configuration())) {
            File mapredSite = new File(new File(possibleHadoopConfPath), "mapred-site.xml");
            if (mapredSite.exists()) {
                hadoopConf.addResource(new Path(mapredSite.getAbsolutePath()));
                break;
            }
        }

        HiveConf conf = new HiveConf(hadoopConf, HiveConf.class);
        conf.addResource(hiveConf);
        return conf;
    }
}


**********************************************************************
Thanks & Best Regards!

杉欣集团 Technology Research Institute – Cloud Platform
钟保罗 (zhongbaoluo)

23F, Tower B, Zhenhua Plaza, 3261 Dongfang Road, Pudong New District, Shanghai (杉欣集团)
Mobile: 18157855633
 


Original Message
From: taochanglian<[hidden email]>
To: user-zh<[hidden email]>; zhongbaoluo<[hidden email]>
Sent: Tuesday, September 8, 2020, 16:51
Subject: Re: Question about a problem running HiveCatalog on Flink 1.11.1

Please paste your code.

On 2020/9/8 14:09, zhongbaoluo wrote:
...inserting data fails, and no exception can be found. yarn


Re: Question about a problem running HiveCatalog on Flink 1.11.1

Rui Li
Hi,

When executing an INSERT, your code needs to wait for the job to finish. You can follow the approach used in this util class:
https://github.com/apache/flink/blob/release-1.11.1/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/TableEnvUtil.scala#L26
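
In other words, a minimal Java sketch mirroring what the linked TableEnvUtil does (Flink 1.11 TableResult/JobClient API; variable names follow the code posted above), applied to the INSERT call:

    // Submit the INSERT and block until the job finishes, so the client
    // does not return before the YARN job has actually run.
    TableResult tableResult = tableEnv.executeSql(sql);
    tableResult.getJobClient()
            .get() // present for statements that submit a Flink job
            .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
            .get(); // blocks until the job completes, or throws if it failed

Without such a wait, an application-mode client can exit right after submission, which could explain the symptom of the YARN application failing with no data written and no visible exception.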

--
Best regards!
Rui Li