diff --git a/beets/ui/commands.py b/beets/ui/commands.py index 77252d4f2..740311e5f 100644 --- a/beets/ui/commands.py +++ b/beets/ui/commands.py @@ -18,6 +18,8 @@ interface. import os import logging +from threading import Thread +from Queue import Queue from beets import ui from beets.ui import print_ @@ -40,6 +42,7 @@ DEFAULT_IMPORT_COPY = True DEFAULT_IMPORT_WRITE = True DEFAULT_IMPORT_AUTOT = True DEFAULT_IMPORT_ART = True +DEFAULT_THREADED = True # Autotagger utilities and support. @@ -310,7 +313,7 @@ def apply_choices(lib, copy, write, art): # Importer main functions. def autotag_sequential(lib, paths, copy, write, logfile, art): - """Autotags an imports the album in the directory in a single- + """Autotags and imports the albums in the directory in a single- threaded manner. lib is the Library to import into. If copy, then items are copied into the destination directory. If write, then new metadata is written back to the files' tags. If logfile is @@ -337,6 +340,100 @@ def autotag_sequential(lib, paths, copy, write, logfile, art): continue apply_coro.send((items, info)) + +def _reopen_lib(lib): + if isinstance(lib, library.Library): + return library.Library( + lib.path, + lib.directory, + lib.path_format, + lib.art_filename, + ) + else: + return lib + +CHANNEL_POISON = 'CHANNEL_POISION' +class ReadAlbumsThread(Thread): + def __init__(self, paths, out_queue): + super(ReadAlbumsThread, self).__init__() + self.gen = read_albums(paths) + self.out_queue = out_queue + def run(self): + for items in self.gen: + self.out_queue.put(items) + self.out_queue.put(CHANNEL_POISON) +class InitialLookupThread(Thread): + def __init__(self, in_queue, out_queue): + super(InitialLookupThread, self).__init__() + self.coro = initial_lookup() + self.coro.next() + self.in_queue, self.out_queue = in_queue, out_queue + def run(self): + while True: + items = self.in_queue.get() + if items is CHANNEL_POISON: + break + cur_artist, cur_album, candidates, rec = self.coro.send(items) + self.out_queue.put((items, cur_artist, cur_album, + candidates, rec)) + self.out_queue.put(CHANNEL_POISON) +class UIThread(Thread): + def __init__(self, lib, logfile, in_queue, out_queue): + super(UIThread, self).__init__() + self.lib, self.logfile = lib, logfile + self.in_queue, self.out_queue = in_queue, out_queue + def run(self): + self.coro = user_query(_reopen_lib(self.lib), self.logfile) + self.coro.next() + while True: + msg = self.in_queue.get() + if msg is CHANNEL_POISON: + break + items = msg[0] + info = self.coro.send(msg) + if info is None: + # Skip this album. + continue + self.out_queue.put((items, info)) + self.out_queue.put(CHANNEL_POISON) +class ApplyThread(Thread): + def __init__(self, lib, copy, write, art, in_queue): + super(ApplyThread, self).__init__() + self.lib, self.copy, self.write, self.art = lib, copy, write, art + self.in_queue = in_queue + def run(self): + self.coro = apply_choices(_reopen_lib(self.lib), self.copy, + self.write, self.art) + self.coro.next() + while True: + msg = self.in_queue.get() + if msg is CHANNEL_POISON: + break + self.coro.send(msg) +CHANNEL_SIZE = 10 +def autotag_threaded(lib, paths, copy, write, logfile, art): + """Autotags and imports albums using multiple threads. A drop-in + replacement for autotag_sequential. + """ + q_read_to_lookup = Queue(CHANNEL_SIZE) + q_lookup_to_user = Queue(CHANNEL_SIZE) + q_user_to_apply = Queue(CHANNEL_SIZE) + + reader_thread = ReadAlbumsThread(paths, q_read_to_lookup) + lookup_thread = InitialLookupThread(q_read_to_lookup, q_lookup_to_user) + ui_thread = UIThread(lib, logfile, q_lookup_to_user, q_user_to_apply) + apply_thread = ApplyThread(lib, copy, write, art, q_user_to_apply) + + reader_thread.start() + lookup_thread.start() + ui_thread.start() + apply_thread.start() + + reader_thread.join() + lookup_thread.join() + ui_thread.join() + apply_thread.join() + def simple_import(lib, paths, copy): """Imports all the albums found in the paths without attempting to autotag them. The behavior is similar to an import in which the @@ -351,8 +448,7 @@ def simple_import(lib, paths, copy): # The import command. -def import_files(lib, paths, copy=True, write=True, autot=True, - logpath=None, art=True): +def import_files(lib, paths, copy, write, autot, logpath, art, threaded): """Import the files in the given list of paths, tagging each leaf directory as an album. If copy, then the files are copied into the library folder. If write, then new metadata is written to the @@ -369,7 +465,10 @@ def import_files(lib, paths, copy=True, write=True, autot=True, # Perform the import. if autot: - autotag_sequential(lib, paths, copy, write, logfile, art) + if threaded: + autotag_threaded(lib, paths, copy, write, logfile, art) + else: + autotag_sequential(lib, paths, copy, write, logfile, art) else: just_import(lib, paths, copy) @@ -409,7 +508,9 @@ def import_func(lib, config, opts, args): art = opts.art if opts.art is not None else \ ui.config_val(config, 'beets', 'import_art', DEFAULT_IMPORT_ART, bool) - import_files(lib, args, copy, write, autot, opts.logpath, art) + threaded = ui.config_val(config, 'beets', 'threaded', + DEFAULT_THREADED, bool) + import_files(lib, args, copy, write, autot, opts.logpath, art, threaded) import_cmd.func = import_func default_commands.append(import_cmd)