并发

Intro

Thread: 并行最小单位

和进程的区别

image-20241221230956042

线程和进程最大的区别是,线程之间可以共享地址空间,因此线程的上下文切换不需要切换页表,但是每个线程都有独立的执行栈,它们分散在整个地址空间中,任何有关线程执行相关的信息都存在线程的执行栈——线程本地存储(TLS, Thread Local Storage)中。

  1. 进程是容器:一个进程可以包含一个或多个线程。线程依赖于进程,不能独立存在。
  2. 线程属于进程:线程是进程的一部分,所有线程共享该进程的资源,如代码段、数据段和打开的文件等。
  3. 进程管理资源,线程执行任务:进程管理资源和环境,而线程负责实际计算和操作。

Why Threads?

事实证明,您应该使用线程至少有两个主要原因。

第一个很简单:==并行性==。想象一下,您正在编写一个对非常大的数组执行操作的程序,例如,将两个大数组相加,或者将数组中每个元素的值增加一定量。如果仅在单个处理器上运行,则任务很简单:只需执行每个操作即可完成。 如果在具有多个处理器的系统上执行程序,则可以通过使用每个处理器执行一部分工作来显著加快此过程。将标准单线程程序转换为在多个 CPU 上执行此类工作的程序的任务称为并行化,并且使用每个 CPU 的线程来执行此工作是使程序运行的自然而典型的方法在现代硬件上速度更快。

第二个原因有点微妙:避免由于 I/O 缓慢而**==阻塞程序进度==**。想象一下,您正在编写一个执行不同类型 I/O 的程序:等待发送或接收消息、等待显式磁盘 I/O 完成,甚至(隐式)等待页面错误完成。您的程序可能不想等待,而是希望做其他事情,包括利用 CPU 执行计算,甚至发出进一步的 I/O 请求。使用线程是避免阻塞的自然方法;当程序中的一个线程等待时(即被阻塞等待 I/O),CPU 调度程序可以切换到其他线程,这些线程已准备好运行并执行一些有用的操作。线程允许 I/O 与单个程序中的其他活动重叠,就像多道程序设计对跨程序的进程所做的那样;因此,许多现代基于服务器的应用程序(Web 服务器、数据库管理系统等)在其实现中都使用了线程。

当然,在上述任何一种情况下,您都可以使用多个进程而不是线程。然而,**==线程共享地址空间==,因此可以轻松共享数据,因此是构建这些类型的程序时的自然选择,==线程更加轻量==**,切换成本没有那么高。对于逻辑上独立的任务来说,进程是一个更合理的选择,因为这些任务几乎不需要共享内存中的数据结构。

问题

一、不可控调度引发的问题:

  1. 临界区(critical section): 多个任务共享的一片区域
  2. 竞态条件(race condition): 多个任务几乎同时读取一片区域,并做了修改,结果不符合预期
  3. 不确定性(indeterminate):多个竞态条件组成程序,导致结果不确定
  4. 解决方案:注重原子性
    • 线程使用互斥(mutex exclusion)原语,保证同时只有一个任务进入临界区修改,避免竞态
    • 原语:若干条指令组成的程序段,用来实现某个特定功能,在执行过程中不可被中断
    • 操作系统中,某些被进程调用的操作,如队列操作、对信号量的操作、检查启动外设操作等,一旦开始执行,就不能被中断,否则就会出现操作错误,造成系统混乱。

二、任务之间应该如何唤醒对方?

POSIX Thread API

Structured (“fork-join”) Parallelism

Compile and Run: -pthread

在链接行上,您还必须通过添加 -pthread 标志来显式动态链接 pthreads 库。

prompt> gcc -o thread thread.c -Wall -pthread

prompt> gcc thread.c -o thread -lpthread

并且要在源码中加入 pthread.h

Creation: pthread_create()

create:

1
2
3
4
5
#include <pthread.h>
int pthread_create(pthread_t *thread,
const pthread_attr_t *attr,
void *(*start_routine)(void*),
void *arg);
  • 参数说明
    • thread 是一个 pthread_t 类型的指针,也就是待初始化的线程指针
    • attr 用来配置这个线程的属性,比如栈大小,线程调度优先级,默认可以为 NULL
    • start_routine 是函数指针,,前面是返回值类型,后面是参数类型及个数
    • void * 可以代表任何类型的参数/返回值, argstart_routine 参数类型一致
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <stdio.h>
#include <pthread.h>

typedef struct {
int a;
int b;
} myarg_t;

void *mythread(void *arg) {
myarg_t *args = (myarg_t *) arg;
printf("%d %d\n", args->a, args->b);
return NULL;
}

int main(int argc, char *argv[]) {
pthread_t p;
myarg_t args = { 10, 20 };

int rc = pthread_create(&p, NULL, mythread, &args);
...
}

我们只是创建一个传递两个参数的线程,并将其打包成我们自己定义的单个类型(myarg t

线程一旦创建,就可以简单地将其参数转换为它期望的类型,从而根据需要解压参数。

创建线程后,您实际上拥有另一个实时的执行实体,具有自己的调用堆栈,与程序中所有当前现有线程在同一地址空间中运行。

Completion: pthread_join()

join:

1
int pthread_join(pthread_t thread, void **value_ptr);
  • 参数说明
    • thread 用于指定要等待哪个线程。该变量由线程创建例程初始化(当您将指向它的指针作为参数传递给 pthread_create() 时);如果保留,则可以使用它等待线程终止。
    • value_ptr 是指向 指向期望返回值的指针 的二级指针,因为还没返回,只能传进去一个指针变量,join修改指针就需要传二级指针

Usage:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <stdio.h>
#include <pthread.h>
typedef struct { int a; int b; } myarg_t;
typedef struct { int x; int y; } myret_t;

void *mythread(void *arg) {
// 这里用的是包装函数,首字母大写,用来应对可能发生的异常
myret_t *rvals = Malloc(sizeof(myret_t));
rvals->x = 1;
rvals->y = 2;
return (void *) rvals;
}

int main(int argc, char *argv[]) {
pthread_t p;
myret_t *rvals;
myarg_t args = { 10, 20 };
// 这里用的是包装函数,首字母大写,用来应对可能发生的异常
Pthread_create(&p, NULL, mythread, &args);
Pthread_join(p, (void **) &rvals);
printf("returned %d %d\n", rvals->x, rvals->y);
free(rvals);
return 0;
}

在代码中,再次创建单个线程,并通过 myarg_t 结构传递几个参数。要返回值,请使用 myret_t 类型。一旦线程完成运行,一直在 pthread_join() 例程1内等待的主线程就会返回,我们可以访问从线程返回的值,即 myret_t 中的任何内容。

关于这个例子有几点需要注意:

  1. 首先,很多时候我们不必进行所有这些痛苦的参数打包和拆包。例如,如果我们只是创建一个不带参数的线程,则可以在创建线程时将 NULL 作为参数传入。类似地,如果我们不关心返回值,我们可以将 NULL 传递给 pthread_join()

  2. ```c
    void *mythread(void *arg) {

    long long int value = (long long int) arg;
    printf("%lld\n", value);
    return (void *) (value + 1);
    

    }
    int main(int argc, char *argv[]) {

    pthread_t p;
    long long int rvalue;
    Pthread_create(&p, NULL, mythread, (void *) 100);
    Pthread_join(p, (void **) &rvalue);
    printf("returned %lld\n", rvalue);
    return 0;
    

    }

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12

    如上图,如果我们只是传递单个值(例如,`long long int`),则不必将其打包为参数。在这种情况下,我们不必将参数和返回值打包在结构体内部。

    3. ```c
    void *mythread(void *arg) {
    myarg_t *args = (myarg_t *) arg;
    printf("%d %d\n", args->a, args->b);
    myret_t oops; // ALLOCATED ON STACK: BAD!
    oops.x = 1;
    oops.y = 2;
    return (void *) &oops;
    }

    如上图,必须非常小心线程的返回值,永远不要**返回指向线程栈上分配的内容的指针**,因为栈随着函数返回,空间也会自动释放,oops返回的指针指向的是一片不确定区域。

  3. 这种 fork-join 式的多线程编程方式是较为普遍的结构化编程方法。但我们应该注意,并非所有多线程代码都使用 join 例程。例如,多线程 Web 服务器可能会创建多个工作线程,然后使用主线程无限期地接受请求并将它们传递给工作线程。因此,此类长期计划可能不需要加入。然而,创建线程来执行特定任务(并行)的并行程序可能会使用 join 来确保所有此类工作在退出或进入下一个计算阶段之前完成。

Mutex: lock() unlock()

1
2
3
4
pthread_mutex_t lock;  // lock is here
pthread_mutex_lock(&lock); // LOCK
... // critical section
pthread_mutex_unlock(&lock);// UNLOCK

整体结构如上,但是缺乏正确的初始化和各种细节:

1
2
3
4
5
6
7
8
9
10
11
12
13
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;// 宏,设置为默认的值

int rc = pthread_mutex_init(&lock, NULL);
assert(rc == 0); // always check success!
Pthread_mutex_lock(lock);
...
Pthread_mutex_unlock(lock);

// Keeps code clean; only use if exit() OK upon failure 包装
void Pthread_mutex_lock(pthread_mutex_t *mutex) {
int rc = pthread_mutex_lock(mutex);
assert(rc == 0);
}
  • int pthread_mutex_init(pthread_mutex_t *mutex, pthread_mutexattr_t):
    • pthread_mutexattr_t: 具体的参数设置,可以使用 NULL 作为缺省选项
  • int pthread_mutex_destroy(pthread_mutex_t *mutex):
    • 销毁一个已经初始化但未上锁的互斥锁是安全的。
    • 使用完锁资源需要释放 (RAII 的思想) ,只能是被释放,不再被争抢,不再被需要时才可以

改进:增加适当的检测错误机制,健壮的程序需要能够应对调用失败的情况(断言)

还有其他与锁交互的例程

  • int pthread_mutex_trylock(pthread_mutex_t *mutex) 只尝试一次,non-blocking
  • int pthread_mutex_timedlock(pthread_mutex_t *mutex, timespec *tsptr) 尝试一段时间,如果等一段时间获取不到锁就直接返回 ETIMEOUT

Condition Variables: wait() signal()

Thread Interaction

int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)

  • 使调用的线程休眠,等待唤醒(通常是在程序中的某些内容发生更改,使某个条件发生了变化,而使得 wait 处于的 while 循环条件发生了变化)
  • 您可能会注意到等待调用将 mutex 作为其第二个参数,而 signal 不需要。造成这种差异的原因是 wait 调用除了使调用线程进入睡眠状态之外,还使调用者进入睡眠状态时释放锁。
    • 进入 wait 状态就会自动 release mutex。当其他线程通过pthread_cond_signal()pthread_cond_broadcast,把该线程唤醒,之后需要重新获取 mutex 来进行之后的操作
    • 1.将线程加入等待队列 2.将线程持有的锁先释放 这两个步骤必须是原子的
  • 被唤醒之后,从wait返回之前,还应该重新获取锁,防止竞态条件的发生

