[讀書心得] The Linux Programming Interface 第 31 章:執行緒安全與個別執行緒儲存空間

前言

第 31 章節主要是延伸第 30 章,介紹執行緒安全函式,以及單次初始化,從中探討如何使用執行緒特有的資料 (specific data) 以及執行緒區域儲存空間 (thread-local storage),以讓現有的函式具有執行緒安全,而無需更動函式介面

31.1 執行緒安全

上週有提到這個程式

1
2
3
4
5
6
7
8
9
10
11
static int glob = 0;

static void incr(int loops)
{
int loc, j;
for (j = 0; j < loops; j++) {
loc = glob;
loc++;
glob = loc;
}
}

我們也討論出,glob 在多個執行緒執行 incr() 的情況下,會造成 glob 沒有辦法被預測
這不是我們所樂見的,所以針對 global 以及 static 變數,我們必須在執行緒中更加謹慎使用

有許多方式是可以使函式成為 thread-safty

  • 將一個函式、或是會共用之全域變數使用 mutex 進行關聯使用
    • 優點:簡易、單純
    • 缺點:只能以「序列 (serialized)」方式存取資源,耗時,且導致失去同步效果
  • 將 mutex 與一個共用變數關聯
    • 優點:可以讓多個 thread 執行相同函式,並且可並行操作
    • 缺點:要定義 critical section,若多個 thread 都要執行相同的 critical section,就沒辦法,乖乖等待吧

簡易範例與說明

這部分我們想要一個間單的例子作為說明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdlib.h>

#define MAXSIZE 5 /*共享緩衝區的大小*/

int sharedArray[MAXSIZE]; /*sharedArray是共享緩衝區*/
int curr=-1; /*curr是用來指定sharedArray當前存有資料的最大位置*/
/*注意,sharedArray和curr都屬於共享資料*/

int empty=0;
int full=MAXSIZE;
pthread_mutex_t sharedMutex=PTHREAD_MUTEX_INITIALIZER; /*鎖定臨界區的mutex*/
sem_t waitNonEmpty, waitNonFull; /*等待"非空資源"和等待"非滿資源"的semaphor*/

void * readData(void * whichone)
{
int data, position;
while (1){
sem_wait(&waitNonEmpty); /*是否有"非空資源"*/

pthread_mutex_lock(&sharedMutex); /*進入臨界區*/
data = sharedArray[curr];
position = curr--;
printf ("%s read from the %dth: %d, /n", (char*)whichone, position, data);
sem_post(&waitNonFull); /*生成一個"非滿資源"*/
pthread_mutex_unlock(&sharedMutex); /*離開臨界區*/

sleep(2); /*跟同步無關的費時操作*/
}
}

void * writeData(void * whichone)
{
int data, position;
while (1) {
data=(int)(10.0*random()/RAND_MAX); /*生成一個隨機資料,注意是10.0而不是10*/
sem_wait(&waitNonFull); /*是否有"非滿資源"*/

pthread_mutex_lock(&sharedMutex); /*進入臨界區*/
position = ++curr;
sharedArray[curr]=data;
printf ("%s wrote to the %dth: %d, /n", (char*)whichone, position, data);
sem_post(&waitNonEmpty); /*生成一個"非空資源"*/
pthread_mutex_unlock(&sharedMutex); /*離開臨界區*/

sleep(1); /*跟同步無關的費時操作*/

}
}

int main (int argc, char** argv)
{
pthread_t consumer1, consumer2, producer1, producer2; /*兩個生產者和兩個消費者*/
sem_init(&waitNonEmpty, 0, empty); /*初始化訊號量*/
sem_init(&waitNonFull, 0, full);
/*注意,本問題中的兩種semaphore是有一定關係的,那就是它們的初始值之和應該等於共享緩衝區大小*/
/*即empty+full等於MAXSIZE*/

pthread_create (&consumer1, NULL, &readData, "consumer1");
pthread_create (&consumer2, NULL, &readData, "consumer2");
pthread_create (&producer1, NULL, &writeData, "producer1");
pthread_create (&producer2, NULL, &writeData, "producer2");
pthread_join (consumer1, NULL);
pthread_join (consumer2, NULL);
pthread_join (producer1, NULL);
pthread_join (producer2, NULL);
sem_destroy(&waitNonEmpty);
sem_destroy(&waitNonFull);

}

