diff --git a/beetsplug/bpd/__init__.py b/beetsplug/bpd/__init__.py index 5c3632547..ac308214f 100644 --- a/beetsplug/bpd/__init__.py +++ b/beetsplug/bpd/__init__.py @@ -17,7 +17,7 @@ Beets library. Attempts to implement a compatible protocol to allow use of the wide range of MPD clients. """ -import eventlet +import bluelet import re from string import Template import traceback @@ -232,13 +232,8 @@ class BaseServer(object): interrupt (^C) closes the server. """ self.startup_time = time.time() - try: - self.listener = eventlet.listen((self.host, self.port)) - while True: - sock, address = self.listener.accept() - eventlet.spawn_n(Connection.handle, sock, self) - except KeyboardInterrupt: - pass # ^C ends the server. + bluelet.run(bluelet.server(self.host, self.port, + Connection.handler(self))) def _item_info(self, item): """An abstract method that should response lines containing a @@ -288,13 +283,13 @@ class BaseServer(object): if self.password and not conn.authenticated: # Not authenticated. Show limited list of commands. for cmd in SAFE_COMMANDS: - conn.send(u'command: ' + cmd) + yield u'command: ' + cmd else: # Authenticated. Show all commands. for func in dir(self): if func.startswith('cmd_'): - conn.send(u'command: ' + func[4:]) + yield u'command: ' + func[4:] def cmd_notcommands(self, conn): """Lists all unavailable commands.""" @@ -304,7 +299,7 @@ class BaseServer(object): if func.startswith('cmd_'): cmd = func[4:] if cmd not in SAFE_COMMANDS: - conn.send(u'command: ' + cmd) + yield u'command: ' + cmd else: # Authenticated. No commands are unavailable. @@ -317,13 +312,13 @@ class BaseServer(object): Gives a list of response-lines for: volume, repeat, random, playlist, playlistlength, and xfade. """ - conn.send(u'volume: ' + unicode(self.volume), - u'repeat: ' + unicode(int(self.repeat)), - u'random: ' + unicode(int(self.random)), - u'playlist: ' + unicode(self.playlist_version), - u'playlistlength: ' + unicode(len(self.playlist)), - u'xfade: ' + unicode(self.crossfade), - ) + yield (u'volume: ' + unicode(self.volume), + u'repeat: ' + unicode(int(self.repeat)), + u'random: ' + unicode(int(self.random)), + u'playlist: ' + unicode(self.playlist_version), + u'playlistlength: ' + unicode(len(self.playlist)), + u'xfade: ' + unicode(self.crossfade), + ) if self.current_index == -1: state = u'stop' @@ -331,15 +326,15 @@ class BaseServer(object): state = u'pause' else: state = u'play' - conn.send(u'state: ' + state) + yield u'state: ' + state if self.current_index != -1: # i.e., paused or playing current_id = self._item_id(self.playlist[self.current_index]) - conn.send(u'song: ' + unicode(self.current_index)) - conn.send(u'songid: ' + unicode(current_id)) + yield u'song: ' + unicode(self.current_index) + yield u'songid: ' + unicode(current_id) if self.error: - conn.send(u'error: ' + self.error) + yield u'error: ' + self.error def cmd_clearerror(self, conn): """Removes the persistent error state of the server. This @@ -415,7 +410,7 @@ class BaseServer(object): def cmd_moveid(self, conn, id_from, idx_to): idx_from = self._id_to_index(idx_from) - self.cmd_move(conn, idx_from, idx_to) + return self.cmd_move(conn, idx_from, idx_to) def cmd_swap(self, conn, i, j): """Swaps two tracks in the playlist.""" @@ -441,7 +436,7 @@ class BaseServer(object): def cmd_swapid(self, conn, i_id, j_id): i = self._id_to_index(i_id) j = self._id_to_index(j_id) - self.cmd_swap(conn, i, j) + return self.cmd_swap(conn, i, j) def cmd_urlhandlers(self, conn): """Indicates supported URL schemes. None by default.""" @@ -454,15 +449,15 @@ class BaseServer(object): index = cast_arg(int, index) if index == -1: for track in self.playlist: - conn.send(*self._item_info(track)) + yield self._item_info(track) else: try: track = self.playlist[index] except IndexError: raise ArgumentIndexError() - conn.send(*self._item_info(track)) + yield self._item_info(track) def cmd_playlistid(self, conn, track_id=-1): - self.cmd_playlistinfo(conn, self._id_to_index(track_id)) + return self.cmd_playlistinfo(conn, self._id_to_index(track_id)) def cmd_plchanges(self, conn, version): """Sends playlist changes since the given version. @@ -471,7 +466,7 @@ class BaseServer(object): just returns the entire playlist (rather like version=0). This seems to satisfy many clients. """ - self.cmd_playlistinfo(conn) + return self.cmd_playlistinfo(conn) def cmd_plchangesposid(self, conn, version): """Like plchanges, but only sends position and id. @@ -479,33 +474,32 @@ class BaseServer(object): Also a dummy implementation. """ for idx, track in enumerate(self.playlist): - conn.send(u'cpos: ' + unicode(idx), - u'Id: ' + unicode(track.id), - ) + yield u'cpos: ' + unicode(idx) + yield u'Id: ' + unicode(track.id) def cmd_currentsong(self, conn): """Sends information about the currently-playing song. """ if self.current_index != -1: # -1 means stopped. track = self.playlist[self.current_index] - conn.send(*self._item_info(track)) + yield self._item_info(track) def cmd_next(self, conn): """Advance to the next song in the playlist.""" self.current_index += 1 if self.current_index >= len(self.playlist): # Fallen off the end. Just move to stopped state. - self.cmd_stop(conn) + return self.cmd_stop(conn) else: - self.cmd_play(conn) + return self.cmd_play(conn) def cmd_previous(self, conn): """Step back to the last song.""" self.current_index -= 1 if self.current_index < 0: - self.cmd_stop(conn) + return self.cmd_stop(conn) else: - self.cmd_play(conn) + return self.cmd_play(conn) def cmd_pause(self, conn, state=None): """Set the pause state playback.""" @@ -523,7 +517,7 @@ class BaseServer(object): if index == -1: # No index specified: start where we are. if not self.playlist: # Empty playlist: stop immediately. - self.cmd_stop(conn) + return self.cmd_stop(conn) if self.current_index == -1: # No current song. self.current_index = 0 # Start at the beginning. # If we have a current song, just stay there. @@ -539,7 +533,7 @@ class BaseServer(object): index = -1 else: index = self._id_to_index(track_id) - self.cmd_play(conn, index) + return self.cmd_play(conn, index) def cmd_stop(self, conn): """Stop playback.""" @@ -552,75 +546,64 @@ class BaseServer(object): if index < 0 or index >= len(self.playlist): raise ArgumentIndexError() self.current_index = index - def cmd_seekid(self, track_id, pos): + def cmd_seekid(self, conn, track_id, pos): index = self._id_to_index(track_id) - self.cmd_seek(conn, index, pos) + return self.cmd_seek(conn, index, pos) class Connection(object): """A connection between a client and the server. Handles input and output from and to the client. """ - - def __init__(self, client, server): + def __init__(self, server, sock): """Create a new connection for the accepted socket `client`. """ - self.client, self.server = client, server + self.server = server + self.sock = sock self.authenticated = False - def send(self, *lines): - """Send lines, which are strings, to the client. A newline is - added after every string. + def send(self, lines): + """Send lines, which which is either a single string or an + iterable consisting of strings, to the client. A newline is + added after every string. Returns a Bluelet event that sends + the data. """ + if isinstance(lines, basestring): + lines = [lines] out = NEWLINE.join(lines) + NEWLINE log.debug(out[:-1]) # Don't log trailing newline. if isinstance(out, unicode): out = out.encode('utf8') - self.client.sendall(out) - - line_re = re.compile(r'([^\r\n]*)(?:\r\n|\n\r|\n|\r)') - def lines(self): - """A generator yielding lines (delimited by some usual newline - code) as they arrive from the client. - """ - buf = '' - while True: - # Dump new data on the buffer. - chunk = self.client.recv(BUFSIZE) - if not chunk: break # EOF. - buf += chunk - - # Clear out and yield any lines in the buffer. - while True: - match = self.line_re.match(buf) - if not match: break # No lines remain. - yield match.group(1) - buf = buf[match.end():] # Remove line from buffer. + return self.sock.sendall(out) def do_command(self, command): - """Run the given command and give an appropriate response.""" + """A coroutine that runs the given command and sends an + appropriate response.""" try: - command.run(self) + yield bluelet.call(command.run(self)) except BPDError, e: # Send the error. - self.send(e.response()) + yield self.send(e.response()) else: # Send success code. - self.send(RESP_OK) + yield self.send(RESP_OK) def run(self): """Send a greeting to the client and begin processing commands - as they arrive. Blocks until the client disconnects. + as they arrive. """ - self.send(HELLO) + yield self.send(HELLO) clist = None # Initially, no command list is being constructed. - for line in self.lines(): + while True: + line = (yield self.sock.readline()).strip() + if not line: + break log.debug(line) if clist is not None: # Command list already opened. if line == CLIST_END: - self.do_command(clist) + yield bluelet.call(self.do_command(clist)) clist = None # Clear the command list. else: clist.append(Command(line)) @@ -632,18 +615,19 @@ class Connection(object): else: # Ordinary command. try: - self.do_command(Command(line)) + yield bluelet.call(self.do_command(Command(line))) except BPDClose: # Command indicates that the conn should close. self.client.close() return @classmethod - def handle(cls, client, server): - """Creates a new `Connection` for `client` and `server` and runs - it. - """ - cls(client, server).run() + def handler(cls, server): + def _handle(sock): + """Creates a new `Connection` and runs it. + """ + return cls(server, sock).run() + return _handle class Command(object): """A command issued by the client for processing by the server. @@ -673,9 +657,9 @@ class Command(object): self.args.append(arg) def run(self, conn): - """Executes the command on the given connection. + """A coroutine that executes the command on the given + connection. """ - # Attempt to get correct command function. func_name = 'cmd_' + self.name if not hasattr(conn.server, func_name): @@ -690,7 +674,10 @@ class Command(object): try: args = [conn] + self.args - func(*args) + results = func(*args) + if results: + for data in results: + yield conn.send(data) except BPDError, e: # An exposed error. Set the command name and then let @@ -724,12 +711,11 @@ class CommandList(list): self.verbose = verbose def run(self, conn): - """Execute all the commands in this list. + """Coroutine executing all the commands in this list. """ - for i, command in enumerate(self): try: - command.run(conn) + yield bluelet.call(command.run(conn)) except BPDError, e: # If the command failed, stop executing. e.index = i # Give the error the correct index. @@ -738,7 +724,7 @@ class CommandList(list): # Otherwise, possibly send the output delimeter if we're in a # verbose ("OK") command list. if self.verbose: - conn.send(RESP_CLIST_VERBOSE) + yield conn.send(RESP_CLIST_VERBOSE) @@ -834,18 +820,18 @@ class Server(BaseServer): if artist is None: # List all artists. for artist in self.lib.artists(): - conn.send(u'directory: ' + seq_to_path((artist,), PATH_PH)) + yield u'directory: ' + seq_to_path((artist,), PATH_PH) elif album is None: # List all albums for an artist. for album in self.lib.albums(artist): parts = (album.artist, album.album) - conn.send(u'directory: ' + seq_to_path(parts, PATH_PH)) + yield u'directory: ' + seq_to_path(parts, PATH_PH) elif track is None: # List all tracks on an album. for item in self.lib.items(artist, album): - conn.send(*self._item_info(item)) + yield self._item_info(item) else: # List a track. This isn't a directory. raise BPDError(ERROR_ARG, 'this is not a directory') - def _listall(self, conn, path=u"/", info=False): + def _listall(self, path=u"/", info=False): """Helper function for recursive listing. If info, show tracks' complete info; otherwise, just show items' paths. """ @@ -854,34 +840,34 @@ class Server(BaseServer): # artists if not artist: for a in self.lib.artists(): - conn.send(u'directory: ' + a) + yield u'directory: ' + a # albums if not album: for a in self.lib.albums(artist or None): parts = a.artist, a.album - conn.send(u'directory: ' + seq_to_path(parts, PATH_PH)) + yield u'directory: ' + seq_to_path(parts, PATH_PH) # tracks items = self.lib.items(artist or None, album or None) if info: for item in items: - conn.send(*self._item_info(item)) + yield self._item_info(item) else: for item in items: - conn.send(u'file: ' + self._item_path(i)) + yield u'file: ' + self._item_path(i) def cmd_listall(self, conn, path=u"/"): """Send the paths all items in the directory, recursively.""" - self._listall(conn, path, False) + return self._listall(path, False) def cmd_listallinfo(self, conn, path=u"/"): """Send info on all the items in the directory, recursively.""" - self._listall(conn, path, True) + return self._listall(path, True) # Playlist manipulation. - def _add(self, conn, path, send_id=False): + def _add(self, path, send_id=False): """Adds a track or directory to the playlist, specified by a path. If `send_id`, write each item's id to the client. """ @@ -894,7 +880,7 @@ class Server(BaseServer): found_an_item = True self.playlist.append(item) if send_id: - conn.send(u'Id: ' + unicode(item.id)) + yield u'Id: ' + unicode(item.id) if not found_an_item: # No items matched. @@ -910,25 +896,26 @@ class Server(BaseServer): """Adds a track or directory to the playlist, specified by a path. """ - self._add(conn, path, False) + return self._add(path, False) def cmd_addid(self, conn, path): """Same as `cmd_add` but sends an id back to the client.""" - self._add(conn, path, True) + return self._add(path, True) # Server info. def cmd_status(self, conn): - super(Server, self).cmd_status(conn) + for line in super(Server, self).cmd_status(conn): + yield line if self.current_index > -1: item = self.playlist[self.current_index] - conn.send(u'bitrate: ' + unicode(item.bitrate/1000)) + yield u'bitrate: ' + unicode(item.bitrate/1000) #fixme: missing 'audio' (pos, total) = self.player.time() - conn.send(u'time: ' + unicode(pos) + u':' + unicode(total)) + yield u'time: ' + unicode(pos) + u':' + unicode(total) #fixme: also missing 'updating_db' @@ -943,14 +930,14 @@ class Server(BaseServer): result = c.execute(statement).fetchone() artists, albums = result[0], result[1] - conn.send(u'artists: ' + unicode(artists), - u'albums: ' + unicode(albums), - u'songs: ' + unicode(songs), - u'uptime: ' + unicode(int(time.time() - self.startup_time)), - u'playtime: ' + u'0', #fixme - u'db_playtime: ' + unicode(int(totaltime)), - u'db_update: ' + unicode(int(self.startup_time)), #fixme - ) + yield (u'artists: ' + unicode(artists), + u'albums: ' + unicode(albums), + u'songs: ' + unicode(songs), + u'uptime: ' + unicode(int(time.time() - self.startup_time)), + u'playtime: ' + u'0', #fixme + u'db_playtime: ' + unicode(int(totaltime)), + u'db_update: ' + unicode(int(self.startup_time)), #fixme + ) # Searching. @@ -974,7 +961,7 @@ class Server(BaseServer): searching. """ for tag in self.tagtype_map: - conn.send(u'tagtype: ' + tag) + yield u'tagtype: ' + tag def _tagtype_lookup(self, tag): """Uses `tagtype_map` to look up the beets column name for an @@ -1017,7 +1004,7 @@ class Server(BaseServer): beets.library.AnySubstringQuery, kv) for item in self.lib.items(query=query): - conn.send(*self._item_info(item)) + yield self._item_info(item) def cmd_find(self, conn, *kv): """Perform an exact match for items.""" @@ -1025,7 +1012,7 @@ class Server(BaseServer): None, kv) for item in self.lib.items(query=query): - conn.send(*self._item_info(item)) + yield self._item_info(item) def cmd_list(self, conn, show_tag, *kv): """List distinct metadata values for show_tag, possibly @@ -1051,8 +1038,8 @@ class Server(BaseServer): _, key = self._tagtype_lookup(tag) query = beets.library.MatchQuery(key, value) songs, playtime = query.count(self.lib) - conn.send(u'songs: ' + unicode(songs), - u'playtime: ' + unicode(int(playtime))) + yield u'songs: ' + unicode(songs) + yield u'playtime: ' + unicode(int(playtime)) # "Outputs." Just a dummy implementation because we don't control @@ -1060,10 +1047,10 @@ class Server(BaseServer): def cmd_outputs(self, conn): """List the available outputs.""" - conn.send(u'outputid: 0', - u'outputname: gstreamer', - u'outputenabled: 1', - ) + yield (u'outputid: 0', + u'outputname: gstreamer', + u'outputenabled: 1', + ) def cmd_enableoutput(self, conn, output_id): output_id = cast_arg(int, output_id) @@ -1160,4 +1147,3 @@ class BPDPlugin(BeetsPlugin): cmd.func = func return [cmd] - diff --git a/beetsplug/bpd/bluelet.py b/beetsplug/bpd/bluelet.py new file mode 100644 index 000000000..d5da1dbad --- /dev/null +++ b/beetsplug/bpd/bluelet.py @@ -0,0 +1,312 @@ +"""Extremely simple pure-Python implementation of coroutine-style +asynchronous socket I/O. Inspired by, but inferior to, Eventlet. +Bluelet can also be thought of as a less-terrible replacement for +asyncore. + +Bluelet: easy concurrency without all the messy parallelism. +""" +import socket +import select +import sys +import types + + +# Basic events used for thread scheduling. + +class Event(object): + pass +class WaitableEvent(Event): + def waitables(self): + """Return "waitable" objects to pass to select. Should return + three iterables for input readiness, output readiness, and + exceptional conditions (i.e., the three lists passed to + select()). + """ + return (), (), () + def fire(self): + pass + +class ValueEvent(Event): + """An event that does nothing but return a fixed value.""" + def __init__(self, value): + self.value = value + +class ExceptionEvent(Event): + """Raise an exception at the yield point. Used internally.""" + def __init__(self, exc_info): + self.exc_info = exc_info + +class SpawnEvent(object): + def __init__(self, coro): + self.spawned = coro + +class DelegationEvent(object): + def __init__(self, coro): + self.spawned = coro + +class ReturnEvent(object): + def __init__(self, value): + self.value = value + + +# Core logic for executing and scheduling threads. + +def _event_select(events): + """Perform a select() over all the Events provided, returning the + ones ready to be fired. + """ + # Gather waitables. + waitable_to_event = {} + rlist, wlist, xlist = [], [], [] + for event in events: + if isinstance(event, WaitableEvent): + r, w, x = event.waitables() + rlist += r + wlist += w + xlist += x + for waitable in r + w + x: + waitable_to_event[waitable] = event + + # Perform select() if we have any waitables. + if rlist or wlist or xlist: + rready, wready, xready = select.select(rlist, wlist, xlist) + ready = rready + wready + xready + else: + ready = [] + + # Gather ready events corresponding to the ready waitables. + ready_events = set() + for waitable in ready: + ready_events.add(waitable_to_event[waitable]) + return ready_events + +class ThreadException(Exception): + def __init__(self, coro, exc_info): + self.coro = coro + self.exc_info = exc_info + def reraise(self): + raise self.exc_info[0], self.exc_info[1], self.exc_info[2] + +def run(root_coro): + # The "threads" dictionary keeps track of all the currently- + # executing coroutines. It maps coroutines to their currenly + # "blocking" event. + threads = {root_coro: ValueEvent(None)} + + # When one thread delegates to another thread, its execution is + # suspended until the delegate completes. This dictionary keeps + # track of each running delegate's delegator. + delegators = {} + + def advance_thread(coro, value, is_exc=False): + """After an event is fired, run a given coroutine associated with + it in the threads dict until it yields again. If the coroutine + exits, then the thread is removed from the pool. If the coroutine + raises an exception, it is reraised in a ThreadException. If + is_exc is True, then the value must be an exc_info tuple and the + exception is thrown into the coroutine. + """ + try: + if is_exc: + next_event = coro.throw(*value) + else: + next_event = coro.send(value) + except StopIteration: + # Thread is done. + del threads[coro] + if coro in delegators: + # Resume delegator. + threads[delegators[coro]] = ValueEvent(None) + except: + # Thread raised some other exception. + del threads[coro] + raise ThreadException(coro, sys.exc_info()) + else: + threads[coro] = next_event + + # Continue advancing threads until root thread exits. + exit_te = None + while threads: + try: + # Look for events that can be run immediately. Continue + # running immediate events until nothing is ready. + while True: + have_ready = False + for coro, event in threads.items(): + if isinstance(event, SpawnEvent): + threads[event.spawned] = ValueEvent(None) # Spawn. + advance_thread(coro, None) + have_ready = True + elif isinstance(event, ValueEvent): + advance_thread(coro, event.value) + have_ready = True + elif isinstance(event, ExceptionEvent): + advance_thread(coro, event.exc_info, True) + have_ready = True + elif isinstance(event, DelegationEvent): + del threads[coro] # Suspend. + threads[event.spawned] = ValueEvent(None) # Spawn. + delegators[event.spawned] = coro + have_ready = True + elif isinstance(event, ReturnEvent): + # Thread is done. + del threads[coro] + if coro in delegators: + threads[delegators[coro]] = ValueEvent(event.value) + have_ready = True + + # Only start the select when nothing else is ready. + if not have_ready: + break + + # Wait and fire. + event2coro = dict((v,k) for k,v in threads.iteritems()) + for event in _event_select(threads.values()): + value = event.fire() + advance_thread(event2coro[event], value) + + except ThreadException, te: + # Exception raised from inside a thread. + event = ExceptionEvent(te.exc_info) + if te.coro in delegators: + # The thread is a delegate. Raise exception in its + # delegator. + threads[delegators[te.coro]] = event + else: + # The thread is root-level. Raise in client code. + exit_te = te + break + + except: + # For instance, KeyboardInterrupt during select(). Raise + # into root thread and terminate others. + threads = {root_coro: ExceptionEvent(sys.exc_info())} + + # If any threads still remain, kill them. + for coro in threads: + coro.close() + + # If we're exiting with an exception, raise it in the client. + if exit_te: + exit_te.reraise() + + +# Sockets and their associated events. + +class AcceptEvent(WaitableEvent): + def __init__(self, listener): + self.listener = listener + def waitables(self): + return (self.listener.sock,), (), () + def fire(self): + sock, addr = self.listener.sock.accept() + return Connection(sock, addr) +class Listener(object): + def __init__(self, host, port): + self.host = host + self.port = port + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.sock.bind((host, port)) + self.sock.listen(5) + def accept(self): + return AcceptEvent(self) + def close(self): + self.sock.close() + +class ReceiveEvent(WaitableEvent): + def __init__(self, conn, bufsize): + self.conn = conn + self.bufsize = bufsize + def waitables(self): + return (self.conn.sock,), (), () + def fire(self): + return self.conn.sock.recv(self.bufsize) +class SendEvent(WaitableEvent): + def __init__(self, conn, data, sendall=False): + self.conn = conn + self.data = data + self.sendall = sendall + def waitables(self): + return (), (self.conn.sock,), () + def fire(self): + if self.sendall: + return self.conn.sock.sendall(self.data) + else: + return self.conn.sock.send(self.data) +class Connection(object): + def __init__(self, sock, addr): + self.sock = sock + self.addr = addr + self._buf = '' + def close(self): + self.sock.close() + def recv(self, size): + if self._buf: + # We already have data read previously. + out = self._buf[:size] + self._buf = self._buf[size:] + return ValueEvent(out) + else: + return ReceiveEvent(self, size) + def send(self, data): + return SendEvent(self, data) + def sendall(self, data): + return SendEvent(self, data, True) + def readline(self, terminator="\n", bufsize=1024): + def line_reader(): + while True: + if terminator in self._buf: + line, self._buf = self._buf.split(terminator, 1) + line += terminator + yield ReturnEvent(line) + break + data = yield self.recv(bufsize) + if data: + self._buf += data + else: + line = self._buf + self._buf = '' + yield ReturnEvent(line) + break + return DelegationEvent(line_reader()) + + +# Public interface for threads; each returns an event object that +# can immediately be "yield"ed. + +def null(): + return ValueEvent(None) + +def spawn(coro): + if not isinstance(coro, types.GeneratorType): + raise ValueError('%s is not a coroutine' % str(coro)) + return SpawnEvent(coro) + +def call(coro): + if not isinstance(coro, types.GeneratorType): + raise ValueError('%s is not a coroutine' % str(coro)) + return DelegationEvent(coro) + +def end(value = None): + return ReturnEvent(value) + + +# Convenience function for running socket servers. + +def server(host, port, func): + def handler(conn): + try: + yield call(func(conn)) + finally: + conn.close() + + listener = Listener(host, port) + try: + while True: + conn = yield listener.accept() + yield spawn(handler(conn)) + except KeyboardInterrupt: + pass + finally: + listener.close()