又翻了翻 poll 的内核源码,记点笔记。

背景

select, poll, epoll 系列系统调用可以让内核监听事件的发生,提高(网络) IO 性能。也是八股文重灾区,记得校招的时候经常会被问,但大多简短的八股文文章也并没有讲清楚,问题和答案也都比较套路化。

一些迷惑问题

看到过一些迷惑问题和迷惑回答…

联想到的一些问题

epoll 用了回调机制所以性能好,那么 select/poll 事件通知用的是什么机制,是轮询吗?如果是轮询,内核是怎么做到同时轮询多个事件的?

epoll 的回调方式具体是怎么做的?如果说是和设备驱动程序或者设备就绪队列产生关联,这就有个问题,一张网卡可以同时收发多个 tcp 连接的数据,从设备的角度看确实产生了数据变化,但某个 tcp 连接上未必发生了事件。

ET 和 LT 的实现方式区别,以及和 poll/select 的实现区别。一个 socket fd 上有网络数据可读,此时先 ep_insert,et 模式。然后 ep_poll,期间这个 tcp 连接上没有数据到达,此时 ep_poll 能不能报告 fd 上有可读事件。

同一个 file 被两个 task poll 是什么表现?

tcp socket 上发出有网络数据可读事件 (POLLIN) 的时机是什么?发出 POLLOUT 的时机又是什么?

前置知识

这只是个瞎写的零碎记录,不是完整的知识。假设对内核,调度(task, sched, wakeup, wait queue),vfs,网络(socket, sock, tcp) 有基本认识。
本来想对着 5.x 写,但新内核这些实现细节更多了,写起来繁琐,所以还是换回了 2.6.24,大差不差,感兴趣可以自己翻翻高版本内核。

select/poll 分析

把 select 和 poll 放一起是因为这俩很像,而且实现都在同一个文件里 (select.c)。

select 实现

从 syscall 入口到关键函数,syscall 都差不太多,sys_select -> kern_select -> core_sys_select -> do_select。注意 sys_select 在高版本内核是用 SYS_DEFINEx 宏产生的,直接搜符号未必搜的到。

先看一下 do_select 函数,有个大概认识。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// fs/select.c
int do_select(int n, fd_set_bits *fds, s64 *timeout)
{

struct poll_wqueues table;
poll_table *wait;
int retval, i;

// (取 max_fd 并检查 fds 合法性,非法则直接返回错误码)
rcu_read_lock();
retval = max_select_fd(n, fds);
rcu_read_unlock();

if (retval < 0)
return retval;
n = retval;

poll_initwait(&table);
wait = &table.pt;
if (!*timeout)
wait = NULL;
retval = 0;

// 最外层的死循环,重复着:遍历 fd 检查是否有事件就绪,睡眠等待事件发生
for (;;) {
unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
long __timeout;

set_current_state(TASK_INTERRUPTIBLE);

inp = fds->in; outp = fds->out; exp = fds->ex;
rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;

// 根据 fd set 的实现做了些遍历的优化,跳过一些完全不需要关心的 fd
for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
unsigned long in, out, ex, all_bits, bit = 1, mask, j;
unsigned long res_in = 0, res_out = 0, res_ex = 0;
const struct file_operations *f_op = NULL;
struct file *file = NULL;

in = *inp++; out = *outp++; ex = *exp++;
all_bits = in | out | ex;
if (all_bits == 0) {
i += __NFDBITS;
continue;
}

// 和上面一层循环一起,其实是在遍历 fds 上需要观察的 fd
for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) {
int fput_needed;
if (i >= n)
break;
if (!(bit & all_bits))
continue;
file = fget_light(i, &fput_needed);
if (file) {
f_op = file->f_op;
mask = DEFAULT_POLLMASK;
if (f_op && f_op->poll)
mask = (*f_op->poll)(file, retval ? NULL : wait);
fput_light(file, fput_needed);
if ((mask & POLLIN_SET) && (in & bit)) {
res_in |= bit;
retval++;
}
if ((mask & POLLOUT_SET) && (out & bit)) {
res_out |= bit;
retval++;
}
if ((mask & POLLEX_SET) && (ex & bit)) {
res_ex |= bit;
retval++;
}
}
cond_resched();
}
if (res_in)
*rinp = res_in;
if (res_out)
*routp = res_out;
if (res_ex)
*rexp = res_ex;
}
wait = NULL;
// 已经有事件发生,或者超时,或者产生了 signal,离开死循环
if (retval || !*timeout || signal_pending(current))
break;
if(table.error) {
retval = table.error;
break;
}

