pyflink UDTF求助!

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

pyflink UDTF求助!

陈康
定制UDTF想要拆分字符串、但是报错 java.lang.IndexOutOfBoundsException: Index: 7, Size: 7
毫无头绪、有大佬遇到过吗?谢谢!
好像是 udf 和 udtf 一起使用时出现的~下面有可复现的例子,谢谢

class myKerasMLP(ScalarFunction):

    def eval(self, *args):
        ...
        # 返回预测结果
        return str(trueY[0][0]) + '|' + str(trueY[0][1])

注册UDF函数
myKerasMLP = udf(myKerasMLP(),
                 input_types=[DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(),
                              DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING()],
                 result_type=DataTypes.STRING())

class SplitStr(TableFunction):
    def eval(self, str_value):
        str_arr = str_value.split('|')
        yield str_arr[0], str_arr[1]
        yield str_arr[0], str_arr[1]

注册UDTF函数
splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(),
DataTypes.STRING()])

t_env.register_function('train_and_predict', myKerasMLP)
t_env.register_function("splitStr", splitStr)

==================

t_env.sql_query("""
select A.hotime ,
        A.before_ta ,
        A.before_rssi ,
        A.after_ta ,
        A.after_rssil ,
        A.nb_tath ,
        A.nb_rssith ,
        nbr_rssi ,
        nbr_ta
from (SELECT
    hotime ,
    before_ta ,
    before_rssi ,
    after_ta ,
    after_rssil ,
    nb_tath ,
    nb_rssith ,
    train_and_predict(hotime, before_ta, before_rssi, after_ta, after_rssil,
nb_tath, nb_rssith) predict
FROM
    source) as  A, LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta)
""").insert_into("predict_sink")

====================
报错 java.lang.IndexOutOfBoundsException: Index: 7, Size: 7
Traceback (most recent call last):
  File
"C:/Users/Administrator.XTZ-02012091146/PycharmProjects/pythonProject/kerasTest/UdtfNtPredictPyFlink.py",
line 280, in <module>
    t_env.execute('NT重连预测参数')
  File
"D:\tools\Python3.6.5\lib\site-packages\pyflink\table\table_environment.py",
line 1057, in execute
    return JobExecutionResult(self._j_tenv.execute(job_name))
  File "D:\tools\Python3.6.5\lib\site-packages\py4j\java_gateway.py", line
1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "D:\tools\Python3.6.5\lib\site-packages\pyflink\util\exceptions.py",
line 147, in deco
    return f(*a, **kw)
  File "D:\tools\Python3.6.5\lib\site-packages\py4j\protocol.py", line 328,
in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o25.execute.
: java.lang.IndexOutOfBoundsException: Index: 7, Size: 7
        at java.util.ArrayList.rangeCheck(ArrayList.java:657)

====================
这段SQL可以执行
t_env.sql_query("""
SELECT
    hotime ,
    before_ta ,
    before_rssi ,
    after_ta ,
    after_rssil ,
    nb_tath ,
    nb_rssith ,
    train_and_predict(hotime, before_ta, before_rssi, after_ta, after_rssil,
nb_tath, nb_rssith) predict
FROM
    source
""").insert_into("print_table")

------------------------------
+I(37,14,-66,92,-74,24,-65,22.621065|-64.12096)
+I(291,136,-76,136,-78,22,-65,19.479145|-65.958)
------------------------------


====================== 简单可复现的例子 ========================

=======================SQL 源=================
/*
Navicat MySQL Data Transfer

Source Server         : localhost
Source Server Version : 50717
Source Host           : localhost:3306
Source Database       : nufront-nt

Target Server Type    : MYSQL
Target Server Version : 50717
File Encoding         : 65001

Date: 2021-03-13 14:23:41
*/

SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------
-- Table structure for test
-- ----------------------------
DROP TABLE IF EXISTS `test`;
CREATE TABLE `test` (
  `hotime` varchar(5) DEFAULT NULL,
  `before_ta` varchar(5) DEFAULT NULL,
  `before_rssi` varchar(10) DEFAULT NULL,
  `after_ta` varchar(5) DEFAULT NULL,
  `after_rssil` varchar(10) DEFAULT NULL,
  `nb_tath` varchar(5) DEFAULT NULL,
  `nb_rssith` varchar(10) DEFAULT NULL,
  `predict` varchar(50) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of test
-- ----------------------------
INSERT INTO `test` VALUES ('35', '8', '-62', '136', '-65', '20', '-65',
'22.30014|-63.884907');
INSERT INTO `test` VALUES ('43', '8', '-71', '248', '-73', '20', '-65',
'20.598848|-65.127464');
INSERT INTO `test` VALUES ('82', '216', '-74', '208', '-74', '20', '-65',
'14.919615|-66.15158');

================== 程序 ===================

# -*- coding: utf-8 -*
import logging
import os

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings,
DataTypes
from pyflink.table.udf import ScalarFunction, TableFunction, udf, udtf

env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)
env_settings =
EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env,
environment_settings=env_settings)
# 设置该参数以使用 UDF
t_env.get_config().get_configuration().set_boolean("python.fn-execution.4memory.managed",
True)
t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
"80m")

