pyflink-udf 问题反馈

classic Classic list List threaded Threaded
55 messages Options
123
Reply | Threaded
Open this post in threaded view
|

pyflink-udf 问题反馈

whh_960101
您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid input_type:input_type should be DataType but contain RowField(RECID, VARCHAR)
我的pyflink版本:1.11.1
Reply | Threaded
Open this post in threaded view
|

Re: pyflink-udf 问题反馈

Xingbo Huang
Hi,
input_types定义的是每一个列的具体类型。
比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
正确的写法是

    input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()]

针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
    input_types=DataTypes.Row([DataTypes.FIELD("a", DataTypes.STRING()),
DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
DataTypes.STRING())])

Best,
Xingbo

whh_960101 <[hidden email]> 于2020年9月3日周四 下午9:03写道:

> 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
> input_type:input_type should be DataType but contain RowField(RECID,
> VARCHAR)
> 我的pyflink版本:1.11.1
Reply | Threaded
Open this post in threaded view
|

回复:pyflink-udf 问题反馈

whh_960101
我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()]

在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
或者正确写法是什么样的,感谢解答!


| |
whh_960101
|
|
邮箱:[hidden email]
|

签名由 网易邮箱大师 定制

在2020年09月03日 21:14,Xingbo Huang 写道:
Hi,
input_types定义的是每一个列的具体类型。
比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
正确的写法是

   input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()]

针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
   input_types=DataTypes.Row([DataTypes.FIELD("a", DataTypes.STRING()),
DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
DataTypes.STRING())])

Best,
Xingbo

whh_960101 <[hidden email]> 于2020年9月3日周四 下午9:03写道:

> 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
> input_type:input_type should be DataType but contain RowField(RECID,
> VARCHAR)
> 我的pyflink版本:1.11.1
Reply | Threaded
Open this post in threaded view
|

Re: pyflink-udf 问题反馈

Xingbo Huang
Hi,

我觉得你从头详细描述一下你的表结构。
比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions

Best,
Xingbo

<[hidden email]> 于2020年9月3日周四 下午9:45写道:

> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
> input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()]
>
> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
> 或者正确写法是什么样的,感谢解答!
>
>
> | |
> whh_960101
> |
> |
> 邮箱:[hidden email]
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年09月03日 21:14,Xingbo Huang 写道:
> Hi,
> input_types定义的是每一个列的具体类型。
> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
> 正确的写法是
>
>    input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()]
>
> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
>    input_types=DataTypes.Row([DataTypes.FIELD("a", DataTypes.STRING()),
> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
> DataTypes.STRING())])
>
> Best,
> Xingbo
>
> whh_960101 <[hidden email]> 于2020年9月3日周四 下午9:03写道:
>
> > 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
> > input_type:input_type should be DataType but contain RowField(RECID,
> > VARCHAR)
> > 我的pyflink版本:1.11.1
>
Reply | Threaded
Open this post in threaded view
|

Re:Re: pyflink-udf 问题反馈

whh_960101
您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值
udf定义如下:
@udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING())
def fun(data):
     b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错


如果通过table.select("after.b")或者table.select('after').select('b')也会报错
希望您能给我提供好的解决办法,万分感谢!

















在 2020-09-03 22:23:28,"Xingbo Huang" <[hidden email]> 写道:

>Hi,
>
>我觉得你从头详细描述一下你的表结构。
>比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
>然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
>使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions
>
>Best,
>Xingbo
>
><[hidden email]> 于2020年9月3日周四 下午9:45写道:
>
>> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
>> input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()]
>>
>> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
>> 或者正确写法是什么样的,感谢解答!
>>
>>
>> | |
>> whh_960101
>> |
>> |
>> 邮箱:[hidden email]
>> |
>>
>> 签名由 网易邮箱大师 定制
>>
>> 在2020年09月03日 21:14,Xingbo Huang 写道:
>> Hi,
>> input_types定义的是每一个列的具体类型。
>> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
>> 正确的写法是
>>
>>    input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()]
>>
>> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
>>    input_types=DataTypes.Row([DataTypes.FIELD("a", DataTypes.STRING()),
>> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
>> DataTypes.STRING())])
>>
>> Best,
>> Xingbo
>>
>> whh_960101 <[hidden email]> 于2020年9月3日周四 下午9:03写道:
>>
>> > 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
>> > input_type:input_type should be DataType but contain RowField(RECID,
>> > VARCHAR)
>> > 我的pyflink版本:1.11.1
>>
Reply | Threaded
Open this post in threaded view
|

Re: Re: pyflink-udf 问题反馈

Xingbo Huang
Hi,

你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事

Best,
Xingbo

whh_960101 <[hidden email]> 于2020年9月4日周五 上午9:26写道:

>
> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值
> udf定义如下:
>
> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING())
> def fun(data):
>      b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错
>
>
> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错
> 希望您能给我提供好的解决办法,万分感谢!
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-09-03 22:23:28,"Xingbo Huang" <[hidden email]> 写道:
> >Hi,
> >
> >我觉得你从头详细描述一下你的表结构。
> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]
> >
> >[1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions
> >
> >Best,
> >Xingbo
> >
> ><[hidden email]> 于2020年9月3日周四 下午9:45写道:
> >
> >> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
> >> input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()]
> >>
> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
> >> 或者正确写法是什么样的,感谢解答!
> >>
> >>
> >> | |
> >> whh_960101
> >> |
> >> |
> >> 邮箱:[hidden email]
> >> |
> >>
> >> 签名由 网易邮箱大师 定制
> >>
> >> 在2020年09月03日 21:14,Xingbo Huang 写道:
> >> Hi,
> >> input_types定义的是每一个列的具体类型。
> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
> >> 正确的写法是
> >>
> >>    input_types=[DataTypes.STRING(), DataTypes.STRING(),
> DataTypes.STRING()]
> >>
> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
> >>    input_types=DataTypes.Row([DataTypes.FIELD("a", DataTypes.STRING()),
> >> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
> >> DataTypes.STRING())])
> >>
> >> Best,
> >> Xingbo
> >>
> >> whh_960101 <[hidden email]> 于2020年9月3日周四 下午9:03写道:
> >>
> >> > 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
> >> > input_type:input_type should be DataType but contain RowField(RECID,
> >> > VARCHAR)
> >> > 我的pyflink版本:1.11.1
> >>
>
Reply | Threaded
Open this post in threaded view
|

Re:Re: Re: pyflink-udf 问题反馈

whh_960101
您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗
我的udf输出了一个numpy.array(dtype = str),  result_type设的是DataTypes.ARRAY(DataTypes.STRING())
把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING()))
打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容
请问这个问题该怎么解决?

















在 2020-09-04 10:35:03,"Xingbo Huang" <[hidden email]> 写道:

>Hi,
>
>你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事
>
>Best,
>Xingbo
>
>whh_960101 <[hidden email]> 于2020年9月4日周五 上午9:26写道:
>
>>
>> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值
>> udf定义如下:
>>
>> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING())
>> def fun(data):
>>      b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错
>>
>>
>> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错
>> 希望您能给我提供好的解决办法,万分感谢!
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-09-03 22:23:28,"Xingbo Huang" <[hidden email]> 写道:
>> >Hi,
>> >
>> >我觉得你从头详细描述一下你的表结构。
>> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
>> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
>> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]
>> >
>> >[1]
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions
>> >
>> >Best,
>> >Xingbo
>> >
>> ><[hidden email]> 于2020年9月3日周四 下午9:45写道:
>> >
>> >> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
>> >> input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()]
>> >>
>> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
>> >> 或者正确写法是什么样的,感谢解答!
>> >>
>> >>
>> >> | |
>> >> whh_960101
>> >> |
>> >> |
>> >> 邮箱:[hidden email]
>> >> |
>> >>
>> >> 签名由 网易邮箱大师 定制
>> >>
>> >> 在2020年09月03日 21:14,Xingbo Huang 写道:
>> >> Hi,
>> >> input_types定义的是每一个列的具体类型。
>> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
>> >> 正确的写法是
>> >>
>> >>    input_types=[DataTypes.STRING(), DataTypes.STRING(),
>> DataTypes.STRING()]
>> >>
>> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
>> >>    input_types=DataTypes.Row([DataTypes.FIELD("a", DataTypes.STRING()),
>> >> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
>> >> DataTypes.STRING())])
>> >>
>> >> Best,
>> >> Xingbo
>> >>
>> >> whh_960101 <[hidden email]> 于2020年9月3日周四 下午9:03写道:
>> >>
>> >> > 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
>> >> > input_type:input_type should be DataType but contain RowField(RECID,
>> >> > VARCHAR)
>> >> > 我的pyflink版本:1.11.1
>> >>
>>
Reply | Threaded
Open this post in threaded view
|