int pthread_cond_signal(pthread_cond_t *cond)

  • 一旦条件满足,**pthread_cond_signal** 函数可以被用来唤醒至少一个等待该条件(ready ==0)的线程,如果有多个线程阻塞在条件变量上,它们被唤醒的顺序由调度策略决定。
  • 如果没有线程在条件变量上阻塞,调用 pthread_cond_signal ==将不会有任何效果==。
  • 在实际应用中,**pthread_cond_signal** 通常用于生产者-消费者问题,其中生产者在添加了新项目后会通知消费者线程。此外,它也用于实现读写锁,以及在两阶段提交算法中通知所有客户端即将提交事务。
  • pthread_cond_broadcast() 用于唤醒当前全部等待的线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int ready = 0;


// 符合特定条件,进入睡眠
void thread_function(void *arg) {
pthread_mutex_lock(&mutex);

while (!ready) {//循环检查条件
pthread_cond_wait(&cond, &mutex);
}
// 执行当条件满足时的操作
execute_task();
pthread_mutex_unlock(&mutex);
}
/***************************************************/
// 改变条件之后,唤起正在等待的线程
void signal_condition() {
pthread_mutex_lock(&mutex);

ready = 1;
pthread_cond_signal(&cond);

pthread_mutex_unlock(&mutex);
}
Simple Flags

请注意,有时很容易使用简单的标志在两个线程之间发出信号,而不是使用条件变量和关联的锁。 例如,我们可以重写上面的等待代码,使其在等待代码中看起来更像这样:

1
2
3
4
5
6
// wait code:
while (ready == 0)
; // spin

// notify code:
ready = 1;

永远不要这样做,原因如下:

首先,它在很多情况下表现不佳(长时间自旋浪费 CPU 周期)。

其次,容易出错。正如最近的研究表明 ,使用 FLAG 在线程之间进行同步时非常容易出错;在那项研究中,这些临时同步的使用中大约有一半是有问题的!不要偷懒;即使您认为不这样做也可以逃脱,也要使用条件变量。

Summary

  • Keep simple. 任何在线程之间锁定或发出信号的代码都应该尽可能简单,避免复杂的线程交互

  • 最大限度减少线程交互方式,尝试将线程交互方式的数量保持在最低限度

  • 初始化锁和条件变量 (mutex and condition variables) INITIALIZER

  • 始终使用条件变量在线程之间发出信号。虽然使用简单的 FLAG 通常很诱人,但不要这样做

  • 检查函数的返回码,比如断言失败导致的返回码会异常

  • 如何向线程传递参数以及线程的返回值。比如不要返回 指向栈上变量的指针

  • 每个线程都有自己的栈。如果线程正在执行的某个函数内部有一个局部分配的变量,那么它本质上是该线程私有的(Thread-Local);没有其他线程可以(轻松)访问它。要在线程之间共享数据,值必须位于中或其他可全局访问的区域设置中。

Locks

锁——程序员在 OS 调度的基础上实现对调度的最小控制,使调度的混乱状态变得更加可控

锁的基本要点

  1. 最基本的互斥(mutual exclusion):能否在 OS 调度下,阻止多个线程同时进入临界区?
  2. 公平性(fairness):是否会有线程始终无法竞争到锁(starvation)?
  3. 性能(performance):在有竞争与没有竞争的情况下,抢锁、释放锁的开支如何?

锁的实现

  • 完全由软件实现的锁(✕)
  • 硬件支持有更强大的原子指令 + 操作系统调用支持(✓)

控制中断

1
2
3
4
5
6
void lock() {
DisableInterrupts();
}
void unlock() {
EnableInterrupts();
}

加锁:关中断

释放锁:开中断

  • 优点:实现简单,操作系统本身可能会采用这种方式保证访问自己数据结构的原子性

  • 缺点

  1. 性能开销大:开关中断的指令耗时较长
  2. 丢失中断:关中断导致一些中断没有及时被 CPU 接收
  3. 调度失效:恶意程序一直运行,而时钟中断被屏蔽,操作系统的抢占式调度失效

自旋锁(Spin Locks)

先判断再改值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
typedef struct __lock_t { int flag; } lock_t;
void init(lock_t *mutex) {
// 0 -> lock is available, 1 -> held
mutex->flag = 0;
}
// LOCK GAIN
void lock(lock_t *mutex) {
while (mutex->flag == 1) // TEST the flag
; // spin-wait (do nothing)
mutex->flag = 1; // now SET it!
}
void unlock(lock_t *mutex) {
mutex->flag = 0;
}

上面是简单的 flag 实现的,先检验 FLAG 是否为 1,如果不是就将其设置为 1,否则就自旋等待

image-20241222220609945

线程1第一次检验到锁是空闲的,于是想将flag设为1,但与此同时也已耗尽时间片,切换到线程2以后线程2依然试图获取锁,结果获取成功,耗尽时间片回到线程1,线程1竟然也成功获取到了锁!

Test-and-Set(TAS)

导致失败的主要原因是检验Test与赋值Set这两个操作并不是原子化的,会出现只执行一半的情况

因此应该改进锁的实现,使用一个硬件原语:TAS(Atomic Exchange)

1
2
3
4
5
6
7
8
9
int TestAndSet(int *old_ptr, int new) {
int old = *old_ptr; // fetch old value at old_ptr
*old_ptr = new; // store ’new’ into old_ptr
return old; // return the old value
}
void lock(lock_t *lock) {
while (TestAndSet(&lock->flag, 1) == 1)
; // spin-wait (do nothing)
}

其中 TestAndSet() 是一个原子命令,功能是:获取旧值,将其设置为新值,然后返回旧值。这三个一定会在一次操作内完成。如果检测到锁被占用,就会自旋等待,一旦锁被释放,检测返回值为0的同时将其设置为1,成功获取锁,原子命令要么全部成功要么全部失败。自旋锁(spin lock)需要抢占式调度,通过时钟进行线程的中断。

CAS, LL-SC and FAA

Compare-And-Swap(CAS)

x86 中也叫 Compare-And-Exchange(cmpxchg)

1
2
3
4
5
6
7
8
9
10
11
int CompareAndSwap(int *ptr, int expected, int new){
int actual = *ptr;
if (actual == expected){
*ptr = new;
}
return actual;
}
void lock(lock_t *lock){
while(CompareAndSwap(&lock->flag, 0, 1) == 1)
; // spin-wait (do nothing)
}

Load-Linked, Store-Conditional(LL-SC)

MIPS, PowerPC, Alpha, ARM 都有类似功能的指令

1
2
3
4
5
6
7
8
9
10
11
12
13
Load_Linked(address): // 读取某地址的值并将该地址标记为“保留地址”
value ← *address // 从地址中加载值
LL_reserved ← address // 设置保留的地址
return value // 返回加载的值

