温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

基于python如何实现rpc远程过程调用

发布时间:2022-06-13 09:18:35 来源:亿速云 阅读:92 作者:iii 栏目:开发技术

这篇文章主要介绍“基于python如何实现rpc远程过程调用”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“基于python如何实现rpc远程过程调用”文章能帮助大家解决问题。

    一、主要内容

    所谓RPC,是远程过程调用(Remote Procedure Call)的简写,网上解释很多,简单来说,就是在当前进程调用其他进程的函数时,体验就像是调用本地写的函数一般。
    本文实现的是在本地调用远端的类class对象的接口,也就是本地的client不实例化类对象,调用的是server端的类对象接口。
    为了达到让调用层无须关心底层实现,拥有丝滑般的体验,就需要以下几个部分:

    • 客户端需要把类的接口提取出来,并将调用函数事件捕获存储起来;服务端需要把类的公有函数作为可远程调用的接口。

    • 客户端把调用函数的事件(调用的函数,参数)进行序列化并发送给服务端;服务端将客户端的调用事件反序列化,并执行相应的接口,将返回值发送给客户端。

    • 客户端与服务端通过某种方式(一般就是网络socket)进行通信。

    在下面时序图的灰色部分,对于调用方来说是透明的,它的执行结果应该和执行本地的函数时一致的。

    基于python如何实现rpc远程过程调用

    二、实现步骤

    1. 进程间的通信

    本文采用了基于TCP的sokcet连接来进行进程之间的通信,更多实现细节可参考之前博客。
    在此需要注意:

    本文采用了select模块来监听网络事件,如果服务端未收到任何的网络消息会一直阻塞在这儿。如果服务端除了提供rpc调用服务之外还需要执行其他逻辑,那么应当采用非阻塞,轮询socket的方式来判断是否有新的网络事件。

    # ServerBase.py
    def process(self):
        readable, writable, exceptional = select.select(self.inputs, self.outputs, self.conns.values())
        for conn in readable:
            if conn is self.socket:
                self._handle_conn()
            else:
                self._handle_recv(conn)
        for conn in writable:
            pass
        for conn in exceptional:
            self._handle_leave(conn)

    客户端的网络事件本文通过创建新的线程来监听的。并不会影响客户端主线程的执行,因此可以尽情的阻塞。部分代码如下:

    # AsynCallback.py
    class AsyncTaskManager(object):
        _asy_events = dict()
    
        def __init__(self, loop, *args):
            super(AsyncTaskManager, self).__init__()
            self._loop_fun = loop
    
        def __call__(self, *args, **kwargs):
            proc = threading.Thread(target=self._exec_loop, args=args, kwargs=kwargs)
            proc.start()
    
        def _exec_loop(self, *args, **kwargs):
            while True:
                net_resp = self._loop_fun(*args, **kwargs)
                for resp in net_resp:
                    asy_event = self._asy_events.pop(resp.rid)
                    asy_event.set()
    # Client.py
    class Client(TaskHandle, ClientBase):
    
        @AsyncTaskManager
        def process(self):
            super(Client, self).process()
            _events = []
            while self.has_events:
                event = self.get_next_event()
                data = event[1]
                _events.append(self.unpack_respond(data))
            return _events

    序列化方式,本文采用了库pickle进行序列化与反序列化,使用它的原因是可以将自定义类对象也进行序列化,非常之高级。

    2. 异步回调实现思路

    对于需要返回值的函数调用,处理起来比较简单,只需要将主线程阻塞等待,直至超时或者接收到了对应函数的返回值即可。本文采用了threading.Event来阻塞与唤醒调用的函数,同时采用了装饰器来实现这功能。若日后有更好的方法,可以轻易进行替换。相关示例代码如下所示:

    @AsyncTaskManager.respond
    def _handle_response(self, tid):
        """ 处理有返回值的情况
        会阻塞线程直至收到返回值
        """
        task = self.pop_task(tid)
        if task.callback:
            task.callback()
        return self.pop_respond(tid)
    
    @staticmethod
    def respond(func):
        @wraps(func)
        def make_resp(handle, tid):
            """ 需要注意的是,和装饰的函数参数含义需一致 """
            event = threading.Event()
            AsyncTaskManager._asy_events[tid] = event
            event.wait(timeout=TIME_OUT)
            return func(handle, tid)    # 这儿才是真正执行_handle_response的地方
        return make_resp

    在实际的应用过程中,应有这样的情况,服务端与客户端都是独立的应用,通过rpc函数进行通信和交互,而并不是某方为另外一方提供服务,那么此时返回值并不必要,只需要将要做的事通知另一方即可。对于此种情况,可以采用异步回调的方式来告知调用方对应函数执行成功了。

    在文中依旧采用线程来完成该功能,客户端调用函数之后创建一个新线程并阻塞住,等待服务端将执行结果发回后再唤醒,如果有回调函数就执行。示例代码如下:

    @AsyncTaskManager.callback
    def _handle_call_back(self, tid):
        """ 处理有回调函数的调用
        callback会等tid事件调用成功之后 才会回调,且不会有返回值
        """
        task = self.pop_task(tid)
        if task.callback:
            task.callback()
            
    @staticmethod
    def callback(func):
        @wraps(func)
        def make_thread(event, *args, **kwargs):
            event.wait(timeout=TIME_OUT)
            func(*args, **kwargs)
    
        def make_async(handle, tid):
            """ 注意点同上 """
            event = threading.Event()
            AsyncTaskManager._asy_events[tid] = event
            _task = threading.Thread(target=lambda: make_thread(event, handle, tid))
    
        return make_async

    关于“基于python如何实现rpc远程过程调用”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识,可以关注亿速云行业资讯频道,小编每天都会为大家更新不同的知识点。

    向AI问一下细节

    免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

    AI