当多个线程共享相同的内存时,需要确保每个线程看到的都是一致的数据视图。

当一个线程可以修改的变量,其他线程也可以读取或修改的时候,就需要对这些线程进行同步,确保访问变量时不会得到无效的值。

问题

变量修改时间多于一个存储器访问周期的处理器架构中,当存储器读与存储器写这两个周期交叉时,这种数据不一致就会出现。

当一个线程修改变量,其他线程在读取这个变量时可能会看到不一致的值,如下图所示: 两个吸纳成的交叉存储器周期 在线程A成功写入之前,线程B读取变量都会产生这种不一致性。

为了解决这种种问题,线程可以使用锁,将读和写固定为原子操作,同一时间只允许一个线程访问该变量,如下所示: 两个线程同步访问内存

两个或多个线程试图在同一时间修改同一变量时,也需要同步

因为变量的修改不是原子操作,在一个线程修改结束之前,其他线程可能也读取了修改前的值,如下图所示: 两个非同步线程对变量做增量操作

互斥量

互斥量mutex本质是一把锁,在访问共享资源前对其进行加锁,访问完成之后则释放锁

对互斥量进行加锁以后,其他任何试图再次对互斥量进行加锁的线程都会被阻塞,直到当前线程释放该互斥锁,这样可以确保同一时间仅有一个线程可以访问共享资源。

如果在该互斥量上有多个阻塞的线程,那么在释放互斥量时所有的阻塞线程都会变为可运行状态,然后进行竞争,最终只有一个线程对互斥量加锁,其他线程则继续阻塞。

只有所有线程都遵循相同的数据访问规则,互斥机制才会生效。如果有某个线程在没有得到锁的情况下可以访问共享资源,那么互斥机制是没有意义的。

互斥量的数据类型是pthread_mutex_t。可以使用pthread_mutex_init接口来进行初始化,也可以设置为常量PTHREAD_MUTEX_INITIALIZER(仅适用于静态分配的互斥量)。

#include <pthread.h>
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
    const pthread_mutexattr_t *restrict attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
// Return Value
// If successful, the pthread_mutex_destroy() and pthread_mutex_init()
// functions shall return zero;
// otherwise, an error number shall be returned to indicate the error.

如果使用动态分配互斥量,在使用结束后需要调用pthread_mutex_destroy释放互斥量。

可以使用以下接口对互斥量进行加解锁,但如果互斥量已经上锁,调用线程则会阻塞直到互斥量解锁。

#include <pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
// RETURN VALUE
// If successful, the pthread_mutex_lock(), pthread_mutex_trylock(),
// and pthread_mutex_unlock() functions shall return zero;
// otherwise, an error number shall be returned to indicate the error.

如果线程不能被阻塞,可以使用pthread_mutex_trylock接口尝试对互斥量进行加锁。如果此时互斥量处于未锁住状态则会直接加锁并返回0,否则pthread_mutex_trylock调用失败,不会锁住互斥量并返回EBUSY

实例

该示例中用互斥量来保护某个动态分配的数据对象,数据对象中包含引用计数,该引用计数用于确保在所有使用该数据对象的工作线程完成数据访问之前,该对象的内存空间不会被释放

主线程用于初始化对象,完成数据访问之后释放该对象的内存空间。

工作线程在使用该对像时其引用计数加1,使用完毕之后引用计数减1。

在对引用计数进行加1、减1、判0时需要锁住互斥量,防止其被其他线程修改。

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

struct foo
{
    int f_count;  // 引用计数
    pthread_mutex_t f_lock;
    int f_id;
};

static struct foo* pFoo = NULL;

// 用于主线程动态分配数据对象
static struct foo* foo_alloc(int id)
{
    struct foo* fp;
    if ((fp = (struct foo*)malloc(sizeof(struct foo))) != NULL)
    {
        memset(fp, 0x00, sizeof(struct foo));
        fp->f_count = 1;
        fp->f_id = id;
        if (pthread_mutex_init(&fp->f_lock, NULL) != 0)
        {
            free(fp);
            return NULL;
        }
    }
    return fp;
}

// 工作线程添加引用计数
static void foo_hold(struct foo* fp)
{
    pthread_mutex_lock(&fp->f_lock);
    fp->f_count++;
    pthread_mutex_unlock(&fp->f_lock);
}
// 工作线程减去引用计数
static void foo_release(struct foo* fp)
{
    pthread_mutex_lock(&fp->f_lock);
    printf("%d f_count=%d\n", __LINE__, fp->f_count);
    if (--fp->f_count == 0)  // fp->f_count==1时条件满足
    {
        printf("%d f_count=%d\n", __LINE__, fp->f_count);
        // 当没有线程引用数据对象时,释放锁,以及数据对象的内存空间
        pthread_mutex_unlock(&fp->f_lock);
        pthread_mutex_destroy(&fp->f_lock);
        free(fp);
    }
    else
    {
        printf("%d f_count=%d\n", __LINE__, fp->f_count);
        pthread_mutex_unlock(&fp->f_lock);
    }
}

