Below is the main program code.

It can create the table in Hive, and the TemporaryView also has data, but the tEnv.executeSql(insertSql) part does not seem to run: nothing is inserted into the newly created Hive table. Help!!!!

    StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
    DataStream<UserInfo> dataStream = bsEnv.addSource(new MySource());

    // construct the Hive catalog
    String name = "myhive";
    String defaultDatabase = "default";
    String hiveConfDir = "D:\\demo\\flink-hive\\src\\main\\resources"; // a local path
    String version = "1.1.0";

    HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
    tEnv.registerCatalog("myhive", hive);
    tEnv.useCatalog("myhive");
    tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

    tEnv.createTemporaryView("users", dataStream);

    Table result3 = tEnv.sqlQuery("SELECT userId, amount, DATE_FORMAT(ts, 'yyyy-MM-dd') ymd, DATE_FORMAT(ts, 'HH') h, DATE_FORMAT(ts, 'mm') m FROM users");

    tEnv.toRetractStream(result3, TypeInformation.of(new TypeHint<Tuple5<String, Double, String, String, String>>() {})).print("res");

    // if the table already exists in Hive, this part can be skipped
    // String hiveSql = "CREATE TABLE fs_table (\n" +
    //         "  user_id STRING,\n" +
    //         "  order_amount DOUBLE\n" +
    //         ") partitioned by (dt string, h string, m string)\n" +
    //         "stored as textfile\n" +
    //         "TBLPROPERTIES (\n" +
    //         "  'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',\n" +
    //         "  'sink.partition-commit.delay'='0s',\n" +
    //         "  'sink.partition-commit.trigger'='partition-time',\n" +
    //         "  'sink.partition-commit.policy.kind'='metastore'" +
    //         ")";
    // tEnv.executeSql(hiveSql);

    String insertSql = "insert into table fs_table partition (dt,h,m) SELECT userId, amount, DATE_FORMAT(ts, 'yyyy-MM-dd') dt, DATE_FORMAT(ts, 'HH') h, DATE_FORMAT(ts, 'mm') m FROM users";
    tEnv.executeSql(insertSql);

    bsEnv.execute("test");

[hidden email]
Points to note when writing streaming data into Hive:

① Checkpointing must be enabled.
② A watermark is required.
③ Partition commit involves a time-zone issue..........

It didn't work for me either; I'm still waiting for an answer.

------------------ Original Message ------------------
From: "user-zh" <[hidden email]>
Sent: Tuesday, 11 Aug 2020, 15:19
To: "user-zh" <[hidden email]>
Subject: flink 1.11: writing streaming data to Hive with SQL
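The time-zone issue in point ③ comes from how the partition-time commit trigger turns partition values back into a timestamp: with 'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00', the dt/h/m partition values are substituted into the pattern and parsed, and the partition commits once the watermark passes that time plus sink.partition-commit.delay. If DATE_FORMAT produced the values in local time (e.g. UTC+8) but the extracted time is compared as UTC, partitions appear to commit hours late or never. A JDK-only sketch of that extraction for illustration (the parsing logic here is an analogy, not Flink's actual implementation):

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class PartitionTimeExtractorDemo {
    // Mimics 'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00':
    // substitute the partition values into the pattern, then parse it.
    static LocalDateTime extract(String dt, String h, String m) {
        String ts = "$dt $h:$m:00".replace("$dt", dt).replace("$h", h).replace("$m", m);
        return LocalDateTime.parse(ts, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    }

    public static void main(String[] args) {
        // Partition values as produced by DATE_FORMAT(ts, ...) in the query above
        LocalDateTime partitionTime = extract("2020-08-11", "15", "20");
        System.out.println(partitionTime); // 2020-08-11T15:20

        // The commit check compares this time against the watermark (epoch millis).
        // The same wall-clock time is 8 hours apart in epoch terms depending on
        // whether it is interpreted as UTC or as UTC+8 -- the source of point ③:
        long asUtc = partitionTime.toEpochSecond(ZoneOffset.UTC) * 1000;
        long asCst = partitionTime.toEpochSecond(ZoneOffset.ofHours(8)) * 1000;
        System.out.println((asUtc - asCst) / 3600000 + " hours"); // 8 hours
    }
}
```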
In reply to this post by pass Li
tEnv.executeSql(insertSql) is asynchronous: it returns as soon as the job has been submitted.

If you run it inside an IDE, the process exits immediately, which also terminates the job. You need to wait for the job to finish; currently you can do it like this:

    TableResult result = tEnv.executeSql(insertSql);
    result.getJobClient().get().getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();

Also, tEnv.executeSql(insertSql) already submits the job, so there is no need to call bsEnv.execute("test").

[hidden email]
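The fire-and-forget behavior described above is easy to reproduce with plain JDK concurrency: the submit call returns a handle immediately, and if main() returns without blocking on it, the JVM can exit and kill the work mid-flight. This is a JDK-only analogy for illustration (standing in for executeSql returning a TableResult whose JobClient you must block on), not Flink code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class AsyncSubmitDemo {
    // Analogue of tEnv.executeSql(...): submits work and returns immediately.
    static CompletableFuture<String> submitJob() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(200); // the "streaming job" doing its work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "FINISHED";
        });
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> job = submitJob();
        // supplyAsync runs on daemon threads, so without the get() below main()
        // would return here and the JVM could exit before the job produces
        // anything -- the symptom in the original question. Blocking on the
        // handle is the analogue of
        // result.getJobClient().get().getJobExecutionResult(...).get();
        System.out.println(job.get()); // prints FINISHED
    }
}
```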
In reply to this post by pass Li
Hi
You can take a look at this demo: https://mp.weixin.qq.com/s/HqXaREr_NZbZ8lgu_yi7yA

Best Wishes
JasonLee
In reply to this post by pass Li
Are you running this directly in the IDE, or packaging it as a jar and running it on a cluster?

If you run it in the IDE, this behavior does occur. The reason is that when tEnv.executeSql() executes an INSERT INTO statement, it returns as soon as the job is submitted successfully and does not wait for it to finish. So to wait for the job to finish in the IDE, you need extra waiting code:

    val tableResult = tEnv.executeSql("insert into ...")
    // wait job finished
    tableResult.getJobClient.get()
      .getJobExecutionResult(Thread.currentThread().getContextClassLoader)
      .get()

Best,
Jark