move bluelet to util subpackage

This way, it can be shared among multiple plugins (as could happen
eventually...)
This commit is contained in:
Adrian Sampson 2012-05-13 21:22:50 -07:00
parent 760fff3ace
commit d82d74b422
2 changed files with 209 additions and 72 deletions

View file

@ -11,21 +11,50 @@ import sys
import types
import errno
import traceback
import time
import collections
# A little bit of "six" (Python 2/3 compatibility): cope with PEP 3109 syntax
# changes.
PY3 = sys.version_info[0] == 3
if PY3:
def _reraise(typ, exc, tb):
raise exc.with_traceback(tb)
else:
exec("""
def _reraise(typ, exc, tb):
raise typ, exc, tb
""")
# Basic events used for thread scheduling.
class Event(object):
"""Just a base class identifying Bluelet events. An event is an
object yielded from a Bluelet thread coroutine to suspend operation
and communicate with the scheduler.
"""
pass
class WaitableEvent(Event):
"""A waitable event is one encapsulating an action that can be
waited for using a select() call. That is, it's an event with an
associated file descriptor.
"""
def waitables(self):
"""Return "waitable" objects to pass to select. Should return
"""Return "waitable" objects to pass to select(). Should return
three iterables for input readiness, output readiness, and
exceptional conditions (i.e., the three lists passed to
select()).
"""
return (), (), ()
def fire(self):
"""Called when an assoicated file descriptor becomes ready
(i.e., is returned from a select() call).
"""
pass
class ValueEvent(Event):
@ -38,12 +67,19 @@ class ExceptionEvent(Event):
def __init__(self, exc_info):
self.exc_info = exc_info
class SpawnEvent(object):
class SpawnEvent(Event):
"""Add a new coroutine thread to the scheduler."""
def __init__(self, coro):
self.spawned = coro
class DelegationEvent(object):
class JoinEvent(Event):
"""Suspend the thread until the specified child thread has
completed.
"""
def __init__(self, child):
self.child = child
class DelegationEvent(Event):
"""Suspend execution of the current thread, start a new thread and,
once the child thread finished, return control to the parent
thread.
@ -51,20 +87,31 @@ class DelegationEvent(object):
def __init__(self, coro):
self.spawned = coro
class ReturnEvent(object):
class ReturnEvent(Event):
"""Return a value the current thread's delegator at the point of
delegation. Ends the current (delegate) thread.
"""
def __init__(self, value):
self.value = value
class SleepEvent(WaitableEvent):
"""Suspend the thread for a given duration.
"""
def __init__(self, duration):
self.wakeup_time = time.time() + duration
def time_left(self):
return max(self.wakeup_time - time.time(), 0.0)
class ReadEvent(WaitableEvent):
"""Reads from a file-like object."""
def __init__(self, fd, bufsize):
self.fd = fd
self.bufsize = bufsize
def waitables(self):
return (self.fd,), (), ()
def fire(self):
return self.fd.read(self.bufsize)
@ -73,8 +120,10 @@ class WriteEvent(WaitableEvent):
def __init__(self, fd, data):
self.fd = fd
self.data = data
def waitable(self):
return (), (self.fd,), ()
def fire(self):
self.fd.write(self.data)
@ -83,13 +132,20 @@ class WriteEvent(WaitableEvent):
def _event_select(events):
"""Perform a select() over all the Events provided, returning the
ones ready to be fired.
ones ready to be fired. Only WaitableEvents (including SleepEvents)
matter here; all other events are ignored (and thus postponed).
"""
# Gather waitables.
# Gather waitables and wakeup times.
waitable_to_event = {}
rlist, wlist, xlist = [], [], []
earliest_wakeup = None
for event in events:
if isinstance(event, WaitableEvent):
if isinstance(event, SleepEvent):
if not earliest_wakeup:
earliest_wakeup = event.wakeup_time
else:
earliest_wakeup = min(earliest_wakeup, event.wakeup_time)
elif isinstance(event, WaitableEvent):
r, w, x = event.waitables()
rlist += r
wlist += w
@ -101,11 +157,19 @@ def _event_select(events):
for waitable in x:
waitable_to_event[('x', waitable)] = event
# If we have a any sleeping threads, determine how long to sleep.
if earliest_wakeup:
timeout = max(earliest_wakeup - time.time(), 0.0)
else:
timeout = None
# Perform select() if we have any waitables.
if rlist or wlist or xlist:
rready, wready, xready = select.select(rlist, wlist, xlist)
rready, wready, xready = select.select(rlist, wlist, xlist, timeout)
else:
rready, wready, xready = (), (), ()
if timeout:
time.sleep(timeout)
# Gather ready events corresponding to the ready waitables.
ready_events = set()
@ -115,6 +179,12 @@ def _event_select(events):
ready_events.add(waitable_to_event[('w', ready)])
for ready in xready:
ready_events.add(waitable_to_event[('x', ready)])
# Gather any finished sleeps.
for event in events:
if isinstance(event, SleepEvent) and event.time_left() == 0.0:
ready_events.add(event)
return ready_events
class ThreadException(Exception):
@ -122,23 +192,48 @@ class ThreadException(Exception):
self.coro = coro
self.exc_info = exc_info
def reraise(self):
raise self.exc_info[0], self.exc_info[1], self.exc_info[2]
_reraise(self.exc_info[0], self.exc_info[1], self.exc_info[2])
SUSPENDED = Event() # Special sentinel placeholder for suspended threads.
def run(root_coro):
"""Schedules a coroutine, running it to completion. This
encapsulates the Bluelet scheduler, which the root coroutine can
add to by spawning new coroutines.
"""
# The "threads" dictionary keeps track of all the currently-
# executing coroutines. It maps coroutines to their currenly
# "blocking" event.
# executing and suspended coroutines. It maps coroutines to their
# currently "blocking" event. The event value may be SUSPENDED if
# the coroutine is waiting on some other condition: namely, a
# delegated coroutine or a joined coroutine. In this case, the
# coroutine should *also* appear as a value in one of the below
# dictionaries `delegators` or `joiners`.
threads = {root_coro: ValueEvent(None)}
# When one thread delegates to another thread, its execution is
# suspended until the delegate completes. This dictionary keeps
# track of each running delegate's delegator.
# Maps child coroutines to delegating parents.
delegators = {}
# Maps child coroutines to joining (exit-waiting) parents.
joiners = collections.defaultdict(list)
def complete_thread(coro, return_value):
"""Remove a coroutine from the scheduling pool, awaking
delegators and joiners as necessary and returning the specified
value to any delegating parent.
"""
del threads[coro]
# Resume delegator.
if coro in delegators:
threads[delegators[coro]] = ValueEvent(return_value)
del delegators[coro]
# Resume joiners.
if coro in joiners:
for parent in joiners[coro]:
threads[parent] = ValueEvent(None)
del joiners[coro]
def advance_thread(coro, value, is_exc=False):
"""After an event is fired, run a given coroutine associated with
it in the threads dict until it yields again. If the coroutine
@ -154,18 +249,15 @@ def run(root_coro):
next_event = coro.send(value)
except StopIteration:
# Thread is done.
del threads[coro]
if coro in delegators:
# Resume delegator.
threads[delegators[coro]] = ValueEvent(None)
del delegators[coro]
complete_thread(coro, None)
except:
# Thread raised some other exception.
del threads[coro]
raise ThreadException(coro, sys.exc_info())
else:
if isinstance(next_event, types.GeneratorType):
# Automatically invoke sub-coroutines.
# Automatically invoke sub-coroutines. (Shorthand for
# explicit bluelet.call().)
next_event = DelegationEvent(next_event)
threads[coro] = next_event
@ -177,9 +269,9 @@ def run(root_coro):
# running immediate events until nothing is ready.
while True:
have_ready = False
for coro, event in threads.items():
for coro, event in list(threads.items()):
if isinstance(event, SpawnEvent):
threads[event.spawned] = ValueEvent(None) # Spawn.
threads[event.spawned] = ValueEvent(None) # Spawn.
advance_thread(coro, None)
have_ready = True
elif isinstance(event, ValueEvent):
@ -189,16 +281,17 @@ def run(root_coro):
advance_thread(coro, event.exc_info, True)
have_ready = True
elif isinstance(event, DelegationEvent):
del threads[coro] # Suspend.
threads[event.spawned] = ValueEvent(None) # Spawn.
threads[coro] = SUSPENDED # Suspend.
threads[event.spawned] = ValueEvent(None) # Spawn.
delegators[event.spawned] = coro
have_ready = True
elif isinstance(event, ReturnEvent):
# Thread is done.
del threads[coro]
if coro in delegators:
threads[delegators[coro]] = ValueEvent(event.value)
del delegators[coro]
complete_thread(coro, event.value)
have_ready = True
elif isinstance(event, JoinEvent):
threads[coro] = SUSPENDED # Suspend.
joiners[event.child].append(coro)
have_ready = True
# Only start the select when nothing else is ready.
@ -206,12 +299,12 @@ def run(root_coro):
break
# Wait and fire.
event2coro = dict((v,k) for k,v in threads.iteritems())
event2coro = dict((v,k) for k,v in threads.items())
for event in _event_select(threads.values()):
# Run the IO operation, but catch socket errors.
try:
value = event.fire()
except socket.error, exc:
except socket.error as exc:
if isinstance(exc.args, tuple) and \
exc.args[0] == errno.EPIPE:
# Broken pipe. Remote host disconnected.
@ -222,8 +315,8 @@ def run(root_coro):
threads[event2coro[event]] = ReturnEvent(None)
else:
advance_thread(event2coro[event], value)
except ThreadException, te:
except ThreadException as te:
# Exception raised from inside a thread.
event = ExceptionEvent(te.exc_info)
if te.coro in delegators:
@ -235,7 +328,7 @@ def run(root_coro):
# The thread is root-level. Raise in client code.
exit_te = te
break
except:
# For instance, KeyboardInterrupt during select(). Raise
# into root thread and terminate others.
@ -252,59 +345,43 @@ def run(root_coro):
# Sockets and their associated events.
class AcceptEvent(WaitableEvent):
def __init__(self, listener):
self.listener = listener
def waitables(self):
return (self.listener.sock,), (), ()
def fire(self):
sock, addr = self.listener.sock.accept()
return Connection(sock, addr)
class Listener(object):
"""A socket wrapper object for listening sockets.
"""
def __init__(self, host, port):
"""Create a listening socket on the given hostname and port.
"""
self.host = host
self.port = port
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.bind((host, port))
self.sock.listen(5)
def accept(self):
"""An event that waits for a connection on the listening socket.
When a connection is made, the event returns a Connection
object.
"""
return AcceptEvent(self)
def close(self):
"""Immediately close the listening socket. (Not an event.)
"""
self.sock.close()
class ReceiveEvent(WaitableEvent):
def __init__(self, conn, bufsize):
self.conn = conn
self.bufsize = bufsize
def waitables(self):
return (self.conn.sock,), (), ()
def fire(self):
return self.conn.sock.recv(self.bufsize)
class SendEvent(WaitableEvent):
def __init__(self, conn, data, sendall=False):
self.conn = conn
self.data = data
self.sendall = sendall
def waitables(self):
return (), (self.conn.sock,), ()
def fire(self):
if self.sendall:
return self.conn.sock.sendall(self.data)
else:
return self.conn.sock.send(self.data)
class Connection(object):
"""A socket-esque object for communicating asychronously on
network sockets.
"""A socket wrapper object for connected sockets.
"""
def __init__(self, sock, addr):
self.sock = sock
self.addr = addr
self._buf = ''
self._buf = b''
def close(self):
"""Close the connection."""
self.sock.close()
def recv(self, size):
"""Read at most size bytes of data from the socket."""
if self._buf:
@ -314,15 +391,18 @@ class Connection(object):
return ValueEvent(out)
else:
return ReceiveEvent(self, size)
def send(self, data):
"""Sends data on the socket, returning the number of bytes
successfully sent.
"""
return SendEvent(self, data)
def sendall(self, data):
"""Send all of data on the socket."""
return SendEvent(self, data, True)
def readline(self, terminator="\n", bufsize=1024):
def readline(self, terminator=b"\n", bufsize=1024):
"""Reads a line (delimited by terminator) from the socket."""
while True:
if terminator in self._buf:
@ -335,10 +415,56 @@ class Connection(object):
self._buf += data
else:
line = self._buf
self._buf = ''
self._buf = b''
yield ReturnEvent(line)
break
class AcceptEvent(WaitableEvent):
"""An event for Listener objects (listening sockets) that suspends
execution until the socket gets a connection.
"""
def __init__(self, listener):
self.listener = listener
def waitables(self):
return (self.listener.sock,), (), ()
def fire(self):
sock, addr = self.listener.sock.accept()
return Connection(sock, addr)
class ReceiveEvent(WaitableEvent):
"""An event for Connection objects (connected sockets) for
asynchronously reading data.
"""
def __init__(self, conn, bufsize):
self.conn = conn
self.bufsize = bufsize
def waitables(self):
return (self.conn.sock,), (), ()
def fire(self):
return self.conn.sock.recv(self.bufsize)
class SendEvent(WaitableEvent):
"""An event for Connection objects (connected sockets) for
asynchronously writing data.
"""
def __init__(self, conn, data, sendall=False):
self.conn = conn
self.data = data
self.sendall = sendall
def waitables(self):
return (), (self.conn.sock,), ()
def fire(self):
if self.sendall:
return self.conn.sock.sendall(self.data)
else:
return self.conn.sock.send(self.data)
# Public interface for threads; each returns an event object that
# can immediately be "yield"ed.
@ -400,6 +526,17 @@ def connect(host, port):
sock = socket.create_connection(addr)
return ValueEvent(Connection(sock, addr))
def sleep(duration):
"""Event: suspend the thread for ``duration`` seconds.
"""
return SleepEvent(duration)
def join(coro):
"""Suspend the thread until another, previously `spawn`ed thread
completes.
"""
return JoinEvent(coro)
# Convenience function for running socket servers.
@ -414,7 +551,7 @@ def server(host, port, func):
yield func(conn)
finally:
conn.close()
listener = Listener(host, port)
try:
while True:

