在Linux網絡編程中如何實現信號處理和定時器功能呢?

2023-08-08 16:25:58 來源:周末程序猿

關于epoll驚群問題,什么是驚群呢?

比如我們在寫代碼過程中,使用兩個線程的epoll監聽socket,當socket上有事件發生時,兩個epoll都會被喚醒,導致會操作同一個socket,這就是驚群,那如何解決呢? (1)使用EPOLLEXCLUSIVEEPOLLEXCLUSIVEepoll的擴展選項,它允許一個線程獨占一個epoll實例,從而避免了epoll的驚群問題; (2)使用EPOLLONESHOT:對于注冊了EPOLLONESHOT事件的文件描述符,操作系統最多觸發一個可讀,可寫或者異常事件,且觸發一次,這樣就能確保一個線程獲取事件并處理,但是需要注意的是對于監聽類型(如accept)不能使用EPOLLONESHOT,否則就不能持續監聽連接,對于處理完了的非監聽事件,需要重置EPOLLONESHOT


(資料圖片僅供參考)

第一部分:信號

1、發送信號給進程

#include < sys/types.h >#include < signal.h >int kill(pid_t pid, int sig);

2、信號回調函數

#include < signal.h >typedef void (&__sighandler_t) (int);__sighandler_t signal(int sig, __sighandler_t _handler);int sigaction(int sig, const struct sigaction *act, struct sigaction *oact);

(1)__sighandler_t信號處理的函數指針,其中處理參數為觸發信號當前值,其中有兩個默認宏(SIG_DFL:使用信號默認處理,SIG_IGN:忽略目標信號); (2)signal注冊信號回調處理函數,返回值為一個函數指針,含義是這個信號上一次處理的回調函數或者是系統默認的處理函數,這里目的是讓用戶可以自己恢復信號處理方式,比如系統對于一些信號是殺掉進程的,這里就應該處理完自己的回調邏輯后再調用系統默認行為; (3)sigaction函數的功能是檢查或修改與指定信號相關聯的處理動作,使用樣例如下:

#include < stdio.h >#include < unistd.h >#include < stdlib.h >#include < signal.h > int main(){    struct sigaction newact, oldact;     newact.sa_handler = SIG_IGN; // 設置信號忽略,也可以設置為處理函數    sigemptyset(&newact.sa_mask);    newact.sa_flags = 0;    int count = 0;    pid_t pid = 0;     sigaction(SIGINT, &newact, &oldact); // 原始的備份到oldact,為后續的處理恢復     pid = fork();    if (pid == 0)    {        while(1)        {            printf("child exec ...n");            sleep(1);        }        return 0;    }     while (1)    {        if (count++ > 3)        {            sigaction(SIGINT, &oldact, NULL); // 恢復父進程信號處理方式            kill(pid, SIGKILL); // 父進程發信號給子進程        }         printf("father exec ...n");        sleep(1);    }     return 0;}

第二部分:定時器

Linux網絡編程中,定時器的作用主要是管理定時任務,處理過期連接,檢測超時隊列等,那我們可以通過哪些方式實現定時器呢?

1、利用系統API

...setsockopt(socketfd, SOL_SOCKET, SO_SNDTIMEO, &timeout, len);setsockopt(socketfd, SOL_SOCKET, SO_RCVTIMEO, &timeout, len);int number = epoll_wait(fd, events, MAX_EVENT_NUMBER, timeout);...

通過使用socket的參數,設置連接句柄的發送和接收數據超時時間,可以實現定時處理: (1)SO_SNDTIMEO發送數據超時時間,根據timeout設置; (2)SO_RCVTIMEO接收數據超時時間,根據timeout設置; IO復用的參數中都帶了一個timeout參數,可以設置來達到定時觸發分支邏輯,比如epoll_wait

2、簡單的定時器

(1)啟動一個線程實現定時器,具體實現如下圖:

主線程啟動,開始執行任務,這里可以是網絡收發或者其他;啟動一個線程,做定時任務處理使用;主線程需要增加定時任務,可以將任務封裝為task,添加到任務隊列中;同時通知定時線程,隊列中有任務了,這里通知機制可以是信號量或者廣播方式;定時線程取出隊列中任務,判斷當前任務是否過期,如果過期就執行,沒有過期就繼續放入任務隊列中,同時這里需要讓線程等待隊列中距離下一個周期最短的時間,繼續取隊列任務;

