浅谈coroutine和gevent

一般将coroutine称之为协程(或微线程,也有称纤程的)。 我在python并发编程那篇文章已经详细讲解了进程Process和线程Thread的用法, 很早就想再写一篇专门讲解coroutine以及相关的优秀库gevent。

目前常见的coroutine应用都是网络程序中,所以我们先来看看各种不同的网络框架模型, 然后再介绍coroutine就会比较理解了。

不同网络模型

阻塞式单进程

这样的网络程序最简单,一个进程一个循环处理网络请求,当然性能也是最差的。 现在服务器采用这种模型的基本看不到了。

阻塞式多进程

每个请求开一个进程去处理,这样就能同时处理多个请求了, 不过缺点就是当请求数变大时CPU花在进程切换开销巨大,效率低下。

非阻塞式事件驱动

也是多进程,不过使用一个主进程循环来检查是否有网络I/O事件发生,再来决定怎样处理。 省去了上下文切换、进程复制等成本,也不会有死锁、竞争的发生。 不过缺点是没有阻塞式进程直观,Twisted就是这样的网络框架。

非阻塞式Coroutine

有没有可能既有事件驱动的好处,又有阻塞式编程的直观的模型呢?答案也行就是我要讲的coroutine了。 基本上它的本质也是事件驱动,只在单一循环上面检查事件的发生,但是加上了coroutine的概念。 gevent就是这种框架的代表。

Coroutine

简单来讲,coroutine就是允许你暂时中断之后再继续执行的程序。事实上,python最基础的coroutine就是生成器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#!/usr/bin/env python
# -*- coding: utf8 -*-

def foo():
for i in range(10):
# 暂时返回一个值,并将控制权交出去
yield i
print(u'foo: 控制权又回到我手上了,叫我大笨蛋')


bar = foo()
# 执行coroutine
print(next(bar))
print(u'main: 现在控制权在我手上,啦啦啦')
print('main:hello baby!')
# 回到刚刚foo这个coroutine中断的地方继续执行
print(next(bar))
print(u'main: 老大我又来了')
print(next(bar))

执行结果:

1
2
3
4
5
6
7
8
0
main: 现在控制权在我手上,啦啦啦
main:hello baby!
foo: 控制权又回到我手上了,叫我大笨蛋
1
main: 老大我又来了
foo: 控制权又回到我手上了,叫我大笨蛋
2

看到没有,我们的foo在执行过程中被中断又继续了好几回。 你刚开始可能觉得这样做没什么好处呀,thread的上下文切换不是也可以暂停再继续么。 其实重点在于:

  • thread之间需要context-switch,而且成本很高,但coroutine没有任何上下文切换
  • 可轻松生成大量的coroutine,基本没什么开销
  • 所有的coroutine全部都在一个线程中完成
  • thread的切换基本靠OS来觉得该执行哪个,而coroutine是自己控制。

Gevent

优秀的第三方库gevent就是基于coroutine的Python网络库, 它用到Greenlet提供的,封装了libevent事件循环的高层同步API,它的coroutine是由Greenlet负责生成的。

事实上程序写的跟普通的阻塞式程序一样,但它千真万确是异步的,这是它神奇的地方,我们来看实际的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
"""Spawn multiple workers and wait for them to complete"""

urls = ['http://www.gevent.org/', 'http://www.baidu.com', 'http://www.python.org']

import gevent
from gevent import monkey

# patches stdlib (including socket and ssl modules) to cooperate with other greenlets
monkey.patch_all()

import urllib2


def print_head(url):
print'Starting %s' % url)
data = urllib2.urlopen(url).read()
print'%s: %s bytes: %r' % (url, len(data), data[:50]))

jobs = [gevent.spawn(print_head, url) for url in urls]

gevent.joinall(jobs)

写起来非常简单,不过里面有一句monkey.patch_all()会让人有点疑惑。 也就是猴子补丁,因为python内置的各种函数库和IO库一般都是阻塞式的,比如sleep()就会当前进程, 而monkey就是负责将这些阻塞函数全部取代替换成gevent中相应的异步函数。

