Testing a UDF in PyFlink 1.11: the UDF takes a table column of type DOUBLE as input, computes the cumulative product of the column's values, and returns a single DOUBLE. I have already configured taskmanager.memory.task.off-heap.size, but it had no effect. Printing the result raises this error:

Traceback (most recent call last):
  File "C:*****/udtf_test.py", line 42, in <module>
    env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE) FROM query_result')
  File "C:\****\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table_environment.py", line 543, in execute_sql
    return TableResult(self._j_tenv.executeSql(stmt))
  File "C:\****\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\****\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py", line 154, in deco
    raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: "The configured Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb. The Task Off-Heap Memory can be configured using the configuration key 'taskmanager.memory.task.off-heap.size'."
[Code follows]:

s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
# s_env.set_parallelism(8)
env = StreamTableEnvironment.create(
    s_env,
    environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build())
env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '0m')

# Register the source tables
env.execute_sql(get_table_ddl('TP_GL_DAY'))
env.execute_sql(get_table_ddl('TS_PF_SEC_YLDRATE'))

# Register the sink table
out_ddl = '''
    CREATE TABLE print_result (
        yldrate1 DOUBLE
    ) WITH (
        'connector' = 'print'
    )
'''
env.execute_sql(out_ddl)

# Define and run the query
log_query = "SELECT BIZ_DATE, YLDRATE, PF_ID, SYMBOL_ID FROM TP_GL_DAY JOIN TS_PF_SEC_YLDRATE ON DAY_ID = BIZ_DATE WHERE PF_ID = '1234' "
view_table = env.sql_query(log_query)
env.register_table('query_result', view_table)

# Define the computation as a pandas UDF
@udf(input_types=DataTypes.DOUBLE(), result_type=DataTypes.DOUBLE(), udf_type="pandas")
def multi_production(yldrate):
    yldrate_1 = yldrate + 1
    return np.prod(yldrate_1) - 1

# Register the UDF
env.register_function('multi_production', multi_production)
env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE) FROM query_result')
view_table.print_schema()
env.execute('my_udf_job')
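As an aside, the compounding logic inside the pandas UDF can be checked on its own with plain NumPy, independent of Flink (a standalone sketch; the sample return values are made up for illustration):

```python
import numpy as np

def multi_production(yldrate):
    # Compound per-period returns into one cumulative return:
    # (1 + r1) * (1 + r2) * ... * (1 + rn) - 1
    return np.prod(np.asarray(yldrate) + 1.0) - 1.0

# Two periods of +10% and +20% compound to +32%:
print(multi_production([0.10, 0.20]))  # ~0.32
```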
Hi,
The error message says at least 79 mb is required, and your code configures it as 0m, so of course it keeps failing.

Best,
Xingbo

(In reply to 肖越's message of Wed, Feb 3, 2021, 10:24 AM, quoted above.)
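A minimal sketch of the fix: either raise the value in the set_string call above from '0m' to something at or above the reported 79 mb minimum, or set it cluster-wide in flink-conf.yaml. The 80m value here is an illustrative choice, not a recommendation; size it to your actual Python worker needs:

```yaml
# flink-conf.yaml: reserve off-heap task memory for the Python worker
taskmanager.memory.task.off-heap.size: 80m
```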
In reply to this post by 肖越
Sorry, I misread the error message; the problem is solved now. Thanks a lot!
(On 2021-02-03 10:23:32, "肖越" <[hidden email]> wrote the original message, quoted above.)