initial ad-hoc implementation of threaded tagger

This commit is contained in:
Adrian Sampson 2010-07-31 17:02:25 -07:00
parent 2fecb1c8e8
commit 45ee9b210c

View file

@ -18,6 +18,8 @@ interface.
import os
import logging
from threading import Thread
from Queue import Queue
from beets import ui
from beets.ui import print_
@ -40,6 +42,7 @@ DEFAULT_IMPORT_COPY = True
DEFAULT_IMPORT_WRITE = True
DEFAULT_IMPORT_AUTOT = True
DEFAULT_IMPORT_ART = True
DEFAULT_THREADED = True
# Autotagger utilities and support.
@ -310,7 +313,7 @@ def apply_choices(lib, copy, write, art):
# Importer main functions.
def autotag_sequential(lib, paths, copy, write, logfile, art):
"""Autotags an imports the album in the directory in a single-
"""Autotags and imports the albums in the directory in a single-
threaded manner. lib is the Library to import into. If copy, then
items are copied into the destination directory. If write, then
new metadata is written back to the files' tags. If logfile is
@ -337,6 +340,100 @@ def autotag_sequential(lib, paths, copy, write, logfile, art):
continue
apply_coro.send((items, info))
def _reopen_lib(lib):
if isinstance(lib, library.Library):
return library.Library(
lib.path,
lib.directory,
lib.path_format,
lib.art_filename,
)
else:
return lib
CHANNEL_POISON = 'CHANNEL_POISION'
class ReadAlbumsThread(Thread):
def __init__(self, paths, out_queue):
super(ReadAlbumsThread, self).__init__()
self.gen = read_albums(paths)
self.out_queue = out_queue
def run(self):
for items in self.gen:
self.out_queue.put(items)
self.out_queue.put(CHANNEL_POISON)
class InitialLookupThread(Thread):
def __init__(self, in_queue, out_queue):
super(InitialLookupThread, self).__init__()
self.coro = initial_lookup()
self.coro.next()
self.in_queue, self.out_queue = in_queue, out_queue
def run(self):
while True:
items = self.in_queue.get()
if items is CHANNEL_POISON:
break
cur_artist, cur_album, candidates, rec = self.coro.send(items)
self.out_queue.put((items, cur_artist, cur_album,
candidates, rec))
self.out_queue.put(CHANNEL_POISON)
class UIThread(Thread):
def __init__(self, lib, logfile, in_queue, out_queue):
super(UIThread, self).__init__()
self.lib, self.logfile = lib, logfile
self.in_queue, self.out_queue = in_queue, out_queue
def run(self):
self.coro = user_query(_reopen_lib(self.lib), self.logfile)
self.coro.next()
while True:
msg = self.in_queue.get()
if msg is CHANNEL_POISON:
break
items = msg[0]
info = self.coro.send(msg)
if info is None:
# Skip this album.
continue
self.out_queue.put((items, info))
self.out_queue.put(CHANNEL_POISON)
class ApplyThread(Thread):
def __init__(self, lib, copy, write, art, in_queue):
super(ApplyThread, self).__init__()
self.lib, self.copy, self.write, self.art = lib, copy, write, art
self.in_queue = in_queue
def run(self):
self.coro = apply_choices(_reopen_lib(self.lib), self.copy,
self.write, self.art)
self.coro.next()
while True:
msg = self.in_queue.get()
if msg is CHANNEL_POISON:
break
self.coro.send(msg)
CHANNEL_SIZE = 10
def autotag_threaded(lib, paths, copy, write, logfile, art):
"""Autotags and imports albums using multiple threads. A drop-in
replacement for autotag_sequential.
"""
q_read_to_lookup = Queue(CHANNEL_SIZE)
q_lookup_to_user = Queue(CHANNEL_SIZE)
q_user_to_apply = Queue(CHANNEL_SIZE)
reader_thread = ReadAlbumsThread(paths, q_read_to_lookup)
lookup_thread = InitialLookupThread(q_read_to_lookup, q_lookup_to_user)
ui_thread = UIThread(lib, logfile, q_lookup_to_user, q_user_to_apply)
apply_thread = ApplyThread(lib, copy, write, art, q_user_to_apply)
reader_thread.start()
lookup_thread.start()
ui_thread.start()
apply_thread.start()
reader_thread.join()
lookup_thread.join()
ui_thread.join()
apply_thread.join()
def simple_import(lib, paths, copy):
"""Imports all the albums found in the paths without attempting to
autotag them. The behavior is similar to an import in which the
@ -351,8 +448,7 @@ def simple_import(lib, paths, copy):
# The import command.
def import_files(lib, paths, copy=True, write=True, autot=True,
logpath=None, art=True):
def import_files(lib, paths, copy, write, autot, logpath, art, threaded):
"""Import the files in the given list of paths, tagging each leaf
directory as an album. If copy, then the files are copied into
the library folder. If write, then new metadata is written to the
@ -369,7 +465,10 @@ def import_files(lib, paths, copy=True, write=True, autot=True,
# Perform the import.
if autot:
autotag_sequential(lib, paths, copy, write, logfile, art)
if threaded:
autotag_threaded(lib, paths, copy, write, logfile, art)
else:
autotag_sequential(lib, paths, copy, write, logfile, art)
else:
just_import(lib, paths, copy)
@ -409,7 +508,9 @@ def import_func(lib, config, opts, args):
art = opts.art if opts.art is not None else \
ui.config_val(config, 'beets', 'import_art',
DEFAULT_IMPORT_ART, bool)
import_files(lib, args, copy, write, autot, opts.logpath, art)
threaded = ui.config_val(config, 'beets', 'threaded',
DEFAULT_THREADED, bool)
import_files(lib, args, copy, write, autot, opts.logpath, art, threaded)
import_cmd.func = import_func
default_commands.append(import_cmd)