先前在使用RPyC进行进程间通讯文章中介绍过RPyC这个利器。在实际项目中运用之后,发现了一个需要解决的问题,这里记录下解决的思路。
RPyC启动方式
按照之前那篇文章里一样,在实际开发中选用了ThreadedServer。ThreadedServer在独立的线程中启动,
class RPyCServer(threading.Thread):
def __init__(self, host, port):
super(RPyCServer, self).__init__()
self.host = host
self.port = port
self.st_started = False
def start(self):
if self.st_started:
return
super(RPyCServer, self).start()
def run(self):
try:
server = ThreadedServer(rpyc.SlaveService, port=self.port, hostname=self.host)
self.st_started = True
except:
self.st_started = False
server = RPyCServer(host, port)
server.dameon = True
server.start()
RPyC服务单独启动了一个线程进行处理,在RPyC服务对外暴露的接口当中上不了要访问一些公共数据,多线程环境下不加保护就进行类似访问操作,很容易就会导致各种奇怪问题。
使用消息队列处理多线程访问
以Service方式对外暴露接口的RPyC相关代码如下,
class RPyCService(SlaveService):
def exposed_get_answer(self):
return 42
以exposed_开头的函数可以被RPyC客户端代码访问到,这部分函数实际实在RPyC线程中运行,实际应用中暴露的接口绝非简单返回数值那么简单。为了保证数据操作的安全,以及简化多线程环境下的数据操作,考虑用阻塞式队列来实现RPyC线程与其余线程间的交互。
import Queue
class QueueHelper(object):
def __init__(self):
self.commands = Queue.Queue()
def execute(self):
if self.commands.empty():
return
command = self.commands.get_nowait()
command()
def put(self, cb):
self.commands.put(cb)
g_queue_helper = QueueHelper()
上述代码定义了一个辅助的队列管理类,如何使用可以有以下几种方式。
显示调用QueueHelper代码将操作加入队列
class RPyCService(SlaveService):
def exposed_get_answer(self):
g_queue_helper.put(lambda: self._get_answer())
def _get_answer(self):
return 42
# main thread
def execute():
...
g_queue_helper.execute()
...
在RPyCService的exposed_函数中,将具体逻辑单独封装,以lambda方式加入到队列中,在主线程中调用g_queue_helper的execute操作,执行加入的操作。对于Service类中定义的每一个接口函数都需要进行类似的改动,于是可以想到通过python decorator来简化代码。
通过decorator标记需要多线程处理的接口
def rpyc_call(func):
def _(*args, **kwargs):
g_queue_helper.put(lambda: func(*args, **kwargs))
return _
class RPyCService(SlaveService):
@rpyc_call
def exposed_get_answer(self):
return 42
@rpyc_call
...
在使用decorator之后,Service类的实现简化了不少。每一个函数的定义基本不需要再考虑多线程的问题了。不过上述代码是否能再进一步简化呢,毕竟需要在每一个exposed_函数上定义@rpyc_call。在这个特殊的场景下,使用metaclass,可能会是个不错的选择。
通过metaclass处理需要多线程处理的接口
def rpyc_call(func):
def _(*args, **kwargs):
g_queue_helper.put(lambda: func(*args, **kwargs))
return _
class ServiceMeta(type):
def __init__(cls, name, bases, dic):
super(ServiceMeta, cls).__init__(name, bases, dic)
for x in dic:
if not x.startswith('exposed_'):
continue
setattr(cls, x, rpyc_call(getattr(cls, x)))
class RPyCService(SlaveService):
__metaclass__ = ServiceMeta
def exposed_get_answer(self):
return 42
def exposed_...:
...
通过上述metaclass,在编写Service类的时候就完全做到了不需要考虑线程问题。不过目前的例子有一种情况没有处理到,对于那些需要返回值的接口来说,通过队列将任务放到主线程上执行,会导致无法获取返回值。为了处理这个问题,我们需要再引入一个队列来处理接口返回值。
使用双队列处理需要返回值的接口函数
class QueueHelper(object):
def __init__(self):
self.commands = Queue.Queue()
self.results = Queue.Queue()
def execute(self):
if self.commands.empty():
return
command = self.commands.get_nowait()
self.results.put(command())
def put(self, cb):
self.commands.put(cb)
def get(self):
return self.results.get()
def copy_to_local(x):
if isinstance(x, dict):
x = {k: copy_to_local(v) for k, v in x.items()}
if isinstance(x, list) or isinstance(x, tuple):
x = [copy_to_local(i) for i in x]
return x
def rpyc_call(func):
def _(*args, **kwargs):
# 处理netref在非rpyc线程访问问题
args = [copy_to_local(x) for x in args]
kwargs = {k: copy_to_local(v) for k, v in kwargs.items()}
g_queue_helper.put(lambda: func(*args, **kwargs))
return g_queue_helper.get()
return _
经过修改之后的rpyc_call中通过结果队列获取操作执行的结果并返回,这样就完整的处理了多线程带来的问题。rpyc_call也加入了对netref的处理,实际运行之后发现netref在非rpyc线程访问也会导致各种问题,因此在将任务放入队列之前,先将需要的各项参数拷贝至本地。
总结
写Python代码偶尔还是需要考虑多线程问题,通过类似生产者消费者方式的消息队列可以有效处理一定程度的多线程问题。decorator、metaclass都是简化python代码的利器,实际需要根据具体应用场景选择对应的方式。RPyC是一个好用的工具,推荐使用。