Epoll

一、总述 #

  • epoll本身使用红黑树处理fd的存储,主要是用于epoll_ctl可以快速插入删除某个fd
  • epoll加入一个fd时会将自身的一个回调函数插入到此socket的就绪队列的回调上
  • 回调函数中可以直接拿到epoll红黑树上的节点,然后将对应事件加入到链表中,通知上面进行收消息

img

二、数据结构 #

1. eventpoll #

  • epoll的主结构体,存放到file的private_data
  • epoll对于fd的储存使用的是红黑树
  • 使用链表保存处于就绪状态的fd
 1// fs/eventpoll.c
 2/*
 3 * This structure is stored inside the "private_data" member of the file
 4 * structure and represents the main data structure for the eventpoll
 5 * interface.
 6 */
 7struct eventpoll {
 8	/*
 9	 * This mutex is used to ensure that files are not removed
10	 * while epoll is using them. This is held during the event
11	 * collection loop, the file cleanup path, the epoll file exit
12	 * code and the ctl operations.
13	 */
14	struct mutex mtx;
15
16	/* Wait queue used by sys_epoll_wait() */
17	wait_queue_head_t wq;
18
19	/* Wait queue used by file->poll() */
20	wait_queue_head_t poll_wait;
21
22	/* List of ready file descriptors */
23	struct list_head rdllist;	// 就绪队列,epoll收事件是从这里收取
24
25	/* Lock which protects rdllist and ovflist */
26	rwlock_t lock;
27
28	/* RB tree root used to store monitored fd structs */
29	struct rb_root_cached rbr;	// 存储要监听的fd结构体,实际是epitem结构体
30
31	/*
32	 * This is a single linked list that chains all the "struct epitem" that
33	 * happened while transferring ready events to userspace w/out
34	 * holding ->lock.
35	 */
36	struct epitem *ovflist;
37
38	/* wakeup_source used when ep_scan_ready_list is running */
39	struct wakeup_source *ws;
40
41	/* The user that created the eventpoll descriptor */
42	struct user_struct *user;
43
44	struct file *file;
45
46	/* used to optimize loop detection check */
47	u64 gen;
48	struct hlist_head refs;
49
50#ifdef CONFIG_NET_RX_BUSY_POLL
51	/* used to track busy poll napi_id */
52	unsigned int napi_id;
53#endif
54
55#ifdef CONFIG_DEBUG_LOCK_ALLOC
56	/* tracks wakeup nests for lockdep validation */
57	u8 nests;
58#endif
59};

2. epitem #

  • 每个fd存到epitem中,epitem在epoll中使用红黑树存储
 1// fs/eventpoll.c
 2/*
 3 * Each file descriptor added to the eventpoll interface will
 4 * have an entry of this type linked to the "rbr" RB tree.
 5 * Avoid increasing the size of this struct, there can be many thousands
 6 * of these on a server and we do not want this to take another cache line.
 7 */
 8struct epitem {
 9	union {
10		/* RB tree node links this structure to the eventpoll RB tree */
11		struct rb_node rbn;
12		/* Used to free the struct epitem */
13		struct rcu_head rcu;
14	};
15
16	/* List header used to link this structure to the eventpoll ready list */
17	struct list_head rdllink;
18
19	/*
20	 * Works together "struct eventpoll"->ovflist in keeping the
21	 * single linked chain of items.
22	 */
23	struct epitem *next;
24
25	/* The file descriptor information this item refers to */
26	struct epoll_filefd ffd;
27
28	/* List containing poll wait queues */
29	struct eppoll_entry *pwqlist;
30
31	/* The "container" of this item */
32	struct eventpoll *ep;
33
34	/* List header used to link this item to the "struct file" items list */
35	struct hlist_node fllink;
36
37	/* wakeup_source used when EPOLLWAKEUP is set */
38	struct wakeup_source __rcu *ws;
39
40	/* The structure that describe the interested events and the source fd */
41	struct epoll_event event;
42};

2.1. epoll_filefd #

1// fs/eventpoll.c
2struct epoll_filefd {
3	struct file *file;
4	int fd;
5} __packed;

三、epoll_create调用 #

1. 接口定义 #

1// fs/eventpoll.c
2SYSCALL_DEFINE1(epoll_create, int, size)
3{
4	if (size <= 0)
5		return -EINVAL;
6
7	return do_epoll_create(0);
8}

2. do_epoll_create #

  • 就是创建一个epoll结构体,然后将file结构体和epoll关联
  • 对应epoll本身的相关文件操作
 1// fs/eventpoll.c
 2/*
 3 * Open an eventpoll file descriptor.
 4 */
 5static int do_epoll_create(int flags)
 6{
 7	int error, fd;
 8	struct eventpoll *ep = NULL;
 9	struct file *file;
10
11	/* Check the EPOLL_* constant for consistency.  */
12	BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);
13
14	if (flags & ~EPOLL_CLOEXEC)
15		return -EINVAL;
16	/*
17	 * Create the internal data structure ("struct eventpoll").
18	 */
19	error = ep_alloc(&ep);
20	if (error < 0)
21		return error;
22	/*
23	 * Creates all the items needed to setup an eventpoll file. That is,
24	 * a file structure and a free file descriptor.
25	 */
26	// 获取一个未使用的fd
27	fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
28	if (fd < 0) {
29		error = fd;
30		goto out_free_ep;
31	}
32	// 创建epoll类型的file结构体
33	file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
34				 O_RDWR | (flags & O_CLOEXEC));
35	if (IS_ERR(file)) {
36		error = PTR_ERR(file);
37		goto out_free_fd;
38	}
39	ep->file = file;
40	// fd和file绑定
41	fd_install(fd, file);
42	return fd;
43
44out_free_fd:
45	put_unused_fd(fd);
46out_free_ep:
47	ep_free(ep);
48	return error;
49}

