吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 4962|回复: 1
收起左侧

[会员申请] 申请会员ID:luyi

[复制链接]
吾爱游客  发表于 2019-10-21 20:15
申 请 I D:luyi
个人邮箱:164196046@qq.cn


# python并发编程之进程

## 进程(process)

```
在面向线程设计的系统(如当代多数操作系统、Linux 2.6及更新的版本)中,进程本身不是基本运行单位,而是线程的容器。进程拥有自己独立的内存空间,所属线程可以访问进程的空间。程序本身只是指令、数据及其组织形式的描述,进程才是程序的真正运行实例。 例如,我们在PyCharm开发环境中写好一个程序,运行的时候python解释器完成解释并执行该程序。

进程 = 程序段 + 数据段 + PCB
```

## 全局解释器锁GIL

```
GIL是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行。即便在多核心处理器上,使用 GIL 的解释器也只允许同一时间执行一个线程。Python的Cpython解释器(普遍使用的解释器)使用GIL,在一个Python解释器进程内可以执行多线程程序,但每次一个线程执行时就会获得全局解释器锁,使得别的线程只能等待,由于GIL几乎释放的同时就会被原线程马上获得,那些等待线程可能刚唤醒,所以经常造成线程不平衡享受CPU资源,此时多线程的效率比单线程还要低下。可以说它的初衷是很好的,为了保证线程间的数据安全性;但是随着时代的发展,GIL却成为了python并行计算的最大障碍,但这个时候GIL已经遍布CPython的各个角落,修改它的工作量太大,特别是对这种开源性的语音来说。但幸好GIL只锁了线程,我们可以再新建解释器进程来实现并行,那这就是multiprocessing的工作了。
```

## 开启子进程的两种方式

### 第一种:直接调用

```
from multiprocessing import Process
import time
def task(name):
    print(f'{name}is running ')
    time.sleep(3)
    print(f'{name} is done')

if __name__ == '__main__':
    # windows操作系统开启多进程,必须得写在main下面。
    p = Process(target=task,args=('路一',))   # args 一定是一个元组的形式
    p.start()
    print('===主进程')
    # start与print几乎是同时发出,但是由于操作系统调用子进程会慢一些。#

# 整个py文件是主进程
# p.start通知操作系统,你给我在内存中开辟一个空间,将p进程放进去,让cpu执行。
```

### 第二种 :继承式调用

```
from multiprocessing import Process
import time
class Myprocess(Process):
    def __init__(self,name):
        # 这里的顺序是有要求,不然反过来写会覆盖。
        super().__init__()  # 必须要执行父类的init
        self.name = name
    def run(self):   # 必须定义这个名字
        print(f'{self.name}is running ')
        time.sleep(3)
        print(f'{self.name} is done')

if __name__ == '__main__':
    p = Myprocess('路一')
    p.start()
    print('主进程')
```

## 获取进程以及父进程的pid

操作系统如何区分进程?每个进程都有一个唯一标识,pid

```
1.在终端查看进程的pid,cmd中输入tasklist
2.在终端查看指定的进程pid,cmd中输入 tasklist | findstr pycharm
```

![查看进程信息](C:\Users\Administrator\Desktop\笔记\并发编程博客笔记\查看进程信息.png)

3.通过代码查看pid

```
import os
import time
print(f'子进程:{os.getpid()}')  # 查看当前进程pid
print(f'父进程:{os.getppid()}') # 查看父进程pid
time.sleep(50000)
```

## 进程之间的数据隔离

测试:

```
from multiprocessing import Process
import time
x = 1000
def task():
    global x
    x = 2

if __name__ == '__main__':
    p1 = Process(target=task,)
    p1.start()  # 不是瞬发的,可能会等一会儿
    time.sleep(1)
    print(f'主进程:{x}')   # 主进程没有运行task这个函数,所以主进程内的x并未改变。

    # 但上面有些瑕疵,是主进程先执行的,我们想让子进程先执行,这样才能验证是否正的改变x,所以用一下time来使主进程慢一步。
```

```
结论:进程之间的数据是相互隔离的。
```

