视频1 视频21 视频41 视频61 视频文章1 视频文章21 视频文章41 视频文章61 推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37 推荐39 推荐41 推荐43 推荐45 推荐47 推荐49 关键词1 关键词101 关键词201 关键词301 关键词401 关键词501 关键词601 关键词701 关键词801 关键词901 关键词1001 关键词1101 关键词1201 关键词1301 关键词1401 关键词1501 关键词1601 关键词1701 关键词1801 关键词1901 视频扩展1 视频扩展6 视频扩展11 视频扩展16 文章1 文章201 文章401 文章601 文章801 文章1001 资讯1 资讯501 资讯1001 资讯1501 标签1 标签501 标签1001 关键词1 关键词501 关键词1001 关键词1501 专题2001
Python自定义进程池实例分析【生产者、消费者模型问题】
2020-11-27 14:26:35 责编:小采
文档


本文实例分析了Python自定义进程池。分享给大家供大家参考,具体如下:

代码说明一切:

#encoding=utf-8
#author: walker
#date: 2014-05-21
#function: 自定义进程池遍历目录下文件
from multiprocessing import Process, Queue, Lock
import time, os
#消费者
class Consumer(Process):
 def __init__(self, queue, ioLock):
 super(Consumer, self).__init__()
 self.queue = queue
 self.ioLock = ioLock
 def run(self):
 while True:
 task = self.queue.get() #队列中无任务时,会阻塞进程
 if isinstance(task, str) and task == 'quit':
 break;
 time.sleep(1) #假定任务处理需要1秒钟
 self.ioLock.acquire()
 print( str(os.getpid()) + ' ' + task)
 self.ioLock.release()
 self.ioLock.acquire()
 print 'Bye-bye'
 self.ioLock.release()
#生产者
def Producer():
 queue = Queue() #这个队列是进程/线程安全的
 ioLock = Lock()
 subNum = 4 #子进程数量
 workers = build_worker_pool(queue, ioLock, subNum)
 start_time = time.time()
 for parent, dirnames, filenames in os.walk(r'D:	est'):
 for filename in filenames:
 queue.put(filename)
 ioLock.acquire()
 print('qsize:' + str(queue.qsize()))
 ioLock.release()
 while queue.qsize() > subNum * 10: #控制队列中任务数量
 time.sleep(1)
 for worker in workers:
 queue.put('quit')
 for worker in workers:
 worker.join()
 ioLock.acquire()
 print('Done! Time taken: {}'.format(time.time() - start_time))
 ioLock.release()
#创建进程池
def build_worker_pool(queue, ioLock, size):
 workers = []
 for _ in range(size):
 worker = Consumer(queue, ioLock)
 worker.start()
 workers.append(worker)
 return workers
if __name__ == '__main__':
 Producer()

ps:

self.ioLock.acquire()
...
self.ioLock.release()

可用:

with self.ioLock:
 ...

替代。

再来一个好玩的例子:

#encoding=utf-8
#author: walker
#date: 2016-01-06
#function: 一个多进程的好玩例子
import os, sys, time
from multiprocessing import Pool
cur_dir_fullpath = os.path.dirname(os.path.abspath(__file__))
g_List = ['a']
#修改全局变量g_List
def ModifyDict_1():
 global g_List
 g_List.append('b')
#修改全局变量g_List
def ModifyDict_2():
 global g_List
 g_List.append('c')
#处理一个
def ProcOne(num):
 print('ProcOne ' + str(num) + ', g_List:' + repr(g_List))
#处理所有
def ProcAll():
 pool = Pool(processes = 4)
 for i in range(1, 20):
 #ProcOne(i)
 #pool.apply(ProcOne, (i,))
 pool.apply_async(ProcOne, (i,))
 pool.close()
 pool.join()
ModifyDict_1() #修改全局变量g_List
if __name__ == '__main__':
 ModifyDict_2() #修改全局变量g_List
 print('In main g_List :' + repr(g_List))
 ProcAll()

Windows7 下运行的结果:

λ python3 demo.py
In main g_List :['a', 'b', 'c']
ProcOne 1, g_List:['a', 'b']
ProcOne 2, g_List:['a', 'b']
ProcOne 3, g_List:['a', 'b']
ProcOne 4, g_List:['a', 'b']
ProcOne 5, g_List:['a', 'b']
ProcOne 6, g_List:['a', 'b']
ProcOne 7, g_List:['a', 'b']
ProcOne 8, g_List:['a', 'b']
ProcOne 9, g_List:['a', 'b']
ProcOne 10, g_List:['a', 'b']
ProcOne 11, g_List:['a', 'b']
ProcOne 12, g_List:['a', 'b']
ProcOne 13, g_List:['a', 'b']
ProcOne 14, g_List:['a', 'b']
ProcOne 15, g_List:['a', 'b']
ProcOne 16, g_List:['a', 'b']
ProcOne 17, g_List:['a', 'b']
ProcOne 18, g_List:['a', 'b']
ProcOne 19, g_List:['a', 'b']

Ubuntu 14.04下运行的结果:

In main g_List :['a', 'b', 'c']
ProcOne 1, g_List:['a', 'b', 'c']
ProcOne 2, g_List:['a', 'b', 'c']
ProcOne 3, g_List:['a', 'b', 'c']
ProcOne 5, g_List:['a', 'b', 'c']
ProcOne 4, g_List:['a', 'b', 'c']
ProcOne 8, g_List:['a', 'b', 'c']
ProcOne 9, g_List:['a', 'b', 'c']
ProcOne 7, g_List:['a', 'b', 'c']
ProcOne 11, g_List:['a', 'b', 'c']
ProcOne 6, g_List:['a', 'b', 'c']
ProcOne 12, g_List:['a', 'b', 'c']
ProcOne 13, g_List:['a', 'b', 'c']
ProcOne 10, g_List:['a', 'b', 'c']
ProcOne 14, g_List:['a', 'b', 'c']
ProcOne 15, g_List:['a', 'b', 'c']
ProcOne 16, g_List:['a', 'b', 'c']
ProcOne 17, g_List:['a', 'b', 'c']
ProcOne 18, g_List:['a', 'b', 'c']
ProcOne 19, g_List:['a', 'b', 'c']

可以看见Windows7下第二次修改没有成功,而Ubuntu下修改成功了。据uliweb作者limodou讲,原因是Windows下是充重启实现的子进程;Linux下是fork实现的。

更多Python自定义进程池实例分析【生产者、消费者模型问题】相关文章请关注PHP中文网!

下载本文
显示全文
专题