3. anon_inode_getfile #

  • ep结构体存到file的private_data中
 1// fs/anon_inodes.c
 2/**
 3 * anon_inode_getfile - creates a new file instance by hooking it up to an
 4 *                      anonymous inode, and a dentry that describe the "class"
 5 *                      of the file
 6 *
 7 * @name:    [in]    name of the "class" of the new file
 8 * @fops:    [in]    file operations for the new file
 9 * @priv:    [in]    private data for the new file (will be file's private_data)
10 * @flags:   [in]    flags
11 *
12 * Creates a new file by hooking it on a single inode. This is useful for files
13 * that do not need to have a full-fledged inode in order to operate correctly.
14 * All the files created with anon_inode_getfile() will share a single inode,
15 * hence saving memory and avoiding code duplication for the file/inode/dentry
16 * setup.  Returns the newly created file* or an error pointer.
17 */
18struct file *anon_inode_getfile(const char *name,
19				const struct file_operations *fops,
20				void *priv, int flags)
21{
22	return __anon_inode_getfile(name, fops, priv, flags, NULL, false);
23}
24EXPORT_SYMBOL_GPL(anon_inode_getfile);
25
26// fs/anon_inodes.c
27static struct file *__anon_inode_getfile(const char *name,
28					 const struct file_operations *fops,
29					 void *priv, int flags,
30					 const struct inode *context_inode,
31					 bool secure)
32{
33	struct inode *inode;
34	struct file *file;
35
36	if (fops->owner && !try_module_get(fops->owner))
37		return ERR_PTR(-ENOENT);
38
39	if (secure) {
40		inode =	anon_inode_make_secure_inode(name, context_inode);
41		if (IS_ERR(inode)) {
42			file = ERR_CAST(inode);
43			goto err;
44		}
45	} else {
46		inode =	anon_inode_inode;
47		if (IS_ERR(inode)) {
48			file = ERR_PTR(-ENODEV);
49			goto err;
50		}
51		/*
52		 * We know the anon_inode inode count is always
53		 * greater than zero, so ihold() is safe.
54		 */
55		ihold(inode);
56	}
57
58	file = alloc_file_pseudo(inode, anon_inode_mnt, name,
59				 flags & (O_ACCMODE | O_NONBLOCK), fops);
60	if (IS_ERR(file))
61		goto err_iput;
62
63	file->f_mapping = inode->i_mapping;
64	// 将priv存到file的private_data
65	file->private_data = priv;
66
67	return file;
68
69err_iput:
70	iput(inode);
71err:
72	module_put(fops->owner);
73	return file;
74}

四、epoll_ctl #

1. 系统调用定义 #

 1// fs/eventpoll.c
 2/*
 3 * The following function implements the controller interface for
 4 * the eventpoll file that enables the insertion/removal/change of
 5 * file descriptors inside the interest set.
 6 */
 7SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
 8		struct epoll_event __user *, event)
 9{
10	struct epoll_event epds;
11
12	if (ep_op_has_event(op) &&
13	    copy_from_user(&epds, event, sizeof(struct epoll_event)))
14		return -EFAULT;
15
16	return do_epoll_ctl(epfd, op, fd, &epds, false);
17}

2. do_epoll_ctl #

  1// fs/eventpoll.c
  2int do_epoll_ctl(int epfd, int op, int fd, struct epoll_event *epds,
  3		 bool nonblock)
  4{
  5	int error;
  6	int full_check = 0;
  7	struct fd f, tf;
  8	struct eventpoll *ep;
  9	struct epitem *epi;
 10	struct eventpoll *tep = NULL;
 11
 12	error = -EBADF;
 13	f = fdget(epfd);
 14	if (!f.file)
 15		goto error_return;
 16
 17	/* Get the "struct file *" for the target file */
 18	tf = fdget(fd);
 19	if (!tf.file)
 20		goto error_fput;
 21
 22	/* The target file descriptor must support poll */
 23	error = -EPERM;
 24	if (!file_can_poll(tf.file))
 25		goto error_tgt_fput;
 26
 27	/* Check if EPOLLWAKEUP is allowed */
 28	if (ep_op_has_event(op))
 29		ep_take_care_of_epollwakeup(epds);
 30
 31	/*
 32	 * We have to check that the file structure underneath the file descriptor
 33	 * the user passed to us _is_ an eventpoll file. And also we do not permit
 34	 * adding an epoll file descriptor inside itself.
 35	 */
 36	error = -EINVAL;
 37	if (f.file == tf.file || !is_file_epoll(f.file))
 38		goto error_tgt_fput;
 39
 40	/*
 41	 * epoll adds to the wakeup queue at EPOLL_CTL_ADD time only,
 42	 * so EPOLLEXCLUSIVE is not allowed for a EPOLL_CTL_MOD operation.
 43	 * Also, we do not currently supported nested exclusive wakeups.
 44	 */
 45	if (ep_op_has_event(op) && (epds->events & EPOLLEXCLUSIVE)) {
 46		if (op == EPOLL_CTL_MOD)
 47			goto error_tgt_fput;
 48		if (op == EPOLL_CTL_ADD && (is_file_epoll(tf.file) ||
 49				(epds->events & ~EPOLLEXCLUSIVE_OK_BITS)))
 50			goto error_tgt_fput;
 51	}
 52
 53	/*
 54	 * At this point it is safe to assume that the "private_data" contains
 55	 * our own data structure.
 56	 */
 57	// fd找到file结构体后,从private_data取出epoll的主结构体
 58	ep = f.file->private_data;
 59
 60	/*
 61	 * When we insert an epoll file descriptor inside another epoll file
 62	 * descriptor, there is the chance of creating closed loops, which are
 63	 * better be handled here, than in more critical paths. While we are
 64	 * checking for loops we also determine the list of files reachable
 65	 * and hang them on the tfile_check_list, so we can check that we
 66	 * haven't created too many possible wakeup paths.
 67	 *
 68	 * We do not need to take the global 'epumutex' on EPOLL_CTL_ADD when
 69	 * the epoll file descriptor is attaching directly to a wakeup source,
 70	 * unless the epoll file descriptor is nested. The purpose of taking the
 71	 * 'epmutex' on add is to prevent complex toplogies such as loops and
 72	 * deep wakeup paths from forming in parallel through multiple
 73	 * EPOLL_CTL_ADD operations.
 74	 */
 75	error = epoll_mutex_lock(&ep->mtx, 0, nonblock);
 76	if (error)
 77		goto error_tgt_fput;
 78	if (op == EPOLL_CTL_ADD) {
 79		if (READ_ONCE(f.file->f_ep) || ep->gen == loop_check_gen ||
 80		    is_file_epoll(tf.file)) {
 81			mutex_unlock(&ep->mtx);
 82			error = epoll_mutex_lock(&epmutex, 0, nonblock);
 83			if (error)
 84				goto error_tgt_fput;
 85			loop_check_gen++;
 86			full_check = 1;
 87			if (is_file_epoll(tf.file)) {
 88				tep = tf.file->private_data;
 89				error = -ELOOP;
 90				if (ep_loop_check(ep, tep) != 0)
 91					goto error_tgt_fput;
 92			}
 93			error = epoll_mutex_lock(&ep->mtx, 0, nonblock);
 94			if (error)
 95				goto error_tgt_fput;
 96		}
 97	}
 98
 99	/*
100	 * Try to lookup the file inside our RB tree. Since we grabbed "mtx"
101	 * above, we can be sure to be able to use the item looked up by
102	 * ep_find() till we release the mutex.
103	 */
104	epi = ep_find(ep, tf.file, fd);
105
106	error = -EINVAL;
107	switch (op) {
108	case EPOLL_CTL_ADD:
109		if (!epi) {
110			epds->events |= EPOLLERR | EPOLLHUP;
111			error = ep_insert(ep, epds, tf.file, fd, full_check);
112		} else
113			error = -EEXIST;
114		break;
115	case EPOLL_CTL_DEL:
116		if (epi)
117			error = ep_remove(ep, epi);
118		else
119			error = -ENOENT;
120		break;
121	case EPOLL_CTL_MOD:
122		if (epi) {
123			if (!(epi->event.events & EPOLLEXCLUSIVE)) {
124				epds->events |= EPOLLERR | EPOLLHUP;
125				error = ep_modify(ep, epi, epds);
126			}
127		} else
128			error = -ENOENT;
129		break;
130	}
131	mutex_unlock(&ep->mtx);
132
133error_tgt_fput:
134	if (full_check) {
135		clear_tfile_check_list();
136		loop_check_gen++;
137		mutex_unlock(&epmutex);
138	}
139
140	fdput(tf);
141error_fput:
142	fdput(f);
143error_return:
144
145	return error;
146}

