线程条件变量可以是线程以非竞争的方式同步执行

线程条件变量支持两个属性:进程共享时钟属性

进程共享属性可以使条件变量被多进程的线程使用。

时钟属性控制计算pthread_cond_timedwait()函数的超时参数(tsptr)时采用哪个时钟。

属性介绍

数据类型:pthread_condattr_t

初始化与反初始化函数如下:

#include <pthread.h>

int pthread_condattr_init(pthread_condattr_t* attr);
int pthread_condattr_destroy(pthread_condattr_t* attr);
// 返回值:成功返回0,失败返回错误编号

进程共享属性

与其他同步对象的进程共享属性一致。

该属性控制着条件变量是可以被单进程的多个线程使用,还是可以被多进程的线程使用。

下面两个函数用于设置或获取进程共享属性值。

#include <pthread.h>

int pthread_condattr_getpshared(const pthread_condattr_t* restrict attr, int* restrict pshared);
int pthread_condattr_setpshared(pthread_condattr_t* attr, int pshared);
// 返回值:成功返回0,失败返回错误编号

可取值为PTHREAD_PROCESS_PRIVATEPTHREAD_PROCESS_SHARED,默认值为PTHREAD_PROCESS_PRIVATE

示例

在下面的实例中,父子进程之间通过互斥量与条件变量进行无竞争的同步行为,即对全局计数器进行加法操作。

当子进程检测到计数器的值大于2时,发送信号给父进程,父子进程开始竞争处理。

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// 该示例主要用于两个进程以非竞争的方式对计数器counter进行加法操作

#define SHARED_MEMORY_KEY 12345
typedef struct
{
    int counter;            // 计数器
    int condition;          // 条件,主线程在条件满足时延迟执行函数
    pthread_mutex_t mutex;  // 用于保护条件与计数器
    pthread_cond_t cond;
} context_t;

static context_t* pContext = NULL;
static void* childProcess(void* arg)
{
    context_t* pContext = (context_t*)arg;

    // 加锁
    pthread_mutex_lock(&(pContext->mutex));
    printf("Child process: Counter before increment: %d\n", pContext->counter);
    // 增加计数器
    pContext->counter++;
    printf("Child process: Counter after increment: %d\n", pContext->counter);
    if (pContext->counter >= 2 && pContext->condition == 0)
    {
        pContext->condition = 1;
        pthread_cond_signal(&pContext->cond);
    }
    // 解锁
    pthread_mutex_unlock(&(pContext->mutex));

    return NULL;
}
int main()
{
    int err;
    // 设置共享内存,保证互斥量与条件变量能够被多个进程访问
    int sharedMemoryId;
    // 创建共享内存
    sharedMemoryId = shmget(SHARED_MEMORY_KEY, sizeof(context_t), IPC_CREAT | 0666);
    if (sharedMemoryId < 0)
    {
        perror("shmget");
        exit(-1);
    }
    // 连接到共享内存
    pContext = shmat(sharedMemoryId, NULL, 0);
    if (pContext == (void*)-1)
    {
        perror("shmat");
        exit(-1);
    }

    // 设置互斥量与条件变量都为进程共享
    pthread_mutexattr_t mutexAttr;
    if ((err = pthread_mutexattr_init(&mutexAttr)) != 0)
    {
        printf("pthread_mutexattr_init failed, %s\n", strerror(err));
        exit(-1);
    }
    if ((err = pthread_mutexattr_setpshared(&mutexAttr, PTHREAD_PROCESS_SHARED)) != 0)
    {
        printf("can't set recursive type, %s\n", strerror(err));
        exit(-1);
    }
    if ((err = pthread_mutex_init(&pContext->mutex, &mutexAttr)) != 0)
    {
        printf("can't create recursive mutex, %s\n", strerror(err));
        exit(-1);
    }
    pthread_condattr_t condAttr;
    if ((err = pthread_condattr_init(&condAttr)) != 0)
    {
        printf("pthread_condattr_init failed, %s\n", strerror(err));
        exit(-1);
    }
    if ((err = pthread_condattr_setpshared(&condAttr, PTHREAD_PROCESS_SHARED)) != 0)
    {
        printf("can't set cond process shared, %s\n", strerror(err));
        exit(-1);
    }
    if ((err = pthread_cond_init(&pContext->cond, &condAttr)) != 0)
    {
        printf("can't create cond, %s\n", strerror(err));
        exit(-1);
    }

    // 初始化计数器
    pContext->counter = 0;
    pContext->condition = 0;

    // 创建子进程
    pid_t pid = fork();
    if (pid < 0)
    {
        perror("fork");
        exit(-1);
    }

    if (pid == 0)
    {
        // 子进程
        int i = 0;
        while (i < 5)
        {
            childProcess(pContext);
            i++;
            sleep(1);
        }

        // 分离共享内存
        shmdt(pContext);
    }
    else if (pid > 0)
    {
        int i = 0;
        while (i < 5)
        {
            // 加锁
            pthread_mutex_lock(&(pContext->mutex));
            while (!pContext->condition)
                // 条件不满足阻塞
                pthread_cond_wait(&pContext->cond, &pContext->mutex);
            printf("Parent process: Counter before increment: %d\n", pContext->counter);
            pContext->counter++;
            printf("Parent process: Counter after increment: %d\n", pContext->counter);
            // 解锁
            pthread_mutex_unlock(&(pContext->mutex));

            i++;
            sleep(1);
        }

        // 等待子进程结束
        wait(NULL);
        // 分离共享内存
        shmdt(pContext);
        // 删除共享内存
        shmctl(sharedMemoryId, IPC_RMID, NULL);
    }
    return 0;
}
// 执行结果如下:
// Child process: Counter before increment: 0
// Child process: Counter after increment: 1
// Child process: Counter before increment: 1
// Child process: Counter after increment: 2
// Parent process: Counter before increment: 2
// Parent process: Counter after increment: 3
// Child process: Counter before increment: 3
// Child process: Counter after increment: 4
// Parent process: Counter before increment: 4
// Parent process: Counter after increment: 5
// Parent process: Counter before increment: 5
// Parent process: Counter after increment: 6
// Child process: Counter before increment: 6
// Child process: Counter after increment: 7
// Parent process: Counter before increment: 7
// Parent process: Counter after increment: 8
// Child process: Counter before increment: 8
// Child process: Counter after increment: 9
// Parent process: Counter before increment: 9
// Parent process: Counter after increment: 10

从结果中看,前两次加法操作是子进程来执行的,之后父子进程进行竞争。

时钟属性

时钟属性控制着计算pthread_cond_timedwait()函数的超时参数tsptr时采用哪个时钟。

可以使用下面两个函数获取或设置时钟属性:

#include <pthread.h>

int pthread_condattr_getclock(const pthread_condattr_t* restrict attr, clockid_t* restrict clock_id);
int pthread_condattr_setclock(pthread_condattr_t* attr, clockid_t clock_id);
// 返回值:成功返回0,失败返回错误编号

clock_id用于指定选用的时钟,可取值如下:

时钟名称含义
CLOCK_REALTIME实时系统时钟。系统启动时从CMOS上读取的RTC时间(自1970-01-01起经过的秒数、本秒经过的纳秒数),也指现实中的实际时间,可以修改并写入到RTC中(比如和网络时间同步,不是单调递增的)。
CLOCK_MONOTONIC单调时间。系统启动以后经过的秒数、纳秒数。
CLOCK_PROCESS_CPUTIME_ID本进程中从开始执行到当前位置时CPU花费的时间,不包括阻塞的时间(调用sleep等)。
CLOCK_THREAD_CPUTIME_ID本线程中从开始执行到当前位置时CPU花费的时间,不包括阻塞的时间(调用sleep等)。

示例1

统计线程执行到当前位置,CPU使用的时间:

#include <pthread.h>
#include <stdio.h>
#include <time.h>
#include <unistd.h>

void* thread_func(void* arg)
{
    struct timespec current_time;

    for (int i = 0; i < 1000000; i++)
        for (int j = 0; j < 100000; j++)
        {
        }
    // 获取线程执行当前位置的时间
    clock_gettime(CLOCK_THREAD_CPUTIME_ID, &current_time);
    printf("线程使用CPU时间:%ld秒 %ld 纳秒\n", current_time.tv_sec, current_time.tv_nsec);
    return NULL;
}

int main()
{
    pthread_t tid;
    // 创建线程
    pthread_create(&tid, NULL, thread_func, NULL);
    // 等待线程结束
    pthread_join(tid, NULL);
    return 0;
}
// 线程使用CPU时间:138秒 951065179 纳秒

示例2

在下面的例子中,父子进程通过互斥量与条件变量来保证以非竞争的方式对全局计数器进行加法操作。

父进程中设置等待超时时间为1s,如果超过1s没有收到信号,则自动恢复条件并不再等待,之后父子进程开始在互斥量的保护下竞争操作。

#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>

// 该示例主要用于两个进程以非竞争的方式对计数器counter进行加法操作

#define SHARED_MEMORY_KEY 12345
typedef struct
{
    int counter;            // 计数器
    int condition;          // 条件,主线程在条件满足时延迟执行函数
    pthread_mutex_t mutex;  // 用于保护条件与计数器
    pthread_cond_t cond;
} context_t;

