Merge branch 'mried-import-filefilter'

Merge of PR #1186.
This commit is contained in:
Adrian Sampson 2015-01-20 14:22:55 -08:00
commit ebc065ecec
5 changed files with 395 additions and 1 deletions

View file

@ -0,0 +1,68 @@
# This file is part of beets.
# Copyright 2015, Malte Ried.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Filters the imported files using a regular expression"""
import re
from beets import config
from beets.plugins import BeetsPlugin
from beets.importer import action, SingletonImportTask
class RegexFileFilterPlugin(BeetsPlugin):
def __init__(self):
super(RegexFileFilterPlugin, self).__init__()
self.register_listener('import_task_created',
self.import_task_created_event)
self.config.add({
'path': '.*'
})
self.path_album_regex = \
self.path_singleton_regex = \
re.compile(self.config['path'].get())
if 'album_path' in self.config:
self.path_album_regex = re.compile(self.config['album_path'].get())
if 'singleton_path' in self.config:
self.path_singleton_regex = re.compile(
self.config['singleton_path'].get())
def import_task_created_event(self, session, task):
if task.items and len(task.items) > 0:
items_to_import = []
for item in task.items:
if self.file_filter(item['path']):
items_to_import.append(item)
if len(items_to_import) > 0:
task.items = items_to_import
else:
task.choice_flag = action.SKIP
elif isinstance(task, SingletonImportTask):
if not self.file_filter(task.item['path']):
task.choice_flag = action.SKIP
def file_filter(self, full_path):
"""Checks if the configured regular expressions allow the import of the
file given in full_path.
"""
import_config = dict(config['import'])
if 'singletons' not in import_config or not import_config[
'singletons']:
# Album
return self.path_album_regex.match(full_path) is not None
else:
# Singleton
return self.path_singleton_regex.match(full_path) is not None

View file

@ -63,6 +63,7 @@ Each plugin has its own set of options that can be defined in a section bearing
play
plexupdate
random
regexfilefilter
replaygain
rewrite
scrub
@ -151,6 +152,7 @@ Miscellaneous
* :doc:`mbcollection`: Maintain your MusicBrainz collection list.
* :doc:`missing`: List missing tracks.
* :doc:`random`: Randomly choose albums and tracks from your library.
* :doc:`regexfilefilter`: Automatically skip files during the import process based on regular expressions.
* :doc:`spotify`: Create Spotify playlists from the Beets library.
* :doc:`types`: Declare types for flexible attributes.
* :doc:`web`: An experimental Web-based GUI for beets.

View file

@ -0,0 +1,32 @@
RegexFileFilter Plugin
======================
The ``regexfilefilter`` plugin allows you to skip files during import using
regular expressions.
To use the ``regexfilefilter`` plugin, enable it in your configuration (see
:ref:`using-plugins`).
Configuration
-------------
To configure the plugin, make an ``regexfilefilter:`` section in your
configuration file. The available options are:
- **path**: A regular expression to filter files based on its path and name.
Default: ``.*`` (everything)
- **album_path** and **singleton_path**: You may specify different regular
expressions used for imports of albums and singletons. This way, you can
automatically skip singletons when importing albums if the names (and paths)
of the files are distinguishable via a regex. The path regex defined here
take precedence over the global ``path`` option.
Here's an example::
regexfilefilter:
path: .*\d\d[^/]+$
# will only import files which names start with two digits
album_path: .*\d\d[^/]+$
singleton_path: .*/(?!\d\d)[^/]+$

View file

