Custom UDTF for splitting a string fails with java.lang.IndexOutOfBoundsException: Index: 7, Size: 7
I'm at a loss — has anyone run into this? Thanks! It seems to happen when a UDF and a UDTF are used together. A reproducible example is included below, thanks.

class myKerasMLP(ScalarFunction):
    def eval(self, *args):
        ...  # model inference elided; returns the prediction result
        return str(trueY[0][0]) + '|' + str(trueY[0][1])

Register the UDF:

myKerasMLP = udf(myKerasMLP(),
                 input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
                              DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
                              DataTypes.STRING()],
                 result_type=DataTypes.STRING())

class SplitStr(TableFunction):
    def eval(self, str_value):
        str_arr = str_value.split('|')
        yield str_arr[0], str_arr[1]
        yield str_arr[0], str_arr[1]

Register the UDTF:

splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(), DataTypes.STRING()])

t_env.register_function('train_and_predict', myKerasMLP)
t_env.register_function("splitStr", splitStr)

==================

t_env.sql_query("""
select A.hotime ,
       A.before_ta ,
       A.before_rssi ,
       A.after_ta ,
       A.after_rssil ,
       A.nb_tath ,
       A.nb_rssith ,
       nbr_rssi ,
       nbr_ta
from (SELECT hotime ,
             before_ta ,
             before_rssi ,
             after_ta ,
             after_rssil ,
             nb_tath ,
             nb_rssith ,
             train_and_predict(hotime, before_ta, before_rssi, after_ta, after_rssil, nb_tath, nb_rssith) predict
      FROM source) as A,
     LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta)
""").insert_into("predict_sink")

====================

Error:

java.lang.IndexOutOfBoundsException: Index: 7, Size: 7
Traceback (most recent call last):
  File "C:/Users/Administrator.XTZ-02012091146/PycharmProjects/pythonProject/kerasTest/UdtfNtPredictPyFlink.py", line 280, in <module>
    t_env.execute('NT重连预测参数')
  File "D:\tools\Python3.6.5\lib\site-packages\pyflink\table\table_environment.py", line 1057, in execute
    return JobExecutionResult(self._j_tenv.execute(job_name))
  File "D:\tools\Python3.6.5\lib\site-packages\py4j\java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "D:\tools\Python3.6.5\lib\site-packages\pyflink\util\exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "D:\tools\Python3.6.5\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o25.execute.
: java.lang.IndexOutOfBoundsException: Index: 7, Size: 7
	at java.util.ArrayList.rangeCheck(ArrayList.java:657)

====================

This SQL runs fine:

t_env.sql_query("""
SELECT hotime ,
       before_ta ,
       before_rssi ,
       after_ta ,
       after_rssil ,
       nb_tath ,
       nb_rssith ,
       train_and_predict(hotime, before_ta, before_rssi, after_ta, after_rssil, nb_tath, nb_rssith) predict
FROM source
""").insert_into("print_table")

------------------------------
+I(37,14,-66,92,-74,24,-65,22.621065|-64.12096)
+I(291,136,-76,136,-78,22,-65,19.479145|-65.958)
------------------------------

====================== Minimal reproducible example ========================

======================= SQL source =================

/*
Navicat MySQL Data Transfer

Source Server         : localhost
Source Server Version : 50717
Source Host           : localhost:3306
Source Database       : nufront-nt

Target Server Type    : MYSQL
Target Server Version : 50717
File Encoding         : 65001

Date: 2021-03-13 14:23:41
*/

SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------
-- Table structure for test
-- ----------------------------
DROP TABLE IF EXISTS `test`;
CREATE TABLE `test` (
  `hotime` varchar(5) DEFAULT NULL,
  `before_ta` varchar(5) DEFAULT NULL,
  `before_rssi` varchar(10) DEFAULT NULL,
  `after_ta` varchar(5) DEFAULT NULL,
  `after_rssil` varchar(10) DEFAULT NULL,
  `nb_tath` varchar(5) DEFAULT NULL,
  `nb_rssith` varchar(10) DEFAULT NULL,
  `predict` varchar(50) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of test
-- ----------------------------
INSERT INTO `test` VALUES ('35', '8', '-62', '136', '-65', '20', '-65', '22.30014|-63.884907');
INSERT INTO `test` VALUES ('43', '8', '-71', '248', '-73', '20', '-65', '20.598848|-65.127464');
INSERT INTO `test` VALUES ('82', '216', '-74', '208', '-74', '20', '-65', '14.919615|-66.15158');

================== Program ===================

# -*- coding: utf-8 -*-
import logging
import os

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.udf import ScalarFunction, TableFunction, udf, udtf

env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

# Settings required to run Python UDFs
t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)
t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", "80m")


class SplitStr(TableFunction):
    def eval(self, str_value):
        str_arr = str_value.split('|')
        yield str_arr[0], str_arr[1]


splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(), DataTypes.STRING()])
t_env.register_function("splitStr", splitStr)


class myKerasMLP(ScalarFunction):
    def eval(self, *args):
        # concatenate the arguments
        a = ''
        for u in args:
            a += u + "|"
        return a


myKerasMLP = udf(myKerasMLP(),
                 input_types=[DataTypes.STRING(), DataTypes.STRING()],
                 result_type=DataTypes.STRING())
t_env.register_function('train_and_predict', myKerasMLP)

t_env.execute_sql("""
CREATE TABLE print_table (
  hotime STRING ,
  before_ta STRING ,
  before_rssi STRING ,
  after_ta STRING ,
  after_rssil STRING ,
  nb_tath STRING ,
  nb_rssith STRING ,
  predict STRING
) WITH (
  'connector' = 'print'
)
""")

t_env.execute_sql("""
CREATE TABLE source (
  hotime STRING ,
  before_ta STRING ,
  before_rssi STRING ,
  after_ta STRING ,
  after_rssil STRING ,
  nb_tath STRING ,
  nb_rssith STRING ,
  predict STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/nufront-nt',
  'table-name' = 'test',
  'username' = 'root',
  'password' = '123456'
)
""")

t_env.execute_sql("""
CREATE TABLE predict_sink (
  hotime STRING ,
  before_ta STRING ,
  before_rssi STRING ,
  after_ta STRING ,
  after_rssil STRING ,
  nb_tath STRING ,
  nb_rssith STRING ,
  nbr_rssi STRING ,
  nbr_ta STRING
) WITH (
  'connector' = 'print'
)
""")

#######################################
##  Runs fine
#######################################
# t_env.sql_query("""
# select A.hotime ,
#        A.before_ta ,
#        A.before_rssi ,
#        A.after_ta ,
#        A.after_rssil ,
#        A.nb_tath ,
#        A.nb_rssith ,
#        nbr_rssi ,
#        nbr_ta
# from (SELECT hotime ,
#              before_ta ,
#              before_rssi ,
#              after_ta ,
#              after_rssil ,
#              nb_tath ,
#              nb_rssith ,
#              predict
#       FROM source) as A, LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta)
# """).insert_into("predict_sink")

#######################################
##  Fails with the error above
#######################################
# t_env.sql_query("""
# select A.hotime ,
#        A.before_ta ,
#        A.before_rssi ,
#        A.after_ta ,
#        A.after_rssil ,
#        A.nb_tath ,
#        A.nb_rssith ,
#        nbr_rssi ,
#        nbr_ta
# from (SELECT hotime ,
#              before_ta ,
#              before_rssi ,
#              after_ta ,
#              after_rssil ,
#              nb_tath ,
#              nb_rssith ,
#              train_and_predict(nb_tath, nb_rssith) predict
#       FROM source) as A, LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta)
# """).insert_into("predict_sink")

#######################################
##  Runs fine
#######################################
t_env.sql_query("""
SELECT hotime ,
       before_ta ,
       before_rssi ,
       after_ta ,
       after_rssil ,
       nb_tath ,
       nb_rssith ,
       train_and_predict(hotime, before_ta) predict
FROM source
""").insert_into("print_table")

t_env.execute('pyflink UDTF')
Hi,
After digging into it, this is indeed a bug: a Python UDF used inside a sub-query is not handled correctly. I have created JIRA [1] to track the issue. The current workaround is to use the Table API instead; see the code below:

a = t_env.sql_query("""
SELECT hotime ,
       before_ta ,
       before_rssi ,
       after_ta ,
       after_rssil ,
       nb_tath ,
       nb_rssith ,
       train_and_predict(nb_tath, nb_rssith) predict
FROM source
""")
result = a.join_lateral("splitStr(predict) as (nbr_rssi, nbr_ta)")

[1] https://issues.apache.org/jira/browse/FLINK-21856

Best,
Xingbo

陈康 <[hidden email]> wrote on Thu, Mar 18, 2021 at 13:30:
> apache-flink 1.11.1
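For readers following along, a slightly fuller sketch of that workaround, wired into the predict_sink table from the reproducible example above; the column list in the final select and the job name are illustrative assumptions, not taken from the thread:

# Sketch only: assumes the source/predict_sink tables and the registered
# train_and_predict / splitStr functions from the example above (PyFlink 1.11).
predicted = t_env.sql_query("""
SELECT hotime ,
       before_ta ,
       before_rssi ,
       after_ta ,
       after_rssil ,
       nb_tath ,
       nb_rssith ,
       train_and_predict(nb_tath, nb_rssith) predict
FROM source
""")

# Apply the UDTF via the Table API instead of a SQL LATERAL TABLE join,
# so the Python UDF is no longer evaluated inside a SQL sub-query.
result = predicted.join_lateral("splitStr(predict) as (nbr_rssi, nbr_ta)") \
                  .select("hotime, before_ta, before_rssi, after_ta, after_rssil, "
                          "nb_tath, nb_rssith, nbr_rssi, nbr_ta")

result.insert_into("predict_sink")
t_env.execute('pyflink UDTF workaround')  # job name is illustrative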