티스토리 뷰

=> 이거 완전 그냥 노드 얘기임 😎


👾 ⬇️

Reactor Pattern for Scaling I/O Bound Server

아이유가 당신의 플랫폼으로 팬들과 소통하겠다고 얘기해서 엄청나게 유명해진 채팅 서버를 운영해야 하는 상황이라고 가정해보자. 당신은 10K 정도의 동시 접속이 이루어지고 있다는 것을 깨달았다. 1개의 단순한 서버라면 이것을 어떻게 핸들링 할까?

 

두가지 측면에서 생각해 볼 수 있다.

- 싱글 스레드를 써야 할까 멀티스레드로 해야 할까? 프로세스는 싱글? 멀티?

- 어떤 reading 매커니즘을 서야 할까? Blocking IO? Non-Blocking IO? 아니면 Demultiplexer?

 

(참고)

TCP 소켓 프로그래밍 basic

 

 

 

 

 

 

 

 

* 아래의 5 IO model을 적용하며 읽어보세요~ 🌹🌻🌸

⬇️ blocking IO model

🐨 첫번재 솔루션 : a thread per connection using Blocing I/O

 

아래의 (파이썬) Echo 서버는 다음과 같이 동작한다.

1. main loop에서 새로 들어오는 connection을 기다린다.

2. 각 connection 마다 새로운 스레드 위에서 처리한다. 

    - 각 스레드에서는 무한 루프로 클라이언트에게 받은 메세지를 echo 함.

# BlockingEchoServer.py

import socket
import threading

# 각 스레드에서 실행 될 echo 함수
def echo(conn):
   while True:
       # 새 메세지가 들어올 때 까지 여기서 스레드 block
       data = conn.recv(1024)        
       if not data: break
       conn.sendall(data)

# 새로운 소켓 연결을 담당하는 메인 스레드
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
   s.bind(('localhost', 1234))
   s.listen(1000)
   while True:
       # 새로운 connection 있을 때 까지 여기서 blocking
       conn, addr = s.accept()
       threading.Thread(target=echo, args=(conn,)).start()
# BlockingEchoClient.py

import socket

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
   s.connect(('localhost', 1234))
   while True:
       s.sendall(input().encode('utf-8'))
       print(s.recv(1024))

소켓을 만들 때, 기본 상태는 blocking 모드이다. blocking 모드에서는 완료하거나 시스템이 connection 타임아웃 등으로 에러를 리턴하기 전까지 작업이 blocking 된다.

이것은 소켓이 I/O 작업을(예를 들면 recv, send 같은 함수) 기다리는 동안 그 뒤의 코드는 blocking 될 것임을 의미한다.

따라서 이 구현에서는 echo 함수는 단순히 recv함수에서 데이터를 받을 때 까지 스레드를 blocking 시킨다. 이 방식에서 1K의 동시 연결을 원한다면, 우리는 1K의 스레드를 생성해야 한다.

 

 

이 방법은 엄청나게 비효율적이다.

1. 스레드 컨텍스트 스위칭에 귀중한 cpu 타임이 소모된다. 스레드 개수가 늘어날 수록 오버헤드 증가

2. 대부분의 스레드는 IO 기다리는 동안 아무것도 안하고 block 되어 있는데 이런걸로 cpu 스케줄 되어 blocking된 스레드를 실행한다.

여기서도 cpu 타임 버리는 것

3. 예를 들어 우리가 1000번 스레드에서 메세지를 받았다 쳐보자. (우리의 스케줄링 매커니즘이 스레드 번호 순이라고 치면) 1-999번 스레드를 다 체크한 후에야 1000번 스레드에 요청한 유저에 응답을 보낼 수 있다. 엄청 구리다. (심지어 1-999 스레드가 그냥 blocking 된 채 I/O를 기다리기만 하는 중이었다고 생각하면 더..)

4. 스레드는 제한된 RAM 자원을 소모한다.

 

따라서 이 방법이 멀티스레드로 인한 모든 문제를 피하고 각 스레드로 로직을 감싸기 때문에 개발자 입장에선 코드 작성이 엄청 쉽겠지만

매우 비효율적인 방법이다!

 

 

 

 

 

 

 

 

 

⬇️ non-blocking IO model ( polling )