测试是否有小数据池:

```
from multiprocessing import Process
import time
x = 100  # (-5 - 256)
s = 'h'
l1 = [1,2,3]
def task():
    print(f'子进程:{id(x)}')
    print(f'子进程:{id(s)}')
    print(f'子进程:{id(l1)}')

if __name__ == '__main__':
    print(f'主进程:{id(x)}')
    print(f'主进程:{id(s)}')
    print(f'主进程:{id(l1)}')
    p1 = Process(target=task,)
    p1.start()
```

```
结论:只有数字满足小数据池(-5-256)初始化时子进程与主进程是沿用一个.
```

## join方法

join :阻塞目前父进程,它是通知主进程,等我执行完毕,主进程才能执行。

例1:

```
from multiprocessing import Process
import time

def task(name,sec):
    time.sleep(sec)
    print(f'{name} is running')

if __name__ == '__main__':
    p1 = Process(target=task, args=('1',1))
    p2 = Process(target=task, args=('2',2))
    p3 = Process(target=task, args=('3',3))
    start_time = time.time()

p1.start()
p2.start()
p3.start()
p1.join()  # p1 1s
p2.join()  # p2 2s
p3.join()  # p3 3s
print(f'主进程{time.time()-start_time}秒后执行')  # 主进程3.194712162017822秒后执行
```

```
结论:
        1.当 p1.join 通知主进程等p1结束后,主进程开始执行,这里的主进程是 p2.join以后后面的内容。

        2.当p1执行结束后,p2.join通知主程序等p2结束后,主程序开始执行,这里面的主程序是 p3.join以及后面的内容。

        3.当p2执行结束后, p3.join通知主程序等p3结束后,主程序开始执行,这里面的主程序是print.

由于p1,p2,p3是同时发出的通知,它们三个在同时处理,所以,最下面的print要等待这三个程序结束,也就是等待执行时间最长的进程结束后,才开始执行,
```

例2

```
from multiprocessing import Process
import time

def task(name,sec):
    time.sleep(sec)
    print(f'{name} is running')

if __name__ == '__main__':
    p1 = Process(target=task, args=('1',1))
    p2 = Process(target=task, args=('2',2))
    p3 = Process(target=task, args=('3',3))
    start_time = time.time()

p1.start()
p1.join()
p2.start()
p2.join()
p3.start()
p3.join()
print(f'主进程{time.time()-start_time}秒后执行') # 主进程6.424545526504517秒后执行
```

```
当 p1.join通知主程序等p1 结束后,主程序开始执行,这里的主程序是 p2.start及以后的内容,所以,由于不是同时发出的通知,是一种串行的效果。
```

## 进程对象的其它属性

```
from multiprocessing import Process
import time

def task(name):
    print(f'{name} is running')
    time.sleep(3)
    print(f'{name} is done')

if __name__ == '__main__':
    p = Process(target=task, args=('1',),name = '任务1')  # 给进程对象设置name属性
    p.start()         # start 只是一个通知,它会慢一拍。
    # print(p.pid)    # 获取进程pid号
    # print(p.name)
    # time.sleep(1)
    # p.terminate()     # 终止(结束)子进程, 它也不是立即的
    # terminate 与 start一样的工作原理:通知操作系统终止或开启一个子进程,内存中终止或开启是会耗费时间的。
    # print(p.is_alive())        # 判断子进程是否存活
    # is_alive 只是查看内存中p子进程是否运行,比terminat,start快。 这是否矛盾呢?
    # 问如何主动杀死子进程, terminate
    print('主进程')
```

```
对象.pid()
        获取对象pid号。

对象.name
        在初始化时,给进程对象设置name属性。

对象.terminate()
        终止(结束)子进程
        terminate 与 start一样的工作原理:通知操作系统终止或开启一个子进程,内存中终止或开启是会耗费时间的。

对象.is_alive()
        判断子进程是否存活.
        如果terminate()与对象.is_alive()挨着,会打印True,因为对象.is_alive()的速度比terminate()快一些
```

## 僵尸进程与孤儿进程

