시리즈/Concurrency

파이썬 비동기 프로그래밍

빅또리 2022. 3. 9. 13:15

파이썬 비동기 프로그램을 위한 문법은 어떻게 될까? 아래에 잘 정리된 문서가 있었다. 한번 알아보자

🐲 speed up your python program with concurrency

 

 

python concurrency method : threading vs asyncio vs multiprocessing

 

thread, task, process 세가지 모두 => sequence of instructions that run in order

high level에서 본다면 세가지 모두 다음과 같은 공통점이 있음 : cpu에서 실행하고, 중간 상태를 저장하고 멈췄다가 나중에 그 지점부터 다시 실행할 수 있는 instruction+메타 데이터 묶음(?)이라는 것.

 

그렇지만 multiprocessing만 진짜로 시간적으로 동시에 실행될 수 있음(parallelism도 가능).

threading과 asyncio는 GIL때문에(GIL없는 다른 언어에는 해당 사항 없음) one at a time실행. 고로 only utilize single processor. (concurrency만 가능)

 

thread와 task(asyncio)의 차이점은 어떤게 실행되게 할지 스케줄링 하는데에 있다. (which thread/task take turn)

threading => preemptive multitasking. os가 모든 스레드를 알고 있고 중간에 interrupt할 수 있음. 실행 흐름을 개발자가 관리하지 않음. 실행 중 언제 switch될 지 알 수 없음. (spawn n개의 native thread)

asyncio => cooperative multitasking. 코드 내에서 언제 switched out 될 지 "yield"해야 함 (이전 글 을 참고하면 좋다). 개발자가 명시적으로 control flow 제어 가능. not be swapped out in the middle of a Python statement (n개의 coroutine/tasks on top of single native thread)

 

(참고로)

Python threads are implemented using OS threads in all implementations I know (C Python, PyPy and Jython). For each Python thread, there is an underlying OS thread.  (출처 : Stack overflow 답변 중..)

-> python이나 ruby나 해당 언어의 thread가 native thread랑 1:1 매칭 되는 듯?

 

 

python 프로그램이 새로 산 랩탑의 멋진 멀티 core를 모두 활용하려면? : multiprocessing을 하라.

새로운 프로세스를 생성해서 코드 실행을 함. 새로운 프로세스라 함은 거의 그냥 별개의 프로그램인 것으로 생각해도 좋다. 프로세스 마다 각자의 파이썬 인터프리터를 가지고, 메모리, 파일 핸들링 등 자원 관리도 따로.

파이썬 GIL은 인터프리터 당 있는거니까, 프로세스끼리는 서로 다른 코어에서 동시 실행이 가능하다.

 

 

그럼 언제 concurrency가 유용할까?

=> I/O-bound 할때

I/O bound process vs CPU bound process

I/O bound process 속도 개선 => 여러 파일시스템이든 네트워크든 IO operation 기다리는 시간을 최대한 overlapping할 것.

cpu bound process 속도 개선 => 같은 시간 동안 computation을 더 많이 할 수 있는 방법을 찾기. 여러 코어가 일 나눠서 하기.

 

 

I/O bound 프로그램 속도 개선하기

예제 상황 : 몇 가지 사이트에서 네트워크로 페이지 다운로드 하기

 

(1) 동기수행 : non-concurrent version

import requests
import time

def download_site(url, session):
    with session.get(url) as response:
        print(f"Read {len(response.content)} from {url}")

def download_all_sites(sites):
    with requests.Session() as session:
        for url in sites:
            download_site(url, session)

if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} in {duration} seconds")

* requests.get 을 바로사용하는 대신, Session 객체를 사용하였다. 아래와 같은 이점이 있다고 함

- persist parameters, cookies

- connection pooling, reuse underlying TCP connection 

=> performance increase

 

 

 

(2) threading

* 추상클래스 concurrent.futures.Executor를 상속 받은 concrete 클래스에는 ThreadPoolExecutor, ProcessPoolExecutor 2가지가 있음.

executor가 스레드풀 실행 관리. executor.map으로 pool에 있는 스레드들로 알아서 concurrent한 함수 실행을 한다.

import concurrent.futures
import threading
import requests
import time

# thread-local 클래스의 인스턴스 생성
# represents data whose values are thread specific
thread_local = threading.local()

# requests.Session는 thread-safe 하지 않기 때문에
# 각 thread 마다 각자의 requests.Session 객체를 생성해야 함.
def get_session():
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session

def download_site(url):
    session = get_session()
    with session.get(url) as response:
        print(f"Read {len(response.content)} from {url}")

