标签搜索

目 录CONTENT

文章目录

Python中的进程间通信.md

小小城
2021-08-22 / 0 评论 / 0 点赞 / 4 阅读 / 12,293 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-05-02,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

Python中的进程间通信

@[toc]

1 进程间通信

1.1 概念

进程是操作系统分配和调度系统资源(CPU、内存)的基本单位。进程之间是相互独立的,每启动一个新的进程相当于把数据进行了一次克隆,子进程里的数据修改无法影响到主进程中的数据,不同子进程之间的数据也不能直接共享,这是多进程在使用中与多线程最明显的区别。

1.2进程间通信方法

(1)信号量( semaphore ) : 信号量是一个共享资源访问者的计数器,可以用来控制多个进程对共享资源的并发访问数。它常作为一种锁机制,防止指定数量的进程正在访问共享资源时,其他进程也访问该资源。因此,主要作为进程间以及同一进程内不同线程之间的同步手段,用于控制某共享资源的并发访问者数量。

(2)信号 ( signal ) : 信号是一种比较复杂的通信方式,用于通知接收进程某个事件已经发生。

(3)管道( pipe ):管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有亲缘关系的进程间使用。进程的亲缘关系通常是指父子进程关系。

(4)有名管道 (named pipe) : 有名管道也是半双工的通信方式,但是它允许无亲缘关系进程间的通信。

(5)消息队列( message queue ) : 消息队列是由消息的链表,存放在内核中并由消息队列标识符标识。消息队列克服了信号传递信息少、管道只能承载无格式字节流以及缓冲区大小受限等缺点。

(6)共享内存( shared memory ) :共享内存就是映射一段能被其他进程所访问的内存,这段共享内存由一个进程创建,但多个进程都可以访问。共享内存是最快的 IPC 方式,它是针对其他进程间通信方式运行效率低而专门设计的。它往往与其他通信机制,如信号量,配合使用,来实现进程间的同步和通信。

(7)套接字( socket ) : socket也是一种进程间通信机制,与其他通信机制不同的是,它主要用于不同机器间的进程通信,同一机器内的进程通信采用此方式是有些浪费的。

(8) 文件:使用文件进行通信是最简单的一种通信方式,一个进程将结果输出到临时文件,另一个进程从文件中读出来。

各种进程间通信

1. 基于信号量(Semaphore)的IPC

from threading import Semaphore
db_semaphore = Semaphore(2) # 创建信号量
database = []
def insert(data):
        '''
        如果insert(data)是一个子进程任务,
        需要在创建子进程时将信号量db_semaphore作为参数传入子进程任务函数;
        '''
        db_semaphore.acquire() # 尝试获取信号量
        database.append(data)  # 如果信号量获取成功就处理
        db_semaphore.release() # 释放信号量

2 基于信号(Signal)的IPC

Python标准库signal模块提供了在 Python 程序中使用信号处理程序的机制。信号处理程序总是在 Python 主线程中执行,即使信号是在另一个线程中接收的。所以信号不能用作线程间通信的手段,如果需要线程间通信可以使用 threading 模块中的同步函数。此外,只允许主线程设置新的信号处理程序

信号通信的应用:
(1)故障定位技术(进程的底层故障,例如进程突然中断和一些可能性较小的故障);
(2)对进程的流程控制 ;

signal常用的几个函数

(1)os.kill(pid,sig)

用于从一个进程中发送一个信号给某个进程。
参数解析:

pid 指定发送信号的进程号
sig 要发送的信号代号(需要通过signal模块获取)
(2)signal.alarm(sec)

设置时钟信号,在一定时间后给自身发送一个SIGALRM信号。非阻塞函数,sec为定时长度。
原理:
时钟的创建是进程交由操作系统内核(kernal)帮助创建的,时钟和进程之间是异步执行的,当时钟到时,内核会发送信号给进程,进程接收信号进行相应的响应操作。这就是所谓的python异步处理方案。后面的时钟会覆盖前面的时钟,一个进程。只有一个挂起的时钟

import signal, os

