Concurrent Programming with I/O Multiplexing
사용자가 표준 입력에 입력하는 대화형 명령에 응답할 수 있는 echo server를 작성하도록 요청받았다고 가정한다.
이 경우 server는 connection request를 하는 network client와 키보드에서 command line을 입력하는 사용자 등 두 가지 독립적인 I/O 이벤트에 응답해야 한다. 어떤 이벤트를 먼저 기다려야 할까? 어느 쪽도 이상적이지 않다.
connection request이 accept되기를 기다리는 경우 입력 명령에 응답할 수 없다.
마찬가지로, read에서 명령 입력을 기다리는 경우 connection request를 응답할 수 없다.
이 딜레마에 대한 한 가지 해결책은 I/O multiplexing이다.
기본 아이디어는 select 함수를 사용하여 kernel에 프로세스를 일시 중지하도록 요청하며, 하나 이상의 I/O 이벤트가 발생한 후에만 응용 프로그램에 제어권을 반환하는 것이다.
- {0, 4} 집합에 있는 descriptor를 읽을 준비가 되면 반환한다.
- {1, 2, 7} 집합에 있는 descriptor를 쓸 준비가 되면 반환한다.
- I/O 이벤트가 발생하기를 기다리는 동안 152.13초가 경과한 경우 시간 초과.
select는 다양하게 사용할 수 있는 복잡한 함수다.
#include <sys/select.h>
int select(int n, fd_set *fdset, NULL, NULL, NULL);
Returns: nonzero count of ready descriptors, −1 on error
FD_ZERO(fd_set *fdset); /* Clear all bits in fdset */
FD_CLR(int fd, fd_set *fdset); /* Clear bit fd in fdset */
FD_SET(int fd, fd_set *fdset); /* Turn on bit fd in fdset */
FD_ISSET(int fd, fd_set *fdset); /* Is bit fd in fdset on? */
Macros for manipulating descriptor sets
select 함수는 descriptor 집합이라고 하는 fd_set type의 set을 조작한다. 논리적으로, 우리는 descriptor set을 크기 n의 bit vector로 생각한다.
각 비트 bk는 descriptor k에 해당한다. descriptor k는 bk = 1인 경우에만 descriptor set의 member이다. descriptor set은 (1) 할당, (2) 이 type의 변수 중 하나를 다른 변수에 할당, (3) FD_ZERO, FD_SET, FD_CLR 및 FD_ISSET을 사용하여 수정 및 검사 등 세 가지 작업만 수행할 수 있다.
select 함수는 두 가지 입력, 즉 read set이라 불리는 descript set(fdset)과 read set의 cadinality(n)을 취한다.
select 함수는 read set에서 하나 이상의 descriptor가 읽을 준비가 될 때까지 block한다.
descriptor의 1 byte 읽기 요청이 block되지 않는 경우에만 descriptor를 읽을 준비가 된다.
Example
#include "csapp.h"
void echo(int connfd);
void command(void);
int main(int argc, char **argv) {
int listenfd, connfd;
socklen_t clientlen;
struct sockaddr_storage clientaddr;
fd_set read_set, ready_set;
if (argc != 2) {
fprintf(stderr, "usage: %s <port>\n", argv[0]);
exit(0);
}
listenfd = Open_listenfd(argv[1]);
FD_ZERO(&read_set); /* Clear read set */
FD_SET(STDIN_FILENO, &read_set); /* Add stdin to read set */
FD_SET(listenfd, &read_set); /* Add listenfd to read set */
while (1) {
ready_set = read_set;
Select(listenfd + 1, &ready_set, NULL, NULL, NULL);
if (FD_ISSET(STDIN_FILENO, &ready_set))
command(); /* Read command line from stdin */
if (FD_ISSET(listenfd, &ready_set)) {
clientlen = sizeof(struct sockaddr_storage);
connfd = Accept(listenfd, (SA *)&clientaddr, &clientlen);
echo(connfd); /* Echo client input until EOF */
Close(connfd);
}
}
}
void command(void) {
char buf[MAXLINE];
if (!Fgets(buf, MAXLINE, stdin))
exit(0); /* EOF */
printf("%s", buf); /* Process the input command */
}
open_listenfd 함수를 사용하여 listening descriptor를 open 하고 FD_ZERO를 사용하여 empty read set을 만든다.
listenfd = Open_listenfd(argv[1]);
FD_ZERO(&read_set); /* Clear read set */
다음으로 descriptor 0(표준 입력)과 descriptor 3(listenfd)으로 구성된 read set을 정의한다.
FD_SET(STDIN_FILENO, &read_set); /* Add stdin to read set */
FD_SET(listenfd, &read_set); /* Add listenfd to read set */
이제 server loop를 시작한다.
accept 함수를 호출하여 connection request를 기다리는 대신, listening descriptor 또는 표준 입력이 읽을 준비가 될 때까지 block하는 select 함수를 호출한다.
ready_set = read_set;
Select(listenfd + 1, &ready_set, NULL, NULL, NULL);
예를 들어, 사용자가 enter 키를 누르면 반환되는 read_set 값은 다음과 같다.
따라서 표준 입력 descriptor가 읽을 준비가 된다.
select가 반환되면 FD_ISSET 매크로를 사용하여 읽을 준비가 된 descriptor를 결정한다.
표준 입력이 준비되면 command 함수를 호출하여 명령을 읽고 구문 분석하고 응답한 후 main으로 돌아간다.
if (FD_ISSET(STDIN_FILENO, &ready_set))
command(); /* Read command line from stdin */
listening descriptor가 준비되면 accept를 호출하여 connected descriptor를 얻은 다음 echo 함수를 호출한다.
이 함수는 client가 연결의 끝을 닫을 때까지 client의 입력을 echo한다.
if (FD_ISSET(listenfd, &ready_set)) {
clientlen = sizeof(struct sockaddr_storage);
connfd = Accept(listenfd, (SA *)&clientaddr, &clientlen);
echo(connfd); /* Echo client input until EOF */
Close(connfd);
}
Issue
이 예시는 select 함수를 사용하기 좋은 예이지만, 문제점이 있다.
일단 client에 연결되면 client가 연결 끝을 닫을 때까지 input line을 계속 echo한다는 것이다. 따라서 표준 입력에 명령을 입력할 경우, server가 client로 완료될 때까지 응답을 받을 수 없다. 더 나은 접근법은 sever lopp를 통해 매번 한 개의 텍스트 행을 finer granualarity 하게 다중화하는 것이다.
I/O Multiplexed Event Processing
I/O multiplexing은 특정 이벤트의 결과로 진행되는 concurrent event-driven program의 기반으로 사용될 수 있다.
일반적인 아이디어는 logical flow를 state machine으로 모델링하는 것이다.
state machine은 state, input event, state 및 input event를 state에 매핑하도록 변환하는 집합이다.
각 변환은 쌍을 출력 상태로 매핑한다. self-loop는 동일한 입력 및 출력 state간의 변환이다.
server는 select 함수를 제공하는 I/O multiplexing을 사용하여 input event의 발생을 감지한다. connected descriptor가 읽을 준비가 되면 server는 state machine에 해당하는 변환을 실행한다.
Example
#include "csapp.h"
typedef struct
{ /* Represents a pool of connected descriptors */
int maxfd; /* Largest descriptor in read_set */
fd_set read_set; /* Set of all active descriptors */
fd_set ready_set; /* Subset of descriptors ready for reading */
int nready; /* Number of ready descriptors from select */
int maxi; /* High water index into client array */
int clientfd[FD_SETSIZE]; /* Set of active descriptors */
rio_t clientrio[FD_SETSIZE]; /* Set of active read buffers */
} pool;
int byte_cnt = 0; /* Counts total bytes received by server */
int main(int argc, char **argv) {
int listenfd, connfd;
socklen_t clientlen;
struct sockaddr_storage clientaddr;
static pool pool;
if (argc != 2) {
fprintf(stderr, "usage: %s <port>\n", argv[0]);
exit(0);
}
listenfd = Open_listenfd(argv[1]);
init_pool(listenfd, &pool);
while (1) {
/* Wait for listening/connected descriptor(s) to become ready */
pool.ready_set = pool.read_set;
pool.nready = Select(pool.maxfd + 1, &pool.ready_set, NULL, NULL, NULL);
/* If listening descriptor ready, add new client to pool */
if (FD_ISSET(listenfd, &pool.ready_set)) {
clientlen = sizeof(struct sockaddr_storage);
connfd = Accept(listenfd, (SA *)&clientaddr, &clientlen);
add_client(connfd, &pool);
}
/* Echo a text line from each ready connected descriptor */
check_clients(&pool);
}
}
void init_pool(int listenfd, pool *p) {
/* Initially, there are no connected descriptors */
int i;
p->maxi = -1;
for (i = 0; i < FD_SETSIZE; i++)
p->clientfd[i] = -1;
/* Initially, listenfd is only member of select read set */
p->maxfd = listenfd;
FD_ZERO(&p->read_set);
FD_SET(listenfd, &p->read_set);
}
void add_client(int connfd, pool *p) {
int i;
p->nready--;
for (i = 0; i < FD_SETSIZE; i++) /* Find an available slot */
if (p->clientfd[i] < 0)
{
/* Add connected descriptor to the pool */
p->clientfd[i] = connfd;
Rio_readinitb(&p->clientrio[i], connfd);
/* Add the descriptor to descriptor set */
FD_SET(connfd, &p->read_set);
/* Update max descriptor and pool high water mark */
if (connfd > p->maxfd)
p->maxfd = connfd;
if (i > p->maxi)
p->maxi = i;
break;
}
if (i == FD_SETSIZE) /* Couldn’t find an empty slot */
app_error("add_client error: Too many clients");
}
void check_clients(pool *p) {
int i, connfd, n;
char buf[MAXLINE];
rio_t rio;
for (i = 0; (i <= p->maxi) && (p->nready > 0); i++) {
connfd = p->clientfd[i];
rio = p->clientrio[i];
/* If the descriptor is ready, echo a text line from it */
if ((connfd > 0) && (FD_ISSET(connfd, &p->ready_set))) {
p->nready--;
if ((n = Rio_readlineb(&rio, buf, MAXLINE)) != 0) {
byte_cnt += n;
printf("Server received %d (%d total) bytes on fd %d\n",
n, byte_cnt, connfd);
Rio_writen(connfd, buf, n);
}
/* EOF detected, remove descriptor from pool */
else {
Close(connfd);
FD_CLR(connfd, &p->read_set);
p->clientfd[i] = -1;
}
}
}
}
active client의 집합이 pool 구조체로 관리된다.
typedef struct { /* Represents a pool of connected descriptors */
int maxfd; /* Largest descriptor in read_set */
fd_set read_set; /* Set of all active descriptors */
fd_set ready_set; /* Subset of descriptors ready for reading */
int nready; /* Number of ready descriptors from select */
int maxi; /* High water index into client array */
int clientfd[FD_SETSIZE]; /* Set of active descriptors */
rio_t clientrio[FD_SETSIZE]; /* Set of active read buffers */
} pool;
main
init_pool을 호출하여 pool을 초기화하고 server는 무한 loop에 진입한다.
init_pool(listenfd, &pool);
이 loop의 각 반복 동안 두 가지 다른 종류의 입력 이벤트를 detect한다.
- server는 select 함수를 호출하여 새로운 client에서 도착하는 connection request
- 읽을 준비가 된 기존 client에 대한 connected descriptor
connection request가 도착하면 server는 connection을 open하고 add_client 함수를 호출하여 pool에 client를 추가한다.
if (FD_ISSET(listenfd, &pool.ready_set)) {
clientlen = sizeof(struct sockaddr_storage);
connfd = Accept(listenfd, (SA *)&clientaddr, &clientlen); // opens the connection
add_client(connfd, &pool); // add client to pool
}
마지막으로 server는 check_clients 함수를 호출하여 각 connected descriptor에서 하나의 텍스트 행을 echo한다.
/* Echo a text line from each ready connected descriptor */
check_clients(&pool);
init_pool
init_pool함수는 client pool을 초기화한다.
clientfd 배열은 connected descriptor set을 나타내며 정수 -1은 사용 가능한 슬롯을 나타낸다.
처음에는 connected descriptor set이 비어있고
p->maxi = -1;
for (i = 0; i < FD_SETSIZE; i++)
p->clientfd[i] = -1;
listening descriptor는 select read set에서 유일한 descriptor이다.
/* Initially, listenfd is only member of select read set */
p->maxfd = listenfd;
FD_ZERO(&p->read_set);
FD_SET(listenfd, &p->read_set);
add_client
add_client 함수는 active client pool에 new client를 추가한다.
clientfd 배열에서 빈 슬롯을 찾은 후 server를 connected descriptor를 배열에 추가하고 Rio 읽기 버퍼를 초기화하기 위해 descriptor로 rio_readlineb를 호출한다.
/* Add connected descriptor to the pool */
p->clientfd[i] = connfd;
Rio_readinitb(&p->clientrio[i], connfd);
그다음 connected descriptor를 select read set에 추가한다.
/* Add the descriptor to descriptor set */
FD_SET(connfd, &p->read_set);
pool의 global property들을 업데이트한다.
maxfd 변수는 select를 위해 가장 큰 file descriptor를 추적한다.
/* Update max descriptor and pool high water mark */
if (connfd > p->maxfd)
p->maxfd = connfd;
maxi 변수는 clientfd 배열에서 가장 큰 인덱스를 추적하여 check_clients 함수가 전체 배열을 검색할 필요가 없게 한다.
if (i > p->maxi)
p->maxi = i;
check_clients
descriptor에서 텍스트 행을 읽는 데 성공하면 해당 행을 client에 다시 echo한다.
if ((n = Rio_readlineb(&rio, buf, MAXLINE)) != 0) {
byte_cnt += n;
printf("Server received %d (%d total) bytes on fd %d\n",
n, byte_cnt, connfd);
Rio_writen(connfd, buf, n);
client가 연결 끝을 닫았기 때문에 EOF를 감지하면 연결 끝을 닫고 pool에서 descriptor를 제거한다.
/* EOF detected, remove descriptor from pool */
else {
Close(connfd);
FD_CLR(connfd, &p->read_set);
p->clientfd[i] = -1;
}
Pros and Cons of Event-based Servers
장점
- 하나의 logical flow와 address space
- single-step debugger
- 프로세스 또는 스레드 오버헤드 없음.
단점
- process, thread 보다 복잡함.
- find-grained concurreny를 개발자가 제공해야 함.
- multi-core를 활용할 수 없다.