屏障(barrier)是用户协调多个线程并行工作的同步机制。

屏障允许每个线程等待,直到所有的合作线程都到达某一点,然后从该点继续执行。

pthread_join是一种特殊的屏障,允许一个线程等待,直到另一个线程退出。

初始化

屏障说明
数据类型pthread_barrier_t
动态分配pthread_barrier_init
释放变量pthread_barrier_destroy
#include <pthread.h>
int pthread_barrier_init(pthread_barrier_t *restrict barrier,
   const pthread_barrierattr_t *restrict attr, unsigned count);
int pthread_barrier_destroy(pthread_barrier_t *barrier);
// 成功返回0,失败返回错误编号

使用函数pthread_barrier_init初始化屏障时,count参数指定在允许所有线程继续运行之前,必须到达屏障的线程数目

等待

线程已经完成工作后,可以调用pthread_barrier_wait函数进行阻塞,然后等待其他线程到达屏障。

#include <pthread.h>
int pthread_barrier_wait(pthread_barrier_t *barrier);

// 成功返回0或者PTHREAD_BARRIER_SERIAL_THREAD(其中任意一个线程),失败返回错误编号

调用pthread_barrier_wait的线程在屏障计数未满足条件时,会进入休眠状态。

最后一个线程调用pthread_barrier_wait函数时,满足了屏障计数,所有的线程都会被唤醒。

其中只有一个线程的返回值是PTHREAD_BARRIER_SERIAL_THREAD,其他线程返回值为0。

到达屏障计数后,屏障可以重用,但是修改屏障技术必须要先pthread_barrier_destroypthread_barrier_init

实例

该实例中,使用8个线程对800w个数进行排序。每个线程分别使用快速排序对100w个数进行排序,然后主线程对排序结果进行合并。

采用pthread_barrier_wait对8个线程以及主线程之间进行同步,每个线程执行完任务之后调用pthread_barrier_wait阻塞以等待其他线程。

这里已经使用主线程来合并排序结果,因而并不需要使用返回值为PTHREAD_BARRIER_SERIAL_THREAD来决定哪个线程做这个任务。

#include <limits.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>

#define NTHR 8                // 工作线程数
#define NUMNUM 8000000L       // 排序的数量
#define TNUM (NUMNUM / NTHR)  // 每个线程排序的数
static long nums[NUMNUM];
static long snums[NUMNUM];

static pthread_barrier_t b;

static int complong(const void* arg1, const void* arg2)
{
    long l1 = *(long*)arg1;
    long l2 = *(long*)arg2;
    return l1 == l2 ? 0 : (l1 > l2 ? 1 : -1);
}
static void* thr_fn(void* arg)
{
    long idx = (long)arg;

    qsort(&nums[idx], TNUM, sizeof(long), complong);
    pthread_barrier_wait(&b);  // 屏障允许每个线程等待,直到到达屏障计数
    return ((void*)0);
}

// 多线程排序完成之后,产生NTHR个有序数组,该函数合并数组
static void merge()
{
    long idx[NTHR];  // 每个有序数组的起始索引
    long i, minidx, sidx, num;
    for (i = 0; i < NTHR; i++)
        // 初始化有序数组的索引 0~TNUM-1, TNUM~2TNUM-1...
        idx[i] = i * TNUM;

    // 从每个有序数组的起始位置开始,不断获取最小值,存放到snums中。
    // 值最小的有序数组向后移动一位
    for (sidx = 0; sidx < NUMNUM; sidx++)
    {
        num = LONG_MAX;
        for (i = 0; i < NTHR; i++)
        {
            // 当前数组的索引不能超过下一个数组的开始
            if ((idx[i] < (i + 1) * TNUM) && (nums[idx[i]] < num))
            {
                // num变量保存最小值
                num = nums[idx[i]];
                // minidx指定的有序数组的值最小
                minidx = i;
            }
        }
        // 将最小值存放到结果数组snums中
        // snums[sidx] = nums[idx[minidx]];
        snums[sidx] = num;
        // minidx指定的有序数组向后移动一位
        idx[minidx]++;
    }
}

int main()
{
    unsigned long i = 0;
    pthread_t tid;
    int err = 0;

    struct timeval start, end;
    long long startusec, endusec;
    double elapsed;

    srandom(1);
    for (i = 0; i < NUMNUM; i++)
    {
        nums[i] = random();
    }

    gettimeofday(&start, NULL);
    pthread_barrier_init(&b, NULL, NTHR + 1);  // 包括主线程
    for (i = 0; i < NTHR; i++)
    {
        err = pthread_create(&tid, NULL, thr_fn, (void*)(i * TNUM));
        if (err != 0)
            exit(-1);
    }
    pthread_barrier_wait(&b);
    merge();
    gettimeofday(&end, NULL);

    startusec = start.tv_sec * 1000000 + start.tv_usec;
    printf("%lld\n", startusec);
    endusec = end.tv_sec * 1000000 + end.tv_usec;
    printf("%lld\n", endusec);
    elapsed = (double)(endusec - startusec) / 1000000.0;
    printf("sort took %.4f seconds\n", elapsed);
    FILE* fp = fopen("/tmp/output", "w");
    if (fp != NULL)
    {
        for (long i = 0; i < NUMNUM; i++)
        {
            fprintf(fp, "%ld\n", snums[i]);
        }
        fclose(fp);
    }
}
// $ ./pthread_barrier_demo
// 1703518722972872
// 1703518723851445
// sort took 0.8786 seconds