🐨 두번째 솔루션 : Single Threaded Busy-waiting on Multiple Non-Blocking I/O sockets

위 구현에의 문제점중 하나는 recv 함수가 어떤 데이터를 리턴하도록 기다리는 동안 각 스레드가 blocking 된다는 점이었다.

이번엔 소켓 접근에 다른 매커니즘을 사용해 보자 : Non-blocking sockets

non-blocking 모드에서는 즉시 완료되지 않으면 작업이 실패로 떨어진다. 

 

 

서버는 다음과 같이 동작한다.

1. 소켓 리스트를 만든다.

2. 해당 소켓 리스트에 대한 반복문을 계속 돈다.

     - 각 non-blocking 소켓에서 Read, write를 시도한다. 이 read, write 액션은 (ready 상태면 수행하고, 아니어도 error 리턴하는 식으로) 바로 리턴된다.

# BusyWaitNonBlockingEchoServer.py

import socket

sockets = set()
remove_pending = set()
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
   server.bind(('localhost', 2002))
   server.listen(1000)
   server.setblocking(False)      # set Non-Blocking socket
   while True:
       try:
           # 새로운 connection 요청이 없었으면 기다리지 않고 바로 except 문으로
           conn, addr = server.accept()
           conn.setblocking(False)
           sockets.add(conn)
       except BlockingIOError: # [Errno 35] Resource temporarily unavailable - indicates that "accept" returned without results
           pass

       remove_pending.clear()
       # 모든 connection을 돌면서 새로 read 할만한게 없을지 확인한다.
       for conn in sockets:
           try:
               # recv 역시 읽어들일게 없으면 blocking되지 않고 바로 except 문으로
               data = conn.recv(1024)
               if not data: # connection closed
                   remove_pending.add(conn)
               else:
                   print(data)
                   conn.sendall(data)
           except BlockingIOError:  # recv/send return without data.
               pass

       # remove closed connections
       for conn in remove_pending:
           sockets.remove(conn)
# BusyWaitNonBlockingEchoClient.py

import socket

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
   s.connect(('localhost', 2002))
   while True:
       s.sendall(input().encode('utf-8'))
       print(s.recv(1024))

먼저, non-blocking 서버 소켓을 생성한 뒤 무한 루프 안에서 들어오는 connection을 accept한다. 새로운 connection이 있을 때까지 거기서 blocking되는 위 예제의 서버 소켓과 다르게 non-blocking 소켓의 accept함수는 (혹은 send, recv 함수 역시) 새로운 연결 요청 없을 시 바로 BlockingIOError Exception을 raise한다. ( 99% 경우 여기에 해당 )

따라서 여기서의 control flow는 Exception 기반이다. 매우 매우 busy loop 임.

 

커넥션 몇개를 연결 했다면, 이 커넥션 소켓들을 돌면서 메세지를 읽고 echo하는 작업을 시도할 것이다.

recv함수에서 아예 connection이 cosed 된 것과 새로운 데이터 입력이 없음을 어떻게 구분할까?

recv 함수 리턴값

connection이 closed => b"" (empty string)

no data available => raise BlockingIOError

 

닫힌 connection은 기억해 뒀다가 다음 iteration 전에 소켓 리스트에서 삭제해야 한다.

 

그렇다면 왜 이 솔루션 역시도 비효율적인걸까?

1. 소켓들의 상태와 무관하게 매번 모든 소켓을 다 돌면서 검사한다. 소켓당 최소 한번의 시스템 콜 호출을 하는건데, 만약 우리가 10K 소켓 연결을 가지고 있고, 9K번째 소켓의 메세지를 읽으려고 하더라도 다른 소켓 다 도느라 incoming 메세지 기다리게하고, 시간 쓰고..

2. 실질적으로 메세지를 읽는게 아니라 순전히 상태 체크를 위해 poll하는데도 계속 cpu를 사용한다. 이는 99%의 cpu 타임은 다른 서버 로직 실행이 아니라 polling에 쓰일 것임을 의미.

 

 

1번, 2번 솔루션까지 살펴보고 나서 우리는 소원이 하나 생겼다.

만약 주어진 socket 배열에서 IO 작업(read, write)이 ready 상태인 socket 만을 리턴해주는 마법같은 함수가 있으면 어떨까?

