Improper Synchronization
공유 변수는 편리할 수 있지만 심각한 동기화 오류가 발생할 수 있다.
Example
/* WARNING: This code is buggy! */
#include "csapp.h"
void *thread(void *vargp); /* Thread routine prototype */
/* Global shared variable */
volatile long cnt = 0; /* Counter */
int main(int argc, char **argv) {
long niters;
pthread_t tid1, tid2;
/* Check input argument */
if (argc != 2) {
printf("usage: %s <niters>\n", argv[0]);
exit(0);
}
niters = atoi(argv[1]);
/* Create threads and wait for them to finish */
Pthread_create(&tid1, NULL, thread, &niters);
Pthread_create(&tid2, NULL, thread, &niters);
Pthread_join(tid1, NULL);
Pthread_join(tid2, NULL);
/* Check result */
if (cnt != (2 * niters))
printf("BOOM! cnt=%ld\n", cnt);
else
printf("OK cnt=%ld\n", cnt);
exit(0);
}
/* Thread routine */
void *thread(void *vargp) {
long i, niters = *((long *)vargp);
for (i = 0; i < niters; i++)
cnt++;
return NULL;
}
Main Memory와 CPU 사이에 Cache가 있는데 Cache는 Main Memory에 있는 Object를 Caching 하기 때문에 Main Memory와 Cache에 있는 Object의 Version에 차이가 생길 수 있다. CPU와 Main Memory의 Object가 항상 동기화된다.
Cache가 복사본을 가지고 있기 때문에 CPU와 Main Memory가 동기화되기 전에는 서로 다른 version을 가지고 있을 수 있다. 예를 들어, CPU Cache에 있는 Object가 Main Memory에 동기화가 되지 않으면 Object의 version에 차이가 생길 수 있다.
이런 문제는 Sequential 프로그램에서 문제가 되지 않지만 Concurrent 하게 동작하는 경우 문제가 될 수 있기 때문에 volatile으로 선언한다. Cache가 없는 것과 동일하다고 생각하면 된다.
linux> ./badcnt 1000000
BOOM! cnt=1445085
linux> ./badcnt 1000000
BOOM! cnt=1915220
linux> ./badcnt 1000000
BOOM! cnt=1404746
두 Thread가 cnt를 인자만큼 증가시키기 때문에 2000000이 나올 것이라 예상되지만 그렇지 않다.
문제점은 cnt++라는 연산을 각 Thread가 Atomic 하게 실행한다고 가정했기 때문이다.
즉, 더 이상 쪼갤 수 없는 연산이라고 가정했기 때문이다.
그러나 Compile 하여 CPU가 보는 코드는 Assembly 코드이다.
이 Assembly 코드에서 cnt++은 Load, Update, Store 과정을 거쳐야 완료된다. 즉, 3개의 명령어로 쪼갤 수 있다.
명령어는 쪼갤 수 없지만 cnt++은 쪼갤 수 있다.
cnt++은 Atomic Operation이 아닌, 3개의 명령어로 실행된다.
각 Thread가 실행되다가 Context Switch가 발생하여 cnt를 제대로 읽고 쓰지 못하는 문제가 발생한다.
Semaphores
Edsger Dijkstra는 Semaphore라고 불리는 특별한 유형의 변수를 기반으로 다른 Thread와 동기화하는 문제에 해결책을 제안했다.
Semaphore(s)는 음이 아닌 정수 값을 갖는 전역 변수로서, P와 V라고 불리는 특수 연산으로만 조작할 수 있다.
Semapore의 초기 Value는 Critical Section에 들어갈 수 있는 Thread의 개수이다.
P(s)
- s가 0이 아니면 P는 s를 감소시키고 즉시 반환한다.
하드웨어적으로 설계되어 0이 아니면 감소시키는 연산은 Atomic 하다. - s가 0이면 0이 아닐 때까지 Thread를 일시 중단하고 V에 의해 Thread는 다시 시작된다.
- 다시 시작된 후 P는 감소하여 Control을 Caller에게 반환한다.
V(s)
- s를 1 증가시킨다.
이때 이 증가시키는 연산은 atomic 하다. - P에서 s가 0이 아닐 때까지 Thread가 Block 된 경우
V는 Thread 중 하나를 다시 시작하고 s를 감소하여 P를 완료한다.
V에서 대기 중인 Thread가 다시 시작되는 순서를 정의하지 않는다.
V가 대기 중인 Thread 정확히 하나를 다시 시작할 뿐이다.
Semaphore에서 여러 Thread가 대기 중일 때는 V의 결과로 어떤 Thread가 다시 시작될지 예측할 수 없다.
Semaphore functions
#include <semaphore.h>
int sem_init(sem_t *sem, 0, unsigned int value); /* initializes semaphore sem to value. */
int sem_wait(sem_t *s); /* P(s) */
int sem_post(sem_t *s); /* V(s) */
Returns: 0 if OK, −1 on error
#include "csapp.h"
void P(sem_t *s); /* Wrapper function for sem_wait */
void V(sem_t *s); /* Wrapper function for sem_post */
Returns: nothing
Using Semaphores for Mutual Exclusion
Basic idea
- mutex라는 고유한 Semaphore를 각 공유 변수(또는 관련된 집합)와 연결한다.
- 해당하는 Critical Section을 P(mutex)와 V(mutex)로 둘러싼다.
Terminology
- Binary semaphore: semaphore whose value is always 0 or 1
- Mutex: binary semaphore used for mutual exclusion
- P operation: “locking” the mutex
- V operation: “unlocking” or “releasing” the mutex
- “Holding” a mutex: locked and not yet unlocked.
- Counting semaphore: used as a counter for set of available resources.
Example
volatile long cnt = 0; /* Counter */
sem_t mutex; /* Semaphore that protects counter */
...
Sem_init(&mutex, 0, 1); /* mutex = 1 */
...
for (i = 0; i < niters; i++) {
P(&mutex);
cnt++;
V(&mutex);
}
...
Cirical Section인 cnt++을 P와 V로 둘러싸서 Mutual Exclusion을 보장한다.
Summary
- Programmers need a clear model of how variables are shared by threads.
- Variables shared by multiple threads must be protected to ensure mutually exclusive access.
- Semaphores are a fundamental mechanism for enforcing mutual exclusion.
Using Semaphores to Schedule Shared Resources
- Mutual Exclusion뿐 아니라 Semaphore의 또 다른 중요한 용도는
Shared Resourece에 대한 접근을 Schedule 하는 것이다. - Thread는 Semaphore 연산을 사용하여 프로그램 상태의 일부 조건이 True임을 다른 Thread에 알린다.
두 가지 고전적이고 유용한 예가 있다.
- The Producer-Consumer Problem
- The Readers-Writers Problem
Producer-Consumer Problem
The producer generates items and inserts them into a bounded buffer. The consumer removes items from the buffer and then consumes them.
Producer와 Consumer Thread는 n개의 Slot을 가지는 Bounded buffer를 공유한다.
- Producer Thread는 반복적으로 새로운 Item을 만들고 Buffer에 넣는다.
- Consumer Thread는 반복적으로 Buffer의 Item을 제거하고 사용한다.
Item을 넣고 제거하는 것은 공유 변수를 업데이트하는 것을 포함하기 때문에, Buffer에 대한 Mutual Exclusion Access를 보장해야 한다. 그러나 Mutual Exclusion을 보장하는 것으로 부족하다. Buffer에 대한 Access 또한 Schedule 해야 한다.
- Buffer가 Full이면(빈 Slot이 없는 경우), Producer는 Slot이 사용 가능할 때까지 기다려야 한다.
- Buffer가 Empty이면(사용 가능한 Item이 없는 경우), Consumer는 Item이 사용 가능할 때까지 기다려야 한다.
Example
- Multimedia processing:
- Multimedia 시스템에서 Producer는 Consumer가 비디오 프레임을 디코딩하여 화면에 렌더링 하는 동안 비디오 프레임을 인코딩할 수 있다.
- Event-driven graphical user interfaces
- Producer는 마우스와 키보드 Event를 감지하여 Buffer에 Insert 한다.
- Consumer가 Buffer에서 Event를 제거하고 화면에 Display 한다.
Producer-Consumer on an n-element Buffer
n개의 Slot을 가진 Buffer에서 Producer-Consumer를 다루려면 Mutex와 두 개의 Semaphore가 필요하다.
- mutex: Buffer에 Mutual Exclusion Access를 가능하게 한다.
- slots: Buffer의 사용 가능한 Slot의 수를 센다.
- items: Buffer의 사용 가능한 Item의 수를 센다.
typedef struct {
int *buf; /* Buffer array */
int n; /* Maximum number of slots */
int front; /* buf[(front+1)%n] is first item */
int rear; /* buf[rear%n] is last item */
sem_t mutex; /* Protects accesses to buf */
sem_t slots; /* Counts available slots */
sem_t items; /* Counts available items */
} sbuf_t;
void sbuf_init(sbuf_t *sp, int n);
void sbuf_deinit(sbuf_t *sp);
void sbuf_insert(sbuf_t *sp, int item);
int sbuf_remove(sbuf_t *sp);
Producer-Consumer 프로그램을 구축하기 위해 Sbuf라는 패키지를 사용한다.
- Subf는 sbuf_t 유형의 Bounded Buffer를 다룬다.
- Item은 동적으로 할당된 배열 buf에 저장된다.
- front, rear 인덱스는 배열의 첫 번째와 마지막 항목을 추적한다.
- 세 개의 Semaphore가 Buffer에 대한 Access를 동기화한다.
- mutex는 Mutal Exclusive Buffer Access를 제공한다.
- Slot은 빈 Slot의 수를 센다.
- Item은 사용 가능한 Item의 수를 센다.
sbuf_init
/* Create an empty, bounded, shared FIFO buffer with n slots */
void sbuf_init(sbuf_t *sp, int n) {
sp->buf = Calloc(n, sizeof(int));
sp->n = n; /* Buffer holds max of n items */
sp->front = sp->rear = 0; /* Empty buffer iff front == rear */
Sem_init(&sp->mutex, 0, 1); /* Binary semaphore for locking */
Sem_init(&sp->slots, 0, n); /* Initially, buf has n empty slots */
Sem_init(&sp->items, 0, 0); /* Initially, buf has zero data items */
}
- Buffer에 Heap 메모리를 할당.
- 빈 Buffer임을 알 수 있도록 front, rear를 0으로 설정
- 세 개의 Semaphore에 초기 값을 할당
sbuf_deinit
/* Clean up buffer sp */
void sbuf_deinit(sbuf_t *sp) {
Free(sp->buf);
}
- Buffer를 사용할 때 해당 공간을 해제
sbuf_insert
/* Insert item onto the rear of shared buffer sp */
void sbuf_insert(sbuf_t *sp, int item) {
P(&sp->slots); /* Wait for available slot */
P(&sp->mutex); /* Lock the buffer */
sp->buf[(++sp->rear)%(sp->n)] = item; /* Insert the item */
V(&sp->mutex); /* Unlock the buffer */
V(&sp->items); /* Announce available item */
}
- 사용 가능한 Slot을 기다림
- Mutex lock
- Item 추가
- Mutex unlock
- 새 Item의 사용 가능 여부를 알림
sbuf_remove
/* Remove and return the first item from buffer sp */
int sbuf_remove(sbuf_t *sp) {
int item;
P(&sp->items); /* Wait for available item */
P(&sp->mutex); /* Lock the buffer */
item = sp->buf[(++sp->front)%(sp->n)]; /* Remove the item */
V(&sp->mutex); /* Unlock the buffer */
V(&sp->slots); /* Announce available slot */
return item;
}
- 사용 가능한 Buffer Item을 기다림
- Mutex lock
- Buffer Item 제거
- Mutex unlock
- 새 Slot의 사용 가능 여부를 알려줌
Readers-Writers Problem
Readers-Writers Problem은 Mutual Exclusion의 문제를 일반화한 것이다.
Concurrent Threads 컬렉션은 Main Memory의 데이터 구조나 디스크의 데이터베이스와 같은 공유 객체를 Access 한다. 일부 Thread는 객체를 읽기만 하는 반면 다른 Trhead는 객체를 수정한다.
- Reader: 객체를 읽기만 하는 Thread
- Writer: 객체를 수정하는 Thread
Writer는 객체에 대한 Exclusive Access 권한을 가지고 있어야 하지만, Reader는 다른 Writer와 공유할 수 있다.
일반적으로 Conccurent Reader와 Writer의 수는 무한하다.
Readers-Writers 은 실제 시스템에서 많이 찾아볼 수 있다.
- First readers-writers problem (favors readers)
- writer가 객체를 사용할 수 있는 허가를 받지 않은 한, 어떤 Reader도 기다리게 하지 않을 것을 요구한다.
- 기다리고 있는 Writer 뒤에 도착하는 Reader는 Writer보다 우선순위를 가진다.
- 즉, 어떤 Reader도 Writer가 기다리고 있다고 해서 기다려서는 안 된다.
- Second readers-writers problem (favors writers)
- Writer가 write 할 준비를 마치면, 가능한 한 빨리 write를 수행한다.
- Writer가 기다리고 있더라도, Writer 뒤에 도착하는 Reader는 반드시 기다려야 한다.
위 두 가지 모두 Starvation가 발생할 수 있다.
Solution to First Readers-Writers Problem
- w Semaphore는 Critical Section에 대한 Access를 제어한다.
- mutex Semaphore는 현재 Critical Section에 있는 Readers 수를 세는 공유 변수 readcnt에 대한 Access를 보호한다.
- Writer는 w, mutex가 Critical Section에 들어갈 때마다 lock, 나갈 때마다 unlock 한다.
=> 어느 시점에서든 Critical Section에 최대 한 개의 Writer가 있다는 것을 보장한다. - Critical Section에 들어가는 첫 번째 Reader만 w를 lock 하고,
Critical Section을 나가는 마지막 Reader만 unlock 한다.
=> 한 개의 Reader가 w, mutex를 잡고 있는 한, Reader가 무한으로 Critical Section에 들어갈 수 있다. - w mutex는 다른 Reader가 존재하는 동안 들어가고 나가는 Reader에 의해 무시된다.
=> 한 개의 Reader가 w mutex를 잡고 있는 한 Reader는 무제한으로 Critical Section에 들어갈 수 있다. - Thread가 무기한 차단되고 진전을 이루지 못하는 Starvation을 초래할 수 있다.
- Writer는 일련의 Reader들이 도착하는 동안 무한정 기다릴 수 있다.
=> Starvation에 대한 해결책이 없다.
Putting It Together: A Concurrent Server Based on Prethreading
#include "csapp.h"
void echo(int connfd);
void * thread(void * vargp);
int main(int argc, char ** argv) {
int listenfd, * connfdp;
socklen_t clientlen;
struct sockaddr_storage clientaddr;
pthread_t tid;
if (argc != 2) {
fprintf(stderr, "usage: %s <port>\n", argv[0]);
exit(0);
}
listenfd = Open_listenfd(argv[1]);
while (1) {
clientlen = sizeof(struct sockaddr_storage);
connfdp = Malloc(sizeof(int));
*connfdp = Accept(listenfd, (SA *) & clientaddr, & clientlen);
Pthread_create(& tid, NULL, thread, connfdp);
}
}
/* Thread routine */
void * thread(void * vargp) {
int connfd = * ((int *)vargp);
Pthread_detach(pthread_self());
Free(vargp);
echo(connfd);
Close(connfd);
return NULL;
}
위 Concurrent server에서 각 Client마다 새 Thread를 만들었다.
이 방법의 단점은 새로운 Client에서 새로운 Thread를 만드는데 적지 않은 비용이 발생한다는 것이다.
Prethreaded Concurrent Server
- Pthreading을 기반으로 한 서버는 Producer-Consumer 모델을 사용하여 이러한 Overhead를 줄인다.
- 서버는 Master Thread와 Worker Thread로 구성된다.
- Main Thread는 Client의 Connection Request를 반복적으로 Accept 하고
결과적으로 Connected Descriptor를 Buffer에 넣는다. - 각 Worker Thread는 Buffer에서 Descriptor를 반복적으로 제거하고
Client에 서비스를 제공하고 다음 Descriptor를 기다린다.
A Prethreaded Concurrent Echo Server
#include "csapp.h"
#include "sbuf.h"
#define NTHREADS 4
#define SBUFSIZE 16
void echo_cnt(int connfd);
void * thread(void * vargp);
sbuf_t sbuf;/* Shared buffer of connected descriptors */
int main(int argc, char ** argv) {
int i,
listenfd,
connfd;
socklen_t clientlen;
struct sockaddr_storage clientaddr;
pthread_t tid;
if (argc != 2) {
fprintf(stderr, "usage: %s <port>\n", argv[0]);
exit(0);
}
listenfd = Open_listenfd(argv[1]);
sbuf_init(& sbuf, SBUFSIZE);
for (i = 0; i < NTHREADS; i++) /* Create worker threads */
Pthread_create(& tid, NULL, thread, NULL);
while (1) {
clientlen = sizeof(struct sockaddr_storage);
connfd = Accept(listenfd, (SA *) & clientaddr, & clientlen);
sbuf_insert(& sbuf, connfd);/* Insert connfd in buffer */
}
}
void * thread(void * vargp) {
Pthread_detach(pthread_self());
while (1) {
int connfd = sbuf_remove(& sbuf);/* Remove connfd from buffer */
echo_cnt(connfd);/* Service client */
Close(connfd);
}
}
- sbuf를 초기화한 후, Main Thread는 Worker Thread Set을 생성한다.
- 무한 루프에 들어가 Connection Request를 수락하고 sbuf에 Connected Descriptor를 삽입한다.
- 각 Worker Thread는 Buffer에서 Connected Descriptor를 제거할 수 있을 때까지 기다린 다음
echo_cnt 함수를 호출하여 Client 입력에 echo 한다.
#include "csapp.h"
static int byte_cnt; /* Byte counter */
static sem_t mutex; /* and the mutex that protects it */
static void init_echo_cnt(void) {
Sem_init(&mutex, 0, 1);
byte_cnt = 0;
}
void echo_cnt(int connfd) {
int n;
char buf[MAXLINE];
rio_t rio;
static pthread_once_t once = PTHREAD_ONCE_INIT;
Pthread_once(&once, init_echo_cnt);
Rio_readinitb(&rio, connfd);
while((n = Rio_readlineb(&rio, buf, MAXLINE)) != 0) {
P(&mutex);
byte_cnt += n;
printf("server received %d (%d total) bytes on fd %d\n",
n, byte_cnt, connfd);
V(&mutex);
Rio_writen(connfd, buf, n);
}
}
- byte_cnt라는 Global 변수에 모든 Client로부터 받은 누적 Byte 수를 기록한다.
- byte_cnt와 mutex Semaphore를 초기화해야 한다.
- Sbuf 및 Rio 패키지에 사용한 방식은 초기화 함수를 명시적으로 호출하기 위해
Main Thread를 요구하는 것이다. - 다른 접근법은 pthread_once 함수를 사용하여 일부 Thread가
echo_cnt 함수를 호출할 때 초기화 함수를 호출한다.
=> 장점: 패키지를 더 쉽게 사용할 수 있다.
=> 단점: echo_cnt에 대한 모든 호출이 pthread_once로 호출돼서 유용하지 못하다.