本文主要介绍了线程同步对象互斥量的相关属性,并通过代码验证了这些属性。

通过这些线程同步对象的属性可以控制不同线程在使用互斥量进行同步时的行为。

属性介绍

数据类型:pthread_mutexattr_t

初始化函数:

#include <pthread.h>

int pthread_mutexattr_init(pthread_mutexattr_t *attr);
int pthread_mutexattr_destroy(pthread_mutexattr_t *attr);
// 返回值:成功返回0,失败返回错误编号

pthread_mutexattr_init函数使用默认互斥量属性值初始化attr参数。

以下是具体的三个互斥量属性:

进程共享属性

是否支持:通过_POSIX_PROCESS_SHARED_SC_THREAD_SHAREDsysconf参数)来检测。

可取值PTHREAD_PROCESS_PRIVATE(默认值)与PTHREAD_PROCESS_SHARED

作用:当多个进程需要同步访问数据(比如共享内存中数据)时,可以将同一互斥量(即共享内存中分配的互斥量)的属性设置为PTHREAD_PROCESS_SHARED,就可以在多个进程之间进行数据同步访问。

函数pthread_mutexattr_getpsharedpthread_mutexattr_setpshared函数用于获取与修改该属性。

#include <pthread.h>

int pthread_mutexattr_getpshared(
                     const pthread_mutexattr_t *restrict attr,
                     int *restrict pshared);
int pthread_mutexattr_setpshared(pthread_mutexattr_t *attr,
                     int pshared);
// 返回值:成功返回0,失败返回错误编号

示例

在下面的示例中,父子进程使用共享内存中的互斥量mutex来对共享内存中的参数counter进行同步递增操作

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define SHARED_MEMORY_KEY 1234
#define SHARED_MEMORY_SIZE 1024

typedef struct
{
    pthread_mutex_t mutex;  // 互斥量保证父子进程之间的同步
    int counter;            // 父子进程之间对该变量进行同步+1
} SharedData;               // 共享内存数据

void* childProcess(void* arg)
{
    SharedData* sharedData = (SharedData*)arg;

    // 加锁
    pthread_mutex_lock(&(sharedData->mutex));

    printf("Child process: Counter before increment: %d\n", sharedData->counter);

    // 增加计数器
    sharedData->counter++;

    printf("Child process: Counter after increment: %d\n", sharedData->counter);

    // 解锁
    pthread_mutex_unlock(&(sharedData->mutex));

    return NULL;
}

