fix memory leak in bluelet, syncing with latest source (#151)

This commit is contained in:
Adrian Sampson 2011-04-21 21:40:28 -07:00
parent 2c81b40952
commit 30080bc520
2 changed files with 54 additions and 1 deletions

1
NEWS
View file

@ -43,6 +43,7 @@
* Fix adding individual tracks in BPD.
* Fix crash when ~/.beetsconfig does not exist.
* Fix occasional BPD crashes with "broken pipe" errors.
* Fix memory leak in BPD (really a leak in Bluelet).
1.0b7
-----

View file

@ -39,17 +39,45 @@ class ExceptionEvent(Event):
self.exc_info = exc_info
class SpawnEvent(object):
"""Add a new coroutine thread to the scheduler."""
def __init__(self, coro):
self.spawned = coro
class DelegationEvent(object):
"""Suspend execution of the current thread, start a new thread and,
once the child thread finished, return control to the parent
thread.
"""
def __init__(self, coro):
self.spawned = coro
class ReturnEvent(object):
"""Return a value the current thread's delegator at the point of
delegation. Ends the current (delegate) thread.
"""
def __init__(self, value):
self.value = value
class ReadEvent(WaitableEvent):
"""Reads from a file-like object."""
def __init__(self, fd, bufsize):
self.fd = fd
self.bufsize = bufsize
def waitables(self):
return (self.fd,), (), ()
def fire(self):
return self.fd.read(self.bufsize)
class WriteEvent(WaitableEvent):
"""Writes to a file-like object."""
def __init__(self, fd, data):
self.fd = fd
self.data = data
def waitable(self):
return (), (self.fd,), ()
def fire(self):
self.fd.write(self.data)
# Core logic for executing and scheduling threads.
@ -119,6 +147,7 @@ def run(root_coro):
if coro in delegators:
# Resume delegator.
threads[delegators[coro]] = ValueEvent(None)
del delegators[coro]
except:
# Thread raised some other exception.
del threads[coro]
@ -155,15 +184,17 @@ def run(root_coro):
del threads[coro]
if coro in delegators:
threads[delegators[coro]] = ValueEvent(event.value)
del delegators[coro]
have_ready = True
# Only start the select when nothing else is ready.
if not have_ready:
break
# Wait and fire.
event2coro = dict((v,k) for k,v in threads.iteritems())
for event in _event_select(threads.values()):
# Run the IO operation, but catch socket errors.
try:
value = event.fire()
except socket.error, exc:
@ -185,6 +216,7 @@ def run(root_coro):
# The thread is a delegate. Raise exception in its
# delegator.
threads[delegators[te.coro]] = event
del delegators[te.coro]
else:
# The thread is root-level. Raise in client code.
exit_te = te
@ -302,8 +334,28 @@ def call(coro):
return DelegationEvent(coro)
def end(value = None):
"""Ends the coroutine and returns a value to its delegator."""
return ReturnEvent(value)
def read(fd, bufsize = None):
if bufsize is None:
# Read all.
def reader():
buf = []
while True:
data = yield read(fd, 1024)
if not data:
break
buf.append(data)
yield ReturnEvent(''.join(buf))
return DelegationEvent(reader())
else:
return ReadEvent(fd, bufsize)
def write(fd, data):
return WriteEvent(fd, data)
# Convenience function for running socket servers.