The Great Importer Refactoring

I'm shuffling around the feature-creeping importer code to keep it as
interface-agnostic as possible. The "importer" module now takes care of the
basic, increasingly complicated workflow while the ui.commands module is
relegated to containing actual user-interface stuff.
This commit is contained in:
Adrian Sampson 2011-04-10 18:12:47 -07:00
parent aa76be367a
commit 23392525ec
4 changed files with 609 additions and 548 deletions

441
beets/importer.py Normal file
View file

@ -0,0 +1,441 @@
# This file is part of beets.
# Copyright 2011, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Provides the basic, interface-agnostic workflow for importing and
autotagging music files.
"""
from __future__ import with_statement # Python 2.5
import os
import logging
import pickle
from beets import ui
from beets.ui import print_
from beets import autotag
from beets import library
import beets.autotag.art
from beets import plugins
from beets.ui import commands
CHOICE_SKIP = 'CHOICE_SKIP'
CHOICE_ASIS = 'CHOICE_ASIS'
CHOICE_TRACKS = 'CHOICE_TRACKS'
CHOICE_MANUAL = 'CHOICE_MANUAL'
CHOICE_ALBUM = 'CHOICE_ALBUM'
QUEUE_SIZE = 128
# Global logger.
log = logging.getLogger('beets')
class ImportAbort(Exception):
"""Raised when the user aborts the tagging operation.
"""
pass
# Utilities.
def tag_log(logfile, status, path):
"""Log a message about a given album to logfile. The status should
reflect the reason the album couldn't be tagged.
"""
if logfile:
print >>logfile, '%s %s' % (status, path)
def _reopen_lib(lib):
"""Because of limitations in SQLite, a given Library is bound to
the thread in which it was created. This function reopens Library
objects so that they can be used from separate threads.
"""
if isinstance(lib, library.Library):
return library.Library(
lib.path,
lib.directory,
lib.path_formats,
lib.art_filename,
)
else:
return lib
def _duplicate_check(lib, artist, album):
"""Check whether the match already exists in the library."""
if artist is None:
# As-is import with no artist. Skip check.
return False
for album_cand in lib.albums(artist):
if album_cand.album == album:
return True
return False
# Utilities for reading and writing the beets progress file, which
# allows long tagging tasks to be resumed when they pause (or crash).
PROGRESS_KEY = 'tagprogress'
def progress_set(toppath, path):
"""Record that tagging for the given `toppath` was successful up to
`path`. If path is None, then clear the progress value (indicating
that the tagging completed).
"""
try:
with open(ui.STATE_FILE) as f:
state = pickle.load(f)
except IOError:
state = {PROGRESS_KEY: {}}
if path is None:
# Remove progress from file.
if toppath in state[PROGRESS_KEY]:
del state[PROGRESS_KEY][toppath]
else:
state[PROGRESS_KEY][toppath] = path
with open(ui.STATE_FILE, 'w') as f:
pickle.dump(state, f)
def progress_get(toppath):
"""Get the last successfully tagged subpath of toppath. If toppath
has no progress information, returns None.
"""
try:
with open(ui.STATE_FILE) as f:
state = pickle.load(f)
except IOError:
return None
return state[PROGRESS_KEY].get(toppath)
# The importer task class.
class ImportTask(object):
"""Represents a single directory to be imported along with its
intermediate state.
"""
__slots__ = ['toppath', 'path', 'items', 'sentinel',
'cur_artist', 'cur_album', 'candidates', 'rec',
'choice_flag', 'info']
def __init__(self, toppath, path=None, items=None):
self.toppath = toppath
self.path = path
self.items = items
self.sentinel = False
@classmethod
def done_sentinel(cls, toppath):
"""Create an ImportTask that indicates the end of a top-level
directory import.
"""
obj = cls(toppath)
obj.sentinel = True
return obj
def set_match(self, cur_artist, cur_album, candidates, rec):
"""Sets the candidates matched by the autotag.tag_album method.
"""
assert not self.sentinel
self.cur_artist = cur_artist
self.cur_album = cur_album
self.candidates = candidates
self.rec = rec
def set_null_match(self):
"""Set the candidate to indicate no match was found."""
self.set_match(None, None, None, None)
def set_choice(self, choice):
"""Given either an (info, items) tuple or a CHOICE_ constant,
indicates that an action has been selected by the user (or
automatically).
"""
assert not self.sentinel
assert choice != CHOICE_MANUAL # Not part of the task structure.
assert choice != CHOICE_ALBUM # Only used internally.
if choice in (CHOICE_SKIP, CHOICE_ASIS, CHOICE_TRACKS):
self.choice_flag = choice
self.info = None
if choice == CHOICE_SKIP:
self.items = None # Items no longer needed.
else:
info, items = choice
self.items = items # Reordered items list.
self.info = info
self.choice_flag = CHOICE_ALBUM # Implicit choice.
def save_progress(self):
"""Updates the progress state to indicate that this album has
finished.
"""
if self.sentinel:
progress_set(self.toppath, None)
else:
progress_set(self.toppath, self.path)
def should_create_album(self):
"""Should an album structure be created for these items?"""
if self.choice_flag in (CHOICE_ALBUM, CHOICE_ASIS):
return True
elif self.choice_flag in (CHOICE_TRACKS, CHOICE_SKIP):
return False
else:
assert False
def should_write_tags(self):
"""Should new info be written to the files' metadata?"""
if self.choice_flag == CHOICE_ALBUM:
return True
elif self.choice_flag in (CHOICE_ASIS, CHOICE_TRACKS, CHOICE_SKIP):
return False
else:
assert False
def should_fetch_art(self):
"""Should album art be downloaded for this album?"""
return self.should_write_tags()
def should_infer_aa(self):
"""When creating an album structure, should the album artist
field be inferred from the plurality of track artists?
"""
assert self.should_create_album()
if self.choice_flag == CHOICE_ALBUM:
# Album artist comes from the info dictionary.
return False
elif self.choice_flag == CHOICE_ASIS:
# As-is imports likely don't have an album artist.
return True
else:
assert False
# Core autotagger pipeline stages.
def read_albums(paths, resume):
"""A generator yielding all the albums (as ImportTask objects) found
in the user-specified list of paths. `progress` specifies whether
the resuming feature should be used. It may be True (resume if
possible), False (never resume), or None (ask).
"""
# Use absolute paths.
paths = [library._normpath(path) for path in paths]
# Check the user-specified directories.
for path in paths:
if not os.path.isdir(library._syspath(path)):
raise ui.UserError('not a directory: ' + path)
# Look for saved progress.
progress = resume is not False
if progress:
resume_dirs = {}
for path in paths:
resume_dir = progress_get(path)
if resume_dir:
# Either accept immediately or prompt for input to decide.
if resume:
do_resume = True
ui.print_('Resuming interrupted import of %s' % path)
else:
do_resume = ui.input_yn("Import of the directory:\n%s"
"\nwas interrupted. Resume (Y/n)?" %
path)
ui.print_()
if do_resume:
resume_dirs[path] = resume_dir
else:
# Clear progress; we're starting from the top.
progress_set(path, None)
for toppath in paths:
# Produce each path.
if progress:
resume_dir = resume_dirs.get(toppath)
for path, items in autotag.albums_in_dir(os.path.expanduser(toppath)):
if progress and resume_dir:
# We're fast-forwarding to resume a previous tagging.
if path == resume_dir:
# We've hit the last good path! Turn off the
# fast-forwarding.
resume_dir = None
continue
yield ImportTask(toppath, path, items)
# Indicate the directory is finished.
yield ImportTask.done_sentinel(toppath)
def initial_lookup():
"""A coroutine for performing the initial MusicBrainz lookup for an
album. It accepts lists of Items and yields
(items, cur_artist, cur_album, candidates, rec) tuples. If no match
is found, all of the yielded parameters (except items) are None.
"""
task = yield
log.debug('Looking up: %s' % task.path)
while True:
if task.sentinel:
task = yield task
continue
try:
task.set_match(*autotag.tag_album(task.items))
except autotag.AutotagError:
task.set_null_match()
task = yield task
def user_query(lib, logfile, color, quiet, quiet_fallback):
"""A coroutine for interfacing with the user about the tagging
process. lib is the Library to import into and logfile may be
a file-like object for logging the import process. The coroutine
accepts and yields ImportTask objects.
"""
lib = _reopen_lib(lib)
first = True
task = None
while True:
task = yield task
if task.sentinel:
continue
# Empty lines between albums.
if not first:
print_()
first = False
# Show current album path.
print_(task.path)
# Ask the user for a choice.
choice = commands.choose_match(task.path, task.items, task.cur_artist,
task.cur_album, task.candidates,
task.rec, color, quiet, quiet_fallback)
task.set_choice(choice)
# Log certain choices.
if choice is CHOICE_ASIS:
tag_log(logfile, 'asis', task.path)
elif choice is CHOICE_SKIP:
tag_log(logfile, 'skip', task.path)
# Check for duplicates if we have a match.
if choice == CHOICE_ASIS or isinstance(choice, tuple):
if choice == CHOICE_ASIS:
artist = task.cur_artist
album = task.cur_album
else:
artist = task.info['artist']
album = task.info['album']
if _duplicate_check(lib, artist, album):
tag_log(logfile, 'duplicate', task.path)
print_("This album is already in the library!")
task.set_choice(CHOICE_SKIP)
def apply_choices(lib, copy, write, art, delete, progress):
"""A coroutine for applying changes to albums during the autotag
process. The parameters to the generator control the behavior of
the import. The coroutine accepts ImportTask objects and yields
nothing.
"""
lib = _reopen_lib(lib)
while True:
task = yield
# Don't do anything if we're skipping the album or we're done.
if task.choice_flag == CHOICE_SKIP or task.sentinel:
if progress:
task.save_progress()
continue
# Change metadata, move, and copy.
if task.should_write_tags():
autotag.apply_metadata(task.items, task.info)
if copy and delete:
old_paths = [os.path.realpath(item.path)
for item in task.items]
for item in task.items:
if copy:
item.move(lib, True, task.should_create_album())
if write and task.should_write_tags():
item.write()
# Add items to library. We consolidate this at the end to avoid
# locking while we do the copying and tag updates.
if task.should_create_album():
# Add an album.
albuminfo = lib.add_album(task.items,
infer_aa = task.should_infer_aa())
else:
# Add tracks.
for item in task.items:
lib.add(item)
# Get album art if requested.
if art and task.should_fetch_art():
artpath = beets.autotag.art.art_for_album(task.info)
if artpath:
albuminfo.set_art(artpath)
# Write the database after each album.
lib.save()
# Announce that we've added an album.
if task.should_create_album():
plugins.send('album_imported', album=albuminfo)
else:
for item in task.items:
plugins.send('item_imported', lib=lib, item=item)
# Finally, delete old files.
if copy and delete:
new_paths = [os.path.realpath(item.path) for item in task.items]
for old_path in old_paths:
# Only delete files that were actually moved.
if old_path not in new_paths:
os.remove(library._syspath(old_path))
# Update progress.
if progress:
task.save_progress()
# Non-autotagged import (always sequential).
#TODO probably no longer necessary; use the same machinery?
def simple_import(lib, paths, copy, delete, resume):
"""Add files from the paths to the library without changing any
tags.
"""
for task in read_albums(paths, resume):
if task.sentinel:
task.save_progress()
continue
if copy:
if delete:
old_paths = [os.path.realpath(item.path) for item in task.items]
for item in task.items:
item.move(lib, True, True)
album = lib.add_album(task.items, True)
lib.save()
# Announce that we added an album.
plugins.send('album_imported', album=album)
if resume is not False:
task.save_progress()
if copy and delete:
new_paths = [os.path.realpath(item.path) for item in task.items]
for old_path in old_paths:
# Only delete files that were actually moved.
if old_path not in new_paths:
os.remove(library._syspath(old_path))
log.info('added album: %s - %s' % (album.albumartist, album.album))