static context_t* pContext = NULL;
static void* childProcess(void* arg)
{
    context_t* pContext = (context_t*)arg;

    // 加锁
    pthread_mutex_lock(&(pContext->mutex));
    printf("Child process: Counter before increment: %d\n", pContext->counter);
    // 增加计数器
    pContext->counter++;
    printf("Child process: Counter after increment: %d\n", pContext->counter);
    if (pContext->counter >= 5 && pContext->condition == 0)
    {
        pContext->condition = 1;
        pthread_cond_signal(&pContext->cond);
    }
    // 解锁
    pthread_mutex_unlock(&(pContext->mutex));

    return NULL;
}
int main()
{
    int err;
    // 设置共享内存,保证互斥量与条件变量能够被多个进程访问
    int sharedMemoryId;
    // 创建共享内存
    sharedMemoryId = shmget(SHARED_MEMORY_KEY, sizeof(context_t), IPC_CREAT | 0666);
    if (sharedMemoryId < 0)
    {
        perror("shmget");
        exit(-1);
    }
    // 连接到共享内存
    pContext = shmat(sharedMemoryId, NULL, 0);
    if (pContext == (void*)-1)
    {
        perror("shmat");
        exit(-1);
    }

    // 设置互斥量与条件变量都为进程共享
    pthread_mutexattr_t mutexAttr;
    if ((err = pthread_mutexattr_init(&mutexAttr)) != 0)
    {
        printf("pthread_mutexattr_init failed, %s\n", strerror(err));
        exit(-1);
    }
    if ((err = pthread_mutexattr_setpshared(&mutexAttr, PTHREAD_PROCESS_SHARED)) != 0)
    {
        printf("can't set recursive type, %s\n", strerror(err));
        exit(-1);
    }
    if ((err = pthread_mutex_init(&pContext->mutex, &mutexAttr)) != 0)
    {
        printf("can't create recursive mutex, %s\n", strerror(err));
        exit(-1);
    }
    pthread_condattr_t condAttr;
    if ((err = pthread_condattr_init(&condAttr)) != 0)
    {
        printf("pthread_condattr_init failed, %s\n", strerror(err));
        exit(-1);
    }
    if ((err = pthread_condattr_setpshared(&condAttr, PTHREAD_PROCESS_SHARED)) != 0)
    {
        printf("can't set cond process shared, %s\n", strerror(err));
        exit(-1);
    }
    if ((err = pthread_condattr_setclock(&condAttr, CLOCK_REALTIME)) != 0)
    {
        printf("can't set cond clock, %s\n", strerror(err));
        exit(-1);
    }
    if ((err = pthread_cond_init(&pContext->cond, &condAttr)) != 0)
    {
        printf("can't create cond, %s\n", strerror(err));
        exit(-1);
    }

    // 初始化计数器
    pContext->counter = 0;
    pContext->condition = 0;

    // 创建子进程
    pid_t pid = fork();
    if (pid < 0)
    {
        perror("fork");
        exit(-1);
    }

    if (pid == 0)
    {
        // 子进程
        int i = 0;
        while (i < 5)
        {
            childProcess(pContext);
            i++;
            sleep(1);
        }

        // 分离共享内存
        shmdt(pContext);
    }
    else if (pid > 0)
    {
        int i = 0;
        while (i < 5)
        {
            // 加锁
            pthread_mutex_lock(&(pContext->mutex));
            while (!pContext->condition)
            {
                struct timespec ts;
                clock_gettime(CLOCK_REALTIME, &ts);
                ts.tv_sec += 1;
                // 条件不满足的情况,最多等待1s
                if ((err = pthread_cond_timedwait(&pContext->cond, &pContext->mutex, &ts)) != 0)
                {
                    if (err == ETIMEDOUT)
                    {
                        // 等待1s,如果超时直接设置条件并不在等待
                        printf("pthread_cond_timedwait failed, %s\n", strerror(err));
                        pContext->condition = 1;
                        break;
                    }
                }
            }
            printf("Parent process: Counter before increment: %d\n", pContext->counter);
            pContext->counter++;
            printf("Parent process: Counter after increment: %d\n", pContext->counter);
            // 解锁
            pthread_mutex_unlock(&(pContext->mutex));

            i++;
            sleep(1);
        }

        // 等待子进程结束
        wait(NULL);
        // 分离共享内存
        shmdt(pContext);
        // 删除共享内存
        shmctl(sharedMemoryId, IPC_RMID, NULL);
    }
    return 0;
}
// 执行结果如下:
// Child process: Counter before increment: 0
// Child process: Counter after increment: 1
// Child process: Counter before increment: 1
// Child process: Counter after increment: 2
// pthread_cond_timedwait failed, Connection timed out
// Parent process: Counter before increment: 2
// Parent process: Counter after increment: 3
// Parent process: Counter before increment: 3
// Parent process: Counter after increment: 4
// Child process: Counter before increment: 4
// Child process: Counter after increment: 5
// Child process: Counter before increment: 5
// Child process: Counter after increment: 6
// Parent process: Counter before increment: 6
// Parent process: Counter after increment: 7
// Child process: Counter before increment: 7
// Child process: Counter after increment: 8
// Parent process: Counter before increment: 8
// Parent process: Counter after increment: 9
// Parent process: Counter before increment: 9
// Parent process: Counter after increment: 10

从结果中可以看出,子进程至少需要5s才可以发送信号给父进程,而父进程设置的超时等待时间为1s,因此pthread_cond_timedwait()函数会超时退出不再等待,并返回错误码ETIMEOUT