Flink 1.11 Hive Streaming Write的问题

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink 1.11 Hive Streaming Write的问题

李佳宸
想请教一下大家:Hive streaming write 需要哪些配置?我的作业能够正常跑起来,但是没有数据写入 Hive,不知道是什么原因。
批量的 Hive 写入以及流环境下的读取都是正常的。

附代码,很简短:

public class KafkaToHiveStreaming {
    public static void main(String[] arg) throws Exception{
        StreamExecutionEnvironment bsEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv =
StreamTableEnvironment.create(bsEnv, bsSettings);
        String name            = "myhive";
        String defaultDatabase = "default";
        String hiveConfDir     =
"/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local
path
        String version         = "3.1.2";

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
hiveConfDir, version);
        bsTableEnv.registerCatalog("myhive", hive);
        bsTableEnv.useCatalog("myhive");
        bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        bsTableEnv.executeSql("CREATE TABLE topic_products (" +
                "  id BIGINT ," +
                "  order_id STRING," +
                "  amount DECIMAL(10, 2)," +
                "  create_time TIMESTAMP " +
                ") WITH (" +
                " 'connector' = 'kafka'," +
                " 'topic' = 'order.test'," +
                " 'properties.bootstrap.servers' = 'localhost:9092'," +
                " 'properties.group.id' = 'testGroup'," +
                " 'scan.startup.mode' = 'earliest-offset', " +
                " 'format' = 'json'  " +
                ")");
        bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming (" +
                "  id BIGINT ," +
                "  order_id STRING," +
                "  amount DECIMAL(10, 2)" +
                "  )");
        bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        bsTableEnv.executeSql("CREATE TABLE print_table WITH
('connector' = 'print')" +
                "LIKE INSERT INTO hive_sink_table_streaming (EXCLUDING ALL)");

        bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming SELECT " +
                "id, " +
                "order_id, " +
                "amount " +
                "FROM topic_products");

        Table table1 = bsTableEnv.from("hive_sink_table_streaming");
        table1.executeInsert("print_table");
    }
}
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.11 Hive Streaming Write的问题

李佳宸
好的,谢谢~~~

JasonLee <[hidden email]> 于2020年7月16日周四 下午8:22写道:

> hi
> 需要开启checkpoint
>
>
> | |
> JasonLee
> |
> |
> 邮箱:[hidden email]
> |
>
> Signature is customized by Netease Mail Master
>
> 在2020年07月16日 18:03,李佳宸 写道:
> 想请教下大家 hive streaming write需要有哪些配置,不知道为什么我的作业能够跑起来,但是没有数据写入hive。
> 批量的hive写入,流环境的读取是正常的。
>
> 附代码,很简短:
>
> public class KafkaToHiveStreaming {
>    public static void main(String[] arg) throws Exception{
>        StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
>        EnvironmentSettings bsSettings =
>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>        StreamTableEnvironment bsTableEnv =
> StreamTableEnvironment.create(bsEnv, bsSettings);
>        String name            = "myhive";
>        String defaultDatabase = "default";
>        String hiveConfDir     =
> "/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local
> path
>        String version         = "3.1.2";
>
>        HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> hiveConfDir, version);
>        bsTableEnv.registerCatalog("myhive", hive);
>        bsTableEnv.useCatalog("myhive");
>        bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>        bsTableEnv.executeSql("CREATE TABLE topic_products (" +
>                "  id BIGINT ," +
>                "  order_id STRING," +
>                "  amount DECIMAL(10, 2)," +
>                "  create_time TIMESTAMP " +
>                ") WITH (" +
>                " 'connector' = 'kafka'," +
>                " 'topic' = 'order.test'," +
>                " 'properties.bootstrap.servers' = 'localhost:9092'," +
>                " 'properties.group.id' = 'testGroup'," +
>                " 'scan.startup.mode' = 'earliest-offset', " +
>                " 'format' = 'json'  " +
>                ")");
>        bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>
>        bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming (" +
>                "  id BIGINT ," +
>                "  order_id STRING," +
>                "  amount DECIMAL(10, 2)" +
>                "  )");
>        bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>        bsTableEnv.executeSql("CREATE TABLE print_table WITH
> ('connector' = 'print')" +
>                "LIKE INSERT INTO hive_sink_table_streaming (EXCLUDING
> ALL)");
>
>        bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>        bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming SELECT
> " +
>                "id, " +
>                "order_id, " +
>                "amount " +
>                "FROM topic_products");
>
>        Table table1 = bsTableEnv.from("hive_sink_table_streaming");
>        table1.executeInsert("print_table");
>    }
> }
>
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.11 Hive Streaming Write的问题

Dream-底限
hi、
请问这个问题最后是怎么解决的?数据能滚动写入 Hive 了吗?我这边开启了 checkpoint 之后 Hive 里也还是没有数据。

李佳宸 <[hidden email]> 于2020年7月16日周四 下午10:39写道:

> 好的,谢谢~~~
>
> JasonLee <[hidden email]> 于2020年7月16日周四 下午8:22写道:
>
> > hi
> > 需要开启checkpoint
> >
> >
> > | |
> > JasonLee
> > |
> > |
> > 邮箱:[hidden email]
> > |
> >
> > Signature is customized by Netease Mail Master
> >
> > 在2020年07月16日 18:03,李佳宸 写道:
> > 想请教下大家 hive streaming write需要有哪些配置,不知道为什么我的作业能够跑起来,但是没有数据写入hive。
> > 批量的hive写入,流环境的读取是正常的。
> >
> > 附代码,很简短:
> >
> > public class KafkaToHiveStreaming {
> >    public static void main(String[] arg) throws Exception{
> >        StreamExecutionEnvironment bsEnv =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> >        EnvironmentSettings bsSettings =
> >
> >
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> >        StreamTableEnvironment bsTableEnv =
> > StreamTableEnvironment.create(bsEnv, bsSettings);
> >        String name            = "myhive";
> >        String defaultDatabase = "default";
> >        String hiveConfDir     =
> > "/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local
> > path
> >        String version         = "3.1.2";
> >
> >        HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> > hiveConfDir, version);
> >        bsTableEnv.registerCatalog("myhive", hive);
> >        bsTableEnv.useCatalog("myhive");
> >        bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> >        bsTableEnv.executeSql("CREATE TABLE topic_products (" +
> >                "  id BIGINT ," +
> >                "  order_id STRING," +
> >                "  amount DECIMAL(10, 2)," +
> >                "  create_time TIMESTAMP " +
> >                ") WITH (" +
> >                " 'connector' = 'kafka'," +
> >                " 'topic' = 'order.test'," +
> >                " 'properties.bootstrap.servers' = 'localhost:9092'," +
> >                " 'properties.group.id' = 'testGroup'," +
> >                " 'scan.startup.mode' = 'earliest-offset', " +
> >                " 'format' = 'json'  " +
> >                ")");
> >        bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> >
> >        bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming (" +
> >                "  id BIGINT ," +
> >                "  order_id STRING," +
> >                "  amount DECIMAL(10, 2)" +
> >                "  )");
> >        bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> >        bsTableEnv.executeSql("CREATE TABLE print_table WITH
> > ('connector' = 'print')" +
> >                "LIKE INSERT INTO hive_sink_table_streaming (EXCLUDING
> > ALL)");
> >
> >        bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> >        bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming
> SELECT
> > " +
> >                "id, " +
> >                "order_id, " +
> >                "amount " +
> >                "FROM topic_products");
> >
> >        Table table1 = bsTableEnv.from("hive_sink_table_streaming");
> >        table1.executeInsert("print_table");
> >    }
> > }
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.11 Hive Streaming Write的问题

Jingsong Li
Hi Dream,

可以详述下你的测试场景吗?

Best,
Jingsong

On Mon, Jul 20, 2020 at 5:40 PM Dream-底限 <[hidden email]> wrote:

> hi、
> 请问这个问题最后怎么解决了,数据能滚动写入hive了嘛,我这面开启了checkpoint之后hive也是没数据
>
> 李佳宸 <[hidden email]> 于2020年7月16日周四 下午10:39写道:
>
> > 好的,谢谢~~~
> >
> > JasonLee <[hidden email]> 于2020年7月16日周四 下午8:22写道:
> >
> > > hi
> > > 需要开启checkpoint
> > >
> > >
> > > | |
> > > JasonLee
> > > |
> > > |
> > > 邮箱:[hidden email]
> > > |
> > >
> > > Signature is customized by Netease Mail Master
> > >
> > > 在2020年07月16日 18:03,李佳宸 写道:
> > > 想请教下大家 hive streaming write需要有哪些配置,不知道为什么我的作业能够跑起来,但是没有数据写入hive。
> > > 批量的hive写入,流环境的读取是正常的。
> > >
> > > 附代码,很简短:
> > >
> > > public class KafkaToHiveStreaming {
> > >    public static void main(String[] arg) throws Exception{
> > >        StreamExecutionEnvironment bsEnv =
> > > StreamExecutionEnvironment.getExecutionEnvironment();
> > >        EnvironmentSettings bsSettings =
> > >
> > >
> >
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> > >        StreamTableEnvironment bsTableEnv =
> > > StreamTableEnvironment.create(bsEnv, bsSettings);
> > >        String name            = "myhive";
> > >        String defaultDatabase = "default";
> > >        String hiveConfDir     =
> > > "/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local
> > > path
> > >        String version         = "3.1.2";
> > >
> > >        HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> > > hiveConfDir, version);
> > >        bsTableEnv.registerCatalog("myhive", hive);
> > >        bsTableEnv.useCatalog("myhive");
> > >        bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> > >        bsTableEnv.executeSql("CREATE TABLE topic_products (" +
> > >                "  id BIGINT ," +
> > >                "  order_id STRING," +
> > >                "  amount DECIMAL(10, 2)," +
> > >                "  create_time TIMESTAMP " +
> > >                ") WITH (" +
> > >                " 'connector' = 'kafka'," +
> > >                " 'topic' = 'order.test'," +
> > >                " 'properties.bootstrap.servers' = 'localhost:9092'," +
> > >                " 'properties.group.id' = 'testGroup'," +
> > >                " 'scan.startup.mode' = 'earliest-offset', " +
> > >                " 'format' = 'json'  " +
> > >                ")");
> > >        bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> > >
> > >        bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming
> (" +
> > >                "  id BIGINT ," +
> > >                "  order_id STRING," +
> > >                "  amount DECIMAL(10, 2)" +
> > >                "  )");
> > >        bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> > >        bsTableEnv.executeSql("CREATE TABLE print_table WITH
> > > ('connector' = 'print')" +
> > >                "LIKE INSERT INTO hive_sink_table_streaming (EXCLUDING
> > > ALL)");
> > >
> > >        bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> > >        bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming
> > SELECT
> > > " +
> > >                "id, " +
> > >                "order_id, " +
> > >                "amount " +
> > >                "FROM topic_products");
> > >
> > >        Table table1 = bsTableEnv.from("hive_sink_table_streaming");
> > >        table1.executeInsert("print_table");
> > >    }
> > > }
> > >
> >
>


--
Best, Jingsong Lee