(2)使用epoll_wait設置timeout,是在網絡事件觸發的定時器中最方便的方式,具體邏輯如下:

... start_timer = ... // 開始執行時間while (true) {    int number = epoll_wait(epfd, events, MAX_EVENT_NUMBER, timeout);    for (...) {        ...        // 處理連接任務        ...    }    end_timer = ... // epoll_wait返回并處理任務時間    // 處理定時任務,判斷當前時間是否在一個timeout    if (end_timer - start_timer > timeout) { // 這里是偽代碼,具體時間判斷可以參考linux結構體        ...        // 啟動線程執行定時任務邏輯        ...    }}

3、時間輪

時間輪是一種高效定時器,通過類似圓盤的形式定義每個tick,定時轉動圓盤,假設每次tick時間為si,一個時間輪有N個tick,那么執行轉動一圈時間為N*si; 現在插入一個任務,需要to1時間周期后執行,這里就分情況處理: (1)如果to1< N*si,則需要分配到(當前時間輪的位置 + to1/ si)的位置上,等待自然tick到達執行當前to1的定時任務; (2)如果to1> N*si,則需要分配到(當前時間輪的位置 + (to1% N) / si+ N)的位置上,由于to1執行時間超過一輪的周期,所以需要等待多輪轉動后才能執行,那如何處理呢?因此我們將每個輪的tick上掛一個鏈表,這個鏈表的節點表示到達這個tick需要執行的任務to1,這里的節點有可能是大于一個輪轉動的事件周期,也可能就是當前輪時間周期內執行,我們只需要當事件到達tick時,取出鏈表遍歷鏈表節點to1,判斷是否是當前事件周期內執行,如果是摘除鏈表節點然后執行任務,如果不是則重新計算to1需要多久后執行,計算方法就和上面的一樣(當前時間輪位置 + ((to1- 鏈表最小的周期時間) % N) / si+ N),然后將當前鏈表節點重新放回;

事件輪

4、時間堆

堆的數據結構應該大家都比較熟悉了,堆是一種滿足以下條件的樹:

堆中某個節點的值總是不大于或不小于其父節點的值;堆總是一棵完全二叉樹;添加堆節點的時間復雜度O(lgn),刪除節點是O(lgn),獲取節點是O(1);

時間堆

(1)循環線程讀取最小時間堆的堆頂元素; (2)取出最小節點,判斷當前事件是否過期,如果過期則繼續執行,否則不處理; (3)將最小節點對應的事件丟給執行線程執行; 這里最小時間堆節點在代碼實現中可以用一個數組表示,使用完全二叉樹的排列。

#include< iostream >void heapify(int arr[], int n, int i) {    if (i >= n) return;        int min_node = i;    int lson = i * 2 + 1;    int rson = i * 2 + 2;    if (lson < n && arr[min_node] > arr[lson]) { // 和左孩子比較,找到最小節點        min_node = lson;    }    if (rson < n && arr[min_node] > arr[rson]) { // 和右孩子比較,找到最小節點        min_node = rson;    }    if (min_node != i) {        swap(arr[min_node], arr[i]);        heapify(arr, n, min_node); // 遞歸處理    }}void heapSort(int arr[], int n) {    // 反向取出最后一個節點    int lastNode = n - 1;    int parent = (lastNode - 1) / 2;    for (int i = parent; i >= 0; i--) {        heapify(arr, n, i);     }    for (int i = n - 1; i >= 0; i--) {        swap(arr[i], arr[0]);        heapify(arr, i, 0); // 調整堆節點    }}int main() {    int arr[5] = { 70, 41, 10, 90, 18, 26 };    heap_sort(arr, sizeof(arr) / sizeof(arr[0]));    for (int i = 0; i < sizeof(arr) / sizeof(arr[0]); i++) {        cout < < arr[i] < < endl;    }    return 0;}

標簽:

上一篇:Linux內核的編譯主要過程
下一篇:最后一頁