python队列

队列是线程间最常用的数据交换形式,Queue是提供队列的操作模块。三种队列:

1、FIFO

2、LIFO

3、Priority

In [3]: import QueueIn [4]: queue= Queue.Queue()In [5]: queue.empty()Out[5]: TrueIn [6]: queue.full()Out[6]: FalseIn [7]:In [7]:In [7]:In [7]: queue= Queue.Queue(5)In [9]: queue.maxsizeOut[9]: 5In [10]: help(Queue.Queue)In [11]: queue.empty()Out[11]: TrueIn [12]: queue.full()Out[12]: FalseIn [13]: queue.maxsizeOut[13]: 5In [14]: queue.put('aa')In [15]: queue.empty()Out[15]: FalseIn [16]: queue.full()Out[16]: FalseIn [17]: queue.maxsizeOut[17]: 5In [18]: queue.qsize()Out[18]: 1In [19]: queue.get()Out[19]: 'aa'In [20]: help(queue.put)In [21]: queue.put(1)In [22]: queue.put(2)In [23]: queue.put(3)In [24]: queue.put(4)In [25]: queue.put(5)In [26]: queue.full()Out[26]: TrueIn [27]: queue.put(6,1,3)  #保存数据6到队列中,1表示允许阻塞,3表示阻塞3秒后打印报错raise Full---------------------------------------------------------------------------Full                                      Traceback (most recent call last)
 in 
()----> 1 queue.put(6,1,3)/opt/amos/python2.7/lib/python2.7/Queue.pyc in put(self, item, block, timeout)    132                         remaining = endtime - _time()    133                         if remaining <= 0.0:--> 134                             raise Full    135                         self.not_full.wait(remaining)    136             self._put(item)Full:In [28]:In [28]:In [28]: queue.get()Out[28]: 1In [29]: queue.get()Out[29]: 2In [30]: queue.get()Out[30]: 3In [31]: queue.get()Out[31]: 4In [32]: queue.get()Out[32]: 5In [34]: queue.qsize()Out[34]: 0In [35]: queue.get(1,3) #1表示允许queue的get阻塞,阻塞3秒表示打印错误raise Empty---------------------------------------------------------------------------Empty                                     Traceback (most recent call last)
 in 
()----> 1 queue.get(1,3)/opt/amos/python2.7/lib/python2.7/Queue.pyc in get(self, block, timeout)    174                     remaining = endtime - _time()    175                     if remaining <= 0.0:--> 176                         raise Empty    177                     self.not_empty.wait(remaining)    178             item = self._get()

[root@133 managehosts]# vim queue01.py#!/usr/bin/env pythonimport threadingimport Queueimport timeimport randomclass consumer(threading.Thread):#等号右边的queue是参数,即def __init__(self,queue)这里出现的,把这个参数赋值给self.queue这个属性。    def __init__(self, queue):        threading.Thread.__init__(self)#threading.Thread.__init__(self)这种写法是类的继承,类的继承还有一种写法,回顾一下,看看类继承那个视频。        self.queue = queue  #等号右边的queue是参数,即def __init__(self,queue)这里出现的,把这个参数赋值给self.queue这个属性。    def run(self):        while True:            if self.queue.empty():                break            data = self.queue.get()            print data            time.sleep(1)if __name__ == '__main__':    queue = Queue.Queue(10)    for i in xrange(10):        queue.put(random.randint(1,9))#保存1-9的随机数到queue中,保存10次    for i in range(3):        #创建3个线程,这3个线程就像3条河流,会同时向前走的,所以同时会产生3个数字,多线程就是每个线程都会运行的。        c = consumer(queue)        c.start()#c.start(),是开启一个线程,循环开3个线程与不用循环,开3个线程,是一样的。#第一次循环里,c.start()已经执行完了,再执行第二循环,无所谓把对象覆盖了,因为c.start()已经执行完了#队列是全局队列,每个线程都可以访问这个队列       [root@133 managehosts]# python queue01.py5822665695

例子:开启3个线程去队列取数据,取的结果为空,则推出,由于queue队列的输入是10个随机数(1-9),所以3个线程取到10个数后,就break