이제 select를 소개하겠다.

 

 

 

 

 

 

 

 

 

 

⬇️ I/O multiplexing

🐨 세번째 솔루션 : (Single Threaded) Synchronous Event Demultiplexer

os 마다 이름은 다를 수 있지만, 개념은 똑같다 - blocking 없이 여러개의 파일 (pipes/sockets/등..) 을 모니터링 하고 싶으면 select 함수를 호출 한다.

 

 

select 시스템 콜

man select

select는 I/O와 관련있는 file descriptor의 상태를 모니터링 하다가, read/write/(error)할 준비가 된 descriptor가 있을 때 리턴하는 시스템 콜이다.

 

인자로 3가지 fd set을 받는다. 모니터링 대상.

1. readfds => 이 중 read할 준비가 된 것을 output set에 추가시킴.

2. writefds => 이중 write할 준비가 된 것을 output set에 추가시킴.

3. errorfds => 어느 것이라도 error 상태인게 있으면 output set에 추가시킴.

 

select는 모니터링 중인 fd에 관련된 이벤트가 발생하기 전까지는 blocking된다.

 

*모니터링 할 수 있는 file descriptor 개수가 제한적이라 (1024개)이고 time-complexity가 O(n)이라 성능에 제약이 있는 select 대신

fd set에서 I/O 작업이 가능한게 있는지 체크하는 더 나은 시스템 콜도 존재한다.

- select (모든 플랫폼)

- poll (대부분의 POSIX 플랫폼)

- epoll (리눅스). 모니터링 할 fd를 저장하는데 red-black tree 자료구조를 사용한다.

- kqueue (FreeBSD, macOS)

- IOCP (window)

- /dev/poll (Solaris)

 

 

 

 

python 라이브러리에서의 구현

파이썬 라이브러리는 synchronous IO multiplexing을 지원하는 2가지 모듈을 제공한다 : select와 selectors

select는 더 로우레벨. (특정 select-family 시스템콜을 직접 노출함)

selectors는 더 하이레벨 모듈이다

- 당신 os에 맞게 best 구현체를 고른다. (epoll | kqueue | devpoll > poll > select 순으로)

- 모니터하고 싶은 특정 IO operation에 대한 fd를 register/unregister 할 수 있도록 BaseSelector 클래스를 정의한다.

 

 

 

 

 

 

selectors 모듈을 사용한 파이썬 에코 서버, 클라이언트 구현

 

 

data is an Optional opaque data object associated to this file object: for example, this could be used to store a per-client session ID.

data 부분엔 어떤 object든 넘길 수 있다. 예를 들어 per-client 세션 ID, callback 함수에 대한 포인터 (=> 여기 코드 예제에서 사용하고 있는 대로) , 추가적인 parameter 등등..

 

 

서버 코드

# echo server example

import selectors
import socket

sel = selectors.DefaultSelector()  # 시스템에 따라 epoll, select, poll, kqueue 등..

# 서버소켓 fd에 READ 이벤트 있을때의 콜백 (새로운 소켓 커넥션 연결하기)
def accept(sock, mask):
    # non-blocking accept. 바로 연결 못하면 에러 raise
    conn, addr = sock.accept()  
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    # monitor할 대상 fd set에 새로 만든 conn 등록하기.
    # 아래 loop 에서 key.data == read 
    sel.register(conn, selectors.EVENT_READ, read)  

# conn fd에 READ 이벤트 있을때의 콜백 (데이터 읽어서 echo)
def read(conn, mask):
    # non-blocking recv. 바로 읽지 못하면 에러 raise
    data = conn.recv(1000)  
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()

sock = socket.socket()
sock.bind(('localhost', 1234))
sock.listen(100)
sock.setblocking(False)
# monitor할 대상 fd set에 서버 소켓 등록하기.
# 아래에서 key.data == accept (콜백)
sel.register(sock, selectors.EVENT_READ, accept)   

while True:
	# 여기서 event 있을 때까지 blocking
    events = sel.select()    
    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)

 

서버 소켓을 selector에 등록하고 어떤 이벤트를 모니터링 해야할지, 그리고 콜백함수를 data인자로 넘겨서 알려준다. 이 예시에서는 서버 소켓이 읽을 준비가 됐을 때(selectors.EVENT_READ 이벤트 발생시) accept 함수가 호출될 것이다.