Store_Conditional(address, value):
// 尝试将值存入地址,但前提是自 LL 设置保留后,该地址未被其他线程修改。
if (LL_reserved == address) then // 检查是否仍然保留该地址
*address ← value // 将值存储到地址中
LL_reserved ← NULL // 清除保留状态
return 1 // 存储成功
else
return 0 // 存储失败
1
2
3
4
5
6
7
8
9
10
void lock(lock_t *lock){
do {
while(LL(&lock->flag) == 1); // 首先加载当前值并标记保留
; //spin-wait
} while (SC(&lock->flag, 1) == 0); // 如果存储失败,则重试
}
void lock_boolean_short_circuiting(lock_t *lock){
while(LL(&lock->flag) || !SC(&lock->flag, 1))
; //spin-wait
}
  1. A 和 B 都执行 LL,地址相同,但状态保存在各自寄存器中。
  2. 假设 A 先执行 SC 并成功,硬件会==清除== B 的保留状态。
  3. B 执行 SC 时发现状态无效,返回失败并进入重试。

Fetch-and-Add(FAA)

1
2
3
4
FAA(address):
old_value = *address // 读取当前值
*address = old_value + 1 // 增加指定值
return old_value // 返回旧值
1
2
3
4
5
6
7
8
9
10
11
12
typedef struct lock_t{
int ticket;//初始化为0
int turn;//初始化为0
} lock_t
void lock(lock_t *lock){
int myturn = FAA(&lock->ticket);
while(lock->turn != myturn)
; // spin-wait
}
void unlock(lock_t *lock){
FAA(&lock->turn);
}
  1. 每个线程通过 FAA 获取一个唯一的排队号 (myturn)。
  2. 当前服务号 (ticket)表示哪个线程正在被服务。
  3. 线程不断检查自己的排队号是否等于当前服务号,只有匹配时才能获得锁。
  4. 解锁时,将服务号递增,以便下一个线程继续执行。
  5. 特性:实现了公平性,每个线程最终都有机会被服务,类似排队机制,按照来的先后顺序排队

自旋锁的评估标准

  1. 正确性: 能够实现最基本的互斥功能,不会被操作系统的调度影响
  2. 公平性: 实际上并不能保证一个竞争的线程一定能够拿到锁,可能会有饥饿的现象发生
  3. 性能: 单核性能差,只有一个执行的单位,如果一个获取锁的线程刚进入临界区就被抢占,那么直到此线程再次被调度之前,其他的等待者必须轮流自旋一整个时间片;而多核环境下,由于是各个线程物理上并行执行(parallel),因此获取到锁的线程很快就会执行完并释放锁给别人。
优先级反转

自旋锁适合短时间的临界区操作,但不适合长时间持有锁的场景。在等待锁释放时,线程会忙等待(busy-waiting),一直循环检查锁状态,而不会主动放弃 CPU。

高优先级线程 A:需要自旋锁资源。

低优先级线程 C:当前持有自旋锁资源。

中优先级线程 B:占用 CPU 时间,导致 C 无法执行。

  1. C 获得锁并进入临界区,但是此时被更高优先级的 A 抢占。
  2. A 尝试获取锁,但由于 C 持有锁,A 进入自旋状态忙等待。
  3. B 开始运行,其优先级高于 C,导致 C 仍然无法继续执行,因此也无法释放锁。
  4. A 等待 C 释放锁,但 C 被 B 抢占
  5. 结果:高优先级的 A 无法执行,但是更低优先级的 B 反而能够顺利执行,优先级反转发生。

解决方案:

(1) 优先级继承机制(Priority inheritance)

  • 原理: 当低优先级线程持有锁,而高优先级线程请求锁时,系统会临时提高低优先级线程的优先级到高优先级线程的级别。
  • 效果: 确保低优先级线程尽快运行并释放锁,防止高优先级线程长期等待。
  • 应用: 常用于互斥锁 (mutex) 中,但自旋锁通常不支持该机制。

(2) 使用互斥锁替代自旋锁

  • 互斥锁会主动挂起等待线程,释放 CPU 给其他任务,提高资源调度效率。
  • 适合可能存在较长等待时间的临界区操作,避免忙等待浪费资源。

(3) 控制自旋时间或自旋次数

  • 设置自旋锁的最大等待时间或循环次数,超过后将线程挂起,而不是一直忙等待。
  • 在 Linux 内核中,可通过**spin_trylock()自旋锁超时机制**控制。

(4) 避免中间优先级线程干扰,或取消优先级差异

睡眠(Sleep)

Yield()

改进自旋锁:如果获取不到锁就立即让出 CPU (yield)

1
2
3
4
5
6
7
8
9
10
11
void init() {
flag = 0;
}

void lock() {
while (TestAndSet(&flag, 1) == 1)
yield(); // give up the CPU
}
void unlock() {
flag = 0;
}

缺点: 容易受操作系统的调度影响,可能导致有线程被饿死,并且如果锁的拥有者在进入临界区之后被调度走,其他程序必须反复执行 运行->让出 的循环,上下文切换成本也不容忽视

Queue & Park(Solaris)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
typedef struct __lock_t {
int flag;
int guard;
queue_t *q;
} lock_t;

void lock_init(lock_t *m) {
m->flag = 0;
m->guard = 0;
queue_init(m->q);
}

void lock(lock_t *m) {
while (TestAndSet(&m->guard, 1) == 1)
; //acquire guard lock by spinning
if (m->flag == 0) {
m->flag = 1; // lock is acquired
m->guard = 0;
} else {
queue_add(m->q, gettid());
m->guard = 0;
park(); // sleep here!
}
}
void unlock(lock_t *m) {
while (TestAndSet(&m->guard, 1) == 1)
; //acquire guard lock by spinning
if (queue_empty(m->q))
m->flag = 0; // let go of lock; no one wants it
else
unpark(queue_remove(m->q)); // hold lock
// (for next thread!)
m->guard = 0;
}

锁外面套了一层 guard 锁,基本思想是,既然不能直接休眠,那就尽量减小自旋等待的范围,原来需要反复自旋获取 flag 锁,并且临界区是整个lock到unlock的区域,现在只需要先自旋获取 guard,临界区只需要获取锁(flag 设置为 1)或者休眠等待锁的释放(唤醒后直接返回,意为锁被上一个线程让了出来)

加锁逻辑

  • 自旋等待获取 guard
  • 获取 guard 之后,如果 flag 未被占用,则直接获取 lock;
  • 如果 flag 被占用,此时不要直接放弃,而是将自己加入等待队列中,释放 guard,并将自己休眠。先休眠后释放一定会造成死锁

释放锁逻辑:(可控调度的关键)

  • 自旋等待获取 guard

  • 获取 guard 之后,如果队列为空,直接将 lock 释放,因为没有人正在等待

  • 如果队列不为空,唤醒队头线程,不能将 lock 释放,因为要为下一个要执行的线程保管好锁

  • 被唤醒的线程之前一直阻塞在 park() ,被唤醒之后依然符合 flag == 1 的条件,直接返回,进入临界区。

    • wakeup race: 如果在 park() 之前切换到了另一个线程(例如,持有锁的线程)可能会导致麻烦,例如,如果该线程随后释放了锁,就会试图唤醒队头线程并FIFO,但是此时线程并没有处于休眠状态,因此唤醒信号丢失,这个线程将永远挂起

    • 解决方案:Solaris 通过添加第三个系统调用来解决此问题:setpark() 通过调用此例程,线程A可以指示它即将停止(about to park)。如果A随后恰好被中断,并且另一个线程B在A实际调用 park 之前调用了 unpark,则后续 park 会立即返回而不是 sleep

1
2
3
4
5
6
7
8
9
10
11
12
13
void lock(lock_t *m) {
while (TestAndSet(&m->guard, 1) == 1)
; //acquire guard lock by spinning
if (m->flag == 0) {
m->flag = 1; // lock is acquired
m->guard = 0;
} else {
queue_add(m->q, gettid());
setpark(); // be about to sleep, ready to receive SIGWAKEUP
m->guard = 0; // release guard
park(); // return immediately if received SIGWAKEUP
}
}
  • 最后返回之前将 guard 释放

也可以将 guard 放入内核中,这样就能保持原子的释放

Futex(Linux)

