訊息:【Python多任務--進程,協程】

2022-12-15 10:24:08 來源:51CTO博客

一、進程

進程是計算機中的程序關于某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操作系統結構的基礎。進程是線程的容器,一個進程可以有多個線程


(資料圖)

進程特征

動態性:進程的實質是程序在多道程序系統中的一次執行過程,進程是動態產生,動態消亡的。并發性:任何進程都可以同其他進程一起并發執行。獨立性:進程是一個能獨立運行的基本單位,同時也是系統分配資源和調度的獨立單位。異步性:由于進程間的相互制約,使進程具有執行的間斷性,即進程按各自獨立的、不可預知的速度向前推進。

二、python多進程

創建多進程的兩種方式:

調用multiprocessing.Process模塊重寫multiprocessing.Process類的run方法

1、調用multiprocessing.Process模塊創建進程

Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化得到的對象,表示一個子進程中的任務(尚未啟動)

強調:

需要使用關鍵字的方式來指定參數args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號
參數介紹

1 group參數未使用,值始終為None2 target表示調用對象,即子進程要執行的任務3 args表示調用對象的位置參數元組,args=("liang",)4 kwargs表示調用對象的字典,kwargs={"name":"anne","age":18}5 name為子進程的名稱

from multiprocessing import Processdef work1(name):    for i in range(4):        time.sleep(1)        print(f"{name}澆花的第{i + 1}秒")def work2(name):    for i in range(3):        time.sleep(1)        print(f"{name}打墻的第{i + 1}秒")if __name__ == "__main__":    p1 = Process(target=work1, args=("liang",))    p2 = Process(target=work2, args=("小狼",))    p1.start()    p2.start()    p1.join()    p2.join()    print("主線程執行完畢")#輸出liang澆花的第1秒小狼打墻的第1秒小狼打墻的第2秒liang澆花的第2秒liang澆花的第3秒小狼打墻的第3秒liang澆花的第4秒主線程執行完畢

2、重寫multiprocessing.Process類的run方法

將要執行的任務寫入run方法,同樣的任務可以多線程并行執行

import timefrom multiprocessing import Processclass MyProcess(Process):    """自定義的進程類"""    def __init__(self, name):        super().__init__()        self.name = name    def run(self):        for i in range(3):            time.sleep(1)            print(f"{self.name}澆花的第{i + 1}秒")if __name__ == "__main__":    q_list = []    for i in range(2):  #創建4個線程并啟動        p = MyProcess(f"liang{i}")        q_list.append(p)        p.start()    for q in q_list:   #等待啟動的線程執行結束        q.join()    print("主進程執行完畢")#輸出liang0澆花的第1秒liang1澆花的第1秒liang0澆花的第2秒liang1澆花的第2秒liang0澆花的第3秒liang1澆花的第3秒主進程執行完畢

注意:多進程 執行,必須在main函數下, if __name__ == "__main__":

三、多進程之間通訊

多進程之間所有資源都是獨立的,不能共享全局變量

queue.Queue模塊只能在一個進行中使用,可以實現一個進程中的多個線程相互通訊

多個進程之間的相互通訊,需要用到--multiprocessing.Queue:可以多個進程之間共用(通訊)

"""進程之間通信:使用隊列multiprocessing.Queue:可以多個進程之間共用(通訊)queue.Queue模塊只能在一個進行中使用,一個進程中多個線程使用"""from multiprocessing import Process, Queuedef work1(q):    for i in range(5000):        n = q.get()        n += 1        q.put(n)    print("work1結束時候n的值:", n)def work2(q):    for i in range(5000):        n = q.get()        n += 1        q.put(n)    print("work2結束時候n的值:", n)if __name__ == "__main__":    q = Queue()    q.put(100)    p1 = Process(target=work1, args=(q,))    p2 = Process(target=work2, args=(q,))    p1.start()    p2.start()    p1.join()    p2.join()    print("兩個子進程執行結束之后,主進程打印的n:", q.get())#輸出work2結束時候n的值: 10090work1結束時候n的值: 10100兩個子進程執行結束之后,主進程打印的n: 10100

===================================================================================================================================================================================================================================================

接上文,我們下面來講一下:

python多任務--協程

一、前言

協程

協程 ,又稱為微線程,它是實現多任務的另一種方式,只不過是比線程更小的執行單元。因為它自帶CPU的上下文,這樣只要在合適的時機,我們可以把一個協程切換到另一個協程。

協程的優勢

執行效率高,因為子程序切換函數,而不是線程,沒有線程切換的開銷,由程序自身控制切換。于多線程相比,線程數量越多,切換開銷越大,協程的優勢越明顯不需要鎖的機制,只有一個線程,也不存在同時寫變量沖突,在控制共享資源時也不需要加鎖。

二、實現協程的幾種方式

1、yield(生成器)可以很容易的實現從一個函數切換到另外一個函數

def work1():    for i in range(5):        print(f"work1--befor----{i}")        yield        print(f"work1--after----{i}")        time.sleep(0.5)def work2():    for i in range(5):        print(f"work2---befor---{i}")        yield        print(f"work2--after----{i}")        time.sleep(0.5)def main():    g1 = work1()    g2 = work2()    while True:        try:            next(g1)            print("主程序")            next(g2)        except StopIteration:            breakmain()

運行結果如下:

2、原生的協程