@ -11,15 +11,21 @@
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
import os
from mock import patch
import shutil
from _common import unittest
from beets.importer import SingletonImportTask, SentinelImportTask, \
ArchiveImportTask
import helper
from beets import plugins
from beets import plugins, config
from beets.library import Item
from beets.dbcore import types
from beets.mediafile import MediaFile
from test import _common
from test.test_importer import ImportHelper
class TestHelper(helper.TestHelper):
@ -151,6 +157,95 @@ class ItemTypeConflictTest(unittest.TestCase, TestHelper):
self.assertNotEqual(None, plugins.types(Item))
class EventsTest(unittest.TestCase, ImportHelper, TestHelper):
def setUp(self):
self.setup_plugin_loader()
self.setup_beets()
self.__create_import_dir(2)
config['import']['pretend'] = True
def tearDown(self):
self.teardown_plugin_loader()
self.teardown_beets()
def __copy_file(self, dest_path, metadata):
# Copy files
resource_path = os.path.join(_common.RSRC, 'full.mp3')
shutil.copy(resource_path, dest_path)
medium = MediaFile(dest_path)
# Set metadata
for attr in metadata:
setattr(medium, attr, metadata[attr])
medium.save()
def __create_import_dir(self, count):
self.import_dir = os.path.join(self.temp_dir, 'testsrcdir')
if os.path.isdir(self.import_dir):
shutil.rmtree(self.import_dir)
self.album_path = os.path.join(self.import_dir, 'album')
os.makedirs(self.album_path)
metadata = {
'artist': 'Tag Artist',
'album': 'Tag Album',
'albumartist': None,
'mb_trackid': None,
'mb_albumid': None,
'comp': None
}
self.file_paths = []
for i in range(count):
metadata['track'] = i + 1
metadata['title'] = 'Tag Title Album %d' % (i + 1)
dest_path = os.path.join(self.album_path,
'%02d - track.mp3' % (i + 1))
self.__copy_file(dest_path, metadata)
self.file_paths.append(dest_path)
def test_import_task_created(self):
class ToSingletonPlugin(plugins.BeetsPlugin):
def __init__(self):
super(ToSingletonPlugin, self).__init__()
self.register_listener('import_task_created',
self.import_task_created_event)
def import_task_created_event(self, session, task):
if isinstance(task, SingletonImportTask) \
or isinstance(task, SentinelImportTask)\
or isinstance(task, ArchiveImportTask):
return task
new_tasks = []
for item in task.items:
new_tasks.append(SingletonImportTask(task.toppath, item))
return new_tasks
to_singleton_plugin = ToSingletonPlugin
self.register_plugin(to_singleton_plugin)
import_files = [self.import_dir]
self._setup_import_session(singletons=False)
self.importer.paths = import_files
with helper.capture_log() as logs:
self.importer.run()
self.unload_plugins()
self.assertEqual(logs.count('Sending event: import_task_created'), 2,
'Only two import_task_created events (one for the '
'album and one for the sentinel)')
logs = [line for line in logs if not line.startswith('Sending event:')]
self.assertEqual(logs, [
'Singleton: %s' % self.file_paths[0],
'Singleton: %s' % self.file_paths[1]
])
class HelpersTest(unittest.TestCase):
def test_sanitize_choices(self):

View file