if (*timeout < 0) {
/* Wait indefinitely */
__timeout = MAX_SCHEDULE_TIMEOUT;
} else if (unlikely(*timeout >= (s64)MAX_SCHEDULE_TIMEOUT - 1)) {
/* Wait for longer than MAX_SCHEDULE_TIMEOUT. Do it in a loop */
__timeout = MAX_SCHEDULE_TIMEOUT - 1;
*timeout -= __timeout;
} else {
__timeout = *timeout;
*timeout = 0;
}
__timeout = schedule_timeout(__timeout);
if (*timeout >= 0)
*timeout += __timeout;
}
__set_current_state(TASK_RUNNING);

poll_freewait(&table);

return retval;
}

简单介绍一下,先取 fds 的最大 fd,并检查一下 fds 是不是合法,不合法先返回。然后初始化了 poll_wqueues 和 poll_table,其中 poll_table 是供 vfs 的 poll 接口使用的参数,poll_wqueues 是 select 对 poll_table 的包装(用来实现自己的 poll 逻辑)。

接着是 do_select 主体部分,可以看到是一个巨大的循环。
最外层的 for(;;) 是死循环,做的事情是

  1. 设置当前进程 为 TASK_INTERRUPTIBLE 状态
  2. 遍历所有选中的 fd,查看是否已经有事件发生(里面的两重 for 循环其实是在 fds 上遍历 fd)
  3. 尝试睡眠并等待事件的发生(会维护睡眠时间不超过 timeout)

剩下的问题:

  1. do_select 是怎么等待事件的发生的,或者说事件发生的时候,是怎么通知到 do_select 的
  2. poll_wqueues 起了什么作用
  3. mask = (*f_op->poll)(file, retval ? NULL : wait); 做了什么 (vfs 的 poll 机制)

poll 机制,以 tcp_poll 和 select 为例

f_op->poll 会找到对应 vfs 对象的 poll 函数,下面以 tcp socket 为例进行讲解。sock_poll 是 socket 在 vfs 的 fops 封装,其实就是调用了 sock 的 poll 函数。

1
2
3
4
5
6
7
8
9
10
static unsigned int sock_poll(struct file *file, poll_table *wait)
{
struct socket *sock;

/*
* We can't return errors to poll, so it's either yes or no.
*/
sock = file->private_data;
return sock->ops->poll(file, sock, wait);
}

对应 tcp 协议的 ops->poll 是 tcp_poll,这是初始化的时候设置到 sock->ops 上的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
// net/ipv4/tcp.c
/*
* Wait for a TCP event.
*
* Note that we don't need to lock the socket, as the upper poll layers
* take care of normal races (between the test and the event) and we don't
* go look at any of the socket buffers directly.
*/
unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
unsigned int mask;
struct sock *sk = sock->sk;
struct tcp_sock *tp = tcp_sk(sk);

poll_wait(file, sk->sk_sleep, wait);
if (sk->sk_state == TCP_LISTEN)
return inet_csk_listen_poll(sk);

/* Socket is not locked. We are protected from async events
by poll logic and correct handling of state changes
made by another threads is impossible in any case.
*/

mask = 0;
if (sk->sk_err)
mask = POLLERR;

/*
* POLLHUP is certainly not done right. But poll() doesn't
* have a notion of HUP in just one direction, and for a
* socket the read side is more interesting.
*
* Some poll() documentation says that POLLHUP is incompatible
* with the POLLOUT/POLLWR flags, so somebody should check this
* all. But careful, it tends to be safer to return too many
* bits than too few, and you can easily break real applications
* if you don't tell them that something has hung up!
*
* Check-me.
*
* Check number 1. POLLHUP is _UNMASKABLE_ event (see UNIX98 and
* our fs/select.c). It means that after we received EOF,
* poll always returns immediately, making impossible poll() on write()
* in state CLOSE_WAIT. One solution is evident --- to set POLLHUP
* if and only if shutdown has been made in both directions.
* Actually, it is interesting to look how Solaris and DUX
* solve this dilemma. I would prefer, if PULLHUP were maskable,
* then we could set it on SND_SHUTDOWN. BTW examples given
* in Stevens' books assume exactly this behaviour, it explains
* why PULLHUP is incompatible with POLLOUT. --ANK
*
* NOTE. Check for TCP_CLOSE is added. The goal is to prevent
* blocking on fresh not-connected or disconnected socket. --ANK
*/
if (sk->sk_shutdown == SHUTDOWN_MASK || sk->sk_state == TCP_CLOSE)
mask |= POLLHUP;
if (sk->sk_shutdown & RCV_SHUTDOWN)
mask |= POLLIN | POLLRDNORM | POLLRDHUP;