Linux 提供了一个类似于 Solaris 接口的 futex(Fast Userspace muTEX),但提供更多 in-kernel 功能。具体来说,每个 futex 与特定的物理内存位置以及每个 futex 内核队列相关联(SLAB Allocator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
void mutex_lock (int *mutex) {
int v;
// Bit 31 was clear(0), we got the mutex (fastpath, no SYSCALL)
// Set bit 31 to 1, variable mutex is negative now
if (atomic_bit_test_set (mutex, 31) == 0)
return;
// Not free
atomic_increment (mutex);
while (1) {
// If bit 31 is still 0, there is no contention, acquire the mutex
if (atomic_bit_test_set (mutex, 31) == 0) {
atomic_decrement (mutex);
return;
}
/*
First to make sure futex value
we are monitoring is negative (locked).
*/
v = *mutex;
if (v >= 0)
continue;
futex_wait (mutex, v);// immediately return if v!= *mutex
// otherwise sleep
}
}

void mutex_unlock (int *mutex) {
/* Adding 0x80000000 to counter results in 0 if and
only if there are not other interested threads
returns (new_mutex == 0)*/
if (atomic_add_zero (mutex, 0x80000000))
return;

// There are other threads waiting for this mutex,
// wake one of them up.
futex_wake (mutex);
}

它使用单个整数来跟踪锁是否被持有(整数最高位)以及锁上的等待者数量(所有其他位)。 如果整数为负,则保持该锁定(因为设置了高位,并且该位确定整数的符号)

加锁:

  1. 整数的最高位用来标记锁是否被占用,其余位用来标记等待者数量
  2. 如果获取锁失败,说明锁被占用,则低位自增,等待者 + 1
  3. 随后再次尝试获取锁(spin for one time,Phase 1)
    • 如果成功,则低位自减,等待者 - 1
    • 如果失败,即将进入下个阶段(Phase 2)
  4. 再次检查锁的状态(避免竞态条件,如果在这期间锁被释放就应该重新尝试获取锁)

解锁:

  1. 检测低位等待者的同时,清除最高位,如果结果不为0,则返回false
  2. false,唤醒等待的线程

Function Signature: int futex_wait(int *uaddr, int val)

Purpose: If the futex word equals to val, the thread is put to sleep. The thread remains asleep until another thread calls futex_wake on the same futex word, signaling that the condition has changed.

Parameters:

  • uaddr: Pointer to the futex word in user space.
  • val: The expected value of the futex word.

Return Value: Returns 0 on success, or an error code on failure.

Function Signature: int futex_wake(int *uaddr, int val)

Parameters:

  • uaddr: Pointer to the futex word in user space.
  • val: The number of threads to wake up.

Return Value: Returns the number of threads that were woken up, or an error code on failure.

Function Signature: int atomic_bit_test_set(int *ptr, int bit)

Parameters:

  • ptr: Pointer to the integer variable.
  • bit: The bit position to be tested and set.

Return Value: Returns the previous value of the bit (0 or 1).

Function Signature: bool atomic_add_zero(int *ptr, int value)

Parameters:

  • ptr: Pointer to the integer variable.
  • value: Value to be added.

Return Value: Returns true if the result is zero, otherwise false.

一般来说,内核态同步机制需要调用系统调用,来确保只有一个线程能进入互斥区,但如果根本没有竞争对象,则系统调用浪费了性能。

Futex 是一种用户态和内核态混合的同步机制。首先,同步的进程间通过 mmap 共享一段内存,futex 变量就位于这段共享的内存中,且操作是原子的,当进程尝试进入互斥区lock()或者退出互斥区unlock()的时候,先去查看共享内存中的 futex 变量,如果没有竞争发生,则只修改 futex,而不用再执行系统调用了。当通过访问 futex 变量告诉进程有竞争发生,则还是得执行系统调用去完成相应的处理(wait 或者 wake

简单的说,futex 就是通过在用户态的检查,如果了解到当前没有竞争就不用陷入内核了,大大提高了低竞争情况下的效率。

假设地址处的值等于预期,对 futex_wait(address,expected) 的调用将使调用线程进入睡眠状态。如果不相等,则调用立即返回。对例程 futex_wake(address) 的调用会唤醒正在队列中等待的一个线程。

  1. Futex 变量的特征:

    1)位于共享的用户空间中;

    2)是一个32位的整型;

    3)对它的操作是原子的。

  2. Futex 在程序 low-contention 的时候能获得比传统同步机制更好的性能。

  3. 不要直接使用 Futex 系统调用。

  4. Futex 同步机制可以用于进程间同步,也可以用于线程间同步。

两阶段锁

两阶段锁中,自旋被看作可能很有用,特别是在锁即将被释放的情况下。在 Phase 1,会自旋一段时间,希望能够获取到锁。 如果在 Phase 1 没有获取锁,则进入 Phase 2,调用者将进入睡眠状态,只有在锁稍后释放时才会被唤醒。

上面的 Linux Futex 实现的 Mutex 就是这种锁的一种形式,但它只自旋一次;更常见的是在循环中自旋固定的(fixed)次数

线程安全的数据结构

计数器

Basic Mutex

image-20241224125018845

简单给访问临界区加锁完全能够保证绝对的线程安全(thread safe),但是锁的开销非常大

Scalable Counting

image-20241224134929698

近似计数器:每个CPU有一个局部计数器(local),所有CPU共享一个全局计数器(global),局部计数器加到全局计数器上的时候才加给全局计数器加锁,这就显著减少了加锁解锁的次数。

另外,局部锁也是需要的,因为我们假设每个核心上可能有多个线程。相反,如果每个核心上仅运行一个线程,则不需要局部所锁。

为了使全局计数器保持最新(如果线程希望读取其值),通过获取全局锁并将其增加局部计数器的值,局部值会定期传输到全局计数器;然后局部计数器归零。 这种局部到全局传输发生的频率由阈值 S 决定,当局部计数器达到 S 就向全局计数器写入。S 越小,计数器的行为就越像上面的不可扩展计数器; S 越大,计数器的可扩展性就越高,但全局值可能与实际计数相差越远。人们可以简单地获取所有局部锁和全局锁(以指定的顺序,以避免死锁)来获得精确的值,但这是不可扩展的:

image-20241224135017601

图 29.6 显示了阈值 S 的重要性,其中有四个线程,每个线程在四个 CPU 上将计数器递增 100 万次。如果 S 较低,则性能较差(但全局计数总是相当准确); 如果 S 较高,则性能出色,但全局计数滞后(最多为 CPU 数量乘以 S)。这种准确性/性能权衡正是近似计数器所实现的。

链表

Basic Mutex

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// basic list structure (one used per list)
typedef struct __list_t {node_t *head; pthread_mutex_t lock;} list_t;
void List_Insert(list_t *L, int key) {
node_t *new = malloc(sizeof(node_t));
if (new == NULL) {
perror("malloc");
return; // fail
}

new->key = key;
pthread_mutex_lock(&L->lock); // lock
new->next = L->head;
L->head = new;
pthread_mutex_unlock(&L->lock);// unlock
}

int List_Lookup(list_t *L, int key) {
int rv = -1;
pthread_mutex_lock(&L->lock);
node_t *curr = L->head;
while (curr) {
if (curr->key == key) {
rv = 0;
break;
}
curr = curr->next;
}
pthread_mutex_unlock(&L->lock); // failure
return rv; // rv = -1:
}
  • 尽量缩小锁涵盖的范围(临界区大小)如果没有涉及到访问共享区域的就不要纳入范围
  • 使用单一返回路径,减少代码中需要获取、释放锁的地方,降低了返回前忘记释放锁的可能

Lock and Control flow

在并发编程中,函数通常在开始时获取锁或分配资源、更改状态。如果发生错误,函数必须在退出之前释放锁或释放资源。这个过程很容易出错,因为它需要仔细管理状态。为了避免这些问题,最好以一种尽量减少撤消状态更改的方式构建代码。这可以通过以下方式实现:

  • 集中错误处理:谨慎处理导致函数返回、退出或其他停止执行的错误情况的更改,在函数中使用单个退出点来处理所有清理操作。
  • 最小化模式:构造代码以尽量减少撤消状态更改的需要,从而降低出错风险。
  • 避免过早返回:减少函数中的返回语句数量,以确保执行所有必要的清理。
  • 使用 RAII(资源获取即初始化):在支持它的语言中,使用 RAII 自动管理资源。

Hand-over-hand Locking:

每个节点都有一个锁,替代之前链表的整个链表一个锁,遍历链表时首先抢占下一个节点的锁,然后释放当前节点的锁,一定程度上增加了链表的并发能力,但是开销很大,

队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
typedef struct __queue_t {
node_t *head; node_t *tail;
pthread_mutex_t head_lock, tail_lock;
}queue_t;

void Queue_Init(queue_t *q) {
node_t *tmp = malloc(sizeof(node_t));
tmp->next = NULL;
q->head = q->tail = tmp;
pthread_mutex_init(&q->head_lock, NULL);
pthread_mutex_init(&q->tail_lock, NULL);
}

void Queue_Enqueue(queue_t *q, int value) {
node_t *tmp = malloc(sizeof(node_t));
assert(tmp != NULL);
tmp->value = value;
tmp->next = NULL;

pthread_mutex_lock(&q->tail_lock);//在队尾加锁
q->tail->next = tmp;
q->tail = tmp;
pthread_mutex_unlock(&q->tail_lock);
}

int Queue_Dequeue(queue_t *q, int *value) {
pthread_mutex_lock(&q->head_lock);
node_t *tmp = q->head;
node_t *new_head = tmp->next;
if (new_head == NULL) {
pthread_mutex_unlock(&q->head_lock);
return -1; // queue was empty
}
*value = new_head->value;
q->head = new_head;
pthread_mutex_unlock(&q->head_lock);
free(tmp);
return 0;
}