[root@133 managehosts]# vim queue02.py#!/usr/bin/env pythonimport threadingimport Queueimport timeimport randomclass consumer(threading.Thread):    def __init__(self, queue):        threading.Thread.__init__(self)        self.queue = queue    def run(self):        while True:            data = self.queue.get()            if data == None:                break            print self.name,data,time.ctime()if __name__ == '__main__':    queue = Queue.Queue(10)    for i in range(3):        c = consumer(queue)        c.start()    for i in xrange(10):        queue.put(random.randint(1,9))        time.sleep(1)    queue.put(None)    queue.put(None)    queue.put(None)[root@133 managehosts]# python queue02.pyThread-1 7 Wed Mar 29 17:43:46 2017Thread-2 3 Wed Mar 29 17:43:47 2017Thread-3 7 Wed Mar 29 17:43:48 2017Thread-1 9 Wed Mar 29 17:43:49 2017Thread-2 4 Wed Mar 29 17:43:50 2017Thread-3 1 Wed Mar 29 17:43:51 2017Thread-1 8 Wed Mar 29 17:43:52 2017Thread-2 3 Wed Mar 29 17:43:53 2017Thread-3 1 Wed Mar 29 17:43:54 2017Thread-1 6 Wed Mar 29 17:43:55 2017

Queue的例子

实现一个线程不断生成一个随机数到一个队列中

实现一个线程从上面的队列中不断的取出奇数

实现另一个线程从上面的队列中不断取出偶数

cat queue03.py#!/usr/bin/env pythonimport threadingimport randomimport Queueimport timedef producer(name,queue):    for i in xrange(10):        num = random.randint(1,10)        th_name = name + '-' +threading.currentThread().getName()        print "%s: %s ----> %s" % (time.ctime(), th_name,num)        queue.put(num)        time.sleep(1)def odd(name,queue):    th_name = name + '-' + threading.currentThread().getName()    while True:        try:            val_odd = queue.get(1,5)            if val_odd % 2 !=0:                print "%s: %s  ---> %s" %(time.ctime(), th_name, val_odd)            else:                queue.put(val_odd)                time.sleep(1)        except:            print "%s: %s finished" % (time.ctime(), th_name)            breakdef even(name,queue):    th_name = name + '-' + threading.currentThread().getName()    while True:        try:            val_even = queue.get(1,5)            if val_even % 2 ==0:                print "%s: %s  ---> %s" %(time.ctime(), th_name, val_even)                time.sleep(1)            else:                queue.put(val_even)                time.sleep(1)        except:            print "%s: %s finished" % (time.ctime(), th_name)            breakdef main():    q = Queue.Queue(10)    t_pro = threading.Thread(target=producer,args=('pro',q))    t_odd = threading.Thread(target=odd,args=('odd',q))    t_even = threading.Thread(target=even,args=('even',q))    t_pro.start()    t_odd.start()    t_even.start()if __name__== '__main__':    main()[root@133 managehosts]# python queue03.pyWed Mar 29 19:52:38 2017: pro-Thread-1 ----> 2Wed Mar 29 19:52:38 2017: even-Thread-3  ---> 2Wed Mar 29 19:52:39 2017: pro-Thread-1 ----> 6Wed Mar 29 19:52:39 2017: even-Thread-3  ---> 6Wed Mar 29 19:52:40 2017: pro-Thread-1 ----> 6Wed Mar 29 19:52:40 2017: even-Thread-3  ---> 6Wed Mar 29 19:52:41 2017: pro-Thread-1 ----> 8Wed Mar 29 19:52:41 2017: even-Thread-3  ---> 8Wed Mar 29 19:52:42 2017: pro-Thread-1 ----> 8Wed Mar 29 19:52:42 2017: even-Thread-3  ---> 8Wed Mar 29 19:52:43 2017: pro-Thread-1 ----> 3Wed Mar 29 19:52:43 2017: odd-Thread-2  ---> 3Wed Mar 29 19:52:44 2017: pro-Thread-1 ----> 4Wed Mar 29 19:52:44 2017: even-Thread-3  ---> 4Wed Mar 29 19:52:45 2017: pro-Thread-1 ----> 8Wed Mar 29 19:52:45 2017: even-Thread-3  ---> 8Wed Mar 29 19:52:46 2017: pro-Thread-1 ----> 8Wed Mar 29 19:52:46 2017: even-Thread-3  ---> 8Wed Mar 29 19:52:47 2017: pro-Thread-1 ----> 5Wed Mar 29 19:52:47 2017: odd-Thread-2  ---> 5Wed Mar 29 19:52:52 2017: odd-Thread-2 finishedWed Mar 29 19:52:52 2017: even-Thread-3 finished

python的全局解释性锁GIL,无论有多少个cpu,只有一个cpu能轮训处理多个线程。

multiprocessing可以让多个cpu处理多个进程。(方法和threading差不多)