gevent打了monkey patch之后会设置python相应的模块设置成非阻塞, 然后在内部实现epoll的机制,一旦调用非阻塞的IO(比如recv)都会立刻返回, 并且设置一个回调函数,这个回调函数用于切换到当前子coroutine, 设置好回掉函数之后就把控制权返回给主coroutine,主coroutine继续调度。 一旦网络I/O准备就绪,epoll会触发之前设置的回调函数,从而引发主coroutine切换到子coroutine,做相应的操作。

joinall()的意思是等待列表中所有coroutine完成后再返回。

上下文切换

gevent里面的上下文切换是非常平滑的。在下面的例子程序中,我们可以看到两个上下文通过调用 gevent.sleep()来互相切换。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import gevent


def foo():
print('Running in foo')
gevent.sleep(0)
print('Explicit context switch to foo again')


def bar():
print('Explicit context to bar')
gevent.sleep(0)
print('Implicit context switch back to bar')


gevent.joinall([
gevent.spawn(foo),
gevent.spawn(bar),
])

接下来一个例子中可以看到gevent是安排各个任务的执行的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import gevent
import random


def task(pid):
"""
Some non-deterministic task
"""
gevent.sleep(random.randint(0, 2) * 0.001)
print('Task', pid, 'done')


def synchronous():
for i in range(1, 10):
task(i)


def asynchronous():
threads = [gevent.spawn(task, i) for i in xrange(10)]
gevent.joinall(threads)


print('Synchronous:')
synchronous()

print('Asynchronous:')
asynchronous()

在同步的情况下,任务是按顺序执行的,在执行各个任务的时候会阻塞主线程。

gevent.spawn 的重要功能就是封装了greenlet里面的函数。 初始化的greenlet放在了threads这个list里面, 被传递给了 gevent.joinall 这个函数,它会阻塞当前的程序来执行所有的greenlet。

在异步执行的情况下,所有任务的执行顺序是完全随机的。每一个greenlet的都不会阻塞其他greenlet的执行。

生成greenlet

上面我们通过gevent.spawn来包装了对于greenlet的生成, 另外我们还能可以通过创建Greenlet的子类,并且重写 _run 方法来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import gevent
from gevent import Greenlet


class MyGreenlet(Greenlet):

def __init__(self, message, n):
Greenlet.__init__(self)
self.message = message
self.n = n

def _run(self):
print(self.message)
gevent.sleep(self.n)


g = MyGreenlet("Hi there!", 3)
g.start()
g.join()

Greenlet 的状态

greenlet在执行的时候也会出错。Greenlet有可能会无法抛出异常,停止失败,或者消耗了太多的系统资源。

greenlet的内部状态通常是一个依赖时间的参数。greenlet有一些标记来让你能够监控greenlet的状态

  • started – 标志greenlet是否已经启动
  • ready – 标志greenlet是否已经被终止
  • successful() – 标志greenlet是否已经被终止,并且没有抛出异常
  • value – 由greenlet返回的值
  • exception – 在greenlet里面没有被捕获的异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import gevent


def win():
return 'You win!'


def fail():
raise Exception('You fail at failing.')


winner = gevent.spawn(win)
loser = gevent.spawn(fail)

print(winner.started) # True
print(loser.started) # True

# Exceptions raised in the Greenlet, stay inside the Greenlet.
try:
gevent.joinall([winner, loser])
except Exception as e:
print('This will never be reached')

print(winner.value) # 'You win!'
print(loser.value) # None

print(winner.ready()) # True
print(loser.ready()) # True

print(winner.successful()) # True
print(loser.successful()) # False

# The exception raised in fail, will not propogate outside the
# greenlet. A stack trace will be printed to stdout but it
# will not unwind the stack of the parent.

print(loser.exception)

终止程序

在主程序收到一个SIGQUIT 之后会阻塞程序的执行让Greenlet无法继续执行。 这会导致僵尸进程的产生,需要在操作系统中将这些僵尸进程清除掉。

1
2
3
4
5
6
7
8
9
10
11
12
import gevent
import signal


def run_forever():
gevent.sleep(1000)


if __name__ == '__main__':
gevent.signal(signal.SIGQUIT, gevent.shutdown)
thread = gevent.spawn(run_forever)
thread.join()