@ -0,0 +1,197 @@
"""Tests for the 'regexfilefilter' plugin"""
import os
import shutil
from _common import unittest
from beets import config
from beets.mediafile import MediaFile
from beets.util import displayable_path
from beetsplug.regexfilefilter import RegexFileFilterPlugin
from test import _common
from test.helper import capture_log
from test.test_importer import ImportHelper
class RegexFileFilterPluginTest(unittest.TestCase, ImportHelper):
def setUp(self):
self.setup_beets()
self.__create_import_dir(2)
self._setup_import_session()
config['import']['pretend'] = True
def tearDown(self):
self.teardown_beets()
def __copy_file(self, dest_path, metadata):
# Copy files
resource_path = os.path.join(_common.RSRC, 'full.mp3')
shutil.copy(resource_path, dest_path)
medium = MediaFile(dest_path)
# Set metadata
for attr in metadata:
setattr(medium, attr, metadata[attr])
medium.save()
def __create_import_dir(self, count):
self.import_dir = os.path.join(self.temp_dir, 'testsrcdir')
if os.path.isdir(self.import_dir):
shutil.rmtree(self.import_dir)
self.artist_path = os.path.join(self.import_dir, 'artist')
self.album_path = os.path.join(self.artist_path, 'album')
self.misc_path = os.path.join(self.import_dir, 'misc')
os.makedirs(self.album_path)
os.makedirs(self.misc_path)
metadata = {
'artist': 'Tag Artist',
'album': 'Tag Album',
'albumartist': None,
'mb_trackid': None,
'mb_albumid': None,
'comp': None
}
self.album_paths = []
for i in range(count):
metadata['track'] = i + 1
metadata['title'] = 'Tag Title Album %d' % (i + 1)
dest_path = os.path.join(self.album_path,
'%02d - track.mp3' % (i + 1))
self.__copy_file(dest_path, metadata)
self.album_paths.append(dest_path)
self.artist_paths = []
metadata['album'] = None
for i in range(count):
metadata['track'] = i + 10
metadata['title'] = 'Tag Title Artist %d' % (i + 1)
dest_path = os.path.join(self.artist_path,
'track_%d.mp3' % (i + 1))
self.__copy_file(dest_path, metadata)
self.artist_paths.append(dest_path)
self.misc_paths = []
for i in range(count):
metadata['artist'] = 'Artist %d' % (i + 42)
metadata['track'] = i + 5
metadata['title'] = 'Tag Title Misc %d' % (i + 1)
dest_path = os.path.join(self.misc_path, 'track_%d.mp3' % (i + 1))
self.__copy_file(dest_path, metadata)
self.misc_paths.append(dest_path)
def __run(self, expected_lines, singletons=False):
self.load_plugins('regexfilefilter')
import_files = [self.import_dir]
self._setup_import_session(singletons=singletons)
self.importer.paths = import_files
with capture_log() as logs:
self.importer.run()
self.unload_plugins()
RegexFileFilterPlugin.listeners = None
logs = [line for line in logs if not line.startswith('Sending event:')]
self.assertEqual(logs, expected_lines)
def __reset_config(self):
config['regexfilefilter'] = {}
def test_import_default(self):
""" The default configuration should import everything.
"""
self.__reset_config()
self.__run([
'Album %s' % displayable_path(self.artist_path),
' %s' % displayable_path(self.artist_paths[0]),
' %s' % displayable_path(self.artist_paths[1]),
'Album %s' % displayable_path(self.album_path),
' %s' % displayable_path(self.album_paths[0]),
' %s' % displayable_path(self.album_paths[1]),
'Album %s' % displayable_path(self.misc_path),
' %s' % displayable_path(self.misc_paths[0]),
' %s' % displayable_path(self.misc_paths[1])
])
def test_import_nothing(self):
self.__reset_config()
config['regexfilefilter']['path'] = 'not_there'
self.__run(['No files imported from %s' % self.import_dir])
# Global options
def test_import_global(self):
self.__reset_config()
config['regexfilefilter']['path'] = '.*track_1.*\.mp3'
self.__run([
'Album %s' % displayable_path(self.artist_path),
' %s' % displayable_path(self.artist_paths[0]),
'Album %s' % displayable_path(self.misc_path),
' %s' % displayable_path(self.misc_paths[0]),
])
self.__run([
'Singleton: %s' % displayable_path(self.artist_paths[0]),
'Singleton: %s' % displayable_path(self.misc_paths[0])
], singletons=True)
# Album options
def test_import_album(self):
self.__reset_config()
config['regexfilefilter']['album_path'] = '.*track_1.*\.mp3'
self.__run([
'Album %s' % displayable_path(self.artist_path),
' %s' % displayable_path(self.artist_paths[0]),
'Album %s' % displayable_path(self.misc_path),
' %s' % displayable_path(self.misc_paths[0]),
])
self.__run([
'Singleton: %s' % displayable_path(self.artist_paths[0]),
'Singleton: %s' % displayable_path(self.artist_paths[1]),
'Singleton: %s' % displayable_path(self.album_paths[0]),
'Singleton: %s' % displayable_path(self.album_paths[1]),
'Singleton: %s' % displayable_path(self.misc_paths[0]),
'Singleton: %s' % displayable_path(self.misc_paths[1])
], singletons=True)
# Singleton options
def test_import_singleton(self):
self.__reset_config()
config['regexfilefilter']['singleton_path'] = '.*track_1.*\.mp3'
self.__run([
'Singleton: %s' % displayable_path(self.artist_paths[0]),
'Singleton: %s' % displayable_path(self.misc_paths[0])
], singletons=True)
self.__run([
'Album %s' % displayable_path(self.artist_path),
' %s' % displayable_path(self.artist_paths[0]),
' %s' % displayable_path(self.artist_paths[1]),
'Album %s' % displayable_path(self.album_path),
' %s' % displayable_path(self.album_paths[0]),
' %s' % displayable_path(self.album_paths[1]),
'Album %s' % displayable_path(self.misc_path),
' %s' % displayable_path(self.misc_paths[0]),
' %s' % displayable_path(self.misc_paths[1])
])
# Album and singleton options
def test_import_both(self):
self.__reset_config()
config['regexfilefilter']['album_path'] = '.*track_1.*\.mp3'
config['regexfilefilter']['singleton_path'] = '.*track_2.*\.mp3'
self.__run([
'Album %s' % displayable_path(self.artist_path),
' %s' % displayable_path(self.artist_paths[0]),
'Album %s' % displayable_path(self.misc_path),
' %s' % displayable_path(self.misc_paths[0]),
])
self.__run([
'Singleton: %s' % displayable_path(self.artist_paths[1]),
'Singleton: %s' % displayable_path(self.misc_paths[1])
], singletons=True)
def suite():
return unittest.TestLoader().loadTestsFromName(__name__)
if __name__ == '__main__':
unittest.main(defaultTest='suite')