PyFlink Expected IPC message of type schema but got record batch


苗红宾
Hi:

Hope you are doing well! I have a question about PyFlink; details below:

Use case: a sliding window 10 minutes wide that slides by 5 minutes, aggregating the data and then doing further processing. Each window holds almost 2 GB of data, about 1 million records.

Job params:

bin/yarn-session.sh -s 2 -jm 2048 -tm 48768 \
-Dyarn.containers.vcores=4 \
-Dtaskmanager.memory.managed.consumer-weights=DATAPROC:30,PYTHON:70 \
-Dtaskmanager.memory.managed.fraction=0.7 \
-Dtaskmanager.memory.task.off-heap.size=5120m \
-nm $task_name -qu $queue -d
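For readers less familiar with these flags, a quick gloss may help. The readings below follow the standard semantics of Flink's yarn-session.sh options and configuration keys, not anything specific to this job:

```shell
# Annotated reading of the session command above (standard Flink option semantics):
#   -s 2          task slots per TaskManager
#   -jm 2048      JobManager container memory, in MB
#   -tm 48768     TaskManager container memory, in MB
#   -Dyarn.containers.vcores=4
#                 vcores requested per YARN container
#   -Dtaskmanager.memory.managed.consumer-weights=DATAPROC:30,PYTHON:70
#                 split managed memory 30/70 between Flink's own consumers
#                 and the Python workers
#   -Dtaskmanager.memory.managed.fraction=0.7
#                 fraction of total Flink memory reserved as managed memory
#   -Dtaskmanager.memory.task.off-heap.size=5120m
#                 off-heap memory reserved for task code
#   -nm / -qu / -d
#                 YARN application name, YARN queue, detached mode
```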


The exception message is as follows:

Traceback (most recent call last):
  File "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
    response = task()
  File "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 978, in process_bundle
    element.data)
  File "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 71, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 73, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 627, in decode_from_stream
    yield self._decode_one_batch_from_stream(in_stream, in_stream.read_var_int64())
  File "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 638, in _decode_one_batch_from_stream
    return arrow_to_pandas(self._timezone, self._field_types, [next(self._batch_reader)])
  File "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 631, in _load_from_stream
    reader = pa.ipc.open_stream(stream)
  File "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyarrow/ipc.py", line 137, in open_stream
    return RecordBatchStreamReader(source)
  File "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyarrow/ipc.py", line 61, in __init__
    self._open(source)
  File "pyarrow/ipc.pxi", line 352, in pyarrow.lib._RecordBatchStreamReader._open
  File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
OSError: Expected IPC message of type schema but got record batch


Many Thanks!




 

Re: PyFlink Expected IPC message of type schema but got record batch

Xingbo Huang
Hi,

Sorry for the late reply, and thanks for reporting this issue; it has been
recorded as FLINK-21208 [1]. I will fix it as soon as possible.

[1] https://issues.apache.org/jira/browse/FLINK-21208

Best,
Xingbo

苗红宾 <[hidden email]> wrote on Sun, Jan 31, 2021 at 3:28 PM:
