先说明一些关于Linux进程间通信的基本知识:Linux的进程间通信是基于文件的,也可以称之为设备,因为所有设备其实都是文件。描述符其实指的是文件描述符,与文件系统中的一个文件对应,每当你创建出一个描述符,就创建出一个设备驱动进程,该进程一直监听着该文件的变化。
核心
select 源码可以参考这里。
首先select不会一直轮询,轮询一次发现没有可处理的事件,进程就会挂起。那为何select中为何能够感知到某一个描述符发生了变化,其实是文件IO事件触发回调函数唤醒了挂起的select进程。而一旦被唤醒后,则记录一个triggered参数,使得select进程以后不再挂起。
当调用select函数,会创建一个实体缓存队列(poll_wqueues),尔后会调用 poll_initwait(poll_wqueues)
去初始化该缓存队列,缓存队列将缓存一些实体(poll_table_entry)。(队列大小是有限的,可以通过 poll_table_page 结构体扩容,这个不是重点,这里不展开讲。) 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37// poll 表格
typedef struct poll_table_struct {
poll_queue_proc _qproc;
__poll_t _key;
} poll_table;
// 实体缓存队列
struct poll_wqueues {
poll_table pt;
struct poll_table_page* table;
struct task_struct* polling_task; //保存当前调用select的用户进程struct task_struct结构体
int triggered; // 当前用户进程被唤醒后置成1,以免该进程接着进睡眠
int error; // 错误码
int inline_index; // 数组inline_entries的引用下标
struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES]; // 实体数组,后面会讲
};
// select 的核心函数
int do_select(int n, fd_set_bits *fds, struct timespec *end_time)
{
struct poll_wqueues table;
// ...
poll_initwait(&table);
// ...
}
void poll_initwait(struct poll_wqueues *pwq)
{
init_poll_funcptr(&pwq->pt, __pollwait);
// ...
}
static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
{
pt->qproc = qproc;
pt->key = ~0UL; /* all events enabled */
}
每个设备驱动进程都有一个等待队列,等待队列存放一个等待项(wait_queue_entry_t),首次发生事件时,会调用 pollwait 往队列放入一个等待项,等待项保存了回调函数,由等待项可会获得一个实体(entry),实体存放一个指针函数与监听事件掩码key,监听事件掩码的不同二进制位存储是否监听对应事件,由此可知道用户想监听哪些事件。当驱动程序发送IO事件,就会扫描等待队列中的实体,检测是否注册了对应事件,并回调函数,该回调函数是一个唤醒函数(pollwake)。 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50// 等待项
struct wait_queue_entry {
unsigned int flags;
void *private;
wait_queue_func_t func; // 回调函数
struct list_head entry;
};
// 实体
struct poll_table_entry {
struct file *filp; // 指向特定fd对应的file结构体;
unsigned long key; // 等待特定fd对应硬件设备的事件掩码,如POLLIN、 POLLOUT、POLLERR;
wait_queue_entry wait; // 需要放入等待队列的等待项
wait_queue_head_t *wait_address; // 设备驱动程序中特定事件的等待队列头
};
// 新增一个等待项
// pollwait() -> __pollwait()
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p) {
struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);
struct poll_table_entry *entry = poll_get_entry(pwq);
if (!entry)
return;
get_file(filp);
entry->filp = filp; // 保存对应的file结构体
entry->wait_address = wait_address; // 保存来自设备驱动程序的等待队列头
entry->key = p->key; // 保存对该fd关心的事件掩码
init_waitqueue_func_entry(&entry->wait, pollwake);// 初始化等待队列项,pollwake是唤醒该等待队列项时候调用的函数
entry->wait.private = pwq; // 将poll_wqueues作为该等待队列项的私有数据,后面使用
add_wait_queue(wait_address, &entry->wait);// 将该等待队列项添加到从驱动程序中传递过来的等待队列头中去。
}
// 唤醒
static int pollwake(wait_queue_entry *wait, unsigned mode, int sync, void *key)
{
struct poll_table_entry *entry;
entry = container_of(wait, struct poll_table_entry, wait);// 取得poll_table_entry结构体指针
if (key && !((unsigned long)key & entry->key))/*这里的条件判断至关重要,避免应用进程被误唤醒,什么意思?*/
return 0;
return __pollwake(wait, mode, sync, key);
}
static int __pollwake(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
struct poll_wqueues *pwq = wait->private;
DECLARE_WAITQUEUE(dummy_wait, pwq->polling_task);
smp_wmb();
pwq->triggered = 1; // select()用户进程只要有被唤醒过,就不可能再次进入睡眠,因为这个标志在睡眠的时候有用
return default_wake_function(&dummy_wait, mode, sync, key); // 默认通用的唤醒函数
}
调用结构
1
2
3
4
5
6
7
8
9
10
11
12select() {
sys_select();
}
sys_select(){ // 将用户态参数拷贝到内核
core_sys_select();
}
core_sys_select(){ // 填充 fd_set_bits
do_select();
}
do_select() { // 轮询
}
do select
do_select 是上面我所说的一切工作的外部函数。
接口: 1
static int do_select(int n, fd_set_bits *fds, struct timespec64 *end_time)
1
2
3
4typedef struct {
unsigned long *in, *out, *ex;
unsigned long *res_in, *res_out, *res_ex;
} fd_set_bits;
源码: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160static int do_select(int n, fd_set_bits *fds, struct timespec64 *end_time)
{
ktime_t expire, *to = NULL;
// 实体缓存队列
struct poll_wqueues table;
// poll 表格
poll_table* wait;
int retval, i, timed_out = 0;
u64 slack = 0;
__poll_t busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;
unsigned long busy_start = 0;
rcu_read_lock();
// 等一下按位移动查询,所以先计算需要位移的数量
// retval = n * BITS_PER_LONG
// BITS_PER_LONG = in | out | exp
retval = max_select_fd(n, fds);
rcu_read_unlock();
if (retval < 0)
return retval;
// 注意,这里给了n
n = retval;
// 初始化等待队列
poll_initwait(&table);
// 将初始化后的 poll 表格拿出来,后面传入vfs_poll用于构造等待项
wait = &table.pt;
if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
wait->_qproc = NULL;
timed_out = 1;
}
if (end_time && !timed_out)
slack = select_estimate_accuracy(end_time);
retval = 0;
for (;;) { // 轮询
unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
bool can_busy_loop = false;
inp = fds->in; outp = fds->out; exp = fds->ex;
rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;
// 在描述符数组上按位移动
for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
unsigned long in, out, ex, all_bits, bit = 1, j;
unsigned long res_in = 0, res_out = 0, res_ex = 0;
__poll_t mask;
in = *inp++; out = *outp++; ex = *exp++;
// 当前描述符监听了哪些事件
all_bits = in | out | ex;
if (all_bits == 0) { // 没有监听事件,到下一个描述符
i += BITS_PER_LONG;
continue;
}
for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) {
struct fd f;
if (i >= n)
break;
// bit 每次左移一位看看监听哪些事件
if (!(bit & all_bits))
continue;
// 找到一个监听的事件,则获取描述符的文件对象
f = fdget(i);
if (f.file) {
wait_key_set(wait, in, out, bit,
busy_flag);
// 调用设备驱动程序的poll
// 得到事件掩码
// 除了返回掩码外,内部将调用 pollwait() 构造等待项,并放入等待队列
mask = vfs_poll(f.file, wait);
fdput(f);
// 通过事件掩码查看到底发生了哪些事件
if ((mask & POLLIN_SET) && (in & bit)) {
res_in |= bit;
retval++;
wait->_qproc = NULL;
}
if ((mask & POLLOUT_SET) && (out & bit)) {
res_out |= bit;
retval++;
wait->_qproc = NULL;
}
if ((mask & POLLEX_SET) && (ex & bit)) {
res_ex |= bit;
retval++;
wait->_qproc = NULL;
}
/* got something, stop busy polling */
if (retval) {
can_busy_loop = false;
busy_flag = 0;
/*
* only remember a returned
* POLL_BUSY_LOOP if we asked for it
*/
} else if (busy_flag & mask)
can_busy_loop = true;
}
}
if (res_in)
*rinp = res_in;
if (res_out)
*routp = res_out;
if (res_ex)
*rexp = res_ex;
cond_resched();
}
wait->_qproc = NULL;
// 发现事件 或 超时 或 当前进程有信号要处理 则打断轮询
if (retval || timed_out || signal_pending(current))
break;
if (table.error) {
retval = table.error;
break;
}
/* only if found POLL_BUSY_LOOP sockets && not out of time */
if (can_busy_loop && !need_resched()) {
if (!busy_start) {
busy_start = busy_loop_current_time();
continue;
}
if (!busy_loop_timeout(busy_start))
continue;
}
busy_flag = 0;
/*
* If this is the first loop and we have a timeout
* given, then we convert to ktime_t and set the to
* pointer to the expiry value.
*/
if (end_time && !to) {
expire = timespec64_to_ktime(*end_time);
to = &expire;
}
// 挂起当前进程,直到被唤醒或timeout
if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,
to, slack))
timed_out = 1;
}
// 释放实体缓存队列的内存
poll_freewait(&table);
return retval;
}
总结过程
select调用的基本步骤就是,
- 调用 select
- select 轮询一次,找出需要监听事件的描述符,把等待项(监听掩码+回调函数)放到描述符对应的驱动程序的等待队列里边,并返回一个结果掩码,比对所有结果掩码,如果没有发生事件,则挂起。(一般第一次轮询都是挂起的,否则就是第一次轮询之前就发生了事件)
- 发生IO事件
- select进程被唤醒,第二次轮询,找出发生事件的描述符及其具体事件,在fd事件数组上对应位置上做标记。但可能发生的IO事件并非用户要求监听的事件,这时候就不再挂起,而是一直轮询,等待下一次IO事件
- 清除缓存
- 返回一个发生事件描述符的数量
- 用户调用 FD_ISSET() 找出哪个描述符发生事件,处理一些事情
- 处理完后,又循环回到第1步
Comments