你好! 我想咨询一下,在 flink 1.11.1 版本里面用 application 模式、jar 包放在 HDFS 上运行 HiveCatalog。 查询元数据之类的都能执行成功,查询数据、插入数据执行失败,也没有找到异常。 yarn 状态就是失败,数据也没有执行成功。 本地执行或者 jar 包在本地的方式提交都能执行成功。 我想咨询一下这个问题
谢谢 ********************************************************************** Thanks & Best Regards! 杉欣集团-技术研究院 云平台 钟保罗 上海浦东新区东方路3261号振华广场B座23楼(杉欣集团) email: [hidden email] 手机: 18157855633 |
贴一下代码
在 2020/9/8 14:09, zhongbaoluo 写道: > 据插入数据执行失败,也没有找到异常。 yarn |
In reply to this post by zhongbaoluo
// Parse the command-line options; if parsing fails, print the error and stop.
Options options;
try {
    options = new OptionParser(args).getOptions();
} catch (Exception e) {
    e.printStackTrace();
    return;
}

// Catalog parameters taken from the parsed options.
String name = options.getName();
String defaultDatabase = options.getDatabase(); // e.g. "dc_yunpingtai"
// Example value (a local path): "/Users/zhongbaoluo/Applications/app/apache-hive-3.1.2/conf"
String hiveConfDir = options.getHiveConfDir();
String version = "3.1.2";
String sql = options.getSql();
HiveUtils.hiveConfDir(hiveConfDir);

