定制UDTF想要拆分字符串、但是报错 java.lang.IndexOutOfBoundsException: Index: 7, Size: 7
毫无头绪、有大佬遇到过吗?谢谢! class myKerasMLP(ScalarFunction): def eval(self, *args): ... # 返回预测结果 return str(trueY[0][0]) + '|' + str(trueY[0][1]) 注册UDF函数 myKerasMLP = udf(myKerasMLP(), input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()], result_type=DataTypes.STRING()) class SplitStr(TableFunction): def eval(self, str_value): str_arr = str_value.split('|') yield str_arr[0], str_arr[1] yield str_arr[0], str_arr[1] 注册UDTF函数 splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(), DataTypes.STRING()]) t_env.register_function('train_and_predict', myKerasMLP) t_env.register_function("splitStr", splitStr) ================== t_env.sql_query(""" select A.hotime , A.before_ta , A.before_rssi , A.after_ta , A.after_rssil , A.nb_tath , A.nb_rssith , nbr_rssi , nbr_ta from (SELECT hotime , before_ta , before_rssi , after_ta , after_rssil , nb_tath , nb_rssith , train_and_predict(hotime, before_ta, before_rssi, after_ta, after_rssil, nb_tath, nb_rssith) predict FROM source) as A, LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta) """).insert_into("predict_sink") ==================== 报错 java.lang.IndexOutOfBoundsException: Index: 7, Size: 7 Traceback (most recent call last): File "C:/Users/Administrator.XTZ-02012091146/PycharmProjects/pythonProject/kerasTest/UdtfNtPredictPyFlink.py", line 280, in <module> t_env.execute('NT重连预测参数') File "D:\tools\Python3.6.5\lib\site-packages\pyflink\table\table_environment.py", line 1057, in execute return JobExecutionResult(self._j_tenv.execute(job_name)) File "D:\tools\Python3.6.5\lib\site-packages\py4j\java_gateway.py", line 1286, in __call__ answer, self.gateway_client, self.target_id, self.name) File "D:\tools\Python3.6.5\lib\site-packages\pyflink\util\exceptions.py", line 147, in deco return f(*a, **kw) File "D:\tools\Python3.6.5\lib\site-packages\py4j\protocol.py", line 328, in get_return_value format(target_id, ".", name), value) 
py4j.protocol.Py4JJavaError: An error occurred while calling o25.execute. : java.lang.IndexOutOfBoundsException: Index: 7, Size: 7 at java.util.ArrayList.rangeCheck(ArrayList.java:657) ==================== 这段SQL可以执行 t_env.sql_query(""" SELECT hotime , before_ta , before_rssi , after_ta , after_rssil , nb_tath , nb_rssith , train_and_predict(hotime, before_ta, before_rssi, after_ta, after_rssil, nb_tath, nb_rssith) predict FROM source """).insert_into("print_table") ------------------------------ +I(37,14,-66,92,-74,24,-65,22.621065|-64.12096) +I(291,136,-76,136,-78,22,-65,19.479145|-65.958) ------------------------------ -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
用的PyFlink版本是多少?另外,如果方便的话,可以提供一个比较容易复现的例子吗?
On Fri, Mar 12, 2021 at 4:57 PM 陈康 <[hidden email]> wrote: > 定制UDTF想要拆分字符串、但是报错 java.lang.IndexOutOfBoundsException: Index: 7, Size: 7 > 毫无头绪、有大佬遇到过吗?谢谢! > > class myKerasMLP(ScalarFunction): > > def eval(self, *args): > ... > # 返回预测结果 > return str(trueY[0][0]) + '|' + str(trueY[0][1]) > > 注册UDF函数 > myKerasMLP = udf(myKerasMLP(), > input_types=[DataTypes.STRING(), DataTypes.STRING(), > DataTypes.STRING(), DataTypes.STRING(), > DataTypes.STRING(), DataTypes.STRING(), > DataTypes.STRING()], > result_type=DataTypes.STRING()) > > class SplitStr(TableFunction): > def eval(self, str_value): > str_arr = str_value.split('|') > yield str_arr[0], str_arr[1] > yield str_arr[0], str_arr[1] > > 注册UDTF函数 > splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(), > DataTypes.STRING()]) > > t_env.register_function('train_and_predict', myKerasMLP) > t_env.register_function("splitStr", splitStr) > > ================== > > t_env.sql_query(""" > select A.hotime , > A.before_ta , > A.before_rssi , > A.after_ta , > A.after_rssil , > A.nb_tath , > A.nb_rssith , > nbr_rssi , > nbr_ta > from (SELECT > hotime , > before_ta , > before_rssi , > after_ta , > after_rssil , > nb_tath , > nb_rssith , > train_and_predict(hotime, before_ta, before_rssi, after_ta, > after_rssil, > nb_tath, nb_rssith) predict > FROM > source) as A, LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta) > """).insert_into("predict_sink") > > ==================== > 报错 java.lang.IndexOutOfBoundsException: Index: 7, Size: 7 > Traceback (most recent call last): > File > > "C:/Users/Administrator.XTZ-02012091146/PycharmProjects/pythonProject/kerasTest/UdtfNtPredictPyFlink.py", > line 280, in <module> > t_env.execute('NT重连预测参数') > File > > "D:\tools\Python3.6.5\lib\site-packages\pyflink\table\table_environment.py", > line 1057, in execute > return JobExecutionResult(self._j_tenv.execute(job_name)) > File "D:\tools\Python3.6.5\lib\site-packages\py4j\java_gateway.py", line > 1286, in __call__ > answer, 
self.gateway_client, self.target_id, self.name) > File "D:\tools\Python3.6.5\lib\site-packages\pyflink\util\exceptions.py", > line 147, in deco > return f(*a, **kw) > File "D:\tools\Python3.6.5\lib\site-packages\py4j\protocol.py", line 328, > in get_return_value > format(target_id, ".", name), value) > py4j.protocol.Py4JJavaError: An error occurred while calling o25.execute. > : java.lang.IndexOutOfBoundsException: Index: 7, Size: 7 > at java.util.ArrayList.rangeCheck(ArrayList.java:657) > > ==================== > 这段SQL可以执行 > t_env.sql_query(""" > SELECT > hotime , > before_ta , > before_rssi , > after_ta , > after_rssil , > nb_tath , > nb_rssith , > train_and_predict(hotime, before_ta, before_rssi, after_ta, > after_rssil, > nb_tath, nb_rssith) predict > FROM > source > """).insert_into("print_table") > > ------------------------------ > +I(37,14,-66,92,-74,24,-65,22.621065|-64.12096) > +I(291,136,-76,136,-78,22,-65,19.479145|-65.958) > ------------------------------ > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > |
感谢回复:尝试着编辑简单可复现如下:请帮忙看看谢谢!
=======================SQL 源================= /* Navicat MySQL Data Transfer Source Server : localhost Source Server Version : 50717 Source Host : localhost:3306 Source Database : nufront-nt Target Server Type : MYSQL Target Server Version : 50717 File Encoding : 65001 Date: 2021-03-13 14:23:41 */ SET FOREIGN_KEY_CHECKS=0; -- ---------------------------- -- Table structure for test -- ---------------------------- DROP TABLE IF EXISTS `test`; CREATE TABLE `test` ( `hotime` varchar(5) DEFAULT NULL, `before_ta` varchar(5) DEFAULT NULL, `before_rssi` varchar(10) DEFAULT NULL, `after_ta` varchar(5) DEFAULT NULL, `after_rssil` varchar(10) DEFAULT NULL, `nb_tath` varchar(5) DEFAULT NULL, `nb_rssith` varchar(10) DEFAULT NULL, `predict` varchar(50) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8; -- ---------------------------- -- Records of test -- ---------------------------- INSERT INTO `test` VALUES ('35', '8', '-62', '136', '-65', '20', '-65', '22.30014|-63.884907'); INSERT INTO `test` VALUES ('43', '8', '-71', '248', '-73', '20', '-65', '20.598848|-65.127464'); INSERT INTO `test` VALUES ('82', '216', '-74', '208', '-74', '20', '-65', '14.919615|-66.15158'); ================== 程序 =================== # -*- coding: utf-8 -*- import logging import os from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes from pyflink.table.udf import ScalarFunction, TableFunction, udf, udtf env = StreamExecutionEnvironment.get_execution_environment() table_env = StreamTableEnvironment.create(env) env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() env.set_parallelism(1) t_env = StreamTableEnvironment.create(env, environment_settings=env_settings) # 设置该参数以使用 UDF t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True) t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", "80m") class 
SplitStr(TableFunction): def eval(self, str_value): str_arr = str_value.split('|') yield str_arr[0], str_arr[1] splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(), DataTypes.STRING()]) t_env.register_function("splitStr", splitStr) class SplitStr(TableFunction): def eval(self, str_value): str_arr = str_value.split('|') yield str_arr[0], str_arr[1] class myKerasMLP(ScalarFunction): def eval(self, *args): # 拼接参数 a = '' for u in args: a = u + "|" return a myKerasMLP = udf(myKerasMLP(), input_types=[DataTypes.STRING(), DataTypes.STRING()], result_type=DataTypes.STRING()) t_env.register_function('train_and_predict', myKerasMLP) t_env.execute_sql(""" CREATE TABLE print_table ( hotime STRING , before_ta STRING , before_rssi STRING , after_ta STRING , after_rssil STRING , nb_tath STRING , nb_rssith STRING , predict STRING ) WITH ( 'connector' = 'print' ) """) t_env.execute_sql(""" CREATE TABLE source ( hotime STRING , before_ta STRING , before_rssi STRING , after_ta STRING , after_rssil STRING , nb_tath STRING , nb_rssith STRING , predict STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/nufront-nt', 'table-name' = 'test', 'username' = 'root', 'password' = '123456' ) """) t_env.execute_sql(""" CREATE TABLE predict_sink ( hotime STRING , before_ta STRING , before_rssi STRING , after_ta STRING , after_rssil STRING , nb_tath STRING , nb_rssith STRING , nbr_rssi STRING , nbr_ta STRING ) WITH ( 'connector' = 'print' ) """) ####################################### ## 可执行 ####################################### # t_env.sql_query(""" # select A.hotime , # A.before_ta , # A.before_rssi , # A.after_ta , # A.after_rssil , # A.nb_tath , # A.nb_rssith , # nbr_rssi , # nbr_ta # from (SELECT # hotime , # before_ta , # before_rssi , # after_ta , # after_rssil , # nb_tath , # nb_rssith , # predict # FROM # source) as A,LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta) # """).insert_into("predict_sink") ####################################### ## 
执行报错 ####################################### # t_env.sql_query(""" # select A.hotime , # A.before_ta , # A.before_rssi , # A.after_ta , # A.after_rssil , # A.nb_tath , # A.nb_rssith , # nbr_rssi , # nbr_ta # from (SELECT # hotime , # before_ta , # before_rssi , # after_ta , # after_rssil , # nb_tath , # nb_rssith , # train_and_predict(nb_tath, nb_rssith) predict # FROM # source) as A,LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta) # """).insert_into("predict_sink") ####################################### ## 可执行 ####################################### t_env.sql_query(""" SELECT hotime , before_ta , before_rssi , after_ta , after_rssil , nb_tath , nb_rssith , train_and_predict(hotime, before_ta) predict FROM source """).insert_into("print_table") t_env.execute('pyflink UDTF') -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
In reply to this post by Dian Fu
|
In reply to this post by Dian Fu
|
Free forum by Nabble | Edit this page |