while True:
    events = sel.select()    
    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)

이 코드가 일종의 아주 간단한 Event Loop이다!

이 무한 루프 안에서 select 함수가 호출된다. select함수는 모니터링 하고 있는 fd set중 어느 것이라도 등록된 operation (read/write/both)에 대해 ready 상태가 되면 리턴 될 것이다. 그러면 그 이벤트와 연관되어 있던 콜백을 호출한다.

 

새로운 유저가 서버에 연결하는 상황을 생각해보자. select는 서버 소켓이 reading할 준비가 되었다는 것을 감지한 뒤 accept함수를 호출할 것이다. 이 함수에서 incoming connection을 받아서 새로운 연결 소켓 (conn)을 생성한다. 이 것 역시 non-blocking 모드로 세팅해주고 selector에 등록해준다. conn의 read operation을 모니터링 하고, available 해지면 read 함수를 호출하도록.

 

이제 유저가 연결된 conn 소켓에 "Hello Kim"이라는 메세지를 전송했다고 치자.

Event loop에서 conn이 reading available 함을 detect 해서 select함수가 리턴한다. read함수가 호출되고 거기서 데이터를 recv해서 echo한다. 만약 이때 아무 데이터도 없다면 유저 측에서 connection kill을 했을 가능성이 있으므로 커넥션을 닫고 selecotor에서 unregister하도록 한다.

 

정석으로는 conn 소켓을 selector.EVENT_WRITE 에 대해서도 등록한다음에 send 작업을 해야하지만 간단하게 하려고 생략되어 있다.

 

클라이언트 코드

# Echo client program
import socket

HOST = 'localhost'        # The remote host
PORT = 1234               # The same port as used by the server
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
   s.connect((HOST, PORT))
   while True:
       msg = input("Enter msg: ").encode('utf8')
       s.sendall(msg)

       data = s.recv(1024)
       print('Received: ', data.decode('utf-8'))

 

 

 

 

 

 

 

 

🐨 위에서 배운 것을 적용 : selectors 모듈을 활용해 채팅 프로그램 구현하기

* 여기서 채팅을 구현한 방식은 websocket 같은 application layer 프로토콜 적용한 건 아니고, transport layer의 TCP 소켓 프로그래밍 수준으로 구현.

 

기본 채팅 서버 구현은 다음의 두가지를 포함하고 있어야함.

1. connection들을 리스트로 관리한다.

2. 새로운 메세지가 들어오면, 모든 다른 connection으로 브로드캐스팅 해준다.

 

서버 코드

import argparse
import collections
import json
import selectors
import socket


SERVER_NUM_CONNECTIONS = 1000
BUFFER_SIZE = 1000

Message = collections.namedtuple('Message', ['user', 'text'])


class ChatServer:

   def __init__(self, **kwargs):
       self._selector = selectors.DefaultSelector()
       self._connections_msg_queue = {}
       self._host = kwargs['host']
       self._port = kwargs['port']

   ##### SELECT FUNCTIONS ########################

   def _accept(self, sock, mask):
       conn, addr = sock.accept()
       self._add_connection(conn)

   def _read_write(self, conn, mask):
       if mask & selectors.EVENT_READ:
           self._read(conn, mask)
       if mask & selectors.EVENT_WRITE:
           self._write(conn, mask)

   def _read(self, conn, mask):
       self._read_message(conn)

   def _write(self, conn, mask):
       self._write_pending_messages(conn)

   ##### CHAT FUNCTIONS ########################

   def _add_connection(self, conn):
       # register new client connection for reading (accepting new messages)
       print(f'{conn.getpeername()} hello!')
       self._connections_msg_queue[conn] = collections.deque()
       conn.setblocking(False)
       self._selector.register(conn, selectors.EVENT_READ, self._read)

   def _remove_connection(self, conn):
       print(f'{conn.getpeername()} bye bye!')
       self._selector.unregister(conn)
       conn.close()
       del self._connections_msg_queue[conn]

   def _read_message(self, conn):
       data = conn.recv(BUFFER_SIZE)  # Should be ready
       if data:
           self._add_message(conn, data)
       else:
           self._remove_connection(conn)

   def _add_message(self, sender_conn, raw_msg):
       try:
           msg = json.loads(raw_msg)
           message = Message(msg['user'], msg['text'])
           print(f"{sender_conn.getpeername()}: [{msg['user']}] {msg['text']}")
       except (json.JSONDecodeError, KeyError,) as e:
           print(f"We got unknown type of message: {raw_msg}; error: {e}")
           return

       # register every client connection for writing (broadcast recent messages)
       for conn, messages in self._connections_msg_queue.items():
           conn.setblocking(False)  # not sure if needed
           self._selector.modify(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, self._read_write)
           messages.append(message)

   def _write_pending_messages(self, conn):
       messages = self._connections_msg_queue[conn]
       while messages:
           msg = messages.popleft()
           try:
               conn.send(f'[{msg.user}] {msg.text}'.encode('utf-8'))
           except Exception as e:
               print('Error occurred', e)
               self._remove_connection(conn)
               return

       # if no more message to send, don't listen to available for write
       conn.setblocking(False)  # not sure if needed
       self._selector.modify(conn, selectors.EVENT_READ, self._read)

   def run(self):
       # create and register server socket for reading (accepting new connections)
       server_sock = socket.socket()
       server_sock.bind((self._host, self._port))
       server_sock.listen(SERVER_NUM_CONNECTIONS)
       server_sock.setblocking(False)
       self._selector.register(server_sock, selectors.EVENT_READ, self._accept)

       while True:
           events = self._selector.select()
           for key, mask in events:
               callback = key.data
               callback(key.fileobj, mask)


