Merge branch 'master' into future

Conflicts:
	test/test_plugins.py
This commit is contained in:
Bruno Cauet 2015-01-21 10:04:33 +01:00
commit c2f8cb9983
16 changed files with 629 additions and 41 deletions

View file

@ -1006,16 +1006,14 @@ class ImportTaskFactory(object):
for dirs, paths in self.paths():
if self.session.config['singletons']:
for path in paths:
task = self.singleton(path)
task = self._create(self.singleton(path))
if task:
self.imported += 1
yield task
yield self.sentinel(dirs)
else:
task = self.album(paths, dirs)
task = self._create(self.album(paths, dirs))
if task:
self.imported += 1
yield task
# Produce the final sentinel for this toppath to indicate that
@ -1027,6 +1025,19 @@ class ImportTaskFactory(object):
else:
yield self.sentinel()
def _create(self, task):
"""Handle a new task to be emitted by the factory.
Emit the `import_task_created` event and increment the
`imported` count if the task is not skipped. Return the same
task. If `task` is None, do nothing.
"""
if task:
task.emit_created(self.session)
if not task.skip:
self.imported += 1
return task
def paths(self):
"""Walk `self.toppath` and yield `(dirs, files)` pairs where
`files` are individual music files and `dirs` the set of
@ -1157,7 +1168,6 @@ def read_tasks(session):
# Generate tasks.
task_factory = ImportTaskFactory(toppath, session)
for t in task_factory.tasks():
t.emit_created(session)
yield t
skipped += task_factory.skipped

View file

@ -26,6 +26,7 @@ from __future__ import (division, absolute_import, print_function,
from copy import copy
from logging import * # noqa
import sys
import subprocess
# We need special hacks for Python 2.6 due to logging.Logger being an
@ -33,6 +34,45 @@ import sys
PY26 = sys.version_info[:2] == (2, 6)
def logsafe(val):
"""Coerce a potentially "problematic" value so it can be formatted
in a Unicode log string.
This works around a number of pitfalls when logging objects in
Python 2:
- Logging path names, which must be byte strings, requires
conversion for output.
- Some objects, including some exceptions, will crash when you call
`unicode(v)` while `str(v)` works fine. CalledProcessError is an
example.
"""
# Already Unicode.
if isinstance(val, unicode):
return val
# Bytestring: needs decoding.
elif isinstance(val, bytes):
# Blindly convert with UTF-8. Eventually, it would be nice to
# (a) only do this for paths, if they can be given a distinct
# type, and (b) warn the developer if they do this for other
# bytestrings.
return val.decode('utf8', 'replace')
# A "problem" object: needs a workaround.
elif isinstance(val, subprocess.CalledProcessError):
try:
return unicode(val)
except UnicodeDecodeError:
# An object with a broken __unicode__ formatter. Use __str__
# instead.
return str(val).decode('utf8', 'replace')
# Other objects are used as-is so field access, etc., still works in
# the format string.
else:
return val
class StrFormatLogger(Logger):
"""A version of `Logger` that uses `str.format`-style formatting
instead of %-style formatting.
@ -45,7 +85,9 @@ class StrFormatLogger(Logger):
self.kwargs = kwargs
def __str__(self):
return self.msg.format(*self.args, **self.kwargs)
args = [logsafe(a) for a in self.args]
kwargs = dict((k, logsafe(v)) for (k, v) in self.kwargs.items())
return self.msg.format(*args, **kwargs)
def _log(self, level, msg, args, exc_info=None, extra=None, **kwargs):
"""Log msg.format(*args, **kwargs)"""

View file