队列的加锁特点:

  • 入队只访问 tail_lock 出队只访问 head_lock

  • 在初始化阶段添加了 dummy node 假节点,不然空队列的情况需要同时处理 tail 和 head:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    void Queue_Enqueue(queue_t *q, int value) {
    node_t *tmp = malloc(sizeof(node_t));
    assert(tmp != NULL);
    tmp->value = value;
    tmp->next = NULL;

    pthread_mutex_lock(&q->tail_lock);
    if (q->tail == NULL) {// additional if-else!!!
    q->head = q->tail = tmp;
    } else {
    q->tail->next = tmp;
    q->tail = tmp;
    }
    pthread_mutex_unlock(&q->tail_lock);
    }

    int Queue_Dequeue(queue_t *q, int *value) {
    pthread_mutex_lock(&q->head_lock);
    if (q->head == NULL) {// additional if-else!!!
    pthread_mutex_unlock(&q->head_lock);
    return -1; // queue was empty
    }
    node_t *tmp = q->head;
    node_t *new_head = tmp->next;
    if (new_head == NULL) {
    pthread_mutex_unlock(&q->head_lock);
    return -1; // queue was empty
    }
    *value = new_head->value;
    q->head = new_head;
    pthread_mutex_unlock(&q->head_lock);
    free(tmp);
    return 0;
    }

哈希表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#define BUCKETS (101)
typedef struct __hash_t {list_t lists[BUCKETS];} hash_t;

void Hash_Init(hash_t *H) {
int i;
for (i = 0; i < BUCKETS; i++)
List_Init(&H->lists[i]);
}

int Hash_Insert(hash_t *H, int key) {
return List_Insert(&H->lists[key % BUCKETS], key);
}

int Hash_Lookup(hash_t *H, int key) {
return List_Lookup(&H->lists[key % BUCKETS], key);
}

image-20241224205440775

如图所示,哈希表中,每个桶都是一个单独的链表,因此,比单独的大锁并发链表性能好很多。

Condition Variables

任何线程库的另一个主要组件是条件变量,主要用于线程间交互。

概念上,一个条件变量就是一个线程队列(thread queue), 其中的线程正等待某个条件变为真,比如 ready == 0,每个条件变量$c$关联着一个断言,当一个线程等待时,该线程不算作占用了该管程,因而其它线程可以进入该管程执行,改变管程的状态,通知条件变量$c$其关联的断言$P_c$在当前状态下为真。

条件变量同锁一起使用使得线程可以以一种**无竞争**的方式等待任意条件的发生。所谓无竞争就是,条件改变之后这个信号会发送到所有等待这个信号的线程。而不是说一个线程接受到这个消息而其它线程就接收不到了。

Precautions

“Wakeup”

线程状态:Ready, Run, Sleep

wait: Run->Sleep

signal: Sleep->Ready

必须与特定的条件挂钩

条件变量必须跟布尔条件挂钩,如果只是单纯地地像下面这样使用条件变量:

1
2
3
4
5
6
7
8
9
10
11
12
thr_exit(){
// done = 1;
mutex_lock(&m);
cond_signal(&c);
mutex_unlock(&m);
}
thr_join(){
mutex_lock(&m);
//while(done == 0)
cond_wait(&c);
mutex_unlock(&m);
}

在父线程调用join之前,子线程创建并运行了exit,就会导致空唤醒,父线程将持续睡下去。

Recheck: While Loop

Mesa 语义

发信号只是一个状态改变的暗示,并不能保证他运行之前的状态一直是期望的情况,线程的 Ready 和 Run 之间的状态转换是由调度程序决定的,signal 以后,Run 之前可能状态会发生变化。

另一个是 Hoare 语义 能直接唤醒线程立即执行,几乎所有系统都采用了 Mesa 语义。

丢失唤醒信号

条件变量代表的是一种条件,需要将 pthread_cond_wait 放在一个 while 循环,而不是 if 语句中,因为很可能会出现在wait之前正好切换走了,这时候signal信号就会丢失。所以线程被唤醒后必须重新检查当时的条件是否仍然满足,如若仍然满足 while 循环的条件,就不能继续执行。

假设线程 A 和线程 B 都在等待同一个条件变量,并且导致线程休眠的条件布尔值最初为 false

  1. 线程 A 进入等待状态:
    • pthread_cond_wait(&cond, &mutex)
  2. 线程 B 也进入等待状态;
  3. 某个线程 C 修改了条件布尔值为 true,并通过条件变量发送信号唤醒线程:
    • pthread_cond_signal(&cond)
  4. 线程 A 被唤醒,并退出等待。此时它执行任务后将条件变量重新置为 false
  5. 问题: 线程 B 也被唤醒,但条件变量已被线程 A 改回 false
    • 如果使用 if 检查条件,线程 B 会直接跳过检查并继续执行任务,从而导致程序逻辑错误。

解决方案:用 while 再次检查条件

  • 当线程 B 被唤醒时,while 循环会再次检查条件变量,发现条件未满足,然后重新进入等待状态,确保安全。

虚假唤醒

有一些 pthread 实现可能会虚假地唤醒多个正在等待的线程;在这种情况下,在不重新检查的情况下,等待线程将继续认为条件已更改,即使它没有更改。因此,应该树立起一个恒等式:

被唤醒⇔条件确实已经改变

先持有锁 再signal() or wait()

==使用条件变量的前提是必须要持有这把锁==

想象一下:一个线程是某个队列的消费者,它必须要等到队列中有数据时才能执行,如果队列为空,则会一直等待挂起,直到另外一个线程在队列中存入数据,并通知先前挂起的线程,该线程才会唤醒重新开始执行。在这个例子中,队列是否 空/满 是线程执行所依赖的状态,而这个状态是多个线程都可以访问的,所以需要加锁互斥访问,这种加锁模式与其他同步加锁略有不同:

锁在 wait 调用中,休眠前需要释放锁,唤醒之后,返回之前需要重新获取锁

唤醒所有线程: broadcast()

考虑分配内存的场景:

  1. 有多个想要申请不同空间的线程,但是此时没有足够空间,因此他们陷入了睡眠;
  2. 此时第三个线程释放了一定的空间,想要唤醒,但唤醒哪一个是不确定的,可能释放的空间不足以支持被唤醒者申请的空间;
  3. 因此需要唤醒所有在此CV上等待的线程:broadcast()

生产者/消费者(有界缓冲区)

生产者/消费者问题 或 有界缓冲区问题:

  • 生产者:从缓冲区中拿东西,如果没东西可拿就应该阻塞
  • 消费者:向缓冲区中放东西,如果缓冲区满了就应当阻塞

因此要注意如下事项:

  1. 不能唤醒同类:生产者和消费者应该使用两个不同的条件变量
  2. while 循环:重新检查条件,以防在 Run 之前,条件发生改变
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#define MAXSIZE 8
int buffer[MAXSIZE];
int fill_ptr = 0;
int use_ptr = 0;
int count = 0;
void put(int value){
buffer[fill_ptr] = value;
fill_ptr = (fill_ptr + 1) % MAXSIZE;
count++;
}
int get(){
int tmp = buffer[use_ptr];
fill_ptr = (fill_ptr + 1) % MAXSIZE;
count--;
return tmp;
}
cond_t isEmpty,isFull;
mutex_t mutex;
void producer(){
for(int i = 0;i < loops;i++){
mutex_lock(&mutex);
while(count == MAXSIZE)
cond_wait(&isFull, &mutex);
put(i);
printf("producer:%d puts value:%d", gettid(), i);
cond_signal((&isEmpty);
mutex_unlock(&mutex);
}
}
void consumer(){
for(int i = 0;i < loops;i++){
mutex_lock(&mutex);
while(count == 0)
cond_wait(&isEmpty, &mutex);
int value = get();
printf("pid:%d gets value:%d", gettid(), value);
cond_signal(&isFull);
}
}

Semaphores(信号量)

互斥:是指某一资源同时只允许一个访问者对其进行访问,具有唯一性和排它性。但互斥无法限制访问者对资源的访问顺序,即访问是无序的。

同步:指在互斥的基础上(大多数情况),通过其它机制实现访问者对资源的有序访问。在大多数情况下,同步已经实现了互斥,特别是所有写入资源的情况必定是互斥的。少数情况是指可以允许多个访问者同时访问资源

Semaphore 支持跨进程的同步,是线程同步所有工作的单一原语,能够将其作为锁或条件变量

Condition Variable 只支持同一进程内部线程的同步

特性 信号量(Semaphore) 锁(Mutex) 条件变量(Condition Variable)
功能 控制资源数量(同步和互斥) 提供互斥访问 等待特定条件满足后继续执行
适用场景 资源控制、多线程队列 保护临界区,单资源互斥 条件等待(生产者/消费者问题)
能否跨进程 支持跨进程 仅限线程同步(同一进程内) 仅限线程同步(同一进程内)
是否需要互斥锁 不需要互斥锁 自带互斥功能,不需要额外锁 必须依赖互斥锁来保护共享变量
复杂条件判断 支持简单条件(通过计数控制) 不支持条件判断 支持复杂条件判断和线程等待唤醒机制

POSIX API

POSIX API 给信号量添加了两个调用,这两个调用都是原子操作:

sem_wait(sem_t *s)(P())

信号量值减1,若变为负数,则阻塞在信号量上(信号量负数绝对值为阻塞的线程数量)

sem_post(sem_t *s)(V())

将信号量的值加1,如果信号量值为负数,则肯定有线程正在此信号量上等待,唤醒其中一个线程

二值信号量: Locks

Workflow:

  1. 信号量的初始值为 1,线程 A 调用 sem_wait(*s) 此时信号量为 0,直接返回,进入临界区
  2. 此时另一个线程 B 过来调用 sem_wait(*s) 此时信号量为 -1,休眠……
  3. 线程 A 完成临界区操作,调用 sem_post(*s) 此时信号量变成 0,唤醒线程 B
  4. 线程 B 从 wait() 返回,进入临界区

因此二值信号量能够实现锁的功能。

条件变量(Condition Variables)

给信号量设置初始值:在初始化之后愿意立即放弃的资源数量有多少?

  • 如果是锁,只有 1 把锁,那么就必须初始化为 1;

  • 如果是用作任务排序,父进程等待子进程,没有能给出去的东西,那就只能初始化为 0

  • 如果是消费者,一开始没有可以消费的东西,那就初始化为0;

  • 如果是生产者,一开始可供生产的空间有MAXSIZE个,那就初始化为MAXSIZE。

需要互斥锁

相当于将之前的 count 整合进条件变量中:

  • sem_init(&empty,0,MAXSIZE) 空闲区域的大小为 MAXSIZE
  • sem_init(&full,0,0) 可消费区域大小为 0

Usage:

  • 生产者调用 sem_wait(&empty) empty 自减,若为负数则生产者只能阻塞等待
  • 消费者调用 sem_wait(&full) full 自减,为负数则消费者需要阻塞等待
  • 生产者操作完临界区, sem_post(&full) 使 full 自增,某个消费者被唤醒并进入临界区

  • 消费者操作完临界区, sem_post(&empty) 使 empty 自增,某个生产者被唤醒并进入临界区

这里没有锁,因此会出现并发问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void *producer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
sem_wait(&mutex); // Line P0 (NEW LINE) 加锁
sem_wait(&empty); // Line P1
put(i); // Line P2
sem_post(&full); // Line P3
sem_post(&mutex); // Line P4 (NEW LINE) 解锁
}
}
void *consumer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
sem_wait(&mutex); // Line C0 (NEW LINE) 加锁
sem_wait(&full); // Line C1
int tmp = get(); // Line C2
sem_post(&empty); // Line C3
sem_post(&mutex); // Line C4 (NEW LINE) 解锁
printf("%d\n", tmp);
}
}

