大家超越八分之四的时候利用四线程,以至多进度,可是python中出于GIL全局解释器锁的因由,python的多线程并从未当真落到实处

目录

一、开启线程的两种方式
    1.1 直接利用利用threading.Thread()类实例化
    1.2 创建一个类,并继承Thread类
    1.3 在一个进程下开启多个线程与在一个进程下开启多个子进程的区别
        1.3.1 谁的开启速度更快?
        1.3.2 看看PID的不同
        1.3.3 练习
        1.3.4 线程的join与setDaemon
        1.3.5 线程相关的其他方法补充

二、 Python GIL
    2.1 什么是全局解释器锁GIL
    2.2 全局解释器锁GIL设计理念与限制

三、 Python多进程与多线程对比
四、锁
    4.1 同步锁
    GIL vs Lock
    4.2 死锁与递归锁
    4.3 信号量Semaphore
    4.4 事件Event
    4.5 定时器timer
    4.6 线程队列queue

五、协程
    5.1 yield实现协程
    5.2 greenlet实现协程
    5.3 gevent实现协程

六、IO多路复用

七、socketserver实现并发
    7.1 ThreadingTCPServer

八、基于UDP的套接字

生龙活虎、进度和线程的概念

     
实际上,python在施行十六线程的时候,是通过GIL锁,进行上下文切换线程实行,每一回真实唯有三个线程在运营。所以上边才说,未有真的完毕多现程。

生机勃勃、开启线程的二种艺术

在python中开启线程要导入threading,它与开启进程所急需导入的模块multiprocessing在利用上,有相当大的相符性。在接下去的接受中,就足以窥见。

同开启进度的三种艺术相似:

第黄金时代,引出“多职分”的概念:多任务管理是指客商可以在同期内运营三个应用程序,种种应用程序被称作二个职务。Linux、windows正是支撑多义务的操作系统,比起单职责系统它的法力加强了重重。

      那么python的多线程就不曾什么用了啊?

1.1 直接行使利用threading.Thread()类实例化

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.start()

    print('主线程')

比如,你贰头在用浏览器上网,风华正茂边在听微博云音乐,豆蔻梢头边在用Word赶作业,那正是多职责,最少还要有3个职责正在运维。还会有为数不菲职务悄悄地在后台同不经常候运行着,只是桌面上未有出示而已。

             
不是其同样子的,python八线程日常用来IO密集型的程序,那么哪些叫做IO密集型呢,举个例证,比如说带有阻塞的。当前线程阻塞等待其余线程试行。

1.2 创制三个类,并三回九转Thread类

from threading import Thread
import time
calss Sayhi(Thread):
    def __init__(self,name):
        super().__init__()
        self.name = name
    def run(self):
        time.sleep(2)
        print("%s say hello" %self.name)

if __name__ == "__main__":
    t = Sayhi("egon")
    t.start()
    print("主线程")

只是,那么些职务是还要在运作着的吧?众人周知,运转三个职分就须要cpu去管理,那还要运行七个职责就非得须求八个cpu?那尽管有玖拾柒个职分急需同一时间运行,就得买七个100核的cpu吗?明显不可能!

      即然提起符合python七十四四线程的,那么什么样的不相符用python四线程呢?

1.3 在叁个进程下张开八个线程与在一个进度下展开多少个子过程的差别

现今,多核CPU已经特别遍布了,不过,即便过去的单核CPU,也足以施行多义务。由于CPU施行代码都以逐生龙活虎实施的,那么,单核CPU是怎么推行多任务的呢?

             
答案是CPU密集型的,那么哪些的是CPU密集型的吧?百度时而您就知晓。

1.3.1 什么人的敞开速度更加快?

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello')

if __name__ == '__main__':
    #在主进程下开启线程
    t=Thread(target=work)
    t.start()
    print('主线程/主进程')
    '''
    打印结果:
    hello
    主线程/主进程
    '''

    #在主进程下开启子进程
    t=Process(target=work)
    t.start()
    print('主线程/主进程')
    '''
    打印结果:
    主线程/主进程
    hello
    '''

结论:由于创立子进度是将主进程完全拷贝豆蔻年华份,而线程无需,所以线程的创始速度更加快。

答案就是操作系统轮流让种种职务交替实践,职分1奉行0.01秒,切换成职责2,职分2施行0.01秒,再切换来职分3,实施0.01秒……那样频仍实行下去。表面上看,每一个职务都以轮番实施的,可是,由于CPU的实施进程其实是太快了,我们感到就如全部职务都在同不经常间推行相近。

      

1.3.2 看看PID的不同

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello',os.getpid())

if __name__ == '__main__':
    #part1:在主进程下开启多个线程,每个线程都跟主进程的pid一样
    t1=Thread(target=work)
    t2=Thread(target=work)
    t1.start()
    t2.start()
    print('主线程/主进程pid',os.getpid())

    #part2:开多个进程,每个进程都有不同的pid
    p1=Process(target=work)
    p2=Process(target=work)
    p1.start()
    p2.start()
    print('主线程/主进程pid',os.getpid())


'''
hello 13552
hello 13552
主线程pid: 13552
主线程pid: 13552
hello 1608
hello 6324
'''

总结:可以见到,主进度下开启两个线程,各样线程的PID都跟主进度的PID同样;而开三个经过,各类进度都有例外的PID。

总计:二个cpu同临时刻只可以运转八个“任务”;真正的并行实行多职务只好在多核CPU上达成,可是,由于职务数量远远多于CPU的骨干数据,所以,操作系统也会自行把众多职责轮流动调查节到每此中央上实践。

       以后有如此生龙活虎项任务:要求从200W个url中获取数据?

1.3.3 练习

练习一:动用四线程,实现socket 并发连接
服务端:

from threading import Thread
from socket import *
import os

tcpsock = socket(AF_INET,SOCK_STREAM)
tcpsock.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
tcpsock.bind(("127.0.0.1",60000))
tcpsock.listen(5)

def work(conn,addr):
    while True:
        try:
            data = conn.recv(1024)
            print(os.getpid(),addr,data.decode("utf-8"))
            conn.send(data.upper())
        except Exception:
            break

if __name__ == '__main__':
    while True:
        conn,addr = tcpsock.accept()
        t = Thread(target=work,args=(conn,addr))
        t.start()

