flink 1.11 使用sql将流式数据写入hive

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

flink 1.11 使用sql将流式数据写入hive

pass Li
下面粘的就是主程序代码      
能在hive里建表,创建的TemporaryView也有数据,但是tEnv.executeSql(insertSql)这块好像没执行,往新建的hive表里插入数据没反应。求助!!!!

StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);DataStream<UserInfo> dataStream = bsEnv.addSource(new MySource());//构造hive catalog
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "D:\\demo\\flink-hive\\src\\main\\resources"; // a local path
String version = "1.1.0";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tEnv.registerCatalog("myhive", hive);
tEnv.useCatalog("myhive");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

tEnv.createTemporaryView("users", dataStream);

Table result3= tEnv.sqlQuery("SELECT userId, amount, DATE_FORMAT(ts, 'yyyy-MM-dd') ymd, DATE_FORMAT(ts, 'HH') h, DATE_FORMAT(ts, 'mm') m FROM users");


tEnv.toRetractStream(result3, TypeInformation.of(new TypeHint<Tuple5<String,Double,String,String,String>>(){})).print("res");//      如果hive中已经存在了相应的表,则这段代码省略
//    String hiveSql = "CREATE TABLE fs_table (\n" +
//                     "  user_id STRING,\n" +
//                     "  order_amount DOUBLE \n" +
//                     ") partitioned by (dt string,h string,m string) \n" +
//                     "stored as textfile \n" +
//                     "TBLPROPERTIES (\n" +
//                     "  'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',\n" +
//                     "  'sink.partition-commit.delay'='0s',\n" +
//                     "  'sink.partition-commit.trigger'='partition-time',\n" +
//                     "  'sink.partition-commit.policy.kind'='metastore'" +
//                     ")";
//    tEnv.executeSql(hiveSql);

        String insertSql = "insert into table fs_table partition (dt,h,m) SELECT userId, amount, DATE_FORMAT(ts, 'yyyy-MM-dd') dt, DATE_FORMAT(ts, 'HH') h, DATE_FORMAT(ts, 'mm') m FROM users";

        tEnv.executeSql(insertSql);

        bsEnv.execute("test");


[hidden email]
Reply | Threaded
Open this post in threaded view
|

回复:flink 1.11 使用sql将流式数据写入hive

liujian
流数据写入Hive要注意的点:
① 需要开启checkpoint
② 需要 wartermark
③ 涉及到数据提交,里面有个时区的问题..........,也没成功,还等着呢




------------------ 原始邮件 ------------------
发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]>;
发送时间: 2020年8月11日(星期二) 下午3:19
收件人: "user-zh"<[hidden email]>;

主题: flink 1.11 使用sql将流式数据写入hive



下面粘的就是主程序代码      
能在hive里建表,创建的TemporaryView也有数据,但是tEnv.executeSql(insertSql)这块好像没执行,往新建的hive表里插入数据没反应。求助!!!!

StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);DataStream<UserInfo> dataStream = bsEnv.addSource(new MySource());//构造hive catalog
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "D:\\demo\\flink-hive\\src\\main\\resources"; // a local path
String version = "1.1.0";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tEnv.registerCatalog("myhive", hive);
tEnv.useCatalog("myhive");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

tEnv.createTemporaryView("users", dataStream);

Table result3= tEnv.sqlQuery("SELECT userId, amount, DATE_FORMAT(ts, 'yyyy-MM-dd') ymd, DATE_FORMAT(ts, 'HH') h, DATE_FORMAT(ts, 'mm') m FROM users");


tEnv.toRetractStream(result3, TypeInformation.of(new TypeHint<Tuple5<String,Double,String,String,String>>(){})).print("res");//      如果hive中已经存在了相应的表,则这段代码省略
//    String hiveSql = "CREATE TABLE fs_table (\n" +
//                     "  user_id STRING,\n" +
//                     "  order_amount DOUBLE \n" +
//                     ") partitioned by (dt string,h string,m string) \n" +
//                     "stored as textfile \n" +
//                     "TBLPROPERTIES (\n" +
//                     "  'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',\n" +
//                     "  'sink.partition-commit.delay'='0s',\n" +
//                     "  'sink.partition-commit.trigger'='partition-time',\n" +
//                     "  'sink.partition-commit.policy.kind'='metastore'" +
//                     ")";
//    tEnv.executeSql(hiveSql);

        String insertSql = "insert into table fs_table partition (dt,h,m) SELECT userId, amount, DATE_FORMAT(ts, 'yyyy-MM-dd') dt, DATE_FORMAT(ts, 'HH') h, DATE_FORMAT(ts, 'mm') m FROM users";

        tEnv.executeSql(insertSql);

        bsEnv.execute("test");


[hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 使用sql将流式数据写入hive

godfrey he
In reply to this post by pass Li
 tEnv.executeSql(insertSql); 是异步提交完任务就返回了,
如果是IDE里运行的话,进程就直接退出导致job也就结束了。需要等到job结束,
目前可以通过下面这种方式
TableResult result = tEnv.executeSql(insertSql);
result.getJobClient().get().getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();

另外  tEnv.executeSql(insertSql); 已经提交作业了,不需要调用  bsEnv.execute("test");

[hidden email] <[hidden email]> 于2020年8月11日周二 下午3:20写道:

> 下面粘的就是主程序代码
>
> 能在hive里建表,创建的TemporaryView也有数据,但是tEnv.executeSql(insertSql)这块好像没执行,往新建的hive表里插入数据没反应。求助!!!!
>
> StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv,
> bsSettings);DataStream<UserInfo> dataStream = bsEnv.addSource(new
> MySource());//构造hive catalog
> String name = "myhive";
> String defaultDatabase = "default";
> String hiveConfDir = "D:\\demo\\flink-hive\\src\\main\\resources"; // a
> local path
> String version = "1.1.0";
>
> HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir,
> version);
> tEnv.registerCatalog("myhive", hive);
> tEnv.useCatalog("myhive");
> tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>
> tEnv.createTemporaryView("users", dataStream);
>
> Table result3= tEnv.sqlQuery("SELECT userId, amount, DATE_FORMAT(ts,
> 'yyyy-MM-dd') ymd, DATE_FORMAT(ts, 'HH') h, DATE_FORMAT(ts, 'mm') m FROM
> users");
>
>
> tEnv.toRetractStream(result3, TypeInformation.of(new
> TypeHint<Tuple5<String,Double,String,String,String>>(){})).print("res");//
>     如果hive中已经存在了相应的表,则这段代码省略
> //    String hiveSql = "CREATE TABLE fs_table (\n" +
> //                     "  user_id STRING,\n" +
> //                     "  order_amount DOUBLE \n" +
> //                     ") partitioned by (dt string,h string,m string) \n"
> +
> //                     "stored as textfile \n" +
> //                     "TBLPROPERTIES (\n" +
> //                     "
> 'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',\n" +
> //                     "  'sink.partition-commit.delay'='0s',\n" +
> //                     "
> 'sink.partition-commit.trigger'='partition-time',\n" +
> //                     "  'sink.partition-commit.policy.kind'='metastore'"
> +
> //                     ")";
> //    tEnv.executeSql(hiveSql);
>
>         String insertSql = "insert into table fs_table partition (dt,h,m)
> SELECT userId, amount, DATE_FORMAT(ts, 'yyyy-MM-dd') dt, DATE_FORMAT(ts,
> 'HH') h, DATE_FORMAT(ts, 'mm') m FROM users";
>
>         tEnv.executeSql(insertSql);
>
>         bsEnv.execute("test");
>
>
> [hidden email]
>
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 使用sql将流式数据写入hive

JasonLee
In reply to this post by pass Li
Hi

可以看下這個demo
https://mp.weixin.qq.com/s/HqXaREr_NZbZ8lgu_yi7yA



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Best Wishes
JasonLee
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 使用sql将流式数据写入hive

Jark
Administrator
In reply to this post by pass Li
你是在 IDE 中直接执行的吗? 还是打包成 jar 包在集群上运行的呢?
如果是在 IDE 中执行,确实会有这个现象。原因是 tEnv.executeSql() 在执行 insert into
语句的时候,提交成功后就返回了,不会等待执行结束。
所以如果要在 IDE 中等待执行结束,需要额外增加等待代码:

val tableResult = tEnv.executeSql("insert into ...")
// wait job finished
tableResult.getJobClient.get()
  .getJobExecutionResult(Thread.currentThread().getContextClassLoader)
  .get()


Best,
Jark

On Tue, 11 Aug 2020 at 15:20, [hidden email] <[hidden email]>
wrote:

> 下面粘的就是主程序代码
>
> 能在hive里建表,创建的TemporaryView也有数据,但是tEnv.executeSql(insertSql)这块好像没执行,往新建的hive表里插入数据没反应。求助!!!!
>
> StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv,
> bsSettings);DataStream<UserInfo> dataStream = bsEnv.addSource(new
> MySource());//构造hive catalog
> String name = "myhive";
> String defaultDatabase = "default";
> String hiveConfDir = "D:\\demo\\flink-hive\\src\\main\\resources"; // a
> local path
> String version = "1.1.0";
>
> HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir,
> version);
> tEnv.registerCatalog("myhive", hive);
> tEnv.useCatalog("myhive");
> tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>
> tEnv.createTemporaryView("users", dataStream);
>
> Table result3= tEnv.sqlQuery("SELECT userId, amount, DATE_FORMAT(ts,
> 'yyyy-MM-dd') ymd, DATE_FORMAT(ts, 'HH') h, DATE_FORMAT(ts, 'mm') m FROM
> users");
>
>
> tEnv.toRetractStream(result3, TypeInformation.of(new
> TypeHint<Tuple5<String,Double,String,String,String>>(){})).print("res");//
>     如果hive中已经存在了相应的表,则这段代码省略
> //    String hiveSql = "CREATE TABLE fs_table (\n" +
> //                     "  user_id STRING,\n" +
> //                     "  order_amount DOUBLE \n" +
> //                     ") partitioned by (dt string,h string,m string) \n"
> +
> //                     "stored as textfile \n" +
> //                     "TBLPROPERTIES (\n" +
> //                     "
> 'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',\n" +
> //                     "  'sink.partition-commit.delay'='0s',\n" +
> //                     "
> 'sink.partition-commit.trigger'='partition-time',\n" +
> //                     "  'sink.partition-commit.policy.kind'='metastore'"
> +
> //                     ")";
> //    tEnv.executeSql(hiveSql);
>
>         String insertSql = "insert into table fs_table partition (dt,h,m)
> SELECT userId, amount, DATE_FORMAT(ts, 'yyyy-MM-dd') dt, DATE_FORMAT(ts,
> 'HH') h, DATE_FORMAT(ts, 'mm') m FROM users";
>
>         tEnv.executeSql(insertSql);
>
>         bsEnv.execute("test");
>
>
> [hidden email]
>