关于pyflink LATERAL TABLE 问题请教

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

关于pyflink LATERAL TABLE 问题请教

陈康
定制UDTF想要拆分字符串、但是报错 java.lang.IndexOutOfBoundsException: Index: 7, Size: 7
毫无头绪、有大佬遇到过吗?谢谢!

class myKerasMLP(ScalarFunction):

    def eval(self, *args):
        ...
        # 返回预测结果
        return str(trueY[0][0]) + '|' + str(trueY[0][1])

注册UDF函数
myKerasMLP = udf(myKerasMLP(),
                 input_types=[DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(),
                              DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING()],
                 result_type=DataTypes.STRING())

class SplitStr(TableFunction):
    def eval(self, str_value):
        str_arr = str_value.split('|')
        yield str_arr[0], str_arr[1]
        yield str_arr[0], str_arr[1]

注册UDTF函数
splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(),
DataTypes.STRING()])

t_env.register_function('train_and_predict', myKerasMLP)
t_env.register_function("splitStr", splitStr)

==================

t_env.sql_query("""
select A.hotime ,
        A.before_ta ,
        A.before_rssi ,
        A.after_ta ,
        A.after_rssil ,
        A.nb_tath ,
        A.nb_rssith ,
        nbr_rssi ,
        nbr_ta
from (SELECT
    hotime ,
    before_ta ,
    before_rssi ,
    after_ta ,
    after_rssil ,
    nb_tath ,
    nb_rssith ,
    train_and_predict(hotime, before_ta, before_rssi, after_ta, after_rssil,
nb_tath, nb_rssith) predict
FROM
    source) as  A, LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta)
""").insert_into("predict_sink")

====================
报错 java.lang.IndexOutOfBoundsException: Index: 7, Size: 7
Traceback (most recent call last):
  File
"C:/Users/Administrator.XTZ-02012091146/PycharmProjects/pythonProject/kerasTest/UdtfNtPredictPyFlink.py",
line 280, in <module>
    t_env.execute('NT重连预测参数')
  File
"D:\tools\Python3.6.5\lib\site-packages\pyflink\table\table_environment.py",
line 1057, in execute
    return JobExecutionResult(self._j_tenv.execute(job_name))
  File "D:\tools\Python3.6.5\lib\site-packages\py4j\java_gateway.py", line
1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "D:\tools\Python3.6.5\lib\site-packages\pyflink\util\exceptions.py",
line 147, in deco
    return f(*a, **kw)
  File "D:\tools\Python3.6.5\lib\site-packages\py4j\protocol.py", line 328,
in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o25.execute.
: java.lang.IndexOutOfBoundsException: Index: 7, Size: 7
        at java.util.ArrayList.rangeCheck(ArrayList.java:657)

====================
这段SQL可以执行
t_env.sql_query("""
SELECT
    hotime ,
    before_ta ,
    before_rssi ,
    after_ta ,
    after_rssil ,
    nb_tath ,
    nb_rssith ,
    train_and_predict(hotime, before_ta, before_rssi, after_ta, after_rssil,
nb_tath, nb_rssith) predict
FROM
    source
""").insert_into("print_table")

------------------------------
+I(37,14,-66,92,-74,24,-65,22.621065|-64.12096)
+I(291,136,-76,136,-78,22,-65,19.479145|-65.958)
------------------------------



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 关于pyflink LATERAL TABLE 问题请教

Dian Fu
用的PyFlink版本是多少?另外,如果方便的话,可以提供一个比较容易复现的例子吗?

On Fri, Mar 12, 2021 at 4:57 PM 陈康 <[hidden email]> wrote:

> 定制UDTF想要拆分字符串、但是报错 java.lang.IndexOutOfBoundsException: Index: 7, Size: 7
> 毫无头绪、有大佬遇到过吗?谢谢!
>
> class myKerasMLP(ScalarFunction):
>
>     def eval(self, *args):
>         ...
>         # 返回预测结果
>         return str(trueY[0][0]) + '|' + str(trueY[0][1])
>
> 注册UDF函数
> myKerasMLP = udf(myKerasMLP(),
>                  input_types=[DataTypes.STRING(), DataTypes.STRING(),
> DataTypes.STRING(), DataTypes.STRING(),
>                               DataTypes.STRING(), DataTypes.STRING(),
> DataTypes.STRING()],
>                  result_type=DataTypes.STRING())
>
> class SplitStr(TableFunction):
>     def eval(self, str_value):
>         str_arr = str_value.split('|')
>         yield str_arr[0], str_arr[1]
>         yield str_arr[0], str_arr[1]
>
> 注册UDTF函数
> splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(),
> DataTypes.STRING()])
>
> t_env.register_function('train_and_predict', myKerasMLP)
> t_env.register_function("splitStr", splitStr)
>
> ==================
>
> t_env.sql_query("""
> select A.hotime ,
>         A.before_ta ,
>         A.before_rssi ,
>         A.after_ta ,
>         A.after_rssil ,
>         A.nb_tath ,
>         A.nb_rssith ,
>         nbr_rssi ,
>         nbr_ta
> from (SELECT
>     hotime ,
>     before_ta ,
>     before_rssi ,
>     after_ta ,
>     after_rssil ,
>     nb_tath ,
>     nb_rssith ,
>     train_and_predict(hotime, before_ta, before_rssi, after_ta,
> after_rssil,
> nb_tath, nb_rssith) predict
> FROM
>     source) as  A, LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta)
> """).insert_into("predict_sink")
>
> ====================
> 报错 java.lang.IndexOutOfBoundsException: Index: 7, Size: 7
> Traceback (most recent call last):
>   File
>
> "C:/Users/Administrator.XTZ-02012091146/PycharmProjects/pythonProject/kerasTest/UdtfNtPredictPyFlink.py",
> line 280, in <module>
>     t_env.execute('NT重连预测参数')
>   File
>
> "D:\tools\Python3.6.5\lib\site-packages\pyflink\table\table_environment.py",
> line 1057, in execute
>     return JobExecutionResult(self._j_tenv.execute(job_name))
>   File "D:\tools\Python3.6.5\lib\site-packages\py4j\java_gateway.py", line
> 1286, in __call__
>     answer, self.gateway_client, self.target_id, self.name)
>   File "D:\tools\Python3.6.5\lib\site-packages\pyflink\util\exceptions.py",
> line 147, in deco
>     return f(*a, **kw)
>   File "D:\tools\Python3.6.5\lib\site-packages\py4j\protocol.py", line 328,
> in get_return_value
>     format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o25.execute.
> : java.lang.IndexOutOfBoundsException: Index: 7, Size: 7
>         at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>
> ====================
> 这段SQL可以执行
> t_env.sql_query("""
> SELECT
>     hotime ,
>     before_ta ,
>     before_rssi ,
>     after_ta ,
>     after_rssil ,
>     nb_tath ,
>     nb_rssith ,
>     train_and_predict(hotime, before_ta, before_rssi, after_ta,
> after_rssil,
> nb_tath, nb_rssith) predict
> FROM
>     source
> """).insert_into("print_table")
>
> ------------------------------
> +I(37,14,-66,92,-74,24,-65,22.621065|-64.12096)
> +I(291,136,-76,136,-78,22,-65,19.479145|-65.958)
> ------------------------------
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
Reply | Threaded
Open this post in threaded view
|

Re: 关于pyflink LATERAL TABLE 问题请教

陈康
感谢回复:尝试着编辑简单可复现如下:请帮忙看看谢谢!

=======================SQL 源=================
/*
Navicat MySQL Data Transfer

Source Server         : localhost
Source Server Version : 50717
Source Host           : localhost:3306
Source Database       : nufront-nt

Target Server Type    : MYSQL
Target Server Version : 50717
File Encoding         : 65001

Date: 2021-03-13 14:23:41
*/

SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------
-- Table structure for test
-- ----------------------------
DROP TABLE IF EXISTS `test`;
CREATE TABLE `test` (
  `hotime` varchar(5) DEFAULT NULL,
  `before_ta` varchar(5) DEFAULT NULL,
  `before_rssi` varchar(10) DEFAULT NULL,
  `after_ta` varchar(5) DEFAULT NULL,
  `after_rssil` varchar(10) DEFAULT NULL,
  `nb_tath` varchar(5) DEFAULT NULL,
  `nb_rssith` varchar(10) DEFAULT NULL,
  `predict` varchar(50) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of test
-- ----------------------------
INSERT INTO `test` VALUES ('35', '8', '-62', '136', '-65', '20', '-65',
'22.30014|-63.884907');
INSERT INTO `test` VALUES ('43', '8', '-71', '248', '-73', '20', '-65',
'20.598848|-65.127464');
INSERT INTO `test` VALUES ('82', '216', '-74', '208', '-74', '20', '-65',
'14.919615|-66.15158');

================== 程序 ===================

# -*- coding: utf-8 -*
import logging
import os

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings,
DataTypes
from pyflink.table.udf import ScalarFunction, TableFunction, udf, udtf

env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)
env_settings =
EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env,
environment_settings=env_settings)
# 设置该参数以使用 UDF
t_env.get_config().get_configuration().set_boolean("python.fn-execution.4memory.managed",
True)
t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
"80m")