"""
开启了4个客户端
服务器端输出:
13800 ('127.0.0.1', 63164) asdf
13800 ('127.0.0.1', 63149) asdf
13800 ('127.0.0.1', 63154) adsf
13800 ('127.0.0.1', 63159) asdf

可以看出每个线程的PID都是一样的。
""

客户端:

from socket import *

tcpsock = socket(AF_INET,SOCK_STREAM)
tcpsock.connect(("127.0.0.1",60000))

while True:
    msg = input(">>: ").strip()
    if not msg:continue
    tcpsock.send(msg.encode("utf-8"))
    data = tcpsock.recv(1024)
    print(data.decode("utf-8"))

练习二:有四个职分,贰个收受顾客输入,三个将顾客输入的源委格式化成大写,八个将格式化后的结果存入文件。

from threading import Thread

recv_l = []
format_l = []

def Recv():
    while True:
        inp = input(">>: ").strip()
        if not inp:continue
        recv_l.append(inp)

def Format():
    while True:
        if recv_l:
            res = recv_l.pop()
            format_l.append(res.upper())

def Save(filename):
    while True:
        if format_l:
            with open(filename,"a",encoding="utf-8") as f:
                res = format_l.pop()
                f.write("%sn" %res)

if __name__ == '__main__':
    t1 = Thread(target=Recv)
    t2 = Thread(target=Format)
    t3 = Thread(target=Save,args=("db.txt",))
    t1.start()
    t2.start()
    t3.start()

对此操作系统来说,三个职分便是二个进度(Process),比如展开三个浏览器正是开发银行贰个浏览器进度,展开贰个记事本就开发银行了多少个记事本进度,展开三个记事本就运维了四个记事本进度,打开七个Word就开动了三个Word进度。

      
那么大家火急不能够用十六线程,上下文切换是亟需时间的,数据量太大,不能承担。这里大家就要用到多进程+协程

1.3.4 线程的join与setDaemon

与经过的主意都是看似的,其实multiprocessing模块是盲目跟随大众threading模块的接口;

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.setDaemon(True) #设置为守护线程,主线程结束,子线程也跟着线束。
    t.start()
    t.join()  #主线程等待子线程运行结束
    print('主线程')
    print(t.is_alive())

稍许进度还不独有同期干豆蔻梢头件事,比如Word,它能够并且扩充打字、拼写检查、打印等事务。在贰个进度之中,要同期干多件事,就须要同不经常候运转多少个“子职责”,大家把进程内的这个“子任务”称为线程(Thread)。

      那么什么样是协程呢?

1.3.5 线程相关的别的情势补充

Thread实例对象的章程:

  • isAlive():重回纯种是或不是是活跃的;
  • getName():再次回到线程名;
  • setName():设置线程名。

threading模块提供的部分艺术:

  • threading.currentThread():重回当前的线程变量
  • threading.enumerate():重回二个包含正在运营的线程的列表。正在运作指线程运行后、甘休前,不包罗运转前和休息后。
  • threading.activeCount():重返正在运作的线程数量,与len(threading.enumerate())有同样结果。

from threading import Thread
import threading
import os

def work():
    import time
    time.sleep(3)
    print(threading.current_thread().getName())


if __name__ == '__main__':
    #在主进程下开启线程
    t=Thread(target=work)
    t.start()

    print(threading.current_thread().getName()) #获取当前线程名
    print(threading.current_thread()) #主线程
    print(threading.enumerate()) #连同主线程在内有两个运行的线程,返回的是活跃的线程列表
    print(threading.active_count())  #活跃的线程个数
    print('主线程/主进程')

    '''
    打印结果:
    MainThread
    <_MainThread(MainThread, started 140735268892672)>
    [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
    2
    主线程/主进程
    Thread-1
    '''

出于各样进度起码要干生龙活虎件事,所以,二个历程至稀有三个线程。当然,像Word这种复杂的长河能够有多少个线程,多少个线程能够同不平日间奉行,八线程的试行方式和多进程是同意气风发的,也是由操作系统在多个线程之间超级快切换,让各种线程都指日可待地轮流运转,看起来就像是同期施行同风姿罗曼蒂克。当然,真正地同期实行八线程须要多核CPU才只怕实现。

      协程,又称微线程,纤程。土耳其语名Coroutine。

二、 Python GIL

GIL全称Global Interpreter
Lock
,即全局解释器锁。首先要求明显的一些是GIL实际不是Python的风味,它是在促成Python解析器(CPython)时所引进的三个定义。就好比C++是黄金时代套语言(语法)标准,不过能够用分裂的编写翻译器来编写翻译成可举办代码。有名的编写翻译器举例GCC,INTEL
C++,Visual
C++等。Python也大器晚成律,相符风华正茂段代码能够经过CPython,PyPy,Psyco等不等的Python执市场价格况来推行。像在那之中的JPython就未有GIL。但是因为CPython是绝大好多条件下默许的Python执市场价格况。所以在不菲人的定义里CPython正是Python,也就想当然的把GIL归咎为Python语言的缺点。所以这里要先明了一点:GIL实际不是Python的天性,Python完全能够不凭借于GIL

小结:

     
协程的定义很已经提出来了,但直至日前几年才在一些语言(如Lua)中拿走遍布应用。

2.1 什么是全局解释器锁GIL

Python代码的实践由Python
虚构机(也叫解释器主循环,CPython版本)来支配,Python
在准备之初就考虑到要在解释器的主循环中,同时独有一个线程在实践,即在随机时刻,独有三个线程在解释器中运作。对Python
设想机的寻访由全局解释器锁(GIL)来决定,正是以此锁能保险平等时刻独有一个线程在运作。
在四线程情状中,Python 设想机按以下情势试行:

  1. 设置GIL
  2. 切换来八个线程去运营
  3. 运行:
    a. 内定数量的字节码指令,恐怕
    b. 线程主动让出调节(能够调用time.sleep(0))
  4. 把线程设置为睡眠景况
  5. 解锁GIL
  6. 重新重新以上全部手续

在调用外部代码(如C/C++增添函数)的时候,GIL
将会被锁定,直到这些函数甘休截止(由于在这里之间平素不Python
的字节码被周转,所以不会做线程切换)。

  • 经过正是一个程序在一个数码集上的一次动态实践进度。进度平常由程序、数据集、进度序调节制块三局地构成。
  • 线程也叫轻量级进度,它是叁当中坚的CPU实行单元,也是程序实践进程中的最小单元,由线程ID、程序流速計、贮存器集结和储藏室合作构成。线程的引进减小了前后相继现身试行时的付出,提高了操作系统的面世品质。线程未有和睦的系统能源。

     
协程有哪些平价吗,协程只在单线程中举办,无需cpu实行上下文切换,协程自动完毕子程序切换。

2.2 全局解释器锁GIL设计思想与范围

GIL的规划简化了CPython的达成,使得对象模型,富含首要的内建项目如字典,都是带有能够并发访谈的。锁住全局解释器使得相比便于的落到实处对四线程的支撑,但也损失了多管理器主机的并行总括技巧。
但是,无论规范的,照旧第三方的恢弘模块,都被设计成在拓宽密集计算职责是,释放GIL。
再有,正是在做I/O操作时,GIL总是会被保释。对具有面向I/O
的(会调用内建的操作系统C 代码的)程序来讲,GIL 会在此个I/O
调用在此以前被放走,以允许别的的线程在这里个线程等待I/O
的时候运维。就算是纯计算的次序,未有 I/O 操作,解释器会每隔 100
次操作就自由那把锁,让别的线程有机缘实施(这一个次数能够透过
sys.setcheckinterval 来调动)假诺某线程并未有使用过多I/O
操作,它会在融洽的年华片内一贯据有管理器(和GIL)。也正是说,I/O
密集型的Python 程序比揣测密集型的次序更能丰硕利用三十八线程遭遇的实惠。

下边是Python 2.7.9手册中对GIL的简约介绍:
The mechanism used by the CPython interpreter to assure that only one
thread executes Python bytecode at a time. This simplifies the CPython
implementation by making the object model (including critical built-in
types such as dict) implicitly safe against concurrent access. Locking
the entire interpreter makes it easier for the interpreter to be
multi-threaded, at the expense of much of the parallelism afforded by
multi-processor machines.
However, some extension modules, either standard or third-party, are
designed so as to release the GIL when doing computationally-intensive
tasks such as compression or hashing. Also, the GIL is always released
when doing I/O.
Past efforts to create a “free-threaded” interpreter (one which locks
shared data at a much finer granularity) have not been successful
because performance suffered in the common single-processor case. It is
believed that overcoming this performance issue would make the
implementation much more complicated and therefore costlier to maintain.

从上文中能够看见,针对GIL的主题素材做的不知凡几更上风华正茂层楼,如应用越来越细粒度的锁机制,在单管理器意况下反而导致了品质的狂降。遍布以为,击溃那一个性情难题会招致CPython完结越发千头万绪,由此维护资金更是高昂。

二、进度和线程的涉及

     
这里未有运用yield协程,这一个python自带的并不是很康健,至于何以有待于你去研讨了。

三、 Python多进程与四线程相比较

有了GIL的存在,同有时刻同后生可畏进度中唯有二个线程被实施?这里可能人有一个疑问:多进度能够应用多核,不过付出大,而Python四线程耗费小,但却敬敏不谢运用多核的优势?要缓和这么些难题,大家供给在以下几点上达成共鸣:

  • CPU是用来测算的!
  • 多核CPU,意味着能够有四个核并行达成总括,所以多核升级的是计量质量;
  • 各样CPU黄金年代旦遇见I/O阻塞,依旧须求等待,所以多核查I/O操作没什么用处。

自然,对于七个前后相继来讲,不会是纯总计照旧纯I/O,我们只好绝没有错去看贰个程序到底是总结密集型,依然I/O密集型。进而进一步剖析Python的多线程有英雄无发挥专长。

分析:

大家有四个职责须求处理,管理访求断定是要有现身的功力,施工方案得以是:

  • 方案后生可畏:开启八个进程;
  • 方案二:八个进程下,开启多少个过程。

单核情形下,剖析结果:

  • 借使四个职责是计算密集型,未有多核来并行总结,方案意气风发徒增了创设进度的花费,方案二胜;
  • 比方多少个任务是I/O密集型,方案大器晚成创办进程的开销大,且经过的切换速度远不及线程,方案二胜。

多核情况下,深入分析结果:

  • 设若多少个义务是密集型,多核意味着并行
    总计,在python中一个历程中平等时刻独有三个线程执行用不上多核,方案大器晚成胜;
  • 要是八个职分是I/O密集型,再多的核 也杀绝不了I/O难点,方案二胜。

结论:以后的微型Computer基本上都以多核,python对于总计密集型的天职开八线程的频率并不能够带动多大品质上的晋级,甚至不比串行(未有大气切换),可是,对于I/O密集型的天职功效照旧有真相大白进级的。

代码达成相比较

计算密集型:

#计算密集型
from threading import Thread
from multiprocessing import Process
import os
import time
def work():
    res=0
    for i in range(1000000):
        res+=i

if __name__ == '__main__':
    t_l=[]
    start_time=time.time()
    for i in range(100):
        # t=Thread(target=work) #我的机器4核cpu,多线程大概15秒
        t=Process(target=work) #我的机器4核cpu,多进程大概10秒
        t_l.append(t)
        t.start()

    for i in t_l:
        i.join()
    stop_time=time.time()
    print('run time is %s' %(stop_time-start_time))
    print('主线程')

I/O密集型:

#I/O密集型
from threading import Thread
from multiprocessing import Process
import time
import os
def work():
    time.sleep(2) #模拟I/O操作,可以打开一个文件来测试I/O,与sleep是一个效果
    print(os.getpid())

if __name__ == '__main__':
    t_l=[]
    start_time=time.time()
    for i in range(500):
        # t=Thread(target=work) #run time is 2.195
        t=Process(target=work) #耗时大概为37秒,创建进程的开销远高于线程,而且对于I/O密集型,多cpu根本不管用
        t_l.append(t)
        t.start()

    for t in t_l:
        t.join()
    stop_time=time.time()
    print('run time is %s' %(stop_time-start_time))

总结:
应用场景:
多线程用于I/O密集型,如socket、爬虫、web
多进度用于总结密集型,如金融深入分析

进程是Computer中的程序关于某数码集上的一回运营活动,是系统开展能源分配和调节的基本单位,是操作系统结构的基本功。只怕说进度是持有一定独立成效的程序关于有些数据集上的一遍运营活动,进度是系统开展能源分配和调整的多个独自单位。
线程则是经过的三个实体,是CPU调整和分担的着力单位,它是比进度更小的能独立运作的为主单位。

      这里运用比较完备的第三方协程包gevent

四、锁

www.qy186.com 1

      pip  install    gevent

4.1 同步锁

须要:对叁个全局变量,开启一百个线程,每一个线程都对该全局变量做减1操作;

不加锁,代码如下:

import time
import threading

num = 100  #设定一个共享变量
def addNum():
    global num #在每个线程中都获取这个全局变量
    #num-=1

    temp=num
    time.sleep(0.1)
    num =temp-1  # 对此公共变量进行-1操作

thread_list = []

for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待所有线程执行完毕
    t.join()

print('Result: ', num)

分析:上述程序开启100线程并不可能把全局变量num减为0,第三个线程实施addNum碰到I/O阻塞后相当慢切换来下二个线程实施addNum,由于CPU实践切换的速度特别快,在0.1秒内就切换完成了,那就招致了第二个线程在获得num变量后,在time.sleep(0.1)时,别的的线程也都拿到了num变量,全体线程得到的num值都以100,所以最终减1操作后,正是99。加锁达成。

加锁,代码如下:

import time
import threading

num = 100   #设定一个共享变量
def addNum():
    with lock:
        global num
        temp = num
        time.sleep(0.1)
        num = temp-1    #对此公共变量进行-1操作

thread_list = []

if __name__ == '__main__':
    lock = threading.Lock()   #由于同一个进程内的线程共享此进程的资源,所以不需要给每个线程传这把锁就可以直接用。
    for i in range(100):
        t = threading.Thread(target=addNum)
        t.start()
        thread_list.append(t)

    for t in thread_list:  #等待所有线程执行完毕
        t.join()

    print("result: ",num)

加锁后,第贰个线程得到锁后最早操作,第3个线程必须等待第二个线程操作实现后将锁释放后,再与别的线程竞争锁,获得锁的线程才有权操作。那样就保持了多少的平安,可是拖慢了实践进度。
注意:with locklock.acquire()(加锁)与lock.release()(释放锁)的简写。

import threading

R=threading.Lock()

R.acquire()
'''
对公共数据的操作
'''
R.release()

小结:

种种进度下N个体协会程,   

GIL vs Lock

机智的同学可能会问到这个问题,就是既然你之前说过了,Python已经有一个GIL来保证同一时间只能有一个线程来执行了,为什么这里还需要lock? 

先是我们须要高达共鸣:锁的目标是为了掩护分享的多少,同有的时候候只好有一个线程来改正分享的多寡

接下来,我们能够得出结论:珍爱不一样的数码就活该加不相同的锁。

最终,难题就很明朗了,GIL
与Lock是两把锁,敬性格很顽强在山高水险或巨大压力面前不屈的多少不风姿洒脱致,前面一个是解释器级其余(当然维护的便是解释器品级的多寡,例如垃圾回笼的数据),前面一个是保卫安全客商自己开辟的应用程序的数量,很举世瞩目GIL不承担这事,只好客商自定义加柔鱼理,即Lock

详细的:

因为Python解释器帮你活动准期实行内部存款和储蓄器回笼,你能够精晓为python解释器里有三个独立的线程,每过生机勃勃段时间它起wake
up做二遍全局轮询看看哪些内部存款和储蓄器数据是足以被清空的,那时候你协和的程序
里的线程和
py解释器本人的线程是并发运转的,假如你的线程删除了三个变量,py解释器的污染源回笼线程在清空这么些变量的经过中的clearing时刻,恐怕三个其余线程正巧又再一次给那几个还未来及得清空的内部存款和储蓄器空间赋值了,结果就有异常的大几率新赋值的数额被去除了,为了解决肖似的主题素材,python解释器轻巧暴虐的加了锁,即当一个线程运维时,别的人都无法动,这样就一蹴即至了上述的主题材料,
那能够说是Python前期版本的遗留难题。

  • 二个线程只好属于八个经过,而四个历程能够有多少个线程,但最稀有一个线程。

  • 财富分配给进程,同风流潇洒进程的有所线程共享该进度的有着财富。

  • CPU分给线程,即确实在CPU上运转的是线程。
#coding=utf-8
from multiprocessing import Process
import gevent
#from gevent import monkey; monkey.patch_socket()
#用于协程的了程序
def yield_execFunc(x):
    print('______________%s'%x)


#yield_clist决定协程的数量
#开始协程操作
def yield_start(yield_clist):
    task=[] #用来存储协程
    for i in yield_clist:
        task.append(gevent.spawn(yield_execFunc,i))

    gevent.joinall(task) #执行协程

if  __name__=="__main__":
    list1=[1,2,3,4,5,6,7,8,9,10] #元素个数决定开起的协程数量
    list2=[1,2,3,4,5,6,7,8,9,10]
    list3=[1,2,3,4,5,6,7,8,9,10]
    process_list =[list1,list2,list3] #元素个数决定进程数量
    for plist in process_list:
        p = Process(target=yield_start,args=(plist,))
        p.start()

4.2 死锁与递归锁

所谓死锁:是指三个或七个以上的长河或线程在试行进程中,因争夺财富而招致的风流倜傥种互动等待的面貌,若无外力效用,它们都将不能够推动下去。那时称系统处于死锁状态,或系统爆发了死锁。那此永恒在竞相等待的长河称死锁进度

如下代码,就能够生出死锁:

from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('33[41m%s 拿到A锁33[0m' %self.name)

        mutexB.acquire()
        print('33[42m%s 拿到B锁33[0m' %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('33[43m%s 拿到B锁33[0m' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print('33[44m%s 拿到A锁33[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()

'''
Thread-1 拿到A锁
Thread-1 拿到B锁
Thread-1 拿到B锁
Thread-2 拿到A锁
然后就卡住,死锁了
'''

缓慢解决死锁的主意

制止发出死锁的议程正是用递归锁,在python中为了协理在同一线程中频频诉求同一财富,python提供了可重入锁RLock

这个RLock其间维护着二个Lock和三个counter变量,counter记录了acquire(得到锁)的次数,进而使得财富得以被再三require。直到贰个线程全数的acquire都被release(释放)后,其余的线程能力获取财富。上边的事比假若使用RLock代替Lock,就不会发出死锁的风貌了。

mutexA=mutexB=threading.RLock()
#一个线程得到锁,counter加1,该线程内又际遇加锁的景色,则counter继续加1,这之间全部其余线程都只能等待,等待该线程释放具有锁,即counter依次减少到0停止。

三、并行(xing)和并发

实施结果:开了八个经过,每一个进程下实行十个体协会程合营义务

4.3 信号量Semaphore

同进度的实信号量一样。
用一个粗鄙的例证来讲,锁相当于独立卫生间,独有贰个坑,同临时刻只可以有壹位拿走锁,进去使用;而数字信号量相当于国有换衣间,举个例子有5个坑,同偶然刻能够有5个人获得锁,并应用。

Semaphore关押二个内置的流速计,每当调用acquire()时,内置计数器-1;调用release()时,内置流量计+1;流速計不能够小于0,当流速計为0时,acquire()将封堵线程,直到其余线程调用release()

实例:
何况独有5个线程能够获得Semaphore,即能够节制最浦这接数为5:

import threading
import time

sem = threading.Semaphore(5)
def func():
    if sem.acquire():   #也可以用with进行上下文管理
        print(threading.current_thread().getName()+"get semaphore")
        time.sleep(2)
        sem.release()

for i in range(20):
    t1 = threading.Thread(target=func)
    t1.start()

利用with进展上下文物管理理:

import threading
import time

sem = threading.Semaphore(5)

def func():
    with sem:   
        print(threading.current_thread().getName()+"get semaphore")
        time.sleep(2)

for i in range(20):
    t1 = threading.Thread(target=func)
    t1.start()

注:时限信号量与进程池是一丝一毫不相同一的定义,进度池Pool(4)最大必须要发出4个进度,並且原原本本都只是这4个经过,不会发出新的,而复信号量是发出一群线程/进程。

并行管理(Parallel
Processing)是Computer体系中能同期试行四个或更三个管理的豆蔻梢头种总结办法。并行管理可同期专门的学业于同生机勃勃程序的两样地点。并行管理的重大指标是省去大型和复杂性难题的消除岁月。

C:Python27python.exe D:/weixin/temp/yield_tmp.py
______________1
______________2
______________3
______________4
______________5
______________6
______________7
______________8
______________9
______________10
______________1
______________1
______________2
______________2
______________3
______________3
______________4
______________4
______________5
______________5
______________6
______________6
______________7
______________7
______________8
______________8
______________9
______________9
______________10
______________10

Process finished with exit code 0

4.4 事件Event

同进度的生机勃勃律

线程的二人命关天脾性是各类线程都以单独运维且状态不行预测。假设程序中的别的线程通过判别有些线程的景况来规定本身下一步的操作,那时候线程同步难题就能够变得可怜吃力,为了缓慢解决那个难题我们采取threading库中的Event对象。

Event目的蕴涵贰个可由线程设置的功率信号标识,它同意线程等待有些事件的发生。在起来景况下,伊芙nt对象中的功率信号标识被安装为假。假设有线程等待四个Event对象,而以此伊芙nt对象的标识为假,那么这么些线程将会被
一向不通直至该
标识为真。四个线程就算将一个Event对象的实信号标识设置为真,它将唤起全数等待那个Event对象的线程。假诺一个线程等待叁个曾经被
设置 为真正Event对象,那么它将忽视这一个事件,继续试行。

Event对象具有局地办法:
event = threading.Event() #发出叁个风浪指标

  • event.isSet():返回event状态值;
  • event.wait():如果event.isSet() == False,将阻塞线程;
  • event.set():设置event的景况值为True,全体阻塞池的线程步入就绪状态,等待操作系统中度;
  • event.clear():苏醒event的图景值False。

行使场景:

比方说,大家有八个线程须求三番五次数据库,大家想要在运行时确定保证Mysql服务正常,才让那些专门的学问线程去老是Mysql服务器,那么我们就足以接收threading.Event()编写制定来协调种种专门的学业线程的连接操作,主线程中会去品味连接Mysql服务,若无难题的话,触发事件,各职业线程会尝试连接Mysql服务。

from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    print('33[42m%s 等待连接mysql。。。33[0m' %threading.current_thread().getName())
    event.wait()  #默认event状态为False,等待
    print('33[42mMysql初始化成功,%s开始连接。。。33[0m' %threading.current_thread().getName())


def check_mysql():
    print('33[41m正在检查mysql。。。33[0m')
    time.sleep(random.randint(1,3))
    event.set()   #设置event状态为True
    time.sleep(random.randint(1,3))

if __name__ == '__main__':
    event=Event()
    t1=Thread(target=conn_mysql) #等待连接mysql
    t2=Thread(target=conn_mysql) #等待连接myqsl
    t3=Thread(target=check_mysql) #检查mysql

    t1.start()
    t2.start()
    t3.start()


'''
输出如下:
Thread-1 等待连接mysql。。。
Thread-2 等待连接mysql。。。
正在检查mysql。。。
Mysql初始化成功,Thread-1开始连接。。。
Mysql初始化成功,Thread-2开始连接。。。
'''

注:threading.Eventwait措施还可以接收贰个超时参数,暗中同意情形下,若是事件直接未有发生,wait方法会向来不通下去,而步入那一个超时参数之后,如若打断时间超越那些参数设定的值之后,wait方法会重临。对应于下边包车型大巴运用场景,假如mysql服务器平昔未曾运维,我们希望子线程能够打字与印刷一些日志来不断提醒大家眼下未曾二个方可接连的mysql服务,我们就能够安装这一个超时参数来实现那样的指标:

上例代码改正后如下:

from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    count = 1
    while not event.is_set():
        print("33[42m%s 第 <%s> 次尝试连接。。。"%(threading.current_thread().getName(),count))
        event.wait(0.2)
        count+=1
    print("33[45mMysql初始化成功,%s 开始连接。。。33[0m"%(threading.current_thread().getName()))

def check_mysql():
    print('33[41m正在检查mysql。。。33[0m')
    time.sleep(random.randint(1,3))
    event.set()
    time.sleep(random.randint(1,3))

if __name__ == '__main__':
    event=Event()
    t1=Thread(target=conn_mysql) #等待连接mysql
    t2=Thread(target=conn_mysql) #等待连接mysql
    t3=Thread(target=check_mysql) #检查mysql

    t1.start()
    t2.start()
    t3.start()

这般,大家就足以在守候Mysql服务运行的相同的时候,见到职业线程大将军在等待的场馆。应用:连接池。

现身管理(concurrency
Processing)指一个光阴段中有多少个程序都处在已开发银行运维到运维达成之间,且那多少个程序都以在同一个管理机(CPU)上运营,但任三个时刻点上独有四个顺序在管理机(CPU)上运营。

 

4.5 定时器timer

反应计时器,内定n秒后举办某操作。

from threading import Timer

def hello():
    print("hello, world")

t = Timer(1, hello)  #1秒后执行任务hello
t.start()   # after 1 seconds, "hello, world" will be printed

www.qy186.com 2

   

4.6 线程队列queue

queue队列:使用import queue,用法与经过Queue一样。

queue下有两种队列:

  • queue.Queue(maxsize) 先进先出,先放进队列的多寡,先被抽出来;
  • queue.LifoQueue(maxsize) 后进先出,(Lifo 意为last in first
    out),后放进队列的数额,先被收取来
  • queue.PriorityQueue(maxsize) 优先级队列,优先级越高优先收取来。

举例:
先进先出:

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
结果(先进先出):
first
second
third
'''

后进先出:

import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
结果(后进先出):
third
second
first
'''

预先级队列:

import queue

q=queue.PriorityQueue()
#put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
结果(数字越小优先级越高,优先级高的优先出队):
(10, 'b')
(20, 'a')
(30, 'c')
'''

现身的要紧是你有管理多少个职务的力量,不自然要相同的时候。并行的显假使你有同期处理多个职分的力量。所以说,并行是现身的子集。

五、协程

协程:是单线程下的产出,又称微线程、纤程,捷克语名:Coroutine协程是黄金时代种客商态的轻量级线程,协程是由客户程序本人决定调节的。

亟待重申的是:

1.
python的线程属于基本等级的,即由操作系统调整调节(如单线程生龙活虎旦境遇io就被迫交出cpu施行权限,切换其余线程运行)

  1. 单线程内展开协程,大器晚成旦境遇io,从应用程序品级(而非操作系统)调控切换

相比操作系统调节线程的切换,客户在单线程内决定协程的切换,优点如下:

1.
协程的切换开支越来越小,属于程序等级的切换,操作系统完全感知不到,由此特别轻量级

  1. 单线程内就能够实现产出的成效,最大限度地应用cpu。

要完毕协程,关键在于顾客程序本人支配程序切换,切换从前必需由顾客程序本中国人民保险公司留协程上一遍调用时的场馆,如此,每便重复调用时,能够从上次的职位继续试行

(详细的:协程具有本身的寄放器上下文和栈。协程调节切换时,将寄放器上下文和栈保存到其余市方,在切回到的时候,苏醒原先保存的寄放器上下文和栈)

四、同步与异步

5.1 yield完毕协程

咱俩前边早已学习过生龙活虎种在单线程下能够保存程序运转状态的法子,即yield,我们来回顾复习一下:

  • yiled能够保留处境,yield的事态保存与操作系统的保存线程状态很像,可是yield是代码等第决定的,更轻量级
  • send能够把贰个函数的结果传给此外一个函数,以此完成单线程内程序之间的切换

#不用yield:每次函数调用,都需要重复开辟内存空间,即重复创建名称空间,因而开销很大
import time
def consumer(item):
    # print('拿到包子%s' %item)
    x=11111111111
    x1=12111111111
    x3=13111111111
    x4=14111111111
    y=22222222222
    z=33333333333

    pass
def producer(target,seq):
    for item in seq:
        target(item) #每次调用函数,会临时产生名称空间,调用结束则释放,循环100000000次,则重复这么多次的创建和释放,开销非常大

start_time=time.time()
producer(consumer,range(100000000))
stop_time=time.time()
print('run time is:%s' %(stop_time-start_time)) #30.132838010787964


#使用yield:无需重复开辟内存空间,即重复创建名称空间,因而开销小
import time
def init(func):
    def wrapper(*args,**kwargs):
        g=func(*args,**kwargs)
        next(g)
        return g
    return wrapper

init
def consumer():
    x=11111111111
    x1=12111111111
    x3=13111111111
    x4=14111111111
    y=22222222222
    z=33333333333
    while True:
        item=yield
        # print('拿到包子%s' %item)
        pass
def producer(target,seq):
    for item in seq:
        target.send(item) #无需重新创建名称空间,从上一次暂停的位置继续,相比上例,开销小

start_time=time.time()
producer(consumer(),range(100000000))
stop_time=time.time()
print('run time is:%s' %(stop_time-start_time)) #21.882073879241943

缺点:
协程的庐山面目目是单线程下,不可能使用多核,能够是贰个顺序开启两个经过,每种进程内展开多少个线程,各个线程内展开协程。
协程指的是单个线程,因此黄金年代旦协程现身堵塞,将会卡住整个线程。

协程的定义(知足1,2,3就能够叫做协程):

  1. 总得在独有三个单线程里福寿康宁产出
  2. 改进分享数据不需加锁
  3. 客商程序里本中国人民保险公司留八个调节流的光景文栈
  4. 外加:贰个协程蒙受IO操作自动切换来任何协程(怎么样落到实处检查实验IO,yield、greenlet都无法兑现,就用到了gevent模块(select机制))

注意:yield切换在向来不io的意况下也许未有再度开采内部存储器空间的操作,对功效未有何样进步,以至越来越慢,为此,能够用greenlet来为我们演示这种切换。

在微型计算机领域,同步便是指多个经过在举行有个别诉求的时候,若该乞求供给后生可畏段时间工夫重回音信,那么那个历程将会直接守候下去,直到收到重回消息才继续试行下去。

5.2 greenlet实现协程

greenlet是三个用C完成的协程模块,比较与python自带的yield,它能够使您在大肆函数之间自由切换,而不需把那么些函数先注解为generator。

安装greenlet模块
pip install greenlet

from greenlet import greenlet
import time

def t1():
    print("test1,first")
    gr2.switch()
    time.sleep(5)
    print("test1,second")
    gr2.switch()

def t2():
    print("test2,first")
    gr1.switch()
    print("test2,second")

gr1 = greenlet(t1)
gr2 = greenlet(t2)
gr1.switch()


'''
输出结果:
test1,first
test2,first   #等待5秒
test1,second
test2,second
'''

能够在率先次switch时传入参数

from greenlet import greenlet
import time
def eat(name):
    print("%s eat food 1"%name)
    gr2.switch(name="alex")
    time.sleep(5)
    print("%s eat food 2"%name)
    gr2.switch()

def play_phone(name):
    print("%s play phone 1"%name)
    gr1.switch()
    print("%s play phone 1" % name)

gr1 = greenlet(eat)
gr2 = greenlet(play_phone)
gr1.switch(name="egon")  #可以在第一次switch时传入参数,以后都不需要

注意:greenlet只是提供了生龙活虎种比generator更加的便利的切换情势,仍旧未有缓和境遇I/O自动切换的主题素材,而意气风发味的切换,反而会稳中有降程序的举办进度。那就供给采取gevent模块了。

异步是指进度没有必要平昔等下去,而是继续试行此外操作,不管其余进度的意况。当有消息重临时系统会通报进度张开始拍录卖,那样能够拉长实行的作用。举例,打电话时即使一块通讯,发短息时就是异步通讯。

5.3 gevent达成协程

gevent是三个第三方库,能够轻易通过gevent完成产出同步或异步编制程序,在gevent中用到的重大是Greenlet,它是以C扩张模块形式接入Python的轻量级协程。greenlet任何运作在主程操作系统进度的里边,但它们被合作式地调节和测量检验。遇上I/O阻塞时会自动切换职务。

注意:gevent有温馨的I/O阻塞,如:gevent.sleep()和gevent.socket();但是gevent无法直接识别除本身之外的I/O阻塞,如:time.sleep(2),socket等,要想识别这么些I/O阻塞,必需打叁个补丁:from gevent import monkey;monkey.patch_all()

  • 内需先安装gevent模块
    pip install gevent

  • 开创三个体协会程对象g1
    g1 =gevent.spawn()
    spawn括号内率先个参数是函数名,如eat,前边能够有八个参数,能够是岗位实参或主要字实参,都是传给第贰个参数(函数)eat的。

from gevent import monkey;monkey.patch_all()
import gevent

def eat():
    print("点菜。。。")
    gevent.sleep(3)   #等待上菜
    print("吃菜。。。")

def play():
    print("玩手机。。。")
    gevent.sleep(5)  #网卡了
    print("看NBA...")

# gevent.spawn(eat)
# gevent.spawn(play)
# print('主') # 直接结束

#因而也需要join方法,进程或现场的jion方法只能join一个,而gevent的joinall方法可以join多个
g1=gevent.spawn(eat)
g2=gevent.spawn(play)
gevent.joinall([g1,g2])  #传一个gevent对象列表。
print("主线程")

"""
输出结果:
点菜。。。
玩手机。。。    
##等待大概3秒       此行没打印
吃菜。。。
##等待大概2秒          此行没打印
看NBA...
主线程
"""

注:上例中的gevent.sleep(3)是效仿的I/O阻塞。跟time.sleep(3)效果与利益形似。

同步/异步

import gevent
def task(pid):
    """
    Some non-deterministic task
    """
    gevent.sleep(0.5)
    print('Task %s done' % pid)

def synchronous():  #同步执行
    for i in range(1, 10):
        task(i)

def asynchronous(): #异步执行
    threads = [gevent.spawn(task, i) for i in range(10)]
    gevent.joinall(threads)

print('Synchronous:')
synchronous()   #执行后,会顺序打印结果

print('Asynchronous:')
asynchronous()  #执行后,会异步同时打印结果,无序的。

爬虫应用

#协程的爬虫应用

from gevent import monkey;monkey.patch_all()
import gevent
import time
import requests

def get_page(url):
    print("GET: %s"%url)
    res = requests.get(url)
    if res.status_code == 200:
        print("%d bytes received from %s"%(len(res.text),url))

start_time = time.time()
g1 = gevent.spawn(get_page,"https://www.python.org")
g2 = gevent.spawn(get_page,"https://www.yahoo.com")
g3 = gevent.spawn(get_page,"https://www.github.com")
gevent.joinall([g1,g2,g3])
stop_time = time.time()
print("run time is %s"%(stop_time-start_time))

上以代码输出结果:

GET: https://www.python.org
GET: https://www.yahoo.com
GET: https://www.github.com
47714 bytes received from https://www.python.org
472773 bytes received from https://www.yahoo.com
98677 bytes received from https://www.github.com
run time is 2.501142978668213

应用:
通过gevent完结单线程下的socket并发,注意:from gevent import monkey;monkey.patch_all()无庸置疑要放权导入socket模块早先,不然gevent不能够辨别socket的堵塞。

服务端代码:

from gevent import monkey;monkey.patch_all()
import gevent
from socket import *

class server:
    def __init__(self,ip,port):
        self.ip = ip
        self.port = port


    def conn_cycle(self):   #连接循环
        tcpsock = socket(AF_INET,SOCK_STREAM)
        tcpsock.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
        tcpsock.bind((self.ip,self.port))
        tcpsock.listen(5)
        while True:
            conn,addr = tcpsock.accept()
            gevent.spawn(self.comm_cycle,conn,addr)

    def comm_cycle(self,conn,addr):   #通信循环
        try:
            while True:
                data = conn.recv(1024)
                if not data:break
                print(addr)
                print(data.decode("utf-8"))
                conn.send(data.upper())
        except Exception as e:
            print(e)
        finally:
            conn.close()

s1 = server("127.0.0.1",60000)
print(s1)
s1.conn_cycle()

客户端代码 :

from socket import *

tcpsock = socket(AF_INET,SOCK_STREAM)
tcpsock.connect(("127.0.0.1",60000))

while True:
    msg = input(">>: ").strip()
    if not msg:continue
    tcpsock.send(msg.encode("utf-8"))
    data = tcpsock.recv(1024)
    print(data.decode("utf-8"))

透过gevent实现产出四个socket顾客端去老是服务端

from gevent import monkey;monkey.patch_all()
import gevent
from socket import *

def client(server_ip,port):
    try:
        c = socket(AF_INET,SOCK_STREAM)
        c.connect((server_ip,port))
        count = 0
        while True:
            c.send(("say hello %s"%count).encode("utf-8"))
            msg = c.recv(1024)
            print(msg.decode("utf-8"))
            count+=1
    except Exception as e:
        print(e)
    finally:
        c.close()

# g_l = []
# for i in range(500):
#     g = gevent.spawn(client,'127.0.0.1',60000)
#     g_l.append(g)
# gevent.joinall(g_l)

#上面注释代码可简写为下面代码这样。

threads = [gevent.spawn(client,"127.0.0.1",60000) for i in range(500)]
gevent.joinall(threads)

举个例证:

六、IO多路复用

是因为CPU和内部存款和储蓄器的快慢远远出乎外设的快慢,所以,在IO编制程序中,就存在速度严重不合作的主题材料。举个例子要把100M的数据写入磁盘,CPU输出100M的数量只须要0.01秒,不过磁盘要收到那100M数码或者要求10秒,有二种办法解决:

经过IO多路复用达成同一时候监听多少个端口的服务端

示例一:

# 示例一:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author : Cai Guangyin

from socket import socket
import select

sock_1 = socket()
sock_1.bind(("127.0.0.1",60000))
sock_1.listen(5)

sock_2 = socket()
sock_2.bind(("127.0.0.1",60001))
sock_2.listen(5)

inputs = [sock_1,sock_2]

while True:
    # IO多路复用
    # -- select方法,内部进行循环操作,哪个socket对象有变化(连接),就赋值给r;监听socket文件句柄有个数限制(1024个)
    # -- poll方法,也是内部进行循环操作,没有监听个数限制
    # -- epoll方法,通过异步回调,哪个socket文件句柄有变化,就会自动告诉epoll,它有变化,然后将它赋值给r;
    # windows下没有epoll方法,只有Unix下有,windows下只有select方法
    r,w,e=select.select(inputs,[],[],0.2)  #0.2是超时时间
        #当有人连接sock_1时,返回的r,就是[sock_1,];是个列表
        #当有人连接sock_2时,返回的r,就是[sock_2,];是个列表
        #当有多人同时连接sock_1和sock_2时,返回的r,就是[sock_1,sock_2,];是个列表
        #0.2是超时时间,如果这段时间内没有连接进来,那么r就等于一个空列表;
    for obj in r:
        if obj in [sock_1,sock_2]:

            conn, addr = obj.accept()
            inputs.append(conn)
            print("新连接来了:",obj)

        else:
            print("有连接用户发送消息来了:",obj)
            data = obj.recv(1024)
            if not data:break
            obj.sendall(data)

客户端:

# -*- coding:utf-8 -*-
#!/usr/bin/python
# Author : Cai Guangyin

from socket import *

tcpsock = socket(AF_INET,SOCK_STREAM)   #创建一个tcp套接字
tcpsock.connect(("127.0.0.1",60001))     #根据地址连接服务器

while True:   #客户端通信循环
    msg = input(">>: ").strip()   #输入消息
    if not msg:continue           #判断输入是否为空
        #如果客户端发空,会卡住,加此判断,限制用户不能发空
    if msg == 'exit':break       #退出
    tcpsock.send(msg.encode("utf-8"))   #socket只能发送二进制数据
    data = tcpsock.recv(1024)    #接收消息
    print(data.decode("utf-8"))

tcpsock.close()

如上服务端运营时,假若有顾客端断开连接则会抛出如下非凡:

www.qy186.com 3

异常

  1. CPU等着,相当于程序暂停实践后续代码,等100M的数码在10秒后写入磁盘,再跟着往下进行,这种形式称为同步IO
  2. CPU不等待,只是告诉磁盘,逐渐写不急急,写完文告小编,笔者跟着干别的事去了,于是继续代码能够跟着推行,这种格局称为异步IO

改正版如下

搜罗分外并将接收数据和发送数据分开处理
示例二:

# 示例二
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author : Cai Guangyin

from socket import *
import select

sk1 = socket(AF_INET,SOCK_STREAM)
sk1.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
sk1.bind(("127.0.0.1",60000))
sk1.listen(5)

sk2 = socket(AF_INET,SOCK_STREAM)
sk2.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
sk2.bind(("127.0.0.1",60001))
sk2.listen(5)


inputs = [sk1,sk2]
w_inputs = []

while True:
    r,w,e = select.select(inputs,w_inputs,inputs,0.1)
    for obj in r:
        if obj in [sk1,sk2]:
            print("新连接:",obj.getsockname())
            conn,addr = obj.accept()
            inputs.append(conn)

        else:
            try:
                # 如果客户端断开连接,将获取异常,并将收取数据data置为空
                data = obj.recv(1024).decode('utf-8')
                print(data)
            except Exception as e:
                data = ""

            if data:
                # 如果obj能正常接收数据,则认为它是一个可写的对象,然后将它加入w_inputs列表
                w_inputs.append(obj)
            else:
                # 如果数据data为空,则从inputs列表中移除此连接对象obj
                print("空消息")
                obj.close()
                inputs.remove(obj)


        print("分割线".center(60,"-"))

    # 遍历可写的对象列表,
    for obj in w:
        obj.send(b'ok')
        # 发送数据后删除w_inputs中的此obj对象,否则客户端断开连接时,会抛出”ConnectionResetError“异常
        w_inputs.remove(obj)

五、threading模块

七、socketserver落成产出

依靠TCP的套接字,关键正是八个巡回,二个三番三回循环,三个通信循环。

SocketServer内部采用 IO多路复用 以至 “四线程” 和 “多进度”
,从而完毕产出管理八个客商端诉求的Socket服务端。即:每一种客商端央浼连接到服务器时,Socket服务端都会在服务器是制造二个“线程”也许“进度”
专门担当管理当下顾客端的具有央求。

socketserver模块中的类分为两大类:server类(排除链接难点)和request类(解决通讯难点)

server类:

www.qy186.com 4

server类

request类:

www.qy186.com 5

request类

线程server类的两次三番关系:

www.qy186.com 6

线程server类的持续关系

进度server类的接轨关系:

www.qy186.com 7

进度server类的继续关系

request类的一连关系:

www.qy186.com 8

request类的承继关系

以下述代码为例,深入分析socketserver源码:

ftpserver=socketserver.ThreadingTCPServer(('127.0.0.1',8080),FtpServer)
ftpserver.serve_forever()

招来属性的顺序:ThreadingTCPServer –> ThreadingMixIn –>
TCPServer->BaseServer

  1. 实例化获得ftpserver,先找类ThreadingTCPServer__init__,在TCPServer中找到,从而实践server_bind,server_active
  2. ftpserver下的serve_forever,在BaseServer中找到,进而实践self._handle_request_noblock(),该措施相近是在BaseServer
  3. 执行self._handle_request_noblock()随着实践request, client_address = self.get_request()(就是TCPServer中的self.socket.accept()),然后履行self.process_request(request, client_address)
  4. ThreadingMixIn中找到process_request,开启十二线程应对现身,进而实行process_request_thread,执行self.finish_request(request, client_address)
  5. 上述四部分变成了链接循环,本有的开端步向拍卖通信部分,在BaseServer中找到finish_request,触发大家友好定义的类的实例化,去找__init__方法,而大家本身定义的类未有该方式,则去它的父类也正是BaseRequestHandler中找….

源码解析计算:
依据tcp的socketserver大家团结定义的类中的

  • self.server 即套接字对象
  • self.request 即贰个链接
  • self.client_address 即客商端地址

基于udp的socketserver大家团结定义的类中的

  • self.request是多个元组(第二个成分是顾客端发来的数目,第二部分是服务端的udp套接字对象),如(b'adsf', <socket.socket fd=200, family=AddressFamily.AF_INET, type=SocketKind.SOCK_DGRAM, proto=0, laddr=('127.0.0.1', 8080)>)
  • self.client_address即顾客端地址。

线程是操作系统直接帮助的举办单元,由此,高端语言平时都内置三十二线程的帮忙,Python也不例外,而且,Python的线程是当真的Posix
Thread,实际不是盲目跟随大众出来的线程。

6.1 ThreadingTCPServer

ThreadingTCPServer完结的Soket服务器内部会为各类client创造贰个“线程”,该线程用来和顾客端进行相互。

使用ThreadingTCPServer:

  • 创设四个持续自 SocketServer.BaseRequestHandler 的类
  • 类中必需定义叁个名号为 handle 的方法
  • 启动ThreadingTCPServer。
  • 启动serve_forever() 链接循环

服务端:

import socketserver

class MyServer(socketserver.BaseRequestHandler):
    def handle(self):
        conn = self.request
        # print(addr)
        conn.sendall("欢迎致电10086,请输入1XXX,0转人工服务。".encode("utf-8"))
        Flag = True
        while Flag:
            data = conn.recv(1024).decode("utf-8")
            if data == "exit":
                Flag = False
            elif data == '0':
                conn.sendall("您的通话可能会被录音。。。".encode("utf-8"))
            else:
                conn.sendall("请重新输入。".encode('utf-8'))

if __name__ == '__main__':
    server = socketserver.ThreadingTCPServer(("127.0.0.1",60000),MyServer)
    server.serve_forever()  #内部实现while循环监听是否有客户端请求到达。

客户端:

import socket

ip_port = ('127.0.0.1',60000)
sk = socket.socket()
sk.connect(ip_port)
sk.settimeout(5)

while True:
    data = sk.recv(1024).decode("utf-8")
    print('receive:',data)
    inp = input('please input:')
    sk.sendall(inp.encode('utf-8'))
    if inp == 'exit':
        break
sk.close()

Python的标准库提供了多少个模块:_threadthreading_thread是起码模块,threading是高等模块,对_thread进展了打包。绝大许多情形下,大家只须要采纳threading本条高等模块。

七、基于UDP的套接字

  • recvfrom(buffersize[, flags])收纳音讯,buffersize是一次接到几个字节的数据。
  • sendto(data[, flags], address)
    发送信息,data是要发送的二进制数据,address是要发送之处,元组方式,包涵IP和端口

服务端:

from socket import *
s=socket(AF_INET,SOCK_DGRAM)  #创建一个基于UDP的服务端套接字,注意使用SOCK_DGRAM类型
s.bind(('127.0.0.1',8080))  #绑定地址和端口,元组形式

while True:    #通信循环
    client_msg,client_addr=s.recvfrom(1024) #接收消息
    print(client_msg)
    s.sendto(client_msg.upper(),client_addr) #发送消息

客户端:

from socket import *
c=socket(AF_INET,SOCK_DGRAM)   #创建客户端套接字

while True:
    msg=input('>>: ').strip()
    c.sendto(msg.encode('utf-8'),('127.0.0.1',8080)) #发送消息
    server_msg,server_addr=c.recvfrom(1024) #接收消息
    print('from server:%s msg:%s' %(server_addr,server_msg))

依样葫芦即时聊天
出于UDP无连接,所以能够同一时间八个客商端去跟服务端通讯

服务端:

from socket import *

server_address = ("127.0.0.1",60000)
udp_server_sock = socket(AF_INET,SOCK_DGRAM)
udp_server_sock.bind(server_address)

while True:
    qq_msg,addr = udp_server_sock.recvfrom(1024)
    print("来自[%s:%s]的一条消息:33[32m%s33[0m"%(addr[0],addr[1],qq_msg.decode("utf-8")))
    back_msg = input("回复消息:").strip()
    udp_server_sock.sendto(back_msg.encode("utf-8"),addr)

udp_server_sock.close()

客户端:

from socket import *

BUFSIZE = 1024
udp_client_sock = socket(AF_INET,SOCK_DGRAM)
qq_name_dic = {
    "alex":("127.0.0.1",60000),
    "egon":("127.0.0.1",60000),
    "seven":("127.0.0.1",60000),
    "yuan":("127.0.0.1",60000),
}

while True:
    qq_name = input("请选择聊天对象:").strip()
    while True:
        msg = input("请输入消息,回车发送:").strip()
        if msg == "quit":break
        if not msg or not qq_name or qq_name not in qq_name_dic:continue
        print(msg,qq_name_dic[qq_name])
        udp_client_sock.sendto(msg.encode("utf-8"),qq_name_dic[qq_name])

        back_msg,addr = udp_client_sock.recvfrom(BUFSIZE)
        print("来自[%s:%s]的一条消息:33[32m%s33[0m" %(addr[0],addr[1],back_msg.decode("utf-8")))
udp_client_sock.close()

注意:
1.您独自运转方面的udp的客商端,你开采并不会报错,相反tcp却会报错,因为udp合同只担负把包发出去,对方收不收,作者历来不管,而tcp是依据链接的,必需有贰个服务端先运维着,客商端去跟服务端建构链接然后依托于链接手艺传递音信,任何一方试图把链接摧毁都会产生对方程序的垮台。

2.方面包车型大巴udp程序,你注释任何一条顾客端的sendinto,服务端都会卡住,为何?因为服务端有几个recvfrom将要对应多少个sendinto,哪怕是sendinto(b”)那也要有。

3.recvfrom(buffersize)若是设置每便选用数据的字节数,小于对方发送的数目字节数,借使运转Linux情形下,则只会选拔到recvfrom()所设置的字节数的数据;而大器晚成旦运营windows情形下,则会报错。

基于socketserver完毕八十多线程的UDP服务端:

import socketserver

class MyUDPhandler(socketserver.BaseRequestHandler):
    def handle(self):
        client_msg,s=self.request
        s.sendto(client_msg.upper(),self.client_address)

if __name__ == '__main__':
    s=socketserver.ThreadingUDPServer(('127.0.0.1',60000),MyUDPhandler)
    s.serve_forever()

1. 调用Thread类直接开立

启航四个线程正是把一个函数字传送入并创办Thread实例,然后调用start()初始施行:

www.qy186.com 9www.qy186.com 10

 1 import time, threading
 2 
 3 # 新线程执行的代码:
 4 def loop():
 5     print('thread %s is running...' % threading.current_thread().name)
 6     n = 0
 7     while n < 5:
 8         n = n + 1
 9         print('thread %s >>> %s' % (threading.current_thread().name, n))
10         time.sleep(1)
11     print('thread %s ended.' % threading.current_thread().name)
12 
13 print('thread %s is running...' % threading.current_thread().name)
14 t = threading.Thread(target=loop, name='LoopThread')
15 t.start()
16 t.join()
17 print('thread %s ended.' % threading.current_thread().name)
18 
19 
20 #运行结果:
21 #thread MainThread is running...
22 # thread LoopThread is running...
23 # thread LoopThread >>> 1
24 # thread LoopThread >>> 2
25 # thread LoopThread >>> 3
26 # thread LoopThread >>> 4
27 # thread LoopThread >>> 5
28 # thread LoopThread ended.
29 # thread MainThread ended.

实例1

鉴于其他进度私下认可就能够运行三个线程,大家把该线程称为主线程,主线程又有什么不可运维新的线程,Python的threading模块有个current_thread()函数,它世代再次回到当前线程的实例。主线程实例的名字叫MainThread,子线程的名字在成立时钦赐,大家用LoopThread命名子线程。名字只是在打字与印刷时用来体现,完全未有别的意思,假若不起名字Python就机关给线程命名叫Thread-1Thread-2……

www.qy186.com 11www.qy186.com 12

 1 import threading
 2 import time
 3 
 4 def countNum(n): # 定义某个线程要运行的函数
 5 
 6     print("running on number:%s" %n)
 7 
 8     time.sleep(3)
 9 
10 if __name__ == '__main__':
11 
12     t1 = threading.Thread(target=countNum,args=(23,)) #生成一个线程实例
13     t2 = threading.Thread(target=countNum,args=(34,))
14 
15     t1.start() #启动线程
16     t2.start()
17 
18     print("ending!")
19 
20 
21 #运行结果:程序打印完“ending!”后等待3秒结束
22 #running on number:23
23 #running on number:34
24 #ending!

实例2

该实例中国共产党有3个线程:主线程,t1和t2子线程

www.qy186.com 13

 

2. 自定义Thread类承袭式成立

www.qy186.com 14www.qy186.com 15

 1 #继承Thread式创建
 2 
 3 import threading
 4 import time
 5 
 6 class MyThread(threading.Thread):
 7 
 8     def __init__(self,num):
 9         threading.Thread.__init__(self)    #继承父类__init__
10         self.num=num
11 
12     def run(self):    #必须定义run方法
13         print("running on number:%s" %self.num)
14         time.sleep(3)
15 
16 t1=MyThread(56)
17 t2=MyThread(78)
18 
19 t1.start()
20 t2.start()
21 print("ending")

View Code

3. Thread类的实例方法

join和dameon

www.qy186.com 16www.qy186.com 17

 1 import threading
 2 from time import ctime,sleep
 3 
 4 def Music(name):
 5 
 6         print ("Begin listening to {name}. {time}".format(name=name,time=ctime()))
 7         sleep(3)
 8         print("end listening {time}".format(time=ctime()))
 9 
10 def Blog(title):
11 
12         print ("Begin recording the {title}. {time}".format(title=title,time=ctime()))
13         sleep(5)
14         print('end recording {time}'.format(time=ctime()))
15 
16 
17 threads = []
18 
19 
20 t1 = threading.Thread(target=Music,args=('FILL ME',))
21 t2 = threading.Thread(target=Blog,args=('',))
22 
23 threads.append(t1)
24 threads.append(t2)
25 
26 if __name__ == '__main__':
27 
28     #t2.setDaemon(True)
29 
30     for t in threads:
31 
32         #t.setDaemon(True) #注意:一定在start之前设置
33         t.start()
34 
35         #t.join()
36 
37     #t1.join()
38     #t2.join()    #  考虑这三种join位置下的结果?
39 
40     print ("all over %s" %ctime())

join和setDaemon

其他方式:

1 Thread实例对象的方法
2   # isAlive(): 返回线程是否活动的。
3   # getName(): 返回线程名。
4   # setName(): 设置线程名。
5 
6 threading模块提供的一些方法:
7   # threading.currentThread(): 返回当前的线程变量。
8   # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
9   # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

六、GIL

'''

定义:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple 
native threads from executing Python bytecodes at once. This lock is necessary mainly 
because CPython’s memory management is not thread-safe. (However, since the GIL 
exists, other features have grown to depend on the guarantees that it enforces.)

'''

Python中的线程是操作系统的原生线程,Python虚构机使用三个大局解释器锁(Global
Interpreter
Lock)来互斥线程对Python虚构机的施用。为了帮忙八十九线程机制,三个为主的渴求便是内需贯彻分歧线程对分享能源访谈的排挤,所以引进了GIL。
GIL:在叁个线程具有领会释器的访谈权之后,别的的享有线程都必得等待它释放解释器的访问权,就算这一个线程的下一条指令并不会相互影响。
在调用任何Python C API早先,要先获得GIL
GIL缺点:多管理器退化为单管理器;优点:防止多量的加锁解锁操作。

1.
GIL的前期规划

Python扶持八线程,而消除二十四线程之间数据完整性和状态同步的最轻松易行方法自然就是加锁。
于是有了GIL那把极大锁,而当更加的多的代码库开采者选取了这种设定后,他们最早大批量注重这种特征(即暗中同意python内部对象是thread-safe的,没有必要在促成时思索外加的内存锁和同步操作)。慢慢的这种实现方式被开采是蛋疼且低效的。但当大家总结去拆分和去除GIL的时候,开采大批量库代码开垦者现已重度依赖GIL而极度难以去除了。有多难?做个类比,像MySQL那样的“小项目”为了把Buffer
Pool
Mutex那把大锁拆分成种种小锁也花了从5.5到5.6再到5.7多个大版为期近5年的年华,而且仍在这里起彼伏。MySQL这么些背后有同盟社扶植且有稳固支出团队的产品走的这么劳顿,这又加以Python那样基本开垦和代码进献者高度社区化的团体吗?

2.
GIL的影响

不论你启多少个线程,你有稍许个cpu,
Python在实行三个进度的时候会淡定的在长久以来时刻只允许贰个线程运维。
故此,python是无法运用多核CPU完成七十六八线程的。
如此,python对于总结密集型的职务开四线程的功用甚至不比串行(未有大气切换),可是,对于IO密集型的任务功效照旧有鲜明进级的。

www.qy186.com 18

计算密集型实例:

www.qy186.com 19www.qy186.com 20

 1 #coding:utf8
 2 from threading import Thread
 3 import time
 4 
 5 def counter():
 6     i = 0
 7     for _ in range(100000000):
 8         i = i + 1
 9     return True
10 
11 
12 def main():
13     l=[]
14     start_time = time.time()
15     for i in range(2):
16 
17         t = Thread(target=counter)
18         t.start()
19         l.append(t)
20         t.join()
21 
22     for t in l:
23         t.join()
24     # counter()
25     # counter()
26     end_time = time.time()
27     print("Total time: {}".format(end_time - start_time))
28 
29 if __name__ == '__main__':
30     main()
31 
32 
33 '''
34 py2.7:
35      串行:9.17599987984s
36      并发:9.26799988747s
37 py3.6:
38      串行:9.540389776229858s
39      并发:9.568442583084106s
40 
41 '''

测算密集型,四线程并发相比较串行,未有明显优势

3. 解决方案

用multiprocessing替代Thread
multiprocessing库的现身十分的大程度上是为了弥补thread库因为GIL而无效的重疾。它全部的复制了风流倜傥套thread所提供的接口方便迁移。唯大器晚成的不等就是它接收了多进度实际不是三十二线程。种种进程有温馨的独自的GIL,因而也不会自但是然进程之间的GIL争抢。

www.qy186.com 21www.qy186.com 22

 1 #coding:utf8
 2 from multiprocessing import Process
 3 import time
 4 
 5 def counter():
 6     i = 0
 7     for _ in range(100000000):
 8         i = i + 1
 9 
10     return True
11 
12 def main():
13 
14     l=[]
15     start_time = time.time()
16 
17     # for _ in range(2):
18     #     t=Process(target=counter)
19     #     t.start()
20     #     l.append(t)
21     #     #t.join()
22     #
23     # for t in l:
24     #    t.join()
25     counter()
26     counter()
27     end_time = time.time()
28     print("Total time: {}".format(end_time - start_time))
29 
30 if __name__ == '__main__':
31     main()
32 
33 
34 '''
35 
36 py2.7:
37      串行:8.92299985886 s
38      并行:8.19099998474 s
39 
40 py3.6:
41      串行:9.963459014892578 s
42      并发:5.1366541385650635 s
43 
44 '''

multiprocess多进度达成并发运算能够晋级效用

当然multiprocessing亦不是万能良药。它的引进会增添程序达成时线程间数据通信和协同的大多不便。就拿流量计来例如子,假如大家要三个线程累计同七个变量,对于thread来说,申Bellamy个global变量,用thread.Lock的context包裹住,三行就消除了。而multiprocessing由于经过之间不能见到对方的多少,只好通过在主线程申惠氏(WYETH)个Queue,put再get或然用share
memory的秘籍。那么些附加的实现资金财产使得本来就老大痛楚的八线程程序编码,变得更其痛心了。

小结:因为GIL的留存,独有IO
Bound场景下的四线程会获得较好的习性提高;纵然对并行总括品质较高的次第能够思索把基本部分改为C模块,或然差非常的少用别样语言完成;GIL在较长一段时间内将会三回九转存在,不过会持续对其开展改良。

七、同步锁(lock)

多线程和多进度最大的例外在于,多进度中,同多个变量,各自有后生可畏份拷贝存在于各种进度中,互不影响,而四线程中,全体变量都由全数线程共享,所以,任何一个变量都足以被别的三个线程订正,由此,线程之间分享数据最大的摇摇欲堕在于三个线程同一时候改一个变量,把内容给改乱了。

www.qy186.com 23www.qy186.com 24

 1 import time
 2 import threading
 3 
 4 def subNum():
 5     global num #在每个线程中都获取这个全局变量
 6     temp = num
 7     time.sleep(0.1)
 8     num =temp-1  # 对此公共变量进行-1操作
 9 
10 num = 100  #设定一个共享变量
11 thread_list = []
12 
13 for i in range(100):
14     t = threading.Thread(target=subNum)
15     t.start()
16     thread_list.append(t)
17 
18 for t in thread_list: #等待所有线程执行完毕
19     t.join()
20 
21 print('Result: ', num)
22 
23 
24 #运行结果:
25 #Result:  99

八线程分享变量,无法确认保证变量安全

如上实例,在三个进度内,设置分享变量num=100,然后创设玖十多个线程,推行num-=1的操作,可是,由于在函数subNum中留存time.sleep(0.1),该语句能够等价于IO操作。于是在此短短的0.1秒的大运内,全部的线程已经创办并运维,得到了num=100的变量,等待0.1秒过后,最后赢得的num其实是99.

锁平常被用来达成对分享资源的同步采访。为每一个分享财富创造二个Lock对象,当你须要会见该财富时,调用acquire方法来获得锁对象(要是此外线程已经取得了该锁,则当前线程需拭目以俟其被假释),待财富访问完后,再调用release方法释放锁:

www.qy186.com 25www.qy186.com 26

 1 import time
 2 import threading
 3 
 4 def subNum():
 5     global num #在每个线程中都获取这个全局变量
 6     lock.acquire()
 7     temp = num
 8     time.sleep(0.1)
 9     num =temp-1  # 对此公共变量进行-1操作
10     lock.release()
11 
12 
13 num = 100  #设定一个共享变量
14 lock = threading.Lock()    #生成一个同步锁对象
15 thread_list = []
16 
17 for i in range(100):
18     t = threading.Thread(target=subNum)
19     t.start()
20     thread_list.append(t)
21 
22 for t in thread_list: #等待所有线程执行完毕
23     t.join()
24 
25 print('Result: ', num)
26 
27 #运行结果:
28 #Result:  0

接受lock方法,保障变量安全

 

lock.acquire()与lock.release()包起来的代码段,保险同一时刻只允许一个线程援引。

1 import threading
2 
3 R=threading.Lock()
4 
5 R.acquire()
6 '''
7 对公共数据的操作
8 '''
9 R.release()

八、死锁与递归锁

所谓死锁:
是指四个或三个以上的经过或线程在施行进程中,因争夺财富而招致的生龙活虎种相互等待的光景,若无外力效能,它们都将不也许推进下去。此时称系统处于死锁状态或系统爆发了死锁,这一个永远在竞相等待的经过称为死锁进程。

www.qy186.com 27www.qy186.com 28

 1 import threading
 2 import time
 3 
 4 mutexA = threading.Lock()
 5 mutexB = threading.Lock()
 6 
 7 class MyThread(threading.Thread):
 8 
 9     def __init__(self):
10         threading.Thread.__init__(self)
11 
12     def run(self):
13         self.fun1()
14         self.fun2()
15 
16     def fun1(self):
17 
18         mutexA.acquire()  # 如果锁被占用,则阻塞在这里,等待锁的释放
19 
20         print("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
21 
22         mutexB.acquire()
23         print("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
24         mutexB.release()
25 
26         mutexA.release()
27 
28 
29     def fun2(self):
30 
31         mutexB.acquire()
32         print("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
33         time.sleep(0.2)
34 
35         mutexA.acquire()
36         print("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
37         mutexA.release()
38 
39         mutexB.release()
40 
41 if __name__ == "__main__":
42 
43     print("start---------------------------%s"%time.time())
44 
45     for i in range(0, 10):
46         my_thread = MyThread()
47         my_thread.start()
48 
49 
50 
51 #运行结果:
52 #start---------------------------1494316634.4121563
53 #I am Thread-1 , get res: ResA---1494316634.4121563
54 #I am Thread-1 , get res: ResB---1494316634.4121563
55 #I am Thread-1 , get res: ResB---1494316634.4121563
56 #I am Thread-2 , get res: ResA---1494316634.4121563

死锁实例

 

在Python中为了协理在同一线程中往往央浼同一能源,python提供了可重入锁讴歌RDXLock。那么些TiggoLock内部维护着叁个Lock和八个counter变量,counter记录了acquire的次数,进而使得能源得以被屡屡require。直到叁个线程全体的acquire都被release,其他的线程技艺收获财富。上边的例子假设利用TiguanLock代替Lock,则不会发出死锁:

www.qy186.com 29www.qy186.com 30

 1 import threading
 2 import time
 3 
 4 # mutexA = threading.Lock()
 5 # mutexB = threading.Lock()
 6 rlock = threading.RLock()
 7 
 8 class MyThread(threading.Thread):
 9 
10     def __init__(self):
11         threading.Thread.__init__(self)
12 
13     def run(self):
14         self.fun1()
15         self.fun2()
16 
17     def fun1(self):
18         rlock.acquire()  # 如果锁被占用,则阻塞在这里,等待锁的释放
19 
20         print("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
21 
22         rlock.acquire()
23         print("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
24         rlock.release()
25 
26         rlock.release()
27 
28 
29     def fun2(self):
30         rlock.acquire()
31         print("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
32         time.sleep(0.2)
33 
34         rlock.acquire()
35         print("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
36         rlock.release()
37 
38         rlock.release()
39 
40 if __name__ == "__main__":
41 
42     print("start---------------------------%s"%time.time())
43 
44     for i in range(0, 10):
45         my_thread = MyThread()
46         my_thread.start()
47 
48 
49 #运行结果:从以下结果也可以发现,线程之间是竞争关系
50 """
51 start---------------------------1494316940.0863945
52 I am Thread-1 , get res: ResA---1494316940.0873976
53 I am Thread-1 , get res: ResB---1494316940.0873976
54 I am Thread-1 , get res: ResB---1494316940.0873976
55 I am Thread-1 , get res: ResA---1494316940.287911
56 I am Thread-2 , get res: ResA---1494316940.287911
57 I am Thread-2 , get res: ResB---1494316940.287911
58 I am Thread-2 , get res: ResB---1494316940.287911
59 I am Thread-2 , get res: ResA---1494316940.4883447
60 I am Thread-4 , get res: ResA---1494316940.4883447
61 I am Thread-4 , get res: ResB---1494316940.4883447
62 I am Thread-4 , get res: ResB---1494316940.4883447
63 I am Thread-4 , get res: ResA---1494316940.6886203
64 I am Thread-6 , get res: ResA---1494316940.6886203
65 I am Thread-6 , get res: ResB---1494316940.6896234
66 I am Thread-6 , get res: ResB---1494316940.6896234
67 I am Thread-6 , get res: ResA---1494316940.890659
68 I am Thread-8 , get res: ResA---1494316940.890659
69 I am Thread-8 , get res: ResB---1494316940.890659
70 I am Thread-8 , get res: ResB---1494316940.890659
71 I am Thread-8 , get res: ResA---1494316941.0918815
72 I am Thread-10 , get res: ResA---1494316941.0918815
73 I am Thread-10 , get res: ResB---1494316941.0918815
74 I am Thread-10 , get res: ResB---1494316941.0918815
75 I am Thread-10 , get res: ResA---1494316941.2923715
76 I am Thread-5 , get res: ResA---1494316941.2923715
77 I am Thread-5 , get res: ResB---1494316941.2923715
78 I am Thread-5 , get res: ResB---1494316941.2923715
79 I am Thread-5 , get res: ResA---1494316941.493138
80 I am Thread-9 , get res: ResA---1494316941.493138
81 I am Thread-9 , get res: ResB---1494316941.493138
82 I am Thread-9 , get res: ResB---1494316941.493138
83 I am Thread-9 , get res: ResA---1494316941.6937861
84 I am Thread-7 , get res: ResA---1494316941.6937861
85 I am Thread-7 , get res: ResB---1494316941.6937861
86 I am Thread-7 , get res: ResB---1494316941.6937861
87 I am Thread-7 , get res: ResA---1494316941.8946414
88 I am Thread-3 , get res: ResA---1494316941.8946414
89 I am Thread-3 , get res: ResB---1494316941.8946414
90 I am Thread-3 , get res: ResB---1494316941.8946414
91 I am Thread-3 , get res: ResA---1494316942.0956843
92 """

递归锁消除死锁

九、event对象

线程的八个要害个性是各样线程都以单身运作且状态不行预测。假使程序中的其余线程须求通过判别有些线程的意况来分明本人下一步的操作,那时候线程同步难题就能够变得不得了讨厌。为了化解那几个主题材料,我们须要动用threading库中的伊芙nt对象。对象包罗一个可由线程设置的时限信号标记,它同意线程等待有个别事件的发生。在开班意况下,Event对象中的实信号标识被装置为False。假如有线程等待一个Event对象,
而这些伊夫nt对象的标识为False,那么那几个线程将会被平昔不通直至该标识为True。贰个线程假如将三个Event对象的非功率信号标识设置为True,它将唤起全数等待这一个Event对象的线程。如若一个线程等待一个曾经棉被服装置为确实伊夫nt对象,那么它将忽视这几个事件,
继续实践。

event.isSet():返回event的状态值;

event.wait():如果 event.isSet()==False将阻塞线程;

event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;

event.clear():恢复event的状态值为False。

www.qy186.com 31

 

能够虚构豆蔻梢头种接纳场景(仅仅看做验证),比如,大家有多少个线程从Redis队列中读取数据来拍卖,这一个线程都要尝尝去连接Redis的劳务,平常情况下,如若Redis连接不成功,在逐个线程的代码中,都会去尝尝再一次连接。假若大家想要在运营时确定保障Redis服务平日,才让那一个工作线程去连接Redis服务器,那么咱们就能够利用threading.Event机制来和煦种种职业线程的总是操作:主线程中会去尝尝连接Redis服务,若无难题的话,触发事件,各工作线程会尝试连接Redis服务。

www.qy186.com 32www.qy186.com 33

 1 import threading
 2 import time
 3 import logging
 4 
 5 logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)
 6 
 7 def worker(event):
 8     logging.debug('Waiting for redis ready...')
 9     while not event.isSet():
10         logging.debug('connect failed...')
11         event.wait(1)
12     logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime())
13     time.sleep(1)
14 
15 def main():
16     readis_ready = threading.Event()
17     t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1')
18     t1.start()
19 
20     t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2')
21     t2.start()
22 
23     logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event')
24     time.sleep(3) # simulate the check progress
25     logging.debug('redis server is running')
26     readis_ready.set()
27 
28 if __name__=="__main__":
29     main()
30 
31 
32 #运行结果:
33 (t1        ) Waiting for redis ready...
34 # (t1        ) connect failed...
35 # (t2        ) Waiting for redis ready...
36 # (t2        ) connect failed...
37 # (MainThread) first of all, check redis server, make sure it is OK, and then trigger the redis ready event
38 # (t1        ) connect failed...
39 # (t2        ) connect failed...
40 # (t2        ) connect failed...
41 # (t1        ) connect failed...
42 # (MainThread) redis server is running
43 # (t2        ) redis ready, and connect to redis server and do some work [Tue May  9 16:15:18 2017]
44 # (t1        ) redis ready, and connect to redis server and do some work [Tue May  9 16:15:18 2017]

监听Redis服务

十、Semaphore(信号量)

Semaphore管理一个放到的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置流速計+1;
计数器无法小于0;当流量计为0时,acquire()将封堵线程直到其余线程调用release()。

实例:(同偶然间独有5个线程能够获得semaphore,即能够界定最大连接数为5):

www.qy186.com 34www.qy186.com 35

 1 import threading
 2 import time
 3 
 4 semaphore = threading.Semaphore(5)
 5 
 6 def func():
 7     if semaphore.acquire():
 8         print (threading.currentThread().getName() + ' get semaphore')
 9         time.sleep(2)
10         semaphore.release()
11 
12 for i in range(20):
13   t1 = threading.Thread(target=func)
14   t1.start()
15 
16 
17 #运行结果:
18 # Thread-1 get semaphore
19 # Thread-2 get semaphore
20 # Thread-3 get semaphore
21 # Thread-4 get semaphore
22 # Thread-5 get semaphore
23 # Thread-6 get semaphore#隔2秒打印
24 # Thread-7 get semaphore
25 # Thread-8 get semaphore
26 # Thread-9 get semaphore
27 # Thread-10 get semaphore
28 # Thread-11 get semaphore#隔2秒打印
29 # Thread-12 get semaphore
30 # Thread-13 get semaphore
31 # Thread-14 get semaphore
32 # Thread-15 get semaphore
33 # Thread-16 get semaphore#隔2秒打印
34 # Thread-17 get semaphore
35 # Thread-18 get semaphore
36 # Thread-20 get semaphore
37 # Thread-19 get semaphore

semaphore实例

十一、multiprocessing

Multiprocessing is a package that supports spawning processes using an API similar to the threading module. 
The multiprocessing package offers both local and remote concurrency,effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. 
Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.

 

鉴于GIL的留存,python中的多线程其实实际不是的确的四线程,假如想要丰裕地采用多核CPU的财富,在python中超级多情景必要动用多进程。

multiprocessing包是python中的多进程管理包。与threading.Thread肖似,它能够应用multiprocessing.Process对象来创立三个经过。该进程能够运维在Python程序内部编写的函数。该Process对象与Thread对象的用法相似,也许有start(),
run(),
join()的艺术。别的multiprocessing包中也是有Lock/伊芙nt/塞马phore/Condition类
(那么些目的能够像三十二线程那样,通过参数字传送递给各样进程),用以同步进度,其用法与threading包中的同名类大器晚成致。所以,multiprocessing的相当大风流倜傥部份与threading使用相符套API,只但是换成了多进度的境地。

www.qy186.com 36www.qy186.com 37

 1 from multiprocessing import Process
 2 import time
 3 def f(name):
 4 
 5     print('hello', name,time.ctime())
 6     time.sleep(1)
 7 
 8 if __name__ == '__main__':
 9     p_list=[]
10     for i in range(3):
11         p = Process(target=f, args=('alvin:%s'%i,))
12         p_list.append(p)
13         p.start()
14     for i in p_list:
15         p.join()
16     print('end')
17 
18 
19 #运行结果:
20 #hello alvin:0 Tue May  9 16:41:18 2017
21 #hello alvin:1 Tue May  9 16:41:18 2017
22 #hello alvin:2 Tue May  9 16:41:18 2017
23 #end

Process类调用

 

 

www.qy186.com 38www.qy186.com 39

 1 from multiprocessing import Process
 2 import time
 3 
 4 class MyProcess(Process):
 5     def __init__(self):
 6         super(MyProcess, self).__init__()
 7 
 8     def run(self):
 9 
10         print ('hello', self.name,time.ctime())
11         time.sleep(1)
12 
13 
14 if __name__ == '__main__':
15     p_list=[]
16     for i in range(3):
17         p = MyProcess()
18         p.start()
19         p_list.append(p)
20 
21     for p in p_list:
22         p.join()
23 
24     print('end')
25 
26 
27 #运行结果:
28 #hello MyProcess-1 Tue May  9 16:42:46 2017
29 #hello MyProcess-2 Tue May  9 16:42:46 2017
30 #hello MyProcess-3 Tue May  9 16:42:46 2017
31 #end

继承Process类调用

process类:

构造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 线程组,近年来还从未兑现,库引用中晋升必得是None; 
  target: 要实践的点子; 
  name: 进程名; 
  args/kwargs: 要传入方法的参数。

实例方法:

  is_alive():重回经过是或不是在运营。

  join([timeout]):阻塞当前上下文境况的进度程,直到调用此措施的长河终止或达到钦点的timeout(可选参数)。

  start():进程准备伏贴,等待CPU调整

  run():strat()调用run方法,假使实例进程时未制定传入target,那star实践t暗中认可run()方法。

  terminate():不管任务是还是不是到位,马上甘休专业进度

属性:

  daemon:和线程的setDeamon成效近似

  name:进度名字。

  pid:进程号。

实例:

www.qy186.com 40www.qy186.com 41

 1 from multiprocessing import Process
 2 import os
 3 import time
 4 def info(name):
 5 
 6 
 7     print("name:",name)
 8     print('parent process:', os.getppid())
 9     print('process id:', os.getpid())
10     print("------------------")
11     time.sleep(1)
12 
13 def foo(name):
14 
15     info(name)
16 
17 if __name__ == '__main__':
18 
19     info('main process line')
20 
21 
22     p1 = Process(target=info, args=('alvin',))
23     p2 = Process(target=foo, args=('egon',))
24     p1.start()
25     p2.start()
26 
27     p1.join()
28     p2.join()
29 
30     print("ending")
31 
32 
33 
34 #运行结果:
35 # name: main process line
36 # parent process: 5112
37 # process id: 10808
38 # ------------------
39 # name: alvin
40 # name: egon
41 # parent process: 10808
42 # process id: 9576
43 # ------------------
44 # parent process: 10808
45 # process id: 9604
46 # ------------------
47 # ending

process类创造多进度

透过tasklist(Win)大概ps -elf
|grep(linux)命令检验每三个历程号(PID)对应的进程名.

十二、协程

 1 import time
 2 
 3 """
 4 传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。
 5 如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高。
 6 """
 7 # 注意到consumer函数是一个generator(生成器):
 8 # 任何包含yield关键字的函数都会自动成为生成器(generator)对象
 9 
10 def consumer():
11     r = ''
12     while True:
13         # 3、consumer通过yield拿到消息,处理,又通过yield把结果传回;
14         #    yield指令具有return关键字的作用。然后函数的堆栈会自动冻结(freeze)在这一行。
15         #    当函数调用者的下一次利用next()或generator.send()或for-in来再次调用该函数时,
16         #    就会从yield代码的下一行开始,继续执行,再返回下一次迭代结果。通过这种方式,迭代器可以实现无限序列和惰性求值。
17         n = yield r
18         if not n:
19             return
20         print('[CONSUMER] ←← Consuming %s...' % n)
21         time.sleep(1)
22         r = '200 OK'
23 def produce(c):
24     # 1、首先调用c.next()启动生成器
25     next(c)
26     n = 0
27     while n < 5:
28         n = n + 1
29         print('[PRODUCER] →→ Producing %s...' % n)
30         # 2、然后,一旦生产了东西,通过c.send(n)切换到consumer执行;
31         cr = c.send(n)
32         # 4、produce拿到consumer处理的结果,继续生产下一条消息;
33         print('[PRODUCER] Consumer return: %s' % cr)
34     # 5、produce决定不生产了,通过c.close()关闭consumer,整个过程结束。
35     c.close()
36 if __name__=='__main__':
37     # 6、整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。
38     c = consumer()
39     produce(c)
40     
41     
42 '''
43 result:
44 
45 [PRODUCER] →→ Producing 1...
46 [CONSUMER] ←← Consuming 1...
47 [PRODUCER] Consumer return: 200 OK
48 [PRODUCER] →→ Producing 2...
49 [CONSUMER] ←← Consuming 2...
50 [PRODUCER] Consumer return: 200 OK
51 [PRODUCER] →→ Producing 3...
52 [CONSUMER] ←← Consuming 3...
53 [PRODUCER] Consumer return: 200 OK
54 [PRODUCER] →→ Producing 4...
55 [CONSUMER] ←← Consuming 4...
56 [PRODUCER] Consumer return: 200 OK
57 [PRODUCER] →→ Producing 5...
58 [CONSUMER] ←← Consuming 5...
59 [PRODUCER] Consumer return: 200 OK
60 '''

 

greenlet:

greenlet机制的要害考虑是:生成器函数恐怕协程函数中的yield语句挂起函数的施行,直到稍后使用next()或send()操作进行苏醒截止。能够运用贰个调治器循环在大器晚成组生成器函数之间合营四个职务。greentlet是python中落到实处大家所谓的”Coroutine(协程)”的二个基础库. 

 1 from greenlet import greenlet
 2  
 3 def test1():
 4     print (12)
 5     gr2.switch()
 6     print (34)
 7     gr2.switch()
 8  
 9 def test2():
10     print (56)
11     gr1.switch()
12     print (78)
13  
14 gr1 = greenlet(test1)
15 gr2 = greenlet(test2)
16 gr1.switch()
17 
18 
19 #运行结果:
20 #12
21 #56
22 #34
23 #78

基于greenlet的框架——gevent

gevent模块达成协程:

Python通过yield提供了对协程的主导援救,但是不完全。而第三方的gevent为Python提供了比较完备的协程扶植。

gevent是第三方库,通过greenlet完结协程,其核情绪想是:

当二个greenlet蒙受IO操作时,举个例子访谈互连网,就自行切换成别的的greenlet,等到IO操作完毕,再在适度的时候切换回来继续试行。由于IO操作非常耗费时间,平日使程序处于等候情状,有了gevent为大家自行切换协程,就保障总有greenlet在运转,并非等待IO。

由于切换是在IO操作时自动实现,所以gevent须求改进Python自带的有的标准库,那大器晚成历程在运行时通过monkey
patch达成:

www.qy186.com 42www.qy186.com 43

 1 from gevent import monkey
 2 monkey.patch_all()
 3 import gevent
 4 from urllib import request
 5 import time
 6 
 7 def f(url):
 8     print('GET: %s' % url)
 9     resp = request.urlopen(url)
10     data = resp.read()
11     print('%d bytes received from %s.' % (len(data), url))
12 
13 start=time.time()
14 
15 gevent.joinall([
16         gevent.spawn(f, 'https://itk.org/'),
17         gevent.spawn(f, 'https://www.github.com/'),
18         gevent.spawn(f, 'https://zhihu.com/'),
19 ])
20 
21 print(time.time()-start)
22 
23 
24 
25 #运行结果:
26 #GET: https://itk.org/
27 #GET: https://www.github.com/
28 #GET: https://zhihu.com/
29 #9077 bytes received from https://zhihu.com/.
30 #12323 bytes received from https://itk.org/.
31 #92574 bytes received from https://www.github.com/.
32 #3.7679357528686523

gevent实例

 

 

www.qy186.com 44www.qy186.com 45

 1 from gevent import monkey
 2 monkey.patch_all()
 3 import gevent
 4 from urllib import request
 5 import time
 6 
 7 def f(url):
 8     print('GET: %s' % url)
 9     resp = request.urlopen(url)
10     data = resp.read()
11     print('%d bytes received from %s.' % (len(data), url))
12 
13 start=time.time()
14 
15 # gevent.joinall([
16 #         gevent.spawn(f, 'https://itk.org/'),
17 #         gevent.spawn(f, 'https://www.github.com/'),
18 #         gevent.spawn(f, 'https://zhihu.com/'),
19 # ])
20 
21 f('https://itk.org/')
22 f('https://www.github.com/')
23 f('https://zhihu.com/')
24 
25 print(time.time()-start)
26 
27 
28 
29 #运行结果:
30 #GET: https://itk.org/
31 #12323 bytes received from https://itk.org/.
32 #GET: https://www.github.com/
33 #92572 bytes received from https://www.github.com/.
34 #GET: https://zhihu.com/
35 #8885 bytes received from https://zhihu.com/.
36 #5.089903354644775

对照串行情势的运作功效

 

仿效资料:

2.

 

相关文章