
因為并發相關的東西又多又長。。所以這個專題會分成多篇博客來寫啦。。本篇文章包括
攜程機制,攜程和線程的區別使用鎖來控制并發使用通道(channel)來控制并發通道的多路控制和超時(select語句塊)攜程的簡單使用攜程的使用非常簡單,在需要運行的函數前面加上?線程由于涉及到處理器切換人物,會導致吞吐率下降。而使用攜程,可以做到多個攜程只會使用一個系統現場。利用這種方式降低切換線程上下文的成本
(資料圖)
?go?
?關鍵字就可以了個人認為只需要把攜程看成輕量級線程,使用時可以直接當成線程使用上面的程序輸出結果如下(由于處理機調用的時間不一致,所以并不會出現順序的數字)
=== RUN TestGroutine4756189023--- PASS: TestGroutine (0.10s)PASS
結果如下,可以看見我們丟失了正確的鞋操作
counter = 4799
為了獲得正確的結果,我們需要使用鎖,和等待隊列
CSP相當于是通過通道(發送消息)的機制來進行線程通訊。CSP的操作是通過一個Channel來完成的。也就是,這種方式會比使用直接通訊耦合度更低
Channel的交互機制主動式交互:如果消息消費者離線,那么消息生產者就會阻塞隊列(緩存)式交互:在消息隊列未滿之前,消息的生產者可以一直放入消息,如果消息隊列滿了,那么消息生產者就會阻塞CSP實現異步調用,延遲返回值例如這里的Service方法是需要異步調用的//這個方法做一些耗時的操作,需要異步調用,func service() string { time.Sleep(time.Second*1) return "do something"}包裝上面的方法,返回一個channel(注意代碼里的chan類型的定義方式和把函數放入調用的方式)
//使用一個異步調用包裝上面的方法func asyncService() chan string { //創建一個string類型的頻道 returnChannel := make(chan string) //異步調用 go func() { fmt.Println("開始異步調用") returnChannel <- service() fmt.Println("異步調用結束") }() return returnChannel}這樣調用?
?asyncService?
?方法的時候雖然可以很快的返回chan類型的數據,但是本質上任然是通過攜程異步調用的,立即去獲取chan里的值將會因為異步調用沒有返回值而被阻塞我給出一個調用案例如下//在程序入口的線程里也要做一些操作func TestTask(t *testing.T) { //開始異步調用并且等待結果 result := asyncService() //開始做別的事情 time.Sleep(500*time.Millisecond) fmt.Println("do something else") //這里再等待結果 fmt.Println(<-result)}
結果如下
創建一個帶消息容量的channel是需要再make方法的參數后面加上一個int來表示消息隊列的容量就好了,如下=== RUN TestTask開始異步調用do something else異步調用結束do something--- PASS: TestTask (1.01s)PASS
returnChannel := make(chan string,1)
由于我對我的老本行——java后端的異步調用還不是很熟悉,那我就趁熱打鐵,寫一個Demo實現Java的異步調用
這一部分是使用Java的Callable和FutureTask來完成異步調用,在獲取調用結果的時候如果沒有完成,主線程就會阻塞直到任務完成。如果不感興趣可以直接跳到下一個大標題
使用Callable和ExecutorService線程池來完成任務package com.libi.callable;import java.util.concurrent.*;/** * @author :Libi * @version :1.0 * @date :2019-06-17 19:39 * 用于了解Callable的使用方法 */public class CallableDemo implements Callable使用FutureTask的普通的Thread來完成任務{ /** * 這個方法是用于異步調用的 * @return * @throws Exception */ @Override public String call() throws Exception { System.out.println("do something in 5 seconds"); Thread.sleep(5000); return "完成,這個任務做完了"; } /** * 這個線程池用于調用callable任務,并且可以獲得 */ private ExecutorService executorService = Executors.newSingleThreadExecutor(); public static void main(String[] args) throws ExecutionException, InterruptedException { CallableDemo callableDemo = new CallableDemo(); System.out.println("開始異步調用"); //這一步可以封裝成方法 Future submit = callableDemo.executorService.submit(callableDemo); String s = submit.get(); System.out.println(s); callableDemo.executorService.shutdown(); }}
package com.libi.task;import java.util.concurrent.ExecutionException;import java.util.concurrent.FutureTask;/** * @author :Libi * @version :1.0 * @date :2019-06-17 19:51 * 這個類是研究FutureTask的使用方法而創建的類 */public class TaskDemo { /** * 這種方法就沒有用到線程池,但是也簡單的實現類異步調用的 * @return */ private FutureTaskasyncService() { FutureTask stringFutureTask = new FutureTask (() -> { Thread.sleep(5000); return "任務完成"; }); new Thread(stringFutureTask).start(); return stringFutureTask; } public static void main(String[] args) throws ExecutionException, InterruptedException { TaskDemo demo = new TaskDemo(); System.out.println("開始異步調用"); FutureTask stringFutureTask = demo.asyncService(); String s = stringFutureTask.get(); System.out.println("異步調用結束"); System.out.println(s); }}
select塊就是用于多個異步調用的多路選擇和超時控制。
select語句塊類似switch,每一個case里都要使用一個頻道(chan類型)來獲得數據。只要有一個channel返回了數據,那么這個channel的語句塊就會執行。如果都沒有返回值,有default語句塊的話就會執行default語句塊
這里的channel應該提前啟動好,當我們要獲取結果時再去做相關處理
func TestSelect1(t *testing.T) { s := asyncService() time.Sleep(time.Millisecond*1000) select { //這一個語句塊是為了做超時處理,10s后如果沒有結果他就會返回結果 //(當然有了default語句塊這個語句塊也就沒有意義了) case <-time.After(10*time.Second): print("10s") case ret := <-s: print("result:",ret) default: t.Error("error") }}