3. EPOLL_CTL_ADD 添加fd监听 #

3.1. ep_insert #

  • 先插入fd到epitem
  • 再将epitem插入到epoll的红黑树中
  • 然后将epoll的回调设置到socket中
  1// fs/eventpoll.c
  2/*
  3 * Must be called with "mtx" held.
  4 */
  5static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
  6		     struct file *tfile, int fd, int full_check)
  7{
  8	int error, pwake = 0;
  9	__poll_t revents;
 10	struct epitem *epi;
 11	struct ep_pqueue epq;
 12	struct eventpoll *tep = NULL;
 13
 14	if (is_file_epoll(tfile))
 15		tep = tfile->private_data;
 16
 17	lockdep_assert_irqs_enabled();
 18
 19	if (unlikely(percpu_counter_compare(&ep->user->epoll_watches,
 20					    max_user_watches) >= 0))
 21		return -ENOSPC;
 22	percpu_counter_inc(&ep->user->epoll_watches);
 23
 24	if (!(epi = kmem_cache_zalloc(epi_cache, GFP_KERNEL))) {
 25		percpu_counter_dec(&ep->user->epoll_watches);
 26		return -ENOMEM;
 27	}
 28
 29	/* Item initialization follow here ... */
 30	INIT_LIST_HEAD(&epi->rdllink);
 31	epi->ep = ep;
 32	ep_set_ffd(&epi->ffd, tfile, fd);	// 设置fd到epitem中
 33	epi->event = *event;
 34	epi->next = EP_UNACTIVE_PTR;
 35
 36	if (tep)
 37		mutex_lock_nested(&tep->mtx, 1);
 38	/* Add the current item to the list of active epoll hook for this file */
 39	if (unlikely(attach_epitem(tfile, epi) < 0)) {
 40		if (tep)
 41			mutex_unlock(&tep->mtx);
 42		kmem_cache_free(epi_cache, epi);
 43		percpu_counter_dec(&ep->user->epoll_watches);
 44		return -ENOMEM;
 45	}
 46
 47	if (full_check && !tep)
 48		list_file(tfile);
 49
 50	/*
 51	 * Add the current item to the RB tree. All RB tree operations are
 52	 * protected by "mtx", and ep_insert() is called with "mtx" held.
 53	 */
 54	// epitem插入到红黑树中
 55	ep_rbtree_insert(ep, epi);
 56	if (tep)
 57		mutex_unlock(&tep->mtx);
 58
 59	/* now check if we've created too many backpaths */
 60	if (unlikely(full_check && reverse_path_check())) {
 61		ep_remove(ep, epi);
 62		return -EINVAL;
 63	}
 64
 65	if (epi->event.events & EPOLLWAKEUP) {
 66		error = ep_create_wakeup_source(epi);
 67		if (error) {
 68			ep_remove(ep, epi);
 69			return error;
 70		}
 71	}
 72
 73	/* Initialize the poll table using the queue callback */
 74	epq.epi = epi;
 75	// 将ep_ptable_queue_proc设置为_qproc,加入等待队列的操作函数
 76	init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
 77
 78	/*
 79	 * Attach the item to the poll hooks and get current event bits.
 80	 * We can safely use the file* here because its usage count has
 81	 * been increased by the caller of this function. Note that after
 82	 * this operation completes, the poll callback can start hitting
 83	 * the new item.
 84	 */
 85	revents = ep_item_poll(epi, &epq.pt, 1);
 86
 87	/*
 88	 * We have to check if something went wrong during the poll wait queue
 89	 * install process. Namely an allocation for a wait queue failed due
 90	 * high memory pressure.
 91	 */
 92	if (unlikely(!epq.epi)) {
 93		ep_remove(ep, epi);
 94		return -ENOMEM;
 95	}
 96
 97	/* We have to drop the new item inside our item list to keep track of it */
 98	write_lock_irq(&ep->lock);
 99
100	/* record NAPI ID of new item if present */
101	ep_set_busy_poll_napi_id(epi);
102
103	/* If the file is already "ready" we drop it inside the ready list */
104	if (revents && !ep_is_linked(epi)) {
105		list_add_tail(&epi->rdllink, &ep->rdllist);
106		ep_pm_stay_awake(epi);
107
108		/* Notify waiting tasks that events are available */
109		if (waitqueue_active(&ep->wq))
110			wake_up(&ep->wq);
111		if (waitqueue_active(&ep->poll_wait))
112			pwake++;
113	}
114
115	write_unlock_irq(&ep->lock);
116
117	/* We have to call this outside the lock */
118	if (pwake)
119		ep_poll_safewake(ep, NULL);
120
121	return 0;
122}