sem_wait() 的說明如下

1
2
3
4
5
6
sem_wait() decrements (locks) the semaphore pointed to by sem.  If
the semaphore's value is greater than zero, then the decrement
proceeds, and the function returns, immediately. If the semaphore
currently has the value zero, then the call blocks until either it
becomes possible to perform the decrement (i.e., the semaphore value
rises above zero), or a signal handler interrupts the call.

以上面的例子我們做了一個簡單的 Q&A:

Q:需要幾個 mutex ?以及有幾個 semaphore?

以上述的理論,應該是需要一個 mutex 以及會產生兩個 semaphore

  • 其中 mutex 是負責鎖定 critial section 的,以解決在 mutli-threading 中的 race condition
  • semaphore 是因為從例子中,消費者會因為緩衝區為空時被阻塞,而生產者則是緩衝區為滿的時候被阻塞,因此會需要兩個 semaphore

Q:有沒有需要修改什麼?

有,像是 sleep() 這種會影響執行緒的 non reentrancy function 應該拔掉

非執行緒安全的函式

SUSv3 的規範內,每個函式都要以「執行緒安全」的方式實作,除了下圖這些函式

SUSv4 修改了上圖的 functions

  • 移除 ecvt()fcvt()gcvt()gethostbyname()gethostbyaddr()等,因為這是 SUSv4 已移除了標準 function
  • 新增 strsignal() 以及 system()system() 會使得操作影響整個行程,所以是 non-reentrant

NOTE: thread-safety 最初始的初衷就是 function 必須要有 reentrancy 的特性

reentrant function v.s non-reentrant function

對於最理想的狀況,我們會希望 thread 都是 reentrant functions,除了可以不需使用 mutex 即可到達 thread-safety,而且也會因為沒有 mutex 少了 mutex lock/unlock 的成本

實作方法:儘量避免使用 global 以及 static variables

但並非如此理想,有些函式還是無法被設計成 reentrant function:例如 malloc lib

31.2 One-Time Initialization

多執行緒程式有時無論建立了多少個執行緒,會需要確保某些初始化動作只需要執行一次,例如:pthread_mutex_init() 會搭載特殊屬性進行初始化,並且只執行一次。

函式庫函式,可以透過 pthread_once() 執行一次性初始化

1
2
3
4
#include <pthrad.h>

/* Returns 0 on success, or a positive error number on error */
int pthread_once(pthread_once_t *once_control, void (*init)(void));

once_control 參數是一個 pointer,必須指向 PTHREAD_ONCE_INIT 的靜態初始值

1
pthread_once_init once_var = PTHREAD_ONCE_INIT;

init() function 不應該帶任何參數,也不應該 return 任何參數

31.3 執行緒特有資料 (Thread Specific Data, TSD)

要讓函式是 thread-safety,最有效的方式就是實作成 reentrant function,但若有函式會執行上述我們討論到的 non-thread-safety function 時,就必須修改自身的 API interface,簡單說,你想要在 myfunc() 裡面執行 rand() 函式,因為 rand() 可能是你的 thread 所需要執行的,結果你發現這個 API 會變成 non-thread-safety function,那怎麼辦呢?

  • 實作 rand() 或是找另一個方案,使 myfunc() 成為 thread-safety function
  • 使用 TSD buffer 管理,直接實作於 myfunc() 中,如圖所示

如何實現

實現步驟如下

  1. 函式會建立一個 key,藉由 pthread_key_create() 建立,而 key 的建立是在執行緒呼叫此函式的時候建立一次,因此需要使用 pthread_once() 函式協助。此函式不會產生 TSD buffer
  2. 透過 pthread_key_create() 的產生 key 的過程中需要告知 destructor function,當 key 被釋放的時候會自動執行此 callback function
  3. 函式被任何執行緒呼叫的時候,會透過 malloc 配置一個 TSD buffer,而且由於 pthread_once() 的緣故,該執行緒的初始化只會執行一次
  4. 若要存取 TSD buffer 的位址,函式需要透過 pthread_setspecific() 以及 pthread_getspecific()
  • pthread_setspecific():儲存此指標,並且記錄此指標相關的 key
  • pthread_getspecific():傳回之前儲存的指標以及之前呼叫此 function 的 thread key

