线程同步之屏障
屏障(barrier)是用户协调多个线程并行工作的同步机制。
屏障允许每个线程等待,直到所有的合作线程都到达某一点,然后从该点继续执行。
pthread_join
是一种特殊的屏障,允许一个线程等待,直到另一个线程退出。
初始化
屏障 | 说明 |
---|---|
数据类型 | pthread_barrier_t |
动态分配 | pthread_barrier_init |
释放变量 | pthread_barrier_destroy |
#include <pthread.h>
int pthread_barrier_init(pthread_barrier_t *restrict barrier,
const pthread_barrierattr_t *restrict attr, unsigned count);
int pthread_barrier_destroy(pthread_barrier_t *barrier);
// 成功返回0,失败返回错误编号
使用函数pthread_barrier_init
初始化屏障时,count
参数指定在允许所有线程继续运行之前,必须到达屏障的线程数目。
等待
线程已经完成工作后,可以调用pthread_barrier_wait
函数进行阻塞,然后等待其他线程到达屏障。
#include <pthread.h>
int pthread_barrier_wait(pthread_barrier_t *barrier);
// 成功返回0或者PTHREAD_BARRIER_SERIAL_THREAD(其中任意一个线程),失败返回错误编号
调用pthread_barrier_wait
的线程在屏障计数未满足条件时,会进入休眠状态。
最后一个线程调用pthread_barrier_wait
函数时,满足了屏障计数,所有的线程都会被唤醒。
其中只有一个线程的返回值是PTHREAD_BARRIER_SERIAL_THREAD
,其他线程返回值为0。
到达屏障计数后,屏障可以重用,但是修改屏障技术必须要先pthread_barrier_destroy
再pthread_barrier_init
。
实例
该实例中,使用8个线程对800w个数进行排序。每个线程分别使用快速排序对100w个数进行排序,然后主线程对排序结果进行合并。
采用pthread_barrier_wait
对8个线程以及主线程之间进行同步,每个线程执行完任务之后调用pthread_barrier_wait
阻塞以等待其他线程。
这里已经使用主线程来合并排序结果,因而并不需要使用返回值为PTHREAD_BARRIER_SERIAL_THREAD
来决定哪个线程做这个任务。
#include <limits.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#define NTHR 8 // 工作线程数
#define NUMNUM 8000000L // 排序的数量
#define TNUM (NUMNUM / NTHR) // 每个线程排序的数
static long nums[NUMNUM];
static long snums[NUMNUM];
static pthread_barrier_t b;
static int complong(const void* arg1, const void* arg2)
{
long l1 = *(long*)arg1;
long l2 = *(long*)arg2;
return l1 == l2 ? 0 : (l1 > l2 ? 1 : -1);
}
static void* thr_fn(void* arg)
{
long idx = (long)arg;
qsort(&nums[idx], TNUM, sizeof(long), complong);
pthread_barrier_wait(&b); // 屏障允许每个线程等待,直到到达屏障计数
return ((void*)0);
}
// 多线程排序完成之后,产生NTHR个有序数组,该函数合并数组
static void merge()
{
long idx[NTHR]; // 每个有序数组的起始索引
long i, minidx, sidx, num;
for (i = 0; i < NTHR; i++)
// 初始化有序数组的索引 0~TNUM-1, TNUM~2TNUM-1...
idx[i] = i * TNUM;
// 从每个有序数组的起始位置开始,不断获取最小值,存放到snums中。
// 值最小的有序数组向后移动一位
for (sidx = 0; sidx < NUMNUM; sidx++)
{
num = LONG_MAX;
for (i = 0; i < NTHR; i++)
{
// 当前数组的索引不能超过下一个数组的开始
if ((idx[i] < (i + 1) * TNUM) && (nums[idx[i]] < num))
{
// num变量保存最小值
num = nums[idx[i]];
// minidx指定的有序数组的值最小
minidx = i;
}
}
// 将最小值存放到结果数组snums中
// snums[sidx] = nums[idx[minidx]];
snums[sidx] = num;
// minidx指定的有序数组向后移动一位
idx[minidx]++;
}
}
int main()
{
unsigned long i = 0;
pthread_t tid;
int err = 0;
struct timeval start, end;
long long startusec, endusec;
double elapsed;
srandom(1);
for (i = 0; i < NUMNUM; i++)
{
nums[i] = random();
}
gettimeofday(&start, NULL);
pthread_barrier_init(&b, NULL, NTHR + 1); // 包括主线程
for (i = 0; i < NTHR; i++)
{
err = pthread_create(&tid, NULL, thr_fn, (void*)(i * TNUM));
if (err != 0)
exit(-1);
}
pthread_barrier_wait(&b);
merge();
gettimeofday(&end, NULL);
startusec = start.tv_sec * 1000000 + start.tv_usec;
printf("%lld\n", startusec);
endusec = end.tv_sec * 1000000 + end.tv_usec;
printf("%lld\n", endusec);
elapsed = (double)(endusec - startusec) / 1000000.0;
printf("sort took %.4f seconds\n", elapsed);
FILE* fp = fopen("/tmp/output", "w");
if (fp != NULL)
{
for (long i = 0; i < NUMNUM; i++)
{
fprintf(fp, "%ld\n", snums[i]);
}
fclose(fp);
}
}
// $ ./pthread_barrier_demo
// 1703518722972872
// 1703518723851445
// sort took 0.8786 seconds
- 原文作者:生如夏花
- 原文链接:https://blduan.top/post/%E8%AF%BB%E4%B9%A6%E7%AC%94%E8%AE%B0/apue/%E7%BA%BF%E7%A8%8B%E5%90%8C%E6%AD%A5%E4%B9%8B%E5%B1%8F%E9%9A%9C/
- 版权声明:本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可,非商业转载请注明出处(作者,原文链接),商业转载请联系作者获得授权。