qspinlock

1. Introduction #

qspinlock is an improvement built on the MCS lock: the entire lock state (locked byte, pending bit, and the MCS queue tail) is packed into a single 32-bit word.
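For context, the sketch below shows a textbook MCS lock built on GCC's __atomic builtins (an illustration of the underlying idea, not kernel code): every waiter spins on a flag inside its own queue node, and the lock itself is just a pointer-sized tail. qspinlock keeps this queueing scheme but compresses the tail into the 32-bit word described in the next section.

#include <stddef.h>

struct mcs_node {
	struct mcs_node *next;
	int locked;			/* 0: keep spinning, 1: lock handed to us */
};

struct mcs_lock {
	struct mcs_node *tail;		/* pointer-sized tail: what qspinlock compresses into its 16-bit tail field */
};

static void mcs_acquire(struct mcs_lock *lock, struct mcs_node *self)
{
	struct mcs_node *prev;

	self->next = NULL;
	self->locked = 0;

	/* Append ourselves to the queue and learn who our predecessor is. */
	prev = __atomic_exchange_n(&lock->tail, self, __ATOMIC_ACQ_REL);
	if (prev) {
		__atomic_store_n(&prev->next, self, __ATOMIC_RELEASE);
		/* Spin on our own node, not on a shared lock word. */
		while (!__atomic_load_n(&self->locked, __ATOMIC_ACQUIRE))
			;
	}
	/* prev == NULL: the queue was empty and we own the lock immediately. */
}

static void mcs_release(struct mcs_lock *lock, struct mcs_node *self)
{
	struct mcs_node *next = __atomic_load_n(&self->next, __ATOMIC_ACQUIRE);

	if (!next) {
		struct mcs_node *expected = self;

		/* No successor visible: try to swing the tail back to NULL. */
		if (__atomic_compare_exchange_n(&lock->tail, &expected, NULL, 0,
						__ATOMIC_RELEASE, __ATOMIC_RELAXED))
			return;
		/* A successor is linking itself in; wait until it shows up. */
		while (!(next = __atomic_load_n(&self->next, __ATOMIC_ACQUIRE)))
			;
	}
	__atomic_store_n(&next->locked, 1, __ATOMIC_RELEASE);
}

The drawback of plain MCS as a drop-in spinlock is that the lock word is a full pointer and the unlock path needs the caller's node; qspinlock avoids both by encoding a (cpu, node index) pair in the tail field and keeping the actual locked/pending state in the low 16 bits.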

2. Data structures #

 1// include/asm-generic/qspinlock_types.h
 2typedef struct qspinlock {
 3	union {
 4		atomic_t val;
 5
 6		/*
 7		 * By using the whole 2nd least significant byte for the
 8		 * pending bit, we can allow better optimization of the lock
 9		 * acquisition for the pending bit holder.
10		 */
11#ifdef __LITTLE_ENDIAN
12		struct {
13			u8	locked;
14			u8	pending;
15		};
16		struct {
17			u16	locked_pending;
18			u16	tail;
19		};
20#else
21		struct {
22			u16	tail;
23			u16	locked_pending;
24		};
25		struct {
26			u8	reserved[2];
27			u8	pending;
28			u8	locked;
29		};
30#endif
31	};
32} arch_spinlock_t;
33
34/*
35 * Initializier
36 */
37#define	__ARCH_SPIN_LOCK_UNLOCKED	{ { .val = ATOMIC_INIT(0) } }
38
39/*
40 * Bitfields in the atomic value:
41 *
42 * When NR_CPUS < 16K
43 *  0- 7: locked byte
44 *     8: pending
45 *  9-15: not used
46 * 16-17: tail index
47 * 18-31: tail cpu (+1)
48 *
49 * When NR_CPUS >= 16K
50 *  0- 7: locked byte
51 *     8: pending
52 *  9-10: tail index
53 * 11-31: tail cpu (+1)
54 */
55#define	_Q_SET_MASK(type)	(((1U << _Q_ ## type ## _BITS) - 1)\
56				      << _Q_ ## type ## _OFFSET)
57#define _Q_LOCKED_OFFSET	0
58#define _Q_LOCKED_BITS		8
59#define _Q_LOCKED_MASK		_Q_SET_MASK(LOCKED)
60
61#define _Q_PENDING_OFFSET	(_Q_LOCKED_OFFSET + _Q_LOCKED_BITS)
62#if CONFIG_NR_CPUS < (1U << 14)
63#define _Q_PENDING_BITS		8
64#else
65#define _Q_PENDING_BITS		1
66#endif
67#define _Q_PENDING_MASK		_Q_SET_MASK(PENDING)
68
69#define _Q_TAIL_IDX_OFFSET	(_Q_PENDING_OFFSET + _Q_PENDING_BITS)
70#define _Q_TAIL_IDX_BITS	2
71#define _Q_TAIL_IDX_MASK	_Q_SET_MASK(TAIL_IDX)
72
73#define _Q_TAIL_CPU_OFFSET	(_Q_TAIL_IDX_OFFSET + _Q_TAIL_IDX_BITS)
74#define _Q_TAIL_CPU_BITS	(32 - _Q_TAIL_CPU_OFFSET)
75#define _Q_TAIL_CPU_MASK	_Q_SET_MASK(TAIL_CPU)
76
77#define _Q_TAIL_OFFSET		_Q_TAIL_IDX_OFFSET
78#define _Q_TAIL_MASK		(_Q_TAIL_IDX_MASK | _Q_TAIL_CPU_MASK)
79
80#define _Q_LOCKED_VAL		(1U << _Q_LOCKED_OFFSET)
81#define _Q_PENDING_VAL		(1U << _Q_PENDING_OFFSET)

Memory layout (figure): little-endian

Memory layout (figure): big-endian

  • locked is a single byte that says whether the lock is held; it only ever holds 0 or 1
  • pending sits at bit 8 (and owns the whole second byte when NR_CPUS < 16K): the first waiting CPU sets it and spins on the lock word itself instead of joining the MCS queue
  • tail is split into two fields (see the decoding sketch after this list):
    • bits 16-17 are the tail index: 2 bits select one of the 4 per-CPU MCS nodes, i.e. a CPU can be queued on at most 4 spinlocks at the same time (one per context: task, softirq, hardirq, NMI)
    • bits 18-31 are the tail CPU (+1); this layout applies when the number of CPUs is below $2^{14} = 16384$
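To make the bit layout concrete, here is a small self-contained userspace sketch (the helper names and hard-coded masks are mine, matching the NR_CPUS < 16K layout above) that decodes a raw lock value into its fields:

#include <stdint.h>
#include <stdio.h>

/* Masks hard-coded for the NR_CPUS < 16K layout described above. */
#define Q_LOCKED_MASK		0x000000ffu	/* bits  0-7  */
#define Q_PENDING_MASK		0x0000ff00u	/* bits  8-15 (only bit 8 is used) */
#define Q_TAIL_IDX_MASK		0x00030000u	/* bits 16-17 */
#define Q_TAIL_CPU_MASK		0xfffc0000u	/* bits 18-31 */
#define Q_TAIL_IDX_SHIFT	16
#define Q_TAIL_CPU_SHIFT	18

static void dump_qspinlock_val(uint32_t val)
{
	unsigned int locked   = val & Q_LOCKED_MASK;
	unsigned int pending  = !!(val & Q_PENDING_MASK);
	unsigned int tail_idx = (val & Q_TAIL_IDX_MASK) >> Q_TAIL_IDX_SHIFT;
	unsigned int tail_cpu = (val & Q_TAIL_CPU_MASK) >> Q_TAIL_CPU_SHIFT;

	if (tail_cpu)	/* tail cpu is stored +1, so non-zero means the queue is not empty */
		printf("0x%08x: locked=%u pending=%u tail -> cpu %u, node %u\n",
		       val, locked, pending, tail_cpu - 1, tail_idx);
	else
		printf("0x%08x: locked=%u pending=%u queue empty\n",
		       val, locked, pending);
}

int main(void)
{
	dump_qspinlock_val(0x00000001);	/* uncontended: only the locked byte is set */
	dump_qspinlock_val(0x00000101);	/* locked, plus one pending waiter spinning on the word */
	dump_qspinlock_val(0x000c0101);	/* locked, pending, and cpu 2 / node 0 queued behind them */
	return 0;
}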

3. Related operations and functions #

1. native_queued_spin_lock_slowpath: acquiring the lock #

  1// kernel/locking/qspinlock.c
  2#ifdef CONFIG_PARAVIRT_SPINLOCKS
  3#define queued_spin_lock_slowpath	native_queued_spin_lock_slowpath
  4#endif
  5...
  6/**
  7 * queued_fetch_set_pending_acquire - fetch the whole lock value and set pending
  8 * @lock : Pointer to queued spinlock structure
  9 * Return: The previous lock value
 10 *
 11 * *,*,* -> *,1,*
 12 */
 13#ifndef queued_fetch_set_pending_acquire
 14static __always_inline u32 queued_fetch_set_pending_acquire(struct qspinlock *lock)
 15{
 16	return atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val);
 17}
 18#endif
 19...
 20/**
 21 * queued_spin_lock_slowpath - acquire the queued spinlock
 22 * @lock: Pointer to queued spinlock structure
 23 * @val: Current value of the queued spinlock 32-bit word
 24 *
 25 * (queue tail, pending bit, lock value)
 26 *
 27 *              fast     :    slow                                  :    unlock
 28 *                       :                                          :
 29 * uncontended  (0,0,0) -:--> (0,0,1) ------------------------------:--> (*,*,0)
 30 *                       :       | ^--------.------.             /  :
 31 *                       :       v           \      \            |  :
 32 * pending               :    (0,1,1) +--> (0,1,0)   \           |  :
 33 *                       :       | ^--'              |           |  :
 34 *                       :       v                   |           |  :
 35 * uncontended           :    (n,x,y) +--> (n,0,0) --'           |  :
 36 *   queue               :       | ^--'                          |  :
 37 *                       :       v                               |  :
 38 * contended             :    (*,x,y) +--> (*,0,0) ---> (*,0,1) -'  :
 39 *   queue               :         ^--'                             :
 40 */
 41void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
 42{
 43	struct mcs_spinlock *prev, *next, *node;
 44	u32 old, tail;
 45	int idx;
 46
 47    // Make sure the number of CPUs does not exceed what the tail encoding can represent
 48	BUILD_BUG_ON(CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS));
 49
 50	if (pv_enabled())
 51		goto pv_queue;
 52
 53	if (virt_spin_lock(lock))
 54		return;
 55
 56	/*
 57	 * Wait for in-progress pending->locked hand-overs with a bounded
 58	 * number of spins so that we guarantee forward progress.
 59	 *
 60	 * 0,1,0 -> 0,0,1
 61	 */
 62	if (val == _Q_PENDING_VAL) {
 63		int cnt = _Q_PENDING_LOOPS;
 64		val = atomic_cond_read_relaxed(&lock->val,
 65					       (VAL != _Q_PENDING_VAL) || !cnt--);
 66	}
 67
 68	/*
 69	 * If we observe any contention; queue.
 70	 */
 71    // Pending or the wait-queue tail is already non-zero: we are not the only waiter, and the first-in-line slot is taken
 72    // We are at best second in line, so go to the back and queue up
 73	if (val & ~_Q_LOCKED_MASK)
 74		goto queue;
 75
 76	/*
 77	 * trylock || pending
 78	 *
 79	 * 0,0,* -> 0,1,* -> 0,0,1 pending, trylock
 80	 */
 81    // Atomically set pending to 1; the return value is the lock word as it was before the update
 82	val = queued_fetch_set_pending_acquire(lock);
 83
 84	/*
 85	 * If we observe contention, there is a concurrent locker.
 86	 *
 87	 * Undo and queue; our setting of PENDING might have made the
 88	 * n,0,0 -> 0,0,0 transition fail and it will now be waiting
 89	 * on @next to become !NULL.
 90	 */
 91    // Pending or the tail was already non-zero before our update: while the line above ran, another CPU set pending ahead of us
 92    // This second check confirms we were too slow and are not first in line, so go queue up
 93	if (unlikely(val & ~_Q_LOCKED_MASK)) {
 94
 95		/* Undo PENDING if we set it. */
 96        // If pending was clear before our update, the pending bit now set is our own, so clear it
 97        // Getting here with our own pending set means the first-in-line slot was free but the queue already had waiters, so drop pending and go queue up
 98		if (!(val & _Q_PENDING_MASK))
 99			clear_pending(lock);
100
101        // Fall through to the queueing path
102		goto queue;
103	}
104
105	/*
106	 * We're pending, wait for the owner to go away.
107	 *
108	 * 0,1,1 -> 0,1,0
109	 *
110	 * this wait loop must be a load-acquire such that we match the
111	 * store-release that clears the locked bit and create lock
112	 * sequentiality; this is because not all
113	 * clear_pending_set_locked() implementations imply full
114	 * barriers.
115	 */
116    // We set pending and nobody else was contending, so we are first in line
117    // All we have to do is wait for the current owner to release the lock
118	if (val & _Q_LOCKED_MASK)
119		atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_MASK));
120
121	/*
122	 * take ownership and clear the pending bit.
123	 *
124	 * 0,1,0 -> 0,0,1
125	 */
126    // Take the lock and clear the pending bit we set
127	clear_pending_set_locked(lock);
128	lockevent_inc(lock_pending);
129	return;
130
131	/*
132	 * End of pending bit optimistic spinning and beginning of MCS
133	 * queuing.
134	 */
135queue:
136	lockevent_inc(lock_slowpath);
137pv_queue:
138	node = this_cpu_ptr(&qnodes[0].mcs);
139	idx = node->count++;
140	tail = encode_tail(smp_processor_id(), idx);
141
142	trace_contention_begin(lock, LCB_F_SPIN);
143
144	/*
145	 * 4 nodes are allocated based on the assumption that there will
146	 * not be nested NMIs taking spinlocks. That may not be true in
147	 * some architectures even though the chance of needing more than
148	 * 4 nodes will still be extremely unlikely. When that happens,
149	 * we fall back to spinning on the lock directly without using
150	 * any MCS node. This is not the most elegant solution, but is
151	 * simple enough.
152	 */
153	if (unlikely(idx >= MAX_NODES)) {
154		lockevent_inc(lock_no_node);
155		while (!queued_spin_trylock(lock))
156			cpu_relax();
157		goto release;
158	}
159
160	node = grab_mcs_node(node, idx);
161
162	/*
163	 * Keep counts of non-zero index values:
164	 */
165	lockevent_cond_inc(lock_use_node2 + idx - 1, idx);
166
167	/*
168	 * Ensure that we increment the head node->count before initialising
169	 * the actual node. If the compiler is kind enough to reorder these
170	 * stores, then an IRQ could overwrite our assignments.
171	 */
172	barrier();
173
174	node->locked = 0;
175	node->next = NULL;
176	pv_init_node(node);
177
178	/*
179	 * We touched a (possibly) cold cacheline in the per-cpu queue node;
180	 * attempt the trylock once more in the hope someone let go while we
181	 * weren't watching.
182	 */
183	if (queued_spin_trylock(lock))
184		goto release;
185
186	/*
187	 * Ensure that the initialisation of @node is complete before we
188	 * publish the updated tail via xchg_tail() and potentially link
189	 * @node into the waitqueue via WRITE_ONCE(prev->next, node) below.
190	 */
191	smp_wmb();
192
193	/*
194	 * Publish the updated tail.
195	 * We have already touched the queueing cacheline; don't bother with
196	 * pending stuff.
197	 *
198	 * p,*,* -> n,*,*
199	 */
200	old = xchg_tail(lock, tail);
201	next = NULL;
202
203	/*
204	 * if there was a previous node; link it and wait until reaching the
205	 * head of the waitqueue.
206	 */
207	if (old & _Q_TAIL_MASK) {
208		prev = decode_tail(old);
209
210		/* Link @node into the waitqueue. */
211		WRITE_ONCE(prev->next, node);
212
213		pv_wait_node(node, prev);
214		arch_mcs_spin_lock_contended(&node->locked);
215
216		/*
217		 * While waiting for the MCS lock, the next pointer may have
218		 * been set by another lock waiter. We optimistically load
219		 * the next pointer & prefetch the cacheline for writing
220		 * to reduce latency in the upcoming MCS unlock operation.
221		 */
222		next = READ_ONCE(node->next);
223		if (next)
224			prefetchw(next);
225	}
226
227	/*
228	 * we're at the head of the waitqueue, wait for the owner & pending to
229	 * go away.
230	 *
231	 * *,x,y -> *,0,0
232	 *
233	 * this wait loop must use a load-acquire such that we match the
234	 * store-release that clears the locked bit and create lock
235	 * sequentiality; this is because the set_locked() function below
236	 * does not imply a full barrier.
237	 *
238	 * The PV pv_wait_head_or_lock function, if active, will acquire
239	 * the lock and return a non-zero value. So we have to skip the
240	 * atomic_cond_read_acquire() call. As the next PV queue head hasn't
241	 * been designated yet, there is no way for the locked value to become
242	 * _Q_SLOW_VAL. So both the set_locked() and the
243	 * atomic_cmpxchg_relaxed() calls will be safe.
244	 *
245	 * If PV isn't active, 0 will be returned instead.
246	 *
247	 */
248	if ((val = pv_wait_head_or_lock(lock, node)))
249		goto locked;
250
251	val = atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_PENDING_MASK));
252
253locked:
254	/*
255	 * claim the lock:
256	 *
257	 * n,0,0 -> 0,0,1 : lock, uncontended
258	 * *,*,0 -> *,*,1 : lock, contended
259	 *
260	 * If the queue head is the only one in the queue (lock value == tail)
261	 * and nobody is pending, clear the tail code and grab the lock.
262	 * Otherwise, we only need to grab the lock.
263	 */
264
265	/*
266	 * In the PV case we might already have _Q_LOCKED_VAL set, because
267	 * of lock stealing; therefore we must also allow:
268	 *
269	 * n,0,1 -> 0,0,1
270	 *
271	 * Note: at this point: (val & _Q_PENDING_MASK) == 0, because of the
272	 *       above wait condition, therefore any concurrent setting of
273	 *       PENDING will make the uncontended transition fail.
274	 */
275	if ((val & _Q_TAIL_MASK) == tail) {
276		if (atomic_try_cmpxchg_relaxed(&lock->val, &val, _Q_LOCKED_VAL))
277			goto release; /* No contention */
278	}
279
280	/*
281	 * Either somebody is queued behind us or _Q_PENDING_VAL got set
282	 * which will then detect the remaining tail and queue behind us
283	 * ensuring we'll see a @next.
284	 */
285	set_locked(lock);
286
287	/*
288	 * contended path; wait for next if not observed yet, release.
289	 */
290	if (!next)
291		next = smp_cond_load_relaxed(&node->next, (VAL));
292
293	arch_mcs_spin_unlock_contended(&next->locked);
294	pv_kick_node(lock, next);
295
296release:
297	trace_contention_end(lock, 0);
298
299	/*
300	 * release the node
301	 */
302	__this_cpu_dec(qnodes[0].mcs.count);
303}
304EXPORT_SYMBOL(queued_spin_lock_slowpath);
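The slowpath publishes its queue position through encode_tail() and finds its predecessor with decode_tail(). The sketch below shows only the arithmetic of that mapping (hypothetical sketch_* helpers written against the _Q_TAIL_* macros quoted in section 2; the kernel's decode_tail() additionally resolves the per-CPU MCS node via per_cpu_ptr() rather than returning raw numbers). The CPU number is stored +1 so that tail == 0 can mean "no queued waiters".

/* Sketch of the tail encoding; assumes the _Q_TAIL_* macros from qspinlock_types.h. */
static inline u32 sketch_encode_tail(int cpu, int idx)
{
	u32 tail;

	tail  = (u32)(cpu + 1) << _Q_TAIL_CPU_OFFSET;	/* +1 so an empty queue encodes as 0 */
	tail |= (u32)idx << _Q_TAIL_IDX_OFFSET;		/* idx < 4: which per-cpu MCS node */

	return tail;
}

static inline void sketch_decode_tail(u32 tail, int *cpu, int *idx)
{
	*cpu = (int)(tail >> _Q_TAIL_CPU_OFFSET) - 1;
	*idx = (tail & _Q_TAIL_IDX_MASK) >> _Q_TAIL_IDX_OFFSET;
}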

The code is analyzed below, section by section.

1.1. How the first CPU, or the first-in-line CPU, acquires the lock #

 1void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
 2{
 3	struct mcs_spinlock *prev, *next, *node;
 4	u32 old, tail;
 5	int idx;
 6    ...
 7	/*
 8	 * If we observe any contention; queue.
 9	 */
10    // Pending or the wait-queue tail is already non-zero: we are not the only waiter, and the first-in-line slot is taken
11    // We are at best second in line, so go to the back and queue up
12	if (val & ~_Q_LOCKED_MASK)
13		goto queue;
14
15	/*
16	 * trylock || pending
17	 *
18	 * 0,0,* -> 0,1,* -> 0,0,1 pending, trylock
19	 */
20    // Atomically set pending to 1; the return value is the lock word as it was before the update
21	val = queued_fetch_set_pending_acquire(lock);
22
23	/*
24	 * If we observe contention, there is a concurrent locker.
25	 *
26	 * Undo and queue; our setting of PENDING might have made the
27	 * n,0,0 -> 0,0,0 transition fail and it will now be waiting
28	 * on @next to become !NULL.
29	 */
30    // Pending or the tail was already non-zero before our update: while the line above ran, another CPU set pending ahead of us
31    // This second check confirms we were too slow and are not first in line, so go queue up
32	if (unlikely(val & ~_Q_LOCKED_MASK)) {
33
34		/* Undo PENDING if we set it. */
35        // If pending was clear before our update, the pending bit now set is our own, so clear it
36        // Getting here with our own pending set means the first-in-line slot was free but the queue already had waiters, so drop pending and go queue up
37		if (!(val & _Q_PENDING_MASK))
38			clear_pending(lock);
39
40        // Fall through to the queueing path
41		goto queue;
42	}
43
44	/*
45	 * We're pending, wait for the owner to go away.
46	 *
47	 * 0,1,1 -> 0,1,0
48	 *
49	 * this wait loop must be a load-acquire such that we match the
50	 * store-release that clears the locked bit and create lock
51	 * sequentiality; this is because not all
52	 * clear_pending_set_locked() implementations imply full
53	 * barriers.
54	 */
55    // We set pending and nobody else was contending, so we are first in line
56    // All we have to do is wait for the current owner to release the lock
57	if (val & _Q_LOCKED_MASK)
58		atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_MASK));
59
60	/*
61	 * take ownership and clear the pending bit.
62	 *
63	 * 0,1,0 -> 0,0,1
64	 */
65    // Take the lock and clear the pending bit we set
66	clear_pending_set_locked(lock);
67	lockevent_inc(lock_pending);
68	return;
69    ...
70}
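Why the struct gives pending a whole byte (when NR_CPUS < 16K) shows up exactly in this fast path: clear_pending_set_locked() and clear_pending() can then be plain byte/half-word stores instead of atomic read-modify-write operations on the whole lock word. The sketch below is modeled on the kernel's _Q_PENDING_BITS == 8 variants in kernel/locking/qspinlock.c (simplified and renamed here):

/*
 * Sketch, assuming _Q_PENDING_BITS == 8 (NR_CPUS < 16K): pending and locked
 * each own a full byte, so the 0,1,0 -> 0,0,1 hand-over is a single 16-bit
 * store over locked_pending and no cmpxchg loop is needed.
 */
static __always_inline void sketch_clear_pending_set_locked(struct qspinlock *lock)
{
	WRITE_ONCE(lock->locked_pending, _Q_LOCKED_VAL);
}

static __always_inline void sketch_clear_pending(struct qspinlock *lock)
{
	WRITE_ONCE(lock->pending, 0);
}

When NR_CPUS >= 16K, _Q_PENDING_BITS is 1 and the same transitions fall back to atomic read-modify-write operations on lock->val.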

1.2. How the second-in-line and later CPUs acquire the lock #