int main()
{
    int sharedMemoryId;
    SharedData* sharedData;

    // 创建共享内存
    sharedMemoryId = shmget(SHARED_MEMORY_KEY, sizeof(SharedData), IPC_CREAT | 0666);
    if (sharedMemoryId < 0)
    {
        perror("shmget");
        exit(1);
    }

    // 连接到共享内存
    sharedData = shmat(sharedMemoryId, NULL, 0);
    if (sharedData == (void*)-1)
    {
        perror("shmat");
        exit(1);
    }

    // 初始化互斥量
    pthread_mutexattr_t mutexAttr;
    pthread_mutexattr_init(&mutexAttr);

    // 设置互斥量的进程共享属性,使其能够在父子进程之间同步
    pthread_mutexattr_setpshared(&mutexAttr, PTHREAD_PROCESS_SHARED);
    pthread_mutex_init(&(sharedData->mutex), &mutexAttr);

    // 初始化计数器
    sharedData->counter = 0;

    // 创建子进程
    pid_t pid = fork();
    if (pid < 0)
    {
        perror("fork");
        exit(1);
    }

    if (pid == 0)
    {
        // 子进程
        int i = 0;
        while (i < 10)
        {
            childProcess(sharedData);
            i++;

            sleep(1);
        }

        // 分离共享内存
        shmdt(sharedData);
    }
    else
    {
        int i = 0;
        while (i < 10)
        {
            // 加锁
            pthread_mutex_lock(&(sharedData->mutex));

            printf("Parent process: Counter before increment: %d\n", sharedData->counter);

            sharedData->counter++;

            printf("Parent process: Counter after increment: %d\n", sharedData->counter);

            // 解锁
            pthread_mutex_unlock(&(sharedData->mutex));

            i++;

            sleep(1);
        }

        // 等待子进程结束
        wait(NULL);

        // 分离共享内存
        shmdt(sharedData);

        // 删除共享内存
        shmctl(sharedMemoryId, IPC_RMID, NULL);
    }

    return 0;
}
// 执行结果如下:
// Parent process: Counter before increment: 0
// Parent process: Counter after increment: 1
// Child process: Counter before increment: 1
// Child process: Counter after increment: 2
// Child process: Counter before increment: 2
// Child process: Counter after increment: 3
// Parent process: Counter before increment: 3
// Parent process: Counter after increment: 4
// Parent process: Counter before increment: 4
// Parent process: Counter after increment: 5
// Child process: Counter before increment: 5
// Child process: Counter after increment: 6
// Parent process: Counter before increment: 6
// Parent process: Counter after increment: 7
// Child process: Counter before increment: 7
// Child process: Counter after increment: 8
// Child process: Counter before increment: 8
// Child process: Counter after increment: 9
// Parent process: Counter before increment: 9
// Parent process: Counter after increment: 10
// Child process: Counter before increment: 10
// Child process: Counter after increment: 11
// Parent process: Counter before increment: 11
// Parent process: Counter after increment: 12
// Parent process: Counter before increment: 12
// Parent process: Counter after increment: 13
// Child process: Counter before increment: 13
// Child process: Counter after increment: 14
// Parent process: Counter before increment: 14
// Parent process: Counter after increment: 15
// Child process: Counter before increment: 15
// Child process: Counter after increment: 16
// Parent process: Counter before increment: 16
// Parent process: Counter after increment: 17
// Child process: Counter before increment: 17
// Child process: Counter after increment: 18
// Child process: Counter before increment: 18
// Child process: Counter after increment: 19
// Parent process: Counter before increment: 19
// Parent process: Counter after increment: 20

健壮属性

该属性与启用了进程共享属性的互斥量有关。

作用是当持有互斥量的进程意外终止时,解决互斥量状态恢复的问题,否则互斥量处于加锁状态,其他阻塞在这个锁的进程会一直继续阻塞下去。

取值含义
PTHREAD_MUTEX_STALLED默认值,持有互斥量的进程终止时不采取任何动作,其他阻塞在该互斥量解锁的进程会一致阻塞。
PTHREAD_MUTEX_ROBUST当锁被另一个进程持有并且在终止时未释放锁的情况下,互斥量就会处于不一致的状态,当前进程或线程调用pthread_mutex_lock获取锁时返回EOWNERDEAD而非0,从而得知这个情况。

如果使用PTHREAD_MUTEX_ROBUST属性的互斥量,那么pthread_mutex_lock()函数会有三种返回值分别是:EOWNERDEAD(加锁成功但需要恢复互斥量状态)、0(加锁成功不需要恢复)以及加锁失败

下面两个函数用于获取或者设置互斥量的健壮属性:

#include <pthread.h>
int pthread_mutexattr_getrobust(const pthread_mutexattr_t *attr,
                               int *robustness);
int pthread_mutexattr_setrobust(pthread_mutexattr_t *attr,
                              int robustness);
// 返回值:成功返回0,失败返回错误编号

如果已经使用了健壮的互斥量,那么当出现上面所述的问题时,又该如何恢复互斥量的状态呢?

线程可以调用pthread_mutex_consistent()函数,来恢复互斥量的状态,使其可以继续被使用。

#include <pthread.h>

int pthread_mutex_consistent(pthread_mutex_t *mutex);

当互斥量处于不一致的状态之后,先获取到互斥量的线程(返回值为EOWNERDEAD)没有恢复互斥量状态,而是直接解锁,那么就会导致后续获取互斥量的线程返回ENOTCOVERABLE错误,不能再恢复互斥量。换句话说,即恢复互斥量状态一致性的机会仅有一次

示例

下面的例子提供了3个线程,其中线程1在持有互斥量的情况终止,线程2与父线程分别演示恢复互斥量与不恢复互斥量状态的结果:

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define SHARED_MEMORY_KEY 1234
#define SHARED_MEMORY_SIZE 1024

