https://docs.python.org/3/library/asyncio.html
Note에 provisional basis로 추가되었다고 해서 이게 뭔 소린지 봤더니 하위호환성을 포기하고 표준 라이브러리를 변경하는 경우를 일컫는 말인 듯. https://docs.python.org/3/glossary.html#term-provisional-api 심지어 PEP로 그 과정이 정의되어 있기까지 하다. 어떤 언어든 시간이 지날수록 발전해가지만 그 발전이 때때로 하위호환성을 지키느라 복잡해지는 방향으로만 가기도 하고, 메이저 버전업을 하면서 완전히 새로운 언어로 재탄생하는 바람에 하위호환성을 완전히 버리기도 하는데, 파이썬은 큰 변화를 하면서도 하위호환성을 제법 잘 지키고, 오래된 레거시에 발목 잡히지 않고 잘 빠져나오고, 나쁜 디자인을 개선하고, 시류에 한 발짝 뒤지지만 얼마 안 가서 더 나은 모습으로 시류를 수용하는 등, 진화의 과정이 매우 훌륭한데, 그 과정에 이런 개념들이 영향을 미치지 않았을까. 단순히 호환성을 깨뜨린다 아니다가 아니라, 이 기능은 하위호환성을 포기하는 것이라는 점을 설계자들이 인지한다는 사실이 좀더 믿음을 준다.
multiplexing I/O는 대충만 알고 있어서 좀더 검색해봤는데, 정확히 어떤 기술을 의미하는 것은 아니고, 그냥 하나의 프로세스에서 여러 개의 IO를 처리하는 것을 일컫는 말인 듯 하다. http://forum.falinux.com/zbxe/index.php?document_srl=463229&mid=lecture_tip 이 블로그의 그림이 명쾌하다. 이렇게 multiplexing IO를 하기 위한 방법으로 플랫폼마다 epoll, kqueue, iocp 등이 있는 것이고, 이런 라이브러리를 일관된 방식으로 래핑해서 쓸 수 있게 만들어주는 게 libev 등의 이벤트 드리븐 라이브러리라고 보면 될 듯. 나 전공자 맞아?
event loop는 multiplexing IO를 구현하는 방법으로 루프를 돌면서 IO를 체크한다. 적당한 문서를 못 찾아서 libuv의 Design Overview를 참조했는데 아직 확실히는 모르겠다. 대충 감으로 때려잡자면 일정 타임아웃 주기로 루프를 돌면서 호출해야 할 콜백이 있으면 호출하고 나서 IO 이벤트를 폴링한다. 이 시점에는 쓰레드가 블로킹된다. 그러다가 IO 이벤트가 발생하면 해당 이벤트에 걸린 콜백이 실행된다. 이벤트 루프를 여러 개 실행하고 싶으면 쓰레드가 여러 개여야 한다고 하는데, 이벤트 루프에서 IO를 블로킹으로 폴링하는 구간이 있으니까 당연히 쓰레드 하나에서 실행할 수 있는 이벤트 루프는 하나일 것이다.
아마도 asyncio의 loop도 libuv의 loop와 비슷하게 구현되어 있을 것이고, asyncio.get_event_loop()
는 아마 쓰레드에서 loop가 돌고 있으면 그걸 가져오고 없으면 만드는 식이겠지. loop.run_forever()
는 영원히 루프를 도는 거고, loop.run_until_complete
은 특정 조건이 될 때까지만 루프를 돈다는 거겠지. 어쨋든 이제 asyncio에서 loop가 어떤 건지는 대충 안 듯.
trasport는 데이터를 실제로 전송하는 수단을 의미하는 것 같다. 이를테면 소켓이나 파일, 프로세스 등을 가리키는 듯. 열고 닫고 읽고 쓰는 등의 동작을 transport를 통해서 하게 되는 모양이다. 그리고 이 transport는 protocol 인스턴스와 짝을 이루는데, protocol은 transport에서 커넥션이 열린다든지, 데이터가 수신된다든지 할 때 수행되는 콜백을 담고 있다. 그러니까, 내가 소켓으로 뭔가 하고 싶으면 내가 소켓에다 하고 싶은 일을 담은 protocol을 소켓 transport에 넘겨주면 된다.
protocol 내의 메서드에서 코루틴을 쓸 수 있지만 순서가 보장되지 않고 순서를 보장하고 싶으면 stream objects를 쓰랜다.
protocol-examples가 중요한 것 같다.
import asyncio class EchoClientProtocol(asyncio.Protocol): def __init__(self, message, loop): self.message = message self.loop = loop def connection_made(self, transport): transport.write(self.message.encode()) print('Data sent: {!r}'.format(self.message)) def data_received(self, data): print('Data received: {!r}'.format(data.decode())) def connection_lost(self, exc): print('The server closed the connection') print('Stop the event lop') self.loop.stop() loop = asyncio.get_event_loop() message = 'Hello World!' coro = loop.create_connection(lambda: EchoClientProtocol(message, loop), '127.0.0.1', 8888) loop.run_until_complete(coro) loop.run_forever() loop.close()
문제는 loop.create_connection 코드인데, 일단 이 메서드는 코루틴이고 백그라운드로 커넥션을 맺어서 성공하면 이 코루틴이 (transport, protocol) 페어를 리턴 받는다. 첫번째 인자의 이름이 protocol factory라고 되어 있고 이 예제에선 lambda를 넘겨주는데, 뭐가 되었든 실행해서 protocol 객체가 나오기만 하면 되는 것 같다. 생성자 인자가 따로 없으면 그냥 내가 작성한 protocol 클래스만 넘겨주면 되겠지. 커넥션이 연결되면 transport를 생성하고 내가 넘겨준 protocol_factory로 protocol을 생성해서 넘겨주는 코루틴을 반환하는 것, 그게 loop.create_connection의 역할이다. 여기서 생성되는 transport는 구현에 의존하는 양방향 스트림이라고.
run_until_complete(coro)는 코루틴에서 응답이 올 때까지 loop를 돌리라는 것일 테고, run_forever는 응답이 오고 나서도 계속 돌리라는 거겠지. 그러다가 커넥션이 끊어지만 protocol의 connection_lost에서 loop를 정지시킨다. protocol에 loop를 전달해서 컨트롤시키는 게 좋은 방법인지는 잘 모르겠다. 예제코드라서 이렇게 해놓은 걸까?
이제 서버 쪽도 알 것 같지만 그래도 한 번 더 보자.
import asyncio class EchoServerClientProtocol(asyncio.Protocol): def connection_made(self, transport): peername = transport.get_extra_info('peername') print('Connection from {}'.format(peername)) self.transport = transport def data_received(self, data): message = data.decode() print('Data received: {!r}'.format(message)) print('Send: {!r}'.format(message)) self.transport.write(data) print('Close the client socket') self.transport.close() loop = asyncio.get_event_loop() # Each client connection will create a new protocol instance coro = loop.create_server(EchoServerClientProtocol, '127.0.0.1', 8888) server = loop.run_until_complete(coro) # Serve requests until CTRL+c is pressed print('Serving on {}'.format(server.sockets[0].getsockname())) try: loop.run_forever() except KeyboardInterrupt: pass # Close the server server.close() loop.run_until_complete(server.wait_closed()) loop.close()
create_server는 아마 create_connection과 마찬가지로 서버를 띄우는 코루틴을 반환할 텐데, 좀 다른 것은 그 코루틴이 앞서와 달리 server를 반환한다는 것. 그리고 나서 run_forever를 하면 계속 loop 돌면서 IO 이벤트를 기다리는 거고, 이벤트가 생기면 protocol이 호출되는 거고. ctrl-c로 중지시키면 server.close()를 실행하는데 이건 비동기로 실행되서 wait_closed()를 기다려야 하는데 이것도 코루틴이라 run_until_complete으로 기다린 후 loop를 닫는다. 근데 아마 run_until_complete을 하면 close 안해도 프로세스가 끝나니까 같이 종료되겠지.
Future는 concurrent.futures를 따라한 거라고 하니 그것부터 살펴보자.
with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(pow, 323, 1235) print(future.result())
이 코드가 이해하기 좋다. executor로 submit하면 비동기로 실행이 되고 그 결과를 future를 통해서 받을 수 있다. Executor는 Thread나 Process로도 가능하다. asyncio의 Future도 별반 다르지 않은 것 같다. asyncio에서는 Future를 상속 받은 Task가 생기는 경우가 더 많은데, 코루틴을 loop.create_task로 시작하면 리턴되는 게 Task 객체인데 이게 Future이기도 하다. 그러니까 비동기 함수 호출을 하는 방식을 요약하면 이런 식인 거 같다.
@asyncio.coroutine
def do_something(): # 코루틴 함수
return do()
coro = do_something() # 코루틴 객체. 아직 do() 실행 안됨
future = loop.create_task(coro) # do() 스케쥴링되어서 비동기로 실행
loop.run_until_complete(future) do()가 완료될 때까지 루프를 돌린다.
result = future.result() 완료되면 future에서 결과를 받을 수 있다.
근데 코루틴을 정확히 이해하려면 yield from도 알아야 할 것 같다.