static void* start_rtn1(void* arg)
{
    foo_hold(pFoo);
    printf("thread1: f_count=%d\n", pFoo->f_count);
    foo_release(pFoo);
    return ((void*)1);
}

static void* start_rtn2(void* arg)
{
    foo_hold(pFoo);
    printf("thread2: f_count=%d\n", pFoo->f_count);
    foo_release(pFoo);

    pthread_exit((void*)2);
}
int main()
{
    int err;
    pthread_t tid1, tid2;
    void* tret;

    pFoo = foo_alloc(0);
    if (pFoo != NULL)
    {
        err = pthread_create(&tid1, NULL, start_rtn1, NULL);
        if (err != 0)
            printf("can't create thread1\n");

        err = pthread_create(&tid2, NULL, start_rtn2, NULL);
        if (err != 0)
            printf("can't create thread2\n");

        // 使用pthread_join来等待工作线程完成对数据对象的访问
        // 防止在工作线程完成数据访问之前是否数据对象内存空间
        err = pthread_join(tid1, &tret);
        if (err != 0)
            printf("can't join with thread1\n");
        printf("thread1 exit code %ld\n", (long)tret);

        err = pthread_join(tid2, &tret);
        if (err != 0)
            printf("can't join with thread2\n");
        printf("thread2 exit code %ld\n", (long)tret);
        foo_release(pFoo);
    }
}
// thread1: f_count=2
// 下面这个3出现的原因在于,线程2的foo_hold要比线程1的foo_release运行早,
// 而线程2的printf比线程1的foo_release运行迟
// 45 f_count=3
// 56 f_count=2
// thread2: f_count=3
// 45 f_count=2
// 56 f_count=1
// 45 f_count=1
// 48 f_count=0

死锁

线程对同一个互斥量加锁两次,就会陷入死锁状态。

两个线程分别以相反的顺序对两个互斥量进行加锁,可能会导致两个线程同时死锁。这种情况可以通过控制互斥量的加锁顺序来避免死锁的产生。

假设需要对两个互斥量A和B同时加锁,如果所有线程总是在对互斥量B锁住之前锁住互斥量A,那么使用这两个互斥量就不会产生死锁。

如果无法对互斥量进行排序,则可以使用pthread_mutex_trylock接口避免死锁。如果已经占有某些锁并且pthread_mutex_trylock接口返回成功,则继续前进,否则先释放已经占有的锁,然后过一段时间再尝试。

多互斥量实例

在该实例中,具有两个互斥量,分别是hashlockf_lockhashlock用于保护foo数据结构的散列列表和f_next字段,foo结构中的互斥量f_lock用于保护引用计数字段f_count

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define NHASH 29
#define HASH(id) (((unsigned long)id) % NHASH)

struct foo* fh[NHASH];                                 // 结构体数组,散列列表
pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER;  // 互斥量,保护fh
                                                       //

struct foo
{
    int f_count;  // 引用计数
    pthread_mutex_t f_lock;
    int f_id;
    struct foo* f_next;  // 散列值冲突时,用于追加数据。由hashlock保护
};

// 用于主线程动态分配数据对象
static struct foo* foo_alloc(int id)
{
    struct foo* fp;
    if ((fp = (struct foo*)malloc(sizeof(struct foo))) != NULL)
    {
        memset(fp, 0x00, sizeof(struct foo));
        fp->f_count = 1;
        fp->f_id = id;
        if (pthread_mutex_init(&fp->f_lock, NULL) != 0)
        {
            free(fp);
            return NULL;
        }

        int idx = HASH(id);
        pthread_mutex_lock(&hashlock);
        // 在散列表的idx索引位置插入新建的fp数据结构
        fp->f_next = fh[idx];  // 第一次插入时fh[idx]为NULL,之后fh[idx]为上次插入foo数据的地址
        fh[idx] = fp;          // 将idx索引位置保存为当前结点
        pthread_mutex_lock(&fp->f_lock);
        pthread_mutex_unlock(&hashlock);
        printf("%d f_id=%d\n", __LINE__, fp->f_id);
        pthread_mutex_unlock(&fp->f_lock);
    }
    return fp;
}
// 工作线程添加引用计数
static void foo_hold(struct foo* fp)
{
    pthread_mutex_lock(&fp->f_lock);
    fp->f_count++;
    pthread_mutex_unlock(&fp->f_lock);
}
// 查找已存在的数据对象
static struct foo* foo_find(int id)
{
    struct foo* fp;
    pthread_mutex_lock(&hashlock);
    // 先计算散列表的索引,再在索引链表中查找
    for (fp = fh[HASH(id)]; fp != NULL; fp = fp->f_next)
    {
        if (fp->f_id == id)
        {
            // 加锁顺序是先hashlock锁定散列表,后foo_hold中的f_lock锁定该结构
            foo_hold(fp);
            break;
        }
    }