/* Connected? */
if ((1 << sk->sk_state) & ~(TCPF_SYN_SENT | TCPF_SYN_RECV)) {
/* Potential race condition. If read of tp below will
* escape above sk->sk_state, we can be illegally awaken
* in SYN_* states. */
if ((tp->rcv_nxt != tp->copied_seq) &&
(tp->urg_seq != tp->copied_seq ||
tp->rcv_nxt != tp->copied_seq + 1 ||
sock_flag(sk, SOCK_URGINLINE) || !tp->urg_data))
mask |= POLLIN | POLLRDNORM;

if (!(sk->sk_shutdown & SEND_SHUTDOWN)) {
if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
mask |= POLLOUT | POLLWRNORM;
} else { /* send SIGIO later */
set_bit(SOCK_ASYNC_NOSPACE,
&sk->sk_socket->flags);
set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);

/* Race breaker. If space is freed after
* wspace test but before the flags are set,
* IO signal will be lost.
*/
if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk))
mask |= POLLOUT | POLLWRNORM;
}
}

if (tp->urg_data & TCP_URG_VALID)
mask |= POLLPRI;
}
return mask;
}

注意到 tcp_poll 首先执行了 poll_wait(file, sk->sk_sleep, wait);,这里的 wait 是 do_select 传入的 poll_table* 结构。后面的逻辑是在检查当前 tcp 连接的状态,并查看缓冲区的读写状态来确定是否产生了新的可读 / 可写事件,拼到 mask 里返回,do_select 就是读这个返回值来得知是否发生了事件。

跟着看一下 poll_table 和 poll_wait,poll_wait 只是调用了 p->qproc。

1
2
3
4
5
6
// include/linux/poll.h
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
if (p && wait_address)
p->qproc(filp, wait_address, p);
}

又是一个回调,这里的 p 是 do_select 初始化好并传入的,最终又回调了 p->qproc。
poll_table 的结构,里面只有 qproc 这一个回调函数的指针。

1
2
3
4
5
6
7
8
9
10
// include/linux/poll.h

/*
* structures and helpers for f_op->poll implementations
*/
typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *);

typedef struct poll_table_struct {
poll_queue_proc qproc;
} poll_table;

select 对 poll_table 做了包装,对应结构 poll_wqueues:

1
2
3
4
5
6
7
8
9
10
11
// include/linux/poll.h
/*
* Structures and helpers for sys_poll/sys_poll
*/
struct poll_wqueues {
poll_table pt;
struct poll_table_page * table;
int error;
int inline_index;
struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES];
};

do_select 在 poll_initwait 里是怎么初始化的:

1
2
3
4
5
6
7
8
9
10
11
12
13

static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
{
pt->qproc = qproc;
}

void poll_initwait(struct poll_wqueues *pwq)
{
init_poll_funcptr(&pwq->pt, __pollwait);
pwq->error = 0;
pwq->table = NULL;
pwq->inline_index = 0;
}

__pollwait 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
/* Add a new entry */
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,
poll_table *p)
{
struct poll_table_entry *entry = poll_get_entry(p);
if (!entry)
return;
get_file(filp);
entry->filp = filp;
entry->wait_address = wait_address;
init_waitqueue_entry(&entry->wait, current);
add_wait_queue(wait_address, &entry->wait);
}