1) init_poll_funcptr 初始化插入等待队列的操作函数指针 #

  • _qproc设置为上面的ep_ptable_queue_proc
  • 记住下面的操作,后面插入等待队列会使用
1// include/linux/poll.h
2static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
3{
4	pt->_qproc = qproc;
5	pt->_key   = ~(__poll_t)0; /* all events enabled */
6}

2) ep_rbtree_insert 插入epitem #

 1// fs/eventpoll.c
 2static void ep_rbtree_insert(struct eventpoll *ep, struct epitem *epi)
 3{
 4	int kcmp;
 5	struct rb_node **p = &ep->rbr.rb_root.rb_node, *parent = NULL;
 6	struct epitem *epic;
 7	bool leftmost = true;
 8
 9	while (*p) {
10		parent = *p;
11		epic = rb_entry(parent, struct epitem, rbn);
12		// 使用fd的比较进行红黑树的查找
13		kcmp = ep_cmp_ffd(&epi->ffd, &epic->ffd);
14		if (kcmp > 0) {
15			p = &parent->rb_right;
16			leftmost = false;
17		} else
18			p = &parent->rb_left;
19	}
20	// 将epi的节点插入到红黑树中
21	rb_link_node(&epi->rbn, parent, p);
22	// 着色
23	rb_insert_color_cached(&epi->rbn, &ep->rbr, leftmost);
24}

3) ep_item_poll 插入epoll的回调到socket上 #

 1// fs/eventpoll.c
 2/*
 3 * Differs from ep_eventpoll_poll() in that internal callers already have
 4 * the ep->mtx so we need to start from depth=1, such that mutex_lock_nested()
 5 * is correctly annotated.
 6 */
 7static __poll_t ep_item_poll(const struct epitem *epi, poll_table *pt,
 8				 int depth)
 9{
10	struct file *file = epi->ffd.file;
11	__poll_t res;
12
13	pt->_key = epi->event.events;
14	if (!is_file_epoll(file))
15		res = vfs_poll(file, pt);
16	else
17		res = __ep_eventpoll_poll(file, pt, depth);
18	return res & epi->event.events;
19}
  • 插入socket的情况调用vfs_poll
  • 这里对应socket的poll调用
1// include/linux/poll.h
2static inline __poll_t vfs_poll(struct file *file, struct poll_table_struct *pt)
3{
4	if (unlikely(!file->f_op->poll))
5		return DEFAULT_POLLMASK;
6	return file->f_op->poll(file, pt);
7}

4) socket的poll处理 #

