您好,完全叙述一下我的问题:
1.首先我需要将一个定义好的python字典作为udf的输入参数,假设这个字典为dic = {1:'a',2:'b'} 那么我在定义udf的时候,如何写输入(一共两个输入参数,一是这个定义好的字典dic,二是一个DataTypes.ARRAY(DataTypes.STRING()),即下文的re_list) 我的方法是: class fun(ScalarFunction): def __int__(self): self.dic = {1:'a',2:'b'} def eval(self,re_list): #调用dic时,使用self.dic #...... return res #返回的输出也是一个python的字典,实际是想输出一个两层嵌套的json字典,{'table':'a','data':{'data1':'b','data2':'c'} st_env.register_function("fun",udf(fun(),DataTypes.ARRAY(DataTypes.STRING()),DataTypes.MAP(DataTypes.STRING(),DataTypes.STRING()))) #这样写是否正确 st_env.from_path("source").select("fun(t)").execute_insert("sink") 2.我在定义sink表采用 st_env.execute_sql(""" CREATE TABLE sink( table STRING, data STRING )WITH( 'connector' = 'filesystem', 'path' = 'home/res/', 'format' = 'csv') #format如果是json就报错ParseException:Encountered"table"at line 1,column 43 was expecting one of "CONSTRAINT" ... "PRIMARY" """) #我想要在问题1中打印出来输出的json字典,这样的ddl的定义方式是否正确 在 2020-09-07 11:33:06,"Xingbo Huang" <[hidden email]> 写道: >Hi, >你是想直接读一个python的提供的数据源把。这和udf是两个问题。你那个udf没啥问题,就是udf的返回类型是result_type,不是result_types。 > >你想直接读python的数据结构的话,你可以考虑使用st_env.from_elements(测试用的,可以参考文档[1])或者st_env.from_pandas >[2] 来读取一个dataframe。 > >[1] >https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html >[2] >https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/conversion_of_pandas.html > >Best, >Xingbo > >whh_960101 <[hidden email]> 于2020年9月7日周一 上午11:22写道: > >> 您好,请问在定义UDF时,输入参数是一个我定义好的python字典,比如 >> dic = {1:'a',2:'b'} >> 此时定义udf如下: >> >> @udf(input_types=[DataTypes.MAP(DataTypes.INT(),DataTypes.STRING()),DataTypes.STRING()],result_types=DataTypes.STRING()) >> def func(dic,f): >> ...... >> return L >> st_env.register_function("func", func) >> st_env.from_path("source").select("func(dic,t)").insert_into("sink") >> #这时我在外部定义好的数据类型dic字典如何作为参数传进来 >> 这种代码应该怎么写?感谢解答!因为官网没有给出将外部数据类型作为参数传入udf中的样例,所以比较困惑 >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-09-04 16:02:56,"Xingbo Huang" <[hidden email]> 写道: >> >Hi, >> > >> >推荐你使用ddl来声明你上下游用的connector >> > >> >``` >> >table_env.execute_sql(""" >> >CREATE TABLE output ( >> >data STRING ARRAY >> >) WITH ( >> > 'connector' = 'filesystem', -- required: specify the connector >> > 'path' = 'file:///tmp/test.csv', -- required: path to a directory >> > 'format' = 'json', >> > 'json.fail-on-missing-field' = 'false', >> > 'json.ignore-parse-errors' = 'true' >> >) >> >""") >> > >> >> >table.select("func(b)").execute_insert("output").get_job_client().get_job_execution_result().result() >> >``` >> > >> >Best, >> >Xingbo >> > >> > >> > >> >whh_960101 <[hidden email]> 于2020年9月4日周五 下午3:46写道: >> > >> >> 您好,我是想让输出insert_into到目标表中,具体如下: >> >> st_env=StreamExecutionEnvironment.get_execution_environment() >> >> st_env.connect了一个source table(table包含a字段), >> >> 然后 >> >> | st_env.connect(FileSystem().path('tmp')) \ | >> >> | | .with_format(OldCsv() | >> >> | | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ | >> >> | | .with_schema(Schema() | >> >> | | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ | >> >> | | .create_temporary_table('sink') | >> >> connect了一个sink表,format、schema都是DataTypes.ARRAY() >> >> 然后我定义了一个udf >> >> >> >> >> @udf(input_types=DataTypes.STRING(),result_type=DataTypes.ARRAY(DataTypes.STRING())) >> >> def func(a): >> >> rec_list = a.split(',') >> >> res_arr = np.arrary(rec_list,dtype=str) >> >> return res_arr >> >> st_env.register_function("func", func) >> >> st_env.from_path("source").select("func(a)").insert_into("sink") >> >> 最后得到的tmp文件里面就是[Ljava.lang.String;@f795710这种形式的字符串 >> ,不是我res_arr里面的内容,如果我单独返回一个值,比如return >> >> res_arr[0],tmp文件里面的字符串就是正确。 >> >> 我想要得到array,该怎么解决? >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-09-04 15:17:38,"Xingbo Huang" <[hidden email]> 写道: >> >> >Hi, >> >> > >> >> >你是调试的时候想看结果吗? >> >> >你可以直接table.to_pandas()来看结果,或者用print connector来看。 >> >> > >> >> >个人觉得to_pandas最简单,比如你可以试试下面的例子 >> >> > >> >> >``` >> >> >table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b']) >> >> > >> >> >@udf(input_types=DataTypes.STRING(), >> >> >result_type=DataTypes.ARRAY(DataTypes.STRING())) >> >> >def func(a): >> >> > return np.array([a, a, a], dtype=str) >> >> > >> >> >table_env.register_function("func", func) >> >> > >> >> >table.select("func(b)").to_pandas() >> >> >``` >> >> >然后,你可以看看官方文档[1],让你快速上手PyFlink >> >> > >> >> >Best, >> >> >Xingbo >> >> > >> >> >[1] >> >> > >> >> >> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html >> >> > >> >> >whh_960101 <[hidden email]> 于2020年9月4日周五 下午2:50写道: >> >> > >> >> >> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗 >> >> >> 我的udf输出了一个numpy.array(dtype = str), >> >> >> result_type设的是DataTypes.ARRAY(DataTypes.STRING()) >> >> >> >> >> >> >> >> >> 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING())) >> >> >> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容 >> >> >> 请问这个问题该怎么解决? >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-09-04 10:35:03,"Xingbo Huang" <[hidden email]> 写道: >> >> >> >Hi, >> >> >> > >> >> >> >> >> >> >> >> >> >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事 >> >> >> > >> >> >> >Best, >> >> >> >Xingbo >> >> >> > >> >> >> >whh_960101 <[hidden email]> 于2020年9月4日周五 上午9:26写道: >> >> >> > >> >> >> >> >> >> >> >> >> >> >> >> >> >> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值 >> >> >> >> udf定义如下: >> >> >> >> >> >> >> >> >> >> >> >> >> >> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING()) >> >> >> >> def fun(data): >> >> >> >> b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错 >> >> >> >> >> >> >> >> >> >> >> >> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错 >> >> >> >> 希望您能给我提供好的解决办法,万分感谢! >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-09-03 22:23:28,"Xingbo Huang" <[hidden email]> 写道: >> >> >> >> >Hi, >> >> >> >> > >> >> >> >> >我觉得你从头详细描述一下你的表结构。 >> >> >> >> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的, >> >> >> >> >> >> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。 >> >> >> >> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1] >> >> >> >> > >> >> >> >> >[1] >> >> >> >> > >> >> >> >> >> >> >> >> >> >> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions >> >> >> >> > >> >> >> >> >Best, >> >> >> >> >Xingbo >> >> >> >> > >> >> >> >> ><[hidden email]> 于2020年9月3日周四 下午9:45写道: >> >> >> >> > >> >> >> >> >> >> >> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法 >> >> >> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(), >> >> >> DataTypes.STRING()] >> >> >> >> >> >> >> >> >> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗 >> >> >> >> >> 或者正确写法是什么样的,感谢解答! >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> | | >> >> >> >> >> whh_960101 >> >> >> >> >> | >> >> >> >> >> | >> >> >> >> >> 邮箱:[hidden email] >> >> >> >> >> | >> >> >> >> >> >> >> >> >> >> 签名由 网易邮箱大师 定制 >> >> >> >> >> >> >> >> >> >> 在2020年09月03日 21:14,Xingbo Huang 写道: >> >> >> >> >> Hi, >> >> >> >> >> input_types定义的是每一个列的具体类型。 >> >> >> >> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你 >> >> >> >> >> 正确的写法是 >> >> >> >> >> >> >> >> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(), >> >> >> >> DataTypes.STRING()] >> >> >> >> >> >> >> >> >> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的) >> >> >> >> >> input_types=DataTypes.Row([DataTypes.FIELD("a", >> >> >> DataTypes.STRING()), >> >> >> >> >> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c", >> >> >> >> >> DataTypes.STRING())]) >> >> >> >> >> >> >> >> >> >> Best, >> >> >> >> >> Xingbo >> >> >> >> >> >> >> >> >> >> whh_960101 <[hidden email]> 于2020年9月3日周四 下午9:03写道: >> >> >> >> >> >> >> >> >> >> > >> 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid >> >> >> >> >> > input_type:input_type should be DataType but contain >> >> >> RowField(RECID, >> >> >> >> >> > VARCHAR) >> >> >> >> >> > 我的pyflink版本:1.11.1 >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> |
Hi,
你那个嵌套json串没法用Map来玩(要求所有key是一个类型,value是一个类型),我觉得你得用Row来表示,类型声明类似于Row(String, Map(String, String))这种(根据你给的那个{'table':'a','data':{'data1':'b','data2':'c'}这样的数据来的), 你可以认为Row是我们提供的继承自tuple的,所以你的这个json串还得你自己转成Row,方式也简单。 比如你的data 是上面说的{'table':'a','data':{'data1':'b','data2':'c'} Row('table'=data['table'], 'data'=data['data']) Best, Xingbo whh_960101 <[hidden email]> 于2020年9月7日周一 下午3:11写道: > 您好,完全叙述一下我的问题: > 1.首先我需要将一个定义好的python字典作为udf的输入参数,假设这个字典为dic = {1:'a',2:'b'} > > 那么我在定义udf的时候,如何写输入(一共两个输入参数,一是这个定义好的字典dic,二是一个DataTypes.ARRAY(DataTypes.STRING()),即下文的re_list) > 我的方法是: > class fun(ScalarFunction): > def __int__(self): > self.dic = {1:'a',2:'b'} > def eval(self,re_list): > #调用dic时,使用self.dic > #...... > return res > #返回的输出也是一个python的字典,实际是想输出一个两层嵌套的json字典,{'table':'a','data':{'data1':'b','data2':'c'} > > > > > > st_env.register_function("fun",udf(fun(),DataTypes.ARRAY(DataTypes.STRING()),DataTypes.MAP(DataTypes.STRING(),DataTypes.STRING()))) > #这样写是否正确 > st_env.from_path("source").select("fun(t)").execute_insert("sink") > > > 2.我在定义sink表采用 > st_env.execute_sql(""" > CREATE TABLE sink( > table STRING, > data STRING > )WITH( > 'connector' = 'filesystem', > 'path' = 'home/res/', > 'format' = 'csv') #format如果是json就报错ParseException:Encountered"table"at > line 1,column 43 was expecting one of "CONSTRAINT" ... "PRIMARY" > """) #我想要在问题1中打印出来输出的json字典,这样的ddl的定义方式是否正确 > > > > > > > > > > > > 在 2020-09-07 11:33:06,"Xingbo Huang" <[hidden email]> 写道: > >Hi, > > >你是想直接读一个python的提供的数据源把。这和udf是两个问题。你那个udf没啥问题,就是udf的返回类型是result_type,不是result_types。 > > > > >你想直接读python的数据结构的话,你可以考虑使用st_env.from_elements(测试用的,可以参考文档[1])或者st_env.from_pandas > >[2] 来读取一个dataframe。 > > > >[1] > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html > >[2] > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/conversion_of_pandas.html > > > >Best, > >Xingbo > > > >whh_960101 <[hidden email]> 于2020年9月7日周一 上午11:22写道: > > > >> 您好,请问在定义UDF时,输入参数是一个我定义好的python字典,比如 > >> dic = {1:'a',2:'b'} > >> 此时定义udf如下: > >> > >> > @udf(input_types=[DataTypes.MAP(DataTypes.INT(),DataTypes.STRING()),DataTypes.STRING()],result_types=DataTypes.STRING()) > >> def func(dic,f): > >> ...... > >> return L > >> st_env.register_function("func", func) > >> st_env.from_path("source").select("func(dic,t)").insert_into("sink") > >> #这时我在外部定义好的数据类型dic字典如何作为参数传进来 > >> 这种代码应该怎么写?感谢解答!因为官网没有给出将外部数据类型作为参数传入udf中的样例,所以比较困惑 > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> 在 2020-09-04 16:02:56,"Xingbo Huang" <[hidden email]> 写道: > >> >Hi, > >> > > >> >推荐你使用ddl来声明你上下游用的connector > >> > > >> >``` > >> >table_env.execute_sql(""" > >> >CREATE TABLE output ( > >> >data STRING ARRAY > >> >) WITH ( > >> > 'connector' = 'filesystem', -- required: specify the > connector > >> > 'path' = 'file:///tmp/test.csv', -- required: path to a directory > >> > 'format' = 'json', > >> > 'json.fail-on-missing-field' = 'false', > >> > 'json.ignore-parse-errors' = 'true' > >> >) > >> >""") > >> > > >> > >> > >table.select("func(b)").execute_insert("output").get_job_client().get_job_execution_result().result() > >> >``` > >> > > >> >Best, > >> >Xingbo > >> > > >> > > >> > > >> >whh_960101 <[hidden email]> 于2020年9月4日周五 下午3:46写道: > >> > > >> >> 您好,我是想让输出insert_into到目标表中,具体如下: > >> >> st_env=StreamExecutionEnvironment.get_execution_environment() > >> >> st_env.connect了一个source table(table包含a字段), > >> >> 然后 > >> >> | st_env.connect(FileSystem().path('tmp')) \ | > >> >> | | .with_format(OldCsv() | > >> >> | | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ | > >> >> | | .with_schema(Schema() | > >> >> | | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ | > >> >> | | .create_temporary_table('sink') | > >> >> connect了一个sink表,format、schema都是DataTypes.ARRAY() > >> >> 然后我定义了一个udf > >> >> > >> >> > >> > @udf(input_types=DataTypes.STRING(),result_type=DataTypes.ARRAY(DataTypes.STRING())) > >> >> def func(a): > >> >> rec_list = a.split(',') > >> >> res_arr = np.arrary(rec_list,dtype=str) > >> >> return res_arr > >> >> st_env.register_function("func", func) > >> >> st_env.from_path("source").select("func(a)").insert_into("sink") > >> >> 最后得到的tmp文件里面就是[Ljava.lang.String;@f795710这种形式的字符串 > >> ,不是我res_arr里面的内容,如果我单独返回一个值,比如return > >> >> res_arr[0],tmp文件里面的字符串就是正确。 > >> >> 我想要得到array,该怎么解决? > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> 在 2020-09-04 15:17:38,"Xingbo Huang" <[hidden email]> 写道: > >> >> >Hi, > >> >> > > >> >> >你是调试的时候想看结果吗? > >> >> >你可以直接table.to_pandas()来看结果,或者用print connector来看。 > >> >> > > >> >> >个人觉得to_pandas最简单,比如你可以试试下面的例子 > >> >> > > >> >> >``` > >> >> >table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b']) > >> >> > > >> >> >@udf(input_types=DataTypes.STRING(), > >> >> >result_type=DataTypes.ARRAY(DataTypes.STRING())) > >> >> >def func(a): > >> >> > return np.array([a, a, a], dtype=str) > >> >> > > >> >> >table_env.register_function("func", func) > >> >> > > >> >> >table.select("func(b)").to_pandas() > >> >> >``` > >> >> >然后,你可以看看官方文档[1],让你快速上手PyFlink > >> >> > > >> >> >Best, > >> >> >Xingbo > >> >> > > >> >> >[1] > >> >> > > >> >> > >> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html > >> >> > > >> >> >whh_960101 <[hidden email]> 于2020年9月4日周五 下午2:50写道: > >> >> > > >> >> >> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗 > >> >> >> 我的udf输出了一个numpy.array(dtype = str), > >> >> >> result_type设的是DataTypes.ARRAY(DataTypes.STRING()) > >> >> >> > >> >> >> > >> >> > >> > 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING())) > >> >> >> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容 > >> >> >> 请问这个问题该怎么解决? > >> >> >> > >> >> >> > >> >> >> > >> >> >> > >> >> >> > >> >> >> > >> >> >> > >> >> >> > >> >> >> > >> >> >> > >> >> >> > >> >> >> > >> >> >> > >> >> >> > >> >> >> > >> >> >> > >> >> >> > >> >> >> 在 2020-09-04 10:35:03,"Xingbo Huang" <[hidden email]> 写道: > >> >> >> >Hi, > >> >> >> > > >> >> >> > >> >> >> > >> >> > >> > >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事 > >> >> >> > > >> >> >> >Best, > >> >> >> >Xingbo > >> >> >> > > >> >> >> >whh_960101 <[hidden email]> 于2020年9月4日周五 上午9:26写道: > >> >> >> > > >> >> >> >> > >> >> >> >> > >> >> >> > >> >> > >> > 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值 > >> >> >> >> udf定义如下: > >> >> >> >> > >> >> >> >> > >> >> >> > >> >> > >> > @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING()) > >> >> >> >> def fun(data): > >> >> >> >> b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错 > >> >> >> >> > >> >> >> >> > >> >> >> >> > 如果通过table.select("after.b")或者table.select('after').select('b')也会报错 > >> >> >> >> 希望您能给我提供好的解决办法,万分感谢! > >> >> >> >> > >> >> >> >> > >> >> >> >> > >> >> >> >> > >> >> >> >> > >> >> >> >> > >> >> >> >> > >> >> >> >> > >> >> >> >> > >> >> >> >> > >> >> >> >> > >> >> >> >> > >> >> >> >> > >> >> >> >> > >> >> >> >> > >> >> >> >> > >> >> >> >> > >> >> >> >> 在 2020-09-03 22:23:28,"Xingbo Huang" <[hidden email]> 写道: > >> >> >> >> >Hi, > >> >> >> >> > > >> >> >> >> >我觉得你从头详细描述一下你的表结构。 > >> >> >> >> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的, > >> >> >> >> > >> >> > >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。 > >> >> >> >> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1] > >> >> >> >> > > >> >> >> >> >[1] > >> >> >> >> > > >> >> >> >> > >> >> >> > >> >> > >> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions > >> >> >> >> > > >> >> >> >> >Best, > >> >> >> >> >Xingbo > >> >> >> >> > > >> >> >> >> ><[hidden email]> 于2020年9月3日周四 下午9:45写道: > >> >> >> >> > > >> >> >> >> >> > >> >> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法 > >> >> >> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(), > >> >> >> DataTypes.STRING()] > >> >> >> >> >> > >> >> >> >> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗 > >> >> >> >> >> 或者正确写法是什么样的,感谢解答! > >> >> >> >> >> > >> >> >> >> >> > >> >> >> >> >> | | > >> >> >> >> >> whh_960101 > >> >> >> >> >> | > >> >> >> >> >> | > >> >> >> >> >> 邮箱:[hidden email] > >> >> >> >> >> | > >> >> >> >> >> > >> >> >> >> >> 签名由 网易邮箱大师 定制 > >> >> >> >> >> > >> >> >> >> >> 在2020年09月03日 21:14,Xingbo Huang 写道: > >> >> >> >> >> Hi, > >> >> >> >> >> input_types定义的是每一个列的具体类型。 > >> >> >> >> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你 > >> >> >> >> >> 正确的写法是 > >> >> >> >> >> > >> >> >> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(), > >> >> >> >> DataTypes.STRING()] > >> >> >> >> >> > >> >> >> >> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的) > >> >> >> >> >> input_types=DataTypes.Row([DataTypes.FIELD("a", > >> >> >> DataTypes.STRING()), > >> >> >> >> >> DataTypes.FIELD("b", DataTypes.STRING()), > DataTypes.FIELD("c", > >> >> >> >> >> DataTypes.STRING())]) > >> >> >> >> >> > >> >> >> >> >> Best, > >> >> >> >> >> Xingbo > >> >> >> >> >> > >> >> >> >> >> whh_960101 <[hidden email]> 于2020年9月3日周四 下午9:03写道: > >> >> >> >> >> > >> >> >> >> >> > > >> 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid > >> >> >> >> >> > input_type:input_type should be DataType but contain > >> >> >> RowField(RECID, > >> >> >> >> >> > VARCHAR) > >> >> >> >> >> > 我的pyflink版本:1.11.1 > >> >> >> >> >> > >> >> >> >> > >> >> >> > >> >> > >> > |
Free forum by Nabble | Edit this page |