def download_all_sites(sites):
    # executor와 thread pool로 
    # Thrad.start(), Thread.join() 방식보다 high-level로 실행 관리.
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(download_site, sites)

if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} in {duration} seconds")

thread-safe한 코드를 작성하려면 1) threading.Lock (코드 블럭이나 어떤 메모리 주소에 한번에 한 스레드만 접근할 수 있도록)

2) threading.local() 각자 thread만 접근할 수 있는 local 데이터 객체를 사용하는 방법 등이 있다.

threading 버전의 동작 방식

동기 방식보다 훨씬 빨라졌지만, 코드 작성이 조금 더 까다롭고, race condition 유발로 실행 결과가 random 하거나 간헐적인 버그 발생으로 관리가 어려워질 수 있다.

 

 

(3) asyncio 

asyncio 모듈이 파이썬 3.4 부터 standard library에 포함되었다고 한다. (이글을 쓰고 있는 2022.3월 현재 가장 latest release는 3.10이다)

 

asyncio의 기본 컨셉은 event loop라는 싱글 파이썬 객체가 어떤 task가 어떻게 실행될지 관리하는 것이다.

event loop은 모든 task가 어떤 state인지 다 알고 있다.

task가 속할 수 있는 상태는 실제로는 더 많지만, ready와 waiting 두가지로 단순화해서 설명해보도록 하자.

event loop은 각 상태에 대한 task 큐들을 관리한다. waiting 리스트에 있던 task의 IO작업이 완료되면 ready 큐로 갈 수 있음.

Event loop은 ready 큐에서 다음 실행될 task를 고른다. task가 cooperative 하게 event loop으로 실행 control을 돌려주기 전까지는 ("yield") 계속 task의 코드 실행. event loop이 종료될 때까지 이 작업을 반복한다.

asyncio의 중요한 포인트는 task가 자발적으로 실행 control을 넘겨주기 전에는 실행 중간에 interrupt되지 않는다는 것이다. 이 점이 threading과 달리 코드 thread-safety에 대해 고민하지 않도록 해줌.

 

stack overflow : How does asyncio actually work?

 

 

python의 3가지 awaitable 객체 (await expression 뒤에 쓸 수 있는)

coroutine => 파이썬에서는 async def 키워드로 정의할 수 있다. stop, resume할 수 있는 함수로 근본적으로는 제너레이터에서 파생되었다. await 키워드는 제너레이터의 yield from과 같은 역할을 한다. (outer 제너레이터에서 inner 제너레이터의 yield 결과 받기.)

future =>  general concept of a container of an async result, JavaScript promise 객체 같은 것. state(pending/cancelled/finished)랑 result를 들고 있음. 보통 application level에서 직접 사용할 일은 별로 없고, 라이브러리 코드 같은 것 보다 보면 나온다고 함.

task => subclass of future specialized for executing coroutines. 코루틴을 concurrently 스케줄링 하기 위해 사용. create_task, ensure_future같은 함수로 coroutine을 wrapping 해서 만든다.


* coroutine 직접 호출 vs task로 wrapping 한 호출

coroutine 직접 호출 => run sequentially

task로 wrapping한 호출 => run parallel

 

* asyncio.gather -> javascript Promise.all 같은 것. awaitable 객체들을 concurrent하게 샐행해준다. 그 중에 coroutine이 있으면 알아서 Task로 감싸서 스케줄링.

import asyncio
import time
import aiohttp

async def download_site(session, url):
    async with session.get(url) as response:
        print("Read {0} from {1}".format(response.content_length, url))

async def download_all_sites(sites):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in sites:
            # 최신 버전 파이썬에서는 create_task 메소드를 권장
            task = asyncio.ensure_future(download_site(session, url))
            tasks.append(task)
        await asyncio.gather(*tasks, return_exceptions=True)

if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80
    start_time = time.time()
    # python3.7부터는 asyncio.run()로 대체 가능
    asyncio.get_event_loop().run_until_complete(download_all_sites(sites))
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} sites in {duration} seconds")

Reuests 대신 aiohttp 패키지를 사용했다. threading 버전과 다르게 session 객체를 모든 task에서 공유할 수 있다. 

위의 예제에서 모든 task는 같은 싱글 스레드에서 동작하고, 한 task가 다른 task를 인터럽트 하는 일은 발생하지 않는다.

그리고 thread 개수는 늘리는데 한계가 있는데(너무 많은 스레드를 생성하면 오히려 그것을 관리하기 위한 오버헤드가 더 커짐)