是不是挺乱的,以 tcp_poll 和 do_select 为例,总结一下和 poll 相关的执行流(只考虑通常情况):

  1. do_select 初始化时,调用 poll_initwait 初始化 poll_wqueues(poll_table),把 qproc 设置为 __pollwait
  2. do_select 遍历 fd 时,调用 (*f_op->poll)(file, retval ? NULL : wait); ,这会调用 tcp_poll,传入的 wait 是 poll_wqueues(poll_table)
  3. tcp_poll 里会调用 poll_wait(file, sk->sk_sleep, wait);
  4. poll_wait 会调用 poll_table 注册的 qproc,即 __pollwait 函数
  5. 执行 __pollwait 函数

__pollwait 函数可以先看最后两行,初始化等待队列,把当前进程挂到 entry->wait 上,再把 entry->wait 挂到 sk->sk_sleep 这个等待队列上,结束。

如果 do_select 遍历的 fds 里所有 fd 都没有已发生的事件,那么f_op->poll 的调用其实就是在各个 fd 上均执行了 __pollwait,把当前进程挂到了各个 fd 实现的等待队列(只看 tcp socket 的话,对应的是 sk->sk_sleep。遍历完成后,do_select 检查是否有 signal,或者 timeout 到了,如果都没有,就会陷入睡眠。

睡眠自然是等待唤醒,后面的逻辑也很直观,既然把当前进程挂到了对应 tcp 连接的等待队列上,那么唤醒的部分一定是由 tcp 连接可读 / 可写的时候执行。事实也正是如此,下面以可读为例。

数据包到达网卡,会经过内核网络协议栈,最终到达对应的 tcp 连接进行处理( tcp_v4_rcv -> tcp_v4_do_rcv )。对于处于 ESTABLISHED 状态的连接,如果产生新的可读分组,那么会调用 sk->sk_data_ready。而 sock_init_data 初始化 sock 时会把 sk_data_ready 指向的函数设置为 sock_def_readable。

1
2
3
4
5
6
7
8
static void sock_def_readable(struct sock *sk, int len)
{
read_lock(&sk->sk_callback_lock);
if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
wake_up_interruptible(sk->sk_sleep);
sk_wake_async(sk,1,POLL_IN);
read_unlock(&sk->sk_callback_lock);
}

代码也很好懂了,如果 sk_sleep 这个等待队列上有阻塞的进程,那么唤醒它(也就是前面在 do_select 遍历完 fd 之后陷入睡眠的进程)。这个进程被调度之后会重新进入 for(;;) 循环,开启新一轮的 fds 遍历,此时它能够在这个 tcp 连接对应的 fd 上 poll 到事件,从而离开循环,返回到用户态。

epoll 实现分析

epoll 提供给 tcp_poll 的 poll_table 产生了变化,它不再是直接把 current 挂到 sk_sleep 上并用 default_wakeup 唤醒 task,而是在 sk_sleep 上注册 wakeup_func 为 ep_poll_callback。

epoll_wait 时也不像 poll 一样,遍历 fds 并对每个 fd 进行 poll。epoll_wait 的 for(;;) 循环在等待 rdllist 非空。下面摘一点代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
struct eventpoll {
/* Protect the this structure access */
spinlock_t lock;

/*
* This mutex is used to ensure that files are not removed
* while epoll is using them. This is held during the event
* collection loop, the file cleanup path, the epoll file exit
* code and the ctl operations.
*/
struct mutex mtx;

/* Wait queue used by sys_epoll_wait() */
wait_queue_head_t wq;

/* Wait queue used by file->poll() */
wait_queue_head_t poll_wait;

/* List of ready file descriptors */
struct list_head rdllist;

/* RB tree root used to store monitored fd structs */
struct rb_root rbr;

/*
* This is a single linked list that chains all the "struct epitem" that
* happened while transfering ready events to userspace w/out
* holding ->lock.
*/
struct epitem *ovflist;
};

epoll_create 会创建 eventpoll 数据结构,相当于这个 epoll file 的内部实现数据结构。对于唤醒和等待的机制,需要关注 wq, rdllist。wq 是调用 epoll_wait 的 task 阻塞的 wait queue,rdllist 是一个链表,维护了发生事件的 epitem。epitem 是内核对添加到 epoll 的 fd 的封装,比如插入到红黑树,插入到 wait queue 都会依赖这个结构。

下面看看 epoll_wait 的 for(;;) 主循环( ep_poll 函数)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
if (list_empty(&ep->rdllist)) {
/*
* We don't have any available event to return to the caller.
* We need to sleep here, and we will be wake up by
* ep_poll_callback() when events will become available.
*/
init_waitqueue_entry(&wait, current);
wait.flags |= WQ_FLAG_EXCLUSIVE;
__add_wait_queue(&ep->wq, &wait);

for (;;) {
/*
* We don't want to sleep if the ep_poll_callback() sends us
* a wakeup in between. That's why we set the task state
* to TASK_INTERRUPTIBLE before doing the checks.
*/
set_current_state(TASK_INTERRUPTIBLE);
if (!list_empty(&ep->rdllist) || !jtimeout)
break;
if (signal_pending(current)) {
res = -EINTR;
break;
}

spin_unlock_irqrestore(&ep->lock, flags);
jtimeout = schedule_timeout(jtimeout);
spin_lock_irqsave(&ep->lock, flags);
}
__remove_wait_queue(&ep->wq, &wait);

set_current_state(TASK_RUNNING);
}

可以看到 ep_poll 的主循环是在等待 rdllist 非空。大致的流程就是 ep_insert 的时候 poll 了 fd,在文件的 wait queue 上注册了 callback,当文件产生 POLLIN POLLOUT 之类的事件时,会调用 callback 把 epitem 插入到 rdllist 并唤醒阻塞在 ep->wq 上的 task。

在 ep_insert 的时候,是把 ep_ptable_queue_proc 注册到 epq.pt 这个 poll_table 上,供特定文件的 poll 函数执行回调(执行 poll_wait)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd)
{
....
/* Initialize the poll table using the queue callback */
epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

/*
* Attach the item to the poll hooks and get current event bits.
* We can safely use the file* here because its usage count has
* been increased by the caller of this function. Note that after
* this operation completes, the poll callback can start hitting
* the new item.
*/
revents = tfile->f_op->poll(tfile, &epq.pt);
....
}

