Fwd: Flink SQL window computation results cannot be written to the sink

Fwd: Flink SQL window computation results cannot be written to the sink

王超
Hello,

I've run into a problem similar to the one described in
https://www.mail-archive.com/[hidden email]/msg03916.html. Following the fix
suggested in that mail I set the timezone, but the problem was not resolved.
Could anyone help take a look?

import io.pravega.client.ClientConfig;
import io.pravega.client.admin.StreamManager;
import io.pravega.client.stream.StreamConfiguration;

import java.net.URI;
import java.time.ZoneId;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public static void main(String[] args) throws Exception {
        // Create the target Pravega stream up front.
        ClientConfig clientConfig = ClientConfig.builder()
                .controllerURI(URI.create("tcp://192.168.188.130:9090"))
                .build();
        StreamManager streamManager = StreamManager.create(clientConfig);
        streamManager.createStream("Demo", "result", StreamConfiguration.builder().build());

        // Set up the streaming execution environment: event time, parallelism 1,
        // Blink planner, session time zone Asia/Shanghai.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        EnvironmentSettings envSetting = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSetting);
        tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));

        // Source table: ts is derived from the epoch-second field datatime,
        // with a 5-second watermark delay.
        String sqlDdlAnaTable = "CREATE TABLE ana_Source(" +
                "type INT, " +
                "datatime BIGINT, " +
                "list ARRAY<ROW(id STRING, v FLOAT, q INTEGER)>, " +
                "ts AS TO_TIMESTAMP(FROM_UNIXTIME(datatime)), " +
                "WATERMARK FOR ts AS ts - INTERVAL '5' SECOND)" +
                " WITH (" +
                "'connector.type' = 'pravega'," +
                "'connector.version' = '1'," +
                "'connector.connection-config.controller-uri' = 'tcp://192.168.188.130:9090'," +
                "'connector.connection-config.default-scope' = 'Demo'," +
                "'connector.reader.stream-info.0.stream' = 'test'," +
                "'format.type' = 'json'," +
                "'format.fail-on-missing-field' = 'false', " +
                "'update-mode' = 'append')";
        tableEnv.sqlUpdate(sqlDdlAnaTable);

        // Sink table.
        String sqlDdlSinkTable = "CREATE TABLE tb_sink(" +
                "id STRING, " +
                "wStart TIMESTAMP(3), " +
                "v FLOAT)" +
                " WITH (" +
                "'connector.type' = 'pravega'," +
                "'connector.version' = '1'," +
                "'connector.connection-config.controller-uri' = 'tcp://192.168.188.130:9090'," +
                "'connector.connection-config.default-scope' = 'Demo'," +
                "'connector.writer.stream' = 'result'," +
                "'connector.writer.routingkey-field-name' = 'id'," +
                "'connector.writer.mode' = 'atleast_once'," +
                "'format.type' = 'json'," +
                "'update-mode' = 'append')";
        tableEnv.sqlUpdate(sqlDdlSinkTable);

        // Flatten the nested array into one row per element.
        String sqlJson = "SELECT ts, type, l.id AS id, l.v AS v, l.q AS q " +
                "FROM ana_Source " +
                "CROSS JOIN UNNEST(list) AS l (id, v, q)";
        Table tableJsonRecord = tableEnv.sqlQuery(sqlJson);
        tableEnv.registerTable("tb_JsonRecord", tableJsonRecord);
        System.out.println("------------------ tb_JsonRecord schema ------------------");
        tableJsonRecord.printSchema();
        //tableEnv.toAppendStream(tableJsonRecord, Row.class).print();

        // Keep only the records of interest.
        String sqlAna = "SELECT ts, id, v " +
                "FROM tb_JsonRecord " +
                "WHERE q = 1 AND type = 1";
        Table tableAnaRecord = tableEnv.sqlQuery(sqlAna);
        tableEnv.registerTable("tb_AnaRecord", tableAnaRecord);
        System.out.println("------------------ tb_AnaRecord schema ------------------");
        tableAnaRecord.printSchema();
        //tableEnv.toAppendStream(tableAnaRecord, Row.class).print();

        // 10-second tumbling-window average per id.
        String sqlAnaAvg = "SELECT id, " +
                "TUMBLE_START(ts, INTERVAL '10' SECOND) AS wStart, " +
                "AVG(v) FROM tb_AnaRecord " +
                "GROUP BY TUMBLE(ts, INTERVAL '10' SECOND), id";
        Table tableAvgRecord = tableEnv.sqlQuery(sqlAnaAvg);
        tableEnv.registerTable("tb_AvgRecord", tableAvgRecord);
        System.out.println("------------------ tb_AvgRecord schema ------------------");
        tableAvgRecord.printSchema();
        tableEnv.toAppendStream(tableAvgRecord, Row.class).print();

        /*
        String sqlAnaAvg = "INSERT INTO tb_sink(id, wStart, v) " +
                "SELECT id, " +
                "TUMBLE_START(ts, INTERVAL '10' SECOND) AS wStart, " +
                "AVG(v) FROM tb_AnaRecord " +
                "GROUP BY TUMBLE(ts, INTERVAL '10' SECOND), id";
        tableEnv.sqlUpdate(sqlAnaAvg);
        */
        tableEnv.execute("Streaming Job");
}

Re: Flink SQL window computation results cannot be written to the sink

Benchao Li-2
Hi,

I gave this a try, and the TO_TIMESTAMP(FROM_UNIXTIME()) approach does not
have a timezone problem. Could you describe the exact problem you are
hitting? For example, how you observed it and what the symptom is.
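
One way to check this locally, without any connector, is a minimal sketch
like the one below; the epoch value, field name, and TimestampCheck class
name are arbitrary placeholders:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TimestampCheck {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // One arbitrary epoch second: 1592179200 = 2020-06-15 00:00:00 UTC.
        DataStreamSource<Long> src = env.fromElements(1592179200L);
        Table in = tableEnv.fromDataStream(src, "datatime");
        tableEnv.registerTable("src", in);

        // FROM_UNIXTIME renders the epoch as a string in the local/session
        // time zone (the exact config it honors is version-dependent);
        // TO_TIMESTAMP then parses that string into a TIMESTAMP(3).
        Table out = tableEnv.sqlQuery(
                "SELECT datatime, FROM_UNIXTIME(datatime) AS s, " +
                "TO_TIMESTAMP(FROM_UNIXTIME(datatime)) AS ts FROM src");
        tableEnv.toAppendStream(out, Row.class).print();
        env.execute("timestamp check");
    }
}

If the printed timestamp is shifted from what you expect by your UTC offset,
the shift is introduced by FROM_UNIXTIME's string rendering, not by the
window operator.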
