线程同步之读写锁
读写锁也称为共享互斥锁,具有3种状态:读模式下的加锁状态、写模式下的加锁状态、不加锁状态。
一次仅有一个线程可以占有写模式下的读写锁,但是多个线程可以同时占有读模式下的读写锁。
读写锁非常适合于对数据结构读的次数远大于写的情况。
与互斥量相比,读写锁(reader-writer lock)运行更高的并行性。
读写锁
- 当读写锁是写加锁状态时,所有试图对这个锁加锁的线程都会被阻塞。
- 当读写锁是读加锁状态时,所有试图以读模式对其进行加锁的线程都可以访问,但是任何以写模式进行加锁的线程都会阻塞,直到所有线程释放它们的读锁为止。
当读写锁处于读模式锁住状态,并且有一个线程试图以写模式获取锁时,读写锁会阻塞其后的读模式锁请求。这是为了避免读模式锁长期占用,而等待的写模式锁请求得不到满足。
读写锁也称为共享互斥锁。
读写锁的创建与销毁
读写锁的数据结构名为pthread_rwlock_t
,可以使用以下两个接口来创建和销毁:
#include <pthread.h>
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
// Return Value
// If successful, the pthread_rwlock_destroy() and pthread_rwlock_init() functions shall return zero;
// otherwise, an error number shall be returned to indicate the error.
可以使用pthread_rwlock_init
接口初始化读写锁,该接口会在堆内存中分配读写锁数据结构。如果仅使用默认属性,attr
参数可为NULL
。
如果仅使用默认属性,也可以静态分配读写锁,并使用PTHREAD_RWLOCK_INITIALIZE
常量来赋值。
如果使用pthread_rwlock_init
接口为读写锁分配了资源,那么最后就需要调用pthread_rwlock_destroy
接口来释放这些资源。
获取与释放读写锁
要在读模式下锁定读写锁,可以调用pthread_rwlock_rdlock
接口。
要在写模式下锁定读写锁,可以调用pthread_rwlock_wrlock
接口。
无论以何种方式锁定读写锁,都可以调用pthread_rwlock_unlock
接口来解锁。
#include <pthread.h>
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
// 返回值
// 所有接口在调用成功之后返回0,失败则返回错误编号
部分实现可能会对共享模式获取读写锁的次数进行了限制,因此要实时判断接口返回值。
以下两个接口可以用来判断是否可以获取锁:
#include <pthread.h>
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
// 返回值
// 所有接口在调用成功之后返回0,失败则返回错误编号
可以获取锁时,返回0,否则返回错误EBUSY
。
实例
该实例中,作业请求队列有单个读写锁保护,多个工作线程获取单个主线程分配给其的作业。
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
typedef struct job
{
// 双向队列
struct job* jNext;
struct job* jPrev;
pthread_t jID; // 用于指定哪个线程执行任务
int iRand; // 任务标识
} JOB_T;
// 任务队列
typedef struct queue
{
struct job* qHead;
struct job* qTail;
pthread_rwlock_t qLock; // 读写锁,用于保证多线程同步
} QUEUE_T;
static QUEUE_T* pQueue = NULL;
static int queue_init(QUEUE_T* pQueue)
{
int iRet = -1;
if (pQueue == NULL)
return iRet;
pQueue->qHead = NULL;
pQueue->qTail = NULL;
if (pthread_rwlock_init(&pQueue->qLock, NULL) != 0)
return iRet;
iRet = 0;
return iRet;
}
static void queue_destroy(QUEUE_T* pQueue)
{
if (pQueue != NULL)
{
pthread_rwlock_destroy(&pQueue->qLock);
}
}
static void job_insert(QUEUE_T* pQueue, JOB_T* pJob)
{
pthread_rwlock_wrlock(&pQueue->qLock);
pJob->jNext = pQueue->qHead;
pJob->jPrev = NULL;
if (pQueue->qHead != NULL)
pQueue->qHead->jPrev = pJob; // 双向队列,当前队列头的前指针指向新任务结点
else
pQueue->qTail = pJob; // 队列为空
pQueue->qHead = pJob;
pthread_rwlock_unlock(&pQueue->qLock);
}
static void job_append(QUEUE_T* pQueue, JOB_T* pJob)
{
pthread_rwlock_wrlock(&pQueue->qLock);
pJob->jNext = NULL;
pJob->jPrev = pQueue->qTail;
if (pQueue->qTail != NULL)
pQueue->qTail->jNext = pJob;
else
pQueue->qHead = pJob; // 队列为空
pQueue->qTail = pJob;
pthread_rwlock_unlock(&pQueue->qLock);
}
static void job_remove(QUEUE_T* pQueue, JOB_T* pJob)
{
pthread_rwlock_wrlock(&pQueue->qLock);
if (pJob == pQueue->qHead)
{
pQueue->qHead = pJob->jNext;
if (pJob == pQueue->qTail)
// 当只有一个任务结点时
pQueue->qTail = NULL;
else
pJob->jNext->jPrev = pJob->jPrev; // 下一个任务结点的前指针指向其任务结点
}
else if (pJob == pQueue->qTail)
{
pQueue->qTail = pJob->jPrev;
pJob->jPrev->jNext = pJob->jNext; // 可能多余,又不是循环队列
}
else
{
pJob->jPrev->jNext = pJob->jNext;
pJob->jNext->jPrev = pJob->jPrev;
}
pthread_rwlock_unlock(&pQueue->qLock);
}
static JOB_T* job_find(QUEUE_T* pQueue, pthread_t tid)
{
if (pthread_rwlock_rdlock(&pQueue->qLock) != 0)
return NULL;
JOB_T* pJob = NULL;
for (pJob = pQueue->qHead; pJob != NULL; pJob = pJob->jNext)
if (pthread_equal(pJob->jID, tid))
break;
pthread_rwlock_unlock(&pQueue->qLock);
return pJob;
}
static void print_rand(const char* s, pthread_t tid)
{
int iCount = 0;
while (1)
{
JOB_T* pJob = job_find(pQueue, tid);
if (pJob != NULL)
{
printf("%s: %d\n", s, pJob->iRand);
job_remove(pQueue, pJob);
iCount = 0;
}
else
iCount++;
if (iCount == 3) // 连续3次查询都无该线程的任务,则退出
break;
sleep(1);
}
}
static void* start_rtn1(void* arg)
{
pthread_t tid = pthread_self();
print_rand("thread1", tid);
pthread_exit(NULL);
}
static void* start_rtn2(void* arg)
{
pthread_t tid = pthread_self();
print_rand("thread2", tid);
pthread_exit(NULL);
}
static void* start_rtn3(void* arg)
{
pthread_t tid = pthread_self();
print_rand("thread3", tid);
pthread_exit(NULL);
}
int main()
{
pQueue = (QUEUE_T*)malloc(sizeof(QUEUE_T));
if (pQueue != NULL)
{
memset(pQueue, 0x00, sizeof(QUEUE_T));
if (queue_init(pQueue) != 0)
{
printf("queue init failed\n");
return 0;
}
pthread_t tid1, tid2, tid3;
void* tret;
int err;
err = pthread_create(&tid1, NULL, start_rtn1, NULL);
if (err != 0)
printf("can't create thread1\n");
err = pthread_create(&tid2, NULL, start_rtn2, NULL);
if (err != 0)
printf("can't create thread2\n");
err = pthread_create(&tid3, NULL, start_rtn3, NULL);
if (err != 0)
printf("can't create thread3\n");
// 生成10个随机任务
srand((unsigned)time(NULL)); // 初始化随机数
for (int i = 0; i < 10; i++)
{
JOB_T* pJob = (JOB_T*)malloc(sizeof(JOB_T));
if (pJob != NULL)
{
memset(pJob, 0x00, sizeof(JOB_T));
int iRand = rand() % 100;
// 模3为0,分配给线程3处理
if (iRand % 3 == 0)
pJob->jID = tid3;
else if (iRand % 3 == 1)
pJob->jID = tid1;
else
pJob->jID = tid2;
pJob->iRand = iRand;
pJob->jNext = NULL;
pJob->jPrev = NULL;
printf("job append rand=%d thread%d\n", pJob->iRand, iRand % 3 == 0 ? 3 : iRand % 3);
job_append(pQueue, pJob);
}
}
err = pthread_join(tid1, &tret);
if (err != 0)
printf("can't join with thread1\n");
printf("thread1 exit code %ld\n", (long)tret);
err = pthread_join(tid2, &tret);
if (err != 0)
printf("can't join with thread2\n");
printf("thread2 exit code %ld\n", (long)tret);
err = pthread_join(tid3, &tret);
if (err != 0)
printf("can't join with thread3\n");
printf("thread3 exit code %ld\n", (long)tret);
queue_destroy(pQueue);
free(pQueue);
}
}
// job append rand=22 thread1
// job append rand=3 thread3
// job append rand=95 thread2
// job append rand=70 thread1
// job append rand=78 thread3
// job append rand=68 thread2
// job append rand=74 thread2
// job append rand=50 thread2
// job append rand=7 thread1
// job append rand=49 thread1
// thread3: 3
// thread3: 78
// thread2: 95
// thread1: 22
// thread1: 70
// thread2: 68
// thread1: 7
// thread2: 74
// thread1: 49
// thread2: 50
// thread1 exit code 0
// thread2 exit code 0
// thread3 exit code 0
在这个例子中,凡是需要想队列中增加或删除作业的时候,都采用了写模式来锁住队列的读写锁,不允许多个线程并发地向队列中添加或删除作业。
不管何时搜索队列,都需要获取读模式下的读写锁,允许所有线程并发地搜索队列。
同时,工作线程只能从队列中读取与它们线程ID匹配的作业并进行处理,处理完毕之后将作业从队列中删除。
属性
读写锁支持的唯一属性是进程共享属性。
超时读写锁
带有超时的读写锁的接口目的是为了避免线程在获取读写锁时陷入永久阻塞状态,可以使用以下两个接口在指定的时间范围内获取读写锁:
#include <pthread.h>
#include <time.h>
int pthread_rwlock_timedrdlock(pthread_rwlock_t *restrict rwlock, const struct timespec *restrict tsptr);
int pthread_rwlock_timedwrlock(pthread_rwlock_t *restrict rwlock, const struct timespec *restrict tsptr);
tsptr
指的是绝对时间,即当前时间+延时时间。
下面的例子给出了延时10s无法获取到写模式锁即返回的情况。
#include <pthread.h>
#include <stdio.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>
static pthread_rwlock_t pLock = PTHREAD_RWLOCK_INITIALIZER;
void* start_rtn1(void* arg)
{
struct timespec tsp;
tsp.tv_sec = time(NULL) + 10;
tsp.tv_nsec = 0;
printf("thread1 time: %ld\n", time(NULL));
if (pthread_rwlock_timedwrlock(&pLock, &tsp) == 0)
{
printf("thread1 got lock\n");
// 获取锁之后休眠30s
sleep(30);
pthread_rwlock_unlock(&pLock);
}
else
printf("thread1 can't get lock\n");
printf("thread1 return\n");
printf("thread1 time: %ld\n", time(NULL));
return ((void*)1);
}
void* start_rtn2(void* arg)
{
struct timespec tsp;
tsp.tv_sec = time(NULL) + 10;
tsp.tv_nsec = 0;
printf("thread2 time: %ld\n", time(NULL));
if (pthread_rwlock_timedwrlock(&pLock, &tsp) == 0)
{
printf("thread2 got lock\n");
// 获取锁之后休眠30s
sleep(30);
pthread_rwlock_unlock(&pLock);
}
else
printf("thread2 can't get lock\n");
printf("thread2 exit\n");
printf("thread2 time: %ld\n", time(NULL));
pthread_exit((void*)2);
}
int main()
{
int err;
pthread_t tid1, tid2;
void* tret;
err = pthread_create(&tid1, NULL, start_rtn1, NULL);
if (err != 0)
printf("can't create thread1\n");
err = pthread_create(&tid2, NULL, start_rtn2, NULL);
if (err != 0)
printf("can't create thread2\n");
err = pthread_join(tid1, &tret); // 传递二维指针,为了接收指针值,类似于char**
if (err != 0)
printf("can't join with thread1\n");
printf("thread1 exit code %ld\n", (long)tret);
err = pthread_join(tid2, &tret);
if (err != 0)
printf("can't join with thread2\n");
printf("thread2 exit code %ld\n", (long)tret);
return 0;
}
// thread1 time: 1697721371
// thread1 got lock
// thread2 time: 1697721371
// thread2 can't get lock
// thread2 exit
// thread2 time: 1697721380
// thread1 return
// thread1 time: 1697721401
// thread1 exit code 1
// thread2 exit code 2
在例子中,thread1
获取到锁之后休眠了30s,导致thread2
一直没有获取到锁,超过等待时间10s则返回退出。
- 原文作者:生如夏花
- 原文链接:https://blduan.top/post/%E8%AF%BB%E4%B9%A6%E7%AC%94%E8%AE%B0/apue/%E7%BA%BF%E7%A8%8B%E5%90%8C%E6%AD%A5%E4%B9%8B%E8%AF%BB%E5%86%99%E9%94%81/
- 版权声明:本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可,非商业转载请注明出处(作者,原文链接),商业转载请联系作者获得授权。