死锁情况

生产者加锁,进入临界区之前,调用empty发现缓冲区已满,遂休眠,此时生产者依然持有锁

切换到就绪的消费者,因为获取不到锁,只能休眠,这样就导致了死锁,因此需要缩小锁的范围:

1
2
3
4
5
6
7
8
9
10
11
void *consumer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
sem_wait(&full); // Line C1
sem_wait(&mutex); // Line C1.5 (NEW LINE) 加锁
int tmp = get(); // Line C2
sem_post(&mutex); // Line C2.5 (NEW LINE) 解锁
sem_post(&empty); // Line C3
printf("%d\n", tmp);
}
}

最根本的区别在于,睡眠的线程不会释放锁,因此条件变量应该加到锁的外面

互斥锁+条件变量 VS 信号量

特性 Mutex + Condition Variable Semaphore
设计理念 提供更高层次的条件等待机制,依赖互斥锁管理共享数据状态。 基于简单的计数器模型,直接控制资源可用数量。
同步功能 适合复杂条件等待或事件驱动的同步场景(例如生产者-消费者模型)。 控制固定数量的资源访问或线程数量(例如资源池管理)。
互斥功能 需要显式的 Mutex 实现互斥保护。 内部实现互斥,无需额外的互斥锁。
跨进程支持 仅支持线程级同步(同一进程内线程同步)。 支持跨进程和线程同步(POSIX 信号量支持跨进程)。
复杂性 支持复杂条件判断,但需要手动管理条件和唤醒逻辑。 简单直观,直接基于计数器操作,不需要条件管理。
效率 条件变量需要多步操作(加锁、解锁、条件检查、等待),效率略低。 基于计数器原子操作,性能较高,适合高并发场景。
复杂条件处理 支持复杂条件和多条件组合判断,适合生产者-消费者问题。 只能处理简单的资源计数条件,不适合复杂条件判断。

Mutex + CV

  1. 互斥锁(Mutex):
    • 提供临界区保护,确保线程在访问共享资源时互斥执行。
    • 底层依赖于操作系统内核的互斥量数据结构(如 Linux 的 Futex 或信号量实现)。
  2. 条件变量(Condition Variable):
    • 条件变量不会保存条件状态,而是通过线程阻塞和唤醒机制等待条件变化。
    • 必须与互斥锁配合使用,防止条件检查过程中出现竞争条件。
  • 条件变量使用等待队列(Wait Queue)机制管理线程。
  • 当线程调用pthread_cond_wait,它会:
    1. 释放锁(解锁 mutex)。
    2. 将线程放入条件变量的等待队列中,并进入阻塞状态(睡眠)。
    3. 等待其他线程通过 pthread_cond_signalpthread_cond_broadcast 唤醒它。
    4. 被唤醒后,重新尝试获取互斥锁并继续执行。
1
2
3
4
5
6
7
8
9
Thread 1:              Condition Variable:
- Acquire Lock [ Wait Queue ]
- Check Condition ----> Add to Queue
- Wait (Unlock) [ Blocked ]
<----- Signal()
Thread 2: Wake Up Thread 1
- Modify Condition
- Signal
- Release Lock
  • 条件变量没有条件状态: 共享条件需要程序员手动管理(例如标志位)。
  • 支持复杂条件判断: 等待某些条件的组合,例如缓冲区为空或满。
  • 虚假唤醒机制: 被唤醒后必须重新检查条件,避免不满足条件的线程继续执行。

Semaphore

  • 信号量直接依赖原子操作(如 CPU 指令 Test-And-SetCompare-And-Swap)更新计数器,确保多线程安全。
  • 阻塞线程会进入等待队列,操作系统负责调度。
  1. 信号量内部维护一个计数器变量(Counter),表示可用资源的数量。
  2. 当调用 sem_wait 时:
    • 如果计数器 > 0,直接减 1,线程继续执行。
    • 如果计数器 == 0,线程阻塞,进入等待队列。
  3. 当调用 sem_post 时:
    • 增加计数器值。
    • 如果等待队列中有线程,则唤醒其中一个线程。
1
2
3
4
5
Semaphore Counter = 2
Thread 1: P() ----> Counter-- (1)
Thread 2: P() ----> Counter-- (0)
Thread 3: P() ----> Blocked (Counter == 0)
Thread 1: V() ----> Counter++ (1) -> Wake Up Thread 3
  • 信号量管理的是资源数量,而不是条件状态。
  • 自带互斥特性,适合多个线程访问有限资源。
  • 适合计数型条件: 一次允许多个线程执行,而不是简单的互斥。

总结

  1. Semaphore 更像一个通用工具:
    • 适合管理固定资源数量,如线程池、连接池、令牌桶等。
    • 更简单、更高效,适合需要资源计数的场景。
    • 支持跨进程同步需求。
  2. Mutex + Condition Variable 提供更高级的同步机制:
    • 适合复杂条件判断或事件驱动模型,如生产者/消费者问题,依赖互斥锁保证数据一致性。
    • 支持灵活的条件管理,适合多条件组合。
    • 更适合线程间等待和唤醒机制,不适合跨进程同步。
  • 在实际开发中,如果场景简单且需求是资源访问控制,选择信号量
  • 如果需要更复杂的条件管理和线程间事件通知,则选择条件变量 + 锁

应用

读写锁

读者只读不写,写者才需要修改。类似 Shared/eXclusion 共享锁和独占锁

RW 锁支持一个写者或者多个读者:

  • 第一个读者首先获取lock(保护reader)增加reader,获取writelock,释放lock进入临界区
  • 之后其他的读者只要获取lock后增加reader个数,直接访问临界区即可
  • 写者需要等待最后一个读者释放writelock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
typedef struct _rwlock_t{
sem_t lock; // basic lock INIT: 1
sem_t writelock;//allow 1 writer / many readers INIT: 1
int readers;//number of readers INIT:0
}rwlock_t;
void rwlock_acquire_readlock(rwlock_t *rw){
sem_wait(rw->lock);
rw->readers++;
if(readers == 1)
sem_wait(rw->writelock); //第一个读者获取写锁
sem_post(rw->lock);
}
void rwlock_release_readlock(rwlock_t *rw){
sem_wait(rw->lock);
rw->readers;
if(readers == 0)
sem_post(rw->writelock);//最后一个读者释放写锁
sem_post(rw->lock);
}
void rwlock_acquire_readlock(rwlock_t *rw){
sem_wait(rw->lock);
rw->readers;
if(readers == 0)
sem_post(rw->writelock);//最后一个读者释放写锁
sem_post(rw->lock);
}

这种读写锁并不一定比自旋锁更快,并且公平性无法保证,过多读者通常会饿死写者,需要进一步进行优化。

哲学家吃饭问题

问题描述

  • 有 5 位哲学家围坐在一张圆桌旁,他们的生活方式是 思考进餐
  • 桌子上摆放着 5 根筷子,每位哲学家左右各放一根。
  • 哲学家要进餐时,需要同时拿起左右两根筷子。
  • 哲学家只能在拿到两根筷子后才能吃饭,否则必须等待。