(1) 先看poll的定义位置 #
 1// net/socket.c
 2/*
 3 *	Socket files have a set of 'special' operations as well as the generic file ones. These don't appear
 4 *	in the operation structures but are done directly via the socketcall() multiplexor.
 5 */
 6static const struct file_operations socket_file_ops = {
 7	.owner =	THIS_MODULE,
 8	.llseek =	no_llseek,
 9	.read_iter =	sock_read_iter,
10	.write_iter =	sock_write_iter,
11	.poll =		sock_poll,
12	.unlocked_ioctl = sock_ioctl,
13#ifdef CONFIG_COMPAT
14	.compat_ioctl = compat_sock_ioctl,
15#endif
16	.mmap =		sock_mmap,
17	.release =	sock_close,
18	.fasync =	sock_fasync,
19	.sendpage =	sock_sendpage,
20	.splice_write = generic_splice_sendpage,
21	.splice_read =	sock_splice_read,
22	.show_fdinfo =	sock_show_fdinfo,
23};
(2) sock_poll #
  • 这里又调用到对应的协议的poll函数
 1/* No kernel lock held - perfect */
 2static __poll_t sock_poll(struct file *file, poll_table *wait)
 3{
 4	struct socket *sock = file->private_data;
 5	__poll_t events = poll_requested_events(wait), flag = 0;
 6
 7	if (!sock->ops->poll)
 8		return 0;
 9
10	if (sk_can_busy_loop(sock->sk)) {
11		/* poll once if requested by the syscall */
12		if (events & POLL_BUSY_LOOP)
13			sk_busy_loop(sock->sk, 1);
14
15		/* if this socket can poll_ll, tell the system call */
16		flag = POLL_BUSY_LOOP;
17	}
18
19	return sock->ops->poll(file, sock, wait) | flag;
20}
unix套接字的udp操作 #
  • 定义
 1static const struct proto_ops unix_dgram_ops = {
 2	.family =	PF_UNIX,
 3	.owner =	THIS_MODULE,
 4	.release =	unix_release,
 5	.bind =		unix_bind,
 6	.connect =	unix_dgram_connect,
 7	.socketpair =	unix_socketpair,
 8	.accept =	sock_no_accept,
 9	.getname =	unix_getname,
10	.poll =		unix_dgram_poll,
11	.ioctl =	unix_ioctl,
12#ifdef CONFIG_COMPAT
13	.compat_ioctl =	unix_compat_ioctl,
14#endif
15	.listen =	sock_no_listen,
16	.shutdown =	unix_shutdown,
17	.sendmsg =	unix_dgram_sendmsg,
18	.read_sock =	unix_read_sock,
19	.recvmsg =	unix_dgram_recvmsg,
20	.mmap =		sock_no_mmap,
21	.sendpage =	sock_no_sendpage,
22	.set_peek_off =	unix_set_peek_off,
23	.show_fdinfo =	unix_show_fdinfo,
24};
  • unix_dgram_poll
 1static __poll_t unix_dgram_poll(struct file *file, struct socket *sock,
 2				    poll_table *wait)
 3{
 4	struct sock *sk = sock->sk, *other;
 5	unsigned int writable;
 6	__poll_t mask;
 7
 8	// 这里插入到等待队列
 9	sock_poll_wait(file, sock, wait);
10	mask = 0;
11
12	/* exceptional events? */
13	if (sk->sk_err || !skb_queue_empty_lockless(&sk->sk_error_queue))
14		mask |= EPOLLERR |
15			(sock_flag(sk, SOCK_SELECT_ERR_QUEUE) ? EPOLLPRI : 0);
16
17	if (sk->sk_shutdown & RCV_SHUTDOWN)
18		mask |= EPOLLRDHUP | EPOLLIN | EPOLLRDNORM;
19	if (sk->sk_shutdown == SHUTDOWN_MASK)
20		mask |= EPOLLHUP;
21
22	/* readable? */
23	if (!skb_queue_empty_lockless(&sk->sk_receive_queue))
24		mask |= EPOLLIN | EPOLLRDNORM;
25	if (sk_is_readable(sk))
26		mask |= EPOLLIN | EPOLLRDNORM;
27
28	/* Connection-based need to check for termination and startup */
29	if (sk->sk_type == SOCK_SEQPACKET) {
30		if (sk->sk_state == TCP_CLOSE)
31			mask |= EPOLLHUP;
32		/* connection hasn't started yet? */
33		if (sk->sk_state == TCP_SYN_SENT)
34			return mask;
35	}
36
37	/* No write status requested, avoid expensive OUT tests. */
38	if (!(poll_requested_events(wait) & (EPOLLWRBAND|EPOLLWRNORM|EPOLLOUT)))
39		return mask;
40
41	writable = unix_writable(sk);
42	if (writable) {
43		unix_state_lock(sk);
44
45		other = unix_peer(sk);
46		if (other && unix_peer(other) != sk &&
47		    unix_recvq_full_lockless(other) &&
48		    unix_dgram_peer_wake_me(sk, other))
49			writable = 0;
50
51		unix_state_unlock(sk);
52	}
53
54	if (writable)
55		mask |= EPOLLOUT | EPOLLWRNORM | EPOLLWRBAND;
56	else
57		sk_set_bit(SOCKWQ_ASYNC_NOSPACE, sk);
58
59	return mask;
60}
  • sock_poll_wait中取到等待队列的头部,调用poll_wait
 1// include/net/sock.h
 2/**
 3 * sock_poll_wait - place memory barrier behind the poll_wait call.
 4 * @filp:           file
 5 * @sock:           socket to wait on
 6 * @p:              poll_table
 7 *
 8 * See the comments in the wq_has_sleeper function.
 9 */
10static inline void sock_poll_wait(struct file *filp, struct socket *sock,
11				  poll_table *p)
12{
13	if (!poll_does_not_wait(p)) {
14		poll_wait(filp, &sock->wq.wait, p);
15		/* We need to be sure we are in sync with the
16		 * socket flags modification.
17		 *
18		 * This memory barrier is paired in the wq_has_sleeper.
19		 */
20		smp_mb();
21	}
22}
  • poll_wait拿到等待队列的头部后,调用_qproc将p插入进去
  • 这里的_qproc其实是上面epoll中设置的ep_ptable_queue_proc
