diff --git a/beets/ui/commands.py b/beets/ui/commands.py
index 740311e5f..b9dd839cf 100644
--- a/beets/ui/commands.py
+++ b/beets/ui/commands.py
@@ -27,6 +27,7 @@ from beets import autotag
 from beets import library
 from beets.mediafile import UnreadableFileError, FileTypeError
 import beets.autotag.art
+from beets.ui import pipeline
 
 # Global logger.
 log = logging.getLogger('beets')
@@ -197,7 +198,22 @@ def choose_match(items, cur_artist, cur_album, candidates, rec):
         except autotag.AutotagError:
             candidates, rec = None, None
 
-# Core autotagger generators and coroutines.
+def _reopen_lib(lib):
+    """Because of limitations in SQLite, a given Library is bound to
+    the thread in which it was created. This function reopens Library
+    objects so that they can be used from separate threads.
+    """
+    if isinstance(lib, library.Library):
+        return library.Library(
+            lib.path,
+            lib.directory,
+            lib.path_format,
+            lib.art_filename,
+        )
+    else:
+        return lib
+
+# Core autotagger pipeline stages.
 
 def read_albums(paths):
     """A generator yielding all the albums (as sets of Items) found in
@@ -215,8 +231,8 @@ def read_albums(paths):
 def initial_lookup():
     """A coroutine for performing the initial MusicBrainz lookup for an
     album. It accepts lists of Items and yields
-    (cur_artist, cur_album, candidates, rec) tuples. If no match is found,
-    all of the yielded parameters are None.
+    (items, cur_artist, cur_album, candidates, rec) tuples. If no match
+    is found, all of the yielded parameters (except items) are None.
     """
     items = yield
     while True:
@@ -224,7 +240,7 @@ def initial_lookup():
             cur_artist, cur_album, candidates, rec = autotag.tag_album(items)
         except autotag.AutotagError:
             cur_artist, cur_album, candidates, rec = None, None, None, None
-        items = yield(cur_artist, cur_album, candidates, rec)
+        items = yield (items, cur_artist, cur_album, candidates, rec)
 
 def user_query(lib, logfile=None):
     """A coroutine for interfacing with the user about the tagging
@@ -233,9 +249,11 @@ def user_query(lib, logfile=None):
     accepts (items, cur_artist, cur_album, candidates, rec) tuples.
     items is a set of Items in the album to be tagged; the remaining
     parameters are the result of an initial lookup from MusicBrainz.
-    The coroutine yields either a candidate info dict, CHOICE_ASIS, or
-    None (indicating that the items should not be imported).
+    The coroutine yields (items, info) pairs where info is either a
+    candidate info dict or CHOICE_ASIS. May also yield pipeline.BUBBLE,
+    indicating that the items should not be imported.
     """
+    lib = _reopen_lib(lib)
     items, cur_artist, cur_album, candidates, rec = yield
     first = True
     while True:
@@ -254,7 +272,8 @@ def user_query(lib, logfile=None):
             tag_log(logfile, 'skip', items)
-            # Yield None, indicating that the pipeline should not
-            # progress.
-            items, cur_artist, cur_album, candidates, rec = yield None
+            # Yield a bubble, indicating that the pipeline should not
+            # progress.
+            items, cur_artist, cur_album, candidates, rec = \
+                yield pipeline.BUBBLE
             continue
 
         # Ensure that we don't have the album already.
@@ -271,11 +290,12 @@ def user_query(lib, logfile=None):
         if count >= 1:
             print_("This album (%s - %s) is already in the library!" %
                    (artist, album))
-            items, cur_artist, cur_album, candidates, rec = yield None
+            items, cur_artist, cur_album, candidates, rec = \
+                yield pipeline.BUBBLE
             continue
 
         # Yield the result and get the next chunk of work.
-        items, cur_artist, cur_album, candidates, rec = yield info
+        items, cur_artist, cur_album, candidates, rec = yield (items, info)
 
 def apply_choices(lib, copy, write, art):
     """A coroutine for applying changes to albums during the autotag
@@ -284,6 +304,7 @@ def apply_choices(lib, copy, write, art):
     nothing. items the set of Items to import; info is either a
     candidate info dictionary or CHOICE_ASIS.
     """
+    lib = _reopen_lib(lib)
     while True:
         # Get next chunk of work.
         items, info = yield
@@ -310,142 +331,6 @@ def apply_choices(lib, copy, write, art):
         # Write the database after each album.
         lib.save()
 
-# Importer main functions.
-
-def autotag_sequential(lib, paths, copy, write, logfile, art):
-    """Autotags and imports the albums in the directory in a single-
-    threaded manner. lib is the Library to import into. If copy, then
-    items are copied into the destination directory. If write, then
-    new metadata is written back to the files' tags. If logfile is
-    provided, then a log message will be added there if the album is
-    untaggable. If art, then attempt to download cover art for the
-    album.
-    """
-    # Set up the various coroutines.
-    init_lookup_coro = initial_lookup()
-    init_lookup_coro.next()
-    user_coro = user_query(lib, logfile)
-    user_coro.next()
-    apply_coro = apply_choices(lib, copy, write, art)
-    apply_coro.next()
-
-    # Crawl albums and send them through the pipeline one at a time.
-    for items in read_albums(paths):
-        cur_artist, cur_album, candidates, rec = \
-            init_lookup_coro.send(items)
-        info = user_coro.send((items, cur_artist, cur_album, candidates, rec))
-        if info is None:
-            # User-query coroutine yeilds None when the album
-            # should be skipped. Bypass the rest of the pipeline.
-            continue
-        apply_coro.send((items, info))
-
-
-def _reopen_lib(lib):
-    if isinstance(lib, library.Library):
-        return library.Library(
-            lib.path,
-            lib.directory,
-            lib.path_format,
-            lib.art_filename,
-        )
-    else:
-        return lib
-
-CHANNEL_POISON = 'CHANNEL_POISION'
-class ReadAlbumsThread(Thread):
-    def __init__(self, paths, out_queue):
-        super(ReadAlbumsThread, self).__init__()
-        self.gen = read_albums(paths)
-        self.out_queue = out_queue
-    def run(self):
-        for items in self.gen:
-            self.out_queue.put(items)
-        self.out_queue.put(CHANNEL_POISON)
-class InitialLookupThread(Thread):
-    def __init__(self, in_queue, out_queue):
-        super(InitialLookupThread, self).__init__()
-        self.coro = initial_lookup()
-        self.coro.next()
-        self.in_queue, self.out_queue = in_queue, out_queue
-    def run(self):
-        while True:
-            items = self.in_queue.get()
-            if items is CHANNEL_POISON:
-                break
-            cur_artist, cur_album, candidates, rec = self.coro.send(items)
-            self.out_queue.put((items, cur_artist, cur_album,
-                                candidates, rec))
-        self.out_queue.put(CHANNEL_POISON)
-class UIThread(Thread):
-    def __init__(self, lib, logfile, in_queue, out_queue):
-        super(UIThread, self).__init__()
-        self.lib, self.logfile = lib, logfile
-        self.in_queue, self.out_queue = in_queue, out_queue
-    def run(self):
-        self.coro = user_query(_reopen_lib(self.lib), self.logfile)
-        self.coro.next()
-        while True:
-            msg = self.in_queue.get()
-            if msg is CHANNEL_POISON:
-                break
-            items = msg[0]
-            info = self.coro.send(msg)
-            if info is None:
-                # Skip this album.
-                continue
-            self.out_queue.put((items, info))
-        self.out_queue.put(CHANNEL_POISON)
-class ApplyThread(Thread):
-    def __init__(self, lib, copy, write, art, in_queue):
-        super(ApplyThread, self).__init__()
-        self.lib, self.copy, self.write, self.art = lib, copy, write, art
-        self.in_queue = in_queue
-    def run(self):
-        self.coro = apply_choices(_reopen_lib(self.lib), self.copy,
-                                  self.write, self.art)
-        self.coro.next()
-        while True:
-            msg = self.in_queue.get()
-            if msg is CHANNEL_POISON:
-                break
-            self.coro.send(msg)
-CHANNEL_SIZE = 10
-def autotag_threaded(lib, paths, copy, write, logfile, art):
-    """Autotags and imports albums using multiple threads. A drop-in
-    replacement for autotag_sequential.
-    """
-    q_read_to_lookup = Queue(CHANNEL_SIZE)
-    q_lookup_to_user = Queue(CHANNEL_SIZE)
-    q_user_to_apply = Queue(CHANNEL_SIZE)
-
-    reader_thread = ReadAlbumsThread(paths, q_read_to_lookup)
-    lookup_thread = InitialLookupThread(q_read_to_lookup, q_lookup_to_user)
-    ui_thread = UIThread(lib, logfile, q_lookup_to_user, q_user_to_apply)
-    apply_thread = ApplyThread(lib, copy, write, art, q_user_to_apply)
-
-    reader_thread.start()
-    lookup_thread.start()
-    ui_thread.start()
-    apply_thread.start()
-
-    reader_thread.join()
-    lookup_thread.join()
-    ui_thread.join()
-    apply_thread.join()
-
-def simple_import(lib, paths, copy):
-    """Imports all the albums found in the paths without attempting to
-    autotag them. The behavior is similar to an import in which the
-    user always chooses the "as-is" option.
-    """
-    for items in read_albums(paths):
-        if copy:
-            for item in items:
-                item.move(lib, True)
-        lib.add_album(items)
-        lib.save()
-
 # The import command.
 
 def import_files(lib, paths, copy, write, autot, logpath, art, threaded):
@@ -465,12 +350,25 @@ def import_files(lib, paths, copy, write, autot, logpath, art, threaded):
 
     # Perform the import.
     if autot:
+        # Autotag. Set up the pipeline.
+        pl = pipeline.Pipeline([
+            read_albums(paths),
+            initial_lookup(),
+            user_query(lib, logfile),
+            apply_choices(lib, copy, write, art),
+        ])
         if threaded:
-            autotag_threaded(lib, paths, copy, write, logfile, art)
+            pl.run_parallel()
         else:
-            autotag_sequential(lib, paths, copy, write, logfile, art)
+            pl.run_sequential()
     else:
-        just_import(lib, paths, copy)
+        # Simple import without autotagging. Always sequential.
+        for items in read_albums(paths):
+            if copy:
+                for item in items:
+                    item.move(lib, True)
+            lib.add_album(items)
+            lib.save()
 
     # If we were logging, close the file.
     if logfile:
diff --git a/beets/ui/pipeline.py b/beets/ui/pipeline.py
new file mode 100644
index 000000000..3b3cd8379
--- /dev/null
+++ b/beets/ui/pipeline.py
@@ -0,0 +1,246 @@
+# This file is part of beets.
+# Copyright 2010, Adrian Sampson.
+#
+# Permission is hereby granted, free of charge, to any person obtaining
+# a copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish,
+# distribute, sublicense, and/or sell copies of the Software, and to
+# permit persons to whom the Software is furnished to do so, subject to
+# the following conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+
+"""Simple but robust implementation of generator/coroutine-based
+pipelines in Python. The pipelines may be run either sequentially
+(single-threaded) or in parallel (one thread per pipeline stage).
+
+This implementation supports pipeline bubbles (indications that the
+processing for a certain item should abort). To use them, yield the
+BUBBLE constant from any stage coroutine except the last.
+
+In the parallel case, the implementation transparently handles thread
+shutdown when the processing is complete and when a stage raises an
+exception.
+"""
+from __future__ import with_statement # for Python 2.5
+from Queue import Queue
+from threading import Thread, Lock
+
+BUBBLE = '__PIPELINE_BUBBLE__'
+POISON = '__PIPELINE_POISON__'
+
+DEFAULT_QUEUE_SIZE = 16
+
+class PipelineError(object):
+    """An indication that an exception occurred in the pipeline. The
+    object is passed through the pipeline to shut down all threads
+    before it is raised again in the main thread.
+    """
+    def __init__(self, exc):
+        self.exc = exc
+
+class FirstPipelineThread(Thread):
+    """The thread running the first stage in a parallel pipeline setup.
+    The coroutine should just be a generator.
+    """
+    def __init__(self, coro, out_queue):
+        super(FirstPipelineThread, self).__init__()
+        self.coro = coro
+        self.out_queue = out_queue
+
+        self.abort_lock = Lock()
+        self.abort_flag = False
+
+    def run(self):
+        while True:
+            # Time to abort?
+            with self.abort_lock:
+                if self.abort_flag:
+                    break
+
+            # Get the value from the generator.
+            try:
+                msg = self.coro.next()
+            except StopIteration:
+                break
+            except Exception, exc:
+                self.out_queue.put(PipelineError(exc))
+                return
+
+            # Send it to the next stage (unless it's a bubble).
+            if msg is BUBBLE:
+                continue
+            self.out_queue.put(msg)
+
+        # Generator finished; shut down the pipeline.
+        self.out_queue.put(POISON)
+
+    def abort(self):
+        """Shut down the pipeline by canceling this thread and
+        poisoning its output queue.
+        """
+        with self.abort_lock:
+            self.abort_flag = True
+
+class MiddlePipelineThread(Thread):
+    """A thread running any stage in the pipeline except the first or
+    last.
+    """
+    def __init__(self, coro, in_queue, out_queue):
+        super(MiddlePipelineThread, self).__init__()
+        self.coro = coro
+        self.in_queue = in_queue
+        self.out_queue = out_queue
+
+    def run(self):
+        # Prime the coroutine.
+        self.coro.next()
+
+        while True:
+            # Get the message from the previous stage.
+            msg = self.in_queue.get()
+            if msg is POISON:
+                break
+            elif isinstance(msg, PipelineError):
+                self.out_queue.put(msg)
+                return
+
+            # Invoke the current stage.
+            try:
+                out = self.coro.send(msg)
+            except Exception, exc:
+                self.out_queue.put(PipelineError(exc))
+                return
+
+            # Send message to next stage.
+            if out is BUBBLE:
+                continue
+            self.out_queue.put(out)
+
+        # Pipeline is shutting down normally.
+        self.out_queue.put(POISON)
+
+class LastPipelineThread(Thread):
+    """A thread running the last stage in a pipeline. The coroutine
+    should yield nothing.
+    """
+    def __init__(self, coro, in_queue):
+        super(LastPipelineThread, self).__init__()
+        self.coro = coro
+        self.in_queue = in_queue
+
+    def run(self):
+        # Prime the coroutine.
+        self.coro.next()
+
+        while True:
+            # Get the message from the previous stage.
+            msg = self.in_queue.get()
+            if msg is POISON:
+                break
+            elif isinstance(msg, PipelineError):
+                self.exc = msg.exc
+                return
+
+            # Send to consumer.
+            try:
+                self.coro.send(msg)
+            except Exception, exc:
+                self.exc = exc
+                return
+
+        # No exception raised in pipeline.
+        self.exc = None
+
+class Pipeline(object):
+    """Represents a staged pattern of work. Each stage in the pipeline
+    is a coroutine that receives messages from the previous stage and
+    yields messages to be sent to the next stage.
+    """
+    def __init__(self, stages):
+        """Makes a new pipeline from a list of coroutines. There must
+        be at least two stages.
+        """
+        if len(stages) < 2:
+            raise ValueError('pipeline must have at least two stages')
+        self.stages = stages
+
+    def run_sequential(self):
+        """Run the pipeline sequentially in the current thread. The
+        stages are run one after the other.
+        """
+        # "Prime" the coroutines.
+        for coro in self.stages[1:]:
+            coro.next()
+
+        # Begin the pipeline.
+        for msg in self.stages[0]:
+            for stage in self.stages[1:]:
+                msg = stage.send(msg)
+                if msg is BUBBLE:
+                    # Don't continue to the next stage.
+                    break
+
+    def run_parallel(self, queue_size=DEFAULT_QUEUE_SIZE):
+        """Run the pipeline in parallel using one thread per stage. The
+        messages between the stages are stored in queues of the given
+        size.
+        """
+        queues = [Queue(queue_size) for i in range(len(self.stages)-1)]
+        threads = [FirstPipelineThread(self.stages[0], queues[0])]
+        for i in range(1, len(self.stages)-1):
+            threads.append(MiddlePipelineThread(
+                self.stages[i], queues[i-1], queues[i]
+            ))
+        threads.append(LastPipelineThread(self.stages[-1], queues[-1]))
+
+        # Start threads.
+        for thread in threads:
+            thread.start()
+
+        # Wait for termination.
+        try:
+            for thread in threads:
+                thread.join()
+        except:
+            # Shut down the pipeline by telling the first thread to
+            # poison its channel.
+            threads[0].abort()
+            raise
+
+        # Was there an exception?
+        exc = threads[-1].exc
+        if exc:
+            raise exc
+
+# Smoke test.
+if __name__ == '__main__':
+    import time
+
+    def produce():
+        for i in range(5):
+            print 'generating', i
+            time.sleep(1)
+            yield i
+    def work():
+        num = yield
+        while True:
+            print 'processing', num
+            time.sleep(2)
+            num = yield num*2
+    def consume():
+        while True:
+            num = yield
+            time.sleep(1)
+            print 'received', num
+
+    ts_start = time.time()
+    Pipeline([produce(), work(), consume()]).run_sequential()
+    ts_middle = time.time()
+    Pipeline([produce(), work(), consume()]).run_parallel()
+    ts_end = time.time()
+
+    print 'Sequential time:', ts_middle - ts_start
+    print 'Parallel time:', ts_end - ts_middle
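
A note on usage, not part of the patch: the smoke test above exercises the happy path but never yields BUBBLE. Below is a minimal sketch of how a middle stage drops items, using only the API introduced here (Pipeline, BUBBLE, run_sequential, run_parallel) and assuming the module is importable as beets.ui.pipeline; the stage names (produce_nums, drop_odd, collect) are hypothetical.

    from beets.ui import pipeline

    def produce_nums():
        # First stage: a plain generator.
        for i in range(10):
            yield i

    def drop_odd():
        # Middle stage: yield pipeline.BUBBLE for odd numbers so they
        # never reach the next stage.
        num = yield
        while True:
            if num % 2:
                num = yield pipeline.BUBBLE
            else:
                num = yield num

    def collect():
        # Last stage: consumes messages and yields nothing onward.
        while True:
            num = yield
            print 'kept', num

    # Each run needs fresh coroutines, since a run exhausts them.
    pipeline.Pipeline([produce_nums(), drop_odd(), collect()]).run_sequential()
    pipeline.Pipeline([produce_nums(), drop_odd(), collect()]).run_parallel()

In both cases only the even numbers are printed: a bubble either breaks run_sequential's inner loop or is discarded by MiddlePipelineThread before anything is placed on the next queue.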