diff --git a/beetsplug/bpd/bluelet.py b/beets/util/bluelet.py similarity index 70% rename from beetsplug/bpd/bluelet.py rename to beets/util/bluelet.py index 43b62d2a5..f22e5b29c 100644 --- a/beetsplug/bpd/bluelet.py +++ b/beets/util/bluelet.py @@ -11,21 +11,50 @@ import sys import types import errno import traceback +import time +import collections + + +# A little bit of "six" (Python 2/3 compatibility): cope with PEP 3109 syntax +# changes. + +PY3 = sys.version_info[0] == 3 +if PY3: + def _reraise(typ, exc, tb): + raise exc.with_traceback(tb) +else: + exec(""" +def _reraise(typ, exc, tb): + raise typ, exc, tb + """) # Basic events used for thread scheduling. class Event(object): + """Just a base class identifying Bluelet events. An event is an + object yielded from a Bluelet thread coroutine to suspend operation + and communicate with the scheduler. + """ pass + class WaitableEvent(Event): + """A waitable event is one encapsulating an action that can be + waited for using a select() call. That is, it's an event with an + associated file descriptor. + """ def waitables(self): - """Return "waitable" objects to pass to select. Should return + """Return "waitable" objects to pass to select(). Should return three iterables for input readiness, output readiness, and exceptional conditions (i.e., the three lists passed to select()). """ return (), (), () + def fire(self): + """Called when an assoicated file descriptor becomes ready + (i.e., is returned from a select() call). + """ pass class ValueEvent(Event): @@ -38,12 +67,19 @@ class ExceptionEvent(Event): def __init__(self, exc_info): self.exc_info = exc_info -class SpawnEvent(object): +class SpawnEvent(Event): """Add a new coroutine thread to the scheduler.""" def __init__(self, coro): self.spawned = coro -class DelegationEvent(object): +class JoinEvent(Event): + """Suspend the thread until the specified child thread has + completed. + """ + def __init__(self, child): + self.child = child + +class DelegationEvent(Event): """Suspend execution of the current thread, start a new thread and, once the child thread finished, return control to the parent thread. @@ -51,20 +87,31 @@ class DelegationEvent(object): def __init__(self, coro): self.spawned = coro -class ReturnEvent(object): +class ReturnEvent(Event): """Return a value the current thread's delegator at the point of delegation. Ends the current (delegate) thread. """ def __init__(self, value): self.value = value +class SleepEvent(WaitableEvent): + """Suspend the thread for a given duration. + """ + def __init__(self, duration): + self.wakeup_time = time.time() + duration + + def time_left(self): + return max(self.wakeup_time - time.time(), 0.0) + class ReadEvent(WaitableEvent): """Reads from a file-like object.""" def __init__(self, fd, bufsize): self.fd = fd self.bufsize = bufsize + def waitables(self): return (self.fd,), (), () + def fire(self): return self.fd.read(self.bufsize) @@ -73,8 +120,10 @@ class WriteEvent(WaitableEvent): def __init__(self, fd, data): self.fd = fd self.data = data + def waitable(self): return (), (self.fd,), () + def fire(self): self.fd.write(self.data) @@ -83,13 +132,20 @@ class WriteEvent(WaitableEvent): def _event_select(events): """Perform a select() over all the Events provided, returning the - ones ready to be fired. + ones ready to be fired. Only WaitableEvents (including SleepEvents) + matter here; all other events are ignored (and thus postponed). """ - # Gather waitables. + # Gather waitables and wakeup times. waitable_to_event = {} rlist, wlist, xlist = [], [], [] + earliest_wakeup = None for event in events: - if isinstance(event, WaitableEvent): + if isinstance(event, SleepEvent): + if not earliest_wakeup: + earliest_wakeup = event.wakeup_time + else: + earliest_wakeup = min(earliest_wakeup, event.wakeup_time) + elif isinstance(event, WaitableEvent): r, w, x = event.waitables() rlist += r wlist += w @@ -101,11 +157,19 @@ def _event_select(events): for waitable in x: waitable_to_event[('x', waitable)] = event + # If we have a any sleeping threads, determine how long to sleep. + if earliest_wakeup: + timeout = max(earliest_wakeup - time.time(), 0.0) + else: + timeout = None + # Perform select() if we have any waitables. if rlist or wlist or xlist: - rready, wready, xready = select.select(rlist, wlist, xlist) + rready, wready, xready = select.select(rlist, wlist, xlist, timeout) else: rready, wready, xready = (), (), () + if timeout: + time.sleep(timeout) # Gather ready events corresponding to the ready waitables. ready_events = set() @@ -115,6 +179,12 @@ def _event_select(events): ready_events.add(waitable_to_event[('w', ready)]) for ready in xready: ready_events.add(waitable_to_event[('x', ready)]) + + # Gather any finished sleeps. + for event in events: + if isinstance(event, SleepEvent) and event.time_left() == 0.0: + ready_events.add(event) + return ready_events class ThreadException(Exception): @@ -122,23 +192,48 @@ class ThreadException(Exception): self.coro = coro self.exc_info = exc_info def reraise(self): - raise self.exc_info[0], self.exc_info[1], self.exc_info[2] + _reraise(self.exc_info[0], self.exc_info[1], self.exc_info[2]) +SUSPENDED = Event() # Special sentinel placeholder for suspended threads. + def run(root_coro): """Schedules a coroutine, running it to completion. This encapsulates the Bluelet scheduler, which the root coroutine can add to by spawning new coroutines. """ # The "threads" dictionary keeps track of all the currently- - # executing coroutines. It maps coroutines to their currenly - # "blocking" event. + # executing and suspended coroutines. It maps coroutines to their + # currently "blocking" event. The event value may be SUSPENDED if + # the coroutine is waiting on some other condition: namely, a + # delegated coroutine or a joined coroutine. In this case, the + # coroutine should *also* appear as a value in one of the below + # dictionaries `delegators` or `joiners`. threads = {root_coro: ValueEvent(None)} - # When one thread delegates to another thread, its execution is - # suspended until the delegate completes. This dictionary keeps - # track of each running delegate's delegator. + # Maps child coroutines to delegating parents. delegators = {} + # Maps child coroutines to joining (exit-waiting) parents. + joiners = collections.defaultdict(list) + + def complete_thread(coro, return_value): + """Remove a coroutine from the scheduling pool, awaking + delegators and joiners as necessary and returning the specified + value to any delegating parent. + """ + del threads[coro] + + # Resume delegator. + if coro in delegators: + threads[delegators[coro]] = ValueEvent(return_value) + del delegators[coro] + + # Resume joiners. + if coro in joiners: + for parent in joiners[coro]: + threads[parent] = ValueEvent(None) + del joiners[coro] + def advance_thread(coro, value, is_exc=False): """After an event is fired, run a given coroutine associated with it in the threads dict until it yields again. If the coroutine @@ -154,18 +249,15 @@ def run(root_coro): next_event = coro.send(value) except StopIteration: # Thread is done. - del threads[coro] - if coro in delegators: - # Resume delegator. - threads[delegators[coro]] = ValueEvent(None) - del delegators[coro] + complete_thread(coro, None) except: # Thread raised some other exception. del threads[coro] raise ThreadException(coro, sys.exc_info()) else: if isinstance(next_event, types.GeneratorType): - # Automatically invoke sub-coroutines. + # Automatically invoke sub-coroutines. (Shorthand for + # explicit bluelet.call().) next_event = DelegationEvent(next_event) threads[coro] = next_event @@ -177,9 +269,9 @@ def run(root_coro): # running immediate events until nothing is ready. while True: have_ready = False - for coro, event in threads.items(): + for coro, event in list(threads.items()): if isinstance(event, SpawnEvent): - threads[event.spawned] = ValueEvent(None) # Spawn. + threads[event.spawned] = ValueEvent(None) # Spawn. advance_thread(coro, None) have_ready = True elif isinstance(event, ValueEvent): @@ -189,16 +281,17 @@ def run(root_coro): advance_thread(coro, event.exc_info, True) have_ready = True elif isinstance(event, DelegationEvent): - del threads[coro] # Suspend. - threads[event.spawned] = ValueEvent(None) # Spawn. + threads[coro] = SUSPENDED # Suspend. + threads[event.spawned] = ValueEvent(None) # Spawn. delegators[event.spawned] = coro have_ready = True elif isinstance(event, ReturnEvent): # Thread is done. - del threads[coro] - if coro in delegators: - threads[delegators[coro]] = ValueEvent(event.value) - del delegators[coro] + complete_thread(coro, event.value) + have_ready = True + elif isinstance(event, JoinEvent): + threads[coro] = SUSPENDED # Suspend. + joiners[event.child].append(coro) have_ready = True # Only start the select when nothing else is ready. @@ -206,12 +299,12 @@ def run(root_coro): break # Wait and fire. - event2coro = dict((v,k) for k,v in threads.iteritems()) + event2coro = dict((v,k) for k,v in threads.items()) for event in _event_select(threads.values()): # Run the IO operation, but catch socket errors. try: value = event.fire() - except socket.error, exc: + except socket.error as exc: if isinstance(exc.args, tuple) and \ exc.args[0] == errno.EPIPE: # Broken pipe. Remote host disconnected. @@ -222,8 +315,8 @@ def run(root_coro): threads[event2coro[event]] = ReturnEvent(None) else: advance_thread(event2coro[event], value) - - except ThreadException, te: + + except ThreadException as te: # Exception raised from inside a thread. event = ExceptionEvent(te.exc_info) if te.coro in delegators: @@ -235,7 +328,7 @@ def run(root_coro): # The thread is root-level. Raise in client code. exit_te = te break - + except: # For instance, KeyboardInterrupt during select(). Raise # into root thread and terminate others. @@ -252,59 +345,43 @@ def run(root_coro): # Sockets and their associated events. -class AcceptEvent(WaitableEvent): - def __init__(self, listener): - self.listener = listener - def waitables(self): - return (self.listener.sock,), (), () - def fire(self): - sock, addr = self.listener.sock.accept() - return Connection(sock, addr) class Listener(object): + """A socket wrapper object for listening sockets. + """ def __init__(self, host, port): + """Create a listening socket on the given hostname and port. + """ self.host = host self.port = port self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.sock.bind((host, port)) self.sock.listen(5) + def accept(self): + """An event that waits for a connection on the listening socket. + When a connection is made, the event returns a Connection + object. + """ return AcceptEvent(self) + def close(self): + """Immediately close the listening socket. (Not an event.) + """ self.sock.close() -class ReceiveEvent(WaitableEvent): - def __init__(self, conn, bufsize): - self.conn = conn - self.bufsize = bufsize - def waitables(self): - return (self.conn.sock,), (), () - def fire(self): - return self.conn.sock.recv(self.bufsize) -class SendEvent(WaitableEvent): - def __init__(self, conn, data, sendall=False): - self.conn = conn - self.data = data - self.sendall = sendall - def waitables(self): - return (), (self.conn.sock,), () - def fire(self): - if self.sendall: - return self.conn.sock.sendall(self.data) - else: - return self.conn.sock.send(self.data) - class Connection(object): - """A socket-esque object for communicating asychronously on - network sockets. + """A socket wrapper object for connected sockets. """ def __init__(self, sock, addr): self.sock = sock self.addr = addr - self._buf = '' + self._buf = b'' + def close(self): """Close the connection.""" self.sock.close() + def recv(self, size): """Read at most size bytes of data from the socket.""" if self._buf: @@ -314,15 +391,18 @@ class Connection(object): return ValueEvent(out) else: return ReceiveEvent(self, size) + def send(self, data): """Sends data on the socket, returning the number of bytes successfully sent. """ return SendEvent(self, data) + def sendall(self, data): """Send all of data on the socket.""" return SendEvent(self, data, True) - def readline(self, terminator="\n", bufsize=1024): + + def readline(self, terminator=b"\n", bufsize=1024): """Reads a line (delimited by terminator) from the socket.""" while True: if terminator in self._buf: @@ -335,10 +415,56 @@ class Connection(object): self._buf += data else: line = self._buf - self._buf = '' + self._buf = b'' yield ReturnEvent(line) break +class AcceptEvent(WaitableEvent): + """An event for Listener objects (listening sockets) that suspends + execution until the socket gets a connection. + """ + def __init__(self, listener): + self.listener = listener + + def waitables(self): + return (self.listener.sock,), (), () + + def fire(self): + sock, addr = self.listener.sock.accept() + return Connection(sock, addr) + +class ReceiveEvent(WaitableEvent): + """An event for Connection objects (connected sockets) for + asynchronously reading data. + """ + def __init__(self, conn, bufsize): + self.conn = conn + self.bufsize = bufsize + + def waitables(self): + return (self.conn.sock,), (), () + + def fire(self): + return self.conn.sock.recv(self.bufsize) + +class SendEvent(WaitableEvent): + """An event for Connection objects (connected sockets) for + asynchronously writing data. + """ + def __init__(self, conn, data, sendall=False): + self.conn = conn + self.data = data + self.sendall = sendall + + def waitables(self): + return (), (self.conn.sock,), () + + def fire(self): + if self.sendall: + return self.conn.sock.sendall(self.data) + else: + return self.conn.sock.send(self.data) + # Public interface for threads; each returns an event object that # can immediately be "yield"ed. @@ -400,6 +526,17 @@ def connect(host, port): sock = socket.create_connection(addr) return ValueEvent(Connection(sock, addr)) +def sleep(duration): + """Event: suspend the thread for ``duration`` seconds. + """ + return SleepEvent(duration) + +def join(coro): + """Suspend the thread until another, previously `spawn`ed thread + completes. + """ + return JoinEvent(coro) + # Convenience function for running socket servers. @@ -414,7 +551,7 @@ def server(host, port, func): yield func(conn) finally: conn.close() - + listener = Listener(host, port) try: while True: diff --git a/beetsplug/bpd/__init__.py b/beetsplug/bpd/__init__.py index e3cbdd20c..a0f704b6d 100644 --- a/beetsplug/bpd/__init__.py +++ b/beetsplug/bpd/__init__.py @@ -18,7 +18,6 @@ use of the wide range of MPD clients. """ from __future__ import print_function -import bluelet import re from string import Template import traceback @@ -30,6 +29,7 @@ import beets from beets.plugins import BeetsPlugin import beets.ui from beets import vfs +from beets.util import bluelet DEFAULT_PORT = 6600 @@ -119,7 +119,7 @@ ArgumentIndexError = make_bpd_error(ERROR_ARG, 'argument out of range') ArgumentNotFoundError = make_bpd_error(ERROR_NO_EXIST, 'argument not found') def cast_arg(t, val): - """Attempts to call t on val, raising a CommandArgumentError + """Attempts to call t on val, raising a ArgumentTypeError on ValueError. If 't' is the special string 'intbool', attempts to cast first @@ -131,7 +131,7 @@ def cast_arg(t, val): try: return t(val) except ValueError: - raise CommandArgumentError() + raise ArgumentTypeError() class BPDClose(Exception): """Raised by a command invocation to indicate that the connection @@ -391,7 +391,7 @@ class BaseServer(object): self.playlist_version += 1 - def cmd_moveid(self, conn, id_from, idx_to): + def cmd_moveid(self, conn, idx_from, idx_to): idx_from = self._id_to_index(idx_from) return self.cmd_move(conn, idx_from, idx_to)