import asyncio# 定義一個協程函數async def work1():    for i in range(10):        print(f"work1--澆花----{i}")# 調用協程函數,返回的是一個協程對象cor1 = work1()# 執行協程asyncio.run(cor1)
2.1、使用原生的協程實現多任務(不同任務)

協程中切換,通過await語法來掛起自身的協程。await后面跟上耗時操作,耗時操作一般指IO操作: 網絡請求,文件讀取等,使用asyncio.sleep模擬耗時操作。協程的目的也是讓這些IO操作異步化。sleep()需要用asyncio.sleep()await必須要在 async def function(): 中用,否則會報錯

import asyncioasync def work1():    for i in range(3):        print(f"work1--澆花----{i}")        await asyncio.sleep(1)async def work2():    for i in range(5):        print(f"work2--打墻----{i}")        await asyncio.sleep(1)if __name__ == "__main__":    loop = asyncio.get_event_loop()    # 創建兩個協程任務    tasks = [        work1(),        work2(),    ]    # 啟動事件循環并將協程放進去執行    loop.run_until_complete(asyncio.wait(tasks))#輸出work1--澆花----0work2--打墻----0work1--澆花----1work2--打墻----1work1--澆花----2work2--打墻----2work2--打墻----3work2--打墻----4
2.2、使用原生的協程實現多任務(同一方法處理大量數據)
import asynciofrom queue import Queueimport timedef decorator(func):    def wrapper():        # 函數執行之前獲取系統時間        start_time = time.time()        func()        # 函數執行之后獲取系統時間        end_time = time.time()        print("執行時間為:", end_time - start_time)        return end_time - start_time    return wrapperasync def work1(q):    while q.qsize():        print(f"請求url:{q.get()}")        await asyncio.sleep(0.1)@decoratordef main():    #創建一個包含有1000條url的隊列    q = Queue()    for i in range(1000):        q.put(f"www.baidu.com.{i}")    loop = asyncio.get_event_loop()    # 創建100個協程任務    tasks = [work1(q) for i in range(100)]    # 啟動事件循環并將協程放進去執行    loop.run_until_complete(asyncio.wait(tasks))    loop.close()if __name__ == "__main__":    main()#輸出...請求url:www.baidu.com.890請求url:www.baidu.com.891請求url:www.baidu.com.892...執行時間為: 1.060093641281128

100個協程執行1000個耗時0.1秒的請求只需要1秒

2.3、版本區別:

python 3.7 以前的版本調用異步函數的步驟:(如以上代碼)

1、調用asyncio.get_event_loop()函數獲取事件循環loop對象2、通過不同的策略調用loop.run_forever()方法或者loop.run_until_complete()方法執行異步函數

python3.7 以后的版本

1、asyncio.run() 函數用來運行最高層級的入口點,下例的main()函數。此函數總是會創建一個新的事件循環并在結束時關閉之。它應當被用作 asyncio 程序的主入口點,理想情況下應當只被調用一次。2、await 等待一個協程,也可以啟動一個協程。3、asyncio.create_task() 函數用來并發運行作為 asyncio 任務 的多個協程。下例并發運行兩個work協程

改動后代碼如下

#以上省略async def main():    q = Queue()    for i in range(1000):        q.put(f"www.baidu.com.{i}")    #創建了任務    tasks = [asyncio.create_task(work1(q)) for i in range(100)]    #將任務丟到執行隊列里面去    [await t for t in tasks]if __name__ == "__main__":    m=main()    start_time = time.time()    asyncio.run(m)    end_time = time.time()    print("運行時間{}秒".format(end_time - start_time))

3、greenlet模塊

import timeimport greenlet"""greenlet:在協程之間只能手動進行切換"""def work1():    for i in range(6):        time.sleep(1)        cor2.switch()        print(f"澆花的第{i + 1}秒")def work2():    for i in range(5):        time.sleep(1)        cor1.switch()        print(f"打墻的第{i + 1}秒")cor1 = greenlet.greenlet(work1)cor2 = greenlet.greenlet(work2)cor1.switch()

4、gevent模塊實現多任務

gevent模塊對greenlet又做了一層封裝,當程序遇到IO耗時等待的時候會進行自動切換gevent中默認是遇到gevent.sleep()會自動進行切換如果讓gevent遇到io耗時自動切換:需要在程序的導包處加一個補丁monkey.patch_all(),該補丁不支持多線程
from gevent import monkeymonkey.patch_all()import geventdef work1():    for i in range(6):        gevent.sleep(1)        print(f"澆花的第{i + 1}秒")def work2():    for i in range(5):        gevent.sleep(1)        print(f"打墻的第{i + 1}秒")# 創建兩個協程g1 = gevent.spawn(work1)g2 = gevent.spawn(work2)# 等待所有協程任務運行完畢gevent.joinall([g1, g2])

?

示例:模擬50000個協程執行對100000個地址的請求

from gevent import monkeymonkey.patch_all()import geventimport time#創建100000個地址urls = ["http://www.baidu.com" for i in range(100000)]#定義需要執行的任務函數def work():    while urls:        url = urls.pop()        # res = requests.get(url)        time.sleep(0.5)        print(f"正在請求url:{url},請求結果:url")def main():    cos = []        #創建50000個協程    for i in range(50000):        cor = gevent.spawn(work)        cos.append(cor)    # 等待所有協程任務運行完畢    gevent.joinall(cos)main()

標簽: 執行完畢 基本單位 位置參數

上一篇:每日播報!goLang包以及并發編程
下一篇:最新資訊:Redis 官方可視化工具--RedisInsight