pyflink1.11 测试udf函数,将表格定义的一列Double类型输入,计算这一列数值的累乘积,
结果返回一个Double类型数值,已经配置了taskmanager.memory.task.off-heap.size,但没什么用。 结果print报错: Traceback (most recent call last): File "C:*****/udtf_test.py", line 42, in <module> env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE) FROM query_result') File "C:\****\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table_environment.py", line 543, in execute_sql return TableResult(self._j_tenv.executeSql(stmt)) File "C:\****\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py", line 1286, in __call__ answer, self.gateway_client, self.target_id, self.name) File "C:\****\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py", line 154, in deco raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace) pyflink.util.exceptions.TableException: "The configured Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb. The Task Off-Heap Memory can be configured using the configuration key 'taskmanager.memory.task.off-heap.size'." 【代码如下】: s_env = StreamExecutionEnvironment.get_execution_environment() s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime) # s_env.set_parallelism(8) env = StreamTableEnvironment.create(s_env, environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()) env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '0m') # 注册源表 env.execute_sql(get_table_ddl('TP_GL_DAY')) env.execute_sql(get_table_ddl('TS_PF_SEC_YLDRATE')) # 注册输出表 out_ddl = ''' CREATE TABLE print_result ( yldrate1 DOUBLE ) WITH ( 'connector' = 'print' ) ''' env.execute_sql(out_ddl) # 定义及执行SQL log_query = "SELECT BIZ_DATE, YLDRATE, PF_ID, SYMBOL_ID FROM TP_GL_DAY JOIN TS_PF_SEC_YLDRATE ON DAY_ID = BIZ_DATE WHERE PF_ID = '1234' " view_table = env.sql_query(log_query) env.register_table('query_result', view_table) # 定义计算逻辑函数 @udf(input_types=DataTypes.DOUBLE(), result_type=DataTypes.DOUBLE(), udf_type="pandas") def multi_production(yldrate): yldrate_1 = yldrate + 1 return np.prod(yldrate_1) - 1 # 注册函数 env.register_function('multi_production', multi_production) env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE) FROM query_result') query_result.print_schema() env.execute('my_udf_job') |
抱歉,报错信息理解错误,问题已经解决,感谢大佬。
在 2021-02-03 10:16:38,"肖越" <[hidden email]> 写道: pyflink1.11 测试udf函数,将表格定义的一列Double类型输入,计算这一列数值的累乘积, 结果返回一个Double类型数值,已经配置了taskmanager.memory.task.off-heap.size,但没什么用。 结果print报错: Traceback (most recent call last): File "C:*****/udtf_test.py", line 42, in <module> env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE) FROM query_result') File "C:\****\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table_environment.py", line 543, in execute_sql return TableResult(self._j_tenv.executeSql(stmt)) File "C:\****\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py", line 1286, in __call__ answer, self.gateway_client, self.target_id, self.name) File "C:\****\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py", line 154, in deco raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace) pyflink.util.exceptions.TableException: "The configured Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb. The Task Off-Heap Memory can be configured using the configuration key 'taskmanager.memory.task.off-heap.size'." 【代码如下】: s_env = StreamExecutionEnvironment.get_execution_environment() s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime) # s_env.set_parallelism(8) env = StreamTableEnvironment.create(s_env, environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()) env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '0m') # 注册源表 env.execute_sql(get_table_ddl('TP_GL_DAY')) env.execute_sql(get_table_ddl('TS_PF_SEC_YLDRATE')) # 注册输出表 out_ddl = ''' CREATE TABLE print_result ( yldrate1 DOUBLE ) WITH ( 'connector' = 'print' ) ''' env.execute_sql(out_ddl) # 定义及执行SQL log_query = "SELECT BIZ_DATE, YLDRATE, PF_ID, SYMBOL_ID FROM TP_GL_DAY JOIN TS_PF_SEC_YLDRATE ON DAY_ID = BIZ_DATE WHERE PF_ID = '1234' " view_table = env.sql_query(log_query) env.register_table('query_result', view_table) # 定义计算逻辑函数 @udf(input_types=DataTypes.DOUBLE(), result_type=DataTypes.DOUBLE(), udf_type="pandas") def multi_production(yldrate): yldrate_1 = yldrate + 1 return np.prod(yldrate_1) - 1 # 注册函数 env.register_function('multi_production', multi_production) env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE) FROM query_result') query_result.print_schema() env.execute('my_udf_job') |
Free forum by Nabble | Edit this page |