typedef struct
{
    pthread_mutex_t mutex;  // 互斥量保证父子进程之间的同步
    int counter;            // 父子进程之间对该变量进行同步+1
} SharedData;               // 共享内存数据

void* childProcess1(void* arg)
{
    // 子进程1
    SharedData* sharedData = (SharedData*)arg;

    // 加锁
    pthread_mutex_lock(&(sharedData->mutex));

    printf("Child process1: Counter before increment: %d\n", sharedData->counter);
    // 增加计数器
    sharedData->counter++;
    printf("Child process1: Counter after increment: %d\n", sharedData->counter);

    // 当counter==5时,子进程再持有互斥锁的情况退出
    if (sharedData->counter >= 5)
    {
        printf("Child process1: exit\n");
        exit(-1);
    }

    // 解锁
    pthread_mutex_unlock(&(sharedData->mutex));
    return NULL;
}
void* childProcess2(void* arg)
{
    // 子进程2
    SharedData* sharedData = (SharedData*)arg;

    // 加锁
    int iRet = pthread_mutex_lock(&(sharedData->mutex));
    if (iRet == ENOTRECOVERABLE)
    {
        // 进程1退出之后,主进程先获取到锁,但未恢复互斥量
        printf("Child process2: mutex not recoverable\n");
        exit(-1);
    }
    else if (iRet == EOWNERDEAD)
    {
        // 解锁之前不恢复互斥量,会导致接下来其他进程也无法恢复
        pthread_mutex_unlock(&(sharedData->mutex));
        return NULL;

        printf("Child process2: recover mutex\n");
        pthread_mutex_consistent(&sharedData->mutex);
    }

    printf("Child process2: Counter before increment: %d\n", sharedData->counter);
    // 增加计数器
    sharedData->counter++;
    printf("Child process2: Counter after increment: %d\n", sharedData->counter);

    // 解锁
    pthread_mutex_unlock(&(sharedData->mutex));
    return NULL;
}

