HELP!!!! PyFlink UDF加载Keras模型,并注册函数出现的问题

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

HELP!!!! PyFlink UDF加载Keras模型,并注册函数出现的问题

陈康
在pyflink自定义UDF加载Keras模型并注册UDF时、报错:TypeError: can't pickle _thread.lock
objects、有大佬遇到过吗?谢谢!(插入的图不知看不看的到)

class myKerasMLP(ScalarFunction):
    def __init__(self):
        ...

    def open(self, function_context):
        ...

    def eval(self, x, y):
        ...

    def load_model(self):
        """
        加载模型,如果 redis 里存在模型,则优先从 redis 加载,否则初始化一个新模型
        :return:
        """
        import redis
        import pickle
        import logging

        logging.info('载入模型!')
        r = redis.StrictRedis(**self.redis_params)
        model = None

        try:
            # redis加载model json
            model = model_from_json(r.get(self.model_name))
            # redis加载model权重
            weights = pickle.loads(r.get(self.weights))
            # # 设置权重
            model.set_weights(weights)
            model.summary()
        except TypeError:
            logging.info('Redis 内没有指定名称的模型,因此初始化一个新模型')
        except (redis.exceptions.RedisError, TypeError, Exception):
            logging.warning('Redis 出现异常,因此初始化一个新模型')
        finally:
            print("MLP model", model)
        return model

myKerasMLP = udf(myKerasMLP(), input_types=[DataTypes.FLOAT(),
DataTypes.FLOAT()],
                 result_type=DataTypes.FLOAT())
print('UDF 模型加载完成!')
t_env.create_temporary_system_function('train_and_predict', myKerasMLP)
print('UDF 注册成功')
-----------
_________________________________________________________________
Layer (type)                 Output Shape              Param #  
=================================================================
dense_1 (Dense)              (None, 8)                 72        
_________________________________________________________________
dense_2 (Dense)              (None, 10)                90        
_________________________________________________________________
dense_3 (Dense)              (None, 1)                 11        
=================================================================
Total params: 173
Trainable params: 173
Non-trainable params: 0
_________________________________________________________________
MLP model <keras.models.Sequential object at 0x0000000031C386A0>
UDF 模型加载完成!


<http://apache-flink.147419.n8.nabble.com/file/t1280/2.jpg>
<http://apache-flink.147419.n8.nabble.com/file/t1280/11.jpg>




--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: HELP!!!! PyFlink UDF加载Keras模型,并注册函数出现的问题

Dian Fu
可以发一下你的__init__方法吗?应该是在__init__方法里有不能pickle的对象。

> 在 2021年2月3日,下午6:01,陈康 <[hidden email]> 写道:
>
> <http://apache-flink.147419.n8.nabble.com/file/t1280/1213.jpg>
> https://blog.csdn.net/weixin_44904816/article/details/108744530
> 看到一篇博客说:“PyFlink以后还可以支持 Tensorflow、Keras”.....好吧..
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/

Reply | Threaded
Open this post in threaded view
|

Re: HELP!!!! PyFlink UDF加载Keras模型,并注册函数出现的问题

陈康
感谢回复、之前是在__init__方法中加载Keras模型、经钉钉群大佬指教在eval中使用再加载、问题解决了,谢谢!



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: HELP!!!! PyFlink UDF加载Keras模型,并注册函数出现的问题

Xingbo Huang
Hi,

你其实可以在open方法里面进行加载的,这样只会加载一次,在eval方法中加载将会导致多次加载。

Best,
Xingbo

陈康 <[hidden email]> 于2021年2月4日周四 上午9:25写道:

> 感谢回复、之前是在__init__方法中加载Keras模型、经钉钉群大佬指教在eval中使用再加载、问题解决了,谢谢!
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: HELP!!!! PyFlink UDF加载Keras模型,并注册函数出现的问题

陈康
谢谢、我试试



--
Sent from: http://apache-flink.147419.n8.nabble.com/