asyncio 공부

https://docs.python.org/3/library/asyncio.html

Note에 provisional basis로 추가되었다고 해서 이게 뭔 소린지 봤더니 하위호환성을 포기하고 표준 라이브러리를 변경하는 경우를 일컫는 말인 듯. https://docs.python.org/3/glossary.html#term-provisional-api 심지어 PEP로 그 과정이 정의되어 있기까지 하다. 어떤 언어든 시간이 지날수록 발전해가지만 그 발전이 때때로 하위호환성을 지키느라 복잡해지는 방향으로만 가기도 하고, 메이저 버전업을 하면서 완전히 새로운 언어로 재탄생하는 바람에 하위호환성을 완전히 버리기도 하는데, 파이썬은 큰 변화를 하면서도 하위호환성을 제법 잘 지키고, 오래된 레거시에 발목 잡히지 않고 잘 빠져나오고, 나쁜 디자인을 개선하고, 시류에 한 발짝 뒤지지만 얼마 안 가서 더 나은 모습으로 시류를 수용하는 등, 진화의 과정이 매우 훌륭한데, 그 과정에 이런 개념들이 영향을 미치지 않았을까. 단순히 호환성을 깨뜨린다 아니다가 아니라, 이 기능은 하위호환성을 포기하는 것이라는 점을 설계자들이 인지한다는 사실이 좀더 믿음을 준다.

multiplexing I/O는 대충만 알고 있어서 좀더 검색해봤는데, 정확히 어떤 기술을 의미하는 것은 아니고, 그냥 하나의 프로세스에서 여러 개의 IO를 처리하는 것을 일컫는 말인 듯 하다. http://forum.falinux.com/zbxe/index.php?document_srl=463229&mid=lecture_tip 이 블로그의 그림이 명쾌하다. 이렇게 multiplexing IO를 하기 위한 방법으로 플랫폼마다 epoll, kqueue, iocp 등이 있는 것이고, 이런 라이브러리를 일관된 방식으로 래핑해서 쓸 수 있게 만들어주는 게 libev 등의 이벤트 드리븐 라이브러리라고 보면 될 듯. 나 전공자 맞아?

event loop는 multiplexing IO를 구현하는 방법으로 루프를 돌면서 IO를 체크한다. 적당한 문서를 못 찾아서 libuv의 Design Overview를 참조했는데 아직 확실히는 모르겠다. 대충 감으로 때려잡자면 일정 타임아웃 주기로 루프를 돌면서 호출해야 할 콜백이 있으면 호출하고 나서 IO 이벤트를 폴링한다. 이 시점에는 쓰레드가 블로킹된다. 그러다가 IO 이벤트가 발생하면 해당 이벤트에 걸린 콜백이 실행된다. 이벤트 루프를 여러 개 실행하고 싶으면 쓰레드가 여러 개여야 한다고 하는데, 이벤트 루프에서 IO를 블로킹으로 폴링하는 구간이 있으니까 당연히 쓰레드 하나에서 실행할 수 있는 이벤트 루프는 하나일 것이다.

아마도 asyncio의 loop도 libuv의 loop와 비슷하게 구현되어 있을 것이고, asyncio.get_event_loop()는 아마 쓰레드에서 loop가 돌고 있으면 그걸 가져오고 없으면 만드는 식이겠지. loop.run_forever()는 영원히 루프를 도는 거고, loop.run_until_complete은 특정 조건이 될 때까지만 루프를 돈다는 거겠지. 어쨋든 이제 asyncio에서 loop가 어떤 건지는 대충 안 듯.

trasport는 데이터를 실제로 전송하는 수단을 의미하는 것 같다. 이를테면 소켓이나 파일, 프로세스 등을 가리키는 듯. 열고 닫고 읽고 쓰는 등의 동작을 transport를 통해서 하게 되는 모양이다. 그리고 이 transport는 protocol 인스턴스와 짝을 이루는데, protocol은 transport에서 커넥션이 열린다든지, 데이터가 수신된다든지 할 때 수행되는 콜백을 담고 있다. 그러니까, 내가 소켓으로 뭔가 하고 싶으면 내가 소켓에다 하고 싶은 일을 담은 protocol을 소켓 transport에 넘겨주면 된다.

protocol 내의 메서드에서 코루틴을 쓸 수 있지만 순서가 보장되지 않고 순서를 보장하고 싶으면 stream objects를 쓰랜다. 

protocol-examples가 중요한 것 같다. 

import asyncio

