视频1 视频21 视频41 视频61 视频文章1 视频文章21 视频文章41 视频文章61 推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37 推荐39 推荐41 推荐43 推荐45 推荐47 推荐49 关键词1 关键词101 关键词201 关键词301 关键词401 关键词501 关键词601 关键词701 关键词801 关键词901 关键词1001 关键词1101 关键词1201 关键词1301 关键词1401 关键词1501 关键词1601 关键词1701 关键词1801 关键词1901 视频扩展1 视频扩展6 视频扩展11 视频扩展16 文章1 文章201 文章401 文章601 文章801 文章1001 资讯1 资讯501 资讯1001 资讯1501 标签1 标签501 标签1001 关键词1 关键词501 关键词1001 关键词1501 专题2001
python中进程间数据通讯模块multiprocessing.Manager的介绍
2020-11-27 14:20:23 责编:小采
文档


本篇文章给大家带来的内容是关于python中进程间数据通讯模块multiprocessing.Manager的介绍,有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。

目前开发中有遇到进程间需要共享数据的情况. 所以研究了下multiprocessing.Manager, 主要会以dict为例子, 说明下进程间共享(同一个父进程).

dict使用说明

import multiprocessing
# 1. 创建一个Manger对象
manager = multiprocessing.Manager()
# 2. 创建一个dict
temp_dict = manager.dict()
# 3. 创建一个测试程序
def test(idx, test_dict):
 test_dict[idx] = idx
# 4. 创建进程池进行测试
pool = multiprocessing.Pool(4)
for i in range(100):
 pool.apply_async(test, args=(i, temp_dict))
pool.close()
pool.join()
print(temp_dict)

too simple.

简单的源码分析

这时我们再看一个例子

import multiprocessing
# 1. 创建一个Manger对象
manager = multiprocessing.Manager()
# 2. 创建一个dict
temp_dict = manager.dict()
temp_dict['test'] = {}
# 3. 创建一个测试程序
def test(idx, test_dict):
 test_dict['test'][idx] = idx
# 4. 创建进程池进行测试
pool = multiprocessing.Pool(4)
for i in range(100):
 pool.apply_async(test, args=(i, temp_dict))
pool.close()
pool.join()
print(temp_dict)

可以看到输出结果是奇怪的{'test': {}}
如果我们简单修改一下代码

import multiprocessing
# 1. 创建一个Manger对象
manager = multiprocessing.Manager()
# 2. 创建一个dict
temp_dict = manager.dict()
temp_dict['test'] = {}
# 3. 创建一个测试程序
def test(idx, test_dict):
 row = test_dict['test']
 row[idx] = idx
 test_dict['test'] = row
# 4. 创建进程池进行测试
pool = multiprocessing.Pool(4)
for i in range(100):
 pool.apply_async(test, args=(i, temp_dict))
pool.close()
pool.join()
print(temp_dict)

这时输出结果就符合预期了.

为了了解这个现象背后的原因, 我简单去读了一下源码, 主要有以下几段代码很关键.

def Manager():
 '''
 Returns a manager associated with a running server process

 The managers methods such as `Lock()`, `Condition()` and `Queue()`
 can be used to create shared objects.
 '''
 from multiprocessing.managers import SyncManager
 m = SyncManager()
 m.start()
 return m
 
...
 def start(self, initializer=None, initargs=()):
 '''
 Spawn a server process for this manager object
 '''
 assert self._state.value == State.INITIAL

 if initializer is not None and not hasattr(initializer, '__call__'):
 raise TypeError('initializer must be a callable')

 # pipe over which we will retrieve address of server
 reader, writer = connection.Pipe(duplex=False)

 # spawn process which runs a server
 self._process = Process(
 target=type(self)._run_server,
 args=(self._registry, self._address, self._authkey,
 self._serializer, writer, initializer, initargs),
 )
 ident = ':'.join(str(i) for i in self._process._identity)
 self._process.name = type(self).__name__ + '-' + ident
 self._process.start()
...

上面代码可以看出, 当我们声明了一个Manager对象的时候, 程序实际在其他进程启动了一个server服务, 这个server是阻塞的, 以此来实现进程间数据安全.
我的理解就是不同进程之间操作都是互斥的, 一个进程向server请求到这部分数据, 再把这部分数据修改, 返回给server, 之后server再去处理其他进程的请求.

回到上面的奇怪现象上, 这个操作test_dict['test'][idx] = idx实际上在拉取到server上的数据后进行了修改, 但并没有返回给server, 所以temp_dict的数据根本没有变化. 在第二段正常代码, 就相当于先向服务器请求数据, 再向服务器传送修改后的数据. 这样就可以解释这个现象了.

进程间数据安全

这个时候如果出现一种情况, 两个进程同时请求了一份相同的数据, 分别进行修改, 再提交到server上会怎么样呢? 那当然是数据产生异常. 基于此, 我们需要Manager的另一个对象, Lock(). 这个对象也不难理解, Manager本身就是一个server, dict跟lock都来自于这个server, 所以当你lock住的时候, 其他进程是不能取到数据, 自然也不会出现上面那种异常情况.

代码示例:

import multiprocessing
# 1. 创建一个Manger对象
manager = multiprocessing.Manager()
# 2. 创建一个dict
temp_dict = manager.dict()
lock = manager.Lock()
temp_dict['test'] = {}
# 3. 创建一个测试程序
def test(idx, test_dict, lock):
 lock.acquire()
 row = test_dict['test']
 row[idx] = idx
 test_dict['test'] = row
 lock.release()
# 4. 创建进程池进行测试
pool = multiprocessing.Pool(4)
for i in range(100):
 pool.apply_async(test, args=(i, temp_dict, lock))
pool.close()
pool.join()
print(temp_dict)

切忌不要进程里自己新建lock对象, 要使用统一的lock对象.

本篇文章到这里就已经全部结束了,更多其他精彩内容可以关注PHP中文网的python视频教程栏目!

下载本文
显示全文
专题