task는 훨씬 경량이라 괜찮다. 

asyncio 버전 동작 방식

Requests 대신 aiohttp를 사용한 것 처럼 asyncio를 위한 async 라이브러리를 사용해야 한다. (blocking하지 않는)

그리고 개발자가 어떻게 cooperative 하게 코드를 짜는지에 따라 성능이 많이 달라질 수 있다. 

그리고 asyncio를 사용할땐 다음을 명심해라 : event-loop로 control을 넘기지 않는 task는 다른 모든 task들을 blocking할 수 있다는 것!

 

(4) multiprocessing

위의 3가지 버전과의 차이점은, multiprocessing 버전은 다중 코어의 이득을 실질적으로 얻을 수 있다는 것이다.

나머지 버전이 싱글 코어에서만 동작하는 이유는 Cpython의 Global Interpreter Lock 때문.

multiprocessing 라이브러리는 high level에서 보자면, 새로운 python 인터프리터 인스턴스를 생성해서 각 코어에서 동작할 수 있도록 한다. 스레드, 태스트 생성보다 훨씬 heavyweight.. 그래서 I/O bound 한 작업에서는 별로 효과가 없다.

import requests
import multiprocessing
import time

# 프로세스마다 각자 1개의 global session 가진다. no shared memory
session = None

def set_global_session():
    global session
    if not session:
        session = requests.Session()

def download_site(url):
    with session.get(url) as response:
        name = multiprocessing.current_process().name
        print(f"{name}:Read {len(response.content)} from {url}")

def download_all_sites(sites):
    # pool에 있는 프로세스 개수는 코드가 동작하는 컴퓨터 cpu 개수에 따라 생성
    with multiprocessing.Pool(initializer=set_global_session) as pool:
        pool.map(download_site, sites)

if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} in {duration} seconds")

main 프로세스랑 자식 프로레스 communication은 multiprocessing 모듈이 핸들링 해준다.

multiprocessing 버전 동작 방식

 

 

* 실행시간 비교

non-concurrent -> 14.28sec

threading -> 3.72sec

asyncio -> 2.57sec

multiprocessing -> 5.71sec

 

 

 

CPU-bound 프로그램 속도 개선하기

 

(1) non-concurrent

import time

def cpu_bound(number):
    return sum(i * i for i in range(number))

def find_sums(numbers):
    for number in numbers:
        cpu_bound(number)

if __name__ == "__main__":
    numbers = [5_000_000 + x for x in range(20)]

    start_time = time.time()
    find_sums(numbers)
    duration = time.time() - start_time
    print(f"Duration {duration} seconds")

single cpu에서 동작한다.

 

 

(2) threading / asyncio

=> 속도 개선에 전혀 도움이 되지 않는다. 오히려 느려짐.

IO bound 에서는 기다리는 시간을 중첩할 수 있도록 도와주는 용도로 사용했기 때문에 속도가 개선 됐지만, 

파이썬에서는 GIL때문에 어차피 같은 프로세스 안에 있는 thread, task는 모두 같은 CPU 위에서 동작하므로 non-concurrent 버전의 작업 + 불필요한 context-switching이나 스레드, 태스트 세팅 작업만 추가되었을 뿐이다.

 

 

(3) multiprocessing

import multiprocessing
import time

def cpu_bound(number):
    return sum(i * i for i in range(number))

def find_sums(numbers):
    with multiprocessing.Pool() as pool:
        pool.map(cpu_bound, numbers)

if __name__ == "__main__":
    numbers = [5_000_000 + x for x in range(20)]

    start_time = time.time()
    find_sums(numbers)
    duration = time.time() - start_time
    print(f"Duration {duration} seconds")

드디어 multiprocessing이 빛을 발하는 상황에 도달했다. 다른 concurrency 라이브러리랑 다르게 multiprocessing은 실질적으로 heavy cpu workload를 여러 cpu가 나눠서 처리할 수 있도록 해준다.

위의 예시로는 잘 드러나지 않지만 실제로 작업을 쪼개서 각자 일하게 하도록 만드는 것은 어렵다. 그리고 프로세스 간의 통신이 필요한 경우도 많은데 추가적인 shared-memory라든지 message-passing을 사용하는 IPC를 사용해야 하는 것도 까다로운 포인트 중 하나이다.

 

 

* 실행시간 비교

non-concurrent -> 7.83sec

threading -> 10.4sec

multiprocessing ->  2.52sec

 

 

🚀 seven concurrency model with python