flinksql注册udtf使用ROW类型做为输出输出时出错

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

flinksql注册udtf使用ROW类型做为输出输出时出错

cxydevelop@163.com
版本:
pyflink==1.0
apache-flink==1.11.2
代码如下:
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)
t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed", 'true')


class SplitStr(TableFunction):
    def eval(self, data):
        for row in data:
            yield row[0], row[1]
splitStr = udtf(
    SplitStr(),
    DataTypes.ARRAY(
        DataTypes.ROW(
            [
                DataTypes.FIELD("name", DataTypes.STRING()),
                DataTypes.FIELD("id", DataTypes.STRING())
            ]
        )
    ),
    DataTypes.ROW(
        [
            DataTypes.FIELD("name", DataTypes.STRING()),
            DataTypes.FIELD("id", DataTypes.STRING())
        ]
    )
)
t_env.register_function("splitStr", splitStr)


t_env.sql_update("""
CREATE TABLE mySource (                                                                                        
id varchar,
data array<ROW<name STRING,age STRING>>                                
) WITH (                                                        
'connector' = 'kafka',
        'topic' = 'mytesttopic',
        'properties.bootstrap.servers' = '172.17.0.2:9092',
        'properties.group.id' = 'flink-test-cxy',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json'                                    
)
""")
t_env.sql_update("""
CREATE TABLE mysqlsink (
id varchar
,name varchar
,age  varchar
)
with (
    'connector' = 'print'
)
""")
t_env.sql_update("insert into mysqlsink select id,name,age from mySource ,LATERAL TABLE(splitStr(data)) as T(name, age)")
t_env.execute("test")


最终报错
TypeError: Invalid result_type: result_type should be DataType but contains RowField(name, VARCHAR)
报错的地方是
File "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\udf.py", line 264, in __init__


def __init__(self, func, input_types, result_types, deterministic=None, name=None):
super(UserDefinedTableFunctionWrapper, self).__init__(
func, input_types, deterministic, name)


if not isinstance(result_types, collections.Iterable):
result_types = [result_types]


for result_type in result_types:
if not isinstance(result_type, DataType):
raise TypeError(
"Invalid result_type: result_type should be DataType but contains {}".format(
result_type))


self._result_types = result_types
self._judtf_placeholder = None


断点中可以看到result_types是对应着ROW里面的FIELD数组,所以报错了,这个是bug吗


另外的,假如我在
上面在创建udtf的时候,如果这样写
splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.BIGINT(), DataTypes.BIGINT()])
却可以正常运行,但是显然类型跟我实际运行的不对应
Reply | Threaded
Open this post in threaded view
|

Re:flinksql注册udtf使用ROW类型做为输出输出时出错

cxydevelop@163.com
上面最后说的splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.BIGINT(), DataTypes.BIGINT()])需要改成这个地方splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(), DataTypes.STRING()])<br/>udtf的第三个参数, 好像只要是能跟sink的字段类型对应就能运行, 但是第二个参数并不能跟source字段对应却能运行就感觉有点奇怪
在 2020-09-30 19:07:06,"chenxuying" <[hidden email]> 写道:

>版本:
>pyflink==1.0
>apache-flink==1.11.2
>代码如下:
>env = StreamExecutionEnvironment.get_execution_environment()
>env.set_parallelism(1)
>t_env = StreamTableEnvironment.create(env)
>t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed", 'true')
>
>
>class SplitStr(TableFunction):
>    def eval(self, data):
>        for row in data:
>            yield row[0], row[1]
>splitStr = udtf(
>    SplitStr(),
>    DataTypes.ARRAY(
>        DataTypes.ROW(
>            [
>                DataTypes.FIELD("name", DataTypes.STRING()),
>                DataTypes.FIELD("id", DataTypes.STRING())
>            ]
>        )
>    ),
>    DataTypes.ROW(
>        [
>            DataTypes.FIELD("name", DataTypes.STRING()),
>            DataTypes.FIELD("id", DataTypes.STRING())
>        ]
>    )
>)
>t_env.register_function("splitStr", splitStr)
>
>
>t_env.sql_update("""
>CREATE TABLE mySource (                                                                                        
>id varchar,
>data array<ROW<name STRING,age STRING>>                                
>) WITH (                                                        
>'connector' = 'kafka',
>        'topic' = 'mytesttopic',
>        'properties.bootstrap.servers' = '172.17.0.2:9092',
>        'properties.group.id' = 'flink-test-cxy',
>        'scan.startup.mode' = 'latest-offset',
>        'format' = 'json'                                    
>)
>""")
>t_env.sql_update("""
>CREATE TABLE mysqlsink (
>id varchar
>,name varchar
>,age  varchar
>)
>with (
>    'connector' = 'print'
>)
>""")
>t_env.sql_update("insert into mysqlsink select id,name,age from mySource ,LATERAL TABLE(splitStr(data)) as T(name, age)")
>t_env.execute("test")
>
>
>最终报错
>TypeError: Invalid result_type: result_type should be DataType but contains RowField(name, VARCHAR)
>报错的地方是
>File "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\udf.py", line 264, in __init__
>
>
>def __init__(self, func, input_types, result_types, deterministic=None, name=None):
>super(UserDefinedTableFunctionWrapper, self).__init__(
>func, input_types, deterministic, name)
>
>
>if not isinstance(result_types, collections.Iterable):
>result_types = [result_types]
>
>
>for result_type in result_types:
>if not isinstance(result_type, DataType):
>raise TypeError(
>"Invalid result_type: result_type should be DataType but contains {}".format(
>result_type))
>
>
>self._result_types = result_types
>self._judtf_placeholder = None
>
>
>断点中可以看到result_types是对应着ROW里面的FIELD数组,所以报错了,这个是bug吗
>
>
>另外的,假如我在
>上面在创建udtf的时候,如果这样写
>splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.BIGINT(), DataTypes.BIGINT()])
>却可以正常运行,但是显然类型跟我实际运行的不对应
Reply | Threaded
Open this post in threaded view
|