Re: Re: Re: pyflink-udf 问题反馈

Xingbo Huang
Hi,

你是调试的时候想看结果吗?
你可以直接table.to_pandas()来看结果,或者用print connector来看。

个人觉得to_pandas最简单,比如你可以试试下面的例子

```
table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b'])

@udf(input_types=DataTypes.STRING(),
result_type=DataTypes.ARRAY(DataTypes.STRING()))
def func(a):
     return np.array([a, a, a], dtype=str)

table_env.register_function("func", func)

table.select("func(b)").to_pandas()
```
然后,你可以看看官方文档[1],让你快速上手PyFlink

Best,
Xingbo

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html

whh_960101 <[hidden email]> 于2020年9月4日周五 下午2:50写道:

> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗
> 我的udf输出了一个numpy.array(dtype = str),
> result_type设的是DataTypes.ARRAY(DataTypes.STRING())
>
> 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING()))
> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容
> 请问这个问题该怎么解决?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-09-04 10:35:03,"Xingbo Huang" <[hidden email]> 写道:
> >Hi,
> >
>
> >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事
> >
> >Best,
> >Xingbo
> >
> >whh_960101 <[hidden email]> 于2020年9月4日周五 上午9:26写道:
> >
> >>
> >>
> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值
> >> udf定义如下:
> >>
> >>
> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING())
> >> def fun(data):
> >>      b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错
> >>
> >>
> >> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错
> >> 希望您能给我提供好的解决办法,万分感谢!
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-09-03 22:23:28,"Xingbo Huang" <[hidden email]> 写道:
> >> >Hi,
> >> >
> >> >我觉得你从头详细描述一下你的表结构。
> >> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
> >> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
> >> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]
> >> >
> >> >[1]
> >> >
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions
> >> >
> >> >Best,
> >> >Xingbo
> >> >
> >> ><[hidden email]> 于2020年9月3日周四 下午9:45写道:
> >> >
> >> >> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(),
> DataTypes.STRING()]
> >> >>
> >> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
> >> >> 或者正确写法是什么样的,感谢解答!
> >> >>
> >> >>
> >> >> | |
> >> >> whh_960101
> >> >> |
> >> >> |
> >> >> 邮箱:[hidden email]
> >> >> |
> >> >>
> >> >> 签名由 网易邮箱大师 定制
> >> >>
> >> >> 在2020年09月03日 21:14,Xingbo Huang 写道:
> >> >> Hi,
> >> >> input_types定义的是每一个列的具体类型。
> >> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
> >> >> 正确的写法是
> >> >>
> >> >>    input_types=[DataTypes.STRING(), DataTypes.STRING(),
> >> DataTypes.STRING()]
> >> >>
> >> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
> >> >>    input_types=DataTypes.Row([DataTypes.FIELD("a",
> DataTypes.STRING()),
> >> >> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
> >> >> DataTypes.STRING())])
> >> >>
> >> >> Best,
> >> >> Xingbo
> >> >>
> >> >> whh_960101 <[hidden email]> 于2020年9月3日周四 下午9:03写道:
> >> >>
> >> >> > 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
> >> >> > input_type:input_type should be DataType but contain
> RowField(RECID,
> >> >> > VARCHAR)
> >> >> > 我的pyflink版本:1.11.1
> >> >>
> >>
>
Reply | Threaded
Open this post in threaded view
|

Re:Re: Re: Re: pyflink-udf 问题反馈

whh_960101
您好,我是想让输出insert_into到目标表中,具体如下:
st_env=StreamExecutionEnvironment.get_execution_environment()
st_env.connect了一个source table(table包含a字段),
然后
| st_env.connect(FileSystem().path('tmp')) \ |
| | .with_format(OldCsv() |
| | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ |
| | .with_schema(Schema() |
| | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ |
| | .create_temporary_table('sink') |
connect了一个sink表,format、schema都是DataTypes.ARRAY()
然后我定义了一个udf
@udf(input_types=DataTypes.STRING(),result_type=DataTypes.ARRAY(DataTypes.STRING()))
def func(a):
    rec_list = a.split(',')
    res_arr = np.arrary(rec_list,dtype=str)
    return res_arr
st_env.register_function("func", func)
st_env.from_path("source").select("func(a)").insert_into("sink")
最后得到的tmp文件里面就是[Ljava.lang.String;@f795710这种形式的字符串,不是我res_arr里面的内容,如果我单独返回一个值,比如return res_arr[0],tmp文件里面的字符串就是正确。
我想要得到array,该怎么解决?



















在 2020-09-04 15:17:38,"Xingbo Huang" <[hidden email]> 写道:

>Hi,
>
>你是调试的时候想看结果吗?
>你可以直接table.to_pandas()来看结果,或者用print connector来看。
>
>个人觉得to_pandas最简单,比如你可以试试下面的例子
>
>```
>table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b'])
>
>@udf(input_types=DataTypes.STRING(),
>result_type=DataTypes.ARRAY(DataTypes.STRING()))
>def func(a):
>     return np.array([a, a, a], dtype=str)
>
>table_env.register_function("func", func)
>
>table.select("func(b)").to_pandas()
>```
>然后,你可以看看官方文档[1],让你快速上手PyFlink
>
>Best,
>Xingbo
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
>
>whh_960101 <[hidden email]> 于2020年9月4日周五 下午2:50写道:
>
>> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗
>> 我的udf输出了一个numpy.array(dtype = str),
>> result_type设的是DataTypes.ARRAY(DataTypes.STRING())
>>
>> 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING()))
>> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容
>> 请问这个问题该怎么解决?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-09-04 10:35:03,"Xingbo Huang" <[hidden email]> 写道:
>> >Hi,
>> >
>>
>> >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事
>> >
>> >Best,
>> >Xingbo
>> >
>> >whh_960101 <[hidden email]> 于2020年9月4日周五 上午9:26写道:
>> >
>> >>
>> >>
>> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值
>> >> udf定义如下:
>> >>
>> >>
>> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING())
>> >> def fun(data):
>> >>      b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错
>> >>
>> >>
>> >> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错
>> >> 希望您能给我提供好的解决办法,万分感谢!
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-09-03 22:23:28,"Xingbo Huang" <[hidden email]> 写道:
>> >> >Hi,
>> >> >
>> >> >我觉得你从头详细描述一下你的表结构。
>> >> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
>> >> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
>> >> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]
>> >> >
>> >> >[1]
>> >> >
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions
>> >> >
>> >> >Best,
>> >> >Xingbo
>> >> >
>> >> ><[hidden email]> 于2020年9月3日周四 下午9:45写道:
>> >> >
>> >> >> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
>> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(),
>> DataTypes.STRING()]
>> >> >>
>> >> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
>> >> >> 或者正确写法是什么样的,感谢解答!
>> >> >>
>> >> >>
>> >> >> | |
>> >> >> whh_960101
>> >> >> |
>> >> >> |
>> >> >> 邮箱:[hidden email]
>> >> >> |
>> >> >>
>> >> >> 签名由 网易邮箱大师 定制
>> >> >>
>> >> >> 在2020年09月03日 21:14,Xingbo Huang 写道:
>> >> >> Hi,
>> >> >> input_types定义的是每一个列的具体类型。
>> >> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
>> >> >> 正确的写法是
>> >> >>
>> >> >>    input_types=[DataTypes.STRING(), DataTypes.STRING(),
>> >> DataTypes.STRING()]
>> >> >>
>> >> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
>> >> >>    input_types=DataTypes.Row([DataTypes.FIELD("a",
>> DataTypes.STRING()),
>> >> >> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
>> >> >> DataTypes.STRING())])
>> >> >>
>> >> >> Best,
>> >> >> Xingbo
>> >> >>
>> >> >> whh_960101 <[hidden email]> 于2020年9月3日周四 下午9:03写道:
>> >> >>
>> >> >> > 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
>> >> >> > input_type:input_type should be DataType but contain
>> RowField(RECID,
>> >> >> > VARCHAR)
>> >> >> > 我的pyflink版本:1.11.1
>> >> >>
>> >>
>>
Reply | Threaded
Open this post in threaded view
|

Re:Re: Re: Re: pyflink-udf 问题反馈