[root@133 managehosts]# vim multiprocessing01.py#!/usr/bin/env pythonimport multiprocessingimport osimport timedef func(i):    print 'hello',i,os.getpid(),os.getppid()    time.sleep(1)for i in xrange(10):    p = multiprocessing.Process(target=func,args=(i,))    p.start()    [root@133 managehosts]# python multiprocessing01.py #主进程ppid都是31382,子进程pid不同hello 0 31383 31382hello 1 31384 31382hello 2 31385 31382hello 3 31386 31382hello 4 31387 31382hello 5 31388 31382hello 6 31389 31382hello 7 31390 31382hello 8 31391 31382hello 9 31392 31382

多线程(一个cpu切换运行多个线程)

[root@133 ~]# vim threading-multiprocessing02.py#!/usr/bin/env pythonimport threadingimport multiprocessingimport osimport timedef func():    while True:        1 + 1for i in range(20):    t = threading.Thread(target=func,args=())    t.start()    print "process id is %s" % os.getpid()[root@133 managehosts]# python threading-multiprocessing02.pyprocess id is 31994process id is 31994process id is 31994process id is 31994process id is 31994process id is 31994process id is 31994process id is 31994process id is 31994process id is 31994process id is 31994process id is 31994process id is 31994process id is 31994process id is 31994process id is 31994process id is 31994process id is 31994process id is 31994process id is 31994[root@133 ~]# toptop - 20:34:53 up 406 days,  1:45,  2 users,  load average: 0.12, 0.08, 0.02Tasks: 248 total,   2 running, 246 sleeping,   0 stopped,   0 zombieCpu0  : 26.5%us, 17.3%sy,  0.0%ni, 56.1%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%stCpu1  : 23.3%us, 14.6%sy,  0.0%ni, 62.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%stCpu2  :  8.4%us, 10.8%sy,  0.0%ni, 80.8%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%stCpu3  : 18.2%us, 15.0%sy,  0.0%ni, 66.8%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%stCpu4  :  6.1%us,  9.6%sy,  0.0%ni, 84.3%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%stCpu5  :  9.6%us,  7.8%sy,  0.0%ni, 82.6%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%stCpu6  :  5.0%us,  6.3%sy,  0.0%ni, 88.7%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%stCpu7  :  6.5%us,  4.5%sy,  0.0%ni, 89.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%stMem:  20455792k total, 19930312k used,   525480k free,   315820k buffersSwap:  8388604k total,   157072k used,  8231532k free, 16260864k cached  PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND10303 amos      20   0 7490m 1.1g 8096 S  0.0  5.5  16:57.94 mysqld20678 qemu      20   0 4862m 644m 4124 R  9.3  3.2  14493:02 qemu-kvm 3164 qemu      20   0 4685m 328m 1240 S 10.3  1.6  49795:22 qemu-kvm18481 root      20   0 2797m  48m 3096 S  0.0  0.2   0:05.15 salt-master18484 root      20   0 2797m  47m 3080 S  0.0  0.2   0:05.03 salt-master18487 root      20   0 2796m  47m 3088 S  0.0  0.2   0:04.96 salt-master18477 root      20   0 2797m  47m 3088 S  0.0  0.2   0:05.10 salt-master18478 root      20   0 2796m  47m 3096 S  0.0  0.2   0:05.30 salt-master31994 root      20   0 3402m  43m 1944 S 171.6  0.2   1:15.35 python

多进程(多个进程使用多个cpu同时运行程序)