parser = argparse.ArgumentParser(description='Chat server arguments.')
parser.add_argument('-host', nargs='?', default='localhost')
parser.add_argument('-port', nargs='?', default=1234)
args = parser.parse_args()

chat = ChatServer(**vars(args))
chat.run()

 

ChatServer 클래스

connections_msg_queue = {}는 딕셔너리 임.

key = 소켓, value = 해당 connection으로 들어온 메시지 큐 (double-ended queue)

connection_msg_queue

이 deque는 브로드캐스팅을 위해 존재한다.

어떤 한 유저로 부터 새로운 메세지가 도착하면 모든 connection의 deque에 넣는다. 그리고 큐에 메세지가 들어있는 동안은 write ready 이벤트도 모니터링하고 있다가 write 준비 되면 connection으로 메세지 전송한다 (브로드캐스팅)

 

새 연결 accepting 하기

run 함수에 있는 Event loop은 위의 예제와 거의 똑같다. 서버 소켓 생성해서 non-blocking 모드로 세팅하고 selector에 read operation에 대해 accept 함수를 콜백으로 등록해두기.

그러면 select함수는 IO ready 한 소켓 리스트를 리턴할 것이다. 우리는 그것을 순회하면서 연결된 콜백을 실행하면 된다.

 

_accept함수를 실행하면 _add_connection 이 실행된다. 

conn 상태를 non-blocking으로 세팅. 이 커넥션을 위한 메세지 큐 생성. selector에 등록하기 ( selectors.EVENT_READ 이벤트에 대한 콜백 _read )

 

connection으로 부터 새로운 메세지 수신받기

유저1이 메세지를 보냈다고 치자. select 함수가 이벤트 감지해서 리턴하면 _read 콜백을 실행할 것이다. 그러면 _read_message 함수가 실행된다.

메세지를 recv 한다음에 데이터가 없으면 커넥션 삭제하고, 데이터 있으면 다른 모든 active 커넥션의 메세지 큐에 해당 메세지 append하기. (브로드캐스팅을 위해서)

이때 각 커넥션으로 메세지를 전송하고 싶기 때문에 모니터 하고 싶은 이벤트 타입에 selectors.EVENT_WRITE 도 추가하고, 콜백은 발생한 이벤트 타입에 따라 _read 혹은 _write를 호출하는 _read_write 함수로 바꾼다. ( selector가 리턴한 mask에 read인지 write인지 관련된 IO 상태를 포함하고 있다)

 

모든 connection으로 현재 메세지 브로드캐스팅하기

메세지 큐에 메세지가 들어있을 동안만 selectors.EVENT_WRITE (write ready) 이벤트도 핸들링하도록 해서 해당 커넥션으로 메세지 send하고 큐가 비면 modify 함수 이용해서 소켓이 다시 read 이벤트만 핸들링 하게 바꿔준다.

 

 