下面看 ep_ptable_queue_proc。不像 poll, select 提供的 __pollwait,直接把 current 注册到文件关联的等待队列(如 sk_sleep )上,然后等待设备可读写的时候唤醒 task。ep_ptable_queue_proc 是往文件的等待队列上注册了一个回调 ep_poll_callback,可读写的时候是调用 ep_poll_callback,而不是直接唤醒 task。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/*
* This is the callback that is used to add our wait queue to the
* target file wakeup lists.
*/
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
poll_table *pt)
{
struct epitem *epi = ep_item_from_epqueue(pt);
struct eppoll_entry *pwq;

if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
pwq->whead = whead;
pwq->base = epi;
add_wait_queue(whead, &pwq->wait);
list_add_tail(&pwq->llink, &epi->pwqlist);
epi->nwait++;
} else {
/* We have to signal that an error occurred */
epi->nwait = -1;
}
}

下面看 ep_ptable_queue_proc。不像 poll, select 提供的 __pollwait,直接把 current 注册到文件关联的等待队列(如 sk_sleep )上,然后等待设备可读写的时候唤醒 task。ep_ptable_queue_proc 是往文件的等待队列上注册了一个回调 ep_poll_callback,可读写的时候是调用 ep_poll_callback,而不是直接唤醒 task。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/*
* This is the callback that is passed to the wait queue wakeup
* machanism. It is called by the stored file descriptors when they
* have events to report.
*/
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
...
/* If this file is already in the ready list we exit soon */
if (ep_is_linked(&epi->rdllink))
goto is_linked;

list_add_tail(&epi->rdllink, &ep->rdllist);

is_linked:
/*
* Wake up ( if active ) both the eventpoll wait list and the ->poll()
* wait list.
*/
if (waitqueue_active(&ep->wq))
__wake_up_locked(&ep->wq, TASK_UNINTERRUPTIBLE |
TASK_INTERRUPTIBLE);
if (waitqueue_active(&ep->poll_wait))
pwake++;

out_unlock:
spin_unlock_irqrestore(&ep->lock, flags);

/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&psw, &ep->poll_wait);

return 1;
}