class EchoClientProtocol(asyncio.Protocol):
    def __init__(self, message, loop):
        self.message = message
        self.loop = loop

    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('Data sent: {!r}'.format(self.message))

    def data_received(self, data):
        print('Data received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        print('The server closed the connection')
        print('Stop the event lop')
        self.loop.stop()

loop = asyncio.get_event_loop()
message = 'Hello World!'
coro = loop.create_connection(lambda: EchoClientProtocol(message, loop),
                              '127.0.0.1', 8888)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()

문제는 loop.create_connection 코드인데, 일단 이 메서드는 코루틴이고 백그라운드로 커넥션을 맺어서 성공하면 이 코루틴이 (transport, protocol) 페어를 리턴 받는다. 첫번째 인자의 이름이 protocol factory라고 되어 있고 이 예제에선 lambda를 넘겨주는데, 뭐가 되었든 실행해서 protocol 객체가 나오기만 하면 되는 것 같다. 생성자 인자가 따로 없으면 그냥 내가 작성한 protocol 클래스만 넘겨주면 되겠지. 커넥션이 연결되면 transport를 생성하고 내가 넘겨준 protocol_factory로 protocol을 생성해서 넘겨주는 코루틴을 반환하는 것, 그게 loop.create_connection의 역할이다. 여기서 생성되는 transport는 구현에 의존하는 양방향 스트림이라고.

run_until_complete(coro)는 코루틴에서 응답이 올 때까지 loop를 돌리라는 것일 테고, run_forever는 응답이 오고 나서도 계속 돌리라는 거겠지. 그러다가 커넥션이 끊어지만 protocol의 connection_lost에서 loop를 정지시킨다. protocol에 loop를 전달해서 컨트롤시키는 게 좋은 방법인지는 잘 모르겠다. 예제코드라서 이렇게 해놓은 걸까?

이제 서버 쪽도 알 것 같지만 그래도 한 번 더 보자.

import asyncio

class EchoServerClientProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))

        print('Send: {!r}'.format(message))
        self.transport.write(data)

        print('Close the client socket')
        self.transport.close()

loop = asyncio.get_event_loop()
# Each client connection will create a new protocol instance
coro = loop.create_server(EchoServerClientProtocol, '127.0.0.1', 8888)
server = loop.run_until_complete(coro)

# Serve requests until CTRL+c is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

create_server는 아마 create_connection과 마찬가지로 서버를 띄우는 코루틴을 반환할 텐데, 좀 다른 것은 그 코루틴이 앞서와 달리 server를 반환한다는 것. 그리고 나서 run_forever를 하면 계속 loop 돌면서 IO 이벤트를 기다리는 거고, 이벤트가 생기면 protocol이 호출되는 거고. ctrl-c로 중지시키면 server.close()를 실행하는데 이건 비동기로 실행되서 wait_closed()를 기다려야 하는데 이것도 코루틴이라 run_until_complete으로 기다린 후 loop를 닫는다. 근데 아마 run_until_complete을 하면 close 안해도 프로세스가 끝나니까 같이 종료되겠지.

Future는 concurrent.futures를 따라한 거라고 하니 그것부터 살펴보자. 

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())

이 코드가 이해하기 좋다. executor로 submit하면 비동기로 실행이 되고 그 결과를 future를 통해서 받을 수 있다. Executor는 Thread나 Process로도 가능하다. asyncio의 Future도 별반 다르지 않은 것 같다. asyncio에서는 Future를 상속 받은 Task가 생기는 경우가 더 많은데, 코루틴을 loop.create_task로 시작하면 리턴되는 게 Task 객체인데 이게 Future이기도 하다. 그러니까 비동기 함수 호출을 하는 방식을 요약하면 이런 식인 거 같다.

@asyncio.coroutine
def do_something(): # 코루틴 함수
return do()

coro = do_something() # 코루틴 객체. 아직 do() 실행 안됨
future = loop.create_task(coro) # do() 스케쥴링되어서 비동기로 실행

loop.run_until_complete(future) do()가 완료될 때까지 루프를 돌린다.
result = future.result() 완료되면 future에서 결과를 받을 수 있다.

근데 코루틴을 정확히 이해하려면 yield from도 알아야 할 것 같다. 일단 yield from 자체는 subgenerator를 wrapping하는 generator를 만들 수 있는 놈이다. yield가 들어간 함수는 제너레이터 객체를 리턴하는 제너레이터 함수가 되듯 yield from이 들어간 함수도 제너레이터 함수고, 다른 제너레이터에 yield를 delegate시킬 수 있다. 여기까지는 쉬운데, yield from이 값을 가진다는 게 좀 혼동하기 쉽다. yield는 generator.send로 전달된 값을 받는 반면, yield from은 제너레이터가 return한 값을 받는다. 그래서 코루틴과 연동되면 yield from은 비동기 호출의 값을 바로 받는 방식으로 쓰일 수 있다.

import asyncio

@asyncio.coroutine
def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    yield from asyncio.sleep(1.0)
    return x + y

@asyncio.coroutine
def print_sum(x, y):
    result = yield from compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

자주 느끼는 거지만 파이썬 문서의 예제코드들은 개념이 잘 드러난다. 코루틴 내에서는 yield from을 통해 비동기 호출 코드를 마치 sequential한 코드처럼 조립해서 쓸 수 있다. 물론 그 코루틴은 loop로 실행해줘야 하지만. ECMAScript의 yield* 같은 개념이라고 봐도 될 것 같다.

이제 내가 작성한 tcp relay server의 코드를 완전히 이해할 수 있게 되었으므로 공부는 여기서 종료.

 

p.s. 내가 원래 알고 있던 개념을 바탕으로 잘 모르는 부분만 중점적으로 공부한 노트이기 때문에 다른 사람에게는 도움이 안될 가능성이 높음.