클라이언트 코드

import argparse
import collections
import json
import selectors
import socket
import sys


BUFFER_SIZE = 1000

Message = collections.namedtuple('Message', ['user', 'text'])


class ChatClient:

   def __init__(self, **kwargs):
       self._selector = selectors.DefaultSelector()
       self._sock = None
       self._host = kwargs['host']
       self._port = kwargs['port']
       self._name = kwargs['username'] or input('Enter username:')
       self._running = True

   def _read_stdin(self, input, mask):
       data = sys.stdin.readline().strip()
       if data:
           msg = json.dumps({'user': self._name, 'text': data}, ensure_ascii=False).encode('utf8')
           self._sock.send(msg) # We should wait for selector here, but it will work.

   def _read_msg(self, conn, mask):
       data = conn.recv(BUFFER_SIZE)  # Should be ready
       if data:
           print(data.decode("utf-8"))
       else:
           print('Connection to server has failed')
           self._running = False

   def run(self):
       self._selector.register(sys.stdin, selectors.EVENT_READ, self._read_stdin)

       self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
       self._sock.connect((self._host, self._port))
       self._sock.setblocking(False)
       self._selector.register(self._sock, selectors.EVENT_READ, self._read_msg)

       while self._running:
           events = self._selector.select()
           for key, mask in events:
               callback = key.data
               callback(key.fileobj, mask)


parser = argparse.ArgumentParser(description='Chat client arguments.')
parser.add_argument('-host', nargs='?', default='localhost')
parser.add_argument('-port', nargs='?', default=1234)
parser.add_argument('-username', nargs='?')
args = parser.parse_args()

chat = ChatClient(**vars(args))
chat.run()

2가지가 동시에 가능해야함

- stdin 으로 메세지 입력받고 전송하기

- 서버로 부터 받은 메세지 출력하기.

표준 입력, 표준 출력의 콤비네이션이다. 스레드로 해결할 수도 있다 (스레드 한개는 1번 담당, 다른 스레드는 2번 담당)

하지만 select를 이용해서 더 엘레강트하게 해결할 수 있는데 왜 스레드를 사용하나?

stdin도 파일이기 때문에 select로 모니터링 할 수 있다.

select로 (1) 서버와 연결된 소켓, (2) stdin 두가지를 모니터링 한다!

 

작성한 서버, 클라이언트 프로그램은 다음과 같이 작동한다

 

 

 

 

 

 

 

🐨 Reactor 디자인 패턴

위에서 작성한 Echo 서버/ 채팅 서버는 리액터 패턴으로 디자인 되어 있다. 

리액터 패턴을 구성하는 기본 요소들을 살펴보자.

- (concrete) 이벤트 핸들러

=> 특정 타입의 리소스에 대해 request를 핸들링 하는 객체 혹은 함수(콜백). 각 request 타입 마다 각자의 이벤트 핸들러를 가질 수 있다. 

위의 채팅 코드에서는 4가지의 이벤트 핸들러가 있었다.

1. _accept = 리소스 타입: 서버 소켓, 서버 소켓 핸들링하고 새로운 커넥션을 받아들인다.

2. _read = 리소스 타입: 클라이언트 연결, 클라이언트 커넥션으로 부터 새로운 메세지를 읽어들이는 것을 담당함

3. _write = 리소스 타입: 클라이언트 연결, 클라이언트 커넥션으로 메세지를 보내는 것을 담당함.

4. _read_write = 리소스 타입: 클라이언트 연결, 2,3번 핸들러를 조합한 것. 클라이언트 커넥션 소켓이 read, write를 둘다 핸들링 해야할 때 사용한다.

 

- Handles(Resources)

=> 입력을 주거나 출력을 사용하는 os 리소스. 예를 들면 파일, 소켓, 타이머, 동기화 객체 등. 

위 코드에서는 2가지 리소스 타입이 있었음.

1. 서버소켓

2. 클라이언트 커넥션 소켓

+) 클라이언트 코드에는 stdin도 있었음

 

- Synchronous Event Demultiplexer

=> select 시스템 콜(혹은 다른 대체제)에 붙여진 멋진 이름. select 함수는 입력으로 모니터 할 IO 리소스 셋을 받아서 그것들 중 1개 이상이 IO read/write에 레디 상태일 때 리턴한다.