@ -85,7 +85,9 @@ class BeetsPlugin(object):
self._log = log.getChild(self.name)
self._log.setLevel(logging.NOTSET) # Use `beets` logger level.
if beets.config['verbose']:
self._log.addFilter(PluginLogFilter(self))
if not any(isinstance(f, PluginLogFilter)
for f in self._log.filters):
self._log.addFilter(PluginLogFilter(self))
def commands(self):
"""Should return a list of beets.ui.Subcommand objects for

69
beetsplug/filefilter.py Normal file
View file

@ -0,0 +1,69 @@
# This file is part of beets.
# Copyright 2015, Malte Ried.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Filter imported files using a regular expression.
"""
import re
from beets import config
from beets.plugins import BeetsPlugin
from beets.importer import action, SingletonImportTask
class FileFilterPlugin(BeetsPlugin):
def __init__(self):
super(FileFilterPlugin, self).__init__()
self.register_listener('import_task_created',
self.import_task_created_event)
self.config.add({
'path': '.*'
})
self.path_album_regex = \
self.path_singleton_regex = \
re.compile(self.config['path'].get())
if 'album_path' in self.config:
self.path_album_regex = re.compile(self.config['album_path'].get())
if 'singleton_path' in self.config:
self.path_singleton_regex = re.compile(
self.config['singleton_path'].get())
def import_task_created_event(self, session, task):
if task.items and len(task.items) > 0:
items_to_import = []
for item in task.items:
if self.file_filter(item['path']):
items_to_import.append(item)
if len(items_to_import) > 0:
task.items = items_to_import
else:
task.choice_flag = action.SKIP
elif isinstance(task, SingletonImportTask):
if not self.file_filter(task.item['path']):
task.choice_flag = action.SKIP
def file_filter(self, full_path):
"""Checks if the configured regular expressions allow the import
of the file given in full_path.
"""
import_config = dict(config['import'])
if 'singletons' not in import_config or not import_config[
'singletons']:
# Album
return self.path_album_regex.match(full_path) is not None
else:
# Singleton
return self.path_singleton_regex.match(full_path) is not None

View file

