diff --git a/beetsplug/bpd/bluelet.py b/beetsplug/bpd/bluelet.py index 626587813..545d87800 100644 --- a/beetsplug/bpd/bluelet.py +++ b/beetsplug/bpd/bluelet.py @@ -94,20 +94,27 @@ def _event_select(events): rlist += r wlist += w xlist += x - for waitable in r + w + x: - waitable_to_event[waitable] = event + for waitable in r: + waitable_to_event[('r', waitable)] = event + for waitable in w: + waitable_to_event[('w', waitable)] = event + for waitable in x: + waitable_to_event[('x', waitable)] = event # Perform select() if we have any waitables. if rlist or wlist or xlist: rready, wready, xready = select.select(rlist, wlist, xlist) - ready = rready + wready + xready else: - ready = [] + rready, wready, xready = (), (), () # Gather ready events corresponding to the ready waitables. ready_events = set() - for waitable in ready: - ready_events.add(waitable_to_event[waitable]) + for ready in rready: + ready_events.add(waitable_to_event[('r', ready)]) + for ready in wready: + ready_events.add(waitable_to_event[('w', ready)]) + for ready in xready: + ready_events.add(waitable_to_event[('x', ready)]) return ready_events class ThreadException(Exception): @@ -118,8 +125,12 @@ class ThreadException(Exception): raise self.exc_info[0], self.exc_info[1], self.exc_info[2] def run(root_coro): + """Schedules a coroutine, running it to completion. This + encapsulates the Bluelet scheduler, which the root coroutine can + add to by spawning new coroutines. + """ # The "threads" dictionary keeps track of all the currently- - # executing coroutines. It maps coroutines to their currently + # executing coroutines. It maps coroutines to their currenly # "blocking" event. threads = {root_coro: ValueEvent(None)} @@ -153,6 +164,9 @@ def run(root_coro): del threads[coro] raise ThreadException(coro, sys.exc_info()) else: + if isinstance(next_event, types.GeneratorType): + # Automatically invoke sub-coroutines. + next_event = DelegationEvent(next_event) threads[coro] = next_event # Continue advancing threads until root thread exits. @@ -279,14 +293,20 @@ class SendEvent(WaitableEvent): return self.conn.sock.sendall(self.data) else: return self.conn.sock.send(self.data) + class Connection(object): + """A socket-esque object for communicating asychronously on + network sockets. + """ def __init__(self, sock, addr): self.sock = sock self.addr = addr self._buf = '' def close(self): + """Close the connection.""" self.sock.close() def recv(self, size): + """Read at most size bytes of data from the socket.""" if self._buf: # We already have data read previously. out = self._buf[:size] @@ -295,49 +315,64 @@ class Connection(object): else: return ReceiveEvent(self, size) def send(self, data): + """Sends data on the socket, returning the number of bytes + successfully sent. + """ return SendEvent(self, data) def sendall(self, data): + """Send all of data on the socket.""" return SendEvent(self, data, True) def readline(self, terminator="\n", bufsize=1024): - def line_reader(): - while True: - if terminator in self._buf: - line, self._buf = self._buf.split(terminator, 1) - line += terminator - yield ReturnEvent(line) - break - data = yield self.recv(bufsize) - if data: - self._buf += data - else: - line = self._buf - self._buf = '' - yield ReturnEvent(line) - break - return DelegationEvent(line_reader()) + """Reads a line (delimited by terminator) from the socket.""" + while True: + if terminator in self._buf: + line, self._buf = self._buf.split(terminator, 1) + line += terminator + yield ReturnEvent(line) + break + data = yield ReceiveEvent(self, bufsize) + if data: + self._buf += data + else: + line = self._buf + self._buf = '' + yield ReturnEvent(line) + break # Public interface for threads; each returns an event object that # can immediately be "yield"ed. def null(): + """Event: yield to the scheduler without doing anything special. + """ return ValueEvent(None) def spawn(coro): + """Event: add another coroutine to the scheduler. Both the parent + and child coroutines run concurrently. + """ if not isinstance(coro, types.GeneratorType): raise ValueError('%s is not a coroutine' % str(coro)) return SpawnEvent(coro) def call(coro): + """Event: delegate to another coroutine. The current coroutine + is resumed once the sub-coroutine finishes. If the sub-coroutine + returns a value using end(), then this event returns that value. + """ if not isinstance(coro, types.GeneratorType): raise ValueError('%s is not a coroutine' % str(coro)) return DelegationEvent(coro) def end(value = None): - """Ends the coroutine and returns a value to its delegator.""" + """Event: ends the coroutine and returns a value to its + delegator. + """ return ReturnEvent(value) def read(fd, bufsize = None): + """Event: read from a file descriptor asynchronously.""" if bufsize is None: # Read all. def reader(): @@ -354,15 +389,29 @@ def read(fd, bufsize = None): return ReadEvent(fd, bufsize) def write(fd, data): + """Event: write to a file descriptor asynchronously.""" return WriteEvent(fd, data) +def connect(host, port): + """Event: connect to a network address and return a Connection + object for communicating on the socket. + """ + addr = (host, port) + sock = socket.create_connection(addr) + return ValueEvent(Connection(sock, addr)) + # Convenience function for running socket servers. def server(host, port, func): + """A coroutine that runs a network server. Host and port specify the + listening address. func should be a coroutine that takes a single + parameter, a Connection object. The coroutine is invoked for every + incoming connection on the listening socket. + """ def handler(conn): try: - yield call(func(conn)) + yield func(conn) finally: conn.close()