def handler(signum, frame):
    '''
    信号处理程序
    '''
    print('Signal handler called with signal', signum)
    raise OSError("Couldn't open device!")

# 设置信号处理器
signal.signal(signal.SIGALRM, handler)
# 设置5s的定时,时间到后给自身发送一个SIGALRM信号
signal.alarm(5)

# open()可能无限等待,或者打开资源的时间过长
fd = os.open('/dev/ttyS0', os.O_RDWR)
# 关闭定时
signal.alarm(0)

3.基于管道(Pipe)的IPC

只有父进程与子进程之前可以用管道传递数据。通过os.read()和os.write()来对文件描述符进行读写操作,使用os.close()关闭描述符。

import os
import sys
import math


def slice(mink, maxk):
    s = 0.0
    for k in range(mink, maxk):
        s += 1.0/(2*k+1)/(2*k+1)
    return s


def pi(n):
    childs = {}
    unit = n / 10
    for i in range(10):  # 分10个子进程
        mink = unit * i
        maxk = mink + unit
        r, w = os.pipe()
        pid = os.fork()
        if pid > 0:
            childs[pid] = r  # 将子进程的pid和读描述符存起来
            os.close(w)  # 父进程关闭写描述符,只读
        else:
            os.close(r)  # 子进程关闭读描述符,只写
            s = slice(mink, maxk)  # 子进程开始计算
            os.write(w, str(s))
            os.close(w)  # 写完了,关闭写描述符
            sys.exit(0)  # 子进程结束
    sums = []
    for pid, r in childs.items():
        sums.append(float(os.read(r, 1024)))
        os.close(r)  # 读完了,关闭读描述符
        os.waitpid(pid, 0)  # 等待子进程结束
    return math.sqrt(sum(sums) * 8)


print(pi(10000000))

4. 基于有名管道(fifo)的IPC

相对于管道只能用于父子进程之间通信,Unix还提供了有名管道可以让任意进程进行通信。有名管道又称fifo,它会将自己注册到文件系统里一个文件,参数通信的进程通过读写这个文件进行通信。

fifo要求读写双方必须同时打开才可以继续进行读写操作,否则打开操作会堵塞直到对方也打开

import os
import sys
import math


def slice(mink, maxk):
    s = 0.0
    for k in range(mink, maxk):
        s += 1.0/(2*k+1)/(2*k+1)
    return s


def pi(n):
    childs = []
    unit = n / 10
    fifo_path = "/tmp/fifo_pi"
    os.mkfifo(fifo_path)  # 创建named pipe
    for i in range(10):  # 分10个子进程
        mink = unit * i
        maxk = mink + unit
        pid = os.fork()
        if pid > 0:
            childs.append(pid)
        else:
            s = slice(mink, maxk)  # 子进程开始计算
            with open(fifo_path, "w") as ff:
                ff.write(str(s) + "\n")
            sys.exit(0)  # 子进程结束
    sums = []
    while True:
        with open(fifo_path, "r") as ff:
            # 子进程关闭写端,读进程会收到eof
            # 所以必须循环打开,多次读取
            # 读够数量了就可以结束循环了
            sums.extend([float(x) for x in ff.read(1024).strip().split("\n")])
            if len(sums) == len(childs):
                break
    for pid in childs:
        os.waitpid(pid, 0)  # 等待子进程结束
    os.unlink(fifo_path)  # 移除named pipe
    return math.sqrt(sum(sums) * 8)


print(pi(10000000))

5.基于消息队列(Queue)的IPC

操作系统提供了跨进程的消息队列对象可以让我们直接使用,但是python没有默认提供包装好的api来直接使用。我们必须使用第三方扩展来完成OS消息队列通信。第三方扩展是通过使用Python包装的C实现来完成的。

操作系统提供的消息队列有两种形式,一种是POSIX消息队列,另一种是System V 消息队列,有些操作系统两者都支持,有些只支持其中的一个。