### **僵尸进程**

```
在类UNIX系统中,僵尸进程是指子进程完成执行,父进程没有通过wait系统调用来读取这个子进程的退出状态的话(在操作系统的进程表中仍有一个表项(进程控制块PCB)(在Linux中具体是task_struct结构)),这个子进程就会一直维持僵尸进程状态,称为“僵尸进程”。
```

```
僵尸进程是无法通过 kill命令来清除
僵尸进程的状态为EXIT_ZOMBIE,缩写Z,ps命令也会打印僵尸进程,但无法使用kill杀死。
可以使用命令: ps aux | grep Z 查看
```

**子进程的回收**

```
子进程需要保留表项以允许其父进程读取子进程的 `exit status`:一旦退出态通过 `wait系统调用`读取,僵尸进程条目就从进程表中删除,称之为回收
```

**在回收僵尸进程之前,如果父进程退出了,则僵尸进程变为“孤儿进程”**,进而被`init`进程接管、回收。

**孤儿进程**

**父进程**执行完成或被**终止**后仍继续运行的一类进程。

手动创建僵尸进程

```
from multiprocessing import Process
import time
import os

def task(name):
    print(f'{name} is running')
    print(f'子进程开始了{os.getpid()}')
    time.sleep(10)

if __name__ == '__main__':
    p = Process(target=task, args=('1',))  # 给进程对象设置name属性
    for i in range(10000):
        p =Process(target=task, args=(str(i),))
        p.start()
    print(f'主进程开始了{os.getpid()}')
```

### 为什么需要僵尸进程?

```
之所以保留 task_struct,是因为它里面保存了进程的pid,退出码、以及一些统计信息,父进程很可能会关心这些信息。
```

### 如何清除僵尸进程?

```
第一种方法:结束父进程,使之成为孤儿进程。当然者个是暴力的手段,因为我们一般肯定是希望父进程继续运行的。

第二种方法:通过wait调用来读取子进程退出状态。比如通过 multiprocessing.Process产出的进程可以通过 子进程.join()的方法来wait,也可以在父进程中处理 SIGCHLD信号,在处理程序中调用wait系统调用或者直接设置为 SIG_IGN来清除僵尸进程。
```

## 守护进程

通俗:守护:我守护着你,你要是死了,我就与你一起

守护进程是不能开启子进程的,不然当父进程结束的时候,守护进程结束,那么由守护进程开启的进程会成为孤儿进程。(守护进程设置必须在 start前面!)

```
from multiprocessing import Process
import time
import os

def task(name):
    print(f'{name} is running')
    print(f'子进程开始了{os.getpid()}')
    time.sleep(50)
    print(f'{name} is done')


if __name__ == '__main__':
    p = Process(target=task, args=('1',))  # 给进程对象设置name属性
    p.daemon = True  # 将p子进程设置成守护进程,守护主进程,只要主进程结束,子进程无论执行与否,都马上结束
    # 守护进程设置必须在 start前面!
    p.start()
    time.sleep(2)
    print(f'主进程开始了{os.getpid()}')
```

## 进程同步

在系统中有一些需要相互合作、协同工作的进程,它们之间的相互联系称为**进程的同步**。

使并发执行的诸进程之间能有效地**共享资源**和**相互合作**,从而使程序的并发执行具有可再现性。

### 进程的两种制约关系

```
间接制约:竞争同一资源而产生的相互排斥的关系 。

    解释: 当某一进程访问某一资源时,不允许别的进程同时访问,这种限制称为互斥, 即多个进程在访问某些资源(如临界资源)时,也要有一种执行次序上的协调 ,当一个进程访问完毕,另一个进程才能访问。所以就其本质来讲,互斥仍是一种同步。

```

```
直接制约:进程间共同完成一项任务时直接发生相互作用的关系。
```

### 临界资源

```
临界资源:一次仅允许一个进程访问的资源。例如打印机。
临界区:访问临界资源的代码段,不允许多个并发进程交叉执行的一段程序。
```

### 临界区必须**互斥**访问