class SplitStr(TableFunction):
    def eval(self, str_value):
        str_arr = str_value.split('|')
        yield str_arr[0], str_arr[1]


splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(),
DataTypes.STRING()])
t_env.register_function("splitStr", splitStr)

class SplitStr(TableFunction):
    def eval(self, str_value):
        str_arr = str_value.split('|')
        yield str_arr[0], str_arr[1]

class myKerasMLP(ScalarFunction):
    def eval(self, *args):
        # 拼接参数
        a = ''
        for u in args:
            a += u + "|"
        return a

myKerasMLP = udf(myKerasMLP(),
                 input_types=[DataTypes.STRING(), DataTypes.STRING()],
                 result_type=DataTypes.STRING())
t_env.register_function('train_and_predict', myKerasMLP)

t_env.execute_sql("""
CREATE TABLE print_table (
        hotime STRING ,  
        before_ta STRING ,
        before_rssi STRING ,
        after_ta STRING ,
        after_rssil STRING ,
        nb_tath STRING ,
        nb_rssith STRING ,
        predict STRING
) WITH (
 'connector' = 'print'
)
""")

t_env.execute_sql("""
    CREATE TABLE source (
        hotime STRING ,  
        before_ta STRING ,
        before_rssi STRING ,
        after_ta STRING ,
        after_rssil STRING ,
        nb_tath STRING ,
        nb_rssith STRING  ,
        predict STRING  
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://localhost:3306/nufront-nt',
        'table-name' = 'test',
        'username' = 'root',
        'password' = '123456'
    )
""")

t_env.execute_sql("""
CREATE TABLE predict_sink (
    hotime STRING ,  
    before_ta STRING ,
    before_rssi STRING ,
    after_ta STRING ,
    after_rssil STRING ,
    nb_tath STRING ,
    nb_rssith STRING ,
    nbr_rssi STRING ,
    nbr_ta STRING      
) WITH (
  'connector' = 'print'
)
""")

#######################################
## 可执行
#######################################
# t_env.sql_query("""
# select A.hotime ,
#         A.before_ta ,
#         A.before_rssi ,
#         A.after_ta ,
#         A.after_rssil ,
#         A.nb_tath ,
#         A.nb_rssith ,
#         nbr_rssi ,
#         nbr_ta
# from (SELECT
#     hotime ,
#     before_ta ,
#     before_rssi ,
#     after_ta ,
#     after_rssil ,
#     nb_tath ,
#     nb_rssith ,
#     predict
# FROM
#     source) as  A,LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta)
# """).insert_into("predict_sink")

#######################################
## 执行报错
#######################################
# t_env.sql_query("""
# select A.hotime ,
#         A.before_ta ,
#         A.before_rssi ,
#         A.after_ta ,
#         A.after_rssil ,
#         A.nb_tath ,
#         A.nb_rssith ,
#         nbr_rssi ,
#         nbr_ta
# from (SELECT
#     hotime ,
#     before_ta ,
#     before_rssi ,
#     after_ta ,
#     after_rssil ,
#     nb_tath ,
#     nb_rssith ,
#     train_and_predict(nb_tath, nb_rssith) predict
# FROM
#     source) as  A,LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta)
# """).insert_into("predict_sink")


#######################################
## 可执行
#######################################
t_env.sql_query("""
SELECT
    hotime ,
    before_ta ,
    before_rssi ,
    after_ta ,
    after_rssil ,
    nb_tath ,
    nb_rssith ,
    train_and_predict(hotime, before_ta) predict
FROM
    source
""").insert_into("print_table")

t_env.execute('pyflink UDTF')




--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: pyflink UDTF求助!

陈康
apache-flink 1.11.1



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: pyflink UDTF求助!

Xingbo Huang
Hi,

经过排查,这个确实一个bug。问题出在没有正确处理在sub-query中使用的python udf。我已经创建JIRA[1]
来记录这个问题了。目前的workaroud方案是使用Table API。
具体可以参考下面的代码:
>>>
a = t_env.sql_query("""
SELECT
     hotime ,
     before_ta ,
     before_rssi ,
     after_ta ,
     after_rssil ,
     nb_tath ,
     nb_rssith ,
     train_and_predict(nb_tath, nb_rssith) predict
FROM source
""")
result = a.join_lateral("splitStr(predict) as (nbr_rssi, nbr_ta)")


[1] https://issues.apache.org/jira/browse/FLINK-21856

Best,
Xingbo

陈康 <[hidden email]> 于2021年3月18日周四 下午1:30写道:

> apache-flink 1.11.1
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
Reply | Threaded
Open this post in threaded view
|

Re: pyflink UDTF求助!

陈康
感谢回复!



--
Sent from: http://apache-flink.147419.n8.nabble.com/