whh_960101
In reply to this post by Xingbo Huang
您好,我是想让输出insert_into到目标表中,具体如下:
st_env=StreamExecutionEnvironment.get_execution_environment()
st_env.connect了一个source table(table包含a字段),
然后
| st_env.connect(FileSystem().path('tmp')) \ |
| | .with_format(OldCsv() |
| | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ |
| | .with_schema(Schema() |
| | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ |
| | .create_temporary_table('sink') |
connect了一个sink表,format、schema都是DataTypes.ARRAY()
然后我定义了一个udf
@udf(input_types=DataTypes.STRING(),result_type=DataTypes.ARRAY(DataTypes.STRING()))
def func(a):
    rec_list = a.split(',')
    res_arr = np.arrary(rec_list,dtype=str)
    return res_arr
st_env.register_function("func", func)
st_env.from_path("source").select("func(a)").insert_into("sink")
最后得到的tmp文件里面就是[Ljava.lang.String;@f795710这种形式的字符串,不是我res_arr里面的内容(假如res_arr=['1','2','3']),如果我单独返回一个值,比如return res_arr[0],tmp文件就显示'1'。
我想要得到array,该怎么解决?



















在 2020-09-04 15:17:38,"Xingbo Huang" <[hidden email]> 写道:

>Hi,
>
>你是调试的时候想看结果吗?
>你可以直接table.to_pandas()来看结果,或者用print connector来看。
>
>个人觉得to_pandas最简单,比如你可以试试下面的例子
>
>```
>table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b'])
>
>@udf(input_types=DataTypes.STRING(),
>result_type=DataTypes.ARRAY(DataTypes.STRING()))
>def func(a):
>     return np.array([a, a, a], dtype=str)
>
>table_env.register_function("func", func)
>
>table.select("func(b)").to_pandas()
>```
>然后,你可以看看官方文档[1],让你快速上手PyFlink
>
>Best,
>Xingbo
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
>
>whh_960101 <[hidden email]> 于2020年9月4日周五 下午2:50写道:
>
>> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗
>> 我的udf输出了一个numpy.array(dtype = str),
>> result_type设的是DataTypes.ARRAY(DataTypes.STRING())
>>
>> 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING()))
>> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容
>> 请问这个问题该怎么解决?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-09-04 10:35:03,"Xingbo Huang" <[hidden email]> 写道:
>> >Hi,
>> >
>>
>> >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事
>> >
>> >Best,
>> >Xingbo
>> >
>> >whh_960101 <[hidden email]> 于2020年9月4日周五 上午9:26写道:
>> >
>> >>
>> >>
>> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值
>> >> udf定义如下:
>> >>
>> >>
>> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING())
>> >> def fun(data):
>> >>      b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错
>> >>
>> >>
>> >> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错
>> >> 希望您能给我提供好的解决办法,万分感谢!
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-09-03 22:23:28,"Xingbo Huang" <[hidden email]> 写道:
>> >> >Hi,
>> >> >
>> >> >我觉得你从头详细描述一下你的表结构。
>> >> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
>> >> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
>> >> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]
>> >> >
>> >> >[1]
>> >> >
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions
>> >> >
>> >> >Best,
>> >> >Xingbo
>> >> >
>> >> ><[hidden email]> 于2020年9月3日周四 下午9:45写道:
>> >> >
>> >> >> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
>> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(),
>> DataTypes.STRING()]
>> >> >>
>> >> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
>> >> >> 或者正确写法是什么样的,感谢解答!
>> >> >>
>> >> >>
>> >> >> | |
>> >> >> whh_960101
>> >> >> |
>> >> >> |
>> >> >> 邮箱:[hidden email]
>> >> >> |
>> >> >>
>> >> >> 签名由 网易邮箱大师 定制
>> >> >>
>> >> >> 在2020年09月03日 21:14,Xingbo Huang 写道:
>> >> >> Hi,
>> >> >> input_types定义的是每一个列的具体类型。
>> >> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
>> >> >> 正确的写法是
>> >> >>
>> >> >>    input_types=[DataTypes.STRING(), DataTypes.STRING(),
>> >> DataTypes.STRING()]
>> >> >>
>> >> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
>> >> >>    input_types=DataTypes.Row([DataTypes.FIELD("a",
>> DataTypes.STRING()),
>> >> >> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
>> >> >> DataTypes.STRING())])
>> >> >>
>> >> >> Best,
>> >> >> Xingbo
>> >> >>
>> >> >> whh_960101 <[hidden email]> 于2020年9月3日周四 下午9:03写道:
>> >> >>
>> >> >> > 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
>> >> >> > input_type:input_type should be DataType but contain
>> RowField(RECID,
>> >> >> > VARCHAR)
>> >> >> > 我的pyflink版本:1.11.1
>> >> >>
>> >>
>>





 
Reply | Threaded
Open this post in threaded view
|

Re: Re: Re: Re: pyflink-udf 问题反馈

Xingbo Huang
In reply to this post by whh_960101
Hi,

推荐你使用ddl来声明你上下游用的connector

```
table_env.execute_sql("""
CREATE TABLE output (
data STRING ARRAY
) WITH (
 'connector' = 'filesystem',           -- required: specify the connector
 'path' = 'file:///tmp/test.csv',  -- required: path to a directory
 'format' = 'json',
 'json.fail-on-missing-field' = 'false',
 'json.ignore-parse-errors' = 'true'
)
""")

table.select("func(b)").execute_insert("output").get_job_client().get_job_execution_result().result()
```

Best,
Xingbo



whh_960101 <[hidden email]> 于2020年9月4日周五 下午3:46写道:

> 您好,我是想让输出insert_into到目标表中,具体如下:
> st_env=StreamExecutionEnvironment.get_execution_environment()
> st_env.connect了一个source table(table包含a字段),
> 然后
> | st_env.connect(FileSystem().path('tmp')) \ |
> | | .with_format(OldCsv() |
> | | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ |
> | | .with_schema(Schema() |
> | | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ |
> | | .create_temporary_table('sink') |
> connect了一个sink表,format、schema都是DataTypes.ARRAY()
> 然后我定义了一个udf
>
> @udf(input_types=DataTypes.STRING(),result_type=DataTypes.ARRAY(DataTypes.STRING()))
> def func(a):
>     rec_list = a.split(',')
>     res_arr = np.arrary(rec_list,dtype=str)
>     return res_arr
> st_env.register_function("func", func)
> st_env.from_path("source").select("func(a)").insert_into("sink")
> 最后得到的tmp文件里面就是[Ljava.lang.String;@f795710这种形式的字符串,不是我res_arr里面的内容,如果我单独返回一个值,比如return
> res_arr[0],tmp文件里面的字符串就是正确。
> 我想要得到array,该怎么解决?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-09-04 15:17:38,"Xingbo Huang" <[hidden email]> 写道:
> >Hi,
> >
> >你是调试的时候想看结果吗?
> >你可以直接table.to_pandas()来看结果,或者用print connector来看。
> >
> >个人觉得to_pandas最简单,比如你可以试试下面的例子
> >
> >```
> >table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b'])
> >
> >@udf(input_types=DataTypes.STRING(),
> >result_type=DataTypes.ARRAY(DataTypes.STRING()))
> >def func(a):
> >     return np.array([a, a, a], dtype=str)
> >
> >table_env.register_function("func", func)
> >
> >table.select("func(b)").to_pandas()
> >```
> >然后,你可以看看官方文档[1],让你快速上手PyFlink
> >
> >Best,
> >Xingbo
> >
> >[1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
> >
> >whh_960101 <[hidden email]> 于2020年9月4日周五 下午2:50写道:
> >
> >> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗
> >> 我的udf输出了一个numpy.array(dtype = str),
> >> result_type设的是DataTypes.ARRAY(DataTypes.STRING())
> >>
> >>
> 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING()))
> >> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容
> >> 请问这个问题该怎么解决?
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-09-04 10:35:03,"Xingbo Huang" <[hidden email]> 写道:
> >> >Hi,
> >> >
> >>
> >>
> >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事
> >> >
> >> >Best,
> >> >Xingbo
> >> >
> >> >whh_960101 <[hidden email]> 于2020年9月4日周五 上午9:26写道:
> >> >
> >> >>
> >> >>
> >>
> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值
> >> >> udf定义如下:
> >> >>
> >> >>
> >>
> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING())
> >> >> def fun(data):
> >> >>      b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错
> >> >>
> >> >>
> >> >> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错
> >> >> 希望您能给我提供好的解决办法,万分感谢!
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> 在 2020-09-03 22:23:28,"Xingbo Huang" <[hidden email]> 写道:
> >> >> >Hi,
> >> >> >
> >> >> >我觉得你从头详细描述一下你的表结构。
> >> >> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
> >> >>
> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
> >> >> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]
> >> >> >
> >> >> >[1]
> >> >> >
> >> >>
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions
> >> >> >
> >> >> >Best,
> >> >> >Xingbo
> >> >> >
> >> >> ><[hidden email]> 于2020年9月3日周四 下午9:45写道:
> >> >> >
> >> >> >>
> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
> >> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(),
> >> DataTypes.STRING()]
> >> >> >>
> >> >> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
> >> >> >> 或者正确写法是什么样的,感谢解答!
> >> >> >>
> >> >> >>
> >> >> >> | |
> >> >> >> whh_960101
> >> >> >> |
> >> >> >> |
> >> >> >> 邮箱:[hidden email]
> >> >> >> |
> >> >> >>
> >> >> >> 签名由 网易邮箱大师 定制
> >> >> >>
> >> >> >> 在2020年09月03日 21:14,Xingbo Huang 写道:
> >> >> >> Hi,
> >> >> >> input_types定义的是每一个列的具体类型。
> >> >> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
> >> >> >> 正确的写法是
> >> >> >>
> >> >> >>    input_types=[DataTypes.STRING(), DataTypes.STRING(),
> >> >> DataTypes.STRING()]
> >> >> >>
> >> >> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
> >> >> >>    input_types=DataTypes.Row([DataTypes.FIELD("a",
> >> DataTypes.STRING()),
> >> >> >> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
> >> >> >> DataTypes.STRING())])
> >> >> >>
> >> >> >> Best,
> >> >> >> Xingbo
> >> >> >>
> >> >> >> whh_960101 <[hidden email]> 于2020年9月3日周四 下午9:03写道:
> >> >> >>
> >> >> >> > 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
> >> >> >> > input_type:input_type should be DataType but contain
> >> RowField(RECID,
> >> >> >> > VARCHAR)
> >> >> >> > 我的pyflink版本:1.11.1
> >> >> >>
> >> >>
> >>
>
Reply | Threaded
Open this post in threaded view
|