@ -23,6 +23,7 @@ import subprocess
from beets import ui
from beets import util
from beets.plugins import BeetsPlugin
from beets import config
class KeyFinderPlugin(BeetsPlugin):
@ -34,8 +35,9 @@ class KeyFinderPlugin(BeetsPlugin):
u'auto': True,
u'overwrite': False,
})
self.config['auto'].get(bool)
self.import_stages = [self.imported]
if self.config['auto'].get(bool):
self.import_stages = [self.imported]
def commands(self):
cmd = ui.Subcommand('keyfinder',
@ -44,13 +46,13 @@ class KeyFinderPlugin(BeetsPlugin):
return [cmd]
def command(self, lib, opts, args):
self.find_key(lib.items(ui.decargs(args)))
self.find_key(lib.items(ui.decargs(args)),
write=config['import']['write'].get(bool))
def imported(self, session, task):
if self.config['auto'].get(bool):
self.find_key(task.items)
self.find_key(task.items)
def find_key(self, items):
def find_key(self, items, write=False):
overwrite = self.config['overwrite'].get(bool)
bin = util.bytestring_path(self.config['bin'].get(unicode))
@ -59,13 +61,22 @@ class KeyFinderPlugin(BeetsPlugin):
continue
try:
key = util.command_output([bin, '-f', item.path])
output = util.command_output([bin, '-f', item.path])
except (subprocess.CalledProcessError, OSError) as exc:
self._log.error(u'execution failed: {0}', exc)
continue
key_raw = output.rsplit(None, 1)[-1]
try:
key = key_raw.decode('utf8')
except UnicodeDecodeError:
self._log.error(u'output is invalid UTF-8')
continue
item['initial_key'] = key
self._log.debug(u'added computed initial key {0} for {1}',
key, util.displayable_path(item.path))
item.try_write()
self._log.info(u'added computed initial key {0} for {1}',
key, util.displayable_path(item.path))
if write:
item.try_write()
item.store()

View file

@ -21,6 +21,7 @@ from beets.plugins import BeetsPlugin
from beets import autotag, library, ui, util
from beets.autotag import hooks
from beets import config
from beets.util.functemplate import Template
from collections import defaultdict
@ -52,6 +53,8 @@ class MBSyncPlugin(BeetsPlugin):
cmd.parser.add_option('-W', '--nowrite', action='store_false',
default=config['import']['write'], dest='write',
help="don't write updated metadata to files")
cmd.parser.add_option('-f', '--format', action='store', default=None,
help='print with custom format')
cmd.func = self.func
return [cmd]
@ -62,24 +65,30 @@ class MBSyncPlugin(BeetsPlugin):
pretend = opts.pretend
write = opts.write
query = ui.decargs(args)
fmt = opts.format
self.singletons(lib, query, move, pretend, write)
self.albums(lib, query, move, pretend, write)
self.singletons(lib, query, move, pretend, write, fmt)
self.albums(lib, query, move, pretend, write, fmt)
def singletons(self, lib, query, move, pretend, write):
def singletons(self, lib, query, move, pretend, write, fmt):
"""Retrieve and apply info from the autotagger for items matched by
query.
"""
template = Template(ui._pick_format(False, fmt))
for item in lib.items(query + ['singleton:true']):
item_formatted = item.evaluate_template(template)
if not item.mb_trackid:
self._log.info(u'Skipping singleton {0}: has no mb_trackid',
item.title)
self._log.info(u'Skipping singleton with no mb_trackid: {0}',
item_formatted)
continue
# Get the MusicBrainz recording info.
track_info = hooks.track_for_mbid(item.mb_trackid)
if not track_info:
self._log.info(u'Recording ID not found: {0}', item.mb_trackid)
self._log.info(u'Recording ID not found: {0} for track {0}',
item.mb_trackid,
item_formatted)
continue
# Apply.
@ -87,14 +96,18 @@ class MBSyncPlugin(BeetsPlugin):
autotag.apply_item_metadata(item, track_info)
apply_item_changes(lib, item, move, pretend, write)
def albums(self, lib, query, move, pretend, write):
def albums(self, lib, query, move, pretend, write, fmt):
"""Retrieve and apply info from the autotagger for albums matched by
query and their items.
"""
template = Template(ui._pick_format(True, fmt))
# Process matching albums.
for a in lib.albums(query):
album_formatted = a.evaluate_template(template)
if not a.mb_albumid:
self._log.info(u'Skipping album {0}: has no mb_albumid', a.id)
self._log.info(u'Skipping album with no mb_albumid: {0}',
album_formatted)
continue
items = list(a.items())
@ -102,7 +115,9 @@ class MBSyncPlugin(BeetsPlugin):
# Get the MusicBrainz album information.
album_info = hooks.album_for_mbid(a.mb_albumid)
if not album_info:
self._log.info(u'Release ID not found: {0}', a.mb_albumid)
self._log.info(u'Release ID {0} not found for album {1}',
a.mb_albumid,
album_formatted)
continue
# Map recording MBIDs to their information. Recordings can appear
@ -150,5 +165,5 @@ class MBSyncPlugin(BeetsPlugin):
# Move album art (and any inconsistent items).
if move and lib.directory in util.ancestry(items[0].path):
self._log.debug(u'moving album {0}', a.id)
self._log.debug(u'moving album {0}', album_formatted)
a.move()

View file

@ -257,8 +257,9 @@ class WebPlugin(BeetsPlugin):
def __init__(self):
super(WebPlugin, self).__init__()
self.config.add({
'host': u'',
'host': u'127.0.0.1',
'port': 8337,
'cors': '',
})
def commands(self):
@ -274,6 +275,16 @@ class WebPlugin(BeetsPlugin):
self.config['port'] = int(args.pop(0))
app.config['lib'] = lib
# Enable CORS if required.
if self.config['cors']:
print "Enabling cors"
from flask.ext.cors import CORS
app.config['CORS_ALLOW_HEADERS'] = "Content-Type"
app.config['CORS_RESOURCES'] = {
r"/*": {"origins": self.config['cors'].get(str)}
}
CORS(app)
# Start the web application.
app.run(host=self.config['host'].get(unicode),
port=self.config['port'].get(int),
debug=opts.debug, threaded=True)

View file

@ -6,17 +6,26 @@ Changelog
Features:
* A new :doc:`/plugins/filefilter` lets you write regular expressions to
automatically avoid importing certain files. Thanks to :user:`mried`.
:bug:`1186`
* Stop on invalid queries instead of ignoring the invalid part.
* A new :ref:`searchlimit` configuration option allows you to specify how many
search results you wish to see when looking up releases at MusicBrainz
during import. :bug:`1245`
* :doc:`/plugins/lastgenre`: Add *comedy*, *humor*, and *stand-up* as well as
a longer list of classical music genre tags to the built-in whitelist and
canonicalization tree. :bug:`1206` :bug:`1239` :bug:`1240`
* :doc:`/plugins/web`: Add support for *cross-origin resource sharing* for
more flexible in-browser clients. Thanks to Andre Miller. :bug:`1236`
:bug:`1237`
* :doc:`plugins/mbsync`: Add ``-f/--format`` option for controlling
the output format for unrecognized items.
Fixes:
* :doc:`/plugins/lyrics`: Silence a warning about insecure requests in the new
MusixMatch backend. :bug:`1204`
* :doc:`/plugins/lastgenre`: Add *comedy*, *humor*, and *stand-up* to the
built-in whitelist/canonicalization tree. :bug:`1206`
* Fix a crash when ``beet`` is invoked without arguments. :bug:`1205`
:bug:`1207`
* :doc:`/plugins/fetchart`: Do not attempt to import directories as album art.
@ -30,10 +39,10 @@ Fixes:
* Remove the ``beatport`` plugin. `Beatport`_ has shut off public access to
their API and denied our request for an account. We have not heard from the
company since 2013, so we are assuming access will not be restored.
* :doc:`/plugins/lastgenre`: Add classical music to the built-in whitelist and
canonicalization tree. :bug:`1239` :bug:`1240`
* Incremental imports now (once again) show a "skipped N directories" message.
* :doc:`/plugins/embedart`: Handle errors in ImageMagick's output. :bug:`1241`
* :doc:`/plugins/keyfinder`: Parse the underlying tool's output more robustly.
:bug:`1248`
For developers:

View file

@ -0,0 +1,30 @@
FileFilter Plugin
=================
The ``filefilter`` plugin allows you to skip files during import using
regular expressions.
To use the ``filefilter`` plugin, enable it in your configuration (see
:ref:`using-plugins`).
Configuration
-------------
To configure the plugin, make a ``filefilter:`` section in your
configuration file. The available options are:
- **path**: A regular expression to filter files based on its path and name.
Default: ``.*`` (everything)
- **album_path** and **singleton_path**: You may specify different regular
expressions used for imports of albums and singletons. This way, you can
automatically skip singletons when importing albums if the names (and paths)
of the files are distinguishable via a regex. The regexes defined here
take precedence over the global ``path`` option.
Here's an example::
filefilter:
path: .*\d\d[^/]+$
# will only import files which names start with two digits
album_path: .*\d\d[^/]+$
singleton_path: .*/(?!\d\d)[^/]+$

View file

@ -63,6 +63,7 @@ Each plugin has its own set of options that can be defined in a section bearing
play
plexupdate
random
filefilter
replaygain
rewrite
scrub
@ -151,6 +152,8 @@ Miscellaneous
* :doc:`mbcollection`: Maintain your MusicBrainz collection list.
* :doc:`missing`: List missing tracks.
* :doc:`random`: Randomly choose albums and tracks from your library.
* :doc:`filefilter`: Automatically skip files during the import process based
on regular expressions.
* :doc:`spotify`: Create Spotify playlists from the Beets library.
* :doc:`types`: Declare types for flexible attributes.
* :doc:`web`: An experimental Web-based GUI for beets.

View file

@ -33,3 +33,6 @@ The command has a few command-line options:
* If you have the `import.write` configuration option enabled, then this
plugin will write new metadata to files' tags. To disable this, use the
``-W`` (``--nowrite``) option.
* To customize the output of unrecognized items, use the ``-f``
(``--format``) option. The default output is ``list_format_item`` or
``list_format_album`` for items and albums, respectively.

View file

@ -20,6 +20,12 @@ flask``.
.. _Flask: http://flask.pocoo.org/
If you require `CORS`_ (Cross-origin resource sharing), then you also
need `flask-cors`_. This can be installed by running ``pip install flask-cors``.
.. _flask-cors: https://github.com/CoryDolphin/flask-cors
.. _CORS: http://en.wikipedia.org/wiki/Cross-origin_resource_sharing
Finally, enable the ``web`` plugin in your configuration
(see :ref:`using-plugins`).
@ -52,10 +58,12 @@ Configuration
To configure the plugin, make a ``web:`` section in your
configuration file. The available options are:
- **host**: The server hostname.
Default: Bind to all interfaces.
- **host**: The server hostname. Set this to 0.0.0.0 to bind to all interfaces.
Default: Bind to 127.0.0.1.
- **port**: The server port.
Default: 8337.
- **cors**: The CORS allowed origin (see :ref:`web-cors`, below).
Default: CORS is disabled.
Implementation
--------------
@ -78,6 +86,30 @@ for unsupported formats/browsers. There are a number of options for this:
.. _html5media: http://html5media.info/
.. _MediaElement.js: http://mediaelementjs.com/
.. _web-cors:
Cross-Origin Resource Sharing (CORS)
------------------------------------
The ``web`` plugin's API can be used as a backend for an in-browser client. By
default, browsers will only allow access from clients running on the same
server as the API. (You will get an arcane error about ``XMLHttpRequest``
otherwise.) A technology called `CORS`_ lets you relax this restriction.
If you want to use an in-browser client hosted elsewhere (or running from
a different server on your machine), set the ``cors`` configuration option to
the "origin" (protocol, host, and optional port number) where the client is
served. Or set it to ``'*'`` to enable access from all origins. Note that
there are security implications if you set the origin to ``'*'``, so please
research this before using it.
For example::
web:
host: 0.0.0.0
cors: 'http://example.com'
JSON API
--------

View file

@ -77,7 +77,7 @@ setup(
install_requires=[
'enum34',
'mutagen>=1.23',
'mutagen>=1.27',
'munkres',
'unidecode',
'musicbrainzngs>=0.4',
@ -104,7 +104,7 @@ setup(
'echonest': ['pyechonest'],
'lastgenre': ['pylast'],
'mpdstats': ['python-mpd'],
'web': ['flask'],
'web': ['flask', 'flask-cors'],
'import': ['rarfile'],
},
# Non-Python/non-PyPI plugin dependencies:

203
test/test_filefilter.py Normal file
View file

@ -0,0 +1,203 @@
# This file is part of beets.
# Copyright 2015, Malte Ried.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Tests for the `filefilter` plugin.
"""
import os
import shutil
from _common import unittest
from beets import config
from beets.mediafile import MediaFile
from beets.util import displayable_path
from beetsplug.filefilter import FileFilterPlugin
from test import _common
from test.helper import capture_log
from test.test_importer import ImportHelper
class FileFilterPluginTest(unittest.TestCase, ImportHelper):
def setUp(self):
self.setup_beets()
self.__create_import_dir(2)
self._setup_import_session()
config['import']['pretend'] = True
def tearDown(self):
self.teardown_beets()
def __copy_file(self, dest_path, metadata):
# Copy files
resource_path = os.path.join(_common.RSRC, 'full.mp3')
shutil.copy(resource_path, dest_path)
medium = MediaFile(dest_path)
# Set metadata
for attr in metadata:
setattr(medium, attr, metadata[attr])
medium.save()
def __create_import_dir(self, count):
self.import_dir = os.path.join(self.temp_dir, 'testsrcdir')
if os.path.isdir(self.import_dir):
shutil.rmtree(self.import_dir)
self.artist_path = os.path.join(self.import_dir, 'artist')
self.album_path = os.path.join(self.artist_path, 'album')
self.misc_path = os.path.join(self.import_dir, 'misc')
os.makedirs(self.album_path)
os.makedirs(self.misc_path)
metadata = {
'artist': 'Tag Artist',
'album': 'Tag Album',
'albumartist': None,
'mb_trackid': None,
'mb_albumid': None,
'comp': None,
}
self.album_paths = []
for i in range(count):
metadata['track'] = i + 1
metadata['title'] = 'Tag Title Album %d' % (i + 1)
dest_path = os.path.join(self.album_path,
'%02d - track.mp3' % (i + 1))
self.__copy_file(dest_path, metadata)
self.album_paths.append(dest_path)
self.artist_paths = []
metadata['album'] = None
for i in range(count):
metadata['track'] = i + 10
metadata['title'] = 'Tag Title Artist %d' % (i + 1)
dest_path = os.path.join(self.artist_path,
'track_%d.mp3' % (i + 1))
self.__copy_file(dest_path, metadata)
self.artist_paths.append(dest_path)
self.misc_paths = []
for i in range(count):
metadata['artist'] = 'Artist %d' % (i + 42)
metadata['track'] = i + 5
metadata['title'] = 'Tag Title Misc %d' % (i + 1)
dest_path = os.path.join(self.misc_path, 'track_%d.mp3' % (i + 1))
self.__copy_file(dest_path, metadata)
self.misc_paths.append(dest_path)
def __run(self, expected_lines, singletons=False):
self.load_plugins('filefilter')
import_files = [self.import_dir]
self._setup_import_session(singletons=singletons)
self.importer.paths = import_files
with capture_log() as logs:
self.importer.run()
self.unload_plugins()
FileFilterPlugin.listeners = None
logs = [line for line in logs if not line.startswith('Sending event:')]
self.assertEqual(logs, expected_lines)
def test_import_default(self):
""" The default configuration should import everything.
"""
self.__run([
'Album: %s' % displayable_path(self.artist_path),
' %s' % displayable_path(self.artist_paths[0]),
' %s' % displayable_path(self.artist_paths[1]),
'Album: %s' % displayable_path(self.album_path),
' %s' % displayable_path(self.album_paths[0]),
' %s' % displayable_path(self.album_paths[1]),
'Album: %s' % displayable_path(self.misc_path),
' %s' % displayable_path(self.misc_paths[0]),
' %s' % displayable_path(self.misc_paths[1])
])
def test_import_nothing(self):
config['filefilter']['path'] = 'not_there'
self.__run(['No files imported from %s' % self.import_dir])
# Global options
def test_import_global(self):
config['filefilter']['path'] = '.*track_1.*\.mp3'
self.__run([
'Album: %s' % displayable_path(self.artist_path),
' %s' % displayable_path(self.artist_paths[0]),
'Album: %s' % displayable_path(self.misc_path),
' %s' % displayable_path(self.misc_paths[0]),
])
self.__run([
'Singleton: %s' % displayable_path(self.artist_paths[0]),
'Singleton: %s' % displayable_path(self.misc_paths[0])
], singletons=True)
# Album options
def test_import_album(self):
config['filefilter']['album_path'] = '.*track_1.*\.mp3'
self.__run([
'Album: %s' % displayable_path(self.artist_path),
' %s' % displayable_path(self.artist_paths[0]),
'Album: %s' % displayable_path(self.misc_path),
' %s' % displayable_path(self.misc_paths[0]),
])
self.__run([
'Singleton: %s' % displayable_path(self.artist_paths[0]),
'Singleton: %s' % displayable_path(self.artist_paths[1]),
'Singleton: %s' % displayable_path(self.album_paths[0]),
'Singleton: %s' % displayable_path(self.album_paths[1]),
'Singleton: %s' % displayable_path(self.misc_paths[0]),
'Singleton: %s' % displayable_path(self.misc_paths[1])
], singletons=True)
# Singleton options
def test_import_singleton(self):
config['filefilter']['singleton_path'] = '.*track_1.*\.mp3'
self.__run([
'Singleton: %s' % displayable_path(self.artist_paths[0]),
'Singleton: %s' % displayable_path(self.misc_paths[0])
], singletons=True)
self.__run([
'Album: %s' % displayable_path(self.artist_path),
' %s' % displayable_path(self.artist_paths[0]),
' %s' % displayable_path(self.artist_paths[1]),
'Album: %s' % displayable_path(self.album_path),
' %s' % displayable_path(self.album_paths[0]),
' %s' % displayable_path(self.album_paths[1]),
'Album: %s' % displayable_path(self.misc_path),
' %s' % displayable_path(self.misc_paths[0]),
' %s' % displayable_path(self.misc_paths[1])
])
# Album and singleton options
def test_import_both(self):
config['filefilter']['album_path'] = '.*track_1.*\.mp3'
config['filefilter']['singleton_path'] = '.*track_2.*\.mp3'
self.__run([
'Album: %s' % displayable_path(self.artist_path),
' %s' % displayable_path(self.artist_paths[0]),
'Album: %s' % displayable_path(self.misc_path),
' %s' % displayable_path(self.misc_paths[0]),
])
self.__run([
'Singleton: %s' % displayable_path(self.artist_paths[1]),
'Singleton: %s' % displayable_path(self.misc_paths[1])
], singletons=True)
def suite():
return unittest.TestLoader().loadTestsFromName(__name__)
if __name__ == '__main__':
unittest.main(defaultTest='suite')

View file

@ -20,8 +20,10 @@ from mock import patch
from test._common import unittest
from test.helper import TestHelper,\
generate_album_info, \
generate_track_info
generate_track_info, \
capture_log
from beets import config
from beets.library import Item
@ -45,7 +47,7 @@ class MbsyncCliTest(unittest.TestCase, TestHelper):
{'title': 'singleton info'})
album_item = Item(
title='old title',
album='old title',
mb_albumid='album id',
mb_trackid='track id',
path=''
@ -70,6 +72,57 @@ class MbsyncCliTest(unittest.TestCase, TestHelper):
album.load()
self.assertEqual(album.album, 'album info')
def test_message_when_skipping(self):
config['list_format_item'] = '$artist - $album - $title'
config['list_format_album'] = '$albumartist - $album'
# Test album with no mb_albumid.
# The default format for an album include $albumartist so
# set that here, too.
album_invalid = Item(
albumartist='album info',
album='album info',
path=''
)
self.lib.add_album([album_invalid])
# default format
with capture_log('beets.mbsync') as logs:
self.run_command('mbsync')
e = 'mbsync: Skipping album with no mb_albumid: ' + \
'album info - album info'
self.assertEqual(e, logs[0])
# custom format
with capture_log('beets.mbsync') as logs:
self.run_command('mbsync', '-f', "'$album'")
e = "mbsync: Skipping album with no mb_albumid: 'album info'"
self.assertEqual(e, logs[0])
# Test singleton with no mb_trackid.
# The default singleton format includes $artist and $album
# so we need to stub them here
item_invalid = Item(
artist='album info',
album='album info',
title='old title',
path='',
)
self.lib.add(item_invalid)
# default format
with capture_log('beets.mbsync') as logs:
self.run_command('mbsync')
e = 'mbsync: Skipping singleton with no mb_trackid: ' + \
'album info - album info - old title'
self.assertEqual(e, logs[0])
# custom format
with capture_log('beets.mbsync') as logs:
self.run_command('mbsync', '-f', "'$title'")
e = "mbsync: Skipping singleton with no mb_trackid: 'old title'"
self.assertEqual(e, logs[0])
def suite():
return unittest.TestLoader().loadTestsFromName(__name__)

View file

@ -15,14 +15,19 @@
from __future__ import (division, absolute_import, print_function,
unicode_literals)
import os
from mock import patch
from test._common import unittest
from test import helper
import shutil
from beets import plugins
from beets.importer import SingletonImportTask, SentinelImportTask, \
ArchiveImportTask
from beets import plugins, config
from beets.library import Item
from beets.dbcore import types
from beets.mediafile import MediaFile
from test.test_importer import ImportHelper
from test._common import unittest, RSRC
from test import helper
class TestHelper(helper.TestHelper):
@ -154,6 +159,96 @@ class ItemTypeConflictTest(unittest.TestCase, TestHelper):
self.assertNotEqual(None, plugins.types(Item))
class EventsTest(unittest.TestCase, ImportHelper, TestHelper):
def setUp(self):
self.setup_plugin_loader()
self.setup_beets()
self.__create_import_dir(2)
config['import']['pretend'] = True
def tearDown(self):
self.teardown_plugin_loader()
self.teardown_beets()
def __copy_file(self, dest_path, metadata):
# Copy files
resource_path = os.path.join(RSRC, 'full.mp3')
shutil.copy(resource_path, dest_path)
medium = MediaFile(dest_path)
# Set metadata
for attr in metadata:
setattr(medium, attr, metadata[attr])
medium.save()
def __create_import_dir(self, count):
self.import_dir = os.path.join(self.temp_dir, 'testsrcdir')
if os.path.isdir(self.import_dir):
shutil.rmtree(self.import_dir)
self.album_path = os.path.join(self.import_dir, 'album')
os.makedirs(self.album_path)
metadata = {
'artist': 'Tag Artist',
'album': 'Tag Album',
'albumartist': None,
'mb_trackid': None,
'mb_albumid': None,
'comp': None
}
self.file_paths = []
for i in range(count):
metadata['track'] = i + 1
metadata['title'] = 'Tag Title Album %d' % (i + 1)
dest_path = os.path.join(self.album_path,
'%02d - track.mp3' % (i + 1))
self.__copy_file(dest_path, metadata)
self.file_paths.append(dest_path)
def test_import_task_created(self):
class ToSingletonPlugin(plugins.BeetsPlugin):
def __init__(self):
super(ToSingletonPlugin, self).__init__()
self.register_listener('import_task_created',
self.import_task_created_event)
def import_task_created_event(self, session, task):
if isinstance(task, SingletonImportTask) \
or isinstance(task, SentinelImportTask)\
or isinstance(task, ArchiveImportTask):
return task
new_tasks = []
for item in task.items:
new_tasks.append(SingletonImportTask(task.toppath, item))
return new_tasks
to_singleton_plugin = ToSingletonPlugin
self.register_plugin(to_singleton_plugin)
import_files = [self.import_dir]
self._setup_import_session(singletons=False)
self.importer.paths = import_files
with helper.capture_log() as logs:
self.importer.run()
self.unload_plugins()
# Exactly one event should have been imported (for the album).
# Sentinels do not get emitted.
self.assertEqual(logs.count('Sending event: import_task_created'), 1)
logs = [line for line in logs if not line.startswith('Sending event:')]
self.assertEqual(logs, [
'Album: {0}/album'.format(self.import_dir),
' {0}'.format(self.file_paths[0]),
' {0}'.format(self.file_paths[1]),
])
class HelpersTest(unittest.TestCase):
def test_sanitize_choices(self):