Re: flinksql注册udtf使用ROW类型做为输出输出时出错

Xingbo Huang
Hello,
这个算是个易用性的问题,我之前有创建了JIRA[1]。你现在直接用[DataTypes.STRING(),
DataTypes.STRING()]作resultType就是对的。关于input_types那个问题,实际上input_types在内部是通过上游的result_type匹配得出来的,所以你这里没对应也是对的,1.12版本将不再需要指定result_type了。

Best,
Xingbo

[1] https://issues.apache.org/jira/browse/FLINK-19138

chenxuying <[hidden email]> 于2020年9月30日周三 下午7:18写道:

> 上面最后说的splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.BIGINT(),
> DataTypes.BIGINT()])需要改成这个地方splitStr = udtf(SplitStr(), DataTypes.STRING(),
> [DataTypes.STRING(), DataTypes.STRING()])<br/>udtf的第三个参数,
> 好像只要是能跟sink的字段类型对应就能运行, 但是第二个参数并不能跟source字段对应却能运行就感觉有点奇怪
> 在 2020-09-30 19:07:06,"chenxuying" <[hidden email]> 写道:
> >版本:
> >pyflink==1.0
> >apache-flink==1.11.2
> >代码如下:
> >env = StreamExecutionEnvironment.get_execution_environment()
> >env.set_parallelism(1)
> >t_env = StreamTableEnvironment.create(env)
> >t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed",
> 'true')
> >
> >
> >class SplitStr(TableFunction):
> >    def eval(self, data):
> >        for row in data:
> >            yield row[0], row[1]
> >splitStr = udtf(
> >    SplitStr(),
> >    DataTypes.ARRAY(
> >        DataTypes.ROW(
> >            [
> >                DataTypes.FIELD("name", DataTypes.STRING()),
> >                DataTypes.FIELD("id", DataTypes.STRING())
> >            ]
> >        )
> >    ),
> >    DataTypes.ROW(
> >        [
> >            DataTypes.FIELD("name", DataTypes.STRING()),
> >            DataTypes.FIELD("id", DataTypes.STRING())
> >        ]
> >    )
> >)
> >t_env.register_function("splitStr", splitStr)
> >
> >
> >t_env.sql_update("""
> >CREATE TABLE mySource (
>
> >id varchar,
> >data array<ROW<name STRING,age STRING>>
> >) WITH (
> >'connector' = 'kafka',
> >        'topic' = 'mytesttopic',
> >        'properties.bootstrap.servers' = '172.17.0.2:9092',
> >        'properties.group.id' = 'flink-test-cxy',
> >        'scan.startup.mode' = 'latest-offset',
> >        'format' = 'json'
> >)
> >""")
> >t_env.sql_update("""
> >CREATE TABLE mysqlsink (
> >id varchar
> >,name varchar
> >,age  varchar
> >)
> >with (
> >    'connector' = 'print'
> >)
> >""")
> >t_env.sql_update("insert into mysqlsink select id,name,age from mySource
> ,LATERAL TABLE(splitStr(data)) as T(name, age)")
> >t_env.execute("test")
> >
> >
> >最终报错
> >TypeError: Invalid result_type: result_type should be DataType but
> contains RowField(name, VARCHAR)
> >报错的地方是
> >File
> "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\udf.py",
> line 264, in __init__
> >
> >
> >def __init__(self, func, input_types, result_types, deterministic=None,
> name=None):
> >super(UserDefinedTableFunctionWrapper, self).__init__(
> >func, input_types, deterministic, name)
> >
> >
> >if not isinstance(result_types, collections.Iterable):
> >result_types = [result_types]
> >
> >
> >for result_type in result_types:
> >if not isinstance(result_type, DataType):
> >raise TypeError(
> >"Invalid result_type: result_type should be DataType but contains
> {}".format(
> >result_type))
> >
> >
> >self._result_types = result_types
> >self._judtf_placeholder = None
> >
> >
> >断点中可以看到result_types是对应着ROW里面的FIELD数组,所以报错了,这个是bug吗
> >
> >
> >另外的,假如我在
> >上面在创建udtf的时候,如果这样写
> >splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.BIGINT(),
> DataTypes.BIGINT()])
> >却可以正常运行,但是显然类型跟我实际运行的不对应
>