Linux - posix 进程间通信

490 阅读20分钟

接下来则编写程序来创建一个posix消息队列:

这是为什么呢?其实在man帮助中有说明:

所以,修改一下Makefile文件,加上这个链接选项:

posix消息队列创建成功了,下面来查看一下:

这是为什么呢?原因是由于ipcs只能查看System v创建的消息队列,而poix创建的消息队列不能通过这个方式来查看,那到底存放在哪里呢?还是回到强大的man帮助手册来找寻答案:

所以接下来创建一个挂载点:

接下来将消息队列虚拟文件系统挂载到该目录下:

所以来看一下目前我们所创建的消息队列的状态:

如果要想再次看到,则需重新挂载一次既可。

在创建posix消息队列时,我们指定了一个消息队列名字,如下:

其中该名字是有一定的规则的,所以接下来说明一下

接下来要学习的一个函数比较简单,关闭消息队列:

【说明】:它的更准备的含义是删除一个连接数,直接连接数减为0的时候才真正将文件删除。

下面来编写程序来使用一下:

其中stuct mq_attr结构体为:

下面来编译程序来获取一下消息队列的属性:

接下来利用这个函数来向消息队列中发送消息:

另外也可以用我们自己编写的查看属性的程序来查看此时的属性:

现在创建的消息队列中已经发送了一个消息了,接下来将从消息队列中来获取已发送的消息:

这是为什么呢?

而最大长度大小可以通过mq_attr函数来获取,所以修改一个代码:

下面重新再来发送多个消息,再接收:

没有消息了,接收则会被阻塞,从运行效果中可以看出。

从函数字面意思来看是一个通知,当消息队列从没消息到有消息会得到通知,这个是System V跟Posix消息队列的一个非常明显的区别,就是SystemV消息队列是没法得到这个通知的,而Posix消息队列是可以的,也就是目前消息队列是空的,当某个进程或线程往消息队列发送一条消息时,这时消息队列则会给出通知,只要进程注册了该通知事件,而注册通知就可以通过mq_notify函数完成,具体如下:

其中用到的结构体:

可以查看一下man帮助:

这里主要关注信号的方式,因为线程目前还没有学到:

下面编写程序来演示一下通知的效果:

接下来注册一个消息队列的通知事件:

当收到通知时,这时来处理一下信号处理程序:

另外,为了看到通知效果,这里需要不让进程退出,所以死循环一下:

从中发现,当空的消息队列里面新发送了一条消息,则立马就收到通知了,那如果继续再发送呢?

这是为什么呢?其实这个通知是有一定规则的,如下:

要想让每次发送都收到通知,则上面的第三点则是解决方案,下面来修改一下程序:

从中可以发现,这次确实是每次发送消息都能被通知到了,那规则中提到,重新注册必须放到接收消息之前,不能放在之后:

这是为什么呢?这时因为当发送一个消息时,被接收了之后,消息队列里面就为空了,这时再次注册就无法接收到通知了,所以得放在接收消息之前再次注册。

最后来说明一下在上面提到过需要解释的:

正好可以利用这个通知程序来说明,如下:

下面编写程序来创建一个共享内存:

那posix的共享内存存放在哪里呢?上节中学的posix的消息队列是在虚拟文件当中创建一个消息队列,需要我们手工将它挂载到某个目录下才能看到,同样的,posix共享内存也是需要将其挂载,只不过这个挂载操作是由系统完成的,而不用我们人工去操作了,已经挂载到了/dev/shm下了,如下:

接下来要介绍的函数为修改共享内存的大小:

【说明】:实际上ftruncate函数也能修改文件的大小。

下面修改程序来使用一下它:

其中struct stat的结构体为:

修改程序如下:

现在我们已经创建了一个共享内存对象,那如何用它呢?则需要用到下面这个函数才行:

下面来使用一下,映射成功之后,先往内存中写入数据,然后再从内存中来读取:

接下来做一个容错处理:

实际上映射失败有专门的宏定义,从man帮助中可以得知:

所以,用它来代替-1程序会更加可读:

这是为什么呢?还是从man帮助中来寻找答案:

所以问题原因找到了,则修改一下打开方式既可:

有没有成功写入,则需要编写一个读取程序来验证一下:

实际上可以用过shell命令直接查看共享内存的内容:

【注意】:创建失败这时会返回错误码,而通常函数创建失败都会返回-1,然后错误码会保存在errno当中。

在处理线程创建失败检查时,下面来看一下检查错误的一些说明:

所以下面来处理一下线程创建失败的错误:

而且每个线程都有自己的一个errono,避免多线程时有冲突。