Re:Re: Re: Re: Re: pyflink-udf 问题反馈

whh_960101
您好,请问在定义UDF时,输入参数是一个我定义好的python字典,比如
dic = {1:'a',2:'b'}
此时定义udf如下:
@udf(input_types=[DataTypes.MAP(DataTypes.INT(),DataTypes.STRING()),DataTypes.STRING()],result_types=DataTypes.STRING())
def func(dic,f):
   ......
   return L
st_env.register_function("func", func)
st_env.from_path("source").select("func(dic,t)").insert_into("sink") #这时我在外部定义好的数据类型dic字典如何作为参数传进来
这种代码应该怎么写?感谢解答!因为官网没有给出将外部数据类型作为参数传入udf中的样例,所以比较困惑











在 2020-09-04 16:02:56,"Xingbo Huang" <[hidden email]> 写道:

>Hi,
>
>推荐你使用ddl来声明你上下游用的connector
>
>```
>table_env.execute_sql("""
>CREATE TABLE output (
>data STRING ARRAY
>) WITH (
> 'connector' = 'filesystem',           -- required: specify the connector
> 'path' = 'file:///tmp/test.csv',  -- required: path to a directory
> 'format' = 'json',
> 'json.fail-on-missing-field' = 'false',
> 'json.ignore-parse-errors' = 'true'
>)
>""")
>
>table.select("func(b)").execute_insert("output").get_job_client().get_job_execution_result().result()
>```
>
>Best,
>Xingbo
>
>
>
>whh_960101 <[hidden email]> 于2020年9月4日周五 下午3:46写道:
>
>> 您好,我是想让输出insert_into到目标表中,具体如下:
>> st_env=StreamExecutionEnvironment.get_execution_environment()
>> st_env.connect了一个source table(table包含a字段),
>> 然后
>> | st_env.connect(FileSystem().path('tmp')) \ |
>> | | .with_format(OldCsv() |
>> | | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ |
>> | | .with_schema(Schema() |
>> | | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ |
>> | | .create_temporary_table('sink') |
>> connect了一个sink表,format、schema都是DataTypes.ARRAY()
>> 然后我定义了一个udf
>>
>> @udf(input_types=DataTypes.STRING(),result_type=DataTypes.ARRAY(DataTypes.STRING()))
>> def func(a):
>>     rec_list = a.split(',')
>>     res_arr = np.arrary(rec_list,dtype=str)
>>     return res_arr
>> st_env.register_function("func", func)
>> st_env.from_path("source").select("func(a)").insert_into("sink")
>> 最后得到的tmp文件里面就是[Ljava.lang.String;@f795710这种形式的字符串,不是我res_arr里面的内容,如果我单独返回一个值,比如return
>> res_arr[0],tmp文件里面的字符串就是正确。
>> 我想要得到array,该怎么解决?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-09-04 15:17:38,"Xingbo Huang" <[hidden email]> 写道:
>> >Hi,
>> >
>> >你是调试的时候想看结果吗?
>> >你可以直接table.to_pandas()来看结果,或者用print connector来看。
>> >
>> >个人觉得to_pandas最简单,比如你可以试试下面的例子
>> >
>> >```
>> >table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b'])
>> >
>> >@udf(input_types=DataTypes.STRING(),
>> >result_type=DataTypes.ARRAY(DataTypes.STRING()))
>> >def func(a):
>> >     return np.array([a, a, a], dtype=str)
>> >
>> >table_env.register_function("func", func)
>> >
>> >table.select("func(b)").to_pandas()
>> >```
>> >然后,你可以看看官方文档[1],让你快速上手PyFlink
>> >
>> >Best,
>> >Xingbo
>> >
>> >[1]
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
>> >
>> >whh_960101 <[hidden email]> 于2020年9月4日周五 下午2:50写道:
>> >
>> >> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗
>> >> 我的udf输出了一个numpy.array(dtype = str),
>> >> result_type设的是DataTypes.ARRAY(DataTypes.STRING())
>> >>
>> >>
>> 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING()))
>> >> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容
>> >> 请问这个问题该怎么解决?
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-09-04 10:35:03,"Xingbo Huang" <[hidden email]> 写道:
>> >> >Hi,
>> >> >
>> >>
>> >>
>> >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事
>> >> >
>> >> >Best,
>> >> >Xingbo
>> >> >
>> >> >whh_960101 <[hidden email]> 于2020年9月4日周五 上午9:26写道:
>> >> >
>> >> >>
>> >> >>
>> >>
>> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值
>> >> >> udf定义如下:
>> >> >>
>> >> >>
>> >>
>> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING())
>> >> >> def fun(data):
>> >> >>      b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错
>> >> >>
>> >> >>
>> >> >> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错
>> >> >> 希望您能给我提供好的解决办法,万分感谢!
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >> 在 2020-09-03 22:23:28,"Xingbo Huang" <[hidden email]> 写道:
>> >> >> >Hi,
>> >> >> >
>> >> >> >我觉得你从头详细描述一下你的表结构。
>> >> >> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
>> >> >>
>> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
>> >> >> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]
>> >> >> >
>> >> >> >[1]
>> >> >> >
>> >> >>
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions
>> >> >> >
>> >> >> >Best,
>> >> >> >Xingbo
>> >> >> >
>> >> >> ><[hidden email]> 于2020年9月3日周四 下午9:45写道:
>> >> >> >
>> >> >> >>
>> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
>> >> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(),
>> >> DataTypes.STRING()]
>> >> >> >>
>> >> >> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
>> >> >> >> 或者正确写法是什么样的,感谢解答!
>> >> >> >>
>> >> >> >>
>> >> >> >> | |
>> >> >> >> whh_960101
>> >> >> >> |
>> >> >> >> |
>> >> >> >> 邮箱:[hidden email]
>> >> >> >> |
>> >> >> >>
>> >> >> >> 签名由 网易邮箱大师 定制
>> >> >> >>
>> >> >> >> 在2020年09月03日 21:14,Xingbo Huang 写道:
>> >> >> >> Hi,
>> >> >> >> input_types定义的是每一个列的具体类型。
>> >> >> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
>> >> >> >> 正确的写法是
>> >> >> >>
>> >> >> >>    input_types=[DataTypes.STRING(), DataTypes.STRING(),
>> >> >> DataTypes.STRING()]
>> >> >> >>
>> >> >> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
>> >> >> >>    input_types=DataTypes.Row([DataTypes.FIELD("a",
>> >> DataTypes.STRING()),
>> >> >> >> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
>> >> >> >> DataTypes.STRING())])
>> >> >> >>
>> >> >> >> Best,
>> >> >> >> Xingbo
>> >> >> >>
>> >> >> >> whh_960101 <[hidden email]> 于2020年9月3日周四 下午9:03写道:
>> >> >> >>
>> >> >> >> > 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
>> >> >> >> > input_type:input_type should be DataType but contain
>> >> RowField(RECID,
>> >> >> >> > VARCHAR)
>> >> >> >> > 我的pyflink版本:1.11.1
>> >> >> >>
>> >> >>
>> >>
>>
Reply | Threaded
Open this post in threaded view
|

Re: Re: Re: Re: Re: pyflink-udf 问题反馈

