大家好:
我的业务场景,是想实现一个行转列的效果。然后通过自定义tableFunction来实现。 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型, 那么在eval方法接收到的就是Row[], 问题出在,Row[]中的数据获取不到,里面的元素都是NULL 通过下面的步骤和代码可还原车祸场景: kafka topic: test_action kafka message: {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] } 代码1:Problem.java package com.flink; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.types.Row; /** * * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型, * 那么在eval方法接收到的就是Row[], * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL * * kafka topic: test_action * * kafka message: * {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] } */ public class Problem { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings envSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env, envSettings); bsEnv.registerFunction("explode2", new ExplodeFunction()); String ddlSource = "CREATE TABLE actionTable (\n" + " action ARRAY<\n" + " ROW<" + " actionID STRING,\n" + " actionName STRING\n" + " >\n" + " >\n" + ") WITH (\n" + " 'connector.type' = 'kafka',\n" + " 'connector.version' = '0.11',\n" + " 'connector.topic' = 'test_action',\n" + " 'connector.startup-mode' = 'earliest-offset',\n" + " 'connector.properties.zookeeper.connect' = 'localhost:2181',\n" + " 'connector.properties.bootstrap.servers' = 'localhost:9092',\n" + " 'update-mode' = 'append',\n" + " 'format.type' = 'json'\n" + ")"; bsEnv.sqlUpdate(ddlSource); // Table table = bsEnv.sqlQuery("select `action` from actionTable"); Table table = bsEnv.sqlQuery("select * from actionTable, LATERAL 
TABLE(explode2(`action`)) as T(`word`)"); table.printSchema(); bsEnv.toAppendStream(table, Row.class) .print("==tb=="); bsEnv.execute("ARRAY tableFunction Problem"); } } 代码2:ExplodeFunction.java package com.flink; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.types.Row; import java.util.ArrayList; import java.util.Arrays; public class ExplodeFunction extends TableFunction<Row> { public void eval(Row[] values) { System.out.println(values.length); if (values.length > 0) { for (Row row : values) { if (row != null) {// 这里debug出来的row总是空 ArrayList<Object> list = new ArrayList<>(); for (int i = 0; i < row.getArity(); i++) { Object field = row.getField(i); list.add(field); } collector.collect(Row.of(Arrays.toString(list.toArray()))); } } } } } 最后贴个debug的图 |
Administrator
|
Hi,
当前还不支持 Row[] 作为参数。目前有一个 issue 在解决这个问题,可以关注下。 https://issues.apache.org/jira/browse/FLINK-17855 Best, Jark On Mon, 6 Jul 2020 at 10:19, Jim Chen <[hidden email]> wrote: > 大家好: > 我的业务场景,是想实现一个行转列的效果。然后通过自定义tableFunction来实现。 > 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型, > 那么在eval方法接收到的就是Row[], > 问题出在,Row[]中的数据获取不到,里面的元素都是NULL > > 通过下面的步骤和代码可还原车祸场景: > kafka topic: test_action > kafka message: > {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": > "id002", "actionName": "bbb"} ] } > > 代码1:Problem.java > package com.flink; > > import > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.table.api.EnvironmentSettings; > import org.apache.flink.table.api.Table; > import org.apache.flink.table.api.java.StreamTableEnvironment; > import org.apache.flink.types.Row; > > /** > * > * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型, > * 那么在eval方法接收到的就是Row[], > * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL > * > * kafka topic: test_action > * > * kafka message: > * {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": > "id002", "actionName": "bbb"} ] } > */ > public class Problem { > > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > EnvironmentSettings envSettings = EnvironmentSettings.newInstance() > .useBlinkPlanner() > .inStreamingMode() > .build(); > StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env, > envSettings); > bsEnv.registerFunction("explode2", new ExplodeFunction()); > > String ddlSource = "CREATE TABLE actionTable (\n" + > " action ARRAY<\n" + > " ROW<" + > " actionID STRING,\n" + > " actionName STRING\n" + > " >\n" + > " >\n" + > ") WITH (\n" + > " 'connector.type' = 'kafka',\n" + > " 'connector.version' = '0.11',\n" + > " 'connector.topic' = 'test_action',\n" + > " 'connector.startup-mode' = 'earliest-offset',\n" + > " 'connector.properties.zookeeper.connect' = > 'localhost:2181',\n" + > " 
'connector.properties.bootstrap.servers' = > 'localhost:9092',\n" + > " 'update-mode' = 'append',\n" + > " 'format.type' = 'json'\n" + > ")"; > bsEnv.sqlUpdate(ddlSource); > > // Table table = bsEnv.sqlQuery("select `action` from actionTable"); > Table table = bsEnv.sqlQuery("select * from actionTable, LATERAL > TABLE(explode2(`action`)) as T(`word`)"); > table.printSchema(); > bsEnv.toAppendStream(table, Row.class) > .print("==tb=="); > > > bsEnv.execute("ARRAY tableFunction Problem"); > } > } > > 代码2:ExplodeFunction.java > package com.flink; > > import org.apache.flink.table.functions.TableFunction; > import org.apache.flink.types.Row; > > import java.util.ArrayList; > import java.util.Arrays; > > public class ExplodeFunction extends TableFunction<Row> { > > public void eval(Row[] values) { > System.out.println(values.length); > if (values.length > 0) { > for (Row row : values) { > if (row != null) {// 这里debug出来的row总是空 > ArrayList<Object> list = new ArrayList<>(); > for (int i = 0; i < row.getArity(); i++) { > Object field = row.getField(i); > list.add(field); > } > > collector.collect(Row.of(Arrays.toString(list.toArray()))); > } > } > } > } > } > > 最后贴个debug的图 > [image: image.png] > |
Hi,
我现在转换思路,在定义表的时候,把ARRAY类型的字段声明为STRING, 但现在的问题是,查询出来的结果都是空。 基于上面的代码环境,新写了一个类 Problem2.java package com.flink; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.types.Row; /** * * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型, * 那么在eval方法接收到的就是Row[], * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL * * 现在思路:就是在定义表的时候,把ARRAY看成STRING, * 现在的问题,就是查询出来,都是空 * * kafka topic: test_action * * kafka message: * {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] } */ public class Problem2 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings envSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env, envSettings); bsEnv.registerFunction("explode3", new ExplodeFunction()); String ddlSource = "CREATE TABLE actionTable3 (\n" + " action STRING\n" + ") WITH (\n" + " 'connector.type' = 'kafka',\n" + " 'connector.version' = '0.11',\n" + " 'connector.topic' = 'test_action',\n" + " 'connector.startup-mode' = 'earliest-offset',\n" + " 'connector.properties.zookeeper.connect' = 'localhost:2181',\n" + " 'connector.properties.bootstrap.servers' = 'localhost:9092',\n" + " 'update-mode' = 'append',\n" + " 'format.type' = 'json',\n" + " 'format.derive-schema' = 'false',\n" + " 'format.json-schema' = '{\"type\": \"object\", \"properties\": {\"action\": {\"type\": \"string\"} } }'" + ")"; System.out.println(ddlSource); bsEnv.sqlUpdate(ddlSource); Table table = bsEnv.sqlQuery("select * from actionTable3"); // Table table = bsEnv.sqlQuery("select * from actionTable2, LATERAL TABLE(explode3(`action`)) as T(`word`)"); table.printSchema(); 
bsEnv.toAppendStream(table, Row.class) .print();// 输出都是空 bsEnv.execute("ARRAY tableFunction Problem"); } } Jark Wu <[hidden email]> 于2020年7月6日周一 上午10:36写道: > Hi, > > 当前还不支持 Row[] 作为参数。目前有一个 issue 在解决这个问题,可以关注下。 > https://issues.apache.org/jira/browse/FLINK-17855 > > > Best, > Jark > > On Mon, 6 Jul 2020 at 10:19, Jim Chen <[hidden email]> wrote: > > > 大家好: > > 我的业务场景,是想实现一个行转列的效果。然后通过自定义tableFunction来实现。 > > 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型, > > 那么在eval方法接收到的就是Row[], > > 问题出在,Row[]中的数据获取不到,里面的元素都是NULL > > > > 通过下面的步骤和代码可还原车祸场景: > > kafka topic: test_action > > kafka message: > > {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": > > "id002", "actionName": "bbb"} ] } > > > > 代码1:Problem.java > > package com.flink; > > > > import > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > > import org.apache.flink.table.api.EnvironmentSettings; > > import org.apache.flink.table.api.Table; > > import org.apache.flink.table.api.java.StreamTableEnvironment; > > import org.apache.flink.types.Row; > > > > /** > > * > > * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型, > > * 那么在eval方法接收到的就是Row[], > > * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL > > * > > * kafka topic: test_action > > * > > * kafka message: > > * {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": > > "id002", "actionName": "bbb"} ] } > > */ > > public class Problem { > > > > public static void main(String[] args) throws Exception { > > StreamExecutionEnvironment env = > > StreamExecutionEnvironment.getExecutionEnvironment(); > > EnvironmentSettings envSettings = > EnvironmentSettings.newInstance() > > .useBlinkPlanner() > > .inStreamingMode() > > .build(); > > StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env, > > envSettings); > > bsEnv.registerFunction("explode2", new ExplodeFunction()); > > > > String ddlSource = "CREATE TABLE actionTable (\n" + > > " action ARRAY<\n" + > > " ROW<" + > > " actionID STRING,\n" + > > " actionName STRING\n" + > > " >\n" 
+ > > " >\n" + > > ") WITH (\n" + > > " 'connector.type' = 'kafka',\n" + > > " 'connector.version' = '0.11',\n" + > > " 'connector.topic' = 'test_action',\n" + > > " 'connector.startup-mode' = 'earliest-offset',\n" + > > " 'connector.properties.zookeeper.connect' = > > 'localhost:2181',\n" + > > " 'connector.properties.bootstrap.servers' = > > 'localhost:9092',\n" + > > " 'update-mode' = 'append',\n" + > > " 'format.type' = 'json'\n" + > > ")"; > > bsEnv.sqlUpdate(ddlSource); > > > > // Table table = bsEnv.sqlQuery("select `action` from > actionTable"); > > Table table = bsEnv.sqlQuery("select * from actionTable, LATERAL > > TABLE(explode2(`action`)) as T(`word`)"); > > table.printSchema(); > > bsEnv.toAppendStream(table, Row.class) > > .print("==tb=="); > > > > > > bsEnv.execute("ARRAY tableFunction Problem"); > > } > > } > > > > 代码2:ExplodeFunction.java > > package com.flink; > > > > import org.apache.flink.table.functions.TableFunction; > > import org.apache.flink.types.Row; > > > > import java.util.ArrayList; > > import java.util.Arrays; > > > > public class ExplodeFunction extends TableFunction<Row> { > > > > public void eval(Row[] values) { > > System.out.println(values.length); > > if (values.length > 0) { > > for (Row row : values) { > > if (row != null) {// 这里debug出来的row总是空 > > ArrayList<Object> list = new ArrayList<>(); > > for (int i = 0; i < row.getArity(); i++) { > > Object field = row.getField(i); > > list.add(field); > > } > > > > collector.collect(Row.of(Arrays.toString(list.toArray()))); > > } > > } > > } > > } > > } > > > > 最后贴个debug的图 > > [image: image.png] > > > |
Free forum by Nabble | Edit this page |