// Hive client settings are supplied programmatically.
HiveConf hiveConf = new HiveConf();
hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://dcmaster01:9083");
// hdfs://datacloud-hadoop-cluster
hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, "/user/hive/warehouse");
hiveConf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM, "dcmaster02:2181");
hiveConf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT, "2181");
hiveConf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, "10000");
hiveConf.set("hive.server2.zookeeper.namespace", "hiveserver2");
hiveConf.set("hive.server2.zookeeper.publish.configs", "true");
hiveConf.set("hive.server2.support.dynamic.service.discovery", "true");
hiveConf.set("hive.metastore.warehouse.dir", "/user/hive/warehouse");
// Build a batch TableEnvironment, register the Hive catalog, then run the
// user-supplied statement and wait for the job it submits.
try {
    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
    System.out.println("settings 创建完成");
    TableEnvironment tableEnv = TableEnvironment.create(settings);
    System.out.println("tableEnv 创建完成");

    MyHiveCatalog hive = new MyHiveCatalog(name, defaultDatabase, hiveConf, version);
    tableEnv.registerCatalog(name, hive);
    System.out.println("hive 创建完成");

    // Set the HiveCatalog as the current catalog of the session.
    tableEnv.useCatalog(name);
    tableEnv.useDatabase(defaultDatabase);
    tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
    tableEnv.executeSql("show tables").print();
    System.out.println("sql:" + sql);
    // tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

    // IF EXISTS: a plain DROP TABLE throws when print_table has not been created yet,
    // which would abort everything below.
    tableEnv.executeSql("DROP TABLE IF EXISTS print_table");
    tableEnv.executeSql("CREATE TABLE print_table(f0 BIGINT) WITH ('connector' = 'print')");
    // tableEnv.executeSql("CREATE TABLE print_table_mysql (\n" +
    //     "f0 BIGINT\n" +
    //     ") WITH ('connector' = 'jdbc',\n" +
    //     "'url' = 'jdbc:mysql://192.168.50.120:3306/datacloud?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&useTimezone=true',\n" +
    //     "'table-name' = 't_dc_test',\n" +
    //     "'username' = 'dcuser',\n" +
    //     "'password' = 'datacloud37')");

    // Example: sql = INSERT INTO print_table select count(1) from t_mpos_integral_sign_water
    org.apache.flink.table.api.TableResult insertResult = tableEnv.executeSql(sql);
    // Table result = tableEnv.sqlQuery(sql); // "select count(1) from t_mpos_integral_sign_water"
    System.out.println("tableResult 创建完成");
    // result.execute().print();

    // executeSql() only submits the INSERT job. Block until the job has finished so the
    // program does not return while the job is still running.
    // Statements that do not start a job expose no JobClient, so there is nothing to wait for.
    java.util.Optional<org.apache.flink.core.execution.JobClient> jobClient =
            insertResult.getJobClient();
    if (jobClient.isPresent()) {
        jobClient.get()
                .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
                .get();
    }
} catch (Exception e) {
    // The closing brace of this catch block is on the next line of the file.
    e.printStackTrace();
} MyHiveCatalog 类的代码: public class MyHiveCatalog extends HiveCatalog{
private static final Logger LOG = LoggerFactory.getLogger(MyHiveCatalog.class);
/**
 * Creates the catalog from a caller-supplied Hive configuration.
 *
 * <p>Delegates to the protected constructor with {@code allowEmbedded} set to {@code false}.
 * The supplied {@code hiveConf} is first passed through {@link #createHiveConf(HiveConf)}.
 *
 * @param catalogName name under which the catalog is registered
 * @param defaultDatabase default database; {@code DEFAULT_DB} is used when {@code null}
 * @param hiveConf Hive settings handed to {@code createHiveConf}
 * @param hiveVersion Hive version string passed on to the superclass
 */
public MyHiveCatalog(
        String catalogName,
        @Nullable String defaultDatabase,
        @Nullable HiveConf hiveConf,
        String hiveVersion) {
    this(
            catalogName,
            defaultDatabase != null ? defaultDatabase : DEFAULT_DB,
            createHiveConf(hiveConf),
            hiveVersion,
            false);
}
/**
 * Forwards every argument unchanged to the {@code HiveCatalog} superclass constructor.
 *
 * @param catalogName name under which the catalog is registered
 * @param defaultDatabase default database of the catalog
 * @param hiveConf fully prepared Hive configuration
 * @param hiveVersion Hive version string
 * @param allowEmbedded flag passed through to the superclass
 */
protected MyHiveCatalog(
        String catalogName,
        String defaultDatabase,
        HiveConf hiveConf,
        String hiveVersion,
        boolean allowEmbedded) {
    super(catalogName, defaultDatabase, hiveConf, hiveVersion, allowEmbedded);
}
/**
 * Builds the {@link HiveConf} used by this catalog.
 *
 * <p>The result starts from the Hadoop configuration returned by
 * {@code HadoopUtils.getHadoopConfiguration}, adds the first {@code mapred-site.xml} found
 * in the candidate Hadoop conf directories, and finally adds the caller-supplied
 * {@code hiveConf} as a resource. No hive-site.xml location is configured here.
 *
 * @param hiveConf caller-supplied Hive settings; skipped when {@code null}
 * @return a new {@code HiveConf} combining the sources above
 */
private static HiveConf createHiveConf(@Nullable HiveConf hiveConf) {
    // Create the HiveConf from the Hadoop configuration.
    Configuration hadoopConf =
            HadoopUtils.getHadoopConfiguration(new org.apache.flink.configuration.Configuration());

    // Add mapred-site.xml. We need to read configurations like compression codec.
    // Only the first existing file is added.
    for (String possibleHadoopConfPath :
            HadoopUtils.possibleHadoopConfPaths(new org.apache.flink.configuration.Configuration())) {
        File mapredSite = new File(new File(possibleHadoopConfPath), "mapred-site.xml");
        if (mapredSite.exists()) {
            hadoopConf.addResource(new Path(mapredSite.getAbsolutePath()));
            break;
        }
    }

    HiveConf conf = new HiveConf(hadoopConf, HiveConf.class);
    // The parameter is declared @Nullable, and addResource(Configuration) dereferences its
    // argument, so a null hiveConf must be skipped instead of passed through.
    if (hiveConf != null) {
        conf.addResource(hiveConf);
    }
    return conf;
}
} ********************************************************************** Thanks & Best Regards! 杉欣集团-技术研究院 云平台 钟保罗 上海浦东新区东方路3261号振华广场B座23楼(杉欣集团) email: [hidden email] 手机: 18157855633 原始邮件 发件人: taochanglian<[hidden email]> 收件人: user-zh<[hidden email]>; zhongbaoluo<[hidden email]> 发送时间: 2020年9月8日(周二) 16:51 主题: Re: flink 1.11.1 版本执行HiveCatalog遇到问题质询 贴一下代码 在 2020/9/8 14:09, zhongbaoluo 写道:
据插入数据执行失败,也没有找到异常。 yarn |
Hi,
执行insert的时候需要在代码里等作业结束,可以参考这个util类的写法来做: https://github.com/apache/flink/blob/release-1.11.1/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/TableEnvUtil.scala#L26 On Wed, Sep 9, 2020 at 2:01 PM zhongbaoluo <[hidden email]> wrote: > Options options = *null*; > > *try* { > > OptionParser optionParser = *new* OptionParser(args); > > options = optionParser.getOptions(); > > } *catch* (Exception e) { > > e.printStackTrace(); > > *return*; > > } > > > > String name = options.getName(); > > String defaultDatabase = options.getDatabase();//"dc_yunpingtai"; > > String hiveConfDir = options.getHiveConfDir(); //"/Users/ > zhongbaoluo/Applications/app/apache-hive-3.1.2/conf"; // a local path > > String version = "3.1.2"; > > String sql = options.getSql(); > > HiveUtils.*hiveConfDir*(hiveConfDir); > > > > > > HiveConf hiveConf = *new* HiveConf(); > > hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, > "thrift://dcmaster01:9083"); > > hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, > "/user/hive/warehouse");//hdfs://datacloud-hadoop-cluster > > hiveConf.setVar(HiveConf.ConfVars.*HIVE_ZOOKEEPER_QUORUM*, > "dcmaster02:2181"); > > hiveConf.setVar(HiveConf.ConfVars.*HIVE_ZOOKEEPER_CLIENT_PORT*, "2181" > ); > > hiveConf.setVar(HiveConf.ConfVars.*HIVE_ZOOKEEPER_SESSION_TIMEOUT*, > "10000"); > > hiveConf.set("hive.server2.zookeeper.namespace", "hiveserver2"); > > hiveConf.set("hive.server2.zookeeper.publish.configs", "true"); > > hiveConf.set("hive.server2.support.dynamic.service.discovery", "true" > ); > > hiveConf.set("hive.metastore.warehouse.dir", "/user/hive/warehouse"); > > > > *try* { > > EnvironmentSettings settings = EnvironmentSettings.*newInstance* > ().inBatchMode().build(); > > System.*out*.println("settings 创建完成"); > > TableEnvironment tableEnv = TableEnvironment.*create*(settings); > > System.*out*.println("tableEnv 创建完成"); > > > > MyHiveCatalog hive = *new* MyHiveCatalog(name, defaultDatabase, > hiveConf,version); 
> > tableEnv.registerCatalog(name, hive); > > System.*out*.println("hive 创建完成"); > > > // set the HiveCatalog as the current catalog of the session > > tableEnv.useCatalog(name); > > tableEnv.useDatabase(defaultDatabase); > > tableEnv.getConfig().setSqlDialect(SqlDialect.*HIVE*); > > tableEnv.executeSql("show tables").print(); > > System.*out*.println("sql:"+sql); > > //tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); > > > > tableEnv.executeSql("DROP TABLE print_table"); > > tableEnv.executeSql("CREATE TABLE print_table(f0 BIGINT) WITH > ('connector' = 'print')"); > > // tableEnv.executeSql("CREATE TABLE print_table_mysql (\n" + > > // "f0 BIGINT\n" + > > // ") WITH ('connector' = 'jdbc',\n" + > > // "'url' = 'jdbc:mysql://192.168.50.120:3306/datacloud > ?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&useTimezone=true',\n" > + > > // "'table-name' = 't_dc_test',\n" + > > // "'username' = 'dcuser',\n" + > > // "'password' = 'datacloud37')"); > > tableEnv.executeSql(sql);// 这里的 sql = INSERT INTO print_table > select count(1) from t_mpos_integral_sign_water > > //Table result = tableEnv.sqlQuery(sql);//"select count(1) from > t_mpos_integral_sign_water" > > System.*out*.println("tableResult 创建完成"); > > //result.execute().print(); > > } *catch* (Exception e) { > > e.printStackTrace(); > > } > > > MyHiveCatalog 类的代码: > > > *public* *class* MyHiveCatalog *extends* HiveCatalog{ > > *private* *static* *final* Logger *LOG* = LoggerFactory.*getLogger* > (MyHiveCatalog.*class*); > > *public* MyHiveCatalog(String catalogName, @Nullable String > defaultDatabase, @Nullable HiveConf hiveConf, String hiveVersion) { > > *this*(catalogName,defaultDatabase == *null* ? 
*DEFAULT_DB* : > defaultDatabase,*createHiveConf*(hiveConf),hiveVersion,*false*); > > } > > *protected* MyHiveCatalog(String catalogName, String defaultDatabase, > HiveConf hiveConf, String hiveVersion, > > *boolean* allowEmbedded) { > > *super*(catalogName, defaultDatabase, hiveConf, hiveVersion, allowEmbedded > ); > > // *TODO* Auto-generated constructor stub > > } > > *private* *static* HiveConf createHiveConf(@Nullable HiveConf hiveConf) { > > //LOG.info("Setting hive conf dir as {}", hiveConfDir); > > > // try { > > // HiveConf.setHiveSiteLocation( > > // hiveConfDir == null ? > > // null : Paths.get(hiveConfDir, "hive-site.xml").toUri().toURL()); > > // } catch (MalformedURLException e) { > > // throw new CatalogException( > > // String.format("Failed to get hive-site.xml from %s", hiveConfDir), e); > > // } > > > // create HiveConf from hadoop configuration > > Configuration hadoopConf = HadoopUtils.*getHadoopConfiguration*(*new* > org.apache.flink.configuration.Configuration()); > > > // Add mapred-site.xml. We need to read configurations like compression > codec. > > *for* (String possibleHadoopConfPath : HadoopUtils. > *possibleHadoopConfPaths*(*new* > org.apache.flink.configuration.Configuration())) { > > File mapredSite = *new* File(*new* File(possibleHadoopConfPath), > "mapred-site.xml"); > > *if* (mapredSite.exists()) { > > hadoopConf.addResource(*new* Path(mapredSite.getAbsolutePath())); > > *break*; > > } > > } > > HiveConf conf = *new* HiveConf(hadoopConf, HiveConf.*class*); > > conf.addResource(hiveConf); > > *return* conf; > > } > > > } > > ********************************************************************** > Thanks & Best Regards! 
> > 杉欣集团-技术研究院 云平台 > 钟保罗 > > 上海浦东新区东方路3261号振华广场B座23楼(杉欣集团) > email: [hidden email] > 手机: 18157855633 > > > > 原始邮件 > *发件人:* taochanglian<[hidden email]> > *收件人:* user-zh<[hidden email]>; zhongbaoluo< > [hidden email]> > *发送时间:* 2020年9月8日(周二) 16:51 > *主题:* Re: flink 1.11.1 版本执行HiveCatalog遇到问题质询 > > 贴一下代码 > 在 2020/9/8 14:09, zhongbaoluo 写道: > > 据插入数据执行失败,也没有找到异常。 yarn > > > -- Best regards! Rui Li |
Free forum by Nabble | Edit this page |