Pandas UDF test fails with: AttributeError: 'decimal.Decimal' object has no attribute 'isnull'


肖越
import numpy as np
from pyflink.table import DataTypes
from pyflink.table.udf import udf

# Define the computation logic function
@udf(input_types=DataTypes.DECIMAL(38, 18, True),
     result_type=DataTypes.DECIMAL(38, 18, True),
     udf_type="pandas")
def multi_production(yldrate):
    yldrate_1 = yldrate + 1
    return np.prod(yldrate_1) - 1


Invocation: env.sql_query('SELECT multi_production(YLDRATE) FROM query_result')
Since the official website does not provide a more detailed example: inside a pandas UDF, can the data be processed in pandas style?
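
For completeness, a hedged sketch of how the UDF above might be made visible to the SQL query; the register_function step and the result_table name are assumptions for illustration, not taken from the original mail:

# Continuing the snippet above: register the UDF, then query it from SQL.
env.register_function("multi_production", multi_production)  # assumed registration step
result_table = env.sql_query('SELECT multi_production(YLDRATE) FROM query_result')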
Error message:
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:345)
at org.apache.flink.python.AbstractPythonFunctionRunner.finishBundle(AbstractPythonFunctionRunner.java:230)
... 17 more
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 2: Traceback (most recent call last):
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 167, in _execute
    response = task()
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 223, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 352, in do_instruction
    request.instruction_id)
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 386, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 812, in process_bundle
    data.transform_id].process_encoded(data.data)
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 205, in process_encoded
    self.output(decoded_value)
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\operations.py", line 304, in output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\operations.py", line 178, in receive
    self.consumer.process(windowed_value)
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\fn_execution\operations.py", line 92, in process
    self._value_coder_impl.encode_to_stream(self.func(o.value), output_stream, True)
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\fn_execution\coder_impl.py", line 467, in encode_to_stream
    self._value_coder.encode_to_stream(value, out, nested)
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\fn_execution\coder_impl.py", line 438, in encode_to_stream
    pandas_to_arrow(self._schema, self._timezone, self._field_types, cols))
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\utils.py", line 35, in pandas_to_arrow
    schema.types[i]) for i in range(0, len(schema))]
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\utils.py", line 35, in <listcomp>
    schema.types[i]) for i in range(0, len(schema))]
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\utils.py", line 27, in create_array
    return pa.Array.from_pandas(s, mask=s.isnull(), type=t)
AttributeError: 'decimal.Decimal' object has no attribute 'isnull'


at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:140)
at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:249)
at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:297)
at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:738)
at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more


Re: Pandas UDF test fails with: AttributeError: 'decimal.Decimal' object has no attribute 'isnull'

Xingbo Huang
Hi,

The error occurs because your function logic actually has aggregate function semantics, not scalar function semantics.
A scalar function is one-in/one-out: the number of output values must equal the number of input values. A pandas UDF merely takes advantage of pandas's batching; it hands you the data wrapped in a batched Series, but you still have to keep the input and output lengths identical. For example, if the input is pd.Series([1,2,3]) and you apply the +1 operation, the result is pd.Series([2,3,4]); both Series have the same length, 3.
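
To make the one-in/one-out contract concrete, here is a minimal sketch of a valid element-wise pandas UDF, reusing the decorator style from the original post; the function name and logic are illustrative only:

import pandas as pd
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.DECIMAL(38, 18, True)],
     result_type=DataTypes.DECIMAL(38, 18, True),
     udf_type="pandas")
def add_one(yldrate: pd.Series) -> pd.Series:
    # yldrate arrives as a pandas Series covering one batch of rows;
    # the returned Series must have exactly the same length.
    return yldrate + 1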
Your function, however, aggregates an entire pd.Series into a single result: for an input of pd.Series([1,2,3]), np.prod collapses it to one value (6). That is many-in/one-out semantics, and for that case you should use a pandas UDAF. Pandas UDAFs are supported starting from release-1.12; see the documentation [1] for details.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions
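
Along those lines, a minimal sketch of the original logic rewritten as a vectorized (pandas) aggregate function, assuming PyFlink 1.12+; the result type and query shape are illustrative, see [1] for the authoritative API:

import numpy as np
from pyflink.table import DataTypes
from pyflink.table.udf import udaf

@udaf(result_type=DataTypes.DECIMAL(38, 18, True), func_type="pandas")
def multi_production(yldrate):
    # yldrate is a pandas Series holding all rows of the group/window;
    # a single aggregated value is returned (many in, one out).
    return np.prod(yldrate + 1) - 1

Once registered (e.g. via create_temporary_function), it can then be used in an aggregate query such as SELECT multi_production(YLDRATE) FROM query_result.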

Best,
Xingbo