超时

在gevent中支持对于coroutine的超时控制,还能使用with上下文,示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
import gevent
from gevent import Timeout

time_to_wait = 5 # seconds


class TooLong(Exception):
pass


with Timeout(time_to_wait, TooLong):
gevent.sleep(10)

猴子补丁

现在这是gevent里面的一个难点。下面一个例子里可能看到 `monkey.patch_socket()`` 能够在运行时里面修改基础库socket:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import socket

print(socket.socket)

print
"After monkey patch"
from gevent import monkey

monkey.patch_socket()

print(socket.socket)

import select

print(select.select)
monkey.patch_select()
print
"After monkey patch"
print(select.select)

Python的运行时里面允许能够大部分的对象都是可以修改的,包括模块,类和方法。这通常是一个坏主意, 然而在极端的情况下,当有一个库需要加入一些Python基本的功能的时候,monkey patch就能派上用场了。 在上面的例子里,gevent能够改变基础库里的一些使用IO阻塞模型的库比如socket,ssl,threading等等并且把它们改成协程的执行方式。

事件

有时候我们还需要在多个greenlet直接进行通信,比如某些操作的同步。事件(event)是一个在Greenlet之间异步通信的形式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import gevent
from gevent.event import Event

'''
Illustrates the use of events
'''

evt = Event()


def setter():
'''After 3 seconds, wake all threads waiting on the value of evt'''
print('A: Hey wait for me, I have to do something')
gevent.sleep(3)
print("Ok, I'm done")
evt.set()


def waiter():
'''After 3 seconds the get call will unblock'''
print("I'll wait for you")
evt.wait() # blocking
print("It's about time")


def main():
gevent.joinall([
gevent.spawn(setter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter),
])


if __name__ == '__main__':
main()

事件对象的一个扩展是AsyncResult,它允许你在唤醒调用上附加一个值。 它有时也被称作是future或defered,因为它持有一个指向将来任意时间可设置为任何值的引用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import gevent
from gevent.event import AsyncResult

a = AsyncResult()


def setter():
"""
After 3 seconds set the result of a.
"""
gevent.sleep(3)
a.set('Hello!')


def waiter():
"""
After 3 seconds the get call will unblock after the setter
puts a value into the AsyncResult.
"""
print(a.get())


gevent.joinall([
gevent.spawn(setter),
gevent.spawn(waiter),
])

队列

队列是一个排序的数据集合,它有常见的put / get操作,但是它是以在Greenlet之间可以安全操作的方式来实现的。

举例来说,如果一个Greenlet从队列中取出一项,此项就不会被同时执行的其它Greenlet再取到了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import gevent
from gevent.queue import Queue

tasks = Queue()


def worker(n):
while not tasks.empty():
task = tasks.get()
print('Worker %s got task %s' % (n, task))
gevent.sleep(0)

print('Quitting time!')


def boss():
for i in range(1, 25):
tasks.put_nowait(i)


gevent.spawn(boss).join()

gevent.joinall([
gevent.spawn(worker, 'steve'),
gevent.spawn(worker, 'john'),
gevent.spawn(worker, 'nancy'),
])

如果需要,队列也可以阻塞在put或get操作上。

put和get操作都有非阻塞的版本,put_nowait和get_nowait不会阻塞, 然而在操作不能完成时抛出gevent.queue.Emptygevent.queue.Full异常。

我们让boss与多个worker同时运行,并限制了queue不能放入多于3个元素:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import gevent
from gevent.queue import Queue, Empty

tasks = Queue(maxsize=3)


def worker(n):
try:
while True:
task = tasks.get(timeout=1) # decrements queue size by 1
print('Worker %s got task %s' % (n, task))
gevent.sleep(0)
except Empty:
print('Quitting time!')


def boss():
"""
Boss will wait to hand out work until a individual worker is
free since the maxsize of the task queue is 3.
"""

for i in xrange(1, 10):
tasks.put(i)
print('Assigned all work in iteration 1')

for i in xrange(10, 20):
tasks.put(i)
print('Assigned all work in iteration 2')


gevent.joinall([
gevent.spawn(boss),
gevent.spawn(worker, 'steve'),
gevent.spawn(worker, 'john'),
gevent.spawn(worker, 'bob'),
])

组和池

组(group)是一个运行中greenlet的集合,集合中的greenlet像一个组一样会被共同管理和调度。 它也兼饰了像Python的multiprocessing库那样的平行调度器的角色:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import gevent
from gevent.pool import Group


def talk(msg):
for i in xrange(3):
print(msg)


g1 = gevent.spawn(talk, 'bar')
g2 = gevent.spawn(talk, 'foo')
g3 = gevent.spawn(talk, 'fizz')

group = Group()
group.add(g1)
group.add(g2)
group.join()

group.add(g3)
group.join()

池(pool)是一个为处理数量变化并且需要限制并发的greenlet而设计的结构。 在需要并行地做很多受限于网络和IO的任务时常常需要用到它。

当构造gevent驱动的服务时,经常会将围绕一个池结构的整个服务作为中心。一个例子就是在各个socket上轮询的类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from gevent.pool import Pool


class SocketPool(object):

def __init__(self):
self.pool = Pool(1000)
self.pool.start()

def listen(self, socket):
while True:
socket.recv()

def add_handler(self, socket):
if self.pool.full():
raise Exception("At maximum pool size")
else:
self.pool.spawn(self.listen, socket)

def shutdown(self):
self.pool.kill()

锁和信号量

信号量是一个允许greenlet相互合作,限制并发访问或运行的低层次的同步原语。信号量有两个方法,acquire和release。 在信号量是否已经被acquire或release,和拥有资源的数量之间不同,被称为此信号量的范围 (the bound of the semaphore)。 如果一个信号量的范围已经降低到0,它会阻塞acquire操作直到另一个已经获得信号量的greenlet作出释放。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from gevent import sleep
from gevent.pool import Pool
from gevent.coros import BoundedSemaphore

sem = BoundedSemaphore(2)


def worker1(n):
sem.acquire()
print('Worker %i acquired semaphore' % n)
sleep(0)
sem.release()
print('Worker %i released semaphore' % n)


def worker2(n):
with sem:
print('Worker %i acquired semaphore' % n)
sleep(0)
print('Worker %i released semaphore' % n)


pool = Pool()
pool.map(worker1, xrange(0, 2))
pool.map(worker2, xrange(3, 6))

范围为1的信号量也称为锁(lock),它向单个greenlet提供了互斥访问。信号量和锁常常用来保证资源只在程序上下文被单次使用。

子进程

自gevent 1.0起,gevent.subprocess,一个Python subprocess模块的修补版本已经添加,它支持协作式的等待子进程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import gevent
from gevent.subprocess import Popen, PIPE


def cron():
while True:
print("cron")
gevent.sleep(0.2)


g = gevent.spawn(cron)
sub = Popen(['sleep 1; uname'], stdout=PIPE, shell=True)
out, err = sub.communicate()
g.kill()
print(out.rstrip())

actor模型

actor模型是一个由于Erlang变得普及的更高层的并发模型。简单的说它的主要思想就是许多个独立的Actor, 每个Actor有一个可以从其它Actor接收消息的收件箱。Actor内部的主循环遍历它收到的消息,并根据它期望的行为来采取行动。

Gevent没有原生的Actor类型,但在一个子类化的Greenlet内使用队列,我们可以定义一个非常简单的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import gevent
from gevent.queue import Queue


class Actor(gevent.Greenlet):

def __init__(self):
self.inbox = Queue()
Greenlet.__init__(self)

def receive(self, message):
"""
Define in your subclass.
"""
raise NotImplemented()

def _run(self):
self.running = True

while self.running:
message = self.inbox.get()
self.receive(message)

接下来使用这个actor的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import gevent
from gevent.queue import Queue
from gevent import Greenlet


class Pinger(Actor):
def receive(self, message):
print(message)
pong.inbox.put('ping')
gevent.sleep(0)


class Ponger(Actor):
def receive(self, message):
print(message)
ping.inbox.put('pong')
gevent.sleep(0)


ping = Pinger()
pong = Ponger()

ping.start()
pong.start()

ping.inbox.put('start')
gevent.joinall([ping, pong])