int main()
{
    int sharedMemoryId;
    SharedData* sharedData;

    // 创建共享内存
    sharedMemoryId = shmget(SHARED_MEMORY_KEY, sizeof(SharedData), IPC_CREAT | 0666);
    if (sharedMemoryId < 0)
    {
        perror("shmget");
        exit(1);
    }

    // 连接到共享内存
    sharedData = shmat(sharedMemoryId, NULL, 0);
    if (sharedData == (void*)-1)
    {
        perror("shmat");
        exit(1);
    }

    // 初始化互斥量
    pthread_mutexattr_t mutexAttr;
    pthread_mutexattr_init(&mutexAttr);

    // 设置互斥量的进程共享属性,使其能够在父子进程之间同步
    pthread_mutexattr_setpshared(&mutexAttr, PTHREAD_PROCESS_SHARED);
    // 设置互斥量的健壮属性
    pthread_mutexattr_setrobust(&mutexAttr, PTHREAD_MUTEX_ROBUST);
    pthread_mutex_init(&(sharedData->mutex), &mutexAttr);

    // 初始化计数器
    sharedData->counter = 0;

    // 创建子进程1
    pid_t pid1 = fork();
    if (pid1 < 0)
    {
        perror("fork");
        exit(1);
    }

    if (pid1 == 0)
    {
        // 子进程1
        int i = 0;
        while (i < 5)
        {
            childProcess1(sharedData);
            i++;
            sleep(1);
        }
        // 分离共享内存
        shmdt(sharedData);
    }
    else
    {
        // 创建子进程1
        pid_t pid2 = fork();
        if (pid2 < 0)
        {
            perror("fork");
            exit(1);
        }
        if (pid2 == 0)
        {
            // 子进程2
            int i = 0;
            while (i < 5)
            {
                childProcess2(sharedData);
                i++;
                sleep(1);
            }
            // 分离共享内存
            shmdt(sharedData);
        }
        else
        {
            // 父进程
            int i = 0;
            while (i < 5)
            {
                // 父进程检测返回值,判断是否需要恢复互斥量
                int iRet = pthread_mutex_lock(&(sharedData->mutex));
                if (iRet == ENOTRECOVERABLE)
                {
                    // 进程1退出之后,进程2先获取到锁,但未恢复互斥量
                    printf("Parent process: mutex not recoverable\n");
                    exit(-1);
                }
                else if (iRet == EOWNERDEAD)
                {
                    // 解锁之前不恢复互斥量,会导致接下来其他进程也无法恢复
                    pthread_mutex_unlock(&(sharedData->mutex));
                    break;

                    printf("Parent process: recover mutex\n");
                    pthread_mutex_consistent(&sharedData->mutex);
                }

                printf("Parent process: Counter before increment: %d\n", sharedData->counter);
                sharedData->counter++;
                printf("Parent process: Counter after increment: %d\n", sharedData->counter);

                // 解锁
                pthread_mutex_unlock(&(sharedData->mutex));

                i++;
                sleep(1);
            }

            // 回收两个子进程资源
            wait(NULL);
            wait(NULL);
            // 分离共享内存
            shmdt(sharedData);
            // 删除共享内存
            shmctl(sharedMemoryId, IPC_RMID, NULL);
        }
    }

    return 0;
}
// 进程1持有互斥量退出之后,执行结果如下:
// 进程1执行了3次递加操作,因此总数为3+5+5=13
// Parent process: Counter before increment: 0
// Parent process: Counter after increment: 1
// Child process2: Counter before increment: 1
// Child process2: Counter after increment: 2
// Child process1: Counter before increment: 2
// Child process1: Counter after increment: 3
// Child process1: Counter before increment: 3
// Child process1: Counter after increment: 4
// Child process2: Counter before increment: 4
// Child process2: Counter after increment: 5
// Parent process: Counter before increment: 5
// Parent process: Counter after increment: 6
// Parent process: Counter before increment: 6
// Parent process: Counter after increment: 7
// Child process2: Counter before increment: 7
// Child process2: Counter after increment: 8
// Child process1: Counter before increment: 8
// Child process1: Counter after increment: 9
// Child process1: exit
// Parent process: recover mutex
// Parent process: Counter before increment: 9
// Parent process: Counter after increment: 10
// Child process2: Counter before increment: 10
// Child process2: Counter after increment: 11
// Parent process: Counter before increment: 11
// Parent process: Counter after increment: 12
// Child process2: Counter before increment: 12
// Child process2: Counter after increment: 13
//
// 进程1持有互斥量退出之后,进程2与父进程均不恢复互斥量
// Child process1: Counter before increment: 0
// Child process1: Counter after increment: 1
// Parent process: Counter before increment: 1
// Parent process: Counter after increment: 2
// Child process2: Counter before increment: 2
// Child process2: Counter after increment: 3
// Parent process: Counter before increment: 3
// Parent process: Counter after increment: 4
// Child process2: Counter before increment: 4
// Child process2: Counter after increment: 5
// Child process1: Counter before increment: 5
// Child process1: Counter after increment: 6
// Child process1: exit
// Child process2: mutex not recoverable 父进程先持有互斥量,但未恢复

从结果中可以看出,进程2与父进程在恢复互斥量状态时,都可以继续执行。但是第2次的结果中,父进程获得锁之后未恢复互斥量状态,直接导致进程2终止。

类型属性

该属性决定互斥量的锁定特性,POSIX.1指定了以下4取值。

互斥量属性说明
PTHREAD_MUTEX_NORMAL标准互斥量类型,不做错误检查或死锁检测。
PTHREAD_MUTEX_ERRORCHECK提供错误检查。
PTHREAD_MUTEX_RECURSIVE允许同一线程在互斥量解锁之前对该互斥量进行多次加锁。递归互斥量维护锁的计数,在解锁次数与加锁次数不相同的情况下,不会释放锁。
PTHREAD_MUTEX_DEFAULT操作系统实现时会映射到其他三种互斥量。

以下是不同类型互斥量在对三种不同情况(重复加锁、未占用时解锁以及重复解锁)时的处理结果:

互斥量类型重复加锁未占用时解锁重复解锁
PTHREAD_MUTEX_NORMAL死锁未定义未定义
PTHREAD_MUTEX_ERRORCHECK返回错误返回错误返回错误
PTHREAD_MUTEX_RECURSIVE允许返回错误返回错误

可以使用下面的函数获取或设置互斥量的类型属性:

#include <pthread.h>
int pthread_mutexattr_settype(pthread_mutexattr_t* attr, int kind);
int pthread_mutexattr_gettype(const pthread_mutexattr_t* attr, int* kind);
// 返回值:成功返回0,失败返回错误编号

示例1

在这个示例中,验证两种互斥量类型PTHREAD_MUTEX_ERRORCHECKPTHREAD_MUTEX_RECURSIVE对上面三种情况的处理结果。

#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int main()
{
    // 设置错误检查互斥量
    pthread_mutex_t mutex;
    pthread_mutexattr_t mutexAttr;
    pthread_mutexattr_init(&mutexAttr);
    pthread_mutexattr_settype(&mutexAttr, PTHREAD_MUTEX_ERRORCHECK);
    pthread_mutex_init(&mutex, &mutexAttr);

    // 1. 未加锁时解锁
    int iRet = pthread_mutex_unlock(&mutex);
    if (iRet != 0)
        printf("PTHREAD_MUTEX_ERRORCHECK: %d %s\n", iRet, strerror(iRet));

    iRet = pthread_mutex_lock(&mutex);
    if (iRet == 0)
    {
        // 2. 检测重复加锁返回错误
        iRet = pthread_mutex_lock(&mutex);
        if (iRet != 0)
            printf("PTHREAD_MUTEX_ERRORCHECK: %d %s\n", iRet, strerror(iRet));

        iRet = pthread_mutex_unlock(&mutex);
        if (iRet == 0)
        {
            // 3. 未占有时解锁
            iRet = pthread_mutex_unlock(&mutex);
            if (iRet != 0)
                printf("PTHREAD_MUTEX_ERRORCHECK: %d %s\n", iRet, strerror(iRet));
        }
    }
    // 递归互斥量
    pthread_mutex_t mutex1;
    pthread_mutexattr_t mutexAttr1;
    pthread_mutexattr_init(&mutexAttr1);
    pthread_mutexattr_settype(&mutexAttr1, PTHREAD_MUTEX_RECURSIVE);
    pthread_mutex_init(&mutex1, &mutexAttr1);

    // 1. 未加锁时解锁
    iRet = pthread_mutex_unlock(&mutex1);
    if (iRet != 0)
        printf("PTHREAD_MUTEX_RECURSIVE: %d %s\n", iRet, strerror(iRet));

    iRet = pthread_mutex_lock(&mutex1);
    if (iRet == 0)
    {
        // 2. 检测重复加锁
        iRet = pthread_mutex_lock(&mutex1);
        if (iRet != 0)
            printf("PTHREAD_MUTEX_RECURSIVE: %d %s\n", iRet, strerror(iRet));
        else
            printf("PTHREAD_MUTEX_RECURSIVE: %d %s\n", iRet, strerror(iRet));

        iRet = pthread_mutex_unlock(&mutex1);
        if (iRet == 0)
        {
            iRet = pthread_mutex_unlock(&mutex1);
            if (iRet != 0)
                printf("PTHREAD_MUTEX_RECURSIVE: %d %s\n", iRet, strerror(iRet));

            // 3. 未占有时解锁
            iRet = pthread_mutex_unlock(&mutex1);
            if (iRet != 0)
                printf("PTHREAD_MUTEX_RECURSIVE: %d %s\n", iRet, strerror(iRet));
        }
    }
}
// PTHREAD_MUTEX_ERRORCHECK: 1 Operation not permitted 未加锁时解锁
// PTHREAD_MUTEX_ERRORCHECK: 35 Resource deadlock avoided 重复加锁
// PTHREAD_MUTEX_ERRORCHECK: 1 Operation not permitted 未占有时解锁
// PTHREAD_MUTEX_RECURSIVE: 1 Operation not permitted 未加锁时解锁
// PTHREAD_MUTEX_RECURSIVE: 0 Success 重复加锁
// PTHREAD_MUTEX_RECURSIVE: 1 Operation not permitted 未占有时解锁

示例2

递归锁的一种使用场景是当一个库需要调用库之外的函数,而且该函数可能会再次回调该库中的函数时,就需要递归锁。

例如当前库中加锁调用一个库之外的函数,并且该函数会调用传入的回调函数,回调函数中又会获取相同的锁,此时就需要使用递归锁。