[root@133 managehosts]# vim multiprocessing03.py#!/usr/bin/env pythonimport threadingimport multiprocessingimport osimport timefrom multiprocessing import Processimport osdef func():    while True:        1 + 1for i in range(20):    t = Process(target=func,args=())    t.start()[root@133 managehosts]# python multiprocessing03.py#同时有多个进程在运行,[root@133 managehosts]# ps aux | grep python root     32591  0.0  0.0 133228  5440 pts/6    S+   20:50   0:00 python multiprocessing03.pyroot     32592 34.0  0.0 133228  3936 pts/6    R+   20:50   0:14 python multiprocessing03.pyroot     32593 43.4  0.0 133228  3940 pts/6    R+   20:50   0:18 python multiprocessing03.pyroot     32594 37.7  0.0 133228  3940 pts/6    R+   20:50   0:16 python multiprocessing03.pyroot     32595 40.1  0.0 133228  3940 pts/6    R+   20:50   0:17 python multiprocessing03.pyroot     32596 33.8  0.0 133228  3940 pts/6    R+   20:50   0:14 python multiprocessing03.pyroot     32597 38.5  0.0 133228  3944 pts/6    R+   20:50   0:16 python multiprocessing03.pyroot     32598 40.0  0.0 133228  3948 pts/6    R+   20:50   0:17 python multiprocessing03.pyroot     32599 46.0  0.0 133228  3948 pts/6    R+   20:50   0:19 python multiprocessing03.pyroot     32600 46.0  0.0 133228  3952 pts/6    R+   20:50   0:19 python multiprocessing03.pyroot     32601 37.5  0.0 133228  3952 pts/6    R+   20:50   0:16 python multiprocessing03.pyroot     32602 39.9  0.0 133228  3956 pts/6    R+   20:50   0:17 python multiprocessing03.pyroot     32603 38.3  0.0 133228  3960 pts/6    R+   20:50   0:16 python multiprocessing03.pyroot     32604 37.0  0.0 133228  3964 pts/6    R+   20:50   0:15 python multiprocessing03.pyroot     32605 38.3  0.0 133228  3964 pts/6    R+   20:50   0:16 python multiprocessing03.pyroot     32606 32.9  0.0 133228  3964 pts/6    R+   20:50   0:14 python multiprocessing03.pyroot     32607 37.7  0.0 133228  3968 pts/6    R+   20:50   0:16 python multiprocessing03.pyroot     32608 39.2  0.0 133228  3968 pts/6    R+   20:50   0:16 python multiprocessing03.pyroot     32609 39.4  0.0 133228  3972 pts/6    R+   20:50   0:16 python multiprocessing03.pyroot     32610 37.3  0.0 133228  3972 pts/6    R+   20:50   0:16 python multiprocessing03.pyroot     32611 34.3  0.0 133228  3972 pts/6    R+   20:50   0:14 python multiprocessing03.py#负载增加,load不断升高,0.0%id空闲cpu为0,全部cpu被使用,[root@133 managehosts]# toptop - 20:52:18 up 406 days,  2:03,  2 users,  load average: 16.32, 5.79, 2.07Tasks: 266 total,  21 running, 245 sleeping,   0 stopped,   0 zombieCpu0  : 96.4%us,  3.6%sy,  0.0%ni,  0.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%stCpu1  :100.0%us,  0.0%sy,  0.0%ni,  0.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%stCpu2  : 97.7%us,  2.3%sy,  0.0%ni,  0.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%stCpu3  :100.0%us,  0.0%sy,  0.0%ni,  0.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%stCpu4  :100.0%us,  0.0%sy,  0.0%ni,  0.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%stCpu5  :100.0%us,  0.0%sy,  0.0%ni,  0.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%stCpu6  :100.0%us,  0.0%sy,  0.0%ni,  0.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%stCpu7  :100.0%us,  0.0%sy,  0.0%ni,  0.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%stMem:  20455792k total, 19900920k used,   554872k free,   315820k buffersSwap:  8388604k total,   157072k used,  8231532k free, 16260928k cached  PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND32599 root      20   0  130m 3948  536 R 47.1  0.0   0:46.09 python32600 root      20   0  130m 3952  536 R 46.7  0.0   0:46.08 python32604 root      20   0  130m 3964  536 R 43.8  0.0   0:35.82 python32605 root      20   0  130m 3964  536 R 42.8  0.0   0:38.77 python32598 root      20   0  130m 3948  536 R 41.8  0.0   0:39.72 python32601 root      20   0  130m 3952  536 R 40.4  0.0   0:38.84 python32610 root      20   0  130m 3972  536 R 40.4  0.0   0:38.91 python32593 root      20   0  130m 3940  536 R 39.8  0.0   0:40.42 python32595 root      20   0  130m 3940  536 R 39.8  0.0   0:39.61 python32607 root      20   0  130m 3968  536 R 39.4  0.0   0:37.95 python32597 root      20   0  130m 3944  536 R 39.1  0.0   0:38.28 python32609 root      20   0  130m 3972  536 R 39.1  0.0   0:38.30 python32611 root      20   0  130m 3972  536 R 37.5  0.0   0:34.07 python32602 root      20   0  130m 3956  536 R 37.1  0.0   0:41.16 python32608 root      20   0  130m 3968  536 R 37.1  0.0   0:38.57 python32603 root      20   0  130m 3960  536 R 36.8  0.0   0:39.07 python32596 root      20   0  130m 3940  536 R 36.5  0.0   0:36.10 python32592 root      20   0  130m 3936  532 R 35.8  0.0   0:33.52 python32606 root      20   0  130m 3964  536 R 33.8  0.0   0:33.52 python32594 root      20   0  130m 3940  536 R 32.5  0.0   0:35.69 python20678 qemu      20   0 4862m 644m 4124 S  5.6  3.2  14494:46 qemu-kvm 3164 qemu      20   0 4685m 328m 1240 S  5.0  1.6  49797:00 qemu-kvm   35 root      20   0     0    0    0 S  0.3  0.0   2877:33 events/0