System V 与 POSIX的区别:
(1)System V 存在时间比较老,包括linux等许多系统都支持,但是接口复杂,并且可能各平台上实现略有区别(如ftok的实现及限制)。
(2)POSIX是新标准,现在多数类UNIX系统也已实现,如果只是开发的话,那么还是POSIX好,因为语法简单,并且各平台上实现都一样。

文档

注意: 使用posix_ipc与sysv_ipc需要谨慎,如果你的代码需要跨平台,比如在Windows和linux系统上使用,需要:Windows + Cygwin 1.7,Linux with kernel ≥ 2.6

6.基于共享内存的IPC

共享内存也是非常高效的多进程通信方式,操作系统负责将同一份物理地址的内存映射到多个进程的不同的虚拟地址空间中。进而每个进程都可以操作这份内存。考虑到物理内存的唯一性,它属于临界区资源,需要在进程访问时搞好并发控制,比如使用信号量。我们通过一个信号量来控制所有子进程的顺序读写共享内存。

python标准库中实现共享内存通信的工具有mmap,但是该库只能用于基本类型,且需要预先分配存储空间,对于自定义类型的对象使用起来有诸多不便。

比较好用的第三方工具有apache开源的pyarrow,可通过pip install pyarrow直接安装,不需要预先定义存储空间且任意可序列化的对象均可存入共享内存。但使用时需要注意:pyarrow反序列化的对象为只读对象不可修改其值,想要修改对象可先通过对象copy。

7.基于套接字(Socket)的IPC

import os
import sys
import math
import socket


def slice(mink, maxk):
    s = 0.0
    for k in range(mink, maxk):
        s += 1.0/(2*k+1)/(2*k+1)
    return s


def pi(n):
    childs = []
    unit = n / 10
    servsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # 注意这里的AF_INET表示普通套接字
    servsock.bind(("localhost", 0))  # 0表示随机端口
    server_address = servsock.getsockname()  # 拿到随机出来的地址,给后面的子进程使用
    servsock.listen(10)  # 监听子进程连接请求
    for i in range(10):  # 分10个子进程
        mink = unit * i
        maxk = mink + unit
        pid = os.fork()
        if pid > 0:
            childs.append(pid)
        else:
            servsock.close()  # 子进程要关闭servsock引用
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.connect(server_address)  # 连接父进程套接字
            s = slice(mink, maxk)  # 子进程开始计算
            sock.sendall(str(s))
            sock.close()  # 关闭连接
            sys.exit(0)  # 子进程结束
    sums = []
    for pid in childs:
        conn, _ = servsock.accept()  # 接收子进程连接
        sums.append(float(conn.recv(1024)))
        conn.close()  # 关闭连接
    for pid in childs:
        os.waitpid(pid, 0)  # 等待子进程结束
    servsock.close()  # 关闭套接字
    return math.sqrt(sum(sums) * 8)


print(pi(10000000))

8.基于临时文件(File)的IPC

文件名可以使用子进程的进程id来命名予以区分,进程随时都可以通过os.getpid()来获取自己的进程id。

import os
import sys
import math


def slice(mink, maxk):
    s = 0.0
    for k in range(mink, maxk):
        s += 1.0/(2*k+1)/(2*k+1)
    return s


def pi(n):
    pids = []
    unit = n / 10
    for i in range(10):  # 分10个子进程
        mink = unit * i
        maxk = mink + unit
        pid = os.fork()
        if pid > 0:
            pids.append(pid)
        else:
            s = slice(mink, maxk)  # 子进程开始计算
            with open("%d" % os.getpid(), "w") as f:
                f.write(str(s))
            sys.exit(0)  # 子进程结束
    sums = []
    for pid in pids:
        os.waitpid(pid, 0)  # 等待子进程结束
        with open("%d" % pid, "r") as f:
            sums.append(float(f.read()))
        os.remove("%d" % pid)  # 删除通信的文件
    return math.sqrt(sum(sums) * 8)


print(pi(10000000))

9.互斥锁

进程之间的数据是不共享的,但是共享同一套文件系统,所以访问同一个文件,或者同一个打印终端,是没有问题的,而共享带来的就是竞争,竞争带来的就是错乱,如下:

# 并发执行,效率高,但竞争着同一个打印终端,所以会带来错乱
from multiprocessing import Process
import time,os

def task():
    print('%s is running'%os.getpid())
    time.sleep(2)
    print('%s is done.'%os.getpid())

if __name__ == '__main__':
    for i in range(5):
        p = Process(target=task)
        p.start()

# 运行结果为:
13580 is running
7912 is running
6176 is running
13668 is running
12172 is running
7912 is done.
13580 is done.
6176 is done.
13668 is done.
12172 is done.

如何控制,就是加锁处理。而互斥锁的意思就是相互排斥,如果把多个进程比喻成多个人,那么互斥锁的工作原理就是多个人都要去争抢同一个资源:洗手间,一个人抢到了洗手间的锁,其余的人就要都等着,等到这个任务执行完成后释放锁,其他人中的一个人才有可能抢到这把锁......所以互斥锁的原理就是:同一时刻只能有一个进程可以成功获取到锁,相当于把并行改为串行,降低了效率,但是保证了数据的安全不错乱

from multiprocessing import Process,Lock
import time,os

def task(Lock):
    Lock.acquire()  # 加锁
    print('%s is running'%os.getpid())
    time.sleep(2)
    print('%s is done.'%os.getpid())
    Lock.release()  # 释放锁

if __name__ == '__main__':
    lock = Lock()  # 先实例化一个对象
    for i in range(5):
        p = Process(target=task,args=(lock,))
        p.start()
# 运行结果为
13868 is running  # 一个子进程开始了
13868 is done.  # 这个子进程死掉了
9576 is running
9576 is done.
13956 is running
13956 is done.
11696 is running
11696 is done.
11632 is running
11632 is done.

2.模拟抢票

我们在12306上抢票过程中,明明看到了仅剩下1张票,但是现在呢有10个人在开始抢票,让我们来模拟一下:

from multiprocessing import Process
import json
import time

# 查询
def search(name):
    time.sleep(1)
    dic=json.load(open('db.txt','r',encoding='utf-8'))  # 在当前目录下db.txt内容:
    print('<%s> 查看到剩余票数【%s】' %(name,dic['count']))

# 购票
def get(name):
    time.sleep(1)
    dic=json.load(open('db.txt','r',encoding='utf-8'))
    if dic['count'] > 0:
        dic['count']-= 1
        time.sleep(3)
        json.dump(dic,open('db.txt','w',encoding='utf-8'))
        print('<%s> 购票成功' %name)


def task(name):
    search(name)
    get(name)

if __name__ == '__main__':
    for i in range(10):
        p=Process(target=task,args=('路人%s' %i,))
        p.start()
# 运行结果
<路人2> 查看到剩余票数【1】
<路人0> 查看到剩余票数【1】
<路人3> 查看到剩余票数【1】
<路人4> 查看到剩余票数【1】
<路人1> 查看到剩余票数【1】
<路人8> 查看到剩余票数【1】
<路人5> 查看到剩余票数【1】
<路人9> 查看到剩余票数【1】
<路人6> 查看到剩余票数【1】
<路人7> 查看到剩余票数【1】
<路人2> 购票成功
<路人0> 购票成功
<路人3> 购票成功
<路人4> 购票成功
<路人1> 购票成功
<路人8> 购票成功
<路人5> 购票成功
<路人9> 购票成功
<路人6> 购票成功
<路人7> 购票成功

这样看运行结果的话,肯定是不合理的,票只有一张,怎么会让10个人都购票成功呢?所以这里需要使用互斥锁,互斥锁就是相互排斥,它工作的原理就是把并发变成串行,虽然程序运行效率低了,但是对数据安全和不错乱得到了明显的提升。

from multiprocessing import Process,Lock
import json
import time

def search(name):
    time.sleep(1)
    dic=json.load(open('db.txt','r',encoding='utf-8'))
    print('<%s> 查看到剩余票数【%s】' %(name,dic['count']))