View file

@ -16,18 +16,16 @@
interface.
"""
from __future__ import with_statement # Python 2.5
import os
import logging
import pickle
import sys
from beets import ui
from beets.ui import print_
from beets import autotag
from beets import library
import beets.autotag.art
from beets.ui import pipeline
from beets import plugins
from beets import importer
# Global logger.
log = logging.getLogger('beets')
@ -50,15 +48,9 @@ DEFAULT_IMPORT_RESUME = None # "ask"
DEFAULT_THREADED = True
DEFAULT_COLOR = True
QUEUE_SIZE = 128
VARIOUS_ARTISTS = u'Various Artists'
class ImportAbort(Exception):
"""Raised when the user aborts the tagging operation.
"""
pass
# Autotagger utilities and support.
# Importer utilities and support.
def dist_string(dist, color):
"""Formats a distance (a float) as a similarity percentage string.
@ -131,11 +123,6 @@ def show_change(cur_artist, cur_album, items, info, dist, color=True):
elif cur_track != new_track:
print_(" * %s (%s -> %s)" % (item.title, cur_track, new_track))
CHOICE_SKIP = 'CHOICE_SKIP'
CHOICE_ASIS = 'CHOICE_ASIS'
CHOICE_TRACKS = 'CHOICE_TRACKS'
CHOICE_MANUAL = 'CHOICE_MANUAL'
CHOICE_ALBUM = 'CHOICE_ALBUM'
def choose_candidate(cur_artist, cur_album, candidates, rec, color=True):
"""Given current metadata and a sorted list of
(distance, candidate) pairs, ask the user for a selection
@ -155,15 +142,15 @@ def choose_candidate(cur_artist, cur_album, candidates, rec, color=True):
'Enter U, T, S, E, or B:'
)
if sel == 'u':
return CHOICE_ASIS
return importer.CHOICE_ASIS
elif sel == 't':
return CHOICE_TRACKS
return importer.CHOICE_TRACKS
elif sel == 'e':
return CHOICE_MANUAL
return importer.CHOICE_MANUAL
elif sel == 's':
return CHOICE_SKIP
return importer.CHOICE_SKIP
elif sel == 'b':
raise ImportAbort()
raise importer.ImportAbort()
else:
assert False
@ -192,15 +179,15 @@ def choose_candidate(cur_artist, cur_album, candidates, rec, color=True):
(1, len(candidates))
)
if sel == 's':
return CHOICE_SKIP
return importer.CHOICE_SKIP
elif sel == 'u':
return CHOICE_ASIS
return importer.CHOICE_ASIS
elif sel == 'e':
return CHOICE_MANUAL
return importer.CHOICE_MANUAL
elif sel == 't':
return CHOICE_TRACKS
return importer.CHOICE_TRACKS
elif sel == 'b':
raise ImportAbort()
raise importer.ImportAbort()
else: # Numerical selection.
dist, items, info = candidates[sel-1]
bypass_candidates = False
@ -224,15 +211,15 @@ def choose_candidate(cur_artist, cur_album, candidates, rec, color=True):
elif sel == 'm':
pass
elif sel == 's':
return CHOICE_SKIP
return importer.CHOICE_SKIP
elif sel == 'u':
return CHOICE_ASIS
return importer.CHOICE_ASIS
elif sel == 't':
return CHOICE_TRACKS
return importer.CHOICE_TRACKS
elif sel == 'e':
return CHOICE_MANUAL
return importer.CHOICE_MANUAL
elif sel == 'b':
raise ImportAbort()
raise importer.ImportAbort()
def manual_search():
"""Input an artist and album for manual search."""
@ -240,13 +227,6 @@ def manual_search():
album = raw_input('Album: ').decode(sys.stdin.encoding)
return artist.strip(), album.strip()
def tag_log(logfile, status, path):
"""Log a message about a given album to logfile. The status should
reflect the reason the album couldn't be tagged.
"""
if logfile:
print >>logfile, '%s %s' % (status, path)
def choose_match(path, items, cur_artist, cur_album, candidates,
rec, color, quiet, quiet_fallback):
"""Given an initial autotagging of items, go through an interactive
@ -260,9 +240,9 @@ def choose_match(path, items, cur_artist, cur_album, candidates,
show_change(cur_artist, cur_album, items, info, dist, color)
return info, items
else:
if quiet_fallback == CHOICE_SKIP:
if quiet_fallback == importer.CHOICE_SKIP:
print_('Skipping.')
elif quiet_fallback == CHOICE_ASIS:
elif quiet_fallback == importer.CHOICE_ASIS:
print_('Importing as-is.')
else:
assert(False)
@ -274,10 +254,11 @@ def choose_match(path, items, cur_artist, cur_album, candidates,
choice = choose_candidate(cur_artist, cur_album, candidates, rec, color)
# Choose which tags to use.
if choice in (CHOICE_SKIP, CHOICE_ASIS, CHOICE_TRACKS):
if choice in (importer.CHOICE_SKIP, importer.CHOICE_ASIS,
importer.CHOICE_TRACKS):
# Pass selection to main control flow.
return choice
elif choice is CHOICE_MANUAL:
elif choice is importer.CHOICE_MANUAL:
# Try again with manual search terms.
search_artist, search_album = manual_search()
try:
@ -290,396 +271,6 @@ def choose_match(path, items, cur_artist, cur_album, candidates,
# an (info, items) pair as desired.
return choice
def _reopen_lib(lib):
"""Because of limitations in SQLite, a given Library is bound to
the thread in which it was created. This function reopens Library
objects so that they can be used from separate threads.
"""
if isinstance(lib, library.Library):
return library.Library(
lib.path,
lib.directory,
lib.path_formats,
lib.art_filename,
)
else:
return lib
def _duplicate_check(lib, artist, album):
"""Check whether the match already exists in the library."""
if artist is None:
# As-is import with no artist. Skip check.
return False
for album_cand in lib.albums(artist):
if album_cand.album == album:
return True
return False
# Utilities for reading and writing the beets progress file, which
# allows long tagging tasks to be resumed when they pause (or crash).
PROGRESS_KEY = 'tagprogress'
def progress_set(toppath, path):
"""Record that tagging for the given `toppath` was successful up to
`path`. If path is None, then clear the progress value (indicating
that the tagging completed).
"""
try:
with open(ui.STATE_FILE) as f:
state = pickle.load(f)
except IOError:
state = {PROGRESS_KEY: {}}
if path is None:
# Remove progress from file.
if toppath in state[PROGRESS_KEY]:
del state[PROGRESS_KEY][toppath]
else:
state[PROGRESS_KEY][toppath] = path
with open(ui.STATE_FILE, 'w') as f:
pickle.dump(state, f)
def progress_get(toppath):
"""Get the last successfully tagged subpath of toppath. If toppath
has no progress information, returns None.
"""
try:
with open(ui.STATE_FILE) as f:
state = pickle.load(f)
except IOError:
return None
return state[PROGRESS_KEY].get(toppath)
class ImportTask(object):
"""Represents a single directory to be imported along with its
intermediate state.
"""
__slots__ = ['toppath', 'path', 'items', 'sentinel',
'cur_artist', 'cur_album', 'candidates', 'rec',
'choice_flag', 'info']
def __init__(self, toppath, path=None, items=None):
self.toppath = toppath
self.path = path
self.items = items
self.sentinel = False
@classmethod
def done_sentinel(cls, toppath):
"""Create an ImportTask that indicates the end of a top-level
directory import.
"""
obj = cls(toppath)
obj.sentinel = True
return obj
def set_match(self, cur_artist, cur_album, candidates, rec):
"""Sets the candidates matched by the autotag.tag_album method.
"""
assert not self.sentinel
self.cur_artist = cur_artist
self.cur_album = cur_album
self.candidates = candidates
self.rec = rec
def set_null_match(self):
"""Set the candidate to indicate no match was found."""
self.set_match(None, None, None, None)
def set_choice(self, choice):
"""Given either an (info, items) tuple or a CHOICE_ constant,
indicates that an action has been selected by the user (or
automatically).
"""
assert not self.sentinel
assert choice != CHOICE_MANUAL # Not part of the task structure.
assert choice != CHOICE_ALBUM # Only used internally.
if choice in (CHOICE_SKIP, CHOICE_ASIS, CHOICE_TRACKS):
self.choice_flag = choice
self.info = None
if choice == CHOICE_SKIP:
self.items = None # Items no longer needed.
else:
info, items = choice
self.items = items # Reordered items list.
self.info = info
self.choice_flag = CHOICE_ALBUM # Implicit choice.
def save_progress(self):
"""Updates the progress state to indicate that this album has
finished.
"""
if self.sentinel:
progress_set(self.toppath, None)
else:
progress_set(self.toppath, self.path)
def should_create_album(self):
"""Should an album structure be created for these items?"""
if self.choice_flag in (CHOICE_ALBUM, CHOICE_ASIS):
return True
elif self.choice_flag in (CHOICE_TRACKS, CHOICE_SKIP):
return False
else:
assert False
def should_write_tags(self):
"""Should new info be written to the files' metadata?"""
if self.choice_flag == CHOICE_ALBUM:
return True
elif self.choice_flag in (CHOICE_ASIS, CHOICE_TRACKS, CHOICE_SKIP):
return False
else:
assert False
def should_fetch_art(self):
"""Should album art be downloaded for this album?"""
return self.should_write_tags()
def should_infer_aa(self):
"""When creating an album structure, should the album artist
field be inferred from the plurality of track artists?
"""
assert self.should_create_album()
if self.choice_flag == CHOICE_ALBUM:
# Album artist comes from the info dictionary.
return False
elif self.choice_flag == CHOICE_ASIS:
# As-is imports likely don't have an album artist.
return True
else:
assert False
# Core autotagger pipeline stages.
def read_albums(paths, resume):
"""A generator yielding all the albums (as ImportTask objects) found
in the user-specified list of paths. `progress` specifies whether
the resuming feature should be used. It may be True (resume if
possible), False (never resume), or None (ask).
"""
# Use absolute paths.
paths = [library._normpath(path) for path in paths]
# Check the user-specified directories.
for path in paths:
if not os.path.isdir(library._syspath(path)):
raise ui.UserError('not a directory: ' + path)
# Look for saved progress.
progress = resume is not False
if progress:
resume_dirs = {}
for path in paths:
resume_dir = progress_get(path)
if resume_dir:
# Either accept immediately or prompt for input to decide.
if resume:
do_resume = True
ui.print_('Resuming interrupted import of %s' % path)
else:
do_resume = ui.input_yn("Import of the directory:\n%s"
"\nwas interrupted. Resume (Y/n)?" %
path)
ui.print_()
if do_resume:
resume_dirs[path] = resume_dir
else:
# Clear progress; we're starting from the top.
progress_set(path, None)
for toppath in paths:
# Produce each path.
if progress:
resume_dir = resume_dirs.get(toppath)
for path, items in autotag.albums_in_dir(os.path.expanduser(toppath)):
if progress and resume_dir:
# We're fast-forwarding to resume a previous tagging.
if path == resume_dir:
# We've hit the last good path! Turn off the
# fast-forwarding.
resume_dir = None
continue
yield ImportTask(toppath, path, items)
# Indicate the directory is finished.
yield ImportTask.done_sentinel(toppath)
def initial_lookup():
"""A coroutine for performing the initial MusicBrainz lookup for an
album. It accepts lists of Items and yields
(items, cur_artist, cur_album, candidates, rec) tuples. If no match
is found, all of the yielded parameters (except items) are None.
"""
task = yield
log.debug('Looking up: %s' % task.path)
while True:
if task.sentinel:
task = yield task
continue
try:
task.set_match(*autotag.tag_album(task.items))
except autotag.AutotagError:
task.set_null_match()
task = yield task
def user_query(lib, logfile, color, quiet, quiet_fallback):
"""A coroutine for interfacing with the user about the tagging
process. lib is the Library to import into and logfile may be
a file-like object for logging the import process. The coroutine
accepts (items, cur_artist, cur_album, candidates, rec) tuples.
items is a set of Items in the album to be tagged; the remaining
parameters are the result of an initial lookup from MusicBrainz.
The coroutine yields (toppath, path, items, info) pairs where info
is either a candidate info dict, CHOICE_ASIS, CHOICE_TRACKS, or
None (indicating that the album should not be tagged) and items are
the constituent Item objects, ordered in the case of successful
tagging.
"""
lib = _reopen_lib(lib)
first = True
task = None
while True:
task = yield task
if task.sentinel:
continue
# Empty lines between albums.
if not first:
print_()
first = False
# Show current album path.
print_(task.path)
# Ask the user for a choice.
choice = choose_match(task.path, task.items, task.cur_artist,
task.cur_album, task.candidates, task.rec,
color, quiet, quiet_fallback)
task.set_choice(choice)
# Log certain choices.
if choice is CHOICE_ASIS:
tag_log(logfile, 'asis', task.path)
elif choice is CHOICE_SKIP:
tag_log(logfile, 'skip', task.path)
# Check for duplicates if we have a match.
if choice == CHOICE_ASIS or isinstance(choice, tuple):
if choice == CHOICE_ASIS:
artist = task.cur_artist
album = task.cur_album
else:
artist = task.info['artist']
album = task.info['album']
if _duplicate_check(lib, artist, album):
tag_log(logfile, 'duplicate', task.path)
print_("This album is already in the library!")
task.set_choice(CHOICE_SKIP)
def apply_choices(lib, copy, write, art, delete, progress):
"""A coroutine for applying changes to albums during the autotag
process. The parameters to the generator control the behavior of
the import. The coroutine accepts (items, info) pairs and yields
nothing. items the set of Items to import; info is either a
candidate info dictionary, CHOICE_ASIS, or CHOICE_TRACKS.
"""
lib = _reopen_lib(lib)
while True:
task = yield
# Don't do anything if we're skipping the album or we're done.
if task.choice_flag == CHOICE_SKIP or task.sentinel:
if progress:
task.save_progress()
continue
# Change metadata, move, and copy.
if task.should_write_tags():
autotag.apply_metadata(task.items, task.info)
if copy and delete:
old_paths = [os.path.realpath(item.path)
for item in task.items]
for item in task.items:
if copy:
item.move(lib, True, task.should_create_album())
if write and task.should_write_tags():
item.write()
# Add items to library. We consolidate this at the end to avoid
# locking while we do the copying and tag updates.
if task.should_create_album():
# Add an album.
albuminfo = lib.add_album(task.items,
infer_aa = task.should_infer_aa())
else:
# Add tracks.
for item in task.items:
lib.add(item)
# Get album art if requested.
if art and task.should_fetch_art():
artpath = beets.autotag.art.art_for_album(task.info)
if artpath:
albuminfo.set_art(artpath)
# Write the database after each album.
lib.save()
# Announce that we've added an album.
if task.should_create_album():
plugins.send('album_imported', album=albuminfo)
else:
for item in task.items:
plugins.send('item_imported', lib=lib, item=item)
# Finally, delete old files.
if copy and delete:
new_paths = [os.path.realpath(item.path) for item in task.items]
for old_path in old_paths:
# Only delete files that were actually moved.
if old_path not in new_paths:
os.remove(library._syspath(old_path))
# Update progress.
if progress:
task.save_progress()
# Non-autotagged import (always sequential).
def simple_import(lib, paths, copy, delete, resume):
"""Add files from the paths to the library without changing any
tags.
"""
for task in read_albums(paths, resume):
if task.sentinel:
task.save_progress()
continue
if copy:
if delete:
old_paths = [os.path.realpath(item.path) for item in task.items]
for item in task.items:
item.move(lib, True, True)
album = lib.add_album(task.items, True)
lib.save()
# Announce that we added an album.
plugins.send('album_imported', album=album)
if resume is not False:
task.save_progress()
if copy and delete:
new_paths = [os.path.realpath(item.path) for item in task.items]
for old_path in old_paths:
# Only delete files that were actually moved.
if old_path not in new_paths:
os.remove(library._syspath(old_path))
log.info('added album: %s - %s' % (album.albumartist, album.album))
# The import command.
def import_files(lib, paths, copy, write, autot, logpath, art, threaded,
@ -715,24 +306,24 @@ def import_files(lib, paths, copy, write, autot, logpath, art, threaded,
if autot:
# Autotag. Set up the pipeline.
pl = pipeline.Pipeline([
read_albums(paths, resume),
initial_lookup(),
user_query(lib, logfile, color, quiet, quiet_fallback),
apply_choices(lib, copy, write, art, delete, resume is not False),
importer.read_albums(paths, resume),
importer.initial_lookup(),
importer.user_query(lib, logfile, color, quiet, quiet_fallback),
importer.apply_choices(lib, copy, write, art, delete, resume is not False),
])
# Run the pipeline.
try:
if threaded:
pl.run_parallel(QUEUE_SIZE)
pl.run_parallel(importer.QUEUE_SIZE)
else:
pl.run_sequential()
except ImportAbort:
except importer.ImportAbort:
# User aborted operation. Silently stop.
pass
else:
# Simple import without autotagging. Always sequential.
simple_import(lib, paths, copy, delete, resume)
importer.simple_import(lib, paths, copy, delete, resume)
# If we were logging, close the file.
if logfile:
@ -800,9 +391,9 @@ def import_func(lib, config, opts, args):
resume = None
if quiet_fallback_str == 'asis':
quiet_fallback = CHOICE_ASIS
quiet_fallback = importer.CHOICE_ASIS
else:
quiet_fallback = CHOICE_SKIP
quiet_fallback = importer.CHOICE_SKIP
import_files(lib, args, copy, write, autot, opts.logpath, art, threaded,
color, delete, quiet, resume, quiet_fallback)
import_cmd.func = import_func

132
test/test_importer.py Normal file
View file

@ -0,0 +1,132 @@
# This file is part of beets.
# Copyright 2011, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Tests for the general importer functionality.
"""
import unittest
import os
import shutil
import _common
from beets import library
from beets import importer
class ImportApplyTest(unittest.TestCase, _common.ExtraAsserts):
def setUp(self):
self.libdir = os.path.join('rsrc', 'testlibdir')
os.mkdir(self.libdir)
self.lib = library.Library(':memory:', self.libdir)
self.lib.path_formats = {
'default': 'one',
'comp': 'two',
'singleton': 'three',
}
self.srcpath = os.path.join(self.libdir, 'srcfile.mp3')
shutil.copy(os.path.join('rsrc', 'full.mp3'), self.srcpath)
self.i = library.Item.from_path(self.srcpath)
self.i.comp = False
trackinfo = {'title': 'one', 'artist': 'some artist',
'track': 1, 'length': 1, 'id': 'trackid'}
self.info = {
'artist': 'some artist',
'album': 'some album',
'tracks': [trackinfo],
'va': False,
'album_id': 'albumid',
'artist_id': 'artistid',
'albumtype': 'soundtrack',
}
def tearDown(self):
shutil.rmtree(self.libdir)
def _call_apply(self, coro, items, info):
task = importer.ImportTask(None, None, None)
task.set_choice((info, items))
coro.send(task)
def _call_apply_choice(self, coro, items, choice):
task = importer.ImportTask(None, None, items)
task.set_choice(choice)
coro.send(task)
def test_apply_no_delete(self):
coro = importer.apply_choices(self.lib, True, False, False,
False, False)
coro.next() # Prime coroutine.
self._call_apply(coro, [self.i], self.info)
self.assertExists(self.srcpath)
def test_apply_with_delete(self):
coro = importer.apply_choices(self.lib, True, False, False,
True, False)
coro.next() # Prime coroutine.
self._call_apply(coro, [self.i], self.info)
self.assertNotExists(self.srcpath)
def test_apply_asis_uses_album_path(self):
coro = importer.apply_choices(self.lib, True, False, False,
False, False)
coro.next() # Prime coroutine.
self._call_apply_choice(coro, [self.i], importer.CHOICE_ASIS)
self.assertExists(
os.path.join(self.libdir, self.lib.path_formats['default']+'.mp3')
)
def test_apply_match_uses_album_path(self):
coro = importer.apply_choices(self.lib, True, False, False,
False, False)
coro.next() # Prime coroutine.
self._call_apply(coro, [self.i], self.info)
self.assertExists(
os.path.join(self.libdir, self.lib.path_formats['default']+'.mp3')
)
def test_apply_as_tracks_uses_singleton_path(self):
coro = importer.apply_choices(self.lib, True, False, False,
False, False)
coro.next() # Prime coroutine.
self._call_apply_choice(coro, [self.i], importer.CHOICE_TRACKS)
self.assertExists(
os.path.join(self.libdir, self.lib.path_formats['singleton']+'.mp3')
)
class DuplicateCheckTest(unittest.TestCase):
def setUp(self):
self.lib = library.Library(':memory:')
self.i = _common.item()
self.album = self.lib.add_album([self.i], True)
def test_duplicate_album(self):
res = importer._duplicate_check(self.lib, self.i.albumartist,
self.i.album)
self.assertTrue(res)
def test_different_album(self):
res = importer._duplicate_check(self.lib, 'xxx', 'yyy')
self.assertFalse(res)
def test_duplicate_va_album(self):
self.album.albumartist = 'an album artist'
res = importer._duplicate_check(self.lib, 'an album artist',
self.i.album)
self.assertTrue(res)
def suite():
return unittest.TestLoader().loadTestsFromName(__name__)
if __name__ == '__main__':
unittest.main(defaultTest='suite')

View file

@ -1,5 +1,5 @@
# This file is part of beets.
# Copyright 2010, Adrian Sampson.
# Copyright 2011, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
@ -14,7 +14,6 @@
"""Tests for the command-line interface.
"""
import unittest
import os
import shutil
@ -28,6 +27,7 @@ from beets import ui
from beets.ui import commands
from beets import autotag
from beets import mediafile
from beets import importer
TEST_TITLES = ('The Opener','The Second Track','The Last Track')
class ImportTest(unittest.TestCase):
@ -130,109 +130,6 @@ class ImportTest(unittest.TestCase):
paths = self._run_import(['sometrack'], delete=True)
self.assertFalse(os.path.exists(paths[0]))
class ImportApplyTest(unittest.TestCase, _common.ExtraAsserts):
def setUp(self):
self.libdir = os.path.join('rsrc', 'testlibdir')
os.mkdir(self.libdir)
self.lib = library.Library(':memory:', self.libdir)
self.lib.path_formats = {
'default': 'one',
'comp': 'two',
'singleton': 'three',
}
self.srcpath = os.path.join(self.libdir, 'srcfile.mp3')
shutil.copy(os.path.join('rsrc', 'full.mp3'), self.srcpath)
self.i = library.Item.from_path(self.srcpath)
self.i.comp = False
trackinfo = {'title': 'one', 'artist': 'some artist',
'track': 1, 'length': 1, 'id': 'trackid'}
self.info = {
'artist': 'some artist',
'album': 'some album',
'tracks': [trackinfo],
'va': False,
'album_id': 'albumid',
'artist_id': 'artistid',
'albumtype': 'soundtrack',
}
def tearDown(self):
shutil.rmtree(self.libdir)
def _call_apply(self, coro, items, info):
task = commands.ImportTask(None, None, None)
task.set_choice((info, items))
coro.send(task)
def _call_apply_choice(self, coro, items, choice):
task = commands.ImportTask(None, None, items)
task.set_choice(choice)
coro.send(task)
def test_apply_no_delete(self):
coro = commands.apply_choices(self.lib, True, False, False,
False, False)
coro.next() # Prime coroutine.
self._call_apply(coro, [self.i], self.info)
self.assertExists(self.srcpath)
def test_apply_with_delete(self):
coro = commands.apply_choices(self.lib, True, False, False,
True, False)
coro.next() # Prime coroutine.
self._call_apply(coro, [self.i], self.info)
self.assertNotExists(self.srcpath)
def test_apply_asis_uses_album_path(self):
coro = commands.apply_choices(self.lib, True, False, False,
False, False)
coro.next() # Prime coroutine.
self._call_apply_choice(coro, [self.i], commands.CHOICE_ASIS)
self.assertExists(
os.path.join(self.libdir, self.lib.path_formats['default']+'.mp3')
)
def test_apply_match_uses_album_path(self):
coro = commands.apply_choices(self.lib, True, False, False,
False, False)
coro.next() # Prime coroutine.
self._call_apply(coro, [self.i], self.info)
self.assertExists(
os.path.join(self.libdir, self.lib.path_formats['default']+'.mp3')
)
def test_apply_as_tracks_uses_singleton_path(self):
coro = commands.apply_choices(self.lib, True, False, False,
False, False)
coro.next() # Prime coroutine.
self._call_apply_choice(coro, [self.i], commands.CHOICE_TRACKS)
self.assertExists(
os.path.join(self.libdir, self.lib.path_formats['singleton']+'.mp3')
)
class DuplicateCheckTest(unittest.TestCase):
def setUp(self):
self.lib = library.Library(':memory:')
self.i = _common.item()
self.album = self.lib.add_album([self.i], True)
def test_duplicate_album(self):
res = commands._duplicate_check(self.lib, self.i.albumartist,
self.i.album)
self.assertTrue(res)
def test_different_album(self):
res = commands._duplicate_check(self.lib, 'xxx', 'yyy')
self.assertFalse(res)
def test_duplicate_va_album(self):
self.album.albumartist = 'an album artist'
res = commands._duplicate_check(self.lib, 'an album artist',
self.i.album)
self.assertTrue(res)
class ListTest(unittest.TestCase):
def setUp(self):
self.io = _common.DummyIO()
@ -359,18 +256,18 @@ class AutotagTest(unittest.TestCase):
'album',
[], # candidates
autotag.RECOMMEND_NONE,
True, False, commands.CHOICE_SKIP
True, False, importer.CHOICE_SKIP
)
self.assertEqual(res, result)
self.assertTrue('No match' in self.io.getoutput())
def test_choose_match_with_no_candidates_skip(self):
self.io.addinput('s')
self._no_candidates_test(commands.CHOICE_SKIP)
self._no_candidates_test(importer.CHOICE_SKIP)
def test_choose_match_with_no_candidates_asis(self):
self.io.addinput('u')
self._no_candidates_test(commands.CHOICE_ASIS)
self._no_candidates_test(importer.CHOICE_ASIS)
class InputTest(unittest.TestCase):
def setUp(self):