主要难题

  • **死锁 (Deadlock)**:所有哲学家都同时拿起左边的筷子,导致没有哲学家能拿到第二根筷子,进入无限等待状态。
  • **饥饿 (Starvation)**:某位哲学家可能永远无法获得两根筷子,从而无法进餐。
  • 并发控制:需要保证哲学家拿筷子和放筷子的动作是线程安全的。
  • 方案 1:引入顺序编号
    将哲学家编号为 0 到 4,规定编号为偶数的哲学家先拿左筷子,再拿右筷子;编号为奇数的哲学家先拿右筷子,再拿左筷子。或者,最后一个哲学家先拿右筷子,再拿左筷子。这样就不会互相卡住,打破了等待的循环。
  • 方案 2:限制最多 4 个哲学家进入用餐状态
    使用一个计数器,确保最多 4 位哲学家能尝试拿筷子,这样至少会有一根筷子空闲,避免死锁。

内存抖动(Thread Throttling)

信号量比较适合资源数量有限制的情况:比如有一群线程,每个线程都需要申请一块很大的内存空间用于计算,用于计算的这片区域就是 内存密集型 区域,如果所有线程同时申请,就会造成内存抖动(不停地换出又换入页面导致程序以极慢的速度执行)。

一个简单的信号量就可以解决这个问题:通过将信号量的值初始化为您希望一次进入内存密集区域的最大线程数,然后在该区域周围放置 sem_wait() 和 sem_post(),信号量自然地限制那些并发地处于危险区域的线程数量。

使用 mutex cond 实现 sem

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
typedef struct _Zem_t{
int value;
pthread_mutex_t lock;
pthread_cond_t cond;
} Zem_t;
void Zem_init(Zem_t *s, int value){
s->value = value;
Cond_init(s->cond);
Mutex_init(s->lock);
}
void Zem_wait(Zem_t *s){
Mutex_lock(&s->lock);
while(s->value <= 0)
Cond_wait(&s->cond, &s->lock);
s->value--;
Mutex_unlock(&s->lock);
}
void Zem_post(Zem_t *s){
Mutex_lock(&s->lock);
s->value++;
Cond_signal(&s->cond);
Mutex_unlock(&s->lock);
}

常见的并发问题

总的来说可以分为死锁(Deadlock)和非死锁(Non-deadlock)两种,其中后者占绝大多数

非死锁问题

违反原子性(加锁解决)

这种错误违反了原子性,下图的 proc_info 在刚进入 if 循环的时候被取消调度,线程2将其置为NULL,切换回去的时候导致空指针异常

image-20241225194108926

解决方案:在访问共享资源的时候加锁

image-20241225194511624

程序执行顺序错误(加条件变量解决)

模块化:不同线程承担不同职责,线程1负责初始化 mThread ,线程2访问 mThread

如果乱序执行,就会出现线程2访问到空指针导致程序崩溃:

image-20241225194032626

因此,用一个状态变量或者mThread本身来代表初始化是否成功,然后用条件变量解决问题:

image-20241225194350912

死锁(Deadlock)

image-20241225195039934

死锁原因:获取锁的顺序相反

  1. 在大型代码库中组件依赖复杂 循环依赖就会导致死锁的发生
  2. 模块化封装会隐藏底层的细节 v1.addAll(v2) v2.addAll(v1) 同时调用可能会发生死锁

产生死锁的四个条件:

  1. 互斥:线程对资源进行互斥的访问
  2. 持有并等待:线程在持有资源的同时也在等待其他资源
  3. 非抢占:线程获得的资源(如锁)不能被抢占
  4. 循环等待:线程之间存在环路,上面的每个线程都会额外持有下个线程想要申请的资源

预防

循环等待(Circular Wait): 强制规定顺序

强制规定获取锁的顺序

偏序锁:如果资源之间的依赖关系较少或依赖是局部的,偏序锁,提供更好的性能和灵活性。

全序锁:如果资源之间的依赖关系复杂且必须确保一致性(如事务或分布式系统),全序锁更可靠

如果一个函数要抢多个锁,可以根据锁的地址作为锁的顺序:

1
2
3
4
5
6
7
8
9
10
do_something(mutex *m1,mutex *m2){
if(m1 < m2){
pthread_mutex_lock(*m1);
pthread_mutex_lock(*m2);

} else{
pthread_mutex_lock(*m2);
pthread_mutex_lock(*m1);
}
}

这样可以保证 do_something(&m1,&m2)do_something(&m2,&m1) 是同样的抢锁顺序

持有并等待(Hold-and-wait):原子抢锁

在抢锁的最外层加一道锁(原子性抢锁)防止抢锁过程中突然被取消调度,切换到其他线程:

image-20241225203212407

缺点:不适合封装,因为需要准确知道要抢哪些锁并提前全部抢到(即使当前并不需要)

非抢占(No Preemption):trylock

可以通过trylock这种非阻塞式抢锁来避免死锁,但是这种方法会导致活锁,对封装的支持也不好,代码抢完锁中途获取的资源(比如申请的内存空间),如果抢锁失败,还应该释放

image-20241225204421920

活锁(livelock)

死锁: 所有线程都进入等待状态,完全停止运行。

活锁: 所有线程仍然在运行,但因为不断调整状态,始终无法完成任务。

解决方案:

  1. 退避算法(Backoff):在循环结束的时候,先随机等待一段时间再重复
  2. 引入有限重试机制: 因为线程一直在运行,因此可以限制最大重试次数
  3. 使用更高层次的同步机制,结合条件变量或阻塞队列

互斥:无锁化 (CAS)

利用硬件指令的原子性,完全避免互斥区的存在:CAS 失败就不断重试,直到成功为止(乐观锁)

COMPARE AND SWAP 加之前看看是不是对应的正确的值,是的话再赋值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//自增:
int CAS(int *address, int expected, int new){
if(*address == expected){
*address = new;
return 1;
}
return 0;
}
void increment(int *value, int amount){
int old;
do{
old = *value;
} while(CAS(value, old, old + amount) == 0);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//链表插入
int CAS(node_t **address, node_t *expected, node_t *new){
if(*address == expected){
*address = new;
return 1;
}
return 0;
}
void insert(int value) {
node_t *n = malloc(sizeof(node_t));
assert(n != NULL);
n->value = value;
pthread_mutex_lock(listlock); // begin critical section
n->next = head;
head = n;
pthread_mutex_unlock(listlock); // end critical section
}

void insert(int value){
node_t *n = malloc(sizeof(node_t));
assert(n!=NULL);
n->value = value;
do{
n->next = head;//新节点的下一个应该是现在的头
} while(CAS(&head, n->next, n) == 0);//现在的头应该等于新节点的下一个,不等于
}

通过调度任务避免死锁

image-20241225213119760

image-20241225213105676

不要同时并行执行需要获取完全相同锁的线程

探测死锁与恢复

如果根除死锁实在很困难,可以定期检查死锁,并运行专门的程序来恢复

基于事件的并发

Event Loop(事件循环): 等待事件->处理事件->等待事件,重点是如何获取事件?

1
2
3
4
5
while(1){
events = getEvents();
for (e in events)
processEvent(e);
}

select()

API Usage

  • Purpose: 监视 FD 是否准备好接受 I/O 操作
  • Function Signature: int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *errorfds, struct timeval *timeout)
  • Parameters:
    • nfds: 检查集合中 [0,nfds-1] 的 FD
    • readfds: 监控 readfds 中的可读事件(新的数据包到达,准备处理)
    • writefds: 监控 writefds 中的可写事件(服务器回复需要写入队列有空)
    • errorfds: 监控 errorfds 中的错误事件
    • timeout: 最大等待时间
  • Return Value: 返回准备好 I/O 的 FDs

image-20241225225619265

Lock-free

有了单CPU和基于事件的应用程序,并发程序中的问题就不再存在。具体来说,因为一次只处理一个事件,所以不需要获取或释放锁;基于事件的服务器不能被另一个线程中断,因为它绝对是单线程的。因此,线程程序中常见的并发错误不会在基于事件的基本方法中体现出来。

系统调用阻塞?

如果某个事件要求发出可能会阻塞的系统调用怎么办? 例如,假设一个请求从客户端发送到服务器,以从磁盘读取文件并将其内容返回到请求客户端(非常类似于简单的 HTTP 请求)。为了服务这样的请求,某些事件处理程序最终必须发出 open() 系统调用来打开文件,然后执行一系列 read() 调用来读取文件。当文件被读入内存时,服务器可能会开始将结果发送到客户端。 open()read() 调用都可能向存储系统发出 I/O 请求(当所需的元数据或数据尚未在内存中时),因此可能需要很长时间才能提供服务。对于基于线程的服务器,这不是问题:当发出 I/O 请求的线程挂起(等待 I/O 完成)时,其他线程可以运行,从而使服务器能够取得进展。事实上,I/O 和其他计算的这种自然重叠使得基于线程的编程变得非常自然和直接。

然而,使用基于事件的方法,没有其他线程可以运行:只有主事件循环。这意味着,如果事件处理程序发出阻塞调用,则整个服务器将执行此操作:阻塞直到调用完成。当事件循环阻塞时,系统处于空闲状态,因此潜在地浪费了巨大的资源。因此,我们在基于事件的系统中必须遵守一条规则:不允许阻塞调用。

