split import tests, suppress progress checks for testing

This commit is contained in:
Adrian Sampson 2011-02-18 12:38:46 -08:00
parent 0d869340ec
commit 993519c004
2 changed files with 69 additions and 54 deletions

View file

@ -311,7 +311,7 @@ def progress_get(toppath):
# tagging.
DONE_SENTINEL = '__IMPORT_DONE_SENTINEL__'
def read_albums(paths):
def read_albums(paths, progress):
"""A generator yielding all the albums (as sets of Items) found in
the user-specified list of paths.
"""
@ -322,26 +322,29 @@ def read_albums(paths):
for path in paths:
if not os.path.isdir(library._syspath(path)):
raise ui.UserError('not a directory: ' + path)
# Look for saved progress.
resume_dirs = {}
for path in paths:
resume_dir = progress_get(path)
if resume_dir:
resume = ui.input_yn("Import of the directory:\n%s"
"\nwas interrupted. Resume (Y/n)? " %
path)
if resume:
resume_dirs[path] = resume_dir
else:
# Clear progress; we're starting from the top.
progress_set(path, None)
ui.print_()
if progress:
resume_dirs = {}
for path in paths:
resume_dir = progress_get(path)
if resume_dir:
resume = ui.input_yn("Import of the directory:\n%s"
"\nwas interrupted. Resume (Y/n)? " %
path)
if resume:
resume_dirs[path] = resume_dir
else:
# Clear progress; we're starting from the top.
progress_set(path, None)
ui.print_()
for toppath in paths:
# Produce each path.
resume_dir = resume_dirs.get(toppath)
if progress:
resume_dir = resume_dirs.get(toppath)
for path, items in autotag.albums_in_dir(os.path.expanduser(toppath)):
if resume_dir:
if progress and resume_dir:
# We're fast-forwarding to resume a previous tagging.
if path == resume_dir:
# We've hit the last good path! Turn off the
@ -440,7 +443,7 @@ def user_query(lib, logfile=None, color=True, quiet=False):
# Yield the result and get the next chunk of work.
out = toppath, path, items, info
def apply_choices(lib, copy, write, art, delete):
def apply_choices(lib, copy, write, art, delete, progress):
"""A coroutine for applying changes to albums during the autotag
process. The parameters to the generator control the behavior of
the import. The coroutine accepts (items, info) pairs and yields
@ -454,8 +457,9 @@ def apply_choices(lib, copy, write, art, delete):
# Check for "path finished" message.
if path is DONE_SENTINEL:
# Mark path as complete.
progress_set(toppath, None)
if progress:
# Mark path as complete.
progress_set(toppath, None)
continue
# Only process the items if info is not None (indicating a
@ -496,15 +500,16 @@ def apply_choices(lib, copy, write, art, delete):
os.remove(library._syspath(old_path))
# Update progress.
progress_set(toppath, path)
if progress:
progress_set(toppath, path)
# Non-autotagged import (always sequential).
def simple_import(lib, paths, copy, delete):
def simple_import(lib, paths, copy, delete, progress):
"""Add files from the paths to the library without changing any
tags.
"""
for toppath, path, items in read_albums(paths):
for toppath, path, items in read_albums(paths, progress):
if items is None:
continue
@ -516,7 +521,8 @@ def simple_import(lib, paths, copy, delete):
album = lib.add_album(items)
lib.save()
progress_set(toppath, path)
if progress:
progress_set(toppath, path)
if copy and delete:
new_paths = [os.path.realpath(item.path) for item in items]
@ -530,7 +536,7 @@ def simple_import(lib, paths, copy, delete):
# The import command.
def import_files(lib, paths, copy, write, autot, logpath,
art, threaded, color, delete, quiet):
art, threaded, color, delete, quiet, progress=True):
"""Import the files in the given list of paths, tagging each leaf
directory as an album. If copy, then the files are copied into
the library folder. If write, then new metadata is written to the
@ -542,7 +548,8 @@ def import_files(lib, paths, copy, write, autot, logpath,
ANSI-colorize some terminal output. If delete, then old files are
deleted when they are copied. If quiet, then the user is
never prompted for input; instead, the tagger just skips anything
it is not confident about.
it is not confident about. If progress, then state is saved in
case an import is interrupted.
"""
# Open the log.
if logpath:
@ -554,10 +561,10 @@ def import_files(lib, paths, copy, write, autot, logpath,
if autot:
# Autotag. Set up the pipeline.
pl = pipeline.Pipeline([
read_albums(paths),
read_albums(paths, progress),
initial_lookup(),
user_query(lib, logfile, color, quiet),
apply_choices(lib, copy, write, art, delete),
apply_choices(lib, copy, write, art, delete, progress),
])
# Run the pipeline.
@ -571,7 +578,7 @@ def import_files(lib, paths, copy, write, autot, logpath,
pass
else:
# Simple import without autotagging. Always sequential.
simple_import(lib, paths, copy, delete)
simple_import(lib, paths, copy, delete, progress)
# If we were logging, close the file.
if logfile:

View file

@ -30,6 +30,7 @@ from beets import autotag
from beets import mediafile
import test_db
TEST_TITLES = ('The Opener','The Second Track','The Last Track')
class ImportTest(unittest.TestCase):
def setUp(self):
self.io = _common.DummyIO()
@ -38,7 +39,9 @@ class ImportTest(unittest.TestCase):
self.lib = library.Library(':memory:')
self.libdir = os.path.join('rsrc', 'testlibdir')
self.lib.directory = self.libdir
self.lib.path_formats = {'default': os.path.join('$artist', '$album', '$title')}
self.lib.path_formats = {
'default': os.path.join('$artist', '$album', '$title')
}
self.srcdir = os.path.join('rsrc', 'testsrcdir')
@ -49,39 +52,40 @@ class ImportTest(unittest.TestCase):
if os.path.exists(self.srcdir):
shutil.rmtree(self.srcdir)
def create_test_file(self, filepath, metadata):
"""
Creates an mp3 file at the given path within self.srcdir. filepath is
given as an array of folder names, ending with the file name. Sets the
file's metadata from the provided dict. Returns the full, real path to
the file.
def _create_test_file(self, filepath, metadata):
"""Creates an mp3 file at the given path within self.srcdir.
filepath is given as an array of folder names, ending with the
file name. Sets the file's metadata from the provided dict.
Returns the full, real path to the file.
"""
realpath = os.path.join(self.srcdir, *filepath)
if not os.path.exists(os.path.dirname(realpath)):
os.makedirs(os.path.dirname(realpath))
realpath = os.path.join(self.srcdir, *filepath)
shutil.copy(os.path.join('rsrc', 'full.mp3'), realpath)
f = mediafile.MediaFile(realpath)
for attr in metadata:
setattr(f, attr, metadata[attr])
f.save()
return realpath
def test_import_copy_arrives(self):
track_names = ['The Opener', 'The Second Track', 'The Last Track']
for i, title in enumerate(track_names):
path = self.create_test_file(['the_album', 'track_%s.mp3' % (i+1)], {
'track': (i+1),
'artist': 'The Artist',
'album': 'The Album',
'title': title})
sources = [os.path.dirname(path)]
def _run_import(self, titles=TEST_TITLES):
# Make a bunch of tracks to import.
for i, title in enumerate(titles):
path = self._create_test_file(
['the_album', 'track_%s.mp3' % (i+1)],
{
'track': (i+1),
'artist': 'The Artist',
'album': 'The Album',
'title': title,
})
# Run the UI "beet import" command!
commands.import_files(
lib=self.lib,
paths=sources,
paths=[os.path.dirname(path)],
copy=True,
write=True,
autot=False,
@ -90,22 +94,26 @@ class ImportTest(unittest.TestCase):
threaded=False,
color=False,
delete=False,
quiet=True)
quiet=True,
progress=False,
)
def test_album_created_with_track_artist(self):
self._run_import()
albums = self.lib.albums()
self.assertEqual(len(albums), 1)
self.assertEqual(albums[0].albumartist, 'The Artist')
def test_import_copy_arrives(self):
self._run_import()
artist_folder = os.path.join(self.libdir, 'The Artist')
album_folder = os.path.join(artist_folder, 'The Album')
self.assertEqual(len(os.listdir(artist_folder)), 1)
self.assertEqual(len(os.listdir(album_folder)), 3)
files = sorted(os.listdir(album_folder))
names = sorted(track_names)
for file, name in zip(files, names):
self.assertEqual(file, name + ".mp3")
filenames = set(os.listdir(album_folder))
destinations = set('%s.mp3' % title for title in TEST_TITLES)
self.assertEqual(filenames, destinations)
class ListTest(unittest.TestCase):
def setUp(self):