下面是针对这种情况使用AI工具生成的一个示例:

#include <pthread.h>
#include <stdio.h>

// 定义递归锁
pthread_mutex_t lock;

// 传入回调函数的结构体
typedef struct
{
    void (*callback)(void);
} CallbackData;

// 外部函数,会调用传入的回调函数
void externalFunction(CallbackData* data)
{
    // 加锁
    pthread_mutex_lock(&lock);

    printf("Executing external function\n");

    // 调用传入的回调函数
    data->callback();

    // 解锁
    pthread_mutex_unlock(&lock);
}

// 回调函数,在该函数中又会获取相同的锁
void callbackFunction()
{
    // 加锁(递归锁)
    pthread_mutex_lock(&lock);

    printf("Executing callback function\n");

    // 解锁
    pthread_mutex_unlock(&lock);
}

int main()
{
    // 初始化递归锁
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
    pthread_mutex_init(&lock, &attr);
    pthread_mutexattr_destroy(&attr);

    // 创建回调函数数据结构
    CallbackData data;
    data.callback = callbackFunction;

    // 调用外部函数,传入回调函数数据结构
    externalFunction(&data);

    // 销毁递归锁
    pthread_mutex_destroy(&lock);

    return 0;
}
// Executing external function
// Executing callback function

示例3

这是一个递归互斥量的应用示例,在这个示例中,共有3个线程,分别是mainth_func1以及timeout_help线程。

其中main线程通过判断条件是否满足,如果满足则通过调用timeout函数来创建timeout_help线程,在该线程中来延时调用retry函数。

th_func1线程用于设置条件,然后通知main线程来处理延时调用动作。

如果timeout函数创建线程失败,那么retry函数就会在main线程中被调用,此时就会导致重复加锁,这里就需要使用递归锁来避免这种情况。

代码如下:

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#define SECTIONSEC 1000000000  // 秒到纳秒的转换率

typedef struct
{
    int counter;            // 计数器
    int condition;          // 条件,主线程在条件满足时延迟执行函数
    pthread_mutex_t mutex;  // 用于保护条件与计数器
    pthread_cond_t cond;
} context_t;

static context_t* pContext = NULL;

struct to_info
{
    void (*to_fn)(void* arg);  // 运行的函数
    void* arg;                 // 函数参数
    struct timespec to_wait;   // 等待时间
};

static int makepthread(void* (*fn)(void*), void* arg)
{
    int err;
    pthread_t tid;
    pthread_attr_t attr;
    err = pthread_attr_init(&attr);
    if (err != 0)
        return err;

    err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    if (err == 0)
        err = pthread_create(&tid, &attr, fn, arg);

    pthread_attr_destroy(&attr);
    return err;
}

static void* timeout_helper(void* arg)
{
    struct to_info* tip;
    tip = (struct to_info*)arg;
    // 在线程中休眠指定时长,然后调用函数
    clock_nanosleep(CLOCK_REALTIME, 0, &tip->to_wait, NULL);
    (*tip->to_fn)(tip->arg);
    free(arg);
    return (0);
}

// 该函数的作用时,在when指定的时间之后运行func参数指定的函数,函数的参数为arg
static void timeout(const struct timespec* when, void (*func)(void* arg), void* arg)
{
    struct timespec now;
    struct to_info* tip;
    int err;

    clock_gettime(CLOCK_REALTIME, &now);
    if ((when->tv_sec > now.tv_sec) || (when->tv_sec == now.tv_sec && when->tv_nsec > now.tv_nsec))
    {
        // 时间没到
        // 打包传递给线程的数据结构
        tip = (struct to_info*)malloc(sizeof(struct to_info));
        if (tip != NULL)
        {
            memset(tip, 0x00, sizeof(struct to_info));
            tip->arg = arg;
            tip->to_fn = func;

            // 时间没到,计算剩余时间长度
            tip->to_wait.tv_sec = when->tv_sec - now.tv_sec;
            if (when->tv_nsec >= now.tv_nsec)
                tip->to_wait.tv_nsec = when->tv_nsec - now.tv_nsec;
            else
            {
                // 需要借位
                tip->to_wait.tv_sec--;
                tip->to_wait.tv_nsec = SECTIONSEC + when->tv_nsec - now.tv_nsec;
            }

            // 创建线程,等待并调用函数
            err = makepthread(timeout_helper, (void*)tip);
            if (err == 0)
                return;
            else
            {
                printf("makepthread failed,%s\n", strerror(err));
                free(tip);
            }
        }
    }

    // 时间已过,或者创建线程失败,则立即执行函数
    // 这意味着该函数没有在线程中运行,与main函数在同一个线程中。
    // 此时需要使用递归互斥量,否则会死锁
    (*func)(arg);
}