异步 I/O

API Usage

要对文件发出异步读取,应用程序应首先使用相关信息填充此 AIO 控制块(aiocb):

  • 要读取的文件的文件描述符 (aio fildes)
  • 文件内的偏移量 (aio offset)
  • 长度请求的长度 (aio nbytes)
  • 读取结果应复制到的内存位置 (aio buf)

填充该结构后,应用程序必须发出 AIO 来读取文件;在 Mac 上,此 API 是一个异步读取的 API: int aio_read(struct aiocb *aiocbp); 该调用尝试发出 I/O;如果成功,它会立即返回,并且应用程序(即基于事件的服务器)可以继续其工作。

然而,我们必须解决最后一块难题。我们如何判断 I/O 何时完成,从而确定缓冲区(由 aiobuf 指向)现在已在其中包含所请求的数据? 还需要最后一个 API。在 Mac 上,它被称为 aio_error() API 如下所示: int aio_error(const struct aiocb *aiocbp); 该系统调用检查 aiocbp 引用的请求是否已完成。如果是,则返回成功(用 0 表示); 如果不是,则返回 EINPROGRESS

Poll or Interrupt

对于每个未完成的异步 I/O,应用程序可以通过调用 aio_error() 定期轮询系统,以确定所述 I/O 是否尚未完成。但是轮询很浪费CPU。为了解决这个问题,一些系统提供了基于中断的方法。此方法使用 UNIX Signals 来通知应用程序异步 I/O 何时完成,从而无需重复询问系统。

在没有异步I/O的系统中,无法实现纯粹的基于事件的方法。然而,出现了一些相当有效的混合方法。其中事件用于处理网络数据包,线程池用于管理未完成的 I/O。I/O 多路复用

UNIX 信号

所有现代 UNIX 变体中都存在一个巨大且令人着迷的基础设施,称为信号。最简单的是,信号提供了一种与进程通信的方式。具体来说,可以将信号传递给应用程序;这样做会阻止应用程序运行信号处理程序(即应用程序中处理该信号的某些代码)正在执行的任何操作。 完成后,该进程将恢复其之前的行为。 每个信号都有一个名称,如HUP(挂起)、INT(中断)、SEGV(分段违规)等

有趣的是,有时是内核本身发出信号。例如,当程序遇到 Segmentation Violation 时,操作系统会向其发送 SIGSEGV;如果程序调用了signal(SIGSEV, handler) 就可以运行一些代码(signal handler)来响应。当发送到没有配置响应信号的进程时,将执行默认行为;对于SEGV,该进程被终止。

1
2
3
4
5
6
7
8
9
void handle(int arg) {//handler
printf("stop wakin’ me up...\n");
}
int main(int argc, char *argv[]) {
signal(SIGHUP, handle);
while (1)
; // doin’ nothin’ except catchin’ some sigs
return 0;
}

./main & 后台运行进程,随后返回pid;

kill -HUP [pid] 给进程发送 SIGHUP 信号:

1
2
3
4
5
6
7
prompt> ./main &
[3] 36705
prompt> kill -HUP 36705
stop wakin’ me up...
prompt> kill -HUP 36705
stop wakin’ me up...

状态管理?

在 Thread-based 服务器中,从read调用返回,程序可以直接从栈上知道 sd 是多少

1
2
int rc = read(fd, buffer, size);
rc = write(sd, buffer, size);

而在 Event-based 服务器中,从 aio_read 返回后,应当记录下在处理read事件时必要的信息(continuation),比如将套接字描述符(sd)记录在某种数据结构(例如哈希表)中,并由文件描述符(fd)索引。

aio_error显示成功读取后,事件处理器将使用 FD 来查找 continuation,这会将 sd 的值返回给调用者。最终,服务器可以完成最后一点工作,将数据写入套接字。

事件的其他问题

  1. 单核到多核将会有多个事件处理器同步运行的情况,会产生同步问题。不再可能进行无锁的简单事件处理。

  2. 不能与某些类型的系统活动(例如分页)很好地集成。例如,如果事件处理器发生Page Fault,这就会导致阻塞,因此服务器在Page Fault完成处理之前会一直阻塞。

  3. 随着各种程序的确切语义发生变化,基于事件的代码可能很难管理超时。例如,如果例程从非阻塞更改为阻塞,则调用该例程的事件处理程序也必须通过将自身分成两部分来进行更改以适应其新性质。由于阻塞对基于事件的服务器来说是灾难性的,因此程序员必须始终留意每个事件使用的 API 语义中的此类变化。

  4. 异步磁盘 I/O 并未实现与异步网络 I/O 完全集成。例如,虽然人们只想使用 select() 接口来管理所有未完成的 I/O,但通常需要用于网络的 select() 和用于磁盘 I/O 的 AIO 调用的某种组合。

Monitor(监视器锁)

Semaphore & Monitor

  • **信号量(Semaphere)**:操作系统提供的一种协调共享资源访问的方法。和用软件实现的同步比较,软件同步是平等线程间的的一种同步协商机制,不能保证原子性。而信号量则由操作系统进行管理,地位高于进程,操作系统保证信号量的原子性。
  • **管程(Monitor)**:解决信号量在临界区的 PV 操作上的配对的麻烦,把配对的 PV 操作集中在一起,生成的一种并发编程方法。其中使用了条件变量这种同步机制。

所谓管程,指的是管理共享变量以及对共享变量的操作过程,让他们支持并发。翻译为 Java 领域的语言,就是管理类的成员变量和成员方法,让这个类是线程安全的。一般采用 Mesa Semantic

说明: 信号量将共享变量 S 封装起来,对共享变量 S 的所有操作都只能通过 PV 操作进行,这是不是和面向对象的思想是不是很像呢?事实上,封装共享变量是并发编程的常用手段。

在信号量中,当 P 操作无法获取到锁时,将当前线程添加到**同步队列(syncQueue)中。当其余线程 V 释放锁时,从同步队列中唤醒等待线程。但当有多个条件通过信号量 PV 配对时会异常复杂,所以管程中引入了等待队列(waitQueue)**的概念,进一步封装这些复杂的操作。

在用信号量实现的阻塞队列中,为了实现阻塞队列的功能,即等待-通知(wait-notify),除了使用互斥锁 mutex 外,还需要两个判断队满和队空的资源信号量 full 和 empty,使用起来不仅复杂,还容易出错。管程在信号量的基础上,更进一步,增加了条件同步,对多个条件变量使用多个等待队列,将上述复杂的操作封装起来: wait() notifyAll() notify()

API Usage

While Loop wait()

img

MESA 管程里面,Thread2 通知完 T1 后,T2 还是会接着执行,T1 并不立即执行,仅仅是从条件变量的等待队列进到入口等待队列里面。这样做的好处是 notify() 不用放到代码的最后,T2 也没有多余的阻塞唤醒操作。但是也有个副作用,就是当 T1 再次执行的时候,可能曾经满足的条件现在已经不满足了,所以需要以while循环方式检验条件变量。

notify() or notifyAll() ?什么时候可以使用 notify() 呢?需要满足以下三个条件:

  1. 所有等待线程拥有相同的等待条件;
  2. 所有等待线程被唤醒后,执行相同的操作;
  3. 只需要唤醒一个线程。

notify() 一般只适用于只有一个条件变量的情况,生产者和消费者等待在同一个条件上会导致错误唤醒同类,造成死锁。notifyAll() 类似于 pthread_cond_broadcast() 都可以用于唤醒多个等待在同一个条件变量上的线程。重点是 while 里面的等待条件是完全相同的。

BlockingQueue: Producer/Consumer

Condition 能够更细粒度地进行编程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class BlockedQueue<T> {
final Lock lock = new ReentrantLock();
// 条件变量:队列不满
final Condition notFull = lock.newCondition();
// 条件变量:队列不空
final Condition notEmpty = lock.newCondition();
// 入队
void enq(T x) {
lock.lock();
try {
while (队列已满) {
// 等待队列不满
notFull.await();
}
// add x to queue
// 入队后,通知可出队
notEmpty.signal();
} finally {
lock.unlock();
}
}

// 出队
void deq() {
lock.lock();
try {
while (队列已空) {
// 等待队列不空
notEmpty.await();
}
// remove the first element from queue
// 出队后,通知可入队
notFull.signal();
} finally {
lock.unlock();
}
}
}

AQS & synchronized

JUC AQS 就是基于管程实现的,内部包含两个队列,一个是同步队列,一个是等待队列:

  1. 同步队列:锁被占用时,会将该线程添加到同步队列中。当锁释放后,会从队列中唤醒一个线程,又分为公平和非公平两种。
  2. 等待队列:当调用 await 时,会将该线程添加到等待队列中。当其它线程调用 notify 时,会将该线程从等待队列移动到同步队列中,重新竞争锁。

synchronized 也是基于管程实现的,核心的数据结构见 ObjectMonitor。AQS 和 synchronized 都是管程 MESA 模型在 Java 中的应用。一切都套路,有章可循。