mirror of
https://github.com/beetbox/beets.git
synced 2026-02-09 17:01:55 +01:00
preliminary replacement of eventlet with my own bluelet library in BPD
This commit is contained in:
parent
d860dd12c3
commit
2ed72f796a
2 changed files with 420 additions and 122 deletions
|
|
@ -17,7 +17,7 @@ Beets library. Attempts to implement a compatible protocol to allow
|
|||
use of the wide range of MPD clients.
|
||||
"""
|
||||
|
||||
import eventlet
|
||||
import bluelet
|
||||
import re
|
||||
from string import Template
|
||||
import traceback
|
||||
|
|
@ -232,13 +232,8 @@ class BaseServer(object):
|
|||
interrupt (^C) closes the server.
|
||||
"""
|
||||
self.startup_time = time.time()
|
||||
try:
|
||||
self.listener = eventlet.listen((self.host, self.port))
|
||||
while True:
|
||||
sock, address = self.listener.accept()
|
||||
eventlet.spawn_n(Connection.handle, sock, self)
|
||||
except KeyboardInterrupt:
|
||||
pass # ^C ends the server.
|
||||
bluelet.run(bluelet.server(self.host, self.port,
|
||||
Connection.handler(self)))
|
||||
|
||||
def _item_info(self, item):
|
||||
"""An abstract method that should response lines containing a
|
||||
|
|
@ -288,13 +283,13 @@ class BaseServer(object):
|
|||
if self.password and not conn.authenticated:
|
||||
# Not authenticated. Show limited list of commands.
|
||||
for cmd in SAFE_COMMANDS:
|
||||
conn.send(u'command: ' + cmd)
|
||||
yield u'command: ' + cmd
|
||||
|
||||
else:
|
||||
# Authenticated. Show all commands.
|
||||
for func in dir(self):
|
||||
if func.startswith('cmd_'):
|
||||
conn.send(u'command: ' + func[4:])
|
||||
yield u'command: ' + func[4:]
|
||||
|
||||
def cmd_notcommands(self, conn):
|
||||
"""Lists all unavailable commands."""
|
||||
|
|
@ -304,7 +299,7 @@ class BaseServer(object):
|
|||
if func.startswith('cmd_'):
|
||||
cmd = func[4:]
|
||||
if cmd not in SAFE_COMMANDS:
|
||||
conn.send(u'command: ' + cmd)
|
||||
yield u'command: ' + cmd
|
||||
|
||||
else:
|
||||
# Authenticated. No commands are unavailable.
|
||||
|
|
@ -317,13 +312,13 @@ class BaseServer(object):
|
|||
Gives a list of response-lines for: volume, repeat, random,
|
||||
playlist, playlistlength, and xfade.
|
||||
"""
|
||||
conn.send(u'volume: ' + unicode(self.volume),
|
||||
u'repeat: ' + unicode(int(self.repeat)),
|
||||
u'random: ' + unicode(int(self.random)),
|
||||
u'playlist: ' + unicode(self.playlist_version),
|
||||
u'playlistlength: ' + unicode(len(self.playlist)),
|
||||
u'xfade: ' + unicode(self.crossfade),
|
||||
)
|
||||
yield (u'volume: ' + unicode(self.volume),
|
||||
u'repeat: ' + unicode(int(self.repeat)),
|
||||
u'random: ' + unicode(int(self.random)),
|
||||
u'playlist: ' + unicode(self.playlist_version),
|
||||
u'playlistlength: ' + unicode(len(self.playlist)),
|
||||
u'xfade: ' + unicode(self.crossfade),
|
||||
)
|
||||
|
||||
if self.current_index == -1:
|
||||
state = u'stop'
|
||||
|
|
@ -331,15 +326,15 @@ class BaseServer(object):
|
|||
state = u'pause'
|
||||
else:
|
||||
state = u'play'
|
||||
conn.send(u'state: ' + state)
|
||||
yield u'state: ' + state
|
||||
|
||||
if self.current_index != -1: # i.e., paused or playing
|
||||
current_id = self._item_id(self.playlist[self.current_index])
|
||||
conn.send(u'song: ' + unicode(self.current_index))
|
||||
conn.send(u'songid: ' + unicode(current_id))
|
||||
yield u'song: ' + unicode(self.current_index)
|
||||
yield u'songid: ' + unicode(current_id)
|
||||
|
||||
if self.error:
|
||||
conn.send(u'error: ' + self.error)
|
||||
yield u'error: ' + self.error
|
||||
|
||||
def cmd_clearerror(self, conn):
|
||||
"""Removes the persistent error state of the server. This
|
||||
|
|
@ -415,7 +410,7 @@ class BaseServer(object):
|
|||
|
||||
def cmd_moveid(self, conn, id_from, idx_to):
|
||||
idx_from = self._id_to_index(idx_from)
|
||||
self.cmd_move(conn, idx_from, idx_to)
|
||||
return self.cmd_move(conn, idx_from, idx_to)
|
||||
|
||||
def cmd_swap(self, conn, i, j):
|
||||
"""Swaps two tracks in the playlist."""
|
||||
|
|
@ -441,7 +436,7 @@ class BaseServer(object):
|
|||
def cmd_swapid(self, conn, i_id, j_id):
|
||||
i = self._id_to_index(i_id)
|
||||
j = self._id_to_index(j_id)
|
||||
self.cmd_swap(conn, i, j)
|
||||
return self.cmd_swap(conn, i, j)
|
||||
|
||||
def cmd_urlhandlers(self, conn):
|
||||
"""Indicates supported URL schemes. None by default."""
|
||||
|
|
@ -454,15 +449,15 @@ class BaseServer(object):
|
|||
index = cast_arg(int, index)
|
||||
if index == -1:
|
||||
for track in self.playlist:
|
||||
conn.send(*self._item_info(track))
|
||||
yield self._item_info(track)
|
||||
else:
|
||||
try:
|
||||
track = self.playlist[index]
|
||||
except IndexError:
|
||||
raise ArgumentIndexError()
|
||||
conn.send(*self._item_info(track))
|
||||
yield self._item_info(track)
|
||||
def cmd_playlistid(self, conn, track_id=-1):
|
||||
self.cmd_playlistinfo(conn, self._id_to_index(track_id))
|
||||
return self.cmd_playlistinfo(conn, self._id_to_index(track_id))
|
||||
|
||||
def cmd_plchanges(self, conn, version):
|
||||
"""Sends playlist changes since the given version.
|
||||
|
|
@ -471,7 +466,7 @@ class BaseServer(object):
|
|||
just returns the entire playlist (rather like version=0). This
|
||||
seems to satisfy many clients.
|
||||
"""
|
||||
self.cmd_playlistinfo(conn)
|
||||
return self.cmd_playlistinfo(conn)
|
||||
|
||||
def cmd_plchangesposid(self, conn, version):
|
||||
"""Like plchanges, but only sends position and id.
|
||||
|
|
@ -479,33 +474,32 @@ class BaseServer(object):
|
|||
Also a dummy implementation.
|
||||
"""
|
||||
for idx, track in enumerate(self.playlist):
|
||||
conn.send(u'cpos: ' + unicode(idx),
|
||||
u'Id: ' + unicode(track.id),
|
||||
)
|
||||
yield u'cpos: ' + unicode(idx)
|
||||
yield u'Id: ' + unicode(track.id)
|
||||
|
||||
def cmd_currentsong(self, conn):
|
||||
"""Sends information about the currently-playing song.
|
||||
"""
|
||||
if self.current_index != -1: # -1 means stopped.
|
||||
track = self.playlist[self.current_index]
|
||||
conn.send(*self._item_info(track))
|
||||
yield self._item_info(track)
|
||||
|
||||
def cmd_next(self, conn):
|
||||
"""Advance to the next song in the playlist."""
|
||||
self.current_index += 1
|
||||
if self.current_index >= len(self.playlist):
|
||||
# Fallen off the end. Just move to stopped state.
|
||||
self.cmd_stop(conn)
|
||||
return self.cmd_stop(conn)
|
||||
else:
|
||||
self.cmd_play(conn)
|
||||
return self.cmd_play(conn)
|
||||
|
||||
def cmd_previous(self, conn):
|
||||
"""Step back to the last song."""
|
||||
self.current_index -= 1
|
||||
if self.current_index < 0:
|
||||
self.cmd_stop(conn)
|
||||
return self.cmd_stop(conn)
|
||||
else:
|
||||
self.cmd_play(conn)
|
||||
return self.cmd_play(conn)
|
||||
|
||||
def cmd_pause(self, conn, state=None):
|
||||
"""Set the pause state playback."""
|
||||
|
|
@ -523,7 +517,7 @@ class BaseServer(object):
|
|||
|
||||
if index == -1: # No index specified: start where we are.
|
||||
if not self.playlist: # Empty playlist: stop immediately.
|
||||
self.cmd_stop(conn)
|
||||
return self.cmd_stop(conn)
|
||||
if self.current_index == -1: # No current song.
|
||||
self.current_index = 0 # Start at the beginning.
|
||||
# If we have a current song, just stay there.
|
||||
|
|
@ -539,7 +533,7 @@ class BaseServer(object):
|
|||
index = -1
|
||||
else:
|
||||
index = self._id_to_index(track_id)
|
||||
self.cmd_play(conn, index)
|
||||
return self.cmd_play(conn, index)
|
||||
|
||||
def cmd_stop(self, conn):
|
||||
"""Stop playback."""
|
||||
|
|
@ -552,75 +546,64 @@ class BaseServer(object):
|
|||
if index < 0 or index >= len(self.playlist):
|
||||
raise ArgumentIndexError()
|
||||
self.current_index = index
|
||||
def cmd_seekid(self, track_id, pos):
|
||||
def cmd_seekid(self, conn, track_id, pos):
|
||||
index = self._id_to_index(track_id)
|
||||
self.cmd_seek(conn, index, pos)
|
||||
return self.cmd_seek(conn, index, pos)
|
||||
|
||||
class Connection(object):
|
||||
"""A connection between a client and the server. Handles input and
|
||||
output from and to the client.
|
||||
"""
|
||||
|
||||
def __init__(self, client, server):
|
||||
def __init__(self, server, sock):
|
||||
"""Create a new connection for the accepted socket `client`.
|
||||
"""
|
||||
self.client, self.server = client, server
|
||||
self.server = server
|
||||
self.sock = sock
|
||||
self.authenticated = False
|
||||
|
||||
def send(self, *lines):
|
||||
"""Send lines, which are strings, to the client. A newline is
|
||||
added after every string.
|
||||
def send(self, lines):
|
||||
"""Send lines, which which is either a single string or an
|
||||
iterable consisting of strings, to the client. A newline is
|
||||
added after every string. Returns a Bluelet event that sends
|
||||
the data.
|
||||
"""
|
||||
if isinstance(lines, basestring):
|
||||
lines = [lines]
|
||||
out = NEWLINE.join(lines) + NEWLINE
|
||||
log.debug(out[:-1]) # Don't log trailing newline.
|
||||
if isinstance(out, unicode):
|
||||
out = out.encode('utf8')
|
||||
self.client.sendall(out)
|
||||
|
||||
line_re = re.compile(r'([^\r\n]*)(?:\r\n|\n\r|\n|\r)')
|
||||
def lines(self):
|
||||
"""A generator yielding lines (delimited by some usual newline
|
||||
code) as they arrive from the client.
|
||||
"""
|
||||
buf = ''
|
||||
while True:
|
||||
# Dump new data on the buffer.
|
||||
chunk = self.client.recv(BUFSIZE)
|
||||
if not chunk: break # EOF.
|
||||
buf += chunk
|
||||
|
||||
# Clear out and yield any lines in the buffer.
|
||||
while True:
|
||||
match = self.line_re.match(buf)
|
||||
if not match: break # No lines remain.
|
||||
yield match.group(1)
|
||||
buf = buf[match.end():] # Remove line from buffer.
|
||||
return self.sock.sendall(out)
|
||||
|
||||
def do_command(self, command):
|
||||
"""Run the given command and give an appropriate response."""
|
||||
"""A coroutine that runs the given command and sends an
|
||||
appropriate response."""
|
||||
try:
|
||||
command.run(self)
|
||||
yield bluelet.call(command.run(self))
|
||||
except BPDError, e:
|
||||
# Send the error.
|
||||
self.send(e.response())
|
||||
yield self.send(e.response())
|
||||
else:
|
||||
# Send success code.
|
||||
self.send(RESP_OK)
|
||||
yield self.send(RESP_OK)
|
||||
|
||||
def run(self):
|
||||
"""Send a greeting to the client and begin processing commands
|
||||
as they arrive. Blocks until the client disconnects.
|
||||
as they arrive.
|
||||
"""
|
||||
self.send(HELLO)
|
||||
yield self.send(HELLO)
|
||||
|
||||
clist = None # Initially, no command list is being constructed.
|
||||
for line in self.lines():
|
||||
while True:
|
||||
line = (yield self.sock.readline()).strip()
|
||||
if not line:
|
||||
break
|
||||
log.debug(line)
|
||||
|
||||
if clist is not None:
|
||||
# Command list already opened.
|
||||
if line == CLIST_END:
|
||||
self.do_command(clist)
|
||||
yield bluelet.call(self.do_command(clist))
|
||||
clist = None # Clear the command list.
|
||||
else:
|
||||
clist.append(Command(line))
|
||||
|
|
@ -632,18 +615,19 @@ class Connection(object):
|
|||
else:
|
||||
# Ordinary command.
|
||||
try:
|
||||
self.do_command(Command(line))
|
||||
yield bluelet.call(self.do_command(Command(line)))
|
||||
except BPDClose:
|
||||
# Command indicates that the conn should close.
|
||||
self.client.close()
|
||||
return
|
||||
|
||||
@classmethod
|
||||
def handle(cls, client, server):
|
||||
"""Creates a new `Connection` for `client` and `server` and runs
|
||||
it.
|
||||
"""
|
||||
cls(client, server).run()
|
||||
def handler(cls, server):
|
||||
def _handle(sock):
|
||||
"""Creates a new `Connection` and runs it.
|
||||
"""
|
||||
return cls(server, sock).run()
|
||||
return _handle
|
||||
|
||||
class Command(object):
|
||||
"""A command issued by the client for processing by the server.
|
||||
|
|
@ -673,9 +657,9 @@ class Command(object):
|
|||
self.args.append(arg)
|
||||
|
||||
def run(self, conn):
|
||||
"""Executes the command on the given connection.
|
||||
"""A coroutine that executes the command on the given
|
||||
connection.
|
||||
"""
|
||||
|
||||
# Attempt to get correct command function.
|
||||
func_name = 'cmd_' + self.name
|
||||
if not hasattr(conn.server, func_name):
|
||||
|
|
@ -690,7 +674,10 @@ class Command(object):
|
|||
|
||||
try:
|
||||
args = [conn] + self.args
|
||||
func(*args)
|
||||
results = func(*args)
|
||||
if results:
|
||||
for data in results:
|
||||
yield conn.send(data)
|
||||
|
||||
except BPDError, e:
|
||||
# An exposed error. Set the command name and then let
|
||||
|
|
@ -724,12 +711,11 @@ class CommandList(list):
|
|||
self.verbose = verbose
|
||||
|
||||
def run(self, conn):
|
||||
"""Execute all the commands in this list.
|
||||
"""Coroutine executing all the commands in this list.
|
||||
"""
|
||||
|
||||
for i, command in enumerate(self):
|
||||
try:
|
||||
command.run(conn)
|
||||
yield bluelet.call(command.run(conn))
|
||||
except BPDError, e:
|
||||
# If the command failed, stop executing.
|
||||
e.index = i # Give the error the correct index.
|
||||
|
|
@ -738,7 +724,7 @@ class CommandList(list):
|
|||
# Otherwise, possibly send the output delimeter if we're in a
|
||||
# verbose ("OK") command list.
|
||||
if self.verbose:
|
||||
conn.send(RESP_CLIST_VERBOSE)
|
||||
yield conn.send(RESP_CLIST_VERBOSE)
|
||||
|
||||
|
||||
|
||||
|
|
@ -834,18 +820,18 @@ class Server(BaseServer):
|
|||
|
||||
if artist is None: # List all artists.
|
||||
for artist in self.lib.artists():
|
||||
conn.send(u'directory: ' + seq_to_path((artist,), PATH_PH))
|
||||
yield u'directory: ' + seq_to_path((artist,), PATH_PH)
|
||||
elif album is None: # List all albums for an artist.
|
||||
for album in self.lib.albums(artist):
|
||||
parts = (album.artist, album.album)
|
||||
conn.send(u'directory: ' + seq_to_path(parts, PATH_PH))
|
||||
yield u'directory: ' + seq_to_path(parts, PATH_PH)
|
||||
elif track is None: # List all tracks on an album.
|
||||
for item in self.lib.items(artist, album):
|
||||
conn.send(*self._item_info(item))
|
||||
yield self._item_info(item)
|
||||
else: # List a track. This isn't a directory.
|
||||
raise BPDError(ERROR_ARG, 'this is not a directory')
|
||||
|
||||
def _listall(self, conn, path=u"/", info=False):
|
||||
def _listall(self, path=u"/", info=False):
|
||||
"""Helper function for recursive listing. If info, show
|
||||
tracks' complete info; otherwise, just show items' paths.
|
||||
"""
|
||||
|
|
@ -854,34 +840,34 @@ class Server(BaseServer):
|
|||
# artists
|
||||
if not artist:
|
||||
for a in self.lib.artists():
|
||||
conn.send(u'directory: ' + a)
|
||||
yield u'directory: ' + a
|
||||
|
||||
# albums
|
||||
if not album:
|
||||
for a in self.lib.albums(artist or None):
|
||||
parts = a.artist, a.album
|
||||
conn.send(u'directory: ' + seq_to_path(parts, PATH_PH))
|
||||
yield u'directory: ' + seq_to_path(parts, PATH_PH)
|
||||
|
||||
# tracks
|
||||
items = self.lib.items(artist or None, album or None)
|
||||
if info:
|
||||
for item in items:
|
||||
conn.send(*self._item_info(item))
|
||||
yield self._item_info(item)
|
||||
else:
|
||||
for item in items:
|
||||
conn.send(u'file: ' + self._item_path(i))
|
||||
yield u'file: ' + self._item_path(i)
|
||||
|
||||
def cmd_listall(self, conn, path=u"/"):
|
||||
"""Send the paths all items in the directory, recursively."""
|
||||
self._listall(conn, path, False)
|
||||
return self._listall(path, False)
|
||||
def cmd_listallinfo(self, conn, path=u"/"):
|
||||
"""Send info on all the items in the directory, recursively."""
|
||||
self._listall(conn, path, True)
|
||||
return self._listall(path, True)
|
||||
|
||||
|
||||
# Playlist manipulation.
|
||||
|
||||
def _add(self, conn, path, send_id=False):
|
||||
def _add(self, path, send_id=False):
|
||||
"""Adds a track or directory to the playlist, specified by a
|
||||
path. If `send_id`, write each item's id to the client.
|
||||
"""
|
||||
|
|
@ -894,7 +880,7 @@ class Server(BaseServer):
|
|||
found_an_item = True
|
||||
self.playlist.append(item)
|
||||
if send_id:
|
||||
conn.send(u'Id: ' + unicode(item.id))
|
||||
yield u'Id: ' + unicode(item.id)
|
||||
|
||||
if not found_an_item:
|
||||
# No items matched.
|
||||
|
|
@ -910,25 +896,26 @@ class Server(BaseServer):
|
|||
"""Adds a track or directory to the playlist, specified by a
|
||||
path.
|
||||
"""
|
||||
self._add(conn, path, False)
|
||||
return self._add(path, False)
|
||||
|
||||
def cmd_addid(self, conn, path):
|
||||
"""Same as `cmd_add` but sends an id back to the client."""
|
||||
self._add(conn, path, True)
|
||||
return self._add(path, True)
|
||||
|
||||
|
||||
# Server info.
|
||||
|
||||
def cmd_status(self, conn):
|
||||
super(Server, self).cmd_status(conn)
|
||||
for line in super(Server, self).cmd_status(conn):
|
||||
yield line
|
||||
if self.current_index > -1:
|
||||
item = self.playlist[self.current_index]
|
||||
|
||||
conn.send(u'bitrate: ' + unicode(item.bitrate/1000))
|
||||
yield u'bitrate: ' + unicode(item.bitrate/1000)
|
||||
#fixme: missing 'audio'
|
||||
|
||||
(pos, total) = self.player.time()
|
||||
conn.send(u'time: ' + unicode(pos) + u':' + unicode(total))
|
||||
yield u'time: ' + unicode(pos) + u':' + unicode(total)
|
||||
|
||||
#fixme: also missing 'updating_db'
|
||||
|
||||
|
|
@ -943,14 +930,14 @@ class Server(BaseServer):
|
|||
result = c.execute(statement).fetchone()
|
||||
artists, albums = result[0], result[1]
|
||||
|
||||
conn.send(u'artists: ' + unicode(artists),
|
||||
u'albums: ' + unicode(albums),
|
||||
u'songs: ' + unicode(songs),
|
||||
u'uptime: ' + unicode(int(time.time() - self.startup_time)),
|
||||
u'playtime: ' + u'0', #fixme
|
||||
u'db_playtime: ' + unicode(int(totaltime)),
|
||||
u'db_update: ' + unicode(int(self.startup_time)), #fixme
|
||||
)
|
||||
yield (u'artists: ' + unicode(artists),
|
||||
u'albums: ' + unicode(albums),
|
||||
u'songs: ' + unicode(songs),
|
||||
u'uptime: ' + unicode(int(time.time() - self.startup_time)),
|
||||
u'playtime: ' + u'0', #fixme
|
||||
u'db_playtime: ' + unicode(int(totaltime)),
|
||||
u'db_update: ' + unicode(int(self.startup_time)), #fixme
|
||||
)
|
||||
|
||||
|
||||
# Searching.
|
||||
|
|
@ -974,7 +961,7 @@ class Server(BaseServer):
|
|||
searching.
|
||||
"""
|
||||
for tag in self.tagtype_map:
|
||||
conn.send(u'tagtype: ' + tag)
|
||||
yield u'tagtype: ' + tag
|
||||
|
||||
def _tagtype_lookup(self, tag):
|
||||
"""Uses `tagtype_map` to look up the beets column name for an
|
||||
|
|
@ -1017,7 +1004,7 @@ class Server(BaseServer):
|
|||
beets.library.AnySubstringQuery,
|
||||
kv)
|
||||
for item in self.lib.items(query=query):
|
||||
conn.send(*self._item_info(item))
|
||||
yield self._item_info(item)
|
||||
|
||||
def cmd_find(self, conn, *kv):
|
||||
"""Perform an exact match for items."""
|
||||
|
|
@ -1025,7 +1012,7 @@ class Server(BaseServer):
|
|||
None,
|
||||
kv)
|
||||
for item in self.lib.items(query=query):
|
||||
conn.send(*self._item_info(item))
|
||||
yield self._item_info(item)
|
||||
|
||||
def cmd_list(self, conn, show_tag, *kv):
|
||||
"""List distinct metadata values for show_tag, possibly
|
||||
|
|
@ -1051,8 +1038,8 @@ class Server(BaseServer):
|
|||
_, key = self._tagtype_lookup(tag)
|
||||
query = beets.library.MatchQuery(key, value)
|
||||
songs, playtime = query.count(self.lib)
|
||||
conn.send(u'songs: ' + unicode(songs),
|
||||
u'playtime: ' + unicode(int(playtime)))
|
||||
yield u'songs: ' + unicode(songs)
|
||||
yield u'playtime: ' + unicode(int(playtime))
|
||||
|
||||
|
||||
# "Outputs." Just a dummy implementation because we don't control
|
||||
|
|
@ -1060,10 +1047,10 @@ class Server(BaseServer):
|
|||
|
||||
def cmd_outputs(self, conn):
|
||||
"""List the available outputs."""
|
||||
conn.send(u'outputid: 0',
|
||||
u'outputname: gstreamer',
|
||||
u'outputenabled: 1',
|
||||
)
|
||||
yield (u'outputid: 0',
|
||||
u'outputname: gstreamer',
|
||||
u'outputenabled: 1',
|
||||
)
|
||||
|
||||
def cmd_enableoutput(self, conn, output_id):
|
||||
output_id = cast_arg(int, output_id)
|
||||
|
|
@ -1160,4 +1147,3 @@ class BPDPlugin(BeetsPlugin):
|
|||
|
||||
cmd.func = func
|
||||
return [cmd]
|
||||
|
||||
|
|
|
|||
312
beetsplug/bpd/bluelet.py
Normal file
312
beetsplug/bpd/bluelet.py
Normal file
|
|
@ -0,0 +1,312 @@
|
|||
"""Extremely simple pure-Python implementation of coroutine-style
|
||||
asynchronous socket I/O. Inspired by, but inferior to, Eventlet.
|
||||
Bluelet can also be thought of as a less-terrible replacement for
|
||||
asyncore.
|
||||
|
||||
Bluelet: easy concurrency without all the messy parallelism.
|
||||
"""
|
||||
import socket
|
||||
import select
|
||||
import sys
|
||||
import types
|
||||
|
||||
|
||||
# Basic events used for thread scheduling.
|
||||
|
||||
class Event(object):
|
||||
pass
|
||||
class WaitableEvent(Event):
|
||||
def waitables(self):
|
||||
"""Return "waitable" objects to pass to select. Should return
|
||||
three iterables for input readiness, output readiness, and
|
||||
exceptional conditions (i.e., the three lists passed to
|
||||
select()).
|
||||
"""
|
||||
return (), (), ()
|
||||
def fire(self):
|
||||
pass
|
||||
|
||||
class ValueEvent(Event):
|
||||
"""An event that does nothing but return a fixed value."""
|
||||
def __init__(self, value):
|
||||
self.value = value
|
||||
|
||||
class ExceptionEvent(Event):
|
||||
"""Raise an exception at the yield point. Used internally."""
|
||||
def __init__(self, exc_info):
|
||||
self.exc_info = exc_info
|
||||
|
||||
class SpawnEvent(object):
|
||||
def __init__(self, coro):
|
||||
self.spawned = coro
|
||||
|
||||
class DelegationEvent(object):
|
||||
def __init__(self, coro):
|
||||
self.spawned = coro
|
||||
|
||||
class ReturnEvent(object):
|
||||
def __init__(self, value):
|
||||
self.value = value
|
||||
|
||||
|
||||
# Core logic for executing and scheduling threads.
|
||||
|
||||
def _event_select(events):
|
||||
"""Perform a select() over all the Events provided, returning the
|
||||
ones ready to be fired.
|
||||
"""
|
||||
# Gather waitables.
|
||||
waitable_to_event = {}
|
||||
rlist, wlist, xlist = [], [], []
|
||||
for event in events:
|
||||
if isinstance(event, WaitableEvent):
|
||||
r, w, x = event.waitables()
|
||||
rlist += r
|
||||
wlist += w
|
||||
xlist += x
|
||||
for waitable in r + w + x:
|
||||
waitable_to_event[waitable] = event
|
||||
|
||||
# Perform select() if we have any waitables.
|
||||
if rlist or wlist or xlist:
|
||||
rready, wready, xready = select.select(rlist, wlist, xlist)
|
||||
ready = rready + wready + xready
|
||||
else:
|
||||
ready = []
|
||||
|
||||
# Gather ready events corresponding to the ready waitables.
|
||||
ready_events = set()
|
||||
for waitable in ready:
|
||||
ready_events.add(waitable_to_event[waitable])
|
||||
return ready_events
|
||||
|
||||
class ThreadException(Exception):
|
||||
def __init__(self, coro, exc_info):
|
||||
self.coro = coro
|
||||
self.exc_info = exc_info
|
||||
def reraise(self):
|
||||
raise self.exc_info[0], self.exc_info[1], self.exc_info[2]
|
||||
|
||||
def run(root_coro):
|
||||
# The "threads" dictionary keeps track of all the currently-
|
||||
# executing coroutines. It maps coroutines to their currenly
|
||||
# "blocking" event.
|
||||
threads = {root_coro: ValueEvent(None)}
|
||||
|
||||
# When one thread delegates to another thread, its execution is
|
||||
# suspended until the delegate completes. This dictionary keeps
|
||||
# track of each running delegate's delegator.
|
||||
delegators = {}
|
||||
|
||||
def advance_thread(coro, value, is_exc=False):
|
||||
"""After an event is fired, run a given coroutine associated with
|
||||
it in the threads dict until it yields again. If the coroutine
|
||||
exits, then the thread is removed from the pool. If the coroutine
|
||||
raises an exception, it is reraised in a ThreadException. If
|
||||
is_exc is True, then the value must be an exc_info tuple and the
|
||||
exception is thrown into the coroutine.
|
||||
"""
|
||||
try:
|
||||
if is_exc:
|
||||
next_event = coro.throw(*value)
|
||||
else:
|
||||
next_event = coro.send(value)
|
||||
except StopIteration:
|
||||
# Thread is done.
|
||||
del threads[coro]
|
||||
if coro in delegators:
|
||||
# Resume delegator.
|
||||
threads[delegators[coro]] = ValueEvent(None)
|
||||
except:
|
||||
# Thread raised some other exception.
|
||||
del threads[coro]
|
||||
raise ThreadException(coro, sys.exc_info())
|
||||
else:
|
||||
threads[coro] = next_event
|
||||
|
||||
# Continue advancing threads until root thread exits.
|
||||
exit_te = None
|
||||
while threads:
|
||||
try:
|
||||
# Look for events that can be run immediately. Continue
|
||||
# running immediate events until nothing is ready.
|
||||
while True:
|
||||
have_ready = False
|
||||
for coro, event in threads.items():
|
||||
if isinstance(event, SpawnEvent):
|
||||
threads[event.spawned] = ValueEvent(None) # Spawn.
|
||||
advance_thread(coro, None)
|
||||
have_ready = True
|
||||
elif isinstance(event, ValueEvent):
|
||||
advance_thread(coro, event.value)
|
||||
have_ready = True
|
||||
elif isinstance(event, ExceptionEvent):
|
||||
advance_thread(coro, event.exc_info, True)
|
||||
have_ready = True
|
||||
elif isinstance(event, DelegationEvent):
|
||||
del threads[coro] # Suspend.
|
||||
threads[event.spawned] = ValueEvent(None) # Spawn.
|
||||
delegators[event.spawned] = coro
|
||||
have_ready = True
|
||||
elif isinstance(event, ReturnEvent):
|
||||
# Thread is done.
|
||||
del threads[coro]
|
||||
if coro in delegators:
|
||||
threads[delegators[coro]] = ValueEvent(event.value)
|
||||
have_ready = True
|
||||
|
||||
# Only start the select when nothing else is ready.
|
||||
if not have_ready:
|
||||
break
|
||||
|
||||
# Wait and fire.
|
||||
event2coro = dict((v,k) for k,v in threads.iteritems())
|
||||
for event in _event_select(threads.values()):
|
||||
value = event.fire()
|
||||
advance_thread(event2coro[event], value)
|
||||
|
||||
except ThreadException, te:
|
||||
# Exception raised from inside a thread.
|
||||
event = ExceptionEvent(te.exc_info)
|
||||
if te.coro in delegators:
|
||||
# The thread is a delegate. Raise exception in its
|
||||
# delegator.
|
||||
threads[delegators[te.coro]] = event
|
||||
else:
|
||||
# The thread is root-level. Raise in client code.
|
||||
exit_te = te
|
||||
break
|
||||
|
||||
except:
|
||||
# For instance, KeyboardInterrupt during select(). Raise
|
||||
# into root thread and terminate others.
|
||||
threads = {root_coro: ExceptionEvent(sys.exc_info())}
|
||||
|
||||
# If any threads still remain, kill them.
|
||||
for coro in threads:
|
||||
coro.close()
|
||||
|
||||
# If we're exiting with an exception, raise it in the client.
|
||||
if exit_te:
|
||||
exit_te.reraise()
|
||||
|
||||
|
||||
# Sockets and their associated events.
|
||||
|
||||
class AcceptEvent(WaitableEvent):
|
||||
def __init__(self, listener):
|
||||
self.listener = listener
|
||||
def waitables(self):
|
||||
return (self.listener.sock,), (), ()
|
||||
def fire(self):
|
||||
sock, addr = self.listener.sock.accept()
|
||||
return Connection(sock, addr)
|
||||
class Listener(object):
|
||||
def __init__(self, host, port):
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
self.sock.bind((host, port))
|
||||
self.sock.listen(5)
|
||||
def accept(self):
|
||||
return AcceptEvent(self)
|
||||
def close(self):
|
||||
self.sock.close()
|
||||
|
||||
class ReceiveEvent(WaitableEvent):
|
||||
def __init__(self, conn, bufsize):
|
||||
self.conn = conn
|
||||
self.bufsize = bufsize
|
||||
def waitables(self):
|
||||
return (self.conn.sock,), (), ()
|
||||
def fire(self):
|
||||
return self.conn.sock.recv(self.bufsize)
|
||||
class SendEvent(WaitableEvent):
|
||||
def __init__(self, conn, data, sendall=False):
|
||||
self.conn = conn
|
||||
self.data = data
|
||||
self.sendall = sendall
|
||||
def waitables(self):
|
||||
return (), (self.conn.sock,), ()
|
||||
def fire(self):
|
||||
if self.sendall:
|
||||
return self.conn.sock.sendall(self.data)
|
||||
else:
|
||||
return self.conn.sock.send(self.data)
|
||||
class Connection(object):
|
||||
def __init__(self, sock, addr):
|
||||
self.sock = sock
|
||||
self.addr = addr
|
||||
self._buf = ''
|
||||
def close(self):
|
||||
self.sock.close()
|
||||
def recv(self, size):
|
||||
if self._buf:
|
||||
# We already have data read previously.
|
||||
out = self._buf[:size]
|
||||
self._buf = self._buf[size:]
|
||||
return ValueEvent(out)
|
||||
else:
|
||||
return ReceiveEvent(self, size)
|
||||
def send(self, data):
|
||||
return SendEvent(self, data)
|
||||
def sendall(self, data):
|
||||
return SendEvent(self, data, True)
|
||||
def readline(self, terminator="\n", bufsize=1024):
|
||||
def line_reader():
|
||||
while True:
|
||||
if terminator in self._buf:
|
||||
line, self._buf = self._buf.split(terminator, 1)
|
||||
line += terminator
|
||||
yield ReturnEvent(line)
|
||||
break
|
||||
data = yield self.recv(bufsize)
|
||||
if data:
|
||||
self._buf += data
|
||||
else:
|
||||
line = self._buf
|
||||
self._buf = ''
|
||||
yield ReturnEvent(line)
|
||||
break
|
||||
return DelegationEvent(line_reader())
|
||||
|
||||
|
||||
# Public interface for threads; each returns an event object that
|
||||
# can immediately be "yield"ed.
|
||||
|
||||
def null():
|
||||
return ValueEvent(None)
|
||||
|
||||
def spawn(coro):
|
||||
if not isinstance(coro, types.GeneratorType):
|
||||
raise ValueError('%s is not a coroutine' % str(coro))
|
||||
return SpawnEvent(coro)
|
||||
|
||||
def call(coro):
|
||||
if not isinstance(coro, types.GeneratorType):
|
||||
raise ValueError('%s is not a coroutine' % str(coro))
|
||||
return DelegationEvent(coro)
|
||||
|
||||
def end(value = None):
|
||||
return ReturnEvent(value)
|
||||
|
||||
|
||||
# Convenience function for running socket servers.
|
||||
|
||||
def server(host, port, func):
|
||||
def handler(conn):
|
||||
try:
|
||||
yield call(func(conn))
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
listener = Listener(host, port)
|
||||
try:
|
||||
while True:
|
||||
conn = yield listener.accept()
|
||||
yield spawn(handler(conn))
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
listener.close()
|
||||
Loading…
Reference in a new issue