비동기 라이브러리 Asyncio 이해
concurrent.futures(다중 스레드, 비동기/Non-block 방식)
Asyncio를 이해하기 전에,
thread, process에서의 concurrent 프로그래밍 방식으로 python 3.2에 추가된 future(PEP 3148 – futures - execute computations asynchronously)를 먼저 이해할 필요가 있다. 이후 만들어진 asyncio도 이와 동일한 API로 만들어진 것이다. concurrent.futures 모듈에는 ThreadPoolExecutor와 ProcessPoolExecutor 클래스가 존재한다.
concurrent.futures는 기본적으로 threading.Thread보다 고수준의 API를 구성한다. 기존의 스레드 모델은 일련의 코드를 다른 스레드에서 실행하는 것에 중점을 둔다. 단일 스레드에서는 메인루틴이 서브 루틴을 호출하는 경우, 이 서브루틴의 작업이 완료되기 전까지 메인루틴은 작업을 더 이상 진행하지 않는다. 이때 서브 루틴의 코드가 별도의 스레드에서 작동한다면, 메인 루틴은 서브 루틴이 처리하는 작업 때문에 ‘진행에 방해를 받지 않고’ 하던 일을 계속할 수 있다.
기존의 threading.Thread 모듈은 각각의 스레드가 싸우지 않도록 실행 흐름을 완전히 통제하는 데에 주안점을 두었다. 즉 어떤 데이터 A에 대한 처리를 별도의 work 스레드에서 처리하고 그 결과로 B가 만들어진다면 이 B를 원래의 스레드 혹은 메인 스레드로 전달할 방법에 대해서는 지금 당장 고려하지 않는다. 반면, concurrent.futures 모듈은 각각의 스레드에서 분산 처리된 결과를 손쉽게 원래의 스레드로 쉽게 전달받는 것에 주안점을 두었다. 이 고수준의 API는 직접 스레드를 제어하는 것이 아닌 Future 객체를 사용함으로써 자바스크립트의 Promise 개념을 도입한 것이다.
Future는 별도의 실행 흐름에서 처리되는 코드를 캡슐한 것이다. 이는 작업이 다른 어딘가에서 처리중이며 '미래의 적절한 시점에 그 결과가 반환되어 나오게 된다'는 약속을 의미한다. Future로 실행된 결과나 현재 상태를 조회할 수 있고, 결과가 완료된 Future부터 future.result()로 결과 값을 반환 받는다.
Future 객체의 중요 메소드 *add_done_callback() 메소드 중요
- cancel() : 작업 취소. 현재 실행 중이고 취소가 불가능할 경우 False를 리턴.작업이 취소되었다면 True가 리턴
- canceled() : 취소가 완료된 작업이면 True를 리턴
- running(): 실행 중인 경우 True를 리턴
- done(): 작업이 완료되었고 정상적으로 종료되었다면 True를 리턴
- result(timeout=): 결과를 대기한 후 리턴. timeout= 인자가 주어지면 해당 초까지 기다린다.
- add_done_callback(): future 객체가 완료될 때 호출될 함수(callback)를 등록할 수 있다. callback은 future를 인자로 받는 형태여야 하며, future이 완료되거나 취소될 때 호출된다. 콜백을 추가하는 시점에 이미 작업이 완료/취소되었다면 콜백이 즉시 호출된다.
Future의 사용 방법
- executor.submit(work, *args, **kwds)으로 특정 함수를 별도의 실행 흐름에서 시작(executor)하고, 해당 작업의 완료를 기다리면서 Block되지 않는 Future 객체를 얻는다.
- 1의 시점 직후 Future 내의 작업은 계속 실행 중이며 그 결과는 생성되지 않았을 것이다.
- Future 객체 내의 완료 여부와 상관 없이 Future에 callback이란 것을 추가하거나, 혹은 작업을 취소할 수 있다. 그리고 여전히 현재 스레드에서 다른 작업을 처리할 수도 있다.
- 현재 스레드에서 작업의 결과가 필요한 시점이 오면 future.result()를 통해서 결과를 요청한다. 이 시점에서 Future의 결과가 생성되어 있다면 그 결과는 즉시 반환된다. 만약 Future 내부의 코드가 아직 처리중이라면 result() 메소드는 그 시점부터 결과의 생성을 기다리도록 현재 스레드(메인 루트)를 중지(block)시킬 수 있다.
간단한 예제로 PEP 3148에 다음과 같은 web crawler가 있다.
from concurrent import futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://some-made-up-domain.com/']
def load_url(url, timeout):
return urllib.request.urlopen(url, timeout=timeout).read()
def main():
with futures.ThreadPoolExecutor(max_workers=5) as executor:
future_to_url = dict(
(executor.submit(load_url, url, 60), url)
for url in URLS)
for future in futures.as_completed(future_to_url):
url = future_to_url[future]
try:
print(f'{url} page is {len(future.result())} bytes')
except Exception as e:
print(f'{url} generated an exception: {e}')
if __name__ == '__main__':
main()
코드를 하나씩 들여다보자
1) executor.submit(적용할 함수, 매개변수)으로 thread pool에서 돌릴 함수를 등록하면 future클래스를 반환한다.
등록된 함수는 thread pool에서 비동기(별도의 실행 흐름)로 실행된다. executor는 호출 가능한 객체(함수나 코루틴)를 비동기 호출하는 일종의 실행기이다. submit 메소드는 실행 함수와 인자값을 받아서 Future 클래스를 반환한다.
with futures.ThreadPoolExecutor(max_workers=5) as executor:
future_to_url = dict(
(executor.submit(load_url, url, 60), url)
for url in URLS)
print(future_to_url)
executor.submit()과 url을 딕셔너리로 만들면, {Future(key): url(value)} 형태로 future_to_url 변수에 저장
Future 객체는 호출 가능 객체를 비동기 캡슐화한다. Executor에서 submit으로 전달된 함수를 비동기 실행, 실행된 결과나 현재 상태를 조회 가능하도록 만든다.
{
<Future at 0x23f4baaae20 state=running>: 'http://www.foxnews.com/',
<Future at 0x23f4baaaf10 state=running>: 'http://www.cnn.com/',
<Future at 0x23f4b9911c0 state=running>: 'http://some-made-up-domain.com/'
}
2) futures.as_completed()로 결과가 완료된 순서대로 이터레이터 객체를 리턴 받을 수 있다. 위와 같이 for .. in에 넣어 loop를 돌릴 수 있다. 비정상 종료되거나 완료된 future 객체부터 차례대로 나오게 된다.
for future in futures.as_completed(future_to_url):
# 딕셔너리(future_to_url)의 키(future)값 출력
print(future)
<Future at 0x23f4bb87280 state=finished returned bytes>
<Future at 0x23f4bb87b20 state=finished returned bytes>
<Future at 0x23f4ba45c70 state=finished raised HTTPError>
3) future.result()로 결과 값을 받을 수 있다.
for future in futures.as_completed(future_to_url):
print(future.result())
b'<!DOCTYPE html><html lang="en"><head><meta charset="utf-8"/>
<meta http-equiv="X-UA-Compatible" content="IE=edge"/>
<meta name="viewport" content="width=device-width,minimum-scale=1,initial-scale=1">
<meta name="pagetype" content="homepage"/><meta name="robots" content="noarchive,noodp">
<title>Fox News - Breaking News Updates | Latest News Headlines | Photos & News Videos</title>
<meta name="description" content="Breaking News, Latest News and Current News from FOXNews.com. Breaking news and video. Latest Current News: U.S., World, Entertainment, Health, Business, Technology, Politics, Sports."><meta name="keywords" content="news, breaking news, latest news, current news, world news, national news"><link rel="shortcut icon" href="//static.foxnews.com/static/orion/styles/img/fox-news/favicons/favicon.ico" type="image/x-icon"/><link rel="apple-touch-icon" sizes="57x57" href="//static.foxnews.com/static/orion/styles/img/fox-news/favicons/apple-touch-icon-57x57.png"><link \nrel="apple-touch-icon" sizes="60x60" href="//static.foxnews.com/static/orion/styles/img/fox-news/favicons/apple-touch-icon-60x60.png"><link rel="apple-touch-icon" sizes="72x72" href="//static.foxnews.com/static/orion/styles/img/fox-news/favicons/apple-touch-icon-72x72.png"><link rel="apple-touch-icon" sizes="76x76" href="//static.foxnews.com/static/orion/styles/img/fox-news/favicons/apple-touch-icon-76x76.png"><link rel="apple-touch-icon" sizes="114x114" href="//static.foxnews.com/static/orion/styles/img/fox-news/favicons/apple-touch-icon-114
만일 future 내의 함수 load_url()에서 exception이 발생한 것도 future를 통하여 호출한 thread에서 받을 수 있게 된다. 위 예제와 같이 future.result()를 try/except 문으로 감싸서 해당 작업에서 발생한 예외도 받을 수 있다.
for future in futures.as_completed(future_to_url):
# url(값) = 딕셔너리[키]
url = future_to_url[future]
try:
print(f'{url} page is {len(future.result())} bytes')
except Exception as e:
print(f'{url} generated an exception: {e}')
>>>
http://www.cnn.com/ page is 1126209 bytes
http://www.foxnews.com/ page is 271398 bytes
http://some-made-up-domain.com/ generated an exception: HTTP Error 400: Bad Request
이와 같이 future가 있으면 child thread에서 발생한 exception도 쉽게 처리가 가능해진다.
Asyncio(단일 스레드, 비동기/Non-block 방식)
다중 스레드 및 다중 프로세스에 대해서 Future를 적용하는 것이 성공적이었다면, 단일 스레드에 대해서도 비동기 non-blocking 코드를 작성하는데에 Future 개념을 도입할 수 있지 않을까하는 것으로 아이디어가 옮겨갔다.
Python 3.4에서는 Event Loop 기반의 코루틴 형식으로 실행되는 비동기 프로그래밍이 asyncio 표준 라이브러리로 새로 추가되었다. 단일 thread에서 multi tasking을 하는 것과 유사한 기능을 수행할 수 있게 되었다.
NodeJS는 단일 스레드 비동기 I/O를 사용하여 불필요한 대기 시간을 줄이는 것으로 더 빠른 성능을 내고 있다. 예를 들어 DB 쿼리 요청이나 네트워크 요청, 파일 I/O 등의 작업은 CPU를 거의 사용하지 않지만, 해당 작업에 대한 결과를 리턴할 때까지 스레드 흐름이 멈춘 상태가 된다. 이러한 대기시간 동안에 해당 작업 외에 다른 작업으로의 전환을 단일 스레드에서 처리할 수 있는 기반 기술들은 파이썬의 코루틴이다.
CPU에 부하가 집중되는 다른 작업들과는 달리 I/O 작업은 CPU와 개별적으로 동작이 가능하다는 점에 착안하여, I/O 처리를 기다리는 역할을 코루틴에게 위임하고, 그 시간 동안 다른 코루틴을 이용해서 별도의 작업을 또 비동기로 처리할 수 있다. 이러한 작업을 독립적으로 처리해줄 수 있는 구조가 파이썬에는 이미 코루틴이라는 이름으로 갖춰져 있었고, 미완료 작업을 액세스할 수 있는 방법이 Future에 의해 준비되었으므로, 둘을 결합하여 단일 스레드 기반 non-blocking API를 출시할 수 있었다.
GIL 제약을 우회했다거나, 멀티스레드에서 하던 걸 단일 스레드로 할 수 있게 하는 것은 아니고 I/O 작업과 CPU 중심 작업을 병렬로 처리한다는 것이고 결국 이는 NodeJS의 non-blocking 비동기 처리에 더 근접하는 개념이다.
Python 3.4 제너레이터 기반 코루틴 생성
Python 3.4에서 Asyncio 모듈은 @asyncio.coroutine을 사용하여 일명 제너레이터 기반 코루틴 객체를 생성한다. @는 데코레이터라고 부른다.
decorator는 함수를 parameter로 받아서 다시 함수를 리턴하는 함수이다. @asyncio.coroutine도 decorator다.
@asyncio.coroutine을 통해 내부 코드를 실행 예정인 제너레이터 객체를 만들지만, 함수 내부에 yield from를 포함하지 않으면 코루틴으로서 실행되지 않는다. 그래서 이 decoreator는 실제로 특별한 기능은 수행치 않고, asyncio와 같이 사용하는 coroutine이라고 표기하는 목적이다. 즉, 빼고 사용해도 특별히 문제될 것은 없다. 어차피 함수 내부에 yield from를 명시하면 제너레이터 객체로 생성된다.
def func(arg1, arg2, ...):
pass
func = dec1(func)
# 위의 func = dec1(func) 코드는 @dec1와 동일
@dec1
def func(arg1, arg2, ...):
pass
----------------------------------------------
@asyncio.coroutine
def sample():
pass
sample()
>> <generator object aaa at 0x000001DC5FD4E270>
일반적으로 호출 함수(caller)에서 반복적으로 next(), send()를 이용하여 yield에 멈춰있는 coroutine을 재개시킨다. 마치 중단되어 있는 coroutine이 계속 실행되는 것처럼 보이는 것이다. Coroutine(coroutine A)에서는 내부적으로 다시 coroutine(coroutine B)을 호출 할 수 있다. 이때는 편리하게 yield from으로 호출하면 호출된 coroutine B가 yield가 반복되어 최종 리턴될 때까지 coroutine A는 기다리게 된다. Caller, coroutine A, coroutine B를 놓고 보면 Caller가 send()를 호출 할 때마다 coroutine B의 yield가 풀리는 셈이 된다.
send()를 반복적으로 끊임없이 호출하여 멈춰있는 코루틴을 실행시키는 역할을 asyncio의 Event Loop한다. 이렇게 되면 coroutine도 event loop에서 마치 별도의 thread에서 도는 것과 같이 실행되는 셈이 된다. coroutine 여러 개를 event loop에서 관리하기 위해서는 future에서 상속받은 task라는 것을 사용한다.
외부 관점에서 볼 때 concurrent.future의 future는 다중스레드이기 때문에 내부적으로 작업이 계속 돌아가는 중이지만, asyncio의 Future는 아직 완료되지 않고 멈춰 있는 작업 상태이다. task 객체는 asyncio.future의 자식 클래스이며concurrent.future.Future와 거의 같은 API를 제공하고 있다.
- result()를 이용해서 결과를 얻거나
- done(), cancelled() 를 이용해 완료/취소 여부를 확인할 수 있으며, cancel() 메소드로 취소할 수 있다.
- add_done_callback()을 이용해서 완료 콜백함수를 삽입할 수 있다.
yield로 두 개가 concurrent하게 돌아가는 이벤트 루프를 만들어 보면 다음과 같다. next()나 send()를 반복적으로 끊임없이 호출하여 멈춰있는 코루틴을 실행시키는 역할을 하는 것이 이벤트 루프라고 했다.
def coroutine1():
print('C1: Start')
yield
print('C1: hello')
yield
print('C1: end')
def coroutine2():
print('C2: Start')
yield
print('C2: hello')
yield
print('C2: end')
# 이벤트 루프 실행기
def run(cors):
while cors:
for co in [c1,c2]:
co.send(None)
# 코루틴 객체 생성 => asyncio def
c1 = coroutine1()
c2 = coroutine2()
task = [c1, c2]
# 코루틴 객체들을 비동기적으로 실행시키기 위한 실행기 => 이벤트 루프
# 코루틴을 모아놓을 빈 리스트 [] 생성 => asyncio.get_event_loop()
# 실행시킬 코루틴을 리스트에 등록(예약) [c1, c2] => asyncio.ensure_future() 혹은 asyncio.create_task()
# send() 메소드를 호출해 코루틴 실행할 함수 run() => run_until_*()
run(task)
위의 예시에서 run() 함수 역할을 asyncio에서는 이벤트 루프가 하게 된다.
asyncio 라이브러리에서는 이벤트 루프(런 루프)라는 주체가 내가 만든 코루틴 함수들을 자동으로 호출해준다. 이벤트 루프가 없다면 위의 예시처럼 next()나 send()를 일일이 수동으로 호출해야 하는 제너레이너일 뿐이다.
이벤트 루프가 다중스레드 concurrent.executor와 같이 동기 코드에서 비동기 코드로 진입하는 일종의 실행기라면, 이 실행기가 다른 비동기 코루틴을 실행할 수 있도록 asyncio.ensure_future() 혹은 asyncio.create_task()을 호출해 코루틴 함수를 등록(schedule)하면 task 객체라는 것이 생성된다. 즉, task는 코루틴을 감싼 객체다. asyncio.ensure_future()는 다중 스레드 모듈인 concurrent.futures의 submit()과 동일한 역할을 한다고 할 수 있다.
task 객체(코루틴 실행을 예약(등록)한 객체, 생성되는 즉시 런루프에 등록)
코루틴을 실행하는 future류의 객체다. 즉, Task는 기본적으로 future 객체의 기능을 전부 가지고 있기 때문에, 퓨처 객체와 마찬가지로 어떠한 작업의 실행 상태 및 완료 상태를 담고 있다. concurrent.future 모듈의 다중 스레드에서 future는 여러 스레드에 의해 작업들이 계속 실행 중이었기 때문에 별도로 작업의 중단과 실행을 개시할 필요가 없었다.
단일 스레드 코루틴 기반의 asyncio.future도 작업의 상태를 담고 있지만, 별도로 작업의 중단과 실행을 개시를 따로 해주는 역할을 task에 부여하였다. task 객체는 생성될 때 코루틴 객체를 넘겨받아 _coro 필드에 저장하고, Task를 만들려면 저수준의 asyncio.ensure_future() 또는 loop.create_task() 함수를 사용한다. 파이썬 3.7부터는 task를 만든다는 더 명확한 의미로 asyncio.create_task() 함수로 업데이트되었다.
"만약 코루틴이 Future를 기다리고 있다면, 태스크는 코루틴의 실행을 일시 중지하고 Future의 완료를 기다립니다. 그동안 이벤트 루프는 다른 태스크, 콜백을 실행하거나 IO 연산을 수행합니다. (협업 스케줄링)
Future가 완료되면, 감싸진 코루틴의 실행이 다시 시작됩니다. 이벤트 루프는 한 번에 하나의 Task를 실행합니다."
Coroutine, Future, Task
Coroutine, Future, Task 객체는 모두 Awaitable(대기가능) 객체이다.
대기 가능 객체는 yield from(await) + [다음에 올 수 있는 것]으로 이해하면 된다. 이들은 실행흐름이 메인 루틴과 연결되지 않았지만, 작업이 완료되면 실행 결과를 기다리고 있는 await 지점으로 결과를 반환하고 실행을 종료한다. 제너레이터에서 배웠던 것처럼 서브 루틴의 return 값은 부모 루틴으로 반환되는 것이다.
Future는 어떤 비동기 동작의 결과를 내포하는 저수준의 대기가능 객체이다. Future를 await하는 것은 어떤 코루틴이 아직 실행 중에 있고, 그 결과가 다른 어디에서 만들어지고 있다는 것을 의미한다. Future는 주로 콜백에 기반한 디자인에 사용되며, 애플리케이션 레벨에서 Future 객체를 직접 만들 이유는 거의 없다.
# 제너레이터
def gen():
yield from iterable, iterator, generator 객체
gen()
>>> <generator object gen at 0x0000029CF89EA040>
-----------------------------------------------------------------
# 제너레이터 기반 코루틴
import asyncio
# 아래 데코레이터를 굳이 안써도 yield from을 통해 제너레이터 객체를 생성
@asyncio.coroutine
def 함수이름():
yield from iterator, generator(coroutine), furture, task 객체
함수이름()
>>> <generator object 함수이름 at 0x000001D4DD700350>
-----------------------------------------------------------------
이벤트 루프(런 루프)
예를 들어 코루틴 A가 yield from으로 웹 사이트에서 파일을 다운로드(또는 asyncio.sleep(30))하는 I/O작업을 호출했다고 해보자. 그럼 현재 스레드는 정지하지 않고 파일을 다운로드 받는 동안 어떻게 다른 코루틴 C를 바로 실행시킬 수 있을까? 그리고 요청했던 I/O 작업 하나가 완료되었다고 하자. 그러면 그 I/O 작업을 호출했던 코루틴 A가 다시 이어서 실행을 계속해야 하는데 어떻게 원래의 코루틴 A가 이를 알고 작업을 이어서 수행해나갈 수 있을까?
이 역할들을 이벤트 루프가 하게 된다. 이벤트 루프는 일종의 무한 루프로서 아무것도 안하고 상시대기 상태로 있다가 이벤트 루프에 [ 여러 개의 코루틴을 예약(schedule) => task 생성 ]하고 실행해나간다.
따라서 먼저 코루틴을 등록할 이벤트 루프가 필요하며, 비동기 작업을 처리하기 전에는 런루프를 가동 시켜놔야 한다.
이벤트 루프는 asyncio.get_event_loop() 함수로 만들고, run_until_*() 함수를 통해서 동기 코드에서 비동기 코드로 진입할 수 있다. 파이썬 3.7에서는 위의 두 개의 함수를 asyncio.run() 함수를 사용하여 한번에 처리할 수 있다. 파이썬 3.8에서는 python -m asyncio로 쉘을 시작하면 쉘 자체가 런루프 내에서 돌아간다. 즉 비동기 코루틴 함수를 실행해서 코루틴을 바로 실행할 수 있다.
Python 3.5 async, await을 사용하는 네이티브 코루틴
import asyncio
# 비동기 코루틴
async def func():
...
func()
>> <coroutine object test at 0x000001DC5FD231C0>
# 다른 비동기 코루틴을 호출
async def 함수이름():
await func()
Python 3.5에서는 coroutine임을 명시적으로 지정하는 async def와 yield, yield from를 대체하는 await 키워드가 추가 되었다. 기존의 yield 혹은 yield from를 사용하는 제너레이터 기반 corourinte과 비교하기 위하여 native coroutine이라고 한다. (PEP 492 – Coroutines with async and await syntax)
제너레이터 기반 코루틴은 iterator부터 시작하여 generator로 확장한 것이라, 이 개념까지 이해하고 있어야 했다. Python 3.5부터는 이를 명확히 정의하여 새로 native coroutine를 만들었다. async def로 코루틴 함수를 정의하면 코루틴 객체가 생성된다.
네이티브 코루틴은 async def로 코루틴 함수를 정의하여 코루틴 객체를 생성한다. 함수 내부에 await를 사용하지 않아도 async def로 정의된 함수는 코루틴 객체가 된다.
1. 코루틴 함수 호출
그러나 await가 없으면 RuntimeWarning: coroutine 'test' was never awaited 경고 메세지가 나온다.
공식 문서에 따르면, 이 경고 문구는 코루틴 함수가 호출되었지만 기다리지 않을 때나 코루틴이 asyncio.create_task()로 예약되지 않으면 asyncio가 RuntimeWarning을 내보낸다. 해결 방법은 코루틴을 await하거나 asyncio.create_task() 함수를 호출하면 된다. 즉, 비동기 코루틴 함수를 호출했지만 await가 없어서 제어권을 못 주고 test() 코루틴 함수가 실행되지 않은 것이다.
import asyncio
async def test():
print("never scheduled")
async def main():
test()
asyncio.run(main())
>>> 7: RuntimeWarning: coroutine 'test' was never awaited
test()
https://docs.python.org/ko/3/library/asyncio-task.html#coroutine
그래서 코루틴 함수를 호출할 때는 코루틴 함수를 호출하는 메인 루틴에 await를 추가하거나 asyncio.create_task()로 이벤트 루프에 코루틴 객체를 전달하여 실행을 등록해야 한다.
import asyncio
# 1.
async def test():
print("never scheduled")
async def main():
await test()
asyncio.run(main())
>> never scheduled
----------------------------------------
# 2.
async def test():
print("never scheduled")
async def main():
task1 = asyncio.create_task(test())
asyncio.run(main())
>> never scheduled
2. 코루틴을 await로 실행하기
import asyncio
async def nested_co():
return 100
async def main_co():
nested_co() # 1
print(await nested_co()) # 2
asyncio.run(main_co())
>> 100
위 예제의 #1의 코드처럼 nested_co()를 호출하면 코루틴 객체를 생성만 하고 실행을 하지 않는다. 따라서 코루틴을 실행하기 위해서는 #2처럼 코루틴 객체에 await를 붙여 해당 작업을 실행하고 리턴 값을 기다리는 동작을 명시해야 한다. main_co() 함수는 await 라인에서 nested_c0() 코루틴 객체가 결과값을 리턴할 때까지 대기한다.
3. 코루틴을 await로 바로 실행하지 않고, 실행 가능한 상태로만 등록: task 생성
# 일반 함수
def doSomething():
print('1')
print('2')
def main():
doSomething()
print('3')
main()
>> 1
2
3
----------------------------------------
# 비동기 함수
import asyncio
async def sub_coroutine():
print('1')
print('2')
async def main():
task1 = asyncio.create_task(sub_coroutine())
print('3')
asyncio.run(main())
>> 3
1
2
일반 함수와 다르게 asyncio.create_task() 함수에 코루틴 객체를 전달하여 task 객체를 만들면, task로 만들어진 코루틴은 이벤트 루프에 자동으로 등록되고, 실행을 예약(등록)해 놓는다. 그러면 현재 진행 중인 코드가 잠시 쉬는 시점(await를 만나는 시점)에 실행될 기회를 얻게 된다. 위 예제에서는 await가 없기 때문에 main() 코드의 실행이 끝나고 나면, 이벤트 루프가 닫히기 전에 이벤트 루프에 등록된 task 객체를 실행하게 된다.
만약 main() 함수에 await가 추가되면 이벤트 루프에 등록된 task 객체가 먼저 실행된다.
import asyncio
async def sub_coroutine():
print('1')
print('2')
async def main():
task1 = asyncio.create_task(sub_coroutine())
await task1
print('3')
asyncio.run(main())
>> 1
2
3
4. 반환값 저장: 변수 = await task
import asyncio
async def sub_coroutine():
print('1')
print('2')
return '서브루틴 실행 종료'
async def main():
task1 = asyncio.create_task(sub_coroutine())
result = await task1
print(f'반환 값: {result}')
asyncio.run(main())
>> 1
2
반환 값: 서브루틴 실행 종료
5. 서브 코루틴의 반환값을 반환: return await task
import asyncio
async def sub_coroutine():
print('1')
print('2')
return '서브루틴 실행 종료'
async def main():
task1 = asyncio.create_task(sub_coroutine())
result = await task1
return result
asyncio.run(main())
>> '서브루틴 실행 종료'
-------------------------------------------------
return await task1
반환 값을 변수에 저장한 뒤, 반환하는 위의 두 줄은 반환 값을 변수로 받지 않고 return await로 바로 반환이 가능
6. 여러 코루틴 한번에 처리
1) asyncio.gather(*aws)
비동기로 두 개 이상의 작업(코루틴)을 등록할 때에는 asyncio.gather() 함수를 이용한다. 이때, 각 태스크들은 unpacked 형태로 넣어주어야 한다. 즉, asyncio.gather(co_1(), co_2()) 혹은 asyncio.gather(*[co_1(), co_2()])처럼 넣어야 한다. 결과는 반환된 값들이 합쳐진 리스트다. 결과값의 순서는 *aws에 있는 어웨이터블 객체 순서와 일치한다.
import asyncio
async def co1():
print('코루틴 1 실행')
await asyncio.sleep(5)
print('코루틴 1 재개')
return '코루틴 1 종료'
async def co2():
print('코루틴 2 실행')
await asyncio.sleep(2)
print('코루틴 2 재개')
return '코루틴 2 종료'
async def main():
tasks = [co1(), co2()]
result = await asyncio.gather(*tasks)
print(result)
asyncio.run(main())
>> 코루틴 1 실행
코루틴 2 실행
코루틴 2 재개
코루틴 1 재개
['코루틴 1 종료', '코루틴 2 종료']
2) asyncio.as_completed(aws, *, timeout=None)
https://docs.python.org/ko/3/library/asyncio-task.html#running-tasks-concurrently
코루틴의 이터레이터를 반환한다. 이터레이터이기 때문에 for 반복문과 함께 쓰이며, asyncio.gather(*aws)와 달리 완료된 코루틴부터 이터레이터로 반환한다. *주피터 노트북에서 테스트시, 작업 시작 순서는 co2부터 진행될 때도 있는데 코루틴 등록 순서와 실행 순서는 일치하지 않는 것인가?
async def co1():
print('코루틴 1 실행')
await asyncio.sleep(5)
return '코루틴 1 종료'
async def co2():
print('코루틴 2 실행')
await asyncio.sleep(2)
return '코루틴 2 종료'
async def main():
tasks = [co1(), co2()]
for co in asyncio.as_completed(tasks):
result = await co
print(f'반환 값: {result}')
asyncio.run(main())
>>> 코루틴 2 실행
코루틴 1 실행
반환 값: 코루틴 2 종료
반환 값: 코루틴 1 종료
기존의 yield from을 대체 하기 위하여 다음과 같은 사항이 await 오른쪽에 올 수 있다.
- native coroutine object
- 오래된 제네레이터 기반의 것들은 import type 이후, @types.coroutine 데코레이터로 추가
- __await__ method를 가진 object를 리턴하는 iterator
- CPython API를 위한 tp_as_async.am_await
yield로 두 개가 concurrent하게 돌아가는 이벤트 루프 코드를 async, await로 바꿔보자. 우선 yield처럼 뒤에 피연산자 없이 그냥 await만 사용이 안 된다. await만으로는 동일한 로직을 만들 수 없어서 generator base coroutine으로 task switching을 하도록 하여 구현한다.
import types
@types.coroutine
def switch():
yield
async def coro1():
print('C1: Start')
await switch()
print('C1: a')
await switch()
print('C1: end')
async def coro2():
print('C2: Start')
await switch()
print('C2: a')
await switch()
print('C2: end')
def run(cors):
while cors:
for corou in [c1,c2]:
corou.send(None)
c1 = corou1()
c2 = corou2()
task = [c1, c2]
run(task)
Async, await의 확장된 Data Model
- Awaitable object
- __await__()가 구현된 객체. async def 함수을 호출하여 리턴되는 native coroutine이 awaitable 객체
- object.__await__(self)에서 iterator가 리턴되어, await에서 사용된다. Future의 경우도 __await__()가 구현되어서 await에 사용할 수 있는 것이다.
- Coroutine object
- Awaitable object 중 하나
- 여기에 coroutine.send(), coroutine.throw(type[, value[, traceback]]), coroutine.close()이 구현
- Asynchronous Iterators
- iterator와 비슷하게 __aiter__(), __anext__()가 구현된 객체
- 이 객체는 새로 추가된 async for에 사용할 수 있다.
- Asynchronous Context Managers
- __aenter__(), __aexit() 메소드가 구현된 객체
- 이 객체는 새로 추가된 async with에 사용할 수 있다.
Awatiable object는 기존의 generator based coroutine과 유사하게 __await__()로 iterator를 얻은 후 이를 send()를 이용하여 반복되는 구조가 된다. Asynchronous Context Manager인 async with를 살펴보자.
async with EXPR as VAR:
CODE_BLOCK
위와 같은 문장은 다음과 같은 code와 동일하게 실행된다.
mgr = (EXPR)
aexit = type(mgr).__aexit__
aenter = type(mgr).__aenter__(mgr)
VAR = await aenter
try:
CODE_BLOCK
except:
if not await aexit(mgr, *sys.exc_info()):
raise
else:
await aexit(mgr, None, None, None)
with 문장이 시작될 때 __aenter__()가 호출되어 작업이 실행되고, 자동으로 __aexit__()가 호출되어 작업이 종료된다.
정리
- async def로 코루틴 함수 선언
- 동기 코드에서 비동기 코드로 진입: asyncio.run()
- 흐름 분기할 작업 등록(새로운 micro-thread 생성): asyncio.create_task(f())
- 흐름 유지(함수 호출): await f() -> 비동기 함수 f()를 호출하고, 리턴값을 기다리면서, 다른 작업 진행
하지만 asyncio를 활용하는 데는 몇가지 한계가 남아있다. asyncio가 아닌 파이썬 표준 라이브러리들은 blocking하게 작성되어 있기 때문에, 의도적으로 함수 중간에 asyncio.sleep()을 넣지 않는 이상 작업들이 순차적으로 실행된다. blocking한 함수들을 어떻게 asyncio와 같이 사용할 수 있을 지는 다음에 포스팅하겠다.
참고:
https://issuu.com/hanbit.co.kr/docs/____________________________________6a74b523ede5e6
https://www.youtube.com/watch?v=QaiczQzJAmA
https://blog.humminglab.io/posts/python-coroutine-programming-2/#asyncio