你好! 在创建视图后,通过StreamTableEnvironment.from("")的方法获取视图表对象,这时表对象的名称为临时名称,和我的预期不一致,使得我的业务场景错误,是否可以在后续版本处理该问题?
我的示例程序:package kafka; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class Test3 { public static void main(String[] args) { // 环境 StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); // 创建输入表 String inTablePath = "CREATE TABLE datagen ( " + " id INT, " + " total string, " + " ts AS localtimestamp, " + " WATERMARK FOR ts AS ts " + ") WITH ( " + " 'connector' = 'datagen', " + " 'rows-per-second'='5', " + " 'fields.id.min'='1', " + " 'fields.id.max'='10', " + " 'fields.total.length'='10' " + ")"; // 加载输入表 bsTableEnv.executeSql(inTablePath); Table table = bsTableEnv.sqlQuery("select id, total, 12 as col_1 from datagen"); bsTableEnv.createTemporaryView("table1", table); Table table1 = bsTableEnv.from("table1"); System.out.println(table1); // 上面的打印,我预期的表名应该为table1,但是为一个临时表名 Table queryT = bsTableEnv.sqlQuery("select table1.id, 1 as b from table1"); System.out.println(queryT.getSchema()); bsTableEnv.sqlQuery("select table1.id from " + bsTableEnv.from("table1")); } } |
> Table table1 = bsTableEnv.from("table1"); > System.out.println(table1); > // 上面的打印,我预期的表名应该为table1,但是为一个临时表名 Table 对象里的 tableName 没有太大的意义,看了下代码从来没有被赋值过。 我理解 Table 都是从QueryOperation 转化来的,而queryOperation对应的是一个 query (select id, total, 12 as col_1 from datagen),本身是没有表名的。createTemporaryView("table1", table); 只是 将 table所对应 queryOperation 对应到了 表名的path(table1)下, 所以 table1 对象拿到的还是同一个query operation。 如果想看创建的临时表,可以用bsTableEnv.listTables()查看。 祝好 Leonard Xu |
你好!
下面的程序我理解的是StreamTableEnvironment.from("")是获取我之前创建的表,但是实际的结果还是获取的是临时表,我认为应该获取的是datagen的table对象,应保证包含表名在内的元数据的一致性; 我的测试程序: package org.apache.flink.playgrounds.spendreport; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import java.util.Arrays; public class Test2 { public static void main(String[] args) { StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); String sourceDDL = "CREATE TABLE datagen ( " + " f_random INT," + " f_random_str STRING " + " ) WITH ( " + " 'connector' = 'datagen', " + " 'rows-per-second'='10'," + " 'fields.f_random.min'='1', " + " 'fields.f_random.max'='10', " + " 'fields.f_random_str.length'='10' " + " ) "; bsTableEnv.executeSql(sourceDDL); for (int i = 0; i < 10; i++) { Table datagen = bsTableEnv.from("datagen"); System.out.println(datagen); } System.out.println("-----------------" + Arrays.toString(bsTableEnv.listTables())); } } 控制台: UnnamedTable$0 UnnamedTable$1 UnnamedTable$2 UnnamedTable$3 UnnamedTable$4 UnnamedTable$5 UnnamedTable$6 UnnamedTable$7 UnnamedTable$8 UnnamedTable$9 -----------------[UnnamedTable$0, UnnamedTable$1, UnnamedTable$2, UnnamedTable$3, UnnamedTable$4, UnnamedTable$5, UnnamedTable$6, UnnamedTable$7, UnnamedTable$8, UnnamedTable$9, datagen] |
Free forum by Nabble | Edit this page |