1// include/linux/poll.h
2static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
3{
4	if (p && p->_qproc && wait_address)
5		p->_qproc(filp, wait_address, p);
6}
  • ep_ptable_queue_proc,插入ep_poll_callback到socket的等待队列中
 1// fs/eventpoll.c
 2/*
 3 * This is the callback that is used to add our wait queue to the
 4 * target file wakeup lists.
 5 */
 6static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
 7				 poll_table *pt)
 8{
 9	struct ep_pqueue *epq = container_of(pt, struct ep_pqueue, pt);
10	struct epitem *epi = epq->epi;
11	struct eppoll_entry *pwq;
12
13	if (unlikely(!epi))	// an earlier allocation has failed
14		return;
15
16	pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL);
17	if (unlikely(!pwq)) {
18		epq->epi = NULL;
19		return;
20	}
21
22	init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
23	pwq->whead = whead;
24	pwq->base = epi;
25	// 这里插入到等待队列头部,插入的是ep_poll_callback函数
26	if (epi->event.events & EPOLLEXCLUSIVE)
27		add_wait_queue_exclusive(whead, &pwq->wait);
28	else
29		add_wait_queue(whead, &pwq->wait);
30	pwq->next = epi->pwqlist;
31	epi->pwqlist = pwq;
32}
  • ep_poll_callback函数,当回调调用时,将对应的epitem插入到epoll主结构体的链表尾部,通知唤醒
  1// fs/eventpoll.c
  2/*
  3 * This is the callback that is passed to the wait queue wakeup
  4 * mechanism. It is called by the stored file descriptors when they
  5 * have events to report.
  6 *
  7 * This callback takes a read lock in order not to contend with concurrent
  8 * events from another file descriptor, thus all modifications to ->rdllist
  9 * or ->ovflist are lockless.  Read lock is paired with the write lock from
 10 * ep_scan_ready_list(), which stops all list modifications and guarantees
 11 * that lists state is seen correctly.
 12 *
 13 * Another thing worth to mention is that ep_poll_callback() can be called
 14 * concurrently for the same @epi from different CPUs if poll table was inited
 15 * with several wait queues entries.  Plural wakeup from different CPUs of a
 16 * single wait queue is serialized by wq.lock, but the case when multiple wait
 17 * queues are used should be detected accordingly.  This is detected using
 18 * cmpxchg() operation.
 19 */
 20static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
 21{
 22	int pwake = 0;
 23	struct epitem *epi = ep_item_from_wait(wait);
 24	struct eventpoll *ep = epi->ep;
 25	__poll_t pollflags = key_to_poll(key);
 26	unsigned long flags;
 27	int ewake = 0;
 28
 29	read_lock_irqsave(&ep->lock, flags);
 30
 31	ep_set_busy_poll_napi_id(epi);
 32
 33	/*
 34	 * If the event mask does not contain any poll(2) event, we consider the
 35	 * descriptor to be disabled. This condition is likely the effect of the
 36	 * EPOLLONESHOT bit that disables the descriptor when an event is received,
 37	 * until the next EPOLL_CTL_MOD will be issued.
 38	 */
 39	if (!(epi->event.events & ~EP_PRIVATE_BITS))
 40		goto out_unlock;
 41
 42	/*
 43	 * Check the events coming with the callback. At this stage, not
 44	 * every device reports the events in the "key" parameter of the
 45	 * callback. We need to be able to handle both cases here, hence the
 46	 * test for "key" != NULL before the event match test.
 47	 */
 48	if (pollflags && !(pollflags & epi->event.events))
 49		goto out_unlock;
 50
 51	/*
 52	 * If we are transferring events to userspace, we can hold no locks
 53	 * (because we're accessing user memory, and because of linux f_op->poll()
 54	 * semantics). All the events that happen during that period of time are
 55	 * chained in ep->ovflist and requeued later on.
 56	 */
 57	// 这里插入epi到eventpoll的就绪队列
 58	if (READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR) {
 59		if (chain_epi_lockless(epi))
 60			ep_pm_stay_awake_rcu(epi);
 61	} else if (!ep_is_linked(epi)) {
 62		/* In the usual case, add event to ready list. */
 63		if (list_add_tail_lockless(&epi->rdllink, &ep->rdllist))
 64			ep_pm_stay_awake_rcu(epi);
 65	}
 66
 67	/*
 68	 * Wake up ( if active ) both the eventpoll wait list and the ->poll()
 69	 * wait list.
 70	 */
 71	if (waitqueue_active(&ep->wq)) {
 72		if ((epi->event.events & EPOLLEXCLUSIVE) &&
 73					!(pollflags & POLLFREE)) {
 74			switch (pollflags & EPOLLINOUT_BITS) {
 75			case EPOLLIN:
 76				if (epi->event.events & EPOLLIN)
 77					ewake = 1;
 78				break;
 79			case EPOLLOUT:
 80				if (epi->event.events & EPOLLOUT)
 81					ewake = 1;
 82				break;
 83			case 0:
 84				ewake = 1;
 85				break;
 86			}
 87		}
 88		wake_up(&ep->wq);
 89	}
 90	if (waitqueue_active(&ep->poll_wait))
 91		pwake++;
 92
 93out_unlock:
 94	read_unlock_irqrestore(&ep->lock, flags);
 95
 96	/* We have to call this outside the lock */
 97	if (pwake)
 98		ep_poll_safewake(ep, epi);
 99
100	if (!(epi->event.events & EPOLLEXCLUSIVE))
101		ewake = 1;
102
103	if (pollflags & POLLFREE) {
104		/*
105		 * If we race with ep_remove_wait_queue() it can miss
106		 * ->whead = NULL and do another remove_wait_queue() after
107		 * us, so we can't use __remove_wait_queue().
108		 */
109		list_del_init(&wait->entry);
110		/*
111		 * ->whead != NULL protects us from the race with ep_free()
112		 * or ep_remove(), ep_remove_wait_queue() takes whead->lock
113		 * held by the caller. Once we nullify it, nothing protects
114		 * ep/epi or even wait.
115		 */
116		smp_store_release(&ep_pwq_from_wait(wait)->whead, NULL);
117	}
118
119	return ewake;
120}

总结 #

  • 将socket插入到epoll中最中会将epoll的处理函数插入到socket的等待队列中,socket来消息后会调用

五、epoll_wait #

1. 主体逻辑 #

  • 从系统定义开始
 1// fs/eventpoll.c
 2SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
 3		int, maxevents, int, timeout)
 4{
 5	struct timespec64 to;
 6
 7	return do_epoll_wait(epfd, events, maxevents,
 8			     ep_timeout_to_timespec(&to, timeout));
 9}
