在pyflink自定义UDF加载Keras模型并注册UDF时、报错:TypeError: can't pickle _thread.lock
objects、有大佬遇到过吗?谢谢!(插入的图不知看不看的到)
class myKerasMLP(ScalarFunction):
def __init__(self):
...
def open(self, function_context):
...
def eval(self, x, y):
...
def load_model(self):
"""
加载模型,如果 redis 里存在模型,则优先从 redis 加载,否则初始化一个新模型
:return:
"""
import redis
import pickle
import logging
logging.info('载入模型!')
r = redis.StrictRedis(**self.redis_params)
model = None
try:
# redis加载model json
model = model_from_json(r.get(self.model_name))
# redis加载model权重
weights = pickle.loads(r.get(self.weights))
# # 设置权重
model.set_weights(weights)
model.summary()
except TypeError:
logging.info('Redis 内没有指定名称的模型,因此初始化一个新模型')
except (redis.exceptions.RedisError, TypeError, Exception):
logging.warning('Redis 出现异常,因此初始化一个新模型')
finally:
print("MLP model", model)
return model
myKerasMLP = udf(myKerasMLP(), input_types=[DataTypes.FLOAT(),
DataTypes.FLOAT()],
result_type=DataTypes.FLOAT())
print('UDF 模型加载完成!')
t_env.create_temporary_system_function('train_and_predict', myKerasMLP)
print('UDF 注册成功')
-----------
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
dense_1 (Dense) (None, 8) 72
_________________________________________________________________
dense_2 (Dense) (None, 10) 90
_________________________________________________________________
dense_3 (Dense) (None, 1) 11
=================================================================
Total params: 173
Trainable params: 173
Non-trainable params: 0
_________________________________________________________________
MLP model <keras.models.Sequential object at 0x0000000031C386A0>
UDF 模型加载完成!
<
http://apache-flink.147419.n8.nabble.com/file/t1280/2.jpg>
<
http://apache-flink.147419.n8.nabble.com/file/t1280/11.jpg>
--
Sent from:
http://apache-flink.147419.n8.nabble.com/