def get(name):
    time.sleep(1)
    dic=json.load(open('db.txt','r',encoding='utf-8'))
    if dic['count'] > 0:
        dic['count']-= 1
        time.sleep(3)
        json.dump(dic,open('db.txt','w',encoding='utf-8'))
        print('<%s> 购票成功' %name)


def task(name,lock):
    '''
    因为在查询的时候看到的余票是一样的,所以需要在购票环节需要添加互斥锁
    :param name:
    :param lock:
    :return:
    '''
    search(name)
    lock.acquire()  # 加锁
    get(name)
    lock.release()  # 解锁

if __name__ == '__main__':
    # 首先要生成互斥锁对象
    lock = Lock()
    for i in range(10):
        p=Process(target=task,args=('路人%s' %i,lock))
        p.start()

在上面的代码中,10位用户都看到了剩余的票数,但是只有路人3抢到了票,这就相当于10个人去抢洗手间,但是只有3号用户得到了洗手间的钥匙进去了,并且把洗手间给反锁了。

互斥锁和join的区别

1.join()方法
join方法的作用是让主进程等待子进程运行成功之后在再去运行,join()是把并行改成了串行,确实能够保证数据的安全以及不错乱,但是查票的过程中谁先查到票,那么这张票就是谁的。join()就是把所有的任务都变成了串行,如task函数里的search和get。可以比喻成在一段代码中在最上面添加了try,在最下面添加了except。

2.互斥锁
互斥锁就是相互排斥,在购票环节设置互斥锁,也是将并行改成了串行,但是互斥锁是将程序中的一个任务的某一段代码设置成串行,就比如task函数里的get任务,然后互斥锁却十分符合我们的需求。

3.互斥锁优点
加锁可以保证多个进程在修改同一块数据的时,同一个时间只能有一个任务可以进行修改,即串行的修改,虽然效率下来了,但是却可以保证数据的安全性。

4.互斥锁缺点

  •  效率低(共享数据基于文件,而文件是硬盘上的数据)
  •  需要自己加锁处理

3.生产者消费者模型

1.为什么要使用生产者消费者模型

生产者指的是生产数据的任务,消费者指的是处理数据的任务,在并发编程中,如果生产者生产的很快,而消费者处理的速度却很慢,那么生产者就必须等待消费者处理完后才能继续生产数据。同样的道理,如果消费者的处理速度大于生产者,那么消费者就需要等待生产者。为了解决这个问题于是引入了生产者和消费者模型。

2.什么是生产者和消费者模型

生产者消费者模式是通过一个容器来解决生产者和消费者的耦合问题的。生产者和消费者之间不直接通信,而通过阻塞队列来进行通信,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列中去取,阻塞队列就相当于是一个缓冲区,平衡了生产者和消费者的处理能力。

这个阻塞队列就是用来给生产者和消费者解耦的。

3.生产者和消费者模型实现

import time
from multiprocessing import Process,Queue

def producer(q):  # 生产者
    for i in range(3):  # 3个人生产包子
        res = '包子%s'%i
        time.sleep(1)
        print('生产者生产了%s'%res)

        q.put(res)  # 生产完把包子丢到消息队列里面去

def consumer(q):  # 消费者
    while True:
        res = q.get()  # 从消息队列中取数据赋值给res
        time.sleep(2)
        print('消费者吃了%s'%res)

if __name__ == '__main__':
    q = Queue()  # 如果不写大小,那么默认是无限制的

    # 生产者们
    p1 = Process(target=producer,args=(q,))
    p2 = Process(target=producer,args=(q,))
    p3 = Process(target=producer,args=(q,))

    # 消费者们
    c1 = Process(target=consumer,args=(q,))
    c2 = Process(target=consumer,args=(q,))

    p1.start()
    p2.start()
    p3.start()

    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()

    print('主进程')

讲解一下这段代码:

1.生产者来生产包子,消费者来吃包子。

2.生产者每1秒生产3个包子,消费者每2秒吃一个包子,备注为:每个生产者/每个消费者

3.创建消息队列Queue,无限制大小

4.p1/p2/p3.join()方法保证了子进程先运行完之后再运行主进程

5.生产者把包子丢到消息队列里面之后就不用管了