API declarsion

首先是 pthread_key_create()

1
2
3
4
#include <pthread.h>

/* Returns 0 on success, or a positive error number on error */
int pthread_key_create(pthread_key_t *key, void (*destructor)(void *));

  • 因為傳回的 key 必須供每個執行緒使用,所以 key 應該指向一個 全域變數
  • destructor 需要設計者自行實作
  • pthread_key_t *key 可以得知其實 key 就是一個陣列索引,所以可以透過一個全域陣列去存取 thread key,並分別處理 destructor function

接下來是取得與設定的 Pthread API

1
2
3
4
5
6
7
#include <pthread.h>

/* Returns 0 on success, or a positive error number on error */
int pthread_setspecific(pthread_key_t key, const void *value);

/* Returns pointer, or NULL if no thread-specific data isassociated with key */
void *pthread_getspecific(pthread_key_t key);

其中 value 是什麼呢?其實是 destructor function 要執行的時候,需要回傳的 pointer,如果我們傳入的是一個純值且強制 cast 成 void*,這種狀況下 pthread_key_create() 中的 destructor 就會被指定成 NULL

1
2
3
4
5
void
dest(void *value)
{
/* Release storage pointed to by 'value' */
}

實例

實作一個 strerror()

1
2
3
4
5
6
7
8
9
10
11
12
char *
strerror(int err)
{
if (err < 0 || err >= _sys_nerr || _sys_errlist[err] == NULL) {
snprintf(buf, MAX_ERROR_LEN, "Unknown error %d", err);
} else {
strncpy(buf, _sys_errlist[err], MAX_ERROR_LEN - 1);
buf[MAX_ERROR_LEN - 1] = '\0'; /* Ensure null termination */
}

return buf;
}

這是什麼呢?這就是當傳入 err 的時候會將 buf 填入相對應的字串,例如

  • EINVAL: Invalid arguemnts
  • EPERM: Operation not permitted

看完結果後,接下來我們照著 thread-safety 的方式設計

首先建立 create key function 並告知只要執行一次

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static pthread_key_t strerrorKey;

static void /* One-time key creation function */
createKey(void)
{
int s;

/* Allocate a unique thread-specific data key and save the address
of the destructor for thread-specific data buffers */

s = pthread_key_create(&strerrorKey, destructor);
if (s != 0)
errExitEN(s, "pthread_key_create");
}

char * strerror(int err)
{
int s;
char *buf;

/* Make first caller allocate key for thread-specific data */

s = pthread_once(&once, createKey);
if (s != 0)
errExitEN(s, "pthread_once");
/* ....... */
}

接下來透過 key 取得 TSD buffer (pointer),若沒有就創一個 TSD buffer

1
2
3
4
5
6
buf = pthread_getspecific(strerrorKey);
if (buf == NULL) { /* If first call from this thread, allocate
buffer for thread, and save its location */
buf = malloc(MAX_ERROR_LEN);
if (buf == NULL)
errExit("malloc");

接下來就是通知 thread ,紀錄此 key 以及相對應的 buf address

1
2
3
4
5
6
7
8
9
10
if (buf == NULL) {          /* If first call from this thread, allocate
buffer for thread, and save its location */
buf = malloc(MAX_ERROR_LEN);
if (buf == NULL)
errExit("malloc");

s = pthread_setspecific(strerrorKey, buf);
if (s != 0)
errExitEN(s, "pthread_setspecific");
}

最後就是 buf 要在 key 被釋放的時候執行的 destructor function

1
2
3
4
5
static void                         /* Free thread-specific data buffer */
destructor(void *buf)
{
free(buf);
}

由以上的實作,就可以不需要更動 strerror_test.c 中的實作,將 strerror() function 變成 thread-safety function