class SplitStr(TableFunction):
    def eval(self, str_value):
        str_arr = str_value.split('|')
        yield str_arr[0], str_arr[1]


splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(),
DataTypes.STRING()])
t_env.register_function("splitStr", splitStr)

class SplitStr(TableFunction):
    def eval(self, str_value):
        str_arr = str_value.split('|')
        yield str_arr[0], str_arr[1]

class myKerasMLP(ScalarFunction):
    def eval(self, *args):
        # 拼接参数
        a = ''
        for u in args:
            a = u + "|"
        return a

myKerasMLP = udf(myKerasMLP(),
                 input_types=[DataTypes.STRING(), DataTypes.STRING()],
                 result_type=DataTypes.STRING())
t_env.register_function('train_and_predict', myKerasMLP)

t_env.execute_sql("""
CREATE TABLE print_table (
        hotime STRING ,  
        before_ta STRING ,
        before_rssi STRING ,
        after_ta STRING ,
        after_rssil STRING ,
        nb_tath STRING ,
        nb_rssith STRING ,
        predict STRING
) WITH (
 'connector' = 'print'
)
""")

t_env.execute_sql("""
    CREATE TABLE source (
        hotime STRING ,  
        before_ta STRING ,
        before_rssi STRING ,
        after_ta STRING ,
        after_rssil STRING ,
        nb_tath STRING ,
        nb_rssith STRING  ,
        predict STRING  
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://localhost:3306/nufront-nt',
        'table-name' = 'test',
        'username' = 'root',
        'password' = '123456'
    )
""")

t_env.execute_sql("""
CREATE TABLE predict_sink (
    hotime STRING ,  
    before_ta STRING ,
    before_rssi STRING ,
    after_ta STRING ,
    after_rssil STRING ,
    nb_tath STRING ,
    nb_rssith STRING ,
    nbr_rssi STRING ,
    nbr_ta STRING      
) WITH (
  'connector' = 'print'
)
""")

#######################################
## 可执行
#######################################
# t_env.sql_query("""
# select A.hotime ,
#         A.before_ta ,
#         A.before_rssi ,
#         A.after_ta ,
#         A.after_rssil ,
#         A.nb_tath ,
#         A.nb_rssith ,
#         nbr_rssi ,
#         nbr_ta
# from (SELECT
#     hotime ,
#     before_ta ,
#     before_rssi ,
#     after_ta ,
#     after_rssil ,
#     nb_tath ,
#     nb_rssith ,
#     predict
# FROM
#     source) as  A,LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta)
# """).insert_into("predict_sink")

#######################################
## 执行报错
#######################################
# t_env.sql_query("""
# select A.hotime ,
#         A.before_ta ,
#         A.before_rssi ,
#         A.after_ta ,
#         A.after_rssil ,
#         A.nb_tath ,
#         A.nb_rssith ,
#         nbr_rssi ,
#         nbr_ta
# from (SELECT
#     hotime ,
#     before_ta ,
#     before_rssi ,
#     after_ta ,
#     after_rssil ,
#     nb_tath ,
#     nb_rssith ,
#     train_and_predict(nb_tath, nb_rssith) predict
# FROM
#     source) as  A,LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta)
# """).insert_into("predict_sink")


#######################################
## 可执行
#######################################
t_env.sql_query("""
SELECT
    hotime ,
    before_ta ,
    before_rssi ,
    after_ta ,
    after_rssil ,
    nb_tath ,
    nb_rssith ,
    train_and_predict(hotime, before_ta) predict
FROM
    source
""").insert_into("print_table")

t_env.execute('pyflink UDTF')





--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 关于pyflink LATERAL TABLE 问题请教

陈康
In reply to this post by Dian Fu
apache-flink 1.11.1



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 关于pyflink LATERAL TABLE 问题请教

陈康
In reply to this post by Dian Fu
简单提供了下 可复现的例子,请帮忙看看~谢谢!



--
Sent from: http://apache-flink.147419.n8.nabble.com/