View file

@ -18,7 +18,6 @@ use of the wide range of MPD clients.
"""
from __future__ import print_function
import bluelet
import re
from string import Template
import traceback
@ -30,6 +29,7 @@ import beets
from beets.plugins import BeetsPlugin
import beets.ui
from beets import vfs
from beets.util import bluelet
DEFAULT_PORT = 6600
@ -119,7 +119,7 @@ ArgumentIndexError = make_bpd_error(ERROR_ARG, 'argument out of range')
ArgumentNotFoundError = make_bpd_error(ERROR_NO_EXIST, 'argument not found')
def cast_arg(t, val):
"""Attempts to call t on val, raising a CommandArgumentError
"""Attempts to call t on val, raising a ArgumentTypeError
on ValueError.
If 't' is the special string 'intbool', attempts to cast first
@ -131,7 +131,7 @@ def cast_arg(t, val):
try:
return t(val)
except ValueError:
raise CommandArgumentError()
raise ArgumentTypeError()
class BPDClose(Exception):
"""Raised by a command invocation to indicate that the connection
@ -391,7 +391,7 @@ class BaseServer(object):
self.playlist_version += 1
def cmd_moveid(self, conn, id_from, idx_to):
def cmd_moveid(self, conn, idx_from, idx_to):
idx_from = self._id_to_index(idx_from)
return self.cmd_move(conn, idx_from, idx_to)