接下来做这样的一个操作,就是主线程打印A字符,然后新创建的线程打印B字符,

【注意】:新创建的线程不叫做子线程,因为并没有父子关系,但是可以把初始的线程叫主线程,如下:

这是由于主线程已经结束了,而新创建的线程还没有被调度到,所以就没有打印出B,所以解决此问题的办法可能让主线程小睡一会:

可见新创建的线程被调度到了,实际上主线程跟新创建的线程是交替运行的,下面修改下程序来说明下:

从中可以发现每次运行的结果都不一样,这个取决于系统是如何调度线程的。

另外有这样的一个问题,就是可能新创建的线程还没有执行完毕,主线程就已经执行完毕了,也就是主线程需要睡眠去等待新线程执行完,下面多次运行一下,看能否看到这种现象:

其中在主线程中睡眠是一种解决方案,但是比较笨,有没有一个函数能够等待新创建的线程结束呢?实际上是有的,就好像进程一样,有waitpid来等待子进程的退出:

确实是达到了等待新创建线程退出的目的,下面再来学习一个函数:

当然线程的退出也可以是执行完了再退出,如下:

其中线程结束包含两种情况:

①、自杀:调用pthread_exit();在线程入口函数中调用return。

②、他杀:调用pthread_cancel()。

如果在新创建的线程中调用此方法,如果主线程没有调用pthread_join的情况下,也能避免僵线程。

下面用线程的方式来改造一下之前用进程的方式实现的回射客户/服务器程序,来进一步熟悉线程的使用:

客户端echocli.c:

#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>

#define ERR_EXIT(m) \
        do \
        { \
                perror(m); \
                exit(EXIT_FAILURE); \
        } while(0)


void echo_cli(int sock)
{
    char sendbuf[1024] = {0};
        char recvbuf[1024] ={0};
        while (fgets(sendbuf, sizeof(sendbuf), stdin) != NULL)
        {
                write(sock, sendbuf, strlen(sendbuf));
                read(sock, recvbuf, sizeof(recvbuf));

                fputs(recvbuf, stdout);
                memset(sendbuf, 0, sizeof(sendbuf));
                memset(recvbuf, 0, sizeof(recvbuf));
        }

        close(sock);
}