注意,并不是唤醒了 ep->wq 上阻塞的 task,那个 task 就会从 ep_poll 返回的,返回前还得再 poll fd,以确保 1. 发生的事件确实是感兴趣的事件,2. 当时发生的事件此时仍然有效。(就像 poll/select 的主循环里,会不断对 fd poll 来检查有效性,避免虚假唤醒)
这是在 ep_poll 里进行重试做到的。看 ep_poll 可以发现,for 循环上面有个 retry。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
int res, eavail;
unsigned long flags;
long jtimeout;
wait_queue_t wait;

/*
* Calculate the timeout by checking for the "infinite" value ( -1 )
* and the overflow condition. The passed timeout is in milliseconds,
* that why (t * HZ) / 1000.
*/
jtimeout = (timeout < 0 || timeout >= EP_MAX_MSTIMEO) ?
MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000;

retry:
...

/*
* Try to transfer events to user space. In case we get 0 events and
* there's still timeout left over, we go trying again in search of
* more luck.
*/
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && jtimeout)
goto retry;

return res;
}

最后这个注释就讲的很清楚了,ep_send_events 会检查事件并发送到用户态,如果实际上没有发生,且还有 timeout,那就 retry。
ep_send_events 会遍历 rdllist 里的 epitem,对每个 epi 做的事情:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/*
* Get the ready file event set. We can safely use the file
* because we are holding the "mtx" and this will guarantee
* that both the file and the item will not vanish.
*/
revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL);
revents &= epi->event.events;

/*
* Is the event mask intersect the caller-requested one,
* deliver the event to userspace. Again, we are holding
* "mtx", so no operations coming from userspace can change
* the item.
*/
if (revents) {
if (__put_user(revents,
&events[eventcnt].events) ||
__put_user(epi->event.data,
&events[eventcnt].data))
goto errxit;
if (epi->event.events & EPOLLONESHOT)
epi->event.events &= EP_PRIVATE_BITS;
eventcnt++;
}
/*
* At this point, noone can insert into ep->rdllist besides
* us. The epoll_ctl() callers are locked out by us holding
* "mtx" and the poll callback will queue them in ep->ovflist.
*/
if (!(epi->event.events & EPOLLET) &&
(revents & epi->event.events))
list_add_tail(&epi->rdllink, &ep->rdllist);

首先 poll fd,检查一下此时真实发生的且感兴趣的事件(两种情况,一种是发生了事件但并不感兴趣,另一种是当时发生了事件但此时已经没有事件)。如果发生了感兴趣的事件,那么拷贝到用户态,并且 eventcnt++,表示可以从 ep_poll (即 epoll_wait ) 返回了。最后再检查一下 epi 是不是 EPOLLET 的,如果不是 ET,并且发生了感兴趣事件,那么把 epi 重新插入到 rdllist 里。言外之意,如果是 LT 模式,一个事件拷贝到用户态后,还会把 epi 插回 rdllist,下次 epoll_wait 还会残留在 rdllist 里,直接进行 poll fd 的检查(即是在这之间,没有新的数据在这个 tcp 连接上到达)。如果是 ET 模式,一个事件拷贝到用户态后,会从 rdllist 里真的删除掉,下次还想在这上面检测到事件,只有等这个 tcp 连接上真的有数据可读或者可写(注意哦,可读或可写都会导致 ep_poll_callback 被调用,因为在 epoll 的通常 wakeup 机制里,是没法区分事件类型的)。

看清这个实现就可以回答一些无聊的实现细节问题…
如果一个 tcp 连接上此时有 500 字节可读,正在对外写 400 字节数据,此时有一个 epoll 开始监听这个 tcp socket,ET 模式,对读感兴趣。因为在 ep_insert 的时候会 poll fd,所以对应 epi 会直接进入 rdllist,epoll_wait 的时候可以向用户态返回这个 fd 可读。然后用户态读了 200 字节,缓冲区还剩 300 字节,但 tcp 连接上一直没有新的数据到达,所以不会调用到 sock_def_readble,产生 POLLIN 事件。所以用户态再次 epoll_wait 是不会收到新的可读事件的。

