Flink 1.12 error: OSError: Expected IPC message of type schema but got record batch


咿咿呀呀
Hi all, after using a vectorized (Pandas) UDF in PyFlink, the job fails after running for a while. I could not find anything similar when searching; could someone take a look at what is going on?
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 8: Traceback (most recent call last):
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
    response = task()
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 978, in process_bundle
    element.data)
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 71, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 73, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 625, in decode_from_stream
    yield self._decode_one_batch_from_stream(in_stream, in_stream.read_var_int64())
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 636, in _decode_one_batch_from_stream
    return arrow_to_pandas(self._timezone, self._field_types, [next(self._batch_reader)])
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 629, in _load_from_stream
    reader = pa.ipc.open_stream(stream)
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/pyarrow/ipc.py", line 146, in open_stream
    return RecordBatchStreamReader(source)
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/pyarrow/ipc.py", line 62, in __init__
    self._open(source)
  File "pyarrow/ipc.pxi", line 360, in pyarrow.lib._RecordBatchStreamReader._open
  File "pyarrow/error.pxi", line 123, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
OSError: Expected IPC message of type schema but got record batch

Re: Flink 1.12 error: OSError: Expected IPC message of type schema but got record batch

Xingbo Huang
Hi,

This error is raised by pyarrow while deserializing data. Could you tell us which pyarrow version you are using?
You can check it with: pip list | grep pyarrow
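The same check can also be done from inside the Python interpreter the job actually uses, which matters when several Python environments are installed. A minimal sketch; it only prints the installed version, or a note if pyarrow is absent:

```python
# Print the pyarrow version seen by this interpreter, equivalent to
# `pip list | grep pyarrow` but scoped to the running Python environment.
try:
    import pyarrow
    print("pyarrow", pyarrow.__version__)
except ImportError:
    print("pyarrow is not installed in this environment")
```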

Best,
Xingbo


小学生 <[hidden email]> wrote on Mon, Dec 28, 2020, 10:37 AM:

> ...
> OSError: Expected IPC message of type schema but got record batch

Re: Flink 1.12 error: OSError: Expected IPC message of type schema but got record batch

咿咿呀呀
Hello, the pyarrow version I am using is 0.17.1.

Re: Flink 1.12 error: OSError: Expected IPC message of type schema but got record batch

Xingbo Huang
Hi,

I tried this pyarrow version and could not reproduce the problem with the existing tests. Could you share the content of your pandas UDF?

Best,
Xingbo

小学生 <[hidden email]> wrote on Mon, Dec 28, 2020, 3:07 PM:

> Hello, the pyarrow version I am using is 0.17.1.

Re: Flink 1.12 error: OSError: Expected IPC message of type schema but got record batch

咿咿呀呀
Hello, my UDF mainly looks up records from a physical MySQL table. The code is as follows:

import pymysql
import pandas as pd


def pole_record_perid1(poleId: int, current_time: int, days: int):
    days_delta_ts = current_time - days * 24 * 60 * 60 * 1000

    days_mh_delta = '''select rt, ts from test where poleId={} and
                       ts between {} and {};'''.format(poleId, days_delta_ts,
                                                       current_time)
    # look up the matching records in MySQL
    mysql = pymysql.connect('192.1.1.1', 'test_database', port=3306, charset='utf8')
    cursor = mysql.cursor()
    cursor.execute(days_mh_delta)
    delta = cursor.fetchall()

    # timeconvert is a helper defined elsewhere in the job (not shown here)
    info_df = pd.DataFrame(delta, columns=['rt', 'ts'])
    if info_df.empty:
        return ["0.0", timeconvert(current_time), "-1", "-1"]
    else:
        max_rt_info = info_df.query('rt==rt.max()')
        min_rt_info = info_df.query('rt==rt.min()')
        max_rt_info = max_rt_info['ts'].apply(timeconvert).tolist()
        min_rt_info = min_rt_info['ts'].apply(timeconvert).tolist()

        max_rt_ts = ",".join(max_rt_info)
        min_rt_ts = ",".join(min_rt_info)
        return [str(info_df['rt'].max() - info_df['rt'].min()),
                timeconvert(current_time), max_rt_ts, min_rt_ts]


@udf(result_type=DataTypes.ARRAY(DataTypes.STRING()), func_type="pandas")
def pole_record_perid(poleId, current_time, days):
    df = pd.DataFrame({'poleId': poleId, 'current_time': current_time, 'days': days})
    df['res'] = df.apply(lambda x: pole_record_perid1(x.poleId, x.current_time, x.days), axis=1)
    return df['res']
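The post-query branching above can be exercised without MySQL by stubbing the fetched rows. A minimal sketch, where the sample rows and the `timeconvert` stub are hypothetical stand-ins (the real helper is not shown above):

```python
import pandas as pd

# Hypothetical stand-in for the timeconvert helper used in the post;
# here it simply renders a millisecond timestamp as a string.
def timeconvert(ts_ms):
    return str(ts_ms)

def summarize(info_df, current_time):
    # Same branching as pole_record_perid1 after the MySQL fetch.
    if info_df.empty:
        return ["0.0", timeconvert(current_time), "-1", "-1"]
    max_rt_ts = ",".join(info_df.query('rt==rt.max()')['ts'].apply(timeconvert).tolist())
    min_rt_ts = ",".join(info_df.query('rt==rt.min()')['ts'].apply(timeconvert).tolist())
    return [str(info_df['rt'].max() - info_df['rt'].min()),
            timeconvert(current_time), max_rt_ts, min_rt_ts]

rows = [(1.5, 100), (3.0, 200), (0.5, 300)]   # (rt, ts) pairs standing in for the query result
df = pd.DataFrame(rows, columns=['rt', 'ts'])
print(summarize(df, 400))  # → ['2.5', '400', '200', '300']
```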

Re: Flink 1.12 error: OSError: Expected IPC message of type schema but got record batch

咿咿呀呀
In reply to this post by Xingbo Huang
Hello, could this problem be caused by my using it incorrectly?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink 1.12 error: OSError: Expected IPC message of type schema but got record batch

咿咿呀呀

Re: Flink 1.12 error: OSError: Expected IPC message of type schema but got record batch

咿咿呀呀
Has anyone in the community run into this problem? Any advice would be appreciated.




Re: Flink 1.12 error: OSError: Expected IPC message of type schema but got record batch

Xingbo Huang
Hi,
Sorry for the late reply. The error happens while deserializing the data, before execution ever reaches the body of your function. Your pandas UDF declaration looks fine, so we need to look at how it is used. I wrote a simplified version that takes Array[String] as input and returns it as output, and it runs without problems.
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.udf import udf


def extract_data():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    t_env = StreamTableEnvironment.create(
        env,
        environment_settings=EnvironmentSettings.new_instance()
            .in_streaming_mode().use_blink_planner().build())

    @udf(result_type=DataTypes.ARRAY(DataTypes.STRING()), func_type="pandas")
    def get_string_element(my_list):
        return my_list

    t = t_env.from_elements(
        [("1", ["1", "2", "3"]), ("3", ["2", "3", "3"]), ("2", ["1", "4", "3"])],
        DataTypes.ROW(
            [DataTypes.FIELD("Key", DataTypes.STRING()),
             DataTypes.FIELD("List_element", DataTypes.ARRAY(DataTypes.STRING()))]))
    print(t.select(get_string_element(t.List_element)).to_pandas())


if __name__ == '__main__':
    extract_data()

Could you check whether your usage is similar, or whether running this demo also fails for you?

Best,
Xingbo


咿咿呀呀 <[hidden email]> wrote on Mon, Jan 4, 2021, 9:38 AM:

> Has anyone in the community run into this problem? Any advice would be appreciated.

Re: Flink 1.12 error: OSError: Expected IPC message of type schema but got record batch

咿咿呀呀
OK, thanks Xingbo for the guidance. I'll go try it.

Re: Flink 1.12 error: OSError: Expected IPC message of type schema but got record batch

咿咿呀呀
I modified mine following your example and it is essentially the same as what I had before. It runs, and the output is correct. The big problem now is that after running for a while (about 3 minutes) it fails with this error: OSError: Expected IPC message of type schema but got record batch.




Re: Flink 1.12 error: OSError: Expected IPC message of type schema but got record batch

刘海
This is probably memory-related. I ran into something similar before: the stored state grew without bound, so the job died after running for a few minutes and threw an exception. You could try increasing memory and cleaning up state.
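If memory is indeed the culprit, the usual knobs live in flink-conf.yaml. A sketch with illustrative values only, not tuned recommendations:

```
# flink-conf.yaml — illustrative values, adjust to your cluster
taskmanager.memory.process.size: 4096m        # total TaskManager process memory
python.fn-execution.arrow.batch.size: 1000    # rows per Arrow batch sent to Pandas UDFs
```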


刘海
[hidden email]
On Jan 4, 2021, 11:35, 咿咿呀呀 <[hidden email]> wrote:
> I modified mine following your example and it is essentially the same as what I had before. It runs, and the output is correct. The big problem now is that after running for a while (about 3 minutes) it fails with this error: OSError: Expected IPC message of type schema but got record batch.

Re: Flink 1.12 error: OSError: Expected IPC message of type schema but got record batch

咿咿呀呀
Thanks for the pointer. I'll try increasing it.