int main(void)
{
    int sock;
    if ((sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
        ERR_EXIT("socket");

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(5188);
    servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");

    if (connect(sock, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0)
        ERR_EXIT("connect");

    echo_cli(sock);

    return 0;
}

服务端echosrv.c:

#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>

#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>

#define ERR_EXIT(m) \
        do \
        { \
                perror(m); \
                exit(EXIT_FAILURE); \
        } while(0)

void echo_srv(int conn)
{
    char recvbuf[1024];
        while (1)
        {
                memset(recvbuf, 0, sizeof(recvbuf));
                int ret = read(conn, recvbuf, sizeof(recvbuf));
        if (ret == 0)
        {
            printf("client close\n");
            break;
        }
        else if (ret == -1)
            ERR_EXIT("read");
                fputs(recvbuf, stdout);
                write(conn, recvbuf, ret);
        }
}int main(void)
{
    int listenfd;
    if ((listenfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
        ERR_EXIT("socket");

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(5188);
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);

    int on = 1;
    if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
        ERR_EXIT("setsockopt");

    if (bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0)
        ERR_EXIT("bind");
    if (listen(listenfd, SOMAXCONN) < 0)
        ERR_EXIT("listen");

    struct sockaddr_in peeraddr;
    socklen_t peerlen = sizeof(peeraddr);
    int conn;

    while (1)
    {
        if ((conn = accept(listenfd, (struct sockaddr*)&peeraddr, &peerlen)) < 0)
            ERR_EXIT("accept");

        printf("ip=%s port=%d\n", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));

        pid = fork();
        if (pid == -1)
            ERR_EXIT("fork");

        if (pid == 0)
        {//子进程
            close(listenfd);//不需要处理监听
            echo_srv(conn);
            exit(EXIT_SUCCESS);
        } else {
            close(conn);//父进程不需要处理连接    
        }
    }
    
    return 0;
}

接下来要来进行服务端改造:

【注意】:由于是单进程,所以就没必要像创建进程的方式要关闭conn了,编程也简单了许多。

从中可以发现程序正常运转,而且当客户端退出时,相应的线程也退出了,这是为什么呢?

另外当线程退出了之后,其实该线程是属于一个僵线程的状态,因为在主线程中并没有调用pthread_join()来等待新创建线程的退出,所以得避免僵线程的出现,修改代码如下:

关于这个程序还有一个细节需要探讨一下,如下:

那我可以这样写么?

此时线程入口函数也得发生改变:

看似一切正常,当然肯定是有问题的,不然也不会换一种写法来进行说明了,有什么潜在风险呢?

这就是典型的Race Condition(也叫做资源竞争)问题。所以说conn只能值传递,而不能是传递指针,还是将代码还原,下面来讨论另外一个细节问题:

那如何解决呢?可以采用动态申请内存的方式:

进程跟线程之间的对比图

pthread_att_t属性变量是需要进行初始化才能够用的,一定初始化了属性变量,它就包含了线程的多种属性的值,那到底有哪些属性了,下面一一来介绍:

其中第二个参数的指定值可以通过man帮助来了解到:

下面用程序来实验一下:

【注意】:在设置栈大小时,一般第二个参数设置为0表示用系统定义的栈的大小,如果指定我们自己设定的栈的大小可能会导致一些移植性的问题,所以一般情况下栈的大小不会去设置。

首先需要了解线程调度竞争范围:

那默认线程是什么竞争范围呢?用程序来查看:

这意味着新创建的线程跟调者用线程是否是一样的调度策略,如果设置成继承的则拥有一样的调度策略:

其中调到了线程模型,这里介绍一下,其实线程模型有三种:

其中需要说明一下,int pthread_setconcurrency(int new_level)设置并发级别,并不意味着线程的并发数是new_level,仅仅只是设置了一个并发级别,并且只是给内核一个提示而已,并非真正的提供new_level个核心线程来映射用户线程:

上面的这些概念还是有些生涩,下面来用一个实例程序来进一步理解,在写程序之前,需要用到特定数据的一些函数:

找一个空位来创建特定数据:

删除特定数据:

给特定数据设定值及获取特定数据里面的值:

然后再创建两个线程出来,来使用特定数据:

接下来编写线程处理函数:

从结果来看:

下面再来介绍一对函数,如下:

它代表init_routine函数只在第一个线程进入的时候被执行一次,下面来修改一下程序:

那如果希望只有第一个线程进来时创建,而其它线程进来不再创建,那这个函数就派上用场了,修改程序如下:

跟posix消息队列,共享内存的打开,关闭,删除操作一样,不过,上面的函数是对有名信号量进行操作,通过man帮助可以得知:

有名信号量相对的那就是无名信号量,对于它相关的函数如下:

同样可以查看man帮助:

【思考】:是不是无名信号量就无法用于不同进程间的多个线程间进行通信呢?实际上不是这样的:

而对于信号量的P、V操作,可以用以下两个函数,既能用于有名,也能用于无名信号量:

初始化互斥锁:

锁定操作:

解锁操作:

锁屏互斥锁:

【说明】:以上四个函数也是应用于无名的,也可以用于不同进程的不同线程间进行通信。

接下来就用信号量与互斥锁来解决生产者消费者的问题:

下面利用posix信号量与互斥锁来模拟生产者消费者问题:

由于生产者与消费者可以有多个,所以这两个的个数可以定义成一个宏,便于随意更改:

接下来要定义一些信号量和互斥锁变量:

以上是一些全局数据的初始化,接下来则开始真正代码的编写,首先得初始化信号量和互斥锁:

接下来创建若干个线程:

接下来来编写生产者与消费者的入口函数的实现:

先来实现生产产品的代码:

在正式生产之前,先打印出仓库当前的状态,也就是缓冲区里:

同样的,在消费之前,也打印一下当前仓库消费的状态:

打印状态之后,则开始生产产品:

同样的消费者也类似:

下面则通过调整生产者与消费者的个数,再配合睡眠来查看一下运行结果:

情况一:生产产品比较快,消费产品比较慢,所以经常有产品满的情况,也就是生产者会出现等待。

从结果中来以发现:

情况二:生产产品比较慢,但是消费得比较快,所以经常出现产品为空的情况,也就是消费者会不断出现等待。

从中可以发现:

也就是说如果对某个临界区施加了共享锁,意味着还可以对其施加共享锁;而如果对临界区施加了共享锁或排它锁,则不允许其它线程对它施加排它锁。

#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
#include <semaphore.h>

#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>

#define ERR_EXIT(m) \
        do \
        { \
                perror(m); \
                exit(EXIT_FAILURE); \
        } while(0)

#define CONSUMERS_COUNT 1
#define PRODUCERS_COUNT 1
#define BUFFSIZE 10

int g_buffer[BUFFSIZE];

unsigned short in = 0;
unsigned short out = 0;
unsigned short produce_id = 0;
unsigned short consume_id = 0;

sem_t g_sem_full;
sem_t g_sem_empty;
pthread_mutex_t g_mutex;

pthread_t g_thread[CONSUMERS_COUNT+PRODUCERS_COUNT];

void* consume(void *arg)
{
    int num = (int)arg;
    int i;
    while (1)
    {
        printf("%d wait buffer not empty\n", num);
        sem_wait(&g_sem_empty);
        pthread_mutex_lock(&g_mutex);
        //消费产品
        for (i=0; i<BUFFSIZE; i++)
        {
            printf("%02d ", i);
            if (g_buffer[i] == -1)
                printf("%s", "null");
            else
                printf("%d", g_buffer[i]);

            if (i == out)
                printf("\t<--consume");

            printf("\n");
        }

        consume_id = g_buffer[out];
        printf("%d begin consume product %d\n", num, consume_id);
        g_buffer[out] = -1;
        out = (out + 1) % BUFFSIZE;
        printf("%d end consume product %d\n", num, consume_id);

        pthread_mutex_unlock(&g_mutex);
        sem_post(&g_sem_full);

        sleep(1);
    }
    return NULL;
}

void* produce(void *arg)
{
    int num = (int)arg;
    int i;
    while (1)
    {
        printf("%d wait buffer not full\n", num);
        sem_wait(&g_sem_full);
        pthread_mutex_lock(&g_mutex);
        //生产产品的代码
        for (i=0; i<BUFFSIZE; i++)
        {
            printf("%02d ", i);
            if (g_buffer[i] == -1)
                printf("%s", "null");
            else
                printf("%d", g_buffer[i]);

            if (i == in)
                printf("\t<--produce");

            printf("\n");
        }
        
        printf("%d begin produce product %d\n", num, produce_id);
        g_buffer[in] = produce_id;
        in = (in + 1) % BUFFSIZE;
        printf("%d end produce product %d\n", num, produce_id++);
        pthread_mutex_unlock(&g_mutex);
        sem_post(&g_sem_empty);

        sleep(5);
    }
    return NULL;
}

int main(void)
{
    int i;
    for (i=0; i<BUFFSIZE; i++)
        g_buffer[i] = -1;

    sem_init(&g_sem_full, 0, BUFFSIZE);
    sem_init(&g_sem_empty, 0, 0);

    pthread_mutex_init(&g_mutex, NULL);

    for (i=0; i<CONSUMERS_COUNT; i++)
        pthread_create(&g_thread[i], NULL, consume, (void*)i);

    for (i=0; i<PRODUCERS_COUNT; i++)
        pthread_create(&g_thread[CONSUMERS_COUNT+i], NULL, produce, (void*)i);
    
    for (i=0; i<CONSUMERS_COUNT+PRODUCERS_COUNT; i++)
        pthread_join(g_thread[i], NULL);

    sem_destroy(&g_sem_full);
    sem_destroy(&g_sem_empty);
    pthread_mutex_destroy(&g_mutex);

    return 0;
}

下面用一个图来进一步描述条件变量的作用:

为什么呢?

这实际上可以解决生产者与消费者问题,而且对于缓冲区是无界的是一种比较理解的解决方案,只有有产品时才通知消费者开始消费产品,生产者不关心缓存区是否满,后面会用条件变量与互斥锁来解决生产者与消费者问题。

下面则根据上面的使用规范来解决生产者与消费者问题:

【说明】:这里并没有用到缓冲区,而是只要发现条件不满足则等待,直接条件满足才消费,所以实现了一个无界的缓冲区,另外nready来简单模拟产品。

另外为了首次让消费者进行等待,在创建消费者线程之后小睡一会:

下面也来分几种情况来查看消费者与生产者之间的关系:

①、消费得比较快,生产得比较慢

②、生产速度比较快,消费得比较慢:

为啥之后没有等待线程呢?这是由于消费者的个数不如生产者线程的个数,消费速度不够快,结合代码来解释:

所以,当消费者比生产者少时,等待的机率就会少很多。

以上就是利用条件变量与互斥锁来解决生产者与消费者问题,下面来理解一些细节,也是理解代码很关键的地方:

它主要是做了下面三件事:

1、对g_mutex进行解锁。

为什么要先进行解锁呢?

2、等待条件,直到有线程向它发起通知。

3、重新对g_mutex进行加锁操作。

这三者构成了一个pthread_cond_wait原语,条件变量的使用最难的地方就是这个函数隐藏动作的理解,需细细体会下。

在上面留了一个问题,就是:

可以从man帮助中寻找到答案:

上次中已经用互斥锁与条件变量来改造了生产者与消费者问题,这次利用它来实现一个线程池,加强对条件变量及互斥锁的认识,下面开始:

关于什么是线程池,这里就不多说了,应该基本都在实际中用到过,下面关于线程池实现有几个点需要说明一下:

线程池,顾名思义就是拥有若干个线程,对于线程的个数是有严格的要求的,并非越多越好,太多了会增加系统的开销,太少了又会降低并发量。

执行时间较长的任务是不太适合放在线程池中进行处理,比如说:线程的执行时间跟进程的生命周期是一致的,那么这个任务的执行就没必要放到线程池中进行,直接用普通的线程既可。

那线程池当中的线程个数究竟存放多少个比较合适呢?实际上这跟任务类型有关系:

①、计算密集型任务:一般这个任务是占用CPU的,它很少被外界的事件打断,这时线程个数 = CPU个数,如果线程个数>CPU个数,由于CPU的个数是一定的,那么能够并发的数目也是一定的,所以会用少量的CPU个数来调度多个线程,这肯定会涉及到线程与线程之间的切换开销,因而会降低效率。

②、I/O密集型任务:这种任务在运行时,可能会被I/O中断,也就是说这个线程会挂起,这时线程个数 > CPU个数,

那接下来先了解一下线程池实现中,需要用到的结构体:

下面则开始实现,首先在头文件中定义上面的数据结构:

其中用到了条件变量,这里对条件变量进行了简单的封装,所以先来看下是如何封装的:

condition.c:

#include "condition.h"

//初使化条件变量,可想而知是对互斥锁和条件变量进行初始化
int condition_init(condition_t *cond)
{
    int status;
    if ((status = pthread_mutex_init(&cond->pmutex, NULL)))
        return status;

    if ((status = pthread_cond_init(&cond->pcond, NULL)))
        return status;

    return 0;
}

//对互斥锁进行锁定
int condition_lock(condition_t *cond)
{
    return pthread_mutex_lock(&cond->pmutex);
}

//对互斥锁进行解锁
int condition_unlock(condition_t *cond)
{
    return pthread_mutex_unlock(&cond->pmutex);
}

//在条件变量上等待条件
int condition_wait(condition_t *cond)
{
    return pthread_cond_wait(&cond->pcond, &cond->pmutex);
}

//具有超时功能的等待功能
int condition_timedwait(condition_t *cond, const struct timespec *abstime)
{
    return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime);
}

//向等待线程发起一个通知
int condition_signal(condition_t *cond)
{
    return pthread_cond_signal(&cond->pcond);
}

//向等待线程发起广播
int condition_broadcast(condition_t* cond)
{
    return pthread_cond_broadcast(&cond->pcond);
}

//销毁条件变量
int condition_destroy(condition_t* cond)
{
    int status;
    if ((status = pthread_mutex_destroy(&cond->pmutex)))
        return status;

    if ((status = pthread_cond_destroy(&cond->pcond)))
        return status;

    return 0;
}

接着来实现一下线程池:

在正式实现这些函数之前,其实可以先从使用者的角度来看,如何使用这些线程池,如下:

其实这是典型的“测试驱动开发”,先编写好测试代码,然后再来从使用的角度去具体实现,下面则开始具体实现线程池相应的方法:

接下来实现往线程池中添加任务:

其添加过程是从尾部进行添加的,其实就是单链表的应用。

这里需要注意一个问题,就是在使用条件变量之前是需要对进行互斥的,因为队列资源是生产者与消费者都可以访问的,所以需要互斥:

接下来来处理线程的执行入口函数,线程应该是等待任务并且处理任务,也就是它是一个消费者线程:

下面来编译运行一下,在运行之后,需要在main函数中做一下sleep:

而且是经过15秒之后,则进程退出了,但是有个问题,就是当任务执行完了,应该线程也能动态减少,目前当任务执行完了之后,所有线程都还在,也就是需要看到这样的输出:

但是目前看不到这样的状态,而是等到进程退出来线程才销毁,所以需要对代码进行改进,这时就需要用到等待超时的一个函数:

也就是如果线程等待超时了,则代表没有任务了,则该线程就可以销毁了,所以将condition_wait需要换成condition_timedwait函数:

查看一相man帮助:

【说明】:获取当前时间可以用函数clock_gettime:

下面再来做超时处理:

接下来就剩最后一个销毁方法没有实现了,而main中的sleep则可以去掉了:

其中看到有等待条件变量,那是谁来通知该条件变量呢,当然是在任务执行时,于时需要修改任务执行线程里面的代码:

下面再来编译运行一下,在运行之前,可以将之前的休眠代码去掉了:

可见当线程任务都执行完了,所有的线程也销毁了