10
11// fs/eventpoll.c
12/*
13 * Implement the event wait interface for the eventpoll file. It is the kernel
14 * part of the user space epoll_wait(2).
15 */
16static int do_epoll_wait(int epfd, struct epoll_event __user *events,
17			 int maxevents, struct timespec64 *to)
18{
19	int error;
20	struct fd f;
21	struct eventpoll *ep;
22
23	/* The maximum number of event must be greater than zero */
24	if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
25		return -EINVAL;
26
27	/* Verify that the area passed by the user is writeable */
28	if (!access_ok(events, maxevents * sizeof(struct epoll_event)))
29		return -EFAULT;
30
31	/* Get the "struct file *" for the eventpoll file */
32	f = fdget(epfd);
33	if (!f.file)
34		return -EBADF;
35
36	/*
37	 * We have to check that the file structure underneath the fd
38	 * the user passed to us _is_ an eventpoll file.
39	 */
40	error = -EINVAL;
41	if (!is_file_epoll(f.file))
42		goto error_fput;
43
44	/*
45	 * At this point it is safe to assume that the "private_data" contains
46	 * our own data structure.
47	 */
48	ep = f.file->private_data;
49
50	/* Time to fish for events ... */
51	error = ep_poll(ep, events, maxevents, to);
52
53error_fput:
54	fdput(f);
55	return error;
56}
  • 主要逻辑在ep_poll
  1// fs/eventpoll.c
  2/**
  3 * ep_poll - Retrieves ready events, and delivers them to the caller-supplied
  4 *           event buffer.
  5 *
  6 * @ep: Pointer to the eventpoll context.
  7 * @events: Pointer to the userspace buffer where the ready events should be
  8 *          stored.
  9 * @maxevents: Size (in terms of number of events) of the caller event buffer.
 10 * @timeout: Maximum timeout for the ready events fetch operation, in
 11 *           timespec. If the timeout is zero, the function will not block,
 12 *           while if the @timeout ptr is NULL, the function will block
 13 *           until at least one event has been retrieved (or an error
 14 *           occurred).
 15 *
 16 * Return: the number of ready events which have been fetched, or an
 17 *          error code, in case of error.
 18 */
 19static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
 20		   int maxevents, struct timespec64 *timeout)
 21{
 22	int res, eavail, timed_out = 0;
 23	u64 slack = 0;
 24	wait_queue_entry_t wait;
 25	ktime_t expires, *to = NULL;
 26
 27	lockdep_assert_irqs_enabled();
 28
 29	if (timeout && (timeout->tv_sec | timeout->tv_nsec)) {
 30		slack = select_estimate_accuracy(timeout);
 31		to = &expires;
 32		*to = timespec64_to_ktime(*timeout);
 33	} else if (timeout) {
 34		/*
 35		 * Avoid the unnecessary trip to the wait queue loop, if the
 36		 * caller specified a non blocking operation.
 37		 */
 38		timed_out = 1;
 39	}
 40
 41	/*
 42	 * This call is racy: We may or may not see events that are being added
 43	 * to the ready list under the lock (e.g., in IRQ callbacks). For cases
 44	 * with a non-zero timeout, this thread will check the ready list under
 45	 * lock and will add to the wait queue.  For cases with a zero
 46	 * timeout, the user by definition should not care and will have to
 47	 * recheck again.
 48	 */
 49	// 获取是否有事件
 50	eavail = ep_events_available(ep);
 51
 52	while (1) {
 53		if (eavail) {
 54			// 有事件就把事件发给用户空间
 55			/*
 56			 * Try to transfer events to user space. In case we get
 57			 * 0 events and there's still timeout left over, we go
 58			 * trying again in search of more luck.
 59			 */
 60			res = ep_send_events(ep, events, maxevents);
 61			if (res)
 62				return res;
 63		}
 64
 65		// 超时时间处理
 66		if (timed_out)
 67			return 0;
 68
 69		eavail = ep_busy_loop(ep, timed_out);
 70		if (eavail)
 71			continue;
 72
 73		if (signal_pending(current))
 74			return -EINTR;
 75
 76		/*
 77		 * Internally init_wait() uses autoremove_wake_function(),
 78		 * thus wait entry is removed from the wait queue on each
 79		 * wakeup. Why it is important? In case of several waiters
 80		 * each new wakeup will hit the next waiter, giving it the
 81		 * chance to harvest new event. Otherwise wakeup can be
 82		 * lost. This is also good performance-wise, because on
 83		 * normal wakeup path no need to call __remove_wait_queue()
 84		 * explicitly, thus ep->lock is not taken, which halts the
 85		 * event delivery.
 86		 */
 87		init_wait(&wait);
 88
 89		write_lock_irq(&ep->lock);
 90		/*
 91		 * Barrierless variant, waitqueue_active() is called under
 92		 * the same lock on wakeup ep_poll_callback() side, so it
 93		 * is safe to avoid an explicit barrier.
 94		 */
 95		__set_current_state(TASK_INTERRUPTIBLE);
 96
 97		/*
 98		 * Do the final check under the lock. ep_scan_ready_list()
 99		 * plays with two lists (->rdllist and ->ovflist) and there
100		 * is always a race when both lists are empty for short
101		 * period of time although events are pending, so lock is
102		 * important.
103		 */
104		// 最后检查一次是否有事件,没有事件就把自己加入到wq中
105		eavail = ep_events_available(ep);
106		if (!eavail)
107			__add_wait_queue_exclusive(&ep->wq, &wait);
108
109		write_unlock_irq(&ep->lock);
110
111		// 这里进入睡眠,带超时时间的睡眠
112		if (!eavail)
113			timed_out = !schedule_hrtimeout_range(to, slack,
114							      HRTIMER_MODE_ABS);
115		// 这里唤醒后,设置进程状态为running
116		__set_current_state(TASK_RUNNING);
117
118		/*
119		 * We were woken up, thus go and try to harvest some events.
120		 * If timed out and still on the wait queue, recheck eavail
121		 * carefully under lock, below.
122		 */
123		eavail = 1;
124
125		// 等待队列不为空,就从等待队列移除等待项
126		if (!list_empty_careful(&wait.entry)) {
127			write_lock_irq(&ep->lock);
128			/*
129			 * If the thread timed out and is not on the wait queue,
130			 * it means that the thread was woken up after its
131			 * timeout expired before it could reacquire the lock.
132			 * Thus, when wait.entry is empty, it needs to harvest
133			 * events.
134			 */
135			if (timed_out)
136				eavail = list_empty(&wait.entry);
137			__remove_wait_queue(&ep->wq, &wait);
138			write_unlock_irq(&ep->lock);
139		}
140	}
141}
  • ep_events_available主要检查rdllist是否为空
 1// fs/eventpoll.c
 2/**
 3 * ep_events_available - Checks if ready events might be available.
 4 *
 5 * @ep: Pointer to the eventpoll context.
 6 *
 7 * Return: a value different than %zero if ready events are available,
 8 *          or %zero otherwise.
 9 */
