mirror of
https://github.com/beetbox/beets.git
synced 2025-12-11 19:16:07 +01:00
426 lines
14 KiB
Python
426 lines
14 KiB
Python
"""Extremely simple pure-Python implementation of coroutine-style
|
|
asynchronous socket I/O. Inspired by, but inferior to, Eventlet.
|
|
Bluelet can also be thought of as a less-terrible replacement for
|
|
asyncore.
|
|
|
|
Bluelet: easy concurrency without all the messy parallelism.
|
|
"""
|
|
import socket
|
|
import select
|
|
import sys
|
|
import types
|
|
import errno
|
|
import traceback
|
|
|
|
|
|
# Basic events used for thread scheduling.
|
|
|
|
class Event(object):
|
|
pass
|
|
class WaitableEvent(Event):
|
|
def waitables(self):
|
|
"""Return "waitable" objects to pass to select. Should return
|
|
three iterables for input readiness, output readiness, and
|
|
exceptional conditions (i.e., the three lists passed to
|
|
select()).
|
|
"""
|
|
return (), (), ()
|
|
def fire(self):
|
|
pass
|
|
|
|
class ValueEvent(Event):
|
|
"""An event that does nothing but return a fixed value."""
|
|
def __init__(self, value):
|
|
self.value = value
|
|
|
|
class ExceptionEvent(Event):
|
|
"""Raise an exception at the yield point. Used internally."""
|
|
def __init__(self, exc_info):
|
|
self.exc_info = exc_info
|
|
|
|
class SpawnEvent(object):
|
|
"""Add a new coroutine thread to the scheduler."""
|
|
def __init__(self, coro):
|
|
self.spawned = coro
|
|
|
|
class DelegationEvent(object):
|
|
"""Suspend execution of the current thread, start a new thread and,
|
|
once the child thread finished, return control to the parent
|
|
thread.
|
|
"""
|
|
def __init__(self, coro):
|
|
self.spawned = coro
|
|
|
|
class ReturnEvent(object):
|
|
"""Return a value the current thread's delegator at the point of
|
|
delegation. Ends the current (delegate) thread.
|
|
"""
|
|
def __init__(self, value):
|
|
self.value = value
|
|
|
|
class ReadEvent(WaitableEvent):
|
|
"""Reads from a file-like object."""
|
|
def __init__(self, fd, bufsize):
|
|
self.fd = fd
|
|
self.bufsize = bufsize
|
|
def waitables(self):
|
|
return (self.fd,), (), ()
|
|
def fire(self):
|
|
return self.fd.read(self.bufsize)
|
|
|
|
class WriteEvent(WaitableEvent):
|
|
"""Writes to a file-like object."""
|
|
def __init__(self, fd, data):
|
|
self.fd = fd
|
|
self.data = data
|
|
def waitable(self):
|
|
return (), (self.fd,), ()
|
|
def fire(self):
|
|
self.fd.write(self.data)
|
|
|
|
|
|
# Core logic for executing and scheduling threads.
|
|
|
|
def _event_select(events):
|
|
"""Perform a select() over all the Events provided, returning the
|
|
ones ready to be fired.
|
|
"""
|
|
# Gather waitables.
|
|
waitable_to_event = {}
|
|
rlist, wlist, xlist = [], [], []
|
|
for event in events:
|
|
if isinstance(event, WaitableEvent):
|
|
r, w, x = event.waitables()
|
|
rlist += r
|
|
wlist += w
|
|
xlist += x
|
|
for waitable in r:
|
|
waitable_to_event[('r', waitable)] = event
|
|
for waitable in w:
|
|
waitable_to_event[('w', waitable)] = event
|
|
for waitable in x:
|
|
waitable_to_event[('x', waitable)] = event
|
|
|
|
# Perform select() if we have any waitables.
|
|
if rlist or wlist or xlist:
|
|
rready, wready, xready = select.select(rlist, wlist, xlist)
|
|
else:
|
|
rready, wready, xready = (), (), ()
|
|
|
|
# Gather ready events corresponding to the ready waitables.
|
|
ready_events = set()
|
|
for ready in rready:
|
|
ready_events.add(waitable_to_event[('r', ready)])
|
|
for ready in wready:
|
|
ready_events.add(waitable_to_event[('w', ready)])
|
|
for ready in xready:
|
|
ready_events.add(waitable_to_event[('x', ready)])
|
|
return ready_events
|
|
|
|
class ThreadException(Exception):
|
|
def __init__(self, coro, exc_info):
|
|
self.coro = coro
|
|
self.exc_info = exc_info
|
|
def reraise(self):
|
|
raise self.exc_info[0], self.exc_info[1], self.exc_info[2]
|
|
|
|
def run(root_coro):
|
|
"""Schedules a coroutine, running it to completion. This
|
|
encapsulates the Bluelet scheduler, which the root coroutine can
|
|
add to by spawning new coroutines.
|
|
"""
|
|
# The "threads" dictionary keeps track of all the currently-
|
|
# executing coroutines. It maps coroutines to their currenly
|
|
# "blocking" event.
|
|
threads = {root_coro: ValueEvent(None)}
|
|
|
|
# When one thread delegates to another thread, its execution is
|
|
# suspended until the delegate completes. This dictionary keeps
|
|
# track of each running delegate's delegator.
|
|
delegators = {}
|
|
|
|
def advance_thread(coro, value, is_exc=False):
|
|
"""After an event is fired, run a given coroutine associated with
|
|
it in the threads dict until it yields again. If the coroutine
|
|
exits, then the thread is removed from the pool. If the coroutine
|
|
raises an exception, it is reraised in a ThreadException. If
|
|
is_exc is True, then the value must be an exc_info tuple and the
|
|
exception is thrown into the coroutine.
|
|
"""
|
|
try:
|
|
if is_exc:
|
|
next_event = coro.throw(*value)
|
|
else:
|
|
next_event = coro.send(value)
|
|
except StopIteration:
|
|
# Thread is done.
|
|
del threads[coro]
|
|
if coro in delegators:
|
|
# Resume delegator.
|
|
threads[delegators[coro]] = ValueEvent(None)
|
|
del delegators[coro]
|
|
except:
|
|
# Thread raised some other exception.
|
|
del threads[coro]
|
|
raise ThreadException(coro, sys.exc_info())
|
|
else:
|
|
if isinstance(next_event, types.GeneratorType):
|
|
# Automatically invoke sub-coroutines.
|
|
next_event = DelegationEvent(next_event)
|
|
threads[coro] = next_event
|
|
|
|
# Continue advancing threads until root thread exits.
|
|
exit_te = None
|
|
while threads:
|
|
try:
|
|
# Look for events that can be run immediately. Continue
|
|
# running immediate events until nothing is ready.
|
|
while True:
|
|
have_ready = False
|
|
for coro, event in threads.items():
|
|
if isinstance(event, SpawnEvent):
|
|
threads[event.spawned] = ValueEvent(None) # Spawn.
|
|
advance_thread(coro, None)
|
|
have_ready = True
|
|
elif isinstance(event, ValueEvent):
|
|
advance_thread(coro, event.value)
|
|
have_ready = True
|
|
elif isinstance(event, ExceptionEvent):
|
|
advance_thread(coro, event.exc_info, True)
|
|
have_ready = True
|
|
elif isinstance(event, DelegationEvent):
|
|
del threads[coro] # Suspend.
|
|
threads[event.spawned] = ValueEvent(None) # Spawn.
|
|
delegators[event.spawned] = coro
|
|
have_ready = True
|
|
elif isinstance(event, ReturnEvent):
|
|
# Thread is done.
|
|
del threads[coro]
|
|
if coro in delegators:
|
|
threads[delegators[coro]] = ValueEvent(event.value)
|
|
del delegators[coro]
|
|
have_ready = True
|
|
|
|
# Only start the select when nothing else is ready.
|
|
if not have_ready:
|
|
break
|
|
|
|
# Wait and fire.
|
|
event2coro = dict((v,k) for k,v in threads.iteritems())
|
|
for event in _event_select(threads.values()):
|
|
# Run the IO operation, but catch socket errors.
|
|
try:
|
|
value = event.fire()
|
|
except socket.error, exc:
|
|
if isinstance(exc.args, tuple) and \
|
|
exc.args[0] == errno.EPIPE:
|
|
# Broken pipe. Remote host disconnected.
|
|
pass
|
|
else:
|
|
traceback.print_exc()
|
|
# Abort the coroutine.
|
|
threads[event2coro[event]] = ReturnEvent(None)
|
|
else:
|
|
advance_thread(event2coro[event], value)
|
|
|
|
except ThreadException, te:
|
|
# Exception raised from inside a thread.
|
|
event = ExceptionEvent(te.exc_info)
|
|
if te.coro in delegators:
|
|
# The thread is a delegate. Raise exception in its
|
|
# delegator.
|
|
threads[delegators[te.coro]] = event
|
|
del delegators[te.coro]
|
|
else:
|
|
# The thread is root-level. Raise in client code.
|
|
exit_te = te
|
|
break
|
|
|
|
except:
|
|
# For instance, KeyboardInterrupt during select(). Raise
|
|
# into root thread and terminate others.
|
|
threads = {root_coro: ExceptionEvent(sys.exc_info())}
|
|
|
|
# If any threads still remain, kill them.
|
|
for coro in threads:
|
|
coro.close()
|
|
|
|
# If we're exiting with an exception, raise it in the client.
|
|
if exit_te:
|
|
exit_te.reraise()
|
|
|
|
|
|
# Sockets and their associated events.
|
|
|
|
class AcceptEvent(WaitableEvent):
|
|
def __init__(self, listener):
|
|
self.listener = listener
|
|
def waitables(self):
|
|
return (self.listener.sock,), (), ()
|
|
def fire(self):
|
|
sock, addr = self.listener.sock.accept()
|
|
return Connection(sock, addr)
|
|
class Listener(object):
|
|
def __init__(self, host, port):
|
|
self.host = host
|
|
self.port = port
|
|
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
self.sock.bind((host, port))
|
|
self.sock.listen(5)
|
|
def accept(self):
|
|
return AcceptEvent(self)
|
|
def close(self):
|
|
self.sock.close()
|
|
|
|
class ReceiveEvent(WaitableEvent):
|
|
def __init__(self, conn, bufsize):
|
|
self.conn = conn
|
|
self.bufsize = bufsize
|
|
def waitables(self):
|
|
return (self.conn.sock,), (), ()
|
|
def fire(self):
|
|
return self.conn.sock.recv(self.bufsize)
|
|
class SendEvent(WaitableEvent):
|
|
def __init__(self, conn, data, sendall=False):
|
|
self.conn = conn
|
|
self.data = data
|
|
self.sendall = sendall
|
|
def waitables(self):
|
|
return (), (self.conn.sock,), ()
|
|
def fire(self):
|
|
if self.sendall:
|
|
return self.conn.sock.sendall(self.data)
|
|
else:
|
|
return self.conn.sock.send(self.data)
|
|
|
|
class Connection(object):
|
|
"""A socket-esque object for communicating asychronously on
|
|
network sockets.
|
|
"""
|
|
def __init__(self, sock, addr):
|
|
self.sock = sock
|
|
self.addr = addr
|
|
self._buf = ''
|
|
def close(self):
|
|
"""Close the connection."""
|
|
self.sock.close()
|
|
def recv(self, size):
|
|
"""Read at most size bytes of data from the socket."""
|
|
if self._buf:
|
|
# We already have data read previously.
|
|
out = self._buf[:size]
|
|
self._buf = self._buf[size:]
|
|
return ValueEvent(out)
|
|
else:
|
|
return ReceiveEvent(self, size)
|
|
def send(self, data):
|
|
"""Sends data on the socket, returning the number of bytes
|
|
successfully sent.
|
|
"""
|
|
return SendEvent(self, data)
|
|
def sendall(self, data):
|
|
"""Send all of data on the socket."""
|
|
return SendEvent(self, data, True)
|
|
def readline(self, terminator="\n", bufsize=1024):
|
|
"""Reads a line (delimited by terminator) from the socket."""
|
|
while True:
|
|
if terminator in self._buf:
|
|
line, self._buf = self._buf.split(terminator, 1)
|
|
line += terminator
|
|
yield ReturnEvent(line)
|
|
break
|
|
data = yield ReceiveEvent(self, bufsize)
|
|
if data:
|
|
self._buf += data
|
|
else:
|
|
line = self._buf
|
|
self._buf = ''
|
|
yield ReturnEvent(line)
|
|
break
|
|
|
|
|
|
# Public interface for threads; each returns an event object that
|
|
# can immediately be "yield"ed.
|
|
|
|
def null():
|
|
"""Event: yield to the scheduler without doing anything special.
|
|
"""
|
|
return ValueEvent(None)
|
|
|
|
def spawn(coro):
|
|
"""Event: add another coroutine to the scheduler. Both the parent
|
|
and child coroutines run concurrently.
|
|
"""
|
|
if not isinstance(coro, types.GeneratorType):
|
|
raise ValueError('%s is not a coroutine' % str(coro))
|
|
return SpawnEvent(coro)
|
|
|
|
def call(coro):
|
|
"""Event: delegate to another coroutine. The current coroutine
|
|
is resumed once the sub-coroutine finishes. If the sub-coroutine
|
|
returns a value using end(), then this event returns that value.
|
|
"""
|
|
if not isinstance(coro, types.GeneratorType):
|
|
raise ValueError('%s is not a coroutine' % str(coro))
|
|
return DelegationEvent(coro)
|
|
|
|
def end(value = None):
|
|
"""Event: ends the coroutine and returns a value to its
|
|
delegator.
|
|
"""
|
|
return ReturnEvent(value)
|
|
|
|
def read(fd, bufsize = None):
|
|
"""Event: read from a file descriptor asynchronously."""
|
|
if bufsize is None:
|
|
# Read all.
|
|
def reader():
|
|
buf = []
|
|
while True:
|
|
data = yield read(fd, 1024)
|
|
if not data:
|
|
break
|
|
buf.append(data)
|
|
yield ReturnEvent(''.join(buf))
|
|
return DelegationEvent(reader())
|
|
|
|
else:
|
|
return ReadEvent(fd, bufsize)
|
|
|
|
def write(fd, data):
|
|
"""Event: write to a file descriptor asynchronously."""
|
|
return WriteEvent(fd, data)
|
|
|
|
def connect(host, port):
|
|
"""Event: connect to a network address and return a Connection
|
|
object for communicating on the socket.
|
|
"""
|
|
addr = (host, port)
|
|
sock = socket.create_connection(addr)
|
|
return ValueEvent(Connection(sock, addr))
|
|
|
|
|
|
# Convenience function for running socket servers.
|
|
|
|
def server(host, port, func):
|
|
"""A coroutine that runs a network server. Host and port specify the
|
|
listening address. func should be a coroutine that takes a single
|
|
parameter, a Connection object. The coroutine is invoked for every
|
|
incoming connection on the listening socket.
|
|
"""
|
|
def handler(conn):
|
|
try:
|
|
yield func(conn)
|
|
finally:
|
|
conn.close()
|
|
|
|
listener = Listener(host, port)
|
|
try:
|
|
while True:
|
|
conn = yield listener.accept()
|
|
yield spawn(handler(conn))
|
|
except KeyboardInterrupt:
|
|
pass
|
|
finally:
|
|
listener.close()
|