Xingbo Huang
Hi,
你是想直接读一个python的提供的数据源把。这和udf是两个问题。你那个udf没啥问题,就是udf的返回类型是result_type,不是result_types。

你想直接读python的数据结构的话,你可以考虑使用st_env.from_elements(测试用的,可以参考文档[1])或者st_env.from_pandas
[2] 来读取一个dataframe。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/conversion_of_pandas.html

Best,
Xingbo

whh_960101 <[hidden email]> 于2020年9月7日周一 上午11:22写道:

> 您好,请问在定义UDF时,输入参数是一个我定义好的python字典,比如
> dic = {1:'a',2:'b'}
> 此时定义udf如下:
>
> @udf(input_types=[DataTypes.MAP(DataTypes.INT(),DataTypes.STRING()),DataTypes.STRING()],result_types=DataTypes.STRING())
> def func(dic,f):
>    ......
>    return L
> st_env.register_function("func", func)
> st_env.from_path("source").select("func(dic,t)").insert_into("sink")
> #这时我在外部定义好的数据类型dic字典如何作为参数传进来
> 这种代码应该怎么写?感谢解答!因为官网没有给出将外部数据类型作为参数传入udf中的样例,所以比较困惑
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-09-04 16:02:56,"Xingbo Huang" <[hidden email]> 写道:
> >Hi,
> >
> >推荐你使用ddl来声明你上下游用的connector
> >
> >```
> >table_env.execute_sql("""
> >CREATE TABLE output (
> >data STRING ARRAY
> >) WITH (
> > 'connector' = 'filesystem',           -- required: specify the connector
> > 'path' = 'file:///tmp/test.csv',  -- required: path to a directory
> > 'format' = 'json',
> > 'json.fail-on-missing-field' = 'false',
> > 'json.ignore-parse-errors' = 'true'
> >)
> >""")
> >
>
> >table.select("func(b)").execute_insert("output").get_job_client().get_job_execution_result().result()
> >```
> >
> >Best,
> >Xingbo
> >
> >
> >
> >whh_960101 <[hidden email]> 于2020年9月4日周五 下午3:46写道:
> >
> >> 您好,我是想让输出insert_into到目标表中,具体如下:
> >> st_env=StreamExecutionEnvironment.get_execution_environment()
> >> st_env.connect了一个source table(table包含a字段),
> >> 然后
> >> | st_env.connect(FileSystem().path('tmp')) \ |
> >> | | .with_format(OldCsv() |
> >> | | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ |
> >> | | .with_schema(Schema() |
> >> | | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ |
> >> | | .create_temporary_table('sink') |
> >> connect了一个sink表,format、schema都是DataTypes.ARRAY()
> >> 然后我定义了一个udf
> >>
> >>
> @udf(input_types=DataTypes.STRING(),result_type=DataTypes.ARRAY(DataTypes.STRING()))
> >> def func(a):
> >>     rec_list = a.split(',')
> >>     res_arr = np.arrary(rec_list,dtype=str)
> >>     return res_arr
> >> st_env.register_function("func", func)
> >> st_env.from_path("source").select("func(a)").insert_into("sink")
> >> 最后得到的tmp文件里面就是[Ljava.lang.String;@f795710这种形式的字符串
> ,不是我res_arr里面的内容,如果我单独返回一个值,比如return
> >> res_arr[0],tmp文件里面的字符串就是正确。
> >> 我想要得到array,该怎么解决?
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-09-04 15:17:38,"Xingbo Huang" <[hidden email]> 写道:
> >> >Hi,
> >> >
> >> >你是调试的时候想看结果吗?
> >> >你可以直接table.to_pandas()来看结果,或者用print connector来看。
> >> >
> >> >个人觉得to_pandas最简单,比如你可以试试下面的例子
> >> >
> >> >```
> >> >table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b'])
> >> >
> >> >@udf(input_types=DataTypes.STRING(),
> >> >result_type=DataTypes.ARRAY(DataTypes.STRING()))
> >> >def func(a):
> >> >     return np.array([a, a, a], dtype=str)
> >> >
> >> >table_env.register_function("func", func)
> >> >
> >> >table.select("func(b)").to_pandas()
> >> >```
> >> >然后,你可以看看官方文档[1],让你快速上手PyFlink
> >> >
> >> >Best,
> >> >Xingbo
> >> >
> >> >[1]
> >> >
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
> >> >
> >> >whh_960101 <[hidden email]> 于2020年9月4日周五 下午2:50写道:
> >> >
> >> >> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗
> >> >> 我的udf输出了一个numpy.array(dtype = str),
> >> >> result_type设的是DataTypes.ARRAY(DataTypes.STRING())
> >> >>
> >> >>
> >>
> 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING()))
> >> >> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容
> >> >> 请问这个问题该怎么解决?
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> 在 2020-09-04 10:35:03,"Xingbo Huang" <[hidden email]> 写道:
> >> >> >Hi,
> >> >> >
> >> >>
> >> >>
> >>
> >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事
> >> >> >
> >> >> >Best,
> >> >> >Xingbo
> >> >> >
> >> >> >whh_960101 <[hidden email]> 于2020年9月4日周五 上午9:26写道:
> >> >> >
> >> >> >>
> >> >> >>
> >> >>
> >>
> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值
> >> >> >> udf定义如下:
> >> >> >>
> >> >> >>
> >> >>
> >>
> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING())
> >> >> >> def fun(data):
> >> >> >>      b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错
> >> >> >>
> >> >> >>
> >> >> >> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错
> >> >> >> 希望您能给我提供好的解决办法,万分感谢!
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> 在 2020-09-03 22:23:28,"Xingbo Huang" <[hidden email]> 写道:
> >> >> >> >Hi,
> >> >> >> >
> >> >> >> >我觉得你从头详细描述一下你的表结构。
> >> >> >> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
> >> >> >>
> >> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
> >> >> >> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]
> >> >> >> >
> >> >> >> >[1]
> >> >> >> >
> >> >> >>
> >> >>
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions
> >> >> >> >
> >> >> >> >Best,
> >> >> >> >Xingbo
> >> >> >> >
> >> >> >> ><[hidden email]> 于2020年9月3日周四 下午9:45写道:
> >> >> >> >
> >> >> >> >>
> >> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
> >> >> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(),
> >> >> DataTypes.STRING()]
> >> >> >> >>
> >> >> >> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
> >> >> >> >> 或者正确写法是什么样的,感谢解答!
> >> >> >> >>
> >> >> >> >>
> >> >> >> >> | |
> >> >> >> >> whh_960101
> >> >> >> >> |
> >> >> >> >> |
> >> >> >> >> 邮箱:[hidden email]
> >> >> >> >> |
> >> >> >> >>
> >> >> >> >> 签名由 网易邮箱大师 定制
> >> >> >> >>
> >> >> >> >> 在2020年09月03日 21:14,Xingbo Huang 写道:
> >> >> >> >> Hi,
> >> >> >> >> input_types定义的是每一个列的具体类型。
> >> >> >> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
> >> >> >> >> 正确的写法是
> >> >> >> >>
> >> >> >> >>    input_types=[DataTypes.STRING(), DataTypes.STRING(),
> >> >> >> DataTypes.STRING()]
> >> >> >> >>
> >> >> >> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
> >> >> >> >>    input_types=DataTypes.Row([DataTypes.FIELD("a",
> >> >> DataTypes.STRING()),
> >> >> >> >> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
> >> >> >> >> DataTypes.STRING())])
> >> >> >> >>
> >> >> >> >> Best,
> >> >> >> >> Xingbo
> >> >> >> >>
> >> >> >> >> whh_960101 <[hidden email]> 于2020年9月3日周四 下午9:03写道:
> >> >> >> >>
> >> >> >> >> >
> 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
> >> >> >> >> > input_type:input_type should be DataType but contain
> >> >> RowField(RECID,
> >> >> >> >> > VARCHAR)
> >> >> >> >> > 我的pyflink版本:1.11.1
> >> >> >> >>
> >> >> >>
> >> >>
> >>
>
Reply | Threaded
Open this post in threaded view
|

Re:Re: Re: Re: Re: Re: pyflink-udf 问题反馈

whh_960101
您好,
图中像datagen和print这样的connector更像是表名,之前听说的只有json、csv、filesystem这种类型的connector,请问connector在使用连接器DDL创建表时的作用是什么







