Hi,

Hope you are well! I have a question about PyFlink; details below.

Feature: windows of size 10 minutes that slide by 5 minutes, used to aggregate the data and then do some further processing. There is almost 2 GB of data per window, about 1 million data items.

Job params:

bin/yarn-session.sh -s 2 -jm 2048 -tm 48768 \
  -Dyarn.containers.vcores=4 \
  -Dtaskmanager.memory.managed.consumer-weights=DATAPROC:30,PYTHON:70 \
  -Dtaskmanager.memory.managed.fraction=0.7 \
  -Dtaskmanager.memory.task.off-heap.size=5120m \
  -nm $task_name -qu $queue -d

Exception message:

Traceback (most recent call last):
  File "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
    response = task()
  File "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 978, in process_bundle
    element.data)
  File "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 71, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 73, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 627, in decode_from_stream
    yield self._decode_one_batch_from_stream(in_stream, in_stream.read_var_int64())
  File "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 638, in _decode_one_batch_from_stream
    return arrow_to_pandas(self._timezone, self._field_types, [next(self._batch_reader)])
  File "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 631, in _load_from_stream
    reader = pa.ipc.open_stream(stream)
  File "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyarrow/ipc.py", line 137, in open_stream
    return RecordBatchStreamReader(source)
  File "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyarrow/ipc.py", line 61, in __init__
    self._open(source)
  File "pyarrow/ipc.pxi", line 352, in pyarrow.lib._RecordBatchStreamReader._open
  File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
OSError: Expected IPC message of type schema but got record batch

Many thanks!
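For reference, a 10-minute window that slides by 5 minutes means every element belongs to size / slide = 2 overlapping windows. The stdlib-only sketch that follows is modeled on how Flink's sliding time windows pick window starts (align the timestamp to the slide, then step back one slide at a time while the window still covers the timestamp). It is a simplified illustration, not Flink or PyFlink code; the function names and the use of millisecond epoch timestamps are assumptions.

```python
# Sliding window used in the question: 10-minute windows that slide by 5 minutes.
WINDOW_SIZE_MS = 10 * 60 * 1000
SLIDE_MS = 5 * 60 * 1000


def last_window_start(timestamp_ms: int, slide_ms: int, offset_ms: int = 0) -> int:
    """Start of the most recent slide-aligned window that contains timestamp_ms."""
    return timestamp_ms - (timestamp_ms - offset_ms + slide_ms) % slide_ms


def assign_sliding_windows(
    timestamp_ms: int,
    size_ms: int = WINDOW_SIZE_MS,
    slide_ms: int = SLIDE_MS,
    offset_ms: int = 0,
) -> list:
    """Return every half-open window [start, end) that contains timestamp_ms."""
    windows = []
    start = last_window_start(timestamp_ms, slide_ms, offset_ms)
    # Step back one slide at a time while the window still covers the timestamp.
    while start > timestamp_ms - size_ms:
        windows.append((start, start + size_ms))
        start -= slide_ms
    return windows


if __name__ == "__main__":
    # An element at minute 7 falls into [5 min, 15 min) and [0 min, 10 min).
    print(assign_sliding_windows(7 * 60 * 1000))
    # Number of windows each element is assigned to: size / slide.
    print(WINDOW_SIZE_MS // SLIDE_MS)
```

For example, assign_sliding_windows(7 * 60 * 1000) returns [(300000, 900000), (0, 600000)], i.e. the windows [5 min, 15 min) and [0 min, 10 min). Windows are half-open, so an element at exactly minute 10 falls into [10 min, 20 min) and [5 min, 15 min), not [0 min, 10 min).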
Hi,
Sorry for the late reply. Thanks for reporting this issue, which has been recorded in FLINK-21208 [1]. I will fix it as soon as possible.

[1] https://issues.apache.org/jira/browse/FLINK-21208

Best,
Xingbo

On Sun, Jan 31, 2021 at 3:28 PM, 苗红宾 <[hidden email]> wrote:
> OSError: Expected IPC message of type schema but got record batch
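The OSError at the bottom of the traceback is raised by pyarrow's IPC stream reader. In the Arrow IPC streaming format, a stream starts with a schema message, followed by record batch messages, and a stream reader expects to see the schema first. The toy model that follows is plain Python with no pyarrow; the class and function names are hypothetical, and dictionary batches and the end-of-stream marker are ignored. It shows one way to get exactly this message: opening a second reader on a stream whose schema message has already been consumed. It only illustrates what the error means and is not a diagnosis of FLINK-21208.

```python
from typing import Iterator, List, Optional, Tuple

# Toy message: ("schema", column_names) or ("record batch", rows).
Message = Tuple[str, object]


def write_stream(columns: List[str], batches: List[List[tuple]]) -> List[Message]:
    """Toy writer: one schema message, then one message per record batch."""
    return [("schema", columns)] + [("record batch", batch) for batch in batches]


class ToyStreamReader:
    """Toy stream reader (not pyarrow): the first message it reads must be the schema."""

    def __init__(self, messages: Iterator[Message]) -> None:
        self._messages = messages
        kind, payload = next(messages)
        if kind != "schema":
            # Same wording as the OSError pyarrow raised in the traceback.
            raise OSError(f"Expected IPC message of type schema but got {kind}")
        self.schema = payload

    def read_next_batch(self) -> List[tuple]:
        kind, payload = next(self._messages)
        if kind != "record batch":
            raise OSError(f"Expected record batch but got {kind}")
        return payload


# Both readers share one underlying stream, e.g. the same byte buffer.
stream = iter(write_stream(["k", "v"], [[("a", 1)], [("b", 2)]]))

reader = ToyStreamReader(stream)    # consumes the schema message
first = reader.read_next_batch()    # consumes the first record batch

error_message: Optional[str] = None
try:
    ToyStreamReader(stream)         # second reader starts mid-stream: next message is a batch
except OSError as err:
    error_message = str(err)

print(error_message)
```

Here the second reader fails because the only schema message in the stream was already read by the first reader, so the next message it sees is a record batch.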