Processes and queues


  • Zombie processes and orphan processes
  • Daemon processes
  • Mutex locks (key point)
  • Message queues
  • Inter-process data exchange (the IPC mechanism)
  • The producer-consumer model
  • Thread theory (important)

Zombie processes and orphan processes

Zombie process

A child process whose code has finished running does not disappear right away; it lingers until its parent reclaims (reaps) its resources, for example via join().

Orphan process

The parent process has died (abnormally) while its child processes are still running; on Unix such orphans are adopted and later reaped by the init process.
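
To make the zombie state concrete, here is a minimal sketch added to these notes (the <defunct> behaviour described in the comments applies to Unix-like systems):

from multiprocessing import Process
import os
import time


def task():
    print('child %s finishes immediately' % os.getpid())


if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    time.sleep(10)  # while the parent sleeps, the finished child lingers as a zombie (<defunct> in ps)
    p.join()        # join() reaps the child and removes the zombie entry
    print('parent done')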

Daemon process

A daemon process "serves" another process: the moment the process it guards ends, the daemon ends along with it.

from multiprocessing import Process
import time


def test(name):
    print('总管:%s is running' % name)
    time.sleep(3)
    print('总管:%s is over' % name)


if __name__ == '__main__':
    p = Process(target=test, args=('jason',))
    p.daemon = True  # mark p as a daemon process (must be set before start())
    p.start()
    print("皇帝jason寿终正寝")
    time.sleep(0.1)  # the daemon child is killed as soon as the main process exits

Mutex locks

Overview: when concurrent code operates on the same piece of data, the data is very easily corrupted. The fix is to turn the concurrent access into serial access: efficiency drops, but the data stays safe.

A lock is exactly the tool that turns concurrent access into serial access.

(Databases apply the same idea at different granularities: row locks and table locks.)

Notes on using locks

Create the lock in the main process and hand it to the child processes.

1. Only lock where it is actually needed; never add locks at random.

2. Do not reach for locks lightly (careless locking leads to deadlock).

In everyday programming you will almost never have to operate locks by hand; frameworks usually manage them for you.

import json
from multiprocessing import Process, Lock
import time
import random


# data.txt is expected to contain JSON such as {"ticket_num": 1}

# check the remaining tickets
def search(name):
    with open(r'data.txt', 'r', encoding='utf8') as f:
        data_dict = json.load(f)
        ticket_num = data_dict.get('ticket_num')
        print('%s查询余票:%s' % (name, ticket_num))


# buy a ticket
def buy(name):
    # read the current count first
    with open(r'data.txt', 'r', encoding='utf8') as f:
        data_dict = json.load(f)
        ticket_num = data_dict.get('ticket_num')
    # simulate a processing delay
    time.sleep(random.random())
    # check whether any tickets are left
    if ticket_num > 0:
        # decrement the remaining count
        data_dict['ticket_num'] -= 1
        # write the new count back (the file plays the role of a database)
        with open(r'data.txt', 'w', encoding='utf8') as f:
            json.dump(data_dict, f)
        print('%s: 购买成功' % name)
    else:
        print('不好意思 没有票了!!!')


def run(name, mutex):
    search(name)
    mutex.acquire()  # grab the lock
    buy(name)
    mutex.release()  # release the lock


if __name__ == '__main__':
    mutex = Lock()  # create the lock once in the main process and pass it to every child
    for i in range(1, 11):
        p = Process(target=run, args=('用户%s' % i, mutex))
        p.start()

Message queues

Queue: first in, first out (FIFO).

from multiprocessing import Queue


q = Queue(5)  # the argument is the maximum number of items the queue can hold

# put data in
q.put(111)
q.put(222)
# print(q.full())  # False       check whether the queue is full
q.put(333)
q.put(444)
q.put(555)
# print(q.full())
# q.put(666)  # the queue is full, so this would block until a slot frees up
# take data out
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
# print(q.get())  # with no data left this would block until something is put
print(q.get_nowait())  # raises queue.Empty immediately when there is no data (as it will here)

Note: full(), empty() and get_nowait() cannot be relied on for precise decisions when several processes share the same queue, because its state can change between the check and the next operation.

Queues are what break through the default barrier that keeps processes from communicating with each other.

The IPC mechanism

from multiprocessing import Queue, Process


def producer(q):
    q.put("子进程p放的数据")  # child process p puts data into the shared queue


def consumer(q):
    print('子进程c取的数据', q.get())  # child process c takes that data back out


if __name__ == '__main__':
    q = Queue()  # built in the main process and handed to both children
    p = Process(target=producer, args=(q,))
    c = Process(target=consumer, args=(q,))
    p.start()
    c.start()

    # q.put('主进程放的数据')
    # p = Process(target=consumer, args=(q,))
    # p.start()
    # p.join()
    # print(q.get())
    # print('主')

The producer-consumer model

Producer: responsible for generating data
Consumer: responsible for processing data
The model exists to smooth out the imbalance between how fast data is produced and how fast it is consumed (a queue sits between the two sides as a buffer).


from multiprocessing import Queue, Process, JoinableQueue
import time
import random


def producer(name, food, q):
    for i in range(10):
        print('%s 生产了 %s' % (name, food))
        q.put(food)
        time.sleep(random.random())


def consumer(name, q):
    while True:
        data = q.get()
        print('%s 吃了 %s' % (name, data))
        q.task_done()  # tell the queue this item has been fully processed


if __name__ == '__main__':
    # q = Queue()
    q = JoinableQueue()  # like Queue, but with task_done()/join() bookkeeping
    p1 = Process(target=producer, args=('大厨jason', '玛莎拉', q))
    p2 = Process(target=producer, args=('印度阿三', '飞饼', q))
    p3 = Process(target=producer, args=('泰国阿人', '榴莲', q))
    c1 = Process(target=consumer, args=('班长阿飞', q))

    p1.start()
    p2.start()
    p3.start()
    c1.daemon = True  # the consumer loops forever; as a daemon it dies once the main process ends
    c1.start()

    p1.join()
    p2.join()
    p3.join()

    q.join()  # block until every item that was put has been marked done via task_done()

    print('主')

Thread theory

What is a thread? A process is really just a unit of resources; what the CPU actually executes are the threads inside the process.

    A process is like a factory; threads are the assembly lines inside that factory.
    Every process contains at least one thread.

Data is isolated between processes by default, but the threads inside one process share the same data (a sketch of the first half of that claim follows below).
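
A minimal sketch added here to make the contrast concrete: a global changed inside a child process is invisible to the parent, whereas the thread example further down shows the change coming through.

from multiprocessing import Process

money = 100


def change():
    global money
    money = 999  # only modifies the copy that lives inside the child process


if __name__ == '__main__':
    p = Process(target=change)
    p.start()
    p.join()
    print(money)  # still 100: process data is isolated by default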

Two ways to create a thread

What does creating a process involve?

1. Requesting a new block of memory space
2. Importing/copying all the required resources into it

What does creating a thread involve?

Neither of the two steps above is needed, so creating a thread consumes far fewer resources than creating a process.

from threading import Thread
import time


def test(name):
    print('%s is running' % name)
    time.sleep(3)
    print('%s is over' % name)


# way 1: pass a target function to Thread
t = Thread(target=test, args=('jason',))
t.start()
print('主')



# way 2: subclass Thread and override run()
class MyClass(Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print('%s is running' % self.name)
        time.sleep(3)
        print('%s is over' % self.name)


obj = MyClass('jason')
obj.start()
print('主线程')

Other methods on thread objects (a short sketch follows the list)

1. the join method
2. getting the process id inside a thread (to verify that one process can host several threads)
3. active_count — the number of threads currently alive
4. current_thread — the thread object that is executing the current code
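
A minimal sketch exercising those four items (the function name task and the printed labels are just illustrative):

from threading import Thread, active_count, current_thread
import os
import time


def task():
    print('thread name:', current_thread().name)  # e.g. Thread-1
    print('pid inside the thread:', os.getpid())  # same pid as the main thread -> same process
    time.sleep(1)


t = Thread(target=task)
t.start()
print('pid in the main thread:', os.getpid())
print('alive threads:', active_count())  # main thread + t, so typically 2 at this point
t.join()  # wait here until t has finished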

Daemon threads

The end of the main thread means the end of the whole process, so the main thread has to wait for every non-daemon thread to finish before it can exit; daemon threads are simply killed at that point.

from threading import Thread
import time


def foo():
    print(123)
    time.sleep(3)
    print("end123")


def bar():
    print(456)
    time.sleep(1)
    print("end456")


if __name__ == '__main__':
    t1 = Thread(target=foo)
    t2 = Thread(target=bar)
    t1.daemon = True  # foo runs in a daemon thread
    t1.start()
    t2.start()
    print("main-------")
    # typical output: 123, 456, main-------, then end456 one second later
    # "end123" never appears: once the non-daemon t2 finishes, the process exits and the daemon t1 is killed with it

Data sharing between threads

from threading import Thread

money = 100


def test():
    global money
    money = 999  # modifies the module-level variable shared by every thread in the process


t = Thread(target=test)
t.start()
t.join()
print(money)  # 999: the change made in the child thread is visible to the main thread

Thread mutex lock

from threading import Thread, Lock
import time


num = 100


def test(mutex):
    global num
    mutex.acquire()
    # read the current value of num
    tmp = num
    # the delay makes the race condition obvious: without the lock every thread would read 100
    time.sleep(0.1)
    # modify the copy and write it back
    tmp -= 1
    num = tmp
    mutex.release()


t_list = []
mutex = Lock()
for i in range(100):
    t = Thread(target=test, args=(mutex,))
    t.start()
    t_list.append(t)
# make sure every child thread has finished
for t in t_list:
    t.join()
print(num)  # 0 with the lock; without it the result would typically be 99

A concurrent TCP server

import socket
from threading import Thread
from multiprocessing import Process  # kept because a Process could serve each client instead of a Thread

server = socket.socket()
server.bind(('127.0.0.1', 8080))
server.listen(5)


# serve one client connection: echo everything received back with a suffix
def talk(sock):
    while True:
        try:
            data = sock.recv(1024)
            if len(data) == 0: break
            print(data.decode('utf8'))
            sock.send(data + b'gun dan!')
        except ConnectionResetError as e:
            print(e)
            break
    sock.close()


# the main thread keeps accepting connections; each client is served by its own thread
while True:
    sock, addr = server.accept()
    print(addr)
    # hand the connection to a new thread (a Process could be used in the same way)
    t = Thread(target=talk, args=(sock,))
    t.start()
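
For completeness, a minimal client sketch for trying the server out (not part of the original notes; it assumes the server above is already listening on 127.0.0.1:8080):

import socket

client = socket.socket()
client.connect(('127.0.0.1', 8080))

while True:
    msg = input('>>> ').strip()
    if not msg:
        continue  # skip empty input
    client.send(msg.encode('utf8'))
    print(client.recv(1024).decode('utf8'))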