自旋锁
概念
自旋锁是一种非阻塞的线程同步机制,它采用忙等待的方式,通过不停地循环检查锁的状态来实现对临界区的访问控制。当锁没有被其他线程占有的情况下,当前线程可以获得锁并进入临界区;否则它将在循环中等待其他线程释放锁。
- 非阻塞:为什么是非阻塞?是因为当一个线程没有获取到自旋锁的时候,操作系统并不会将它变为挂起状态,相反,该线程依然会保持活跃状态,并且不断检查锁是否变为可用状态。所以,
区分阻塞和非阻塞的关键就是看线程是否处于挂起状态。在自旋锁的场景下,尽管线程一直处于某种运行状态,虽然这种运行状态没有进行实质性计算等操作,仅仅是不断的检查锁是否可用,但是这也是一种运行状态,只要处于运行状态,那就是非阻塞。 - 忙等待:理解了非阻塞,那么忙等待也就容易理解了。当一个线程试图获得自旋锁,但锁已经被其他线程占用时,该线程会在一个循环中不断地检查锁是否已经被释放。线程在
等待过程中保持忙碌状态,不断执行循环检查操作,而不是挂起等待操作系统的调度。
实现思路
理解了自旋锁的概念,那么实现思路也就明朗了。思路是:
- 定义一个全局变量,初始化为 1,表示自旋锁存在。
- 加锁(获取锁)思路:在 lock()函数中,定义一个局部变量初始化为 0。然后使用原子交换指令去获取全局变量的值,如果发现全局变量的值是 1,表示自旋锁存在,那么就让局部变量的值和全局变量的值做一个交换。交换完成后,全局变量的值为 0,表示当前系统没有自旋锁。
- 解锁(释放锁)思路:在 unlock 函数当中,定义一个局部变量,初始化为 1。然后直接将全局变量的值和这个局部变量的值进行交换。交换完成后,全局变量的值为 1,表示当前系统有自旋锁。
原子指令
总线锁
在多核 CPU 处理器环境中,CPU 提供了在指令执行期间对总线加锁的手段。CPU 芯片上有一条引线 LOCK,如果汇编语言的程序中在一条指令前面加上前缀"LOCK",经过汇编以后的机器代码就会让 CPU 在执行这条指令的时候把引线 LOCK 的电位拉低,持续到这条指令结束时放开,从而把总线锁住。这样同一总线上别的 CPU 就暂时不能通过总线访问内存了,保证了这条指令在多处理器环境中的原子性。
总线锁这种做法锁定的范围太大了,导致 CPU 利用率急剧下降,因为使用 LOCK 是把 CPU 和内存之间的通信锁住了,这使得锁定期间其他处理器不能操作其内存地址的数据,所以总线锁的开销比较大。
缓存锁
随着处理器技术的发展,尤其是从 P6 系列处理器开始,引入了缓存锁的概念来优化多处理器间的同步操作。当访问的内存区域已经被缓存在某个处理器的缓存行中,并且通过LOCK前缀的指令进行操作时,不会锁定总线。相反,处理器会锁定该缓存行,防止其他处理器缓存同一内存地址的数据。在缓存行被修改后,通过缓存一致性协议保证修改的原子性,这样可以大大减少锁的开销,因为不需要锁定整个总线,仅在缓存级别上实现锁定。
lock xchg 指令
在多核 CPU 处理器环境中,假设某个线程持有自旋锁,那么运行该线程的 CPU 缓存中会缓存局部变量的值。
按照前面的自旋锁实现思路,数据交换的过程的本质上是 CPU 缓存中的值(局部变量)和内存(全局变量)进行数据交换的过程,如果在交换的过程中出现了 CPU 中断(交换指令不是原子的),那么当中断恢复前,可能会出现其它线程的 CPU 缓存(局部变量)和内存(全局变量)进行数据交换,当中断恢复后,很有可能内存中的值(全局变量)被其它线程修改,导致死锁或者出现多个自旋锁,从而导致自旋锁不可用。所以,原子性的数据交换将是自旋锁能否正确运行使用的关键因素。
lock 是一个指令前缀,lock 会使紧跟在其后面的指令变成原子指令,所以lock xchg就是我们需要的原子交换指令。
lock xchg指令使用缓存锁机制,在一个线程执行如果执行lock xchg指令时,缓存锁会禁止其它 CPU 缓存和内存之间进行数据交换,当lock xchg指令执行完成后,通过缓存一致性协议(如 MESI 协议)保证内存数据和其它 CPU 缓存数据的一致性。
实现代码(x86 架构)
#ifndef SPIN_LOCK#define SPIN_LOCKint global = 1;int lock_exchange(int *addr, int num){ int result; __asm__ __volatile__( "lock xchg %0,%1\n" : "+m"(*addr), "=a"(result) : "1"(num) : "memory"); return result;}int lock(){ while (1) { if (lock_exchange(&global, 0)) break; }}int unlock(){ lock_exchange(&global, 1);}#endif直接硬看内联汇编的语法可以有点难以理解,代码理解如下:
-
lock_exchange()的作用是把 num 的值和内存地址 addr 当中的值进行交换。
-
lock xchg 表示原子交换指令,后面的
%0和%1是两个占位符,占了谁的位置?占了输出(%0)和输入(%1)参数的位置。 -
输出操作数:
下面一行表示输出列表,这也意味着*addr 和 result 都是这段内联汇编代码需要输出的变量。这里
+代表这个操作数可读可写,m代表操作数是内存地址;后面的=表示这是一个输出操作数,a指示使用 eax 寄存器。综上所述,是为了告诉汇编器,这个*addr 是一个内存地址,这个内存地址里面的值可读可写;result 是一个输出操作数,这个操作数的值被保存到 eax 寄存器中。 -
输入操作数:
再下面一行表示输入列表,这意味着 num 就是这段汇编的输入。
-
被修改的资源:
"memory"告诉编译器这段汇编代码会影响内存,这样编译器在优化代码时会考虑到内存的状态可能被改变。这是一种保守的方式,用于保证内存的可见性和一致性。
指令执行流程:
- 将 num 的值放入 eax 寄存器。
- 执行 lock xchg 指令,原子地将 eax(即 num 的值)和*addr 指向的值交换。
- 将 eax 寄存器(现在存放着原*addr 的值)保存到 result 变量。
- 函数返回 result,即交换前*addr 的值。
测试
#include <stdio.h>#include <stdlib.h>#include <pthread.h>#include <sys/types.h>#include "spinlock.h"volatile long cnt = 0;void *thread(void *sum){ long i, value = *((long *)sum); for (i = 0; i < value; i++) { lock(); cnt++; unlock(); } return NULL;}int main(int argc, char **argv){ const int threadNum = 5; pthread_t tid[threadNum]; long value; if (argc != 2) { printf("usage: %s <parameter>\n", argv[0]); exit(1); } value = atol(argv[1]); for (int i = 0; i < threadNum; i++) { if (pthread_create(&tid[i], NULL, thread, &value) != 0) { perror("pthread_create failed"); exit(1); } } for (int i = 0; i < threadNum; i++) { if (pthread_join(tid[i], NULL) != 0) { perror("pthread_join failed"); exit(1); } } printf("%d\n", cnt); exit(0);}这段代码创建了 5 个线程,这 5 个线程都需要将全局变量 cnt 累加到某一个值,为了保护这 5 个线程共享的全局变量 cnt,这里使用了自旋锁。测试结果如下:
[root@centos OS_Homework]# while true; do time ./spin_sum 1000000; done5000000real 0m0.826suser 0m3.367ssys 0m0.006s5000000real 0m0.835suser 0m3.359ssys 0m0.001s5000000real 0m0.869suser 0m3.526ssys 0m0.000s5000000real 0m0.807suser 0m3.284ssys 0m0.001s5000000real 0m0.866suser 0m3.541ssys 0m0.002s互斥锁
前面提到的自旋锁一定是非阻塞机制,那么互斥锁是不是就是使用的阻塞机制呢?或者说如果一个线程获取不到互斥锁是否就一定会从运行状态变为挂起状态呢?其实线程在得不到互斥锁的时候还真不一定会变成阻塞状态,这主要取决于互斥锁的实现方式。在大多数互斥锁实现中,有两种基本方式来处理锁的获取:阻塞方式和非阻塞方式。
阻塞方式
当线程以阻塞方式请求互斥锁时,如果锁已经被另一个线程持有,请求锁的线程会被挂起(阻塞),直到锁变得可用。这意味着线程会停止执行,等待直到它能获取到锁。这是最常见的方式,因为它简化了线程间的同步。例如,在 POSIX 线程库中,pthread_mutex_lock()函数就会阻塞调用线程,直到它能够获取锁为止。当一个线程释放互斥锁的时候,会唤醒所有等待该互斥锁的线程,让这些线程来争夺互斥锁。
非阻塞方式
非阻塞方式通常是指尝试锁(try-lock)机制。在这种模式下,线程尝试获取互斥锁;如果锁已被另一个线程持有,它不会阻塞,而是立即返回一个错误码或状态,表明锁当前不可用。这允许线程决定是否执行其他任务或再次尝试获取锁。在 pthread 库中,pthread_mutex_trylock()函数提供了这样的非阻塞尝试锁定互斥锁的能力。
如何选择
选择使用阻塞还是非阻塞方式获取互斥锁取决于具体的应用场景:
- 阻塞方式适用于线程必须等待并获取锁才能继续执行的情况,这简化了线程间的协调,但可能导致线程等待时间较长。
- 非阻塞方式(尝试锁)适用于线程在不能立即获取锁时可以执行其他任务的情况,这增加了编程的复杂度,但提高了程序的响应性和效率,尤其是在高并发环境下。
代码实现(x86 架构)
下面使用 try-lock 机制来完成互斥锁的实现。(PS:主要是个人感觉阻塞式的互斥锁要考虑线程唤醒等机制,太过复杂)
#ifndef MUTEX_LOCK#define MUTEX_LOCKint global = 1;int lock_exchange(int *addr, int num){ int result; __asm__ __volatile__( "lock xchg %0,%1\n" : "+m"(*addr), "=a"(result) : "1"(num) : "memory"); return result;}int mutex_trylock() { return lock_exchange(&global, 0) == 1 ? 1 : 0;}void mutex_unlock() { lock_exchange(&global, 1);}#endif测试
#include <stdio.h>#include <unistd.h>#include <stdlib.h>#include <pthread.h>#include <sys/types.h>#include "mutex.h"volatile long cnt = 0;void* thread(void* sum) { long i, value = *((long *)sum); for (i = 0; i < value; i++) { while (mutex_trylock() != 1) { usleep(100); // 短暂休眠100微秒 } cnt++; mutex_unlock(); } return NULL;}int main(int argc, char **argv){ const int threadNum = 5; pthread_t tid[threadNum]; long value; if (argc != 2) { printf("usage: %s <parameter>\n", argv[0]); exit(1); } value = atol(argv[1]); for (int i = 0; i < threadNum; i++) { if (pthread_create(&tid[i], NULL, thread, &value) != 0) { perror("pthread_create failed"); exit(1); } } for (int i = 0; i < threadNum; i++) { if (pthread_join(tid[i], NULL) != 0) { perror("pthread_join failed"); exit(1); } } printf("%d\n", cnt); exit(0);}这段代码创建了 5 个线程,这 5 个线程都需要将全局变量 cnt 累加到某一个值,为了保护这 5 个线程共享的全局变量 cnt,这里使用了互斥锁。测试结果如下:
[root@centos OS_Homework]# while true; do time ./mutex_sum 1000000; done5000000real 0m0.070suser 0m0.079ssys 0m0.001s5000000real 0m0.070suser 0m0.080ssys 0m0.001s5000000real 0m0.070suser 0m0.081ssys 0m0.001s5000000real 0m0.071suser 0m0.081ssys 0m0.001s5000000real 0m0.070suser 0m0.080ssys 0m0.000s总结
测试结果汇总
下面是自旋锁和互斥锁测试结果的对比:
| 自旋锁 | 互斥锁 | |
|---|---|---|
| real | 0.87 秒 | 0.07 秒 |
| user | 3.37 秒 | 0.08 秒 |
| sys | 0.001 秒 | 0.002 秒 |
- real:是从开始运行程序到程序结束的总时间。它包括程序执行的所有时间,即使 CPU 部分时间是空闲的或者在执行其他程序。
- user:是程序执行过程中在用户模式下消耗的 CPU 时间总和。用户模式下执行的是程序代码,而不是内核代码。如果程序是多线程的,并且在多个 CPU 上运行,用户时间可以超过实际时间(real)。
- sys:是程序执行过程中在内核模式下消耗的 CPU 时间。当程序进行如读写文件、请求网络资源或执行其他系统调用需要内核介入时,会记录为系统时间。
比较
-
自旋锁:- real:0.87 秒,说明程序总的执行时间较长。
- user:3.37 秒,远大于实际时间,这意味着有多核 CPU 参与计算。并且在自旋锁的情况下,这也表明线程在尝试获取锁时大量占用 CPU 时间进行忙等待,即使它们实际上并没有做任何有用的工作。
- sys:0.001 秒,非常短的系统时间表明几乎没有系统调用的参与,这可能是因为自旋锁的获取和释放通常是通过 CPU 指令实现的,几乎不涉及系统调用。
-
互斥锁(try-lock 机制):- real:0.07 秒,实际执行时间非常短,远低于自旋锁的实际时间。这表明互斥锁(尤其是采用 try-lock 机制)在这个测试中效率非常高,可能是因为线程在不能立即获得锁时没有进行忙等待,而是做了其他有用的工作或者被挂起。
- user:0.08 秒,与实际时间几乎一致,表明 CPU 资源被有效利用,几乎没有浪费在等待锁的获取上。这个时间接近于实际时间也说明了在这段时间内几乎所有的时间 CPU 都在执行用户代码。
- sys:0.002 秒,仍然很短,但略高于自旋锁的系统时间,可能是因为使用了 try-lock 机制,在获取不到锁的时候使用了 usleep 系统调用来等待锁的状态。