flink1.11 run


flink1.11 run

Dream-底限
Hi, I've written a Kafka-to-Hive program here, but it fails to run. What could be the cause?

Exception:
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No operators defined in streaming topology. Cannot generate StreamGraph.
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
	at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph.
	at org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
	at org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
	at com.akulaku.data.flink.StreamingWriteToHive.main(StreamingWriteToHive.java:80)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
	... 11 more
Code:

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(environment, settings);

        environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        environment.setStateBackend(new MemoryStateBackend());
        environment.getCheckpointConfig().setCheckpointInterval(5000);

        String name = "myhive";
        String defaultDatabase = "tmp";
        String hiveConfDir = "/etc/alternatives/hive-conf/";
        String version = "1.1.0";

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
        tableEnv.registerCatalog("myhive", hive);
        tableEnv.useCatalog("myhive");

        tableEnv.executeSql("CREATE TABLE tmp.user_behavior (\n" +
                "  user_id BIGINT,\n" +
                "  item_id STRING,\n" +
                "  behavior STRING,\n" +
                "  ts AS PROCTIME()\n" +
                ") WITH (\n" +
                " 'connector' = 'kafka-0.11',\n" +
                " 'topic' = 'user_behavior',\n" +
                " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
                " 'properties.group.id' = 'testGroup',\n" +
                " 'scan.startup.mode' = 'earliest-offset',\n" +
                " 'format' = 'json',\n" +
                " 'json.fail-on-missing-field' = 'false',\n" +
                " 'json.ignore-parse-errors' = 'true'\n" +
                ")");

//        tableEnv.executeSql("CREATE TABLE print_table (\n" +
//                " user_id BIGINT,\n" +
//                " item_id STRING,\n" +
//                " behavior STRING,\n" +
//                " tsdata STRING\n" +
//                ") WITH (\n" +
//                " 'connector' = 'print'\n" +
//                ")");

        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql("CREATE TABLE tmp.streamhivetest (\n" +
                " user_id BIGINT,\n" +
                " item_id STRING,\n" +
                " behavior STRING,\n" +
                " tsdata STRING\n" +
                ") STORED AS parquet TBLPROPERTIES (\n" +
                " 'sink.rolling-policy.file-size' = '12MB',\n" +
                " 'sink.rolling-policy.rollover-interval' = '1 min',\n" +
                " 'sink.rolling-policy.check-interval' = '1 min',\n" +
                " 'execution.checkpointing.interval' = 'true'\n" +
                ")");

        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnv.executeSql("insert into streamhivetest select " +
                "user_id, item_id, behavior, DATE_FORMAT(ts, 'yyyy-MM-dd') as tsdata " +
                "from user_behavior");

        tableEnv.execute("stream-write-hive");

Re: flink1.11 run

Rui Li
tableEnv.executeSql() already submits the job, so there is no need to call execute() afterwards.
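
A minimal sketch of what that leaves (table and column names reused from the program above; illustrative only):

        // executeSql() on an INSERT both builds and submits the streaming job.
        TableResult result = tableEnv.executeSql(
                "INSERT INTO streamhivetest " +
                "SELECT user_id, item_id, behavior, DATE_FORMAT(ts, 'yyyy-MM-dd') AS tsdata " +
                "FROM user_behavior");

        // No tableEnv.execute(...) afterwards: the job is already running, and
        // execute() would look for operators in the (empty) DataStream topology,
        // which is exactly the IllegalStateException in the stack trace above.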

--
Best regards!
Rui Li

Re: flink1.11 run

Dream-底限
Hi,
Got it. A couple of questions about streaming writes to Hive tables:
1. Does the Hive table have to be created from within Flink first?
2. Can I write directly to an already-existing Hive table (created via Hue)? Will the files still get a rolling policy?


Re: flink1.11 run

Jingsong Li
Hi Dream,

> 1. Does the Hive table have to be created from within Flink first?

No need; it doesn't matter on which side the table is created.

> 2. Can I write directly to an already-existing Hive table (created via Hue)? Will the files still get a rolling policy?

Yes. By default, files roll at 128 MB and also on every checkpoint.
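
For reference, a sketch of those defaults written out as explicit table properties. The keys are the same Flink 1.11 filesystem/Hive sink options used in the program above; the 128MB and 30 min values are my reading of the documented defaults, so verify them against your version:

        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql("CREATE TABLE tmp.streamhivetest (\n" +
                " user_id BIGINT,\n" +
                " item_id STRING,\n" +
                " behavior STRING,\n" +
                " tsdata STRING\n" +
                ") STORED AS parquet TBLPROPERTIES (\n" +
                // Roll a new file once the current one reaches 128 MB...
                " 'sink.rolling-policy.file-size' = '128MB',\n" +
                // ...or once it has been open for 30 minutes; bulk formats such
                // as parquet are additionally rolled on every checkpoint.
                " 'sink.rolling-policy.rollover-interval' = '30 min'\n" +
                ")");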

Best,
Jingsong

--
Best, Jingsong Lee

Re: flink1.11 run

Dream-底限
Hi,
For those two rolling triggers, does whichever is reached first win? That is, with the 1-minute checkpoint and the 128 MB file size, will a new file be rolled as soon as either condition is met?

> Yes. By default, files roll at 128 MB and also on every checkpoint.


Re: flink1.11 run

Jingsong Li
Yes.

But however the files roll, the data only becomes visible once a checkpoint completes.
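
Since the file commits are tied to checkpoints, checkpointing has to be enabled for the output to ever become readable. A minimal sketch (the one-minute interval is only an example value):

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Without checkpointing, in-progress files are never committed and so
        // stay invisible to Hive readers.
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);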

--
Best, Jingsong Lee

Re: flink1.11 run

Rui Li
In reply to this post by Dream-底限
You can write to an existing table; the relevant configuration [1] needs to be added to the table's properties.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html#streaming-writing
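
For example, something along these lines should attach the sink options to a table created outside Flink, assuming the Hive dialect in 1.11 accepts ALTER TABLE ... SET TBLPROPERTIES (otherwise run the equivalent statement directly in Hive); the property values here are illustrative:

        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql("ALTER TABLE tmp.streamhivetest SET TBLPROPERTIES (\n" +
                " 'sink.rolling-policy.file-size' = '128MB',\n" +
                " 'sink.rolling-policy.rollover-interval' = '30 min',\n" +
                " 'sink.rolling-policy.check-interval' = '1 min'\n" +
                ")");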

--
Best regards!
Rui Li