进程池Pool

multiprocessing.Pool很方便饿同时处理几百个和上千个并行的操作,脚本的复杂性大大降低。

pool = multiprocessing.Pool(process=3) //设置最大进程数3

pool.close()  //会等待池中的worker进程执行结束后再关闭pool

pool.terminate()//直接关闭pool

pool.join()  //等待进程池中的worker进程执行完毕,阻塞主进程,但必须使用在pool.close()或则pool.terminate()之后

[root@133 managehosts]# vim multiprocessing-pool.py#!/usr/bin/env pythonimport multiprocessingfrom subprocess import Popen,PIPEdef run(id):    p = Popen('vim', stdout=PIPE,stderr=PIPE)    p.communicate()    print "Task: %s " %idpool = multiprocessing.Pool()for i in xrange(10):    pool.apply_async(func=run, args=(i,))print "Wait....."pool.close()pool.join()print "Done"[root@133 managehosts]# python multiprocessing-pool.pyWait.....8核心运行8个进程[root@133 managehosts]# ps aux | grep vimroot      1068  0.1  0.0 138996  4964 pts/6    S+   21:19   0:00 vimroot      1069  0.1  0.0 138996  4964 pts/6    S+   21:19   0:00 vimroot      1070  0.1  0.0 138996  4964 pts/6    S+   21:19   0:00 vimroot      1071  0.1  0.0 138996  4964 pts/6    S+   21:19   0:00 vimroot      1072  0.1  0.0 138996  4964 pts/6    S+   21:19   0:00 vimroot      1073  0.1  0.0 138996  4964 pts/6    S+   21:19   0:00 vimroot      1074  0.1  0.0 138996  4968 pts/6    S+   21:19   0:00 vimroot      1076  0.1  0.0 138996  4968 pts/6    S+   21:19   0:00 vimroot      1101  0.0  0.0 103316   904 pts/5    S+   21:20   0:00 grep vim[root@133 managehosts]# ps -ef | grep pythonroot      1056 32539  0 21:19 pts/6    00:00:00 python multiprocessing-pool.pyroot      1057  1056  0 21:19 pts/6    00:00:00 python multiprocessing-pool.pyroot      1058  1056  0 21:19 pts/6    00:00:00 python multiprocessing-pool.pyroot      1059  1056  0 21:19 pts/6    00:00:00 python multiprocessing-pool.pyroot      1060  1056  0 21:19 pts/6    00:00:00 python multiprocessing-pool.pyroot      1061  1056  0 21:19 pts/6    00:00:00 python multiprocessing-pool.pyroot      1062  1056  0 21:19 pts/6    00:00:00 python multiprocessing-pool.pyroot      1063  1056  0 21:19 pts/6    00:00:00 python multiprocessing-pool.pyroot      1064  1056  0 21:19 pts/6    00:00:00 python multiprocessing-pool.py杀掉一个子进程后,又产生一个进程[root@133 managehosts]# kill 1068 [root@133 managehosts]# ps aux | grep vimroot      1069  0.0  0.0 138996  4964 pts/6    S+   21:19   0:00 vimroot      1070  0.0  0.0 138996  4964 pts/6    S+   21:19   0:00 vimroot      1071  0.0  0.0 138996  4964 pts/6    S+   21:19   0:00 vimroot      1072  0.0  0.0 138996  4964 pts/6    S+   21:19   0:00 vimroot      1073  0.0  0.0 138996  4964 pts/6    S+   21:19   0:00 vimroot      1074  0.0  0.0 138996  4968 pts/6    S+   21:19   0:00 vimroot      1076  0.0  0.0 138996  4968 pts/6    S+   21:19   0:00 vimroot      1231  0.5  0.0 138996  4968 pts/6    S+   21:24   0:00 vimroot      1234  0.0  0.0 103316   904 pts/5    S+   21:24   0:00 grep vim[root@133 managehosts]#[root@133 managehosts]# python multiprocessing-pool.pyWait.....Task: 1