```
进入区:(1) 检查临界资源是否被访问,未被访问,转(2),否则转(1)。
           (2) 进入临界区,并设访问标志。
退出区:恢复访问标志,允许其它进程进入
```

### 同步机制应遵循的准则

```
空闲让进——有效利用
忙则等待——互斥
有限等待——避免“死等”
让权等待——避免“忙等”
```

## 互斥锁

问题1

```
3个进程,同一时刻共抢一个资源:输出平台。

需求分析:多个进程共抢一个资源,你要是做到结果第一位,效率第二位。你应该牺牲效率,保证结果。做到串行。
```

方法1

```
from multiprocessing import Process
import time
import random

def task1():
    print('task1 开始打印')
    time.sleep(random.randint(1,3))
    print('task1 打印完成')
def task2():
    print('task2 开始打印')
    time.sleep(random.randint(1, 3))
    print('task2 打印完成')
def task3():
    print('task3 开始打印')
    time.sleep(random.randint(1, 3))
    print('task3 打印完成')

if __name__ == '__main__':
    p1 = Process(target=task1)
    p2 = Process(target=task2)
    p3 = Process(target=task3)
    p1.start()
    p1.join()
    p2.start()
    p2.join()
    p3.start()
    p3.join()
```

```
虽然上面这个版本完成了串行结果,保证了顺序,但是没有保证公平。顺序是人为写好的。我们要做到公平的去抢占资源,谁先抢到,先执行谁。
```

方法2(lock)

```
from multiprocessing import Process
from multiprocessing import Lock
import time
import random
# lock = Lock()  # 这样的话,子进程各会拷贝一份,也就是创建一把锁,会出问题,好多把锁
def task1(lock):
    print('task1')      # 第一种验证方法:验证cpu遇到I/O切换了
    lock.acquire()
    print('task1 开始打印')
    time.sleep(random.randint(1,3))
    print('task1 打印完成')
    lock.release()
def task2(lock):
    print('task2')
    lock.acquire()
    print('task2 开始打印')
    time.sleep(random.randint(1, 3))
    print('task2 打印完成')
    lock.release()
def task3(lock):
    print('task3')
    lock.acquire()
    # lock.acquire()  # 多加一次会进入死锁
    print('task3 开始打印')
    time.sleep(random.randint(1, 3))
    print('task3 打印完成')
    lock.release()
def task4():
    print('task4 开始打印')
    time.sleep(random.randint(1, 3))
    print('task4 打印完成')


if __name__ == '__main__':
    lock = Lock()       # 在这里实例化是要保证同一把锁! 所以要以参数的形式传入
    p1 = Process(target=task1,args=(lock,))
    p2 = Process(target=task2,args=(lock,))
    p3 = Process(target=task3,args=(lock,))
    p4 = Process(target=task4)
    p1.start()
    p2.start()
    p3.start()
    # p4.start()    # 第二种验证方法,当如果p1拿到锁,并遇到I/O阻塞的时候,cpu会进行切换,找其它可用的进程。
```

```
上面的程序中:当第一个到达的时候,假如p1开始执行,遇到阻塞的时候,cpu要切换,发现其它也需要同一把锁,所以cpu就停下来等待p1的阻塞结束。

上锁:一定要是同一把锁:上锁一次,解锁一次。
```

### 互斥锁与join区别共同点

```
共同点:都完成了进程之间的串行
区别:join是人为控制的进程串行, 互斥锁是随机的抢占资源
```

情景二:模拟抢票

问题2

```
模拟抢票

需求分析:买票之前需要查票,必经流程,有可能你查票的同时,100个人也在查本次车票。买票时,你要先从服务端获取票数,票数 > 0,买票,然后服务端票数减一,中间肯定有网络延迟。
```

