Redis Source Code - Event Handling, Part 2
This continues from Part 1.
Redis file events originate from network I/O, that is, client requests.
This article walks through how Redis receives data, looking at network sockets and epoll.
Part 2 can be thought of as a prequel to Part 1.
Every beginning is hard, so let's start with the socket.
What is a socket?
First, let's look at the socket structure:
/**
* struct socket - general BSD socket
* @state: socket state (%SS_CONNECTED, etc)
* @type: socket type (%SOCK_STREAM, etc)
* @flags: socket flags (%SOCK_NOSPACE, etc)
* @ops: protocol specific socket operations
* @file: File back pointer for gc
* @sk: internal networking protocol agnostic socket representation
* @wq: wait queue for several uses
*/
struct socket {
socket_state state;
short type;
unsigned long flags;
struct socket_wq *wq;
struct file *file;
struct sock *sk;
const struct proto_ops *ops;
};
The socket structure has two parts worth introducing:
The part closely tied to the filesystem: file
struct file {
union {
struct llist_node fu_llist;
struct rcu_head fu_rcuhead;
} f_u;
struct path f_path;
struct inode *f_inode; /* cached value */
const struct file_operations *f_op;
/*
* Protects f_ep_links, f_flags.
* Must not be taken from IRQ context.
*/
spinlock_t f_lock;
......
}
The part closely tied to communication: sock
struct tcp_sock {
/* inet_connection_sock has to be the first member of tcp_sock */
struct inet_connection_sock inet_conn;
}
struct inet_connection_sock {
/* inet_sock has to be the first member! */
struct inet_sock icsk_inet;
struct request_sock_queue icsk_accept_queue;
struct inet_bind_bucket *icsk_bind_hash;
}
struct inet_sock {
/* sk and pinet6 has to be the first two members of inet_sock */
struct sock sk;
}
struct sock {
/*
* Now struct inet_timewait_sock also uses sock_common, so please just
* don't add nothing before this first member (__sk_common) --acme
*/
struct sock_common __sk_common;
struct sk_buff_head sk_error_queue;
struct sk_buff_head sk_receive_queue;
union {
struct socket_wq __rcu *sk_wq;
struct socket_wq *sk_wq_raw;
};
.....
}
struct sock_common {
/* skc_daddr and skc_rcv_saddr must be grouped on a 8 bytes aligned
* address on 64bit arches : cf INET_MATCH()
*/
union {
__addrpair skc_addrpair;
struct {
__be32 skc_daddr;
__be32 skc_rcv_saddr;
};
};
union {
unsigned int skc_hash;
__u16 skc_u16hashes[2];
};
/* skc_dport && skc_num must be grouped as well */
union {
__portpair skc_portpair;
struct {
__be16 skc_dport;
__u16 skc_num;
};
};
.....
}
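The repeated "has to be the first member" comments are what make this nesting work: because each outer struct embeds the more generic one as its first field, a pointer to the outer object can also be treated as a pointer to the embedded inner one (this is what helpers such as tcp_sk() and inet_sk() rely on). Below is a toy illustration of the idiom, using made-up stand-in types rather than the kernel structs:
#include <stdio.h>

/* Stand-in types (illustration only, not the kernel definitions). */
struct base   { int id; };                     /* plays the role of struct sock_common */
struct middle { struct base b; int extra; };   /* base must stay the first member      */
struct outer  { struct middle m; int more; };  /* middle must stay the first member    */

int main(void) {
    struct outer o = { .m = { .b = { .id = 42 }, .extra = 1 }, .more = 2 };
    /* Because of the first-member layout, a pointer to the outer object is also
     * a valid pointer to the embedded inner object -- the same trick that lets
     * the kernel view a tcp_sock as an inet_sock or a plain sock. */
    struct base *b = (struct base *)&o;
    printf("id=%d\n", b->id);   /* prints 42 */
    return 0;
}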
The socket creation process
Creating a socket means associating it with an open file, so the process can manage it like any other file descriptor.
#------------------- *user space* ---------------------------
socket
#------------------- *kernel space* ---------------------------
__x64_sys_socket # kernel system call
__sys_socket # net/socket.c
|-- sock_create # net/socket.c
|-- __sock_create # net/socket.c
#------------------- file part ---------------------------
|-- sock_alloc # net/socket.c
|-- new_inode_pseudo # fs/inode.c
|-- alloc_inode # fs/inode.c
|-- sock_alloc_inode # net/socket.c
|-- kmem_cache_alloc
#------------------- network part ---------------------------
|-- inet_create # pf->create -- af_inet.c
|-- sk_alloc # net/core/sock.c
|-- sk_prot_alloc # net/core/sock.c
|-- kmem_cache_alloc
|-- inet_sk
|-- sock_init_data # net/core/sock.c
|-- sk_init_common # net/core/sock.c
|-- timer_setup
|-- sk->sk_prot->init(sk) # tcp_v4_init_sock -- net/ipv4/tcp_ipv4.c
|-- tcp_init_sock
#------------------- file + network + process association ------------------------
|-- sock_map_fd # net/socket.c
|-- get_unused_fd_flags # fs/file.c -- allocate a free fd in the process
|-- sock_alloc_file # net/socket.c
|-- alloc_file_pseudo # fs/file_table.c
|-- fd_install # fs/file.c
|-- __fd_install # fs/file.c
|-- fdt = rcu_dereference_sched(files->fdt);
|-- rcu_assign_pointer(fdt->fd[fd], file); # associate the file with the process
- how the sock is tied to its inode (the file node)
- how the sock is tied to the process
static int sock_map_fd(struct socket *sock, int flags) {
struct file *newfile;
int fd = get_unused_fd_flags(flags);
if (unlikely(fd < 0)) {
sock_release(sock);
return fd;
}
newfile = sock_alloc_file(sock, flags, NULL);
if (likely(!IS_ERR(newfile))) {
fd_install(fd, newfile);
return fd;
}
...
}
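To see from user space that the end result of this whole chain is just an ordinary file descriptor, here is a minimal, self-contained sketch (illustration only, not Redis code): the fd returned by socket() can be inspected with fstat() like any other open file.
#include <stdio.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <unistd.h>

int main(void) {
    /* In the kernel this goes through __sys_socket -> sock_map_fd as shown above. */
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        perror("socket");
        return 1;
    }
    struct stat st;
    if (fstat(fd, &st) == 0)    /* works because the socket is backed by a pseudo inode */
        printf("fd=%d is %s\n", fd, S_ISSOCK(st.st_mode) ? "a socket" : "not a socket");
    close(fd);                  /* released through the normal file-table path */
    return 0;
}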
When does Redis create sockets?
The socket that handles a client connection is created in accept:
#------------------- *user space* ---------------------------
accept
#------------------- *kernel space* ---------------------------
__sys_accept4 # net/socket.c - kernel system call
|-- sockfd_lookup_light # look up the listen socket's socket pointer from the fd
|-- sock_alloc # allocate a new socket object, since a ready connection will be taken from the listen socket's accept queue
|-- get_unused_fd_flags # get a free file fd from the process
|-- sock_alloc_file # create a new file in the process, to be associated with the socket
|-- inet_accept # take a ready sock from the listen socket's accept queue and attach it to the newly created socket
|-- inet_csk_accept
|-- reqsk_queue_empty # if the listen socket's accept queue is empty, block, or return EAGAIN in non-blocking mode
|-- sock_rcvtimeo
|-- inet_csk_wait_for_connect # wait in the blocking case
# if the listen socket's accept queue is non-empty, take one connection from it to process
|-- reqsk_queue_remove # remove a request_sock from the listen socket's accept queue for processing
|-- sock_graft # connect the socket with the sock
|-- inet_getname
|-- move_addr_to_user # copy the accepted connection's ip/port to user space
|-- fd_install # associate the file with the process: __fd_install(current->files, fd, file);
Once the TCP three-way handshake completes and the server's accept finishes, the socket abstraction for the client connection is in place.
In the Redis implementation this is anetTcpAccept; here is a GDB session hitting the breakpoint on a client connection:
Thread 1 "redis-server" hit Breakpoint 5, anetTcpAccept (err=0x750ca8 <server+520> "accept: Resource temporarily unavailable", s=4, ip=0x7fffffffe1f0 "127.0.0.1", ip_len=46, port=0x7fffffffe224)
at anet.c:549
(gdb) bt
#0 anetTcpAccept (err=0x750ca8 <server+520> "accept: Resource temporarily unavailable", s=4, ip=0x7fffffffe1f0 "127.0.0.1", ip_len=46, port=0x7fffffffe224) at anet.c:549
#1 0x0000000000438b5f in acceptTcpHandler (el=0x7ffff6a2f0a0, fd=4, privdata=0x0, mask=1) at networking.c:694
#2 0x0000000000421c85 in aeProcessEvents (eventLoop=0x7ffff6a2f0a0, flags=3) at ae.c:431
#3 0x0000000000421e68 in aeMain (eventLoop=0x7ffff6a2f0a0) at ae.c:489
#4 0x000000000042e591 in main (argc=2, argv=0x7fffffffe428) at server.c:4133
static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
int fd;
while(1) {
fd = accept(s,sa,len);//user space tcp accept
if (fd == -1) {
if (errno == EINTR)
continue;
else {
anetSetError(err, "accept: %s", strerror(errno));
return ANET_ERR;
}
}
break;
}
return fd;
}
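For completeness, what triggers that breakpoint is simply a TCP client (for example redis-cli) completing the three-way handshake with the listening port. A minimal stand-in client, shown only to illustrate the trigger (the address and port below are the usual Redis defaults):
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port   = htons(6379);
    inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
    /* Completing the handshake places a ready connection on the listen socket's
     * accept queue, which anetGenericAccept() above then pops via accept(). */
    if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) == 0)
        printf("connected\n");
    close(fd);
    return 0;
}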
Now the chain is clear.
How Redis creates the server's socket
The process is as follows (a simplified sketch follows the list):
- Create the epoll fd: server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); the core call is epoll_create(1024);
- int listenToPort(int port, int *fds, int *count) -- taking TCP over IPv4 as the example
- Create the server socket: socket(p->ai_family, p->ai_socktype, p->ai_protocol)
- anetListen: bind (omitted)
- anetListen: listen (omitted)
- while(!server.el->stop) aeProcessEvents(eventLoop, AE_ALL_EVENTS) -- wait for FD events
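A condensed sketch of the equivalent system-call sequence (assumptions: IPv4 only, default port 6379, backlog 511, minimal error handling; this is not the actual Redis code, just the calls the steps above reduce to):
#include <netinet/in.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>

int main(void) {
    int epfd = epoll_create(1024);              /* aeCreateEventLoop -> aeApiCreate   */
    int sfd  = socket(AF_INET, SOCK_STREAM, 0); /* listenToPort -> socket()           */

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family      = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port        = htons(6379);

    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) exit(1);  /* anetListen: bind   */
    if (listen(sfd, 511) < 0) exit(1);                                   /* anetListen: listen */

    struct epoll_event ee = { .events = EPOLLIN, .data.fd = sfd };
    epoll_ctl(epfd, EPOLL_CTL_ADD, sfd, &ee);   /* aeCreateFileEvent -> aeApiAddEvent */

    /* aeMain/aeProcessEvents would now loop on epoll_wait(epfd, ...). */
    return 0;
}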
How does Redis manage socket FDs?
Redis uses epoll to manage FDs.
struct eventpoll {
/*
* This mutex is used to ensure that files are not removed
* while epoll is using them. This is held during the event
* collection loop, the file cleanup path, the epoll file exit
* code and the ctl operations.
*/
struct mutex mtx;
/* Wait queue used by sys_epoll_wait() */
wait_queue_head_t wq;
/* Wait queue used by file->poll() */
wait_queue_head_t poll_wait;
/* List of ready file descriptors */
struct list_head rdllist;
/* Lock which protects rdllist and ovflist */
rwlock_t lock;
/* RB tree root used to store monitored fd structs */
struct rb_root_cached rbr;
/*
* This is a single linked list that chains all the "struct epitem" that
* happened while transferring ready events to userspace w/out
* holding ->lock.
*/
struct epitem *ovflist;
/* wakeup_source used when ep_scan_ready_list is running */
struct wakeup_source *ws;
/* The user that created the eventpoll descriptor */
struct user_struct *user;
struct file *file;
/*
* usage count, used together with epitem->dying to
* orchestrate the disposal of this struct
*/
refcount_t refcount;
#ifdef CONFIG_NET_RX_BUSY_POLL
/* used to track busy poll napi_id */
unsigned int napi_id;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
/* tracks wakeup nests for lockdep validation */
u8 nests;
#endif
};
- wq: wait queue for processes blocked in epoll_wait
- poll_wait: wait queue used when epoll_ctl monitors another epoll fd
- rdllist: the ready list, i.e. the list of epitems whose fds have produced the read/write events the user registered for
- ovflist: while ready events are being transferred to user space, events that become ready in the meantime are temporarily added to this list and moved to rdllist once the transfer finishes
- rbr: root of the red-black tree that manages the monitored fd nodes
- file: the file structure backing the eventpoll instance (in Linux, everything is a file)
- lock: protects rdllist and ovflist
The epoll instance uses the red-black tree rbr to manage the registered fds it monitors; these fds are the client connections accepted above 👆.
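Conversely, when a client goes away Redis unregisters the fd again (freeClient eventually reaches aeApiDelEvent); at the syscall level that is roughly an EPOLL_CTL_DEL, which removes the epitem from rbr. A hedged sketch (hypothetical helper name; the real aeApiDelEvent uses EPOLL_CTL_MOD instead when other events remain registered for the fd):
#include <sys/epoll.h>

/* unregister_fd(): hypothetical helper, for illustration only. */
static int unregister_fd(int epfd, int client_fd) {
    struct epoll_event ee = {0};  /* ignored by EPOLL_CTL_DEL, kept for older kernels */
    return epoll_ctl(epfd, EPOLL_CTL_DEL, client_fd, &ee);
}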
How does Redis handle FD readiness events?
When an accepted fd is added to the epoll instance, epoll invokes the poll method of the corresponding file.
(The Redis-side poll chain is aeMain -> aeProcessEvents -> aeApiPoll; the code was covered in Part 1 and is skipped here.)
The poll method serves two main purposes:
- register a callback that is invoked whenever the file produces a new readiness event
- return the file's currently ready events
In Redis, the flow for adding an fd to epoll is as follows:
- void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask)
- acceptCommonHandler(cfd,0,cip)
- client *createClient(int fd) -- registers the callback readQueryFromClient
- int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData)
- aeApiAddEvent(eventLoop, fd, mask) -- registers the fd and sets the callbacks for its read/write events; the core call is epoll_ctl(state->epfd, op, fd, &ee)
The callback is the readQueryFromClient function.
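A minimal sketch of what that registration boils down to at the syscall level (simplified; the real aeApiAddEvent in ae_epoll.c also merges the mask already registered for the fd and chooses between EPOLL_CTL_ADD and EPOLL_CTL_MOD):
#include <sys/epoll.h>

/* register_readable(): hypothetical helper name, for illustration only. */
static int register_readable(int epfd, int client_fd) {
    struct epoll_event ee = {0};
    ee.events  = EPOLLIN;      /* AE_READABLE maps to EPOLLIN                          */
    ee.data.fd = client_fd;    /* the kernel wraps the fd in an epitem stored in rbr   */
    return epoll_ctl(epfd, EPOLL_CTL_ADD, client_fd, &ee);
}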
What follows is reading the data and parsing the command; that is not expanded here and will be covered in the next article.
When are the FD readiness events mentioned above 👆 produced?
When data arrives at the NIC, the interrupt path executes ep_poll_callback, which adds the event to rdllist / ovflist and wakes up epoll.
That is how readiness events are produced.
The chain is:
driver -> ep_poll_callback -> epoll_wait (wake up)
/*
* This is the callback that is passed to the wait queue wakeup
* mechanism. It is called by the stored file descriptors when they
* have events to report.
*
* This callback takes a read lock in order not to contend with concurrent
* events from another file descriptor, thus all modifications to ->rdllist
* or ->ovflist are lockless. Read lock is paired with the write lock from
* ep_scan_ready_list(), which stops all list modifications and guarantees
* that lists state is seen correctly.
*
* Another thing worth to mention is that ep_poll_callback() can be called
* concurrently for the same @epi from different CPUs if poll table was inited
* with several wait queues entries. Plural wakeup from different CPUs of a
* single wait queue is serialized by wq.lock, but the case when multiple wait
* queues are used should be detected accordingly. This is detected using
* cmpxchg() operation.
*/
static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key){
......
// while events are being copied from kernel space to user space, add this one to ovflist
if (READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR) {
if (epi->next == EP_UNACTIVE_PTR && chain_epi_lockless(epi))
ep_pm_stay_awake_rcu(epi);
goto out_unlock;
}
if (!ep_is_linked(epi) &&
list_add_tail_lockless(&epi->rdllink, &ep->rdllist)) {
ep_pm_stay_awake_rcu(epi);
}
// if the callback event is one the user registered for, wake up the process to handle it.
// ep->wq is populated during epoll_wait; when no events are ready, epoll_wait sleeps on it waiting to be woken.
if (waitqueue_active(&ep->wq)) {
if ((epi->event.events & EPOLLEXCLUSIVE) &&
!(pollflags & POLLFREE)) {
// #define EPOLLINOUT_BITS (EPOLLIN | EPOLLOUT)
switch (pollflags & EPOLLINOUT_BITS) {
case EPOLLIN:
if (epi->event.events & EPOLLIN)
ewake = 1;
break;
case EPOLLOUT:
if (epi->event.events & EPOLLOUT)
ewake = 1;
break;
case 0:
ewake = 1;
break;
}
}
wake_up(&ep->wq);
}
......
}
What happens after epoll_wait is woken up?
epoll_wait calls ep_events_available to check the ready queues rdllist and ovflist; ready events are then copied to user space by ep_send_events. See the code below 👇.
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
int, maxevents, int, timeout)
{
struct timespec64 to;
return do_epoll_wait(epfd, events, maxevents,
ep_timeout_to_timespec(&to, timeout));
}
/*
* Implement the event wait interface for the eventpoll file. It is the kernel
* part of the user space epoll_pwait(2).
*/
static int do_epoll_pwait(int epfd, struct epoll_event __user *events,
int maxevents, struct timespec64 *to,
const sigset_t __user *sigmask, size_t sigsetsize)
{
error = do_epoll_wait(epfd, events, maxevents, to);
restore_saved_sigmask_unless(error == -EINTR);
return error;
}
/*
* Implement the event wait interface for the eventpoll file. It is the kernel
* part of the user space epoll_wait(2).
*/
static int do_epoll_wait(int epfd, struct epoll_event __user *events,
int maxevents, struct timespec64 *to){
......
/*
* We have to check that the file structure underneath the fd
* the user passed to us _is_ an eventpoll file.
*/
error = -EINVAL;
if (!is_file_epoll(f.file))
goto error_fput;
/*
* At this point it is safe to assume that the "private_data" contains
* our own data structure.
*/
ep = f.file->private_data;
/* Time to fish for events ... */
// block up to the timeout, then handle and return ready events
error = ep_poll(ep, events, maxevents, to);
}
/**
* ep_poll - Retrieves ready events, and delivers them to the caller-supplied
* event buffer.
*
* @ep: Pointer to the eventpoll context.
* @events: Pointer to the userspace buffer where the ready events should be
* stored.
* @maxevents: Size (in terms of number of events) of the caller event buffer.
* @timeout: Maximum timeout for the ready events fetch operation, in
* timespec. If the timeout is zero, the function will not block,
* while if the @timeout ptr is NULL, the function will block
* until at least one event has been retrieved (or an error
* occurred).
*
* Return: the number of ready events which have been fetched, or an
* error code, in case of error.
*/
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, struct timespec64 *timeout){
......
// check the ready queue
eavail = ep_events_available(ep);
while (1) {
if (eavail) {
/*
* Try to transfer events to user space. In case we get
* 0 events and there's still timeout left over, we go
* trying again in search of more luck.
*/
// events are ready: send them to user space
res = ep_send_events(ep, events, maxevents);
if (res)
return res;
}
......
}
}
static int ep_send_events(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents){
......
/*
* If the event mask intersect the caller-requested one,
* deliver the event to userspace. Again, we are holding ep->mtx,
* so no operations coming from userspace can change the item.
*/
// get the ready events of the fd behind this epi (the fd registered on this epfd)
revents = ep_item_poll(epi, &pt, 1);
if (!revents)
continue;
// copy the event from kernel space to user space
events = epoll_put_uevent(revents, epi->event.data, events);
if (!events) {
// if the copy fails, keep the event on the ready list
list_add(&epi->rdllink, &txlist);
ep_pm_stay_awake(epi);
if (!res)
res = -EFAULT;
break;
}
res++;
if (epi->event.events & EPOLLONESHOT)
epi->event.events &= EP_PRIVATE_BITS;
else if (!(epi->event.events & EPOLLET)) {
/*
* If this file has been added with Level
* Trigger mode, we need to insert back inside
* the ready list, so that the next call to
* epoll_wait() will check again the events
* availability. At this point, no one can insert
* into ep->rdllist besides us. The epoll_ctl()
* callers are locked out by
* ep_scan_ready_list() holding "mtx" and the
* poll callback will queue them in ep->ovflist.
*/
/* In level-triggered mode the item is not removed from the ready list after the
 * current event is handled; it is left for the next epoll_wait call, which checks
 * again whether events remain and only then drops it from the ready list.
 * While the events are being traversed, ep->rdllist must not be written (it is
 * locked), so new readiness information can only be added to ep->ovflist. */
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
}
......
}
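Back in user space, the events that ep_send_events copied out are exactly what epoll_wait returns to Redis's aeApiPoll. A simplified sketch of that consuming loop (in the spirit of ae_epoll.c, not a verbatim copy; readQueryFromClient/sendReplyToClient are the callbacks mentioned earlier):
#include <sys/epoll.h>

#define MAX_EVENTS 1024

static void poll_once(int epfd, int timeout_ms) {
    struct epoll_event events[MAX_EVENTS];
    int n = epoll_wait(epfd, events, MAX_EVENTS, timeout_ms);  /* blocks inside ep_poll() above */
    for (int i = 0; i < n; i++) {
        if (events[i].events & EPOLLIN) {
            /* Redis fires the registered read callback here,
             * e.g. readQueryFromClient for a client fd. */
        }
        if (events[i].events & EPOLLOUT) {
            /* Write callback, e.g. sendReplyToClient. */
        }
    }
}
Since Redis registers fds in the default level-triggered mode (it does not set EPOLLET), an fd with unread data stays on rdllist, as the comment above explains, and is reported again by the next epoll_wait.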
The flow after ep_send_events
- ep_send_events -> ep_item_poll -> vfs_poll -> tcp_poll -> sock_poll_wait -> poll_wait
- ep_send_events calls ep_item_poll to fetch the fd's ready events
static __poll_t ep_item_poll(const struct epitem *epi, poll_table *pt,
int depth)
{
struct file *file = epi->ffd.file;
__poll_t res;
pt->_key = epi->event.events;
if (!is_file_epoll(file))
res = vfs_poll(file, pt);
else
res = __ep_eventpoll_poll(file, pt, depth);
return res & epi->event.events;
}
static inline __poll_t vfs_poll(struct file *file, struct poll_table_struct *pt) {
if (unlikely(!file->f_op->poll))
return DEFAULT_POLLMASK;
// for a TCP socket, poll points to the tcp_poll function
return file->f_op->poll(file, pt);
}
/*
* Wait for a TCP event.
*
* Note that we don't need to lock the socket, as the upper poll layers
* take care of normal races (between the test and the event) and we don't
* go look at any of the socket buffers directly.
*/
__poll_t tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
/**
* sock_poll_wait - place memory barrier behind the poll_wait call.
* @filp: file
* @sock: socket to wait on
* @p: poll_table
*
* See the comments in the wq_has_sleeper function.
*/
static inline void sock_poll_wait(struct file *filp, struct socket *sock,
poll_table *p)
{
if (!poll_does_not_wait(p)) {
poll_wait(filp, &sock->wq.wait, p);
/* We need to be sure we are in sync with the
* socket flags modification.
*
* This memory barrier is paired in the wq_has_sleeper.
*/
smp_mb();
}
}
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
if (p && p->_qproc && wait_address)
p->_qproc(filp, wait_address, p);
}
So what is p->_qproc above?
It is the function ep_ptable_queue_proc, initialized in ep_insert:
static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
{
pt->_qproc = qproc;
pt->_key = ~(__poll_t)0; /* all events enabled */
}
static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
struct file *tfile, int fd, int full_check){
......
/* Initialize the poll table using the queue callback */
epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
......
}
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
poll_table *pt)
{
struct ep_pqueue *epq = container_of(pt, struct ep_pqueue, pt);
struct epitem *epi = epq->epi;
struct eppoll_entry *pwq;
if (unlikely(!epi)) // an earlier allocation has failed
return;
pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL);
if (unlikely(!pwq)) {
epq->epi = NULL;
return;
}
// install ep_poll_callback on the wait-queue entry
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
pwq->whead = whead;
pwq->base = epi;
if (epi->event.events & EPOLLEXCLUSIVE)
add_wait_queue_exclusive(whead, &pwq->wait);
else
add_wait_queue(whead, &pwq->wait);
pwq->next = epi->pwqlist;
epi->pwqlist = pwq;
}
When is ep_insert called?
From epoll_ctl.
In Redis, after accept, the call to aeApiAddEvent invokes epoll_ctl; the call chain is:
aeApiAddEvent -> epoll_ctl -> ep_insert -> init_poll_funcptr -> ep_ptable_queue_proc
How does the driver, on receiving data, end up triggering ep_poll_callback?
The NIC is hardware; the kernel interacts with it through the NIC driver.
- The kernel brings up the NIC, allocating the resources it needs (ring buffers) and registering a hard-interrupt handler.
- The NIC receives data and raises a hard interrupt to notify the CPU that data has arrived.
- The CPU takes the NIC's hard interrupt and calls the corresponding handler, supplied by the driver.
- The handler then raises the NET_RX_SOFTIRQ softirq, handled by net_rx_action.
- The kernel softirq thread consumes the data the NIC wrote into main memory via DMA.
- The softirq walks softnet_data.poll_list and calls each napi_struct.poll to read the DMA'd data.
- It walks the ring buffer, reads the DMA'd data through dma_sync_single_for_cpu, and copies it into skb packets.
- Once the driver has an skb, it must hand the packet up to the network layer.
- If the kernel has RPS (Receive Packet Steering) enabled, the packet is load-balanced across CPUs to make use of multiple cores (enqueue_to_backlog): the skb is hashed onto some CPU's receive queue (softnet_data.input_pkt_queue), and later the softirq calls softnet_data's NAPI interface process_backlog (softnet_data.backlog.poll), which hands the queued packets to the network layer through __netif_receive_skb.
- In short, the driver reads the data the NIC wrote and passes the packets to the protocol stack (IP/TCP), reaching tcp_v4_rcv:
tcp_v4_rcv
tcp_v4_do_rcv
tcp_rcv_state_process
sk_state_change
sock_def_wakeup
wake_up_interruptible_all
__wake_up
__wake_up_common
sock_def_wakeup is called through sk_state_change when the socket's state changes.
/*
* Default Socket Callbacks
*/
// sock.c
static void sock_def_wakeup(struct sock *sk)
{
struct socket_wq *wq;
rcu_read_lock();
wq = rcu_dereference(sk->sk_wq);
if (skwq_has_sleeper(wq))
wake_up_interruptible_all(&wq->wait);
rcu_read_unlock();
}
//wait.h
#define wake_up_interruptible_all(x) __wake_up(x, TASK_INTERRUPTIBLE, 0, NULL)
//wait.c
/**
* __wake_up - wake up threads blocked on a waitqueue.
* @wq_head: the waitqueue
* @mode: which threads
* @nr_exclusive: how many wake-one or wake-many threads to wake up
* @key: is directly passed to the wakeup function
*
* If this function wakes up a task, it executes a full memory barrier
* before accessing the task state. Returns the number of exclusive
* tasks that were awaken.
*/
int __wake_up(struct wait_queue_head *wq_head, unsigned int mode,
int nr_exclusive, void *key)
{
return __wake_up_common_lock(wq_head, mode, nr_exclusive, 0, key);
}
static int __wake_up_common_lock(struct wait_queue_head *wq_head, unsigned int mode,
int nr_exclusive, int wake_flags, void *key)
{
unsigned long flags;
int remaining;
spin_lock_irqsave(&wq_head->lock, flags);
remaining = __wake_up_common(wq_head, mode, nr_exclusive, wake_flags,
key);
spin_unlock_irqrestore(&wq_head->lock, flags);
return nr_exclusive - remaining;
}
/*
* The core wakeup function. Non-exclusive wakeups (nr_exclusive == 0) just
* wake everything up. If it's an exclusive wakeup (nr_exclusive == small +ve
* number) then we wake that number of exclusive tasks, and potentially all
* the non-exclusive tasks. Normally, exclusive tasks will be at the end of
* the list and any non-exclusive tasks will be woken first. A priority task
* may be at the head of the list, and can consume the event without any other
* tasks being woken.
*
* There are circumstances in which we can try to wake a task which has already
* started to run but is not in state TASK_RUNNING. try_to_wake_up() returns
* zero in this (rare) case, and we handle it by continuing to scan the queue.
*/
static int __wake_up_common(struct wait_queue_head *wq_head, unsigned int mode,
int nr_exclusive, int wake_flags, void *key)
{
wait_queue_entry_t *curr, *next;
lockdep_assert_held(&wq_head->lock);
curr = list_first_entry(&wq_head->head, wait_queue_entry_t, entry);
if (&curr->entry == &wq_head->head)
return nr_exclusive;
// walk the wait-queue entries and invoke their callbacks (ep_poll_callback)
list_for_each_entry_safe_from(curr, next, &wq_head->head, entry) {
unsigned flags = curr->flags;
int ret;
// for epoll wait entries, curr->func is ep_poll_callback
ret = curr->func(curr, mode, wake_flags, key);
if (ret < 0)
break;
if (ret && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
break;
}
return nr_exclusive;
}
In the end, epoll's ep_poll_callback is invoked to handle the event.