Python的yield关键字用于实现生成器(generator), 在py2.5之后, yield引入了send, 从语句变成了一个表达式, 可以用做coroutine.

在说生成器之前, 先介绍一下Python里的迭代.
应该学过Python的都知道, Py里的for语句其实是迭代而不是循环, 类似于C#里的foreach.

在C#里, 实现了IEnumerable, IEnumerator, 这个对象就可以用于foreach, 其实这就是一种规约.
在Python里, 对象只要实现了iter和next方法即可实现iteration protocol.
而Py里的for循环其实更像是这样的代码:

1
2
3
4
5
6
7
8
9
10
for x in obj:
# do some thing

_iter = iter(obj)
while True:
try:
x = _iter.next()
except StopIteration:
break
# do some thing

iter方法指定的是iter函数作用于obj之后返回的可迭代对象, 这个对象应该实现了next方法.
next方法返回下一次迭代的值, 如果没有值, 则raise Stopiteration().

所以iter一般返回自身, 然后在自身上做next方法用于迭代 (一般的书都没说是为啥.. 以前我也有点懵逼)

所以你如果

1
X = type('X', (object, ), {'__iter__': lambda self: iter(range(3))})

然后
1
2
for i in X():
print i

输出的结果是0 1 2, 就是你返回了range(3)的一个迭代器.

有时候其实写一个可迭代对象挺麻烦的, Python里有个关键字yield, 还有一个东西叫做生成器. 顾名思义.. 生成对象的东西.

如果一个函数里包含了yield, 则这个函数就是一个生成器函数, 对这个函数的调用将构造一个生成器. 生成器只能用于迭代, 他实现了迭代器协议.

比如:

1
2
3
4
5
6
7
8
9
10
11
def countdown(n):
print ’starting to count from’, n
while n>0:
yield n
n -= 1
print 'Done'

c = countdown(3)
c.next()
c.next()


自行感受, c就是个迭代器

这是最原始的yield用法了, 大部分书上对yield的介绍也就是这个样子。
Python2.5之后, yield引入了send语法,并且yield变成了一个expression.

说到生成器, 顺便说一下Python的generator comprehension.
大家应该更熟悉Python的List comprehension
比如[x**2 for x in range(3)]. 就是产生一个[0, 1, 4]这种平方数的list.
有点像map

而生成器推导是产生一个迭代器.
比如(x**2 for x in range(5)). 这就是产生一个可迭代对象, 每次迭代的效果是得到下一个平方数. 有点像Python3里的map

这两者的区别就像Python2里面的range和xrange的区别.

前者直接构造出一个list, 后者构造出一个Object, 而不是一个占用内存很大的list, 每次需要的时候产生下一个.

利用生成器推导的嵌套(foo for x in bar for y in baz). 可以做一些方便的事情.

用生成器的pipeline组合, 加上作用在数据上的函数, 让数据经过一些pipeline. 最后得到想要的东西, 其实还是挺爽的. 有点fp的感觉.

下面介绍coroutine .

co-routine是协程 , 协作程序的意思. 大概就是一种用户态的COE切换, 不像进程, 线程(系统级线程, 非用户态线程)的切换, 需要陷入到内核态, 造成一定的开销. 而coroutine主动交出控制权.

因此可以协程可以搞一些黑科技的东西. 比如你可以执行一个函数(应该说例程更合适了), 执行到某个地方, 转出控制权, 然后别的例程执行, 执行到某个时机, 把控制权转交回这个例程.

通过适当的控制, 就可以实现数据在各个组件之间传递, 按预期的控制流被消费, 也可以用来实现任务调度(这意味着并发)

首先讲如何用新的yield关键字来实现coroutine.

  1. yield是表达式, 可以这样 x = yield foo
  2. 增加了send方法, 可以对生成器发送消息, foo.send(msg)将对foo的yield表达式传送消息msg.
    x = yield i => put(i); x = wait_and_get()
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    >>> def g(): 
    print 'step 1'
    x = yield 'hello'
    print 'step 2', 'x=', x
    y = 5 + (yield x)
    print 'step 3', 'y=', y

    >>> f = g()
    >>> f.next()
    step 1
    'hello'
    >>> f.send(5)
    step 2 x= 5
    5
    >>> f.send(2)
    step 3 y= 7

    Traceback (most recent call last):
    File "<pyshell#13>", line 1, in <module>
    f.send(2)
    StopIteration
    自行感受. send(None)等价于next().

具备了send之后, Python的生成器已经可以当做coroutine来使用了.

下面看几个例子.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import time
from functools import wraps

def coroutine(func):
@wraps(func)
def wrapped(*args, **kwargs):
c = func(*args, **kwargs)
c.next()
return c
return wrapped

def follow(thefile, target):
thefile.seek(0, 2)
while True:
line = thefile.readline()
if not line:
time.sleep(0.1)
continue
target.send(line)

@coroutine
def printer():
while True:
line = (yield)
print line,

比如如上代码. follow对thefile进行检测, 类似于tail -f的功能.
这里在follow里传入的target是一个printer. 通过这种方式对数据进行了正确的派发, 处理. 以及处理完以后控制权的转移.

..很柔很顺..
follow获取到数据 , 把数据给printer, 控制权转移给printer, printer消费数据, 把控制权交回follow.

image
如图.

同样的道理, 你可以在follow和printer之间插入一个filter . 这就实现了grep.

其实这里的dataflow可以看做一个DAG, 源点生产数据, 而数据逐步转发到各个节点, 叶子节点消费数据.

比如还可以这样玩:

做一个广播. 代码也好写.

1
2
3
4
5
6
7
8
@coroutine
def broadcast(targets):
while True:
line = (yield)
for target in targets:
target.send(line)

follow(f, broadcast([grep(s, printer()) for s in ['python', 'perl']]))

回顾刚才的这种做法, 其实一定程度上实现了并发?.
这个模型其实比较像, 一个线程在监听文件, 得到数据之后, 把数据派发给worker线程. 当然即使是作为数据的传递, 这种coroutine的使用也挺好的.

下面介绍一下使用coroutine实现简单的任务调度

带上代码.. 主要是就抽象出YieldEvent和Task.
Task对生成器进行包装, 抽象出run表示一次运行.
YieldEvent是对Task执行行为的抽象, 约定一个Task在yield的时候, yield一个YieldEvent, 每次scheduler选择一个就绪队列里的任务, 然后对其run, 即执行生成器代码直到yield, 并把yield的结果作为run的返回值, 调度器得到run的返回值, 首先执行这个YieldEvent的handle_yield, 这表示这个任务被暂停的时候做的操作. (对于阻塞操作, 比如等待IO读/写, 则把他加入到select的监听fd里, 对于非阻塞的操作, 把他加回就绪队列, 等等).
handle_resume则表示一个task在yield之后没有被加入就绪队列, 等他就绪的时机达成, 应该做的事情, 首先应该把task加入就绪队列, 其次应该把对应得到的数据(例如IO读出的data), 传送给task的send_val, 作为他下一次send的值.

其实SystemCall也可以视作一种YieldEvent, 而一般的交出控制权则是NormalYield

放代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
#!/usr/bin/env python
# encoding: utf-8

from functools import wraps, partial
from collections import deque
from select import select
import socket


class YieldEvent(object):

def handle_yield(self, sched, task):
raise NotImplementedError()

def handle_resume(self, sched):
raise NotImplementedError()


class NormalYield(YieldEvent):

def handle_yield(self, sched, task):
return True


class WriteSocket(YieldEvent):

def __init__(self, sock, data):
self.sock = sock
self.data = data

def handle_yield(self, sched, task):
sched.add_write_waiting(self.sock, self)
self.task = task

def handle_resume(self, sched):
self.task.send_val = self.sock.send(self.data)
sched.add_ready(self.task)


class ReadSocket(YieldEvent):

def __init__(self, sock, nbytes):
self.sock = sock
self.nbytes = nbytes

def handle_yield(self, sched, task):
sched.add_read_waiting(self.sock, self)
self.task = task

def handle_resume(self, sched):
self.task.send_val = self.sock.recv(self.nbytes)
sched.add_ready(self.task)


class AcceptSocket(YieldEvent):

def __init__(self, sock):
self.sock = sock

def handle_yield(self, sched, task):
sched.add_read_waiting(self.sock, self)
self.task = task

def handle_resume(self, sched):
self.task.send_val = self.sock.accept()
sched.add_ready(self.task)


class Socket(object):

def __init__(self, sock):
self.sock = sock

def accept(self):
return AcceptSocket(self.sock)

def recv(self, nbytes):
return ReadSocket(self.sock, nbytes)

def send(self, data):
return WriteSocket(self.sock, data)


class Task(object):
task_id_alloc = 0

def __init__(self, func):
self.task_id = Task.task_id_alloc
Task.task_id_alloc += 1
self.target = func()
self.func = func
self.send_val = None

def run(self):
return self.target.send(self.send_val)

def exit(self):
self.target.close()


class Scheduler(object):

def __init__(self):
self._ready = deque()
self._task_map = dict()
self._read_waiting = dict()
self._write_waiting = dict()

def add_ready(self, task):
self._ready.append(task)

def add_task(self, task):
self._task_map[task.task_id] = task
self.add_ready(task)

def add_target(self, target):
self.add_task(Task(target))

def remove_task(self, task):
task.exit()
del self._task_map[task.task_id]

def add_read_waiting(self, socket, event):
self._read_waiting[socket] = event

def add_write_waiting(self, socket, event):
self._write_waiting[socket] = event

def _iopoll(self):
rset, wset, xset = select(self._read_waiting, self._write_waiting, [])
for r_socket in rset:
event = self._read_waiting.pop(r_socket)
event.handle_resume(self)
for w_socket in wset:
event = self._write_waiting.pop(w_socket)
event.handle_resume(self)

def main_loop(self):
while True:
if not self._ready:
self._iopoll()
task = self._ready.popleft()
try:
res = task.run()
if isinstance(res, YieldEvent):
if res.handle_yield(self, task):
# return True if task is not block
self.add_ready(task)
else:
raise RuntimeError('wtf')
except StopIteration:
self.remove_task(task)


def target1():
for i in range(5):
print i
yield NormalYield()


class EchoServer(object):

def __init__(self, port=8888):
self.port = port

def server_loop(self):
_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
_sock.bind(('0.0.0.0', self.port))
_sock.listen(128)
while True:
client_sock, addr = (yield AcceptSocket(_sock))
print 'connection from', addr
sched.add_target(partial(self.client_handler, client_sock))

def client_handler(self, sock):
s = Socket(sock)
line = ''
while True:
while True:
c = yield s.recv(1)
if not c or c == '\n':
break
line = line + c
if not line:
break
while line:
nsent = yield s.send(line)
line = line[nsent:]
sock.close()
print 'close client handler'


sched = Scheduler()
sched.add_target(target1)
echo_server = EchoServer()
sched.add_target(echo_server.server_loop)
sched.main_loop()