sync with latest bluelet version (824609773a85)

This commit is contained in:
Adrian Sampson 2011-12-14 14:11:49 -08:00
parent 59b4338f81
commit fed9e206c0

View file

@ -94,20 +94,27 @@ def _event_select(events):
rlist += r
wlist += w
xlist += x
for waitable in r + w + x:
waitable_to_event[waitable] = event
for waitable in r:
waitable_to_event[('r', waitable)] = event
for waitable in w:
waitable_to_event[('w', waitable)] = event
for waitable in x:
waitable_to_event[('x', waitable)] = event
# Perform select() if we have any waitables.
if rlist or wlist or xlist:
rready, wready, xready = select.select(rlist, wlist, xlist)
ready = rready + wready + xready
else:
ready = []
rready, wready, xready = (), (), ()
# Gather ready events corresponding to the ready waitables.
ready_events = set()
for waitable in ready:
ready_events.add(waitable_to_event[waitable])
for ready in rready:
ready_events.add(waitable_to_event[('r', ready)])
for ready in wready:
ready_events.add(waitable_to_event[('w', ready)])
for ready in xready:
ready_events.add(waitable_to_event[('x', ready)])
return ready_events
class ThreadException(Exception):
@ -118,8 +125,12 @@ class ThreadException(Exception):
raise self.exc_info[0], self.exc_info[1], self.exc_info[2]
def run(root_coro):
"""Schedules a coroutine, running it to completion. This
encapsulates the Bluelet scheduler, which the root coroutine can
add to by spawning new coroutines.
"""
# The "threads" dictionary keeps track of all the currently-
# executing coroutines. It maps coroutines to their currently
# executing coroutines. It maps coroutines to their currenly
# "blocking" event.
threads = {root_coro: ValueEvent(None)}
@ -153,6 +164,9 @@ def run(root_coro):
del threads[coro]
raise ThreadException(coro, sys.exc_info())
else:
if isinstance(next_event, types.GeneratorType):
# Automatically invoke sub-coroutines.
next_event = DelegationEvent(next_event)
threads[coro] = next_event
# Continue advancing threads until root thread exits.
@ -279,14 +293,20 @@ class SendEvent(WaitableEvent):
return self.conn.sock.sendall(self.data)
else:
return self.conn.sock.send(self.data)
class Connection(object):
"""A socket-esque object for communicating asychronously on
network sockets.
"""
def __init__(self, sock, addr):
self.sock = sock
self.addr = addr
self._buf = ''
def close(self):
"""Close the connection."""
self.sock.close()
def recv(self, size):
"""Read at most size bytes of data from the socket."""
if self._buf:
# We already have data read previously.
out = self._buf[:size]
@ -295,49 +315,64 @@ class Connection(object):
else:
return ReceiveEvent(self, size)
def send(self, data):
"""Sends data on the socket, returning the number of bytes
successfully sent.
"""
return SendEvent(self, data)
def sendall(self, data):
"""Send all of data on the socket."""
return SendEvent(self, data, True)
def readline(self, terminator="\n", bufsize=1024):
def line_reader():
while True:
if terminator in self._buf:
line, self._buf = self._buf.split(terminator, 1)
line += terminator
yield ReturnEvent(line)
break
data = yield self.recv(bufsize)
if data:
self._buf += data
else:
line = self._buf
self._buf = ''
yield ReturnEvent(line)
break
return DelegationEvent(line_reader())
"""Reads a line (delimited by terminator) from the socket."""
while True:
if terminator in self._buf:
line, self._buf = self._buf.split(terminator, 1)
line += terminator
yield ReturnEvent(line)
break
data = yield ReceiveEvent(self, bufsize)
if data:
self._buf += data
else:
line = self._buf
self._buf = ''
yield ReturnEvent(line)
break
# Public interface for threads; each returns an event object that
# can immediately be "yield"ed.
def null():
"""Event: yield to the scheduler without doing anything special.
"""
return ValueEvent(None)
def spawn(coro):
"""Event: add another coroutine to the scheduler. Both the parent
and child coroutines run concurrently.
"""
if not isinstance(coro, types.GeneratorType):
raise ValueError('%s is not a coroutine' % str(coro))
return SpawnEvent(coro)
def call(coro):
"""Event: delegate to another coroutine. The current coroutine
is resumed once the sub-coroutine finishes. If the sub-coroutine
returns a value using end(), then this event returns that value.
"""
if not isinstance(coro, types.GeneratorType):
raise ValueError('%s is not a coroutine' % str(coro))
return DelegationEvent(coro)
def end(value = None):
"""Ends the coroutine and returns a value to its delegator."""
"""Event: ends the coroutine and returns a value to its
delegator.
"""
return ReturnEvent(value)
def read(fd, bufsize = None):
"""Event: read from a file descriptor asynchronously."""
if bufsize is None:
# Read all.
def reader():
@ -354,15 +389,29 @@ def read(fd, bufsize = None):
return ReadEvent(fd, bufsize)
def write(fd, data):
"""Event: write to a file descriptor asynchronously."""
return WriteEvent(fd, data)
def connect(host, port):
"""Event: connect to a network address and return a Connection
object for communicating on the socket.
"""
addr = (host, port)
sock = socket.create_connection(addr)
return ValueEvent(Connection(sock, addr))
# Convenience function for running socket servers.
def server(host, port, func):
"""A coroutine that runs a network server. Host and port specify the
listening address. func should be a coroutine that takes a single
parameter, a Connection object. The coroutine is invoked for every
incoming connection on the listening socket.
"""
def handler(conn):
try:
yield call(func(conn))
yield func(conn)
finally:
conn.close()