一般将coroutine称之为协程(或微线程,也有称纤程的)。
我在python并发编程那篇文章已经详细讲解了进程Process和线程Thread的用法,
很早就想再写一篇专门讲解coroutine以及相关的优秀库gevent。
目前常见的coroutine应用都是网络程序中,所以我们先来看看各种不同的网络框架模型,
然后再介绍coroutine就会比较理解了。
不同网络模型 阻塞式单进程 这样的网络程序最简单,一个进程一个循环处理网络请求,当然性能也是最差的。
现在服务器采用这种模型的基本看不到了。
阻塞式多进程 每个请求开一个进程去处理,这样就能同时处理多个请求了,
不过缺点就是当请求数变大时CPU花在进程切换开销巨大,效率低下。
非阻塞式事件驱动 也是多进程,不过使用一个主进程循环来检查是否有网络I/O事件发生,再来决定怎样处理。
省去了上下文切换、进程复制等成本,也不会有死锁、竞争的发生。
不过缺点是没有阻塞式进程直观,Twisted 就是这样的网络框架。
非阻塞式Coroutine 有没有可能既有事件驱动的好处,又有阻塞式编程的直观的模型呢?答案也行就是我要讲的coroutine了。
基本上它的本质也是事件驱动,只在单一循环上面检查事件的发生,但是加上了coroutine的概念。
gevent 就是这种框架的代表。
Coroutine 简单来讲,coroutine就是允许你暂时中断之后再继续执行的程序。事实上,python最基础的coroutine就是生成器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def foo (): for i in range (10 ): yield i print (u'foo: 控制权又回到我手上了,叫我大笨蛋' ) bar = foo() print (next (bar))print (u'main: 现在控制权在我手上,啦啦啦' )print ('main:hello baby!' )print (next (bar))print (u'main: 老大我又来了' )print (next (bar))
执行结果:
1 2 3 4 5 6 7 8 0 main: 现在控制权在我手上,啦啦啦 main:hello baby! foo: 控制权又回到我手上了,叫我大笨蛋 1 main: 老大我又来了 foo: 控制权又回到我手上了,叫我大笨蛋 2
看到没有,我们的foo在执行过程中被中断又继续了好几回。
你刚开始可能觉得这样做没什么好处呀,thread的上下文切换不是也可以暂停再继续么。
其实重点在于:
thread之间需要context-switch,而且成本很高,但coroutine没有任何上下文切换
可轻松生成大量的coroutine,基本没什么开销
所有的coroutine全部都在一个线程中完成
thread的切换基本靠OS来觉得该执行哪个,而coroutine是自己控制。
Gevent 优秀的第三方库gevent就是基于coroutine的Python网络库,
它用到Greenlet提供的,封装了libevent事件循环的高层同步API,它的coroutine是由Greenlet负责生成的。
事实上程序写的跟普通的阻塞式程序一样,但它千真万确是异步的,这是它神奇的地方,我们来看实际的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 """Spawn multiple workers and wait for them to complete""" urls = ['http://www.gevent.org/' , 'http://www.baidu.com' , 'http://www.python.org' ] import geventfrom gevent import monkeymonkey.patch_all() import urllib2def print_head (url ): print ('Starting %s' % url) data = urllib2.urlopen(url).read() print ('%s: %s bytes: %r' % (url, len (data), data[:50 ])) jobs = [gevent.spawn(print_head, url) for url in urls] gevent.joinall(jobs)
写起来非常简单,不过里面有一句monkey.patch_all()
会让人有点疑惑。
也就是猴子补丁,因为python内置的各种函数库和IO库一般都是阻塞式的,比如sleep()
就会当前进程,
而monkey就是负责将这些阻塞函数全部取代替换成gevent中相应的异步函数。
gevent打了monkey patch之后会设置python相应的模块设置成非阻塞,
然后在内部实现epoll的机制,一旦调用非阻塞的IO(比如recv)都会立刻返回,
并且设置一个回调函数,这个回调函数用于切换到当前子coroutine,
设置好回掉函数之后就把控制权返回给主coroutine,主coroutine继续调度。
一旦网络I/O准备就绪,epoll会触发之前设置的回调函数,从而引发主coroutine切换到子coroutine,做相应的操作。
joinall()
的意思是等待列表中所有coroutine完成后再返回。
上下文切换 gevent里面的上下文切换是非常平滑的。在下面的例子程序中,我们可以看到两个上下文通过调用 gevent.sleep()来互相切换。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import geventdef foo (): print ('Running in foo' ) gevent.sleep(0 ) print ('Explicit context switch to foo again' ) def bar (): print ('Explicit context to bar' ) gevent.sleep(0 ) print ('Implicit context switch back to bar' ) gevent.joinall([ gevent.spawn(foo), gevent.spawn(bar), ])
接下来一个例子中可以看到gevent是安排各个任务的执行的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import geventimport randomdef task (pid ): """ Some non-deterministic task """ gevent.sleep(random.randint(0 , 2 ) * 0.001 ) print ('Task' , pid, 'done' ) def synchronous (): for i in range (1 , 10 ): task(i) def asynchronous (): threads = [gevent.spawn(task, i) for i in xrange(10 )] gevent.joinall(threads) print ('Synchronous:' )synchronous() print ('Asynchronous:' )asynchronous()
在同步的情况下,任务是按顺序执行的,在执行各个任务的时候会阻塞主线程。
而 gevent.spawn
的重要功能就是封装了greenlet里面的函数。
初始化的greenlet放在了threads这个list里面,
被传递给了 gevent.joinall
这个函数,它会阻塞当前的程序来执行所有的greenlet。
在异步执行的情况下,所有任务的执行顺序是完全随机的。每一个greenlet的都不会阻塞其他greenlet的执行。
生成greenlet 上面我们通过gevent.spawn
来包装了对于greenlet的生成,
另外我们还能可以通过创建Greenlet的子类,并且重写 _run 方法来实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import geventfrom gevent import Greenletclass MyGreenlet (Greenlet ): def __init__ (self, message, n ): Greenlet.__init__(self) self.message = message self.n = n def _run (self ): print (self.message) gevent.sleep(self.n) g = MyGreenlet("Hi there!" , 3 ) g.start() g.join()
Greenlet 的状态 greenlet在执行的时候也会出错。Greenlet有可能会无法抛出异常,停止失败,或者消耗了太多的系统资源。
greenlet的内部状态通常是一个依赖时间的参数。greenlet有一些标记来让你能够监控greenlet的状态
started – 标志greenlet是否已经启动
ready – 标志greenlet是否已经被终止
successful() – 标志greenlet是否已经被终止,并且没有抛出异常
value – 由greenlet返回的值
exception – 在greenlet里面没有被捕获的异常
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 import geventdef win (): return 'You win!' def fail (): raise Exception('You fail at failing.' ) winner = gevent.spawn(win) loser = gevent.spawn(fail) print (winner.started) print (loser.started) try : gevent.joinall([winner, loser]) except Exception as e: print ('This will never be reached' ) print (winner.value) print (loser.value) print (winner.ready()) print (loser.ready()) print (winner.successful()) print (loser.successful()) print (loser.exception)
终止程序 在主程序收到一个SIGQUIT 之后会阻塞程序的执行让Greenlet无法继续执行。
这会导致僵尸进程的产生,需要在操作系统中将这些僵尸进程清除掉。
1 2 3 4 5 6 7 8 9 10 11 12 import geventimport signaldef run_forever (): gevent.sleep(1000 ) if __name__ == '__main__' : gevent.signal(signal.SIGQUIT, gevent.shutdown) thread = gevent.spawn(run_forever) thread.join()
超时 在gevent中支持对于coroutine的超时控制,还能使用with
上下文,示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 import geventfrom gevent import Timeouttime_to_wait = 5 class TooLong (Exception ): pass with Timeout(time_to_wait, TooLong): gevent.sleep(10 )
猴子补丁 现在这是gevent里面的一个难点。下面一个例子里可能看到 `monkey.patch_socket()`` 能够在运行时里面修改基础库socket:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import socketprint (socket.socket)print "After monkey patch" from gevent import monkeymonkey.patch_socket() print (socket.socket)import selectprint (select.select)monkey.patch_select() print "After monkey patch" print (select.select)
Python的运行时里面允许能够大部分的对象都是可以修改的,包括模块,类和方法。这通常是一个坏主意,
然而在极端的情况下,当有一个库需要加入一些Python基本的功能的时候,monkey patch就能派上用场了。
在上面的例子里,gevent能够改变基础库里的一些使用IO阻塞模型的库比如socket,ssl,threading等等并且把它们改成协程的执行方式。
事件 有时候我们还需要在多个greenlet直接进行通信,比如某些操作的同步。事件(event)是一个在Greenlet之间异步通信的形式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 import geventfrom gevent.event import Event''' Illustrates the use of events ''' evt = Event() def setter (): '''After 3 seconds, wake all threads waiting on the value of evt''' print ('A: Hey wait for me, I have to do something' ) gevent.sleep(3 ) print ("Ok, I'm done" ) evt.set () def waiter (): '''After 3 seconds the get call will unblock''' print ("I'll wait for you" ) evt.wait() print ("It's about time" ) def main (): gevent.joinall([ gevent.spawn(setter), gevent.spawn(waiter), gevent.spawn(waiter), gevent.spawn(waiter), ]) if __name__ == '__main__' : main()
事件对象的一个扩展是AsyncResult,它允许你在唤醒调用上附加一个值。
它有时也被称作是future或defered,因为它持有一个指向将来任意时间可设置为任何值的引用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import geventfrom gevent.event import AsyncResulta = AsyncResult() def setter (): """ After 3 seconds set the result of a. """ gevent.sleep(3 ) a.set ('Hello!' ) def waiter (): """ After 3 seconds the get call will unblock after the setter puts a value into the AsyncResult. """ print (a.get()) gevent.joinall([ gevent.spawn(setter), gevent.spawn(waiter), ])
队列 队列是一个排序的数据集合,它有常见的put / get操作,但是它是以在Greenlet之间可以安全操作的方式来实现的。
举例来说,如果一个Greenlet从队列中取出一项,此项就不会被同时执行的其它Greenlet再取到了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import geventfrom gevent.queue import Queuetasks = Queue() def worker (n ): while not tasks.empty(): task = tasks.get() print ('Worker %s got task %s' % (n, task)) gevent.sleep(0 ) print ('Quitting time!' ) def boss (): for i in range (1 , 25 ): tasks.put_nowait(i) gevent.spawn(boss).join() gevent.joinall([ gevent.spawn(worker, 'steve' ), gevent.spawn(worker, 'john' ), gevent.spawn(worker, 'nancy' ), ])
如果需要,队列也可以阻塞在put或get操作上。
put和get操作都有非阻塞的版本,put_nowait和get_nowait不会阻塞,
然而在操作不能完成时抛出gevent.queue.Empty
或gevent.queue.Full
异常。
我们让boss与多个worker同时运行,并限制了queue不能放入多于3个元素:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 import geventfrom gevent.queue import Queue, Emptytasks = Queue(maxsize=3 ) def worker (n ): try : while True : task = tasks.get(timeout=1 ) print ('Worker %s got task %s' % (n, task)) gevent.sleep(0 ) except Empty: print ('Quitting time!' ) def boss (): """ Boss will wait to hand out work until a individual worker is free since the maxsize of the task queue is 3. """ for i in xrange(1 , 10 ): tasks.put(i) print ('Assigned all work in iteration 1' ) for i in xrange(10 , 20 ): tasks.put(i) print ('Assigned all work in iteration 2' ) gevent.joinall([ gevent.spawn(boss), gevent.spawn(worker, 'steve' ), gevent.spawn(worker, 'john' ), gevent.spawn(worker, 'bob' ), ])
组和池 组(group)是一个运行中greenlet的集合,集合中的greenlet像一个组一样会被共同管理和调度。
它也兼饰了像Python的multiprocessing库那样的平行调度器的角色:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import geventfrom gevent.pool import Groupdef talk (msg ): for i in xrange(3 ): print (msg) g1 = gevent.spawn(talk, 'bar' ) g2 = gevent.spawn(talk, 'foo' ) g3 = gevent.spawn(talk, 'fizz' ) group = Group() group.add(g1) group.add(g2) group.join() group.add(g3) group.join()
池(pool)是一个为处理数量变化并且需要限制并发的greenlet而设计的结构。
在需要并行地做很多受限于网络和IO的任务时常常需要用到它。
当构造gevent驱动的服务时,经常会将围绕一个池结构的整个服务作为中心。一个例子就是在各个socket上轮询的类。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 from gevent.pool import Poolclass SocketPool (object ): def __init__ (self ): self.pool = Pool(1000 ) self.pool.start() def listen (self, socket ): while True : socket.recv() def add_handler (self, socket ): if self.pool.full(): raise Exception("At maximum pool size" ) else : self.pool.spawn(self.listen, socket) def shutdown (self ): self.pool.kill()
锁和信号量 信号量是一个允许greenlet相互合作,限制并发访问或运行的低层次的同步原语。信号量有两个方法,acquire和release。
在信号量是否已经被acquire或release,和拥有资源的数量之间不同,被称为此信号量的范围 (the bound of the semaphore)。
如果一个信号量的范围已经降低到0,它会阻塞acquire操作直到另一个已经获得信号量的greenlet作出释放。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 from gevent import sleepfrom gevent.pool import Poolfrom gevent.coros import BoundedSemaphoresem = BoundedSemaphore(2 ) def worker1 (n ): sem.acquire() print ('Worker %i acquired semaphore' % n) sleep(0 ) sem.release() print ('Worker %i released semaphore' % n) def worker2 (n ): with sem: print ('Worker %i acquired semaphore' % n) sleep(0 ) print ('Worker %i released semaphore' % n) pool = Pool() pool.map (worker1, xrange(0 , 2 )) pool.map (worker2, xrange(3 , 6 ))
范围为1的信号量也称为锁(lock),它向单个greenlet提供了互斥访问。信号量和锁常常用来保证资源只在程序上下文被单次使用。
子进程 自gevent 1.0起,gevent.subprocess
,一个Python subprocess模块的修补版本已经添加,它支持协作式的等待子进程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import geventfrom gevent.subprocess import Popen, PIPEdef cron (): while True : print ("cron" ) gevent.sleep(0.2 ) g = gevent.spawn(cron) sub = Popen(['sleep 1; uname' ], stdout=PIPE, shell=True ) out, err = sub.communicate() g.kill() print (out.rstrip())
actor模型 actor模型是一个由于Erlang变得普及的更高层的并发模型。简单的说它的主要思想就是许多个独立的Actor,
每个Actor有一个可以从其它Actor接收消息的收件箱。Actor内部的主循环遍历它收到的消息,并根据它期望的行为来采取行动。
Gevent没有原生的Actor类型,但在一个子类化的Greenlet内使用队列,我们可以定义一个非常简单的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import geventfrom gevent.queue import Queueclass Actor (gevent.Greenlet): def __init__ (self ): self.inbox = Queue() Greenlet.__init__(self) def receive (self, message ): """ Define in your subclass. """ raise NotImplemented () def _run (self ): self.running = True while self.running: message = self.inbox.get() self.receive(message)
接下来使用这个actor的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import geventfrom gevent.queue import Queuefrom gevent import Greenletclass Pinger (Actor ): def receive (self, message ): print (message) pong.inbox.put('ping' ) gevent.sleep(0 ) class Ponger (Actor ): def receive (self, message ): print (message) ping.inbox.put('pong' ) gevent.sleep(0 ) ping = Pinger() pong = Ponger() ping.start() pong.start() ping.inbox.put('start' ) gevent.joinall([ping, pong])