Hi,
可以通过以下步骤还原车祸现场: kafka topic: test_action kafka message: {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] } 代码Problem2.java: package com.flink; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.types.Row; /** * * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型, * 那么在eval方法接收到的就是Row[], * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL * * 现在思路:就是在定义表的时候,把ARRYA看成STRING, * 现在的问题,就是查询出来,都是空 * * kafka topic: test_action * * kafka message: * {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] } */ public class Problem2 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings envSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env, envSettings); bsEnv.registerFunction("explode3", new ExplodeFunction()); String ddlSource = "CREATE TABLE actionTable3 (\n" + " action STRING\n" + ") WITH (\n" + " 'connector.type' = 'kafka',\n" + " 'connector.version' = '0.11',\n" + " 'connector.topic' = 'test_action',\n" + " 'connector.startup-mode' = 'earliest-offset',\n" + " 'connector.properties.zookeeper.connect' = 'localhost:2181',\n" + " 'connector.properties.bootstrap.servers' = 'localhost:9092',\n" + " 'update-mode' = 'append',\n" + " 'format.type' = 'json',\n" + " 'format.derive-schema' = 'false',\n" + " 'format.json-schema' = '{\"type\": \"object\", \"properties\": {\"action\": {\"type\": \"string\"} } }'" + ")"; System.out.println(ddlSource); bsEnv.sqlUpdate(ddlSource); Table table = bsEnv.sqlQuery("select * from actionTable3"); // Table table = bsEnv.sqlQuery("select * from actionTable2, LATERAL TABLE(explode3(`action`)) as T(`word`)"); table.printSchema(); bsEnv.toAppendStream(table, Row.class) .print();// 输出都是空 bsEnv.execute("ARRAY tableFunction Problem"); } } |
Hi Jim,
这是一个已知问题[1],你可以看下这个issue,是否可以解决你的问题? [1] https://issues.apache.org/jira/browse/FLINK-18002 Jim Chen <[hidden email]> 于2020年7月6日周一 上午11:28写道: > Hi, > 可以通过以下步骤还原车祸现场: > kafka topic: test_action > kafka message: > {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": > "id002", "actionName": "bbb"} ] } > > 代码Problem2.java: > package com.flink; > > import > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.table.api.EnvironmentSettings; > import org.apache.flink.table.api.Table; > import org.apache.flink.table.api.java.StreamTableEnvironment; > import org.apache.flink.types.Row; > > /** > * > * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型, > * 那么在eval方法接收到的就是Row[], > * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL > * > * 现在思路:就是在定义表的时候,把ARRYA看成STRING, > * 现在的问题,就是查询出来,都是空 > * > * kafka topic: test_action > * > * kafka message: > * {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": > "id002", "actionName": "bbb"} ] } > */ > public class Problem2 { > > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > EnvironmentSettings envSettings = EnvironmentSettings.newInstance() > .useBlinkPlanner() > .inStreamingMode() > .build(); > StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env, > envSettings); > bsEnv.registerFunction("explode3", new ExplodeFunction()); > > String ddlSource = "CREATE TABLE actionTable3 (\n" + > " action STRING\n" + > ") WITH (\n" + > " 'connector.type' = 'kafka',\n" + > " 'connector.version' = '0.11',\n" + > " 'connector.topic' = 'test_action',\n" + > " 'connector.startup-mode' = 'earliest-offset',\n" + > " 'connector.properties.zookeeper.connect' = > 'localhost:2181',\n" + > " 'connector.properties.bootstrap.servers' = > 'localhost:9092',\n" + > " 'update-mode' = 'append',\n" + > " 'format.type' = 'json',\n" + > " 'format.derive-schema' = 'false',\n" + > " 'format.json-schema' = '{\"type\": \"object\", > \"properties\": {\"action\": {\"type\": \"string\"} } }'" + > ")"; > System.out.println(ddlSource); > bsEnv.sqlUpdate(ddlSource); > > Table table = bsEnv.sqlQuery("select * from actionTable3"); > // Table table = bsEnv.sqlQuery("select * from actionTable2, LATERAL > TABLE(explode3(`action`)) as T(`word`)"); > table.printSchema(); > bsEnv.toAppendStream(table, Row.class) > .print();// 输出都是空 > > bsEnv.execute("ARRAY tableFunction Problem"); > } > } > -- Best, Benchao Li |
Free forum by Nabble | Edit this page |