I'd like to ask: what configuration is required for Hive streaming writes? I don't understand why my job runs fine, yet no data is written to Hive.
Batch writes to Hive work, and reading in the streaming environment is also fine. The code is attached; it is quite short:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class KafkaToHiveStreaming {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        // Register a Hive catalog backed by a local Hive 3.1.2 installation.
        String name = "myhive";
        String defaultDatabase = "default";
        String hiveConfDir = "/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local path
        String version = "3.1.2";
        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
        bsTableEnv.registerCatalog("myhive", hive);
        bsTableEnv.useCatalog("myhive");

        // Kafka source table (default dialect).
        bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        bsTableEnv.executeSql("CREATE TABLE topic_products (" +
                "  id BIGINT," +
                "  order_id STRING," +
                "  amount DECIMAL(10, 2)," +
                "  create_time TIMESTAMP" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'order.test'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'properties.group.id' = 'testGroup'," +
                "  'scan.startup.mode' = 'earliest-offset'," +
                "  'format' = 'json'" +
                ")");

        // Hive sink table (Hive dialect).
        bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming (" +
                "  id BIGINT," +
                "  order_id STRING," +
                "  amount DECIMAL(10, 2)" +
                ")");

        // Print sink with the same schema as the Hive table (default dialect).
        bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        bsTableEnv.executeSql("CREATE TABLE print_table WITH ('connector' = 'print') " +
                "LIKE hive_sink_table_streaming (EXCLUDING ALL)");

        // Stream from Kafka into the Hive table.
        bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming " +
                "SELECT id, order_id, amount FROM topic_products");

        // Also read the Hive table back and print it, for debugging.
        Table table1 = bsTableEnv.from("hive_sink_table_streaming");
        table1.executeInsert("print_table");
    }
}
OK, thanks~~~
JasonLee <[hidden email]> wrote on Thu, Jul 16, 2020 at 8:22 PM:
> hi
> You need to enable checkpointing.
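For reference, a minimal sketch of what enabling checkpointing could look like in the job above; the 60-second interval and exactly-once mode are assumed example values, not something specified in this thread:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// Without checkpointing, the streaming Hive sink never commits its
// in-progress files, so the job runs but no data becomes visible in Hive.
bsEnv.enableCheckpointing(60_000); // assumed interval: checkpoint every 60s
bsEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);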
Hi,
How was this problem resolved in the end? Is data rolling into Hive now? On my side, even after enabling checkpointing, there is still no data in Hive.

李佳宸 <[hidden email]> wrote on Thu, Jul 16, 2020 at 10:39 PM:
> OK, thanks~~~
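One thing worth checking beyond checkpointing, if the sink table is partitioned, is the partition-commit configuration. A hedged sketch, with an assumed dt partition column and example option values that are not taken from this thread:

// Hive dialect: a partitioned sink whose partitions are committed to the
// metastore (plus a _SUCCESS file) once a checkpoint completes.
// The dt column and the option values are assumed examples.
bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
bsTableEnv.executeSql("CREATE TABLE hive_sink_table_partitioned (" +
        "  id BIGINT," +
        "  order_id STRING," +
        "  amount DECIMAL(10, 2)" +
        ") PARTITIONED BY (dt STRING) TBLPROPERTIES (" +
        "  'sink.partition-commit.trigger' = 'process-time'," +
        "  'sink.partition-commit.policy.kind' = 'metastore,success-file'" +
        ")");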
Hi Dream,
Could you describe your test scenario in more detail?

Best,
Jingsong

On Mon, Jul 20, 2020 at 5:40 PM Dream-底限 <[hidden email]> wrote:
> Hi,
> How was this problem resolved in the end? Even after enabling checkpointing, there is still no data in Hive.