日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

Python中的協程(Coroutine)操作模塊(greenlet、gevent)_python

作者:springsnow ? 更新時間: 2022-07-27 編程語言

一、協程介紹

協程:英文名Coroutine,是單線程下的并發,又稱微線程,纖程。

協程是一種用戶態的輕量級線程,即協程是由用戶程序自己控制調度的。對比操作系統控制線程的切換,用戶在單線程內控制協程的切換。

協程自己本身無法實現并發(甚至性能會降低),協程+IO切換性能提高。

1、介紹

通常程序中子程序調用總是一個入口,一次返回,調用順序是明確的。而協程的調用和子程序不同。

協程看上去也是子程序,但執行過程中,在子程序內部可中斷,然后轉而執行別的子程序,在適當的時候再返回來接著執行。

注意,在一個子程序中中斷,去執行其他子程序,不是函數調用,有點類似CPU的中斷。

看起來A、B的執行有點像多線程,但協程的特點在于是一個線程執行,那和多線程比,協程有何優勢?

最大的優勢就是協程極高的執行效率。因為子程序切換不是線程切換,而是由程序自身控制,因此,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優勢就越明顯。

第二大優勢就是不需要多線程的鎖機制,因為只有一個線程,也不存在同時寫變量沖突,在協程中控制共享資源不加鎖,只需要判斷狀態就好了,所以執行效率比多線程高很多。

因為協程是一個線程執行,那怎么利用多核CPU呢?最簡單的方法是多進程+協程,既充分利用多核,又充分發揮協程的高效率,可獲得極高的性能。

2、舉例

Python對協程的支持是通過generator實現的。

在generator中,我們不但可以通過for循環來迭代,還可以不斷調用next()函數獲取由yield語句返回的下一個值。

但是Python的yield不但可以返回一個值,它還可以接收調用者發出的參數。

來看例子:

傳統的生產者-消費者模型是一個線程寫消息,一個線程取消息,通過鎖機制控制隊列和等待,但一不小心就可能死鎖。

如果改用協程,生產者生產消息后,直接通過yield跳轉到消費者開始執行,待消費者執行完畢后,切換回生產者繼續生產,效率極高:

def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print('[CONSUMER] Consuming %s...' % n)
        r = '200 OK'


def produce(c):
    c.send(None)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()


c = consumer()
produce(c)

執行結果:

[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK

注意到consumer函數是一個generator,把一個consumer傳入produce后:

  • 首先調用c.send(None)啟動生成器;
  • 然后,一旦生產了東西,通過c.send(n)切換到consumer執行;
  • consumer通過yield拿到消息,處理,又通過yield把結果傳回;
  • produce拿到consumer處理的結果,繼續生產下一條消息;
  • produce決定不生產了,通過c.close()關閉consumer,整個過程結束。

整個流程無鎖,由一個線程執行,produceconsumer協作完成任務,所以稱為“協程”,而非線程的搶占式多任務。

最后套用Donald Knuth的一句話總結協程的特點:“子程序就是協程的一種特例。”

3、優點如下:

  • 協程的切換開銷更小,屬于程序級別的切換,操作系統完全感知不到,因而更加輕量級
  • 單線程內就可以實現并發的效果,最大限度地利用cpu

4、缺點如下:

  • 協程的本質是單線程下,無法利用多核,可以是一個程序開啟多個進程,每個進程內開啟多個線程,每個線程內開啟協程
  • 協程指的是單個線程,因而一旦協程出現阻塞,將會阻塞整個線程

5、總結協程特點:

  • 必須在只有一個單線程里實現并發
  • 修改共享數據不需加鎖
  • 用戶程序里自己保存多個控制流的上下文棧
  • 附加:一個協程遇到IO操作自動切換到其它協程(如何實現檢測IO,yield、greenlet都無法實現,就用到了gevent模塊(select機制))

二、greenlet(綠葉)模塊

如果我們在單個線程內有20個任務,要想實現在多個任務之間切換,使用yield生成器的方式過于麻煩(需要先得到初始化一次的生成器,然后再調用send。。。非常麻煩),而使用greenlet模塊可以非常簡單地實現這20個任務直接的切換。

1、安裝模塊

pip3 install greenlet

2、greenlet實現狀態切換

單純的切換(在沒有io的情況下或者沒有重復開辟內存空間的操作),反而會降低程序的執行速度。

from greenlet import greenlet


def eat(name):
    print('%s eat 1' % name)
    g2.switch('nick')
    print('%s eat 2' % name)
    g2.switch()


def play(name):
    print('%s play 1' % name)
    g1.switch()
    print('%s play 2' % name)


g1 = greenlet(eat)
g2 = greenlet(play)

g1.switch('nick')  # 可以在第一次switch時傳入參數,以后都不需要

3、效率對比

greenlet只是提供了一種比generator更加便捷的切換方式,當切到一個任務執行時如果遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提升效率的問題。

單線程里的這20個任務的代碼通常會既有計算操作又有阻塞操作,我們完全可以在執行任務1時遇到阻塞,就利用阻塞的時間去執行任務2...如此,才能提高效率,這就用到了Gevent模塊。

#順序執行
import time
def f1():
    res=1
    for i in range(100000000):
        res+=i

def f2():
    res=1
    for i in range(100000000):
        res*=i

start=time.time()
f1()
f2()
stop=time.time()
print('run time is %s' %(stop-start)) #10.985628366470337

#切換
from greenlet import greenlet
import time
def f1():
    res=1
    for i in range(100000000):
        res+=i
        g2.switch()

def f2():
    res=1
    for i in range(100000000):
        res*=i
        g1.switch()

start=time.time()
g1=greenlet(f1)
g2=greenlet(f2)
g1.switch()
stop=time.time()
print('run time is %s' %(stop-start)) # 52.763017892837524

三、gevent模塊

Gevent 是一個第三方庫,可以輕松實現并發同步或異步編程,在gevent中用到的主要模式是Greenlet,它是以C擴展模塊形式接入Python的輕量級協程。

1、安裝

pip3 install gevent

2、 用法介紹

g1=gevent.spawn(func,1,,2,3,x=4,y=5):# 創建一個協程對象g1,spawn括號內第一個參數是函數名,如eat,后面可以有多個參數,可以是位置實參或關鍵字實參,都是傳給函數eat的
g2=gevent.spawn(func2)
g1.join():#等待g1結束
g2.join():#等待g2結束

#上述兩步合成一步:
gevent.joinall([g1,g2])
g1.value
:#拿到func1的返回值

1、遇到io主動切換

import gevent

def eat(name):
    print('%s eat 1' %name)
    gevent.sleep(2)
    print('%s eat 2' %name)

def play(name):
    print('%s play 1' %name)
    gevent.sleep(1)
    print('%s play 2' %name)


g1=gevent.spawn(eat,'egon')
g2=gevent.spawn(play,name='egon')
g1.join()
g2.join()
# 或者gevent.joinall([g1,g2])
print('主')

上例gevent.sleep(2)模擬的是gevent可以識別的io阻塞,而time.sleep(2)或其他的阻塞,gevent是不能直接識別的需要用下面一行代碼,打補丁,就可以識別了。

from gevent import monkey;monkey.patch_all()

必須放到被打補丁者的前面,如time,socket模塊之前。或者我們干脆記憶成:要用gevent,需要將from gevent import monkey;monkey.patch_all()放到文件的開頭。

from gevent import monkey;monkey.patch_all()

import gevent
import time

def eat():
    print('eat food 1')
    time.sleep(2)
    print('eat food 2')

def play():
    print('play 1')
    time.sleep(1)
    print('play 2')

g1=gevent.spawn(eat)
g2=gevent.spawn(play)
gevent.joinall([g1,g2])
print('主')

2、 查看threading.current_thread().getName()

我們可以用threading.current_thread().getName()來查看每個g1和g2,查看的結果為DummyThread-n,即假線程

from gevent import monkey;monkey.patch_all()
import threading
import gevent
import time

def eat():
    print(threading.current_thread().getName())
    print('eat food 1')
    time.sleep(2)
    print('eat food 2')

def play():
    print(threading.current_thread().getName())
    print('play 1')
    time.sleep(1)
    print('play 2')

g1=gevent.spawn(eat)
g2=gevent.spawn(play)
gevent.joinall([g1,g2])
print('主')

3、Gevent之同步與異步

from gevent import spawn,joinall,monkey;monkey.patch_all()

import time

def task(pid):
    """
    Some non-deterministic task
    """
    time.sleep(0.5)
    print('Task %s done' % pid)


def synchronous():  # 同步
    for i in range(10):
        task(i)

def asynchronous(): # 異步
    g_l=[spawn(task,i) for i in range(10)]
    joinall(g_l)
    print('DONE')
    
if __name__ == '__main__':
    print('Synchronous:')
    synchronous()
    print('Asynchronous:')
    asynchronous()

#  上面程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。
#  初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall 函數,
#  后者阻塞當前流程,并執行所有給定的greenlet任務。執行流程只會在 所有greenlet執行完后才會繼續向下走。

4、Gevent之應用

通過gevent實現單線程下的socket并發

注意:from gevent import monkey;monkey.patch_all()一定要放到導入socket模塊之前,否則gevent無法識別socket的阻塞。

1、 服務端

from gevent import monkey;monkey.patch_all()
from socket import *
import gevent

#如果不想用money.patch_all()打補丁,可以用gevent自帶的socket
# from gevent import socket
# s=socket.socket()

def server(server_ip,port):
    s=socket(AF_INET,SOCK_STREAM)
    s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    s.bind((server_ip,port))
    s.listen(5)
    while True:
        conn,addr=s.accept()
        gevent.spawn(talk,conn,addr)

def talk(conn,addr):
    try:
        while True:
            res=conn.recv(1024)
            print('client %s:%s msg: %s' %(addr[0],addr[1],res))
            conn.send(res.upper())
    except Exception as e:
        print(e)
    finally:
        conn.close()

if __name__ == '__main__':
    server('127.0.0.1',8080)

2、多線程并發多個客戶端

from threading import Thread
from socket import *
import threading

def client(server_ip,port):
    c=socket(AF_INET,SOCK_STREAM) #套接字對象一定要加到函數內,即局部名稱空間內,放在函數外則被所有線程共享,則大家公用一個套接字對象,那么客戶端端口永遠一樣了
    c.connect((server_ip,port))

    count=0
    while True:
        c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8'))
        msg=c.recv(1024)
        print(msg.decode('utf-8'))
        count+=1
if __name__ == '__main__':
    for i in range(500):
        t=Thread(target=client,args=('127.0.0.1',8080))
        t.start()

原文鏈接:https://www.cnblogs.com/springsnow/p/12017756.html

欄目分類
最近更新