```
from multiprocessing import Process
import time
import random
from multiprocessing import Lock
import json
import os

def search():
    time.sleep(random.random())
    dic = json.load(open('db.json',encoding='utf-8'))
    print(f"剩余票数:{dic['count']}")

def get():
    dic = json.load(open('db.json', encoding='utf-8'))
    time.sleep(random.randint(1,3))
    if dic['count'] > 0:
        dic['count'] -= 1
        json.dump(dic,open('db.json',mode = 'w' ,encoding='utf-8'))
        print(f'{os.getpid()}用户 购买成功')
    else:
        print('没票了....')

def task(lock):
    search()
    lock.acquire()
    get()
    lock.release()

if __name__ == '__main__':
    lock = Lock()
    for i in range(5):
        p = Process(target=task,args=(lock,))
        p.start()
    # with open('db.json',mode='w',encoding='utf-8') as f1:
    #     json.dump({'count':3},f1)     # 写入票数
```

```
多进程原则上是不能互相通信的,它们在内存级别数据隔离的。不代表硬盘的数据隔离,它们可以共同操作一个文件。

多个进程抢占同一个()资源,要想公平按照顺序,只能串行。
```

## 进程之间的通信:队列

多个进程间的通信:基于文件以及加锁的方式。

**缺点:**

```
操作文件效率低
自己加锁很麻烦,很容易出现死锁,递归锁。
```

**什么是队列?**

队列是存在于内存中的一个容器,最大的一个特点:队列的特性就是FIFO,完全支持先进先出的原则。

```
from multiprocessing import Queue

q = Queue(3)   # 可以设置元素的最大个数
q.put('小黑')
q.put({'count':1})
def func():
    print('in func')
q.put(func)
# q.put(666)    # 当队列中的数据已经达到上限,再插入数据的时候,该进程就会夯住,阻塞,等待别的进程取出数据。
print(q.get())
print(q.get())
print(q.get())
print(q.get()) # 当你将数据取完继续在取值的时候,该进程会夯住,阻塞,等待别的进程插入数据。

队列的maxsize q = Queue(maxsize) 数据量不宜过大。精简的重要的数据。比如:各种请求链接。
```

### 常用方法介绍:

put(self, obj, block=True, timeout=None)

```
当超过最大限度时,默认阻塞 block=True 改成False 如果继续 put 报queue.Full异常
timeout 延时报错,超过设置的时间间隔后还插不进去数据,会报queue.Full异常
```

get(self, block=True, timeout=None)

```
当队列为空的时候,默认阻塞;改为False后如果继续 get 报queue.Empty异常
timeout 延时作业,超过设置的时间间隔还取不出来数据,会报queue.Empty异常。
```

### 进程之间的通信实例

12306车票的抢购,预期发售10个,100个人去抢。

```
import os
from multiprocessing import Queue
from multiprocessing import Process
import queue

def task(q):
    try:
        q.put(f'{os.getpid()}',block=False)
    except queue.Full:
        pass

if __name__ == '__main__':
    q = Queue(10)
    for i in range(100):
        p  = Process(target=task,args=(q,))
        p.start()
    for i in range(1,11):
        print(f"排名{i}的 用户{q.get()}抢到了")

利用队列进行进程之间通信:简单,方便,不用自己手动加锁。队列自带阻塞,可持续化读取数据。
```

## 进程之间的通信:管道

### 管道

在UNIX系统中,它连接一个**读进程**和一个**写进程**,以实现它们之间通信的共享文件,又称pipe文件。它是以文件为基础,实质是以**外存**来进行**数据通信**。

在windows中是一段共享内存。这段共享的内存设计采用**数据流I/0**的方式来访问。由一个**进程读**、另一个**进程写**。

类似于一根管道的两端,所以这种进程间的通信方式被称作“管道”。

```
Python官方文档的描述:
Returns a pair (conn1, conn2) of Connection objects representing the ends of a pipe.
```

因此, Pipe仅仅适用于只有两个进程**一读一写的单双工**情况,也就是说信息是只向一个方向流动。例如电视、广播,看电视的人只能看,电视台是能播送电视节目。