10static inline int ep_events_available(struct eventpoll *ep)
11{
12	return !list_empty_careful(&ep->rdllist) ||
13		READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR;
14}
  • 水平触发和边沿触发主要逻辑在ep_send_events
  • 水平触发就是有事件还是加回就绪队列,同时返回。下一次epoll_wait会检查就绪队列被加回的这个事件是否读完了,没读完继续返回,读完了就移除
  1static int ep_send_events(struct eventpoll *ep,
  2			  struct epoll_event __user *events, int maxevents)
  3{
  4	struct epitem *epi, *tmp;
  5	LIST_HEAD(txlist);
  6	poll_table pt;
  7	int res = 0;
  8
  9	/*
 10	 * Always short-circuit for fatal signals to allow threads to make a
 11	 * timely exit without the chance of finding more events available and
 12	 * fetching repeatedly.
 13	 */
 14	if (fatal_signal_pending(current))
 15		return -EINTR;
 16
 17	init_poll_funcptr(&pt, NULL);
 18
 19	mutex_lock(&ep->mtx);
 20	// 把ep->rdlist全部转移到txlist中,清空rdlist
 21	ep_start_scan(ep, &txlist);
 22
 23	// 遍历txlist链表,将所有就绪队列事件取出检查,取出的元素为epi
 24	/*
 25	 * We can loop without lock because we are passed a task private list.
 26	 * Items cannot vanish during the loop we are holding ep->mtx.
 27	 */
 28	list_for_each_entry_safe(epi, tmp, &txlist, rdllink) {
 29		struct wakeup_source *ws;
 30		__poll_t revents;
 31
 32		if (res >= maxevents)
 33			break;
 34
 35		/*
 36		 * Activate ep->ws before deactivating epi->ws to prevent
 37		 * triggering auto-suspend here (in case we reactive epi->ws
 38		 * below).
 39		 *
 40		 * This could be rearranged to delay the deactivation of epi->ws
 41		 * instead, but then epi->ws would temporarily be out of sync
 42		 * with ep_is_linked().
 43		 */
 44		ws = ep_wakeup_source(epi);
 45		if (ws) {
 46			if (ws->active)
 47				__pm_stay_awake(ep->ws);
 48			__pm_relax(ws);
 49		}
 50
 51		// 将epi从链表中移除
 52		list_del_init(&epi->rdllink);
 53
 54		// 检查一下epi是否有事件,没事件就跳过
 55		/*
 56		 * If the event mask intersect the caller-requested one,
 57		 * deliver the event to userspace. Again, we are holding ep->mtx,
 58		 * so no operations coming from userspace can change the item.
 59		 */
 60		revents = ep_item_poll(epi, &pt, 1);
 61		if (!revents)
 62			continue;
 63
 64		// 有事件将event拷贝到用户空间中,没拷贝成功就加回txlist末尾,退出循环
 65		events = epoll_put_uevent(revents, epi->event.data, events);
 66		if (!events) {
 67			list_add(&epi->rdllink, &txlist);
 68			ep_pm_stay_awake(epi);
 69			if (!res)
 70				res = -EFAULT;
 71			break;
 72		}
 73		res++;
 74		if (epi->event.events & EPOLLONESHOT)
 75			epi->event.events &= EP_PRIVATE_BITS;
 76		else if (!(epi->event.events & EPOLLET)) {
 77			// 关键点,如果不是边沿触发也就是水平触发,会把epi加回ep->rdllist
 78			// 下次调用epoll_wait时,会直接有就绪队列,上面会判断是否有事件
 79			// 如果读完了,就绪队列中会删除此epi;没有读完就会继续返回这个epi
 80			/*
 81			 * If this file has been added with Level
 82			 * Trigger mode, we need to insert back inside
 83			 * the ready list, so that the next call to
 84			 * epoll_wait() will check again the events
 85			 * availability. At this point, no one can insert
 86			 * into ep->rdllist besides us. The epoll_ctl()
 87			 * callers are locked out by
 88			 * ep_scan_ready_list() holding "mtx" and the
 89			 * poll callback will queue them in ep->ovflist.
 90			 */
 91			list_add_tail(&epi->rdllink, &ep->rdllist);
 92			ep_pm_stay_awake(epi);
 93		}
 94	}
 95	// 到这里有三个情况
 96	// 一个是用户空间给进来的范围写满了,一个是中间拷贝用户空间出错了,还有就是正常读完了
 97	// 检查txlist,把txlist剩余的放回ep->rdllist
 98	ep_done_scan(ep, &txlist);
 99	mutex_unlock(&ep->mtx);
100
101	return res;
102}
  • ep_done_scan
 1// fs/eventpoll.c
 2static void ep_done_scan(struct eventpoll *ep,
 3			 struct list_head *txlist)
 4{
 5	struct epitem *epi, *nepi;
 6
 7	write_lock_irq(&ep->lock);
 8	// 这里遍历一下当把事件给到用户空间过程中来的事件,放到ep->rdllist中
 9	/*
10	 * During the time we spent inside the "sproc" callback, some
11	 * other events might have been queued by the poll callback.
12	 * We re-insert them inside the main ready-list here.
13	 */
14	for (nepi = READ_ONCE(ep->ovflist); (epi = nepi) != NULL;
15	     nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
16		/*
17		 * We need to check if the item is already in the list.
18		 * During the "sproc" callback execution time, items are
19		 * queued into ->ovflist but the "txlist" might already
20		 * contain them, and the list_splice() below takes care of them.
21		 */
22		if (!ep_is_linked(epi)) {
23			/*
24			 * ->ovflist is LIFO, so we have to reverse it in order
25			 * to keep in FIFO.
26			 */
27			list_add(&epi->rdllink, &ep->rdllist);
28			ep_pm_stay_awake(epi);
29		}
30	}
31	/*
32	 * We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after
33	 * releasing the lock, events will be queued in the normal way inside
34	 * ep->rdllist.
35	 */
36	WRITE_ONCE(ep->ovflist, EP_UNACTIVE_PTR);
37
38	/*
39	 * Quickly re-inject items left on "txlist".
40	 */
41	list_splice(txlist, &ep->rdllist);
42	__pm_relax(ep->ws);
43
44	// 如果就绪队列还有事件,继续唤醒等待的进程(多进程同时等待同一个epoll的场景)
45	if (!list_empty(&ep->rdllist)) {
46		if (waitqueue_active(&ep->wq))
47			wake_up(&ep->wq);
48	}
49
50	write_unlock_irq(&ep->lock);
51}