在 2020-09-07 11:33:06,"Xingbo Huang" <[hidden email]> 写道: >Hi, >你是想直接读一个python的提供的数据源把。这和udf是两个问题。你那个udf没啥问题,就是udf的返回类型是result_type,不是result_types。 > >你想直接读python的数据结构的话,你可以考虑使用st_env.from_elements(测试用的,可以参考文档[1])或者st_env.from_pandas >[2] 来读取一个dataframe。 > >[1] >https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html >[2] >https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/conversion_of_pandas.html > >Best, >Xingbo > >whh_960101 <[hidden email]> 于2020年9月7日周一 上午11:22写道: > >> 您好,请问在定义UDF时,输入参数是一个我定义好的python字典,比如 >> dic = {1:'a',2:'b'} >> 此时定义udf如下: >> >> @udf(input_types=[DataTypes.MAP(DataTypes.INT(),DataTypes.STRING()),DataTypes.STRING()],result_types=DataTypes.STRING()) >> def func(dic,f): >> ...... >> return L >> st_env.register_function("func", func) >> st_env.from_path("source").select("func(dic,t)").insert_into("sink") >> #这时我在外部定义好的数据类型dic字典如何作为参数传进来 >> 这种代码应该怎么写?感谢解答!因为官网没有给出将外部数据类型作为参数传入udf中的样例,所以比较困惑 >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-09-04 16:02:56,"Xingbo Huang" <[hidden email]> 写道: >> >Hi, >> > >> >推荐你使用ddl来声明你上下游用的connector >> > >> >``` >> >table_env.execute_sql(""" >> >CREATE TABLE output ( >> >data STRING ARRAY >> >) WITH ( >> > 'connector' = 'filesystem', -- required: specify the connector >> > 'path' = 'file:///tmp/test.csv', -- required: path to a directory >> > 'format' = 'json', >> > 'json.fail-on-missing-field' = 'false', >> > 'json.ignore-parse-errors' = 'true' >> >) >> >""") >> > >> >> >table.select("func(b)").execute_insert("output").get_job_client().get_job_execution_result().result() >> >``` >> > >> >Best, >> >Xingbo >> > >> > >> > >> >whh_960101 <[hidden email]> 于2020年9月4日周五 下午3:46写道: >> > >> >> 您好,我是想让输出insert_into到目标表中,具体如下: >> >> st_env=StreamExecutionEnvironment.get_execution_environment() >> >> st_env.connect了一个source table(table包含a字段), >> >> 然后 >> >> | st_env.connect(FileSystem().path('tmp')) \ | >> >> | | .with_format(OldCsv() | >> >> | | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ | >> >> | | .with_schema(Schema() | >> >> | | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ | >> >> | | .create_temporary_table('sink') | >> >> connect了一个sink表,format、schema都是DataTypes.ARRAY() >> >> 然后我定义了一个udf >> >> >> >> >> @udf(input_types=DataTypes.STRING(),result_type=DataTypes.ARRAY(DataTypes.STRING())) >> >> def func(a): >> >> rec_list = a.split(',') >> >> res_arr = np.arrary(rec_list,dtype=str) >> >> return res_arr >> >> st_env.register_function("func", func) >> >> st_env.from_path("source").select("func(a)").insert_into("sink") >> >> 最后得到的tmp文件里面就是[Ljava.lang.String;@f795710这种形式的字符串 >> ,不是我res_arr里面的内容,如果我单独返回一个值,比如return >> >> res_arr[0],tmp文件里面的字符串就是正确。 >> >> 我想要得到array,该怎么解决? >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-09-04 15:17:38,"Xingbo Huang" <[hidden email]> 写道: >> >> >Hi, >> >> > >> >> >你是调试的时候想看结果吗? >> >> >你可以直接table.to_pandas()来看结果,或者用print connector来看。 >> >> > >> >> >个人觉得to_pandas最简单,比如你可以试试下面的例子 >> >> > >> >> >``` >> >> >table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b']) >> >> > >> >> >@udf(input_types=DataTypes.STRING(), >> >> >result_type=DataTypes.ARRAY(DataTypes.STRING())) >> >> >def func(a): >> >> > return np.array([a, a, a], dtype=str) >> >> > >> >> >table_env.register_function("func", func) >> >> > >> >> >table.select("func(b)").to_pandas() >> >> >``` >> >> >然后,你可以看看官方文档[1],让你快速上手PyFlink >> >> > >> >> >Best, >> >> >Xingbo >> >> > >> >> >[1] >> >> > >> >> >> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html >> >> > >> >> >whh_960101 <[hidden email]> 于2020年9月4日周五 下午2:50写道: >> >> > >> >> >> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗 >> >> >> 我的udf输出了一个numpy.array(dtype = str), >> >> >> result_type设的是DataTypes.ARRAY(DataTypes.STRING()) >> >> >> >> >> >> >> >> >> 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING())) >> >> >> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容 >> >> >> 请问这个问题该怎么解决? >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-09-04 10:35:03,"Xingbo Huang" <[hidden email]> 写道: >> >> >> >Hi, >> >> >> > >> >> >> >> >> >> >> >> >> >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事 >> >> >> > >> >> >> >Best, >> >> >> >Xingbo >> >> >> > >> >> >> >whh_960101 <[hidden email]> 于2020年9月4日周五 上午9:26写道: >> >> >> > >> >> >> >> >> >> >> >> >> >> >> >> >> >> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值 >> >> >> >> udf定义如下: >> >> >> >> >> >> >> >> >> >> >> >> >> >> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING()) >> >> >> >> def fun(data): >> >> >> >> b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错 >> >> >> >> >> >> >> >> >> >> >> >> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错 >> >> >> >> 希望您能给我提供好的解决办法,万分感谢! >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-09-03 22:23:28,"Xingbo Huang" <[hidden email]> 写道: >> >> >> >> >Hi, >> >> >> >> > >> >> >> >> >我觉得你从头详细描述一下你的表结构。 >> >> >> >> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的, >> >> >> >> >> >> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。 >> >> >> >> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1] >> >> >> >> > >> >> >> >> >[1] >> >> >> >> > >> >> >> >> >> >> >> >> >> >> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions >> >> >> >> > >> >> >> >> >Best, >> >> >> >> >Xingbo >> >> >> >> > >> >> >> >> ><[hidden email]> 于2020年9月3日周四 下午9:45写道: >> >> >> >> > >> >> >> >> >> >> >> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法 >> >> >> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(), >> >> >> DataTypes.STRING()] >> >> >> >> >> >> >> >> >> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗 >> >> >> >> >> 或者正确写法是什么样的,感谢解答! >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> | | >> >> >> >> >> whh_960101 >> >> >> >> >> | >> >> >> >> >> | >> >> >> >> >> 邮箱:[hidden email] >> >> >> >> >> | >> >> >> >> >> >> >> >> >> >> 签名由 网易邮箱大师 定制 >> >> >> >> >> >> >> >> >> >> 在2020年09月03日 21:14,Xingbo Huang 写道: >> >> >> >> >> Hi, >> >> >> >> >> input_types定义的是每一个列的具体类型。 >> >> >> >> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你 >> >> >> >> >> 正确的写法是 >> >> >> >> >> >> >> >> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(), >> >> >> >> DataTypes.STRING()] >> >> >> >> >> >> >> >> >> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的) >> >> >> >> >> input_types=DataTypes.Row([DataTypes.FIELD("a", >> >> >> DataTypes.STRING()), >> >> >> >> >> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c", >> >> >> >> >> DataTypes.STRING())]) >> >> >> >> >> >> >> >> >> >> Best, >> >> >> >> >> Xingbo >> >> >> >> >> >> >> >> >> >> whh_960101 <[hidden email]> 于2020年9月3日周四 下午9:03写道: >> >> >> >> >> >> >> >> >> >> > >> 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid >> >> >> >> >> > input_type:input_type should be DataType but contain >> >> >> RowField(RECID, >> >> >> >> >> > VARCHAR) >> >> >> >> >> > 我的pyflink版本:1.11.1 >> >> >> >> >> >> >> >> >> >> >> >> >> >> >>


 

Reply | Threaded
Open this post in threaded view
|

Re: Re: Re: Re: Re: Re: pyflink-udf 问题反馈

Xingbo Huang
Hi,

你这个图挂了。json, csv这种是format[1] 。filesystem,datagen, print,
kafka等这种都是connector[2] ,用来从外部一个数据源读入数据或者写出数据。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/
Best,
Xingbo

whh_960101 <[hidden email]> 于2020年9月7日周一 下午5:14写道:

> 您好,
>
> 图中像datagen和print这样的connector更像是表名,之前听说的只有json、csv、filesystem这种类型的connector,请问connector在使用连接器DDL创建表时的作用是什么
>
>
>
>
>
>
>
> 在 2020-09-07 11:33:06,"Xingbo Huang" <[hidden email]> 写道:
> >Hi,
> >你是想直接读一个python的提供的数据源把。这和udf是两个问题。你那个udf没啥问题,就是udf的返回类型是result_type,不是result_types。
> >
> >你想直接读python的数据结构的话,你可以考虑使用st_env.from_elements(测试用的,可以参考文档[1])或者st_env.from_pandas
> >[2] 来读取一个dataframe。
> >
> >[1]
> >https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
> >[2]
> >https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/conversion_of_pandas.html
> >
> >Best,
> >Xingbo
> >
> >whh_960101 <[hidden email]> 于2020年9月7日周一 上午11:22写道:
> >
> >> 您好,请问在定义UDF时,输入参数是一个我定义好的python字典,比如
> >> dic = {1:'a',2:'b'}
> >> 此时定义udf如下:
> >>
> >> @udf(input_types=[DataTypes.MAP(DataTypes.INT(),DataTypes.STRING()),DataTypes.STRING()],result_types=DataTypes.STRING())
> >> def func(dic,f):
> >>    ......
> >>    return L
> >> st_env.register_function("func", func)
> >> st_env.from_path("source").select("func(dic,t)").insert_into("sink")
> >> #这时我在外部定义好的数据类型dic字典如何作为参数传进来
> >> 这种代码应该怎么写?感谢解答!因为官网没有给出将外部数据类型作为参数传入udf中的样例,所以比较困惑
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-09-04 16:02:56,"Xingbo Huang" <[hidden email]> 写道:
> >> >Hi,
> >> >
> >> >推荐你使用ddl来声明你上下游用的connector
> >> >
> >> >```
> >> >table_env.execute_sql("""
> >> >CREATE TABLE output (
> >> >data STRING ARRAY
> >> >) WITH (
> >> > 'connector' = 'filesystem',           -- required: specify the connector
> >> > 'path' = 'file:///tmp/test.csv',  -- required: path to a directory
> >> > 'format' = 'json',
> >> > 'json.fail-on-missing-field' = 'false',
> >> > 'json.ignore-parse-errors' = 'true'
> >> >)
> >> >""")
> >> >
> >>
> >> >table.select("func(b)").execute_insert("output").get_job_client().get_job_execution_result().result()
> >> >```
> >> >
> >> >Best,
> >> >Xingbo
> >> >
> >> >
> >> >
> >> >whh_960101 <[hidden email]> 于2020年9月4日周五 下午3:46写道:
> >> >
> >> >> 您好,我是想让输出insert_into到目标表中,具体如下:
> >> >> st_env=StreamExecutionEnvironment.get_execution_environment()
> >> >> st_env.connect了一个source table(table包含a字段),
> >> >> 然后
> >> >> | st_env.connect(FileSystem().path('tmp')) \ |
> >> >> | | .with_format(OldCsv() |
> >> >> | | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ |
> >> >> | | .with_schema(Schema() |
> >> >> | | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ |
> >> >> | | .create_temporary_table('sink') |
> >> >> connect了一个sink表,format、schema都是DataTypes.ARRAY()
> >> >> 然后我定义了一个udf
> >> >>
> >> >>
> >> @udf(input_types=DataTypes.STRING(),result_type=DataTypes.ARRAY(DataTypes.STRING()))
> >> >> def func(a):
> >> >>     rec_list = a.split(',')
> >> >>     res_arr = np.arrary(rec_list,dtype=str)
> >> >>     return res_arr
> >> >> st_env.register_function("func", func)
> >> >> st_env.from_path("source").select("func(a)").insert_into("sink")
> >> >> 最后得到的tmp文件里面就是[Ljava.lang.String;@f795710这种形式的字符串
> >> ,不是我res_arr里面的内容,如果我单独返回一个值,比如return
> >> >> res_arr[0],tmp文件里面的字符串就是正确。
> >> >> 我想要得到array,该怎么解决?
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> 在 2020-09-04 15:17:38,"Xingbo Huang" <[hidden email]> 写道:
> >> >> >Hi,
> >> >> >
> >> >> >你是调试的时候想看结果吗?
> >> >> >你可以直接table.to_pandas()来看结果,或者用print connector来看。
> >> >> >
> >> >> >个人觉得to_pandas最简单,比如你可以试试下面的例子
> >> >> >
> >> >> >```
> >> >> >table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b'])
> >> >> >
> >> >> >@udf(input_types=DataTypes.STRING(),
> >> >> >result_type=DataTypes.ARRAY(DataTypes.STRING()))
> >> >> >def func(a):
> >> >> >     return np.array([a, a, a], dtype=str)
> >> >> >
> >> >> >table_env.register_function("func", func)
> >> >> >
> >> >> >table.select("func(b)").to_pandas()
> >> >> >```
> >> >> >然后,你可以看看官方文档[1],让你快速上手PyFlink
> >> >> >
> >> >> >Best,
> >> >> >Xingbo
> >> >> >
> >> >> >[1]
> >> >> >
> >> >>
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
> >> >> >
> >> >> >whh_960101 <[hidden email]> 于2020年9月4日周五 下午2:50写道:
> >> >> >
> >> >> >> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗
> >> >> >> 我的udf输出了一个numpy.array(dtype = str),
> >> >> >> result_type设的是DataTypes.ARRAY(DataTypes.STRING())
> >> >> >>
> >> >> >>
> >> >>
> >> 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING()))
> >> >> >> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容
> >> >> >> 请问这个问题该怎么解决?
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> 在 2020-09-04 10:35:03,"Xingbo Huang" <[hidden email]> 写道:
> >> >> >> >Hi,
> >> >> >> >
> >> >> >>
> >> >> >>
> >> >>
> >> >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事
> >> >> >> >
> >> >> >> >Best,
> >> >> >> >Xingbo
> >> >> >> >
> >> >> >> >whh_960101 <[hidden email]> 于2020年9月4日周五 上午9:26写道:
> >> >> >> >
> >> >> >> >>
> >> >> >> >>
> >> >> >>
> >> >>
> >> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值
> >> >> >> >> udf定义如下:
> >> >> >> >>
> >> >> >> >>
> >> >> >>
> >> >>
> >> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING())
> >> >> >> >> def fun(data):
> >> >> >> >>      b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错
> >> >> >> >>
> >> >> >> >>
> >> >> >> >> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错
> >> >> >> >> 希望您能给我提供好的解决办法,万分感谢!
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >> 在 2020-09-03 22:23:28,"Xingbo Huang" <[hidden email]> 写道:
> >> >> >> >> >Hi,
> >> >> >> >> >
> >> >> >> >> >我觉得你从头详细描述一下你的表结构。
> >> >> >> >> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
> >> >> >> >>
> >> >> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
> >> >> >> >> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]
> >> >> >> >> >
> >> >> >> >> >[1]
> >> >> >> >> >
> >> >> >> >>
> >> >> >>
> >> >>
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions
> >> >> >> >> >
> >> >> >> >> >Best,
> >> >> >> >> >Xingbo
> >> >> >> >> >
> >> >> >> >> ><[hidden email]> 于2020年9月3日周四 下午9:45写道:
> >> >> >> >> >
> >> >> >> >> >>
> >> >> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
> >> >> >> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(),
> >> >> >> DataTypes.STRING()]
> >> >> >> >> >>
> >> >> >> >> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
> >> >> >> >> >> 或者正确写法是什么样的,感谢解答!
> >> >> >> >> >>
> >> >> >> >> >>
> >> >> >> >> >> | |
> >> >> >> >> >> whh_960101
> >> >> >> >> >> |
> >> >> >> >> >> |
> >> >> >> >> >> 邮箱:[hidden email]
> >> >> >> >> >> |
> >> >> >> >> >>
> >> >> >> >> >> 签名由 网易邮箱大师 定制
> >> >> >> >> >>
> >> >> >> >> >> 在2020年09月03日 21:14,Xingbo Huang 写道:
> >> >> >> >> >> Hi,
> >> >> >> >> >> input_types定义的是每一个列的具体类型。
> >> >> >> >> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
> >> >> >> >> >> 正确的写法是
> >> >> >> >> >>
> >> >> >> >> >>    input_types=[DataTypes.STRING(), DataTypes.STRING(),
> >> >> >> >> DataTypes.STRING()]
> >> >> >> >> >>
> >> >> >> >> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
> >> >> >> >> >>    input_types=DataTypes.Row([DataTypes.FIELD("a",
> >> >> >> DataTypes.STRING()),
> >> >> >> >> >> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
> >> >> >> >> >> DataTypes.STRING())])
> >> >> >> >> >>
> >> >> >> >> >> Best,
> >> >> >> >> >> Xingbo
> >> >> >> >> >>
> >> >> >> >> >> whh_960101 <[hidden email]> 于2020年9月3日周四 下午9:03写道:
> >> >> >> >> >>
> >> >> >> >> >> >
> >> 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
> >> >> >> >> >> > input_type:input_type should be DataType but contain
> >> >> >> RowField(RECID,
> >> >> >> >> >> > VARCHAR)
> >> >> >> >> >> > 我的pyflink版本:1.11.1
> >> >> >> >> >>
> >> >> >> >>
> >> >> >>
> >> >>
> >>
>
>
>
>
>
Reply | Threaded
Open this post in threaded view
|