위의 코드에서는 selectors.DefaultSelector를 사용했다. (현재 시스템에서 가장 효율적인 구현체에 해당 epoll | kqueue | devpoll > poll > select )

 

- Initiation Dispatcher(= Reactor)

=> 이벤트 핸들러를 register/unregister하고 이벤트 루프(여기서 select 함수 무한 루프로 호출) run을 담당한다.

이벤트 루프에서 synchronous event demultiplexer IO 작업에 ready인 리소스 리스트를 리턴하면, 리액터는 이 리소스들의 등록된 이벤트 핸들러를 호출 한다.

 

So as long you are running a single-threded event-loop using select to handle non-blocking IO, and dispach them with callbacks (Event Handlers) - Congratulations, you are using the Reactor Pattern!

 

 

💚 리액터 패턴을 사용하는 예시

libuv

node.js의 논 블로킹 IO 엔진

각 os마다 async IO (혹은 synchronous IO multiplexing)을 구현해 둔 방법이 다르기 때문에( epoll, kqueue, IOCP, 이벤트 포트 등등) 크로스 플랫폼 싱글 스레드 비동기 IO 이벤트 루프를 위한 추상화 레벨이 필요하다.

 

Node.js

node.js의 아키텍처

libuv -> 리액터 패턴을 위한 라이브러리

v8 -> 크롬 브라우저를 위해 구글에서 개발한 자바스크립트 엔진


nginx

수천개 혹은 그이상의의 커넥션을 핸들링 하기 위해 디자인 된 웹서버이자 로드밸런서이자 리버스 프록시

4가지 타입의 프로세스로 구성되어 있다

- 마스터 프로세스

- 캐시 로더 프로세스

- 캐시 매니저

- 워커 프로세스 => 싱글 스레드인 워커 프로세스는 IO 태스크 (네트워크, 파일) 핸들링을 위해 리액터 스타일의 이벤트 루프로 구현되어 있다. 공식 문서에 따르자면 1 CPU 코어당 1개의 워커 프로세스를 돌리는 것이 가장 효율적이라고 한다.

blocking 콜을 하는 라이브러리를 위해서 nginx도 스레드 풀을 가지고 있긴 하다는 것도 언급하는 것이 좋겠다.

 

twisted

python 이벤트 드리븐 네트워킹 엔진

from twisted.internet import protocol, reactor, endpoints

class Echo(protocol.Protocol):
    def dataReceived(self, data):
        self.transport.write(data)

class EchoFactory(protocol.Factory):
    def buildProtocol(self, addr):
        return Echo()

endpoints.serverFromString(reactor, "tcp:1234").listen(EchoFactory())
reactor.run()

모든 종류의 서버( http, 메일, pub/sub, 등등)이 구현되어 있다.

 

eventmachine 

for 루비

 

 

 

 

🐨 C10K

어떻게 10K 동시 커넥션을 싱글 서버로 핸들링 할까하는 문제.

이 문제를 푸는 수많은 옵션이 있고 이중 몇가지는 이 포스트에서 다뤘다

Dan Kegel은 그의 논문에서 수많은 strategy를 소개하고 있는데 마지막 줄에서 리액터 패턴을 사용하는 것이 solution이라고 말하고 있다.

요즘에는 용어가 C10M으로 바꼈다. millions of 동시 연결

 

 

 

 

 

참고

🌹🌻🌸 [ I/O 모델 관련 ]

* I/O multiplexing 함수

linux -> select, poll, epoll ( O(n) 에서 O(1)로 개선된 현대 방식 )

BSD계열 -> kqueue ( epoll과 비슷)

window -> IOCP ( epoll과 비슷)

So the basic pattern is that one thread is used to call select, epoll, etc., and another thread is used to call recvfrom to process the data.

I/O model ( unix network programming )

I/O from an application perspective 👍👍

 

[기타]

The C10K problem

paper about reactor pattern

채팅 예시로 설명하는 ruby concurrency

루비 이벤트 루프

- use IO.select instead of registering interest with the OS

- The client is implemented as a Fiber

- The event loop will inform the fiber when its connection is ready (pause 했다가 레디 일때 resume)

루비 evented io 구현 라이브러리 => event-machine 

댓글
공지사항
최근에 올라온 글