1.11版本,创建视图后,根据表名称获取视图表对象,表名为临时名称的问题

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

1.11版本,创建视图后,根据表名称获取视图表对象,表名为临时名称的问题

Asahi Lee
你好!    在创建视图后,通过StreamTableEnvironment.from("")的方法获取视图表对象,这时表对象的名称为临时名称,和我的预期不一致,使得我的业务场景错误,是否可以在后续版本处理该问题?
我的示例程序:package kafka;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Test3 {

    public static void main(String[] args) {
        // 环境
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        // 创建输入表
        String inTablePath = "CREATE TABLE datagen (  " +
                " id INT,  " +
                " total string,  " +
                " ts AS localtimestamp,  " +
                " WATERMARK FOR ts AS ts  " +
                ") WITH (  " +
                " 'connector' = 'datagen',  " +
                " 'rows-per-second'='5',  " +
                " 'fields.id.min'='1',  " +
                " 'fields.id.max'='10',  " +
                " 'fields.total.length'='10'  " +
                ")";
        // 加载输入表
        bsTableEnv.executeSql(inTablePath);

        Table table = bsTableEnv.sqlQuery("select id, total, 12 as col_1 from datagen");
        bsTableEnv.createTemporaryView("table1", table);

        Table table1 = bsTableEnv.from("table1");
        System.out.println(table1);
        // 上面的打印,我预期的表名应该为table1,但是为一个临时表名

        Table queryT = bsTableEnv.sqlQuery("select table1.id, 1 as b from table1");
        System.out.println(queryT.getSchema());

       
        bsTableEnv.sqlQuery("select table1.id from " + bsTableEnv.from("table1"));

    }

}
Reply | Threaded
Open this post in threaded view
|

Re: 1.11版本,创建视图后,根据表名称获取视图表对象,表名为临时名称的问题

Leonard Xu


>        Table table1 = bsTableEnv.from("table1");
>        System.out.println(table1);
>        // 上面的打印,我预期的表名应该为table1,但是为一个临时表名
Table 对象里的 tableName 没有太大的意义,看了下代码从来没有被赋值过。

我理解 Table 都是从QueryOperation 转化来的,而queryOperation对应的是一个 query (select id, total, 12 as col_1 from datagen),本身是没有表名的。createTemporaryView("table1", table); 只是 将 table所对应 queryOperation 对应到了 表名的path(table1)下,  所以 table1 对象拿到的还是同一个query operation。

如果想看创建的临时表,可以用bsTableEnv.listTables()查看。

祝好
Leonard Xu
Reply | Threaded
Open this post in threaded view
|

回复: 1.11版本,创建视图后,根据表名称获取视图表对象,表名为临时名称的问题

Asahi Lee
你好!
      下面的程序我理解的是StreamTableEnvironment.from("")是获取我之前创建的表,但是实际的结果还是获取的是临时表,我认为应该获取的是datagen的table对象,应保证包含表名在内的元数据的一致性;


我的测试程序:
package org.apache.flink.playgrounds.spendreport;


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


import java.util.Arrays;


public class Test2 {


    public static void main(String[] args) {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);


        String sourceDDL = "CREATE TABLE datagen ( " +
                " f_random INT," +
                " f_random_str STRING " +
                " ) WITH ( " +
                " 'connector' = 'datagen', " +
                " 'rows-per-second'='10'," +
                " 'fields.f_random.min'='1', " +
                " 'fields.f_random.max'='10', " +
                " 'fields.f_random_str.length'='10' " +
                " ) ";
        bsTableEnv.executeSql(sourceDDL);
&nbsp; &nbsp; &nbsp; &nbsp; for (int i = 0; i < 10; i++) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Table datagen = bsTableEnv.from("datagen");
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println(datagen);
&nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; System.out.println("-----------------" + Arrays.toString(bsTableEnv.listTables()));


&nbsp; &nbsp; }


}


控制台:
UnnamedTable$0
UnnamedTable$1
UnnamedTable$2
UnnamedTable$3
UnnamedTable$4
UnnamedTable$5
UnnamedTable$6
UnnamedTable$7
UnnamedTable$8
UnnamedTable$9
-----------------[UnnamedTable$0, UnnamedTable$1, UnnamedTable$2, UnnamedTable$3, UnnamedTable$4, UnnamedTable$5, UnnamedTable$6, UnnamedTable$7, UnnamedTable$8, UnnamedTable$9, datagen]