pyflink execute_insert问题求解答

whh_960101
您好,我使用pyflink时的代码如下,有如下两个问题:
1.
source  = st_env.from_path('source') #st_env是StreamTableEnvironment,source是kafka源端
main_table = source.select(".......")
sub_table = source.select(".......")
main_table.execute_insert('sink').get_job_client().get_job_execution_result().result()
sub_table.execute_insert('sink').get_job_client().get_job_execution_result().result()


最终发现只insert了main_table,请问遇见这种情况该怎么处理,就是如何插入多个表?


2.
for i in range(1,20):
     sub_table = source.select("...%s...%d...." %(str(i),i))#我的select语句是一个动态拼接的sql,在一个for循环中,获取多个sub_table
     sub_table.execute_insert('sink').get_job_client().get_job_execution_result().result() #这种方式是否合理,因为也是测试无法插入多个表,有没有什么解决方案


以上两个问题希望您们能够给予解答!感谢!





Reply | Threaded
Open this post in threaded view
|

Re: pyflink execute_insert问题求解答

Dian Fu
这两个看起来是同一个问题,1.11是支持的,可以看一下TableEnvironment.create_statement_set(): https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/table_environment.html#executeexplain-jobs

> 在 2020年9月9日,上午11:31,whh_960101 <[hidden email]> 写道:
>
> 您好,我使用pyflink时的代码如下,有如下两个问题:
> 1.
> source  = st_env.from_path('source') #st_env是StreamTableEnvironment,source是kafka源端
> main_table = source.select(".......")
> sub_table = source.select(".......")
> main_table.execute_insert('sink').get_job_client().get_job_execution_result().result()
> sub_table.execute_insert('sink').get_job_client().get_job_execution_result().result()
>
>
> 最终发现只insert了main_table,请问遇见这种情况该怎么处理,就是如何插入多个表?
>
>
> 2.
> for i in range(1,20):
>     sub_table = source.select("...%s...%d...." %(str(i),i))#我的select语句是一个动态拼接的sql,在一个for循环中,获取多个sub_table
>     sub_table.execute_insert('sink').get_job_client().get_job_execution_result().result() #这种方式是否合理,因为也是测试无法插入多个表,有没有什么解决方案
>
>
> 以上两个问题希望您们能够给予解答!感谢!
>
>
>
>
>

Reply | Threaded
Open this post in threaded view
|

Re: pyflink execute_insert问题求解答

nicholasjiang
In reply to this post by whh_960101
1.最终发现只insert了main_table,请问遇见这种情况该怎么处理,就是如何插入多个表?
针对Multiple Sink的话推荐通过Statement Set方式:
statement_set = TableEnvironment.create_statement_set()
main_table = source.select(".......")
sub_table = source.select(".......")
statement_set.add_insert("main_table", main_table)
statement_set.add_insert("sub_table", sub_table)

2.for i in range(1,20):
     sub_table = source.select("...%s...%d...."
%(str(i),i))#我的select语句是一个动态拼接的sql,在一个for循环中,获取多个sub_table
   
sub_table.execute_insert('sink').get_job_client().get_job_execution_result().result()
#这种方式是否合理,因为也是测试无法插入多个表,有没有什么解决方案
按照上述方式进行Multiple Sink是可以插入多个表。



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re:Re: pyflink execute_insert问题求解答

whh_960101
问题1:
我已经生成了一个Table对象main_table,我如何才能取其中的字段'data'中的值,加入条件判断语句中呢,没找到有合适的api
例如:
if main_table.to_pandas()['data'].iloc[0] == '': #我现在是通过转成pandas来操作,还有其他好用的方法吗
   ......




问题2:
full_outer_join(right, join_predicate)[source]¶

Joins two Table. Similar to a SQL full outer join. The fields of the two joined operations must not overlap, use alias() to rename fields if necessary.

Note

 

Both tables must be bound to the same TableEnvironment and its TableConfig must have null check enabled (default).

Example:

>>> left.full_outer_join(right,"a = b").select("a, b, d")
Parameters

right (pyflink.table.Table) – Right table.

join_predicate (str) – The join predicate expression string.

Returns

The result table.

Return type

pyflink.table.Table

The fields of the two joined operations must not overlap是什么意思,sql中的full_outer_join例如:
SELECT Persons.LastName, Persons.FirstName, Orders.OrderNo
FROM Persons
FULL JOIN Orders
ON Persons.Id_P=Orders.Id_P

 #on中的两个表的字段是可以重复的,The fields of the two joined operations must not overlap意思是做匹配的两个字段名不能重复吗

在 2020-09-09 15:54:35,"nicholasjiang" <[hidden email]> 写道:

>1.最终发现只insert了main_table,请问遇见这种情况该怎么处理,就是如何插入多个表?
>针对Multiple Sink的话推荐通过Statement Set方式:
>statement_set = TableEnvironment.create_statement_set()
>main_table = source.select(".......")
>sub_table = source.select(".......")
>statement_set.add_insert("main_table", main_table)
>statement_set.add_insert("sub_table", sub_table)
>
>2.for i in range(1,20):
>     sub_table = source.select("...%s...%d...."
>%(str(i),i))#我的select语句是一个动态拼接的sql,在一个for循环中,获取多个sub_table
>    
>sub_table.execute_insert('sink').get_job_client().get_job_execution_result().result()
>#这种方式是否合理,因为也是测试无法插入多个表,有没有什么解决方案
>按照上述方式进行Multiple Sink是可以插入多个表。
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: pyflink execute_insert问题求解答

Dian Fu
针对问题1: 你的需求是这样的吗:先获取表中字段'data'的值(第一行的值),根据'data'的值,再构造不同的作业逻辑?

针对问题2:现在join不支持两个表的字段名重复,可以看一下JIRA [1],所以目前必须保证两个表的字段名不重复。

[1] https://issues.apache.org/jira/browse/FLINK-18679 <https://issues.apache.org/jira/browse/FLINK-18679>

> 在 2020年9月9日,下午4:27,whh_960101 <[hidden email]> 写道:
>
> 问题1:
> 我已经生成了一个Table对象main_table,我如何才能取其中的字段'data'中的值,加入条件判断语句中呢,没找到有合适的api
> 例如:
> if main_table.to_pandas()['data'].iloc[0] == '': #我现在是通过转成pandas来操作,还有其他好用的方法吗
>   ......
>
>
>
>
> 问题2:
> full_outer_join(right, join_predicate)[source]¶
>
> Joins two Table. Similar to a SQL full outer join. The fields of the two joined operations must not overlap, use alias() to rename fields if necessary.
>
> Note
>
>
>
> Both tables must be bound to the same TableEnvironment and its TableConfig must have null check enabled (default).
>
> Example:
>
>>>> left.full_outer_join(right,"a = b").select("a, b, d")
> Parameters
>
> right (pyflink.table.Table) – Right table.
>
> join_predicate (str) – The join predicate expression string.
>
> Returns
>
> The result table.
>
> Return type
>
> pyflink.table.Table
>
> The fields of the two joined operations must not overlap是什么意思,sql中的full_outer_join例如:
> SELECT Persons.LastName, Persons.FirstName, Orders.OrderNo
> FROM Persons
> FULL JOIN Orders
> ON Persons.Id_P=Orders.Id_P
>
> #on中的两个表的字段是可以重复的,The fields of the two joined operations must not overlap意思是做匹配的两个字段名不能重复吗
>
> 在 2020-09-09 15:54:35,"nicholasjiang" <[hidden email]> 写道:
>> 1.最终发现只insert了main_table,请问遇见这种情况该怎么处理,就是如何插入多个表?
>> 针对Multiple Sink的话推荐通过Statement Set方式:
>> statement_set = TableEnvironment.create_statement_set()
>> main_table = source.select(".......")
>> sub_table = source.select(".......")
>> statement_set.add_insert("main_table", main_table)
>> statement_set.add_insert("sub_table", sub_table)
>>
>> 2.for i in range(1,20):
>>    sub_table = source.select("...%s...%d...."
>> %(str(i),i))#我的select语句是一个动态拼接的sql,在一个for循环中,获取多个sub_table
>>
>> sub_table.execute_insert('sink').get_job_client().get_job_execution_result().result()
>> #这种方式是否合理,因为也是测试无法插入多个表,有没有什么解决方案
>> 按照上述方式进行Multiple Sink是可以插入多个表。
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/

123