// 要定时执行的函数
// 对全局计数器+1
static void retry(void* arg)
{
    pthread_mutex_lock(&pContext->mutex);
    pContext->counter++;

    struct timespec now;
    clock_gettime(CLOCK_REALTIME, &now);
    printf("%s %ld counter=%d\n", __FUNCTION__, now.tv_sec, pContext->counter);
    pthread_mutex_unlock(&pContext->mutex);
}

static void* th_func1(void* arg)
{
    if (pContext != NULL)
    {
        pthread_mutex_lock(&pContext->mutex);
        // 设置条件
        pContext->condition = 1;
        pthread_mutex_unlock(&pContext->mutex);
        pthread_cond_signal(&pContext->cond);
    }
    return NULL;
}

// 这段程序的作用是一段时间(10s)之后对全局计数器加1
int main()
{
    int err, arg;
    struct timespec when;
    pContext = (context_t*)malloc(sizeof(context_t));
    if (pContext == NULL)
        exit(-1);
    memset(pContext, 0x00, sizeof(context_t));

    pthread_mutexattr_t mutexAttr;
    if ((err = pthread_mutexattr_init(&mutexAttr)) != 0)
    {
        printf("pthread_mutexattr_init failed, %s\n", strerror(err));
        exit(-1);
    }
    if ((err = pthread_mutexattr_settype(&mutexAttr, PTHREAD_MUTEX_RECURSIVE)) != 0)
    {
        printf("can't set recursive type, %s\n", strerror(err));
        exit(-1);
    }
    if ((err = pthread_mutex_init(&pContext->mutex, &mutexAttr)) != 0)
    {
        printf("can't create recursive mutex, %s\n", strerror(err));
        exit(-1);
    }
    pthread_condattr_t condAttr;
    if ((err = pthread_condattr_init(&condAttr)) != 0)
    {
        printf("pthread_condattr_init failed, %s\n", strerror(err));
        exit(-1);
    }
    if ((err = pthread_condattr_setpshared(&condAttr, PTHREAD_PROCESS_SHARED)) != 0)
    {
        printf("can't set cond process shared, %s\n", strerror(err));
        exit(-1);
    }
    if ((err = pthread_cond_init(&pContext->cond, &condAttr)) != 0)
    {
        printf("can't create cond, %s\n", strerror(err));
        exit(-1);
    }
    // 创建一个线程,来控制条件,模拟实际场景线程交互
    // th_func1线程设置条件,主线程在满足条件的情况下延迟调用函数
    if ((err = makepthread(th_func1, NULL)) != 0)
    {
        printf("makepthread failed, %s\n", strerror(err));
        exit(-1);
    }

    pthread_mutex_lock(&pContext->mutex);
    // 在加锁的情况下验证条件
    while (!pContext->condition)
        // 条件不满足的时候阻塞等待
        pthread_cond_wait(&pContext->cond, &pContext->mutex);

    // 设置10s后执行retry函数
    clock_gettime(CLOCK_REALTIME, &when);
    printf("%s %ld counter=%d\n", __FUNCTION__, when.tv_sec, pContext->counter);

    when.tv_sec += 10;
    timeout(&when, retry, (void*)((unsigned long)arg));
    pthread_mutex_unlock(&pContext->mutex);

    pthread_mutexattr_destroy(&mutexAttr);
    pthread_cond_destroy(&pContext->cond);
    pthread_mutex_destroy(&pContext->mutex);

    // 执行其他任务,防止主线程退出
    while (1)
        sleep(1);
    exit(0);
}
// main 1708765486 counter=0
// retry 1708765496 counter=1
// ^C

从执行结果中可以看出,retry函数在10s之后完成了调用,符合预期。