Hi all, after switching to a vectorized (pandas) UDF in PyFlink, my job fails after running for a while. I searched around but could not find a similar issue; could anyone take a look at what is going on?
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 8: Traceback (most recent call last):
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
    response = task()
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 978, in process_bundle
    element.data)
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 71, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 73, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 625, in decode_from_stream
    yield self._decode_one_batch_from_stream(in_stream, in_stream.read_var_int64())
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 636, in _decode_one_batch_from_stream
    return arrow_to_pandas(self._timezone, self._field_types, [next(self._batch_reader)])
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 629, in _load_from_stream
    reader = pa.ipc.open_stream(stream)
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/pyarrow/ipc.py", line 146, in open_stream
    return RecordBatchStreamReader(source)
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/pyarrow/ipc.py", line 62, in __init__
    self._open(source)
  File "pyarrow/ipc.pxi", line 360, in pyarrow.lib._RecordBatchStreamReader._open
  File "pyarrow/error.pxi", line 123, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
OSError: Expected IPC message of type schema but got record batch
Hi,
This error is raised by pyarrow while it is deserializing data. Could you tell us which pyarrow version you are using? You can check it with pip list | grep pyarrow.

Best,
Xingbo
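A minimal cross-check, assuming the check is run in the same Python environment that actually executes the job:

import pyarrow
print(pyarrow.__version__)  # pyarrow version visible to that environment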
Hi, the pyarrow version I am using is 0.17.1.
Hi,
I tried that version of pyarrow and ran the existing tests, but I could not reproduce the problem. Could you share the content of your pandas UDF?

Best,
Xingbo
Hi, my UDF mainly looks up records in a physical MySQL table. The code is as follows:
import pymysql
import pandas as pd

from pyflink.table import DataTypes
from pyflink.table.udf import udf


def pole_record_perid1(poleId: int, current_time: int, days: int):
    days_delta_ts = current_time - days * 24 * 60 * 60 * 1000
    days_mh_delta = '''select rt,ts from test where poleId={} and ts between {} and {};'''.format(
        poleId, days_delta_ts, current_time)
    # look up the matching records in MySQL
    mysql = pymysql.connect('192.1.1.1', 'test_database', port=3306, charset='utf8')
    cursor = mysql.cursor()
    cursor.execute(days_mh_delta)
    delta = cursor.fetchall()
    info_df = pd.DataFrame(delta, columns=['rt', 'ts'])
    if info_df.empty:
        # timeconvert (defined elsewhere in the job) turns a timestamp into a string
        return ["0.0", timeconvert(current_time), "-1", "-1"]
    else:
        max_rt_info = info_df.query('rt==rt.max()')
        min_rt_info = info_df.query('rt==rt.min()')
        max_rt_info = max_rt_info['ts'].apply(timeconvert).tolist()
        min_rt_info = min_rt_info['ts'].apply(timeconvert).tolist()
        max_rt_ts = ",".join(max_rt_info)
        min_rt_ts = ",".join(min_rt_info)
        return [str(info_df['rt'].max() - info_df['rt'].min()), timeconvert(current_time),
                max_rt_ts, min_rt_ts]


@udf(result_type=DataTypes.ARRAY(DataTypes.STRING()), func_type="pandas")
def pole_record_perid(poleId, current_time, days):
    df = pd.DataFrame({'poleId': poleId, 'current_time': current_time, 'days': days})
    df['res'] = df.apply(lambda x: pole_record_perid1(x.poleId, x.current_time, x.days), axis=1)
    return df['res']
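A minimal sketch of how a pandas UDF like this is typically called from the Table API; the table environment, table name, and column names below are illustrative assumptions, not taken from the job above:

# Assumes a StreamTableEnvironment `t_env` and a registered source table
# "pole_events" with poleId, current_time and days columns (hypothetical names).
source = t_env.from_path("pole_events")
result = source.select(pole_record_perid(source.poleId, source.current_time, source.days))
# `result` has a single ARRAY<STRING> column computed by the pandas UDF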
Hi everyone in the community, has anyone run into this problem? Any advice would be appreciated.
Hi,
Sorry for the late reply. This error occurs while the data is being deserialized, before execution ever reaches the body of the function you wrote. The declaration of your pandas UDF looks fine, so the next thing to check is how you use it. I wrote a simplified version that takes an Array[String] as input and returns it as output, and it runs without any problem.

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.udf import udf


def extract_data():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    t_env = StreamTableEnvironment.create(
        env,
        environment_settings=EnvironmentSettings.new_instance()
            .in_streaming_mode().use_blink_planner().build())

    @udf(result_type=DataTypes.ARRAY(DataTypes.STRING()), func_type="pandas")
    def get_string_element(my_list):
        return my_list

    t = t_env.from_elements(
        [("1", ["1", "2", "3"]), ("3", ["2", "3", "3"]), ("2", ["1", "4", "3"])],
        DataTypes.ROW(
            [DataTypes.FIELD("Key", DataTypes.STRING()),
             DataTypes.FIELD("List_element", DataTypes.ARRAY(DataTypes.STRING()))]))

    print(t.select(get_string_element(t.List_element)).to_pandas())


if __name__ == '__main__':
    extract_data()

Please check whether your usage looks similar, or whether running this demo also raises the error on your side.

Best,
Xingbo
OK, thanks a lot for the guidance, Xingbo. I will go and try it.
I changed my code to follow your example, and it behaves the same as before: it runs fine and the output is correct. The biggest problem now is that after running for a while (about 3 minutes) it fails with OSError: Expected IPC message of type schema but got record batch.
This is probably memory related. I ran into something similar before: the stored state kept growing without bound, so the job died after running for a few minutes and threw an exception. You could try increasing the memory and cleaning up state.

刘海
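Along those lines, a minimal sketch of configuration knobs that are commonly tried for memory and Arrow batching pressure in a PyFlink job; the option names are standard Flink/PyFlink settings, but the values are placeholder assumptions, not recommendations confirmed in this thread:

# Assumes a StreamTableEnvironment `t_env`; values below are illustrative.
config = t_env.get_config().get_configuration()
# Give the task more off-heap memory for the Python worker process.
config.set_string("taskmanager.memory.task.off-heap.size", "512m")
# Hand smaller Arrow batches to the pandas UDF.
config.set_string("python.fn-execution.arrow.batch.size", "1000")
# Flush bundles to the Python worker more frequently.
config.set_string("python.fn-execution.bundle.size", "1000")
# State cleanup (e.g. idle state retention on the TableConfig) is a separate knob.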