Versions:
pyflink==1.0
apache-flink==1.11.2

Code:

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)
t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed", 'true')


class SplitStr(TableFunction):
    def eval(self, data):
        for row in data:
            yield row[0], row[1]


splitStr = udtf(
    SplitStr(),
    DataTypes.ARRAY(
        DataTypes.ROW([
            DataTypes.FIELD("name", DataTypes.STRING()),
            DataTypes.FIELD("id", DataTypes.STRING())
        ])
    ),
    DataTypes.ROW([
        DataTypes.FIELD("name", DataTypes.STRING()),
        DataTypes.FIELD("id", DataTypes.STRING())
    ])
)
t_env.register_function("splitStr", splitStr)

t_env.sql_update("""
CREATE TABLE mySource (
    id varchar,
    data array<ROW<name STRING, age STRING>>
) WITH (
    'connector' = 'kafka',
    'topic' = 'mytesttopic',
    'properties.bootstrap.servers' = '172.17.0.2:9092',
    'properties.group.id' = 'flink-test-cxy',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
)
""")
t_env.sql_update("""
CREATE TABLE mysqlsink (
    id varchar,
    name varchar,
    age varchar
) WITH (
    'connector' = 'print'
)
""")
t_env.sql_update("insert into mysqlsink select id, name, age from mySource, LATERAL TABLE(splitStr(data)) as T(name, age)")
t_env.execute("test")

It eventually fails with:

TypeError: Invalid result_type: result_type should be DataType but contains RowField(name, VARCHAR)

raised from:

File "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\udf.py", line 264, in __init__

def __init__(self, func, input_types, result_types, deterministic=None, name=None):
    super(UserDefinedTableFunctionWrapper, self).__init__(
        func, input_types, deterministic, name)

    if not isinstance(result_types, collections.Iterable):
        result_types = [result_types]

    for result_type in result_types:
        if not isinstance(result_type, DataType):
            raise TypeError(
                "Invalid result_type: result_type should be DataType but contains {}".format(
                    result_type))

    self._result_types = result_types
    self._judtf_placeholder = None

With a breakpoint set there, I can see that result_types ends up holding the FIELD entries from inside the ROW, which is what fails the check. Is this a bug?

Also, if I create the udtf like this instead:

splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.BIGINT(), DataTypes.BIGINT()])

it runs fine, even though those types obviously don't match what the job actually processes.
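The behavior in the traceback can be reproduced in isolation: the wrapper wraps result_types in a list only when it is not iterable, and a DataTypes.ROW(...) is itself iterable over its RowFields, so the ROW gets iterated instead of being treated as one composite type. A minimal sketch of that check, assuming RowType's iteration behavior matches what the error message shows:

import collections.abc
from pyflink.table import DataTypes

row_type = DataTypes.ROW([
    DataTypes.FIELD("name", DataTypes.STRING()),
    DataTypes.FIELD("id", DataTypes.STRING())
])

# Mirrors the check in udf.py: non-iterables get wrapped in a list; a RowType
# is iterable, so it is looped over directly and each element is a RowField,
# not a DataType -- hence the TypeError.
if not isinstance(row_type, collections.abc.Iterable):
    result_types = [row_type]
else:
    result_types = list(row_type)  # [RowField(name, VARCHAR), RowField(id, VARCHAR)]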
The splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.BIGINT(), DataTypes.BIGINT()]) mentioned at the end above should be changed to splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(), DataTypes.STRING()]). It seems that the third argument of udtf only needs to match the sink's column types for the job to run, but it feels odd that the job also runs when the second argument does not correspond to the source columns at all.
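Putting the thread's fix together, a corrected registration for the job above might look like this (a sketch assembled from the snippets in this thread; the input type is written out as the actual ARRAY<ROW<...>> column type for clarity, even though, as observed above, it apparently isn't validated against the source):

class SplitStr(TableFunction):
    def eval(self, data):
        # data is the array<ROW<name STRING, age STRING>> column;
        # emit one (name, age) row per array element
        for row in data:
            yield row[0], row[1]

splitStr = udtf(
    SplitStr(),
    # input type: the ARRAY<ROW<...>> column handed in from the source table
    DataTypes.ARRAY(
        DataTypes.ROW([
            DataTypes.FIELD("name", DataTypes.STRING()),
            DataTypes.FIELD("age", DataTypes.STRING())
        ])
    ),
    # result types: a flat list with one DataType per produced column,
    # not a composite DataTypes.ROW(...)
    [DataTypes.STRING(), DataTypes.STRING()]
)
t_env.register_function("splitStr", splitStr)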
Hello,
This is really a usability issue; I created a JIRA for it a while ago [1]. Using [DataTypes.STRING(), DataTypes.STRING()] directly as the result_types is correct. As for the input_types question: internally the input types are derived by matching against the upstream result type, so it is also fine that yours don't correspond; starting from 1.12 they will no longer need to be specified at all.

Best,
Xingbo

[1] https://issues.apache.org/jira/browse/FLINK-19138
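If 1.12 behaves as described above, the declaration should shrink to roughly the following (a sketch only, not verified against a released 1.12 API):

# input_types omitted entirely; only the per-column result types remain
splitStr = udtf(SplitStr(), result_types=[DataTypes.STRING(), DataTypes.STRING()])
t_env.register_function("splitStr", splitStr)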