    pthread_mutex_unlock(&hashlock);
    return fp;
}

static void foo_release(struct foo* fp)
{
    struct foo* tfp;
    int idx;

    pthread_mutex_lock(&fp->f_lock);
    if (fp->f_count == 1)  // 最后一个引用
    {
        // 从散列表中删除该结构
        // 需要先解锁,然后按顺序重新加锁,先hashlcok后f_lock
        // 顺序不对可能导致多线程死锁
        pthread_mutex_unlock(&fp->f_lock);
        pthread_mutex_lock(&hashlock);
        pthread_mutex_lock(&fp->f_lock);
        // 重新加锁可能会阻塞,导致条件发生变化
        // 如果阻塞时,该结构的引用计数被加1,则此时仅需减1即可
        if (fp->f_count != 1)
        {
            fp->f_count--;
            pthread_mutex_unlock(&fp->f_lock);
            pthread_mutex_unlock(&hashlock);
            return;
        }
        // 从散列表中删除该结构
        idx = HASH(fp->f_id);
        tfp = fh[idx];
        if (tfp == fp)
        {
            fh[idx] = fp->f_next;
        }
        else
        {
            while (tfp->f_next != fp)
                tfp = tfp->f_next;
            // 指向fp的指针指向fp的下一个结构
            tfp->f_next = fp->f_next;
        }
        pthread_mutex_unlock(&hashlock);  // 散列表操作结束,可以先释放

        pthread_mutex_unlock(&fp->f_lock);
        pthread_mutex_destroy(&fp->f_lock);
        free(fp);
    }
    else
    {
        fp->f_count--;
        pthread_mutex_unlock(&fp->f_lock);
    }
}

在这个例子中,如果两种用途使用相同的锁,那么散列列表和引用计数的锁之间的排序问题就不存在了。

如果锁的粒度太粗,就会出现很多线程阻塞等待相同的锁,并不能改善并发型。如果锁的粒度太细,那么过多的锁开销会使系统性能受到影响,而且代码复杂。

正常情况下应该在复杂性和性能之间找到平衡。

超时控制

当线程试图获取一个已加锁的互斥量时,pthread_mutex_timedlock接口可以指定线程阻塞时间功能与pthread_mutex_lock等价,但如果达到超时时间值,pthread_mutex_timedlock不会对互斥量进行加锁,而是直接返回错误码ETIMEDOUT

#include <pthread.h>
#include <time.h>

int pthread_mutex_timedlock(pthread_mutex_t *restrict mutex,
   const struct timespec *restrict tsptr);

tsptr指定了超时愿意等待的绝对时间。

下面实例用于验证pthread_mutex_timedlock功能:

#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <time.h>

int main()
{
    int err;
    struct timespec tout;
    struct tm* tmp;
    char buf[64];
    pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

    pthread_mutex_lock(&lock);
    printf("mutex is locked\n");

    // 获取timespec格式的时间
    clock_gettime(CLOCK_REALTIME, &tout);
    tmp = localtime(&tout.tv_sec);
    strftime(buf, sizeof(buf), "%r", tmp);
    printf("current time is %s\n", buf);

    // pthread_mutex_timedlock需要指定绝对时间,即在当前时间的基础上加上延迟的时间
    tout.tv_sec += 10;
    err = pthread_mutex_timedlock(&lock, &tout);

    // 重新获取当前时间并输出,判断延迟时间
    clock_gettime(CLOCK_REALTIME, &tout);
    tmp = localtime(&tout.tv_sec);
    strftime(buf, sizeof(buf), "%r", tmp);
    printf("the time is now %s\n", buf);

    if (err == 0)
        printf("mutex locked again!\n");
    else
        printf("can't lock mutex again: %s\n", strerror(err));
    return 0;
}
// mutex is locked
// current time is 12:13:03 PM
// the time is now 12:13:13 PM
// can't lock mutex again: Connection timed out

阻塞时间不准确的原因可能有开始时间在某秒的中间位置,系统时钟精度不足或者程序继续运行时的调度时间