Hello,
我遇到了类似https://www.mail-archive.com/[hidden email]/msg03916.html 中描述的问题,根据这个mail中的解决方法我设置了timezone,但是问题没有被解决,请教各位大神帮忙看一下。 public static void main (String[] args) throws Exception { // set up the streaming execution environment ClientConfig clientConfig = ClientConfig.builder().controllerURI(URI.create("tcp://192.168.188.130:9090")).build(); StreamManager streamManager = StreamManager.create(clientConfig); streamManager.createStream("Demo", "result",StreamConfiguration.builder().build()); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); EnvironmentSettings envSetting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSetting); tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai")); String sqlDdlAnaTable = "CREATE TABLE ana_Source(type INT, datatime BIGINT, list ARRAY <ROW(id STRING, v FLOAT, q INTEGER)>, ts AS TO_TIMESTAMP(FROM_UNIXTIME(datatime)), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND)" + " WITH (" + "'connector.type' = 'pravega'," + "'connector.version' = '1'," + "'connector.connection-config.controller-uri'= 'tcp://192.168.188.130:9090'," + "'connector.connection-config.default-scope' = 'Demo'," + "'connector.reader.stream-info.0.stream' = 'test'," + "'format.type' = 'json'," + "'format.fail-on-missing-field' = 'false', " + "'update-mode' = 'append')"; tableEnv.sqlUpdate(sqlDdlAnaTable); String sqlDdlSinkTable = "CREATE TABLE tb_sink" + "(id STRING, " + "wStart TIMESTAMP(3) , " + "v FLOAT)" + " WITH (" + "'connector.type' = 'pravega'," + "'connector.version' = '1'," + "'connector.connection-config.controller-uri'= 'tcp://192.168.188.130:9090'," + "'connector.connection-config.default-scope' = 'Demo'," + "'connector.writer.stream' = 'result'," + "'connector.writer.routingkey-field-name' = 'id'," + "'connector.writer.mode' = 'atleast_once'," + "'format.type' = 'json'," + "'update-mode' = 'append')"; tableEnv.sqlUpdate(sqlDdlSinkTable); String sqlJson = "SELECT ts, type, l.id AS id, l.v AS v, l.q AS q " + "FROM ana_Source " + "CROSS JOIN UNNEST(list) as l (id,v,q)"; Table tableJsonRecord = tableEnv.sqlQuery(sqlJson); tableEnv.registerTable("tb_JsonRecord", tableJsonRecord); System.out.println("------------------print {} schema------------------" + "tb_JsonRecord"); tableJsonRecord.printSchema(); //tableEnv.toAppendStream(tableRecord, Row.class).print(); String sqlAna = "SELECT ts, id, v " + "FROM tb_JsonRecord " + "WHERE q=1 AND type=1"; Table tableAnaRecord = tableEnv.sqlQuery(sqlAna); tableEnv.registerTable("tb_AnaRecord", tableAnaRecord); System.out.println("------------------print {} schema------------------" + "tb_AnaRecord"); tableAnaRecord.printSchema(); //tableEnv.toAppendStream(tableAnaRecord, Row.class).print(); String sqlAnaAvg = "SELECT id, " + "TUMBLE_START(ts, INTERVAL '10' SECOND) as wStart, " + "AVG(v) FROM tb_AnaRecord " + "GROUP BY TUMBLE(ts, INTERVAL '10' SECOND), id"; Table tableAvgRecord = tableEnv.sqlQuery(sqlAnaAvg); tableEnv.registerTable("tb_AvgRecord", tableAvgRecord); System.out.println("------------------print {} schema------------------" + "tb_AvgRecord"); tableAvgRecord.printSchema(); tableEnv.toAppendStream(tableAvgRecord, Row.class).print();/* String sqlAnaAvg = "INSERT INTO tb_sink(id, wStart, v) " + "SELECT id, " + "TUMBLE_START(ts, INTERVAL '10' SECOND) as wStart, " + "AVG(v) FROM tb_AnaRecord " + "GROUP BY TUMBLE(ts, INTERVAL '10' SECOND), id"; tableEnv.sqlUpdate(sqlAnaAvg);*/ tableEnv.execute("Streaming Job"); |
Hi,
我试了一下,TO_TIMESTAMP(FROM_UNIXTIME())这种方式不会有时区问题呀, 你可以说下你具体遇到的是什么问题么?比如怎么观察到的,以及问题的表现是什么。 王超 <[hidden email]> 于2020年6月15日周一 下午3:31写道: > Hello, > > 我遇到了类似https://www.mail-archive.com/[hidden email]/msg03916.html > 中描述的问题,根据这个mail中的解决方法我设置了timezone,但是问题没有被解决,请教各位大神帮忙看一下。 > > public static void main (String[] args) throws Exception { > // set up the streaming execution environment > ClientConfig clientConfig = > ClientConfig.builder().controllerURI(URI.create("tcp:// > 192.168.188.130:9090")).build(); > StreamManager streamManager = StreamManager.create(clientConfig); > streamManager.createStream("Demo", > "result",StreamConfiguration.builder().build()); > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > env.setParallelism(1); > EnvironmentSettings envSetting = > > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > StreamTableEnvironment tableEnv = > StreamTableEnvironment.create(env, envSetting); > tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai")); > String sqlDdlAnaTable = "CREATE TABLE ana_Source(type INT, > datatime BIGINT, list ARRAY <ROW(id STRING, v FLOAT, q INTEGER)>, ts > AS TO_TIMESTAMP(FROM_UNIXTIME(datatime)), WATERMARK FOR ts AS ts - > INTERVAL '5' SECOND)" + > " WITH (" + > "'connector.type' = 'pravega'," + > "'connector.version' = '1'," + > "'connector.connection-config.controller-uri'= > 'tcp://192.168.188.130:9090'," + > "'connector.connection-config.default-scope' = 'Demo'," + > "'connector.reader.stream-info.0.stream' = 'test'," + > "'format.type' = 'json'," + > "'format.fail-on-missing-field' = 'false', " + > "'update-mode' = 'append')"; > tableEnv.sqlUpdate(sqlDdlAnaTable); String > sqlDdlSinkTable = "CREATE TABLE tb_sink" + > "(id STRING, " + > "wStart TIMESTAMP(3) , " + > "v FLOAT)" + > " WITH (" + > "'connector.type' = 'pravega'," + > "'connector.version' = '1'," + > "'connector.connection-config.controller-uri'= > 'tcp://192.168.188.130:9090'," + > "'connector.connection-config.default-scope' = 'Demo'," + > "'connector.writer.stream' = 'result'," + > "'connector.writer.routingkey-field-name' = 'id'," + > "'connector.writer.mode' = 'atleast_once'," + > "'format.type' = 'json'," + > "'update-mode' = 'append')"; > tableEnv.sqlUpdate(sqlDdlSinkTable); String sqlJson = > "SELECT ts, type, l.id AS id, l.v AS v, l.q AS q " + > "FROM ana_Source " + > "CROSS JOIN UNNEST(list) as l (id,v,q)"; > Table tableJsonRecord = tableEnv.sqlQuery(sqlJson); > tableEnv.registerTable("tb_JsonRecord", tableJsonRecord); > System.out.println("------------------print {} > schema------------------" + "tb_JsonRecord"); > tableJsonRecord.printSchema(); > //tableEnv.toAppendStream(tableRecord, Row.class).print(); > String sqlAna = "SELECT ts, id, v " + > "FROM tb_JsonRecord " + > "WHERE q=1 AND type=1"; > Table tableAnaRecord = tableEnv.sqlQuery(sqlAna); > tableEnv.registerTable("tb_AnaRecord", tableAnaRecord); > System.out.println("------------------print {} > schema------------------" + "tb_AnaRecord"); > tableAnaRecord.printSchema(); > //tableEnv.toAppendStream(tableAnaRecord, Row.class).print(); > String sqlAnaAvg = "SELECT id, " + > "TUMBLE_START(ts, INTERVAL '10' SECOND) as wStart, " + > "AVG(v) FROM tb_AnaRecord " + > "GROUP BY TUMBLE(ts, INTERVAL '10' SECOND), id"; > Table tableAvgRecord = tableEnv.sqlQuery(sqlAnaAvg); > tableEnv.registerTable("tb_AvgRecord", tableAvgRecord); > System.out.println("------------------print {} > schema------------------" + "tb_AvgRecord"); > tableAvgRecord.printSchema(); > tableEnv.toAppendStream(tableAvgRecord, Row.class).print();/* > String sqlAnaAvg = "INSERT INTO tb_sink(id, wStart, v) " + > "SELECT id, " + > "TUMBLE_START(ts, INTERVAL '10' SECOND) as wStart, " + > "AVG(v) FROM tb_AnaRecord " + > "GROUP BY TUMBLE(ts, INTERVAL '10' SECOND), id"; > tableEnv.sqlUpdate(sqlAnaAvg);*/ tableEnv.execute("Streaming > Job"); > |
Free forum by Nabble | Edit this page |