```
from multiprocessing import Pipe,Process

def task(x,pipe):
    _out_pipe, _in_pipe = pipe
    print('子进程开始啦')
    # 关闭fork过来的输入端(关闭进水口)
    _in_pipe.close()
    while True:
        try:
            # 放水
            msg = _out_pipe.recv()
            print(msg)
        except EOFError:
            # 当out_pipe接受不到输出的时候且输入被关闭的时候,会抛出EORFError,可以捕获并且退出子进程
            return

if __name__ == '__main__':
    out_pipe, in_pipe = Pipe() # duplex为True,创建的管道是双向的,默认为
    p1 = Process(target=task, args=(100, (out_pipe, in_pipe)))
    p1.start()

    # 等pipe被fork 后,关闭主进程的输出端
    # 这样,创建的Pipe一端连接着主进程的输入,一端连接着子进程的输出口
    out_pipe.close()      # 关闭出水口,开始进水
    for x in range(100):
        in_pipe.send(x)
    print('xxxxxxx')
    in_pipe.close()       # 关闭进水口。 当进水口关闭后,子进程才开始使用这个管道。
    p1.join()
    print("主程序运行结束")
```

上面的代码主要用到了pipe的send()、recv()、close()方法。当pipe的输入端被关闭,且无法接收到输入的值,那么就会抛出EOFError。

新建一个Pipe(duplex)的时候,如果duplex为True,那么创建的管道是双向的;如果duplex为False,那么创建的管道是单向的。

**举例**

```
主程序关闭出水口,放水,当水充满后,关闭进水口;
子程序拿到管道。先关闭进水口,开始放水,等水流完后,关闭出水口。
```

## 生产者消费者模型

回顾:以前学到的模型,设计模式(单例模式),归一化设计,理论等待,都是交给你一个编程思路,如果以后遇到类似的情况,直接套用即可。

生产者:生产数据的进程。

消费者:对生产者生产出来的数据做进一步的处理的进程。

吃包子举例:**厨师**生产出包子,不可能直接塞你嘴里,他要放在**盆**中;**消费者**从盆中取出包子食用。

**三个主体**:(生产者)厨师、(容器队列)盆、(消费者)人。

为什么夹杂这个容器?

(容器)盆起到一个缓冲区的作用,也起到了解耦的作用(只有生产者和消费者话它们是强耦合的)。

**平衡了生产力和消费力。**

生产者消费者模型多用于**并发**。

### 方法一:利用队列实现

```
from multiprocessing import Process
from multiprocessing import Queue
import time
import random

def producer(q):
    for i in range(1,6):
        time.sleep(random.randint(1,3))
        res = f'{i}号包子'
        q.put(res)
        print(f'\033[0;32m 生产者生产了{res}\033[0m')

def consumer(q):
    while 1:
        try:
            time.sleep(random.randint(1,3))
            ret = q.get(timeout= 5)
            print(f'消费者吃了{ret}')
        except Exception:
            return

if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer,args=(q,))
    p2 = Process(target=consumer,args=(q,))
    p1.start()
    p2.start()
```

生产者消费者模型:

合理的去调控多个进行去生产数据以及提取数据,中间有个必不可少的环节容器队列。

本质上:利用队列进行通信。

## 方法二:利用管道实现

```
from multiprocessing import Process,Pipe

def producer(out_pipe, in_pipe):
    out_pipe.close()
    for i in range(1,101):
        s = f"第{i}号包子"
        print(f'生产者生产了{s}')
        in_pipe.send(s)
    in_pipe.close()

def consumer(out_pipe, in_pipe):
    in_pipe.close()
    # while True:
    #     try:
    #         s = out_pipe.recv()
    #         print(f"消费者吃了{s}")
    #     except EOFError:          # 只有在进水口关闭的时候才能引发异常
    #         print('xxx')
    #         return
    for i in range(100):
        s = out_pipe.recv()
        print(f"消费者吃了{s}")
    out_pipe.close()

if __name__ == '__main__':
    out_pipe, in_pipe = Pipe()
    pro = Process(target=producer,args=(out_pipe, in_pipe))
    con = Process(target=consumer,args=(out_pipe, in_pipe))
    pro.start()
    con.start()
    con.join()
    print('主进程结束')
```

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

Hmily 发表于 2019-10-22 11:03
这个是你原创?百度一把了?
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2024-11-24 13:39

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表