博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python -- 进程补充
阅读量:5324 次
发布时间:2019-06-14

本文共 7943 字,大约阅读时间需要 26 分钟。

生产者消费者模型

import timeimport randomfrom multiprocessing import Queuefrom multiprocessing import Processdef producer(q):    for i in range(10):        q.put('%s-%s'%(food,i)        print('生产了%s'%food)        time.sleep(random.randint(1,3)    q.put(None)    def consumer():    while True:        food = q.get()        if food == None:break        print('%s 吃了 %s'%(name,food))    if __name__ == ' __main__':    q = Queue()    p1 = Process(target=producer,args=(q,'泔水'))    p1.start()    p2 = Process(target=producer,args=(q,'骨头'))    p2.start()    c1 = Process(target=consumer,args=(q,'alex'))    c1.start()    c2 = Process(target=consumer,args=(q,'jin'))    c2.start()    c3 = Process(target=consumer,args=(q,'egon'))    c3.start()#队列很安全

生产者消费者模型

  1.消费者要处理多少数据是不确定的
  2.只能用while循环来处理数据,但无法结束
  3.需要生产者发送信号
  4.有多少个消费者 就需要发送多少个信号
  5.但是发送的信号数量需要根据 生产者

JoinableQueue([maxsize]) 方法

import timeimport randomfrom multiprocessing import JoinableQueuefrom multiprocessing import Processdef producer(q):    for i in range(10):        q.put('%s-%s'%(food,i)        print('生产了%s'%food)        time.sleep(random.randint(1,3)    q.join() #等待消费者把所有数据都处理完    def consumer():    while True:        food = q.get()        if food == None:break        print('%s 吃了 %s'%(name,food))        q.task_done()        if __name__ == '__main__':    q = JoinableQueue()    p1 = Process(target=producer,args=(q,'泔水'))    p1.start()    p2 = Process(target=producer,args=(q,'骨头'))    p2.start()    c1 = Process(target=consumer,args=(q,'alex'))    c1.daemon = True    c1.start()    c2 = Process(target=consumer,args=(q,'jin'))    c2.daemon = True    c2.start()    c3 = Process(target=consumer,args=(q,'egon'))    c3.daemon = True    c3.start()        p1.join() #等待p1执行完毕    p2.join()

管道

#创建管道的类:Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2)   ,其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道#参数介绍:dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。#主要方法:    conn1.recv():接收conn2.send(obj)发送的对象。     如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。    conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象 #其他方法:conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法conn1.fileno():返回连接使用的整数文件描述符conn1.poll([timeout]):如果连接上的数据可用,返回True。   timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。 conn1.recv_bytes([maxlength]):   接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。   如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。   如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。conn.send_bytes(buffer [, offset [, size]]):   通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,   而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收     conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,   该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。   offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。
from multiprocessing import Processfrom multiprocessing import Pipedef func(p):    foo,son = p    foo.close()    while True:        try:            print(son.recv())        except EOFError:            break        # print(son.recv())if __name__ == '__main__':    foo,son = Pipe()    p = Process(target=func, args=((foo,son),))    p.start()    son.close()    foo.send('hello')    foo.send('hello')    foo.send('hello')    foo.send('hello')    foo.send('hello')    foo.close()
from multiprocessing import Processfrom multiprocessing import Pipefrom multiprocessing import Lockdef func(p,l):    foo, son = p    foo.close()    while True:        try:            l.acquire()            print(son.recv())            l.release()        except EOFError:            l.release()            son.close()            breakdef func2(p):    foo, son = p    son.close()    for i in range(10):        foo.send(i)    foo.close()if __name__ == '__main__':    foo,son = Pipe()    l = Lock()    p = Process(target=func,args=((foo,son),l))    p1 = Process(target=func,args=((foo,son),l))    p2 = Process(target=func,args=((foo,son),l))    p.start()    p1.start()    p2.start()    p3 = Process(target=func2, args=((foo, son),)).start()    p4 = Process(target=func2, args=((foo, son),)).start()    p5 = Process(target=func2, args=((foo, son),)).start()    p6 = Process(target=func2, args=((foo, son),)).start()    p7 = Process(target=func2, args=((foo, son),)).start()    son.close()    foo.close()

应该特别注意管道端点的正确管理问题。如果是生产者或消费者中都没有使用管道的某个端点,就应将它关闭。这也说明了为何在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这些步骤,程序可能在消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常。因此,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。 

管道具有不安全性,而队列相当于管道与锁的结合。

进程之间的数据共享

from multiprocessing import Manager,Process,Lockdef work(d,lock):    with lock: #不加锁而操作共享的数据,肯定会出现数据错乱        d['count']-=1if __name__ == '__main__':    lock=Lock()    with Manager() as m:        dic=m.dict({
'count':100}) p_l=[] for i in range(100): p=Process(target=work,args=(dic,lock)) p_l.append(p) p.start() for p in p_l: p.join() print(dic)'''进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.'''

进程池和multiprocess.Pool模块

进程池

进程池的概念。

在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先,创建进程需要消耗时间,销毁进程也需要消耗时间。第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,这样反而会影响程序的效率。因此我们不能无限制的根据任务开启或者结束进程。那么我们要怎么做呢?

在这里,要给大家介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果。

Pool([numprocess  [,initializer [, initargs]]]):创建进程池numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值initializer:是每个工作进程启动时要执行的可调用对象,默认为Noneinitargs:是要传给initializer的参数组p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。'''需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()'''p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。'''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。'''   p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。obj.ready():如果调用完成,返回Trueobj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常obj.wait([timeout]):等待结果变为可用。obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数
import osimport timefrom multiprocessing import Poolprint(os.cpu_count())def func(i):    time.sleep(1)    print(i,os.getpid())    if __name__ == '__main__':    p = Pool(5)    p.map(func,range(20))  #默认是无序的    p.close()  #不允许再向进程池中添加任务    p.join()    print('====>')import timefrom multiprocessing import Poolfrom multiprocessing import Processdef func(i):    i += 1    if __name__ == '__main__':    p = Pool(5)    start = time.time()    #target=func,args=next(iterable)    p.map(func,range(20))  #默认是无序的    p.close()  #不允许再向进程池中添加任务    p.join()    print('====>')
数据池与进程之间的对比
import timefrom multiprocessing import Pooldef func(i):    time.sleep(1)    i += 1    return i+1    if __name__ == '__main__':    p = Pool(5)    for i in range(20):        p.apply(func,args=(i,))        #apply是同步提交任务的机制    p.close() #close必须加在join前 不许添新任务    p.join()  #等待子进程结束在往下执行
数据池的同步调用
import timefrom multiprocessing import Pooldef func(i):    time.sleep(1)    i += 1    return i+1    if __name__ == '__main__':    p = Pool(5)    for i in range(20):        p.apply_async(func,args=(i,))        #apply是异步提交任务的机制        #异步必须要有close和join    p.close() #close必须加在join前 不许添新任务    p.join()  #等待子进程结束在往下执行
数据池异步调用

 

转载于:https://www.cnblogs.com/soleZ/p/8418549.html

你可能感兴趣的文章
jmeter接口测试之登录测试
查看>>
【CQOI2009】中位数
查看>>
ThinkPHP大写单字母函数
查看>>
ELK Stack (2) —— ELK + Redis收集Nginx日志
查看>>
ElasticSearch 2 (19) - 语言处理系列之故事开始
查看>>
NLTK的使用
查看>>
Java面试题之谈谈reactor模型
查看>>
win7下安装sdks
查看>>
通过maven profile 打包指定环境配置
查看>>
redis 存储时间区间的数据
查看>>
STM32F0库函数初始化系列:进入STOP模式,外部中断唤醒
查看>>
p1525 关押罪犯
查看>>
使用Html5shiv.js让ie支持html5
查看>>
DBA 优化法则
查看>>
用Python连接SQLServer抓取分析数据、监控 (pymssql)
查看>>
升级ruby后再安装cocodPod
查看>>
MySQL数据库8(十三)高级数据操作之select指令
查看>>
随心测试_Python Se_002<不同浏览器驱动>
查看>>
LeetCode 202. Happy Number
查看>>
【Codeforces Round #432 (Div. 2) A】 Arpa and a research in Mexican wave
查看>>