diff --git a/NEWS b/NEWS index 319367628..4570a02c7 100644 --- a/NEWS +++ b/NEWS @@ -43,6 +43,7 @@ * Fix adding individual tracks in BPD. * Fix crash when ~/.beetsconfig does not exist. * Fix occasional BPD crashes with "broken pipe" errors. +* Fix memory leak in BPD (really a leak in Bluelet). 1.0b7 ----- diff --git a/beetsplug/bpd/bluelet.py b/beetsplug/bpd/bluelet.py index 6bcd38d44..b6f9e7277 100644 --- a/beetsplug/bpd/bluelet.py +++ b/beetsplug/bpd/bluelet.py @@ -39,17 +39,45 @@ class ExceptionEvent(Event): self.exc_info = exc_info class SpawnEvent(object): + """Add a new coroutine thread to the scheduler.""" def __init__(self, coro): self.spawned = coro class DelegationEvent(object): + """Suspend execution of the current thread, start a new thread and, + once the child thread finished, return control to the parent + thread. + """ def __init__(self, coro): self.spawned = coro class ReturnEvent(object): + """Return a value the current thread's delegator at the point of + delegation. Ends the current (delegate) thread. + """ def __init__(self, value): self.value = value +class ReadEvent(WaitableEvent): + """Reads from a file-like object.""" + def __init__(self, fd, bufsize): + self.fd = fd + self.bufsize = bufsize + def waitables(self): + return (self.fd,), (), () + def fire(self): + return self.fd.read(self.bufsize) + +class WriteEvent(WaitableEvent): + """Writes to a file-like object.""" + def __init__(self, fd, data): + self.fd = fd + self.data = data + def waitable(self): + return (), (self.fd,), () + def fire(self): + self.fd.write(self.data) + # Core logic for executing and scheduling threads. @@ -119,6 +147,7 @@ def run(root_coro): if coro in delegators: # Resume delegator. threads[delegators[coro]] = ValueEvent(None) + del delegators[coro] except: # Thread raised some other exception. del threads[coro] @@ -155,15 +184,17 @@ def run(root_coro): del threads[coro] if coro in delegators: threads[delegators[coro]] = ValueEvent(event.value) + del delegators[coro] have_ready = True # Only start the select when nothing else is ready. if not have_ready: break - + # Wait and fire. event2coro = dict((v,k) for k,v in threads.iteritems()) for event in _event_select(threads.values()): + # Run the IO operation, but catch socket errors. try: value = event.fire() except socket.error, exc: @@ -185,6 +216,7 @@ def run(root_coro): # The thread is a delegate. Raise exception in its # delegator. threads[delegators[te.coro]] = event + del delegators[te.coro] else: # The thread is root-level. Raise in client code. exit_te = te @@ -302,8 +334,28 @@ def call(coro): return DelegationEvent(coro) def end(value = None): + """Ends the coroutine and returns a value to its delegator.""" return ReturnEvent(value) +def read(fd, bufsize = None): + if bufsize is None: + # Read all. + def reader(): + buf = [] + while True: + data = yield read(fd, 1024) + if not data: + break + buf.append(data) + yield ReturnEvent(''.join(buf)) + return DelegationEvent(reader()) + + else: + return ReadEvent(fd, bufsize) + +def write(fd, data): + return WriteEvent(fd, data) + # Convenience function for running socket servers.