PyFlink 1.11 problem printing UDF results: Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb


PyFlink 1.11 problem printing UDF results: Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb

肖越
I am testing a UDF with PyFlink 1.11: it takes a DOUBLE column defined on a table as input, computes the cumulative product of that column's values,
and returns a single DOUBLE value. I have already configured taskmanager.memory.task.off-heap.size, but it does not seem to help.
Printing the result fails with the following error:
Traceback (most recent call last):
  File "C:*****/udtf_test.py", line 42, in <module>
    env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE) FROM query_result')
  File "C:\****\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table_environment.py", line 543, in execute_sql
    return TableResult(self._j_tenv.executeSql(stmt))
  File "C:\****\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\****\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py", line 154, in deco
    raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: "The configured Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb. The Task Off-Heap Memory can be configured using the configuration key 'taskmanager.memory.task.off-heap.size'."


[The code]:
s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
# s_env.set_parallelism(8)
env = StreamTableEnvironment.create(s_env,
    environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build())
env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '0m')
# Register the source tables
env.execute_sql(get_table_ddl('TP_GL_DAY'))
env.execute_sql(get_table_ddl('TS_PF_SEC_YLDRATE'))


# Register the output (sink) table
out_ddl = '''
        CREATE TABLE print_result (
         yldrate1 DOUBLE
        ) WITH (
         'connector' = 'print'
        )
'''
env.execute_sql(out_ddl)
# Define and run the SQL query
log_query = "SELECT BIZ_DATE, YLDRATE, PF_ID, SYMBOL_ID FROM TP_GL_DAY JOIN TS_PF_SEC_YLDRATE ON DAY_ID = BIZ_DATE WHERE PF_ID = '1234' "
view_table = env.sql_query(log_query)
env.register_table('query_result', view_table)


# Define the computation logic (Pandas UDF)
@udf(input_types=DataTypes.DOUBLE(), result_type=DataTypes.DOUBLE(), udf_type="pandas")
def multi_production(yldrate):
    yldrate_1 = yldrate + 1
    return np.prod(yldrate_1) - 1


# Register the function
env.register_function('multi_production', multi_production)
env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE) FROM query_result')
query_result.print_schema()
env.execute('my_udf_job')


Re: PyFlink 1.11 problem printing UDF results: Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb

Xingbo Huang
Hi,
The error message says at least 79 MB is required, but your code sets it to 0m, so of course the error persists.
Best,
Xingbo
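
For reference, a minimal sketch of the change being suggested here, assuming the rest of the job stays as posted: raise the task off-heap memory to at least the 79 MB the Python worker asks for. The '80m' value below is just an example; any setting of 79 MB or more should satisfy the check.

# Give the Pandas UDF worker enough task off-heap memory.
# '80m' is an example value; the error message asks for at least 79 MB.
env.get_config().get_configuration().set_string(
    "taskmanager.memory.task.off-heap.size", '80m')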


Re: PyFlink 1.11 problem printing UDF results: Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb

肖越
In reply to this post by 肖越
Sorry, I misread the error message. The problem is solved now, thanks a lot.