接下来,这段程序的运行结果为:

# 运行结果为
生产者生产了包子0
生产者生产了包子0
生产者生产了包子0
生产者生产了包子1
生产者生产了包子1
生产者生产了包子1
生产者生产了包子2
消费者吃了包子0
生产者生产了包子2
消费者吃了包子0
生产者生产了包子2
主进程
消费者吃了包子0
消费者吃了包子1
消费者吃了包子1
消费者吃了包子1
消费者吃了包子2
消费者吃了包子2
消费者吃了包子2
.......

但是程序在运行之后,会一直处于阻塞状态,因为消费者还在不停的取数据,但是生产者已经把数据生产完了

如何去解决上诉的问题呢?能不能思考,如果消费者去取数据的时候取了一个None,那么就停止:

import time
from multiprocessing import Process,Queue

def producer(q):  # 生产者
    for i in range(3):  # 3个人生产包子
        res = '包子%s'%i
        time.sleep(1)
        print('生产者生产了%s'%res)

        q.put(res)  # 生产完把包子丢到消息队列里面去

def consumer(q):  # 消费者
    while True:
        res = q.get()  # 从消息队列中取数据赋值给res
        if res == None:break  # 如果取的数据是空,那么就结束
        time.sleep(2)
        print('消费者吃了%s'%res)

if __name__ == '__main__':
    q = Queue()  # 如果不写大小,那么默认是无限制的

    # 生产者们
    p1 = Process(target=producer,args=(q,))
    p2 = Process(target=producer,args=(q,))
    p3 = Process(target=producer,args=(q,))

    # 消费者们
    c1 = Process(target=consumer,args=(q,))
    c2 = Process(target=consumer,args=(q,))

    p1.start()
    p2.start()
    p3.start()

    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()
    q.put(None)  # 因为有两个消费者,那么就需要再往消息队列里面传递两个None
    q.put(None)
    print('主进程')

其实我们的思路无非就是发送结束信号而已,有另外一种队列提供了这种机制

JoinableQueue([maxsize])

这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享和条件变量实现的。

参数介绍

maxsize是队列中允许最大项数,省略则代表无大小限制

方法介绍

JoinableQueue的实例p除了与Queue对象相同的方法之外,还有:
1.q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。
如果调用此方法的次数大于从队列中删除项目的数量,将引发异常
2.q.join():生产者使用此方法发出信号,直到队列中所有的项目都被处理。
阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
基于JoinableQueue实现生产者和消费者模型
import time
from multiprocessing import Process,JoinableQueue

def producer(q):  # 生产者
    for i in range(3):  # 3个人生产包子
        res = '包子%s'%i
        time.sleep(2)
        print('生产者生产了%s'%res)

        q.put(res)  # 生产完把包子丢到消息队列里面去
    q.join()  # 等待消息队列的数据都被取完

def consumer(q):  # 消费者
    while True:
        res = q.get()  # 从消息队列中取数据赋值给res
        time.sleep(1)
        print('消费者吃了%s'%res)
        q.task_done()  # 消费者给生产者发送结束信号,但是还是在做q.get()


if __name__ == '__main__':
    q = JoinableQueue()  # 如果不写大小,那么默认是无限制的

    # 生产者们
    p1 = Process(target=producer,args=(q,))
    p2 = Process(target=producer,args=(q,))
    p3 = Process(target=producer,args=(q,))

    # 消费者们
    c1 = Process(target=consumer,args=(q,))
    c2 = Process(target=consumer,args=(q,))
    c1.daemon = True
    c2.daemon = True


    p1.start()
    p2.start()
    p3.start()

    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()
    print('主进程')

一些经验
开发中的应用总结:
(1)仅进程同步不涉及数据传输,可以使用信号、信号量;
(2)若进程间需要传递少量数据,可以使用管道、有名管道、消息队列;
(3)若进程间需要传递大量数据,最佳方式是使用共享内存,推荐使用pyarrow,这样减少数据拷贝、传输的时间内存代价;
(4)跨主机的进程间通信(RPC)可以使用socket通信。

0

评论区