(在低版本内核)但是当 400 字节数据写完的时候,会执行 sk_write_space (sk_write_space 和 tcp socket 产生 POLLOUT 会在后文单独讲),从而产生 POLLOUT 事件,这会调用 ep_poll_callback 把 epi 插入到 rdllist。那么在 ep_send_events 的时候 poll fd 就会发现此时这个 fd 有感兴趣的 POLLIN 事件,从而向用户态返回这个事件。
(注意 2.6.30 以上有在唤醒的时候区分了事件,所以并不会这样,见 https://github.com/torvalds/linux/commit/2dfa4eeab0fc7e8633974f2770945311b31eedf6)。

零碎

tcp 发送 POLLIN/POLLOUT 的时机

tcp_poll 是直接 poll fd 的时候执行的函数,里面可以看到 tcp socket 对可读/可写的定义,比如处于 LISTEN 状态的 tcp 连接可读的定义,处于 ESTABLISHED 状态的定义。另一方面则是随着网络数据的收发实时产生 POLLIN/POLLOUT (即从 sk_sleep 执行 wakeup)。

对于已经建立的 tcp 连接,在收到对端数据的时候会产生 POLLIN。复读一遍前文:数据包到达网卡,会经过内核网络协议栈,最终到达对应的 tcp 连接进行处理( tcp_v4_rcv -> tcp_v4_do_rcv )。对于处于 ESTABLISHED 状态的连接,如果产生新的可读分组,那么会调用 sk->sk_data_ready。而 sock_init_data 初始化 sock 时会把 sk_data_ready 指向的函数设置为 sock_def_readable。

POLLOUT 会绕一点,因为 tcp 连接可写这件事情不像可读那么直观地定义。在 sock_def_readable 的附近有一个函数 sock_def_write_space。sock_init_data 的时候会把 sk->sk_write_space 初始化为 sock_def_write_space。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static void sock_def_write_space(struct sock *sk)
{
struct socket_wq *wq;

rcu_read_lock();

/* Do not wake up a writer until he can make "significant"
* progress. --DaveM
*/
if ((refcount_read(&sk->sk_wmem_alloc) << 1) <= READ_ONCE(sk->sk_sndbuf)) {
wq = rcu_dereference(sk->sk_wq);
if (skwq_has_sleeper(wq))
wake_up_interruptible_sync_poll(&wq->wait, EPOLLOUT |
EPOLLWRNORM | EPOLLWRBAND);

/* Should agree with poll, otherwise some programs break */
if (sock_writeable(sk))
sk_wake_async(sk, SOCK_WAKE_SPACE, POLL_OUT);
}

rcu_read_unlock();
}

sk_write_space 会在这几个地方调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ssize_t do_tcp_sendpages(struct sock *sk, struct page *page, int offset,
size_t size, int flags);

int tcp_sendmsg_locked(struct sock *sk, struct msghdr *msg, size_t size);

/*
* Socket option code for TCP.
*/
static int do_tcp_setsockopt(struct sock *sk, int level, int optname,
sockptr_t optval, unsigned int optlen)
{
//
case TCP_NOTSENT_LOWAT:
tp->notsent_lowat = val;
sk->sk_write_space(sk);
break;
}

前两个函数的用法类似,都是在末尾有这段代码:

1
2
3
4
5
6
out_err:
/* make sure we wake any epoll edge trigger waiter */
if (unlikely(tcp_rtx_and_write_queues_empty(sk) && err == -EAGAIN)) {
sk->sk_write_space(sk);
tcp_chrono_stop(sk, TCP_CHRONO_SNDBUF_LIMITED);
}

即 tcp 写数据结束的时候,检查一下 写队列和重传队列 是不是为空,如果是空的话,发送一次 POLLOUT(通过调用 sk->sk_write_space)。
而 do_tcp_setsockopt 里的用法就不讲了,跟 tcp 协议强相关,看个热闹吧。

select/poll 的一个 patch

https://github.com/torvalds/linux/commit/5f820f648c92a5ecc771a96b3c29aa6e90013bba
用 custom wake up function 代替 default_wake_function,加入一个 trigger 变量来维护事件是否发生过,从而避免主循环在 poll 之后,interruptiable sleep 之前产生的事件通知不到 task。

epoll 的一个 patch

https://github.com/torvalds/linux/commit/2dfa4eeab0fc7e8633974f2770945311b31eedf6
前文提到的 patch,通过 wakeup 的 key 参数告知 ep_poll_callback 发生的事件,减少不必要的唤醒。

感想

内核实现是复杂、逐渐完善的,早期内核也有各种明显的小问题