diff --git a/beetsplug/regexfilefilter.py b/beetsplug/regexfilefilter.py new file mode 100644 index 000000000..56339fdee --- /dev/null +++ b/beetsplug/regexfilefilter.py @@ -0,0 +1,68 @@ +# This file is part of beets. +# Copyright 2015, Malte Ried. +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. + +"""Filters the imported files using a regular expression""" + +import re +from beets import config +from beets.plugins import BeetsPlugin +from beets.importer import action, SingletonImportTask + + +class RegexFileFilterPlugin(BeetsPlugin): + def __init__(self): + super(RegexFileFilterPlugin, self).__init__() + self.register_listener('import_task_created', + self.import_task_created_event) + self.config.add({ + 'path': '.*' + }) + + self.path_album_regex = \ + self.path_singleton_regex = \ + re.compile(self.config['path'].get()) + + if 'album_path' in self.config: + self.path_album_regex = re.compile(self.config['album_path'].get()) + + if 'singleton_path' in self.config: + self.path_singleton_regex = re.compile( + self.config['singleton_path'].get()) + + def import_task_created_event(self, session, task): + if task.items and len(task.items) > 0: + items_to_import = [] + for item in task.items: + if self.file_filter(item['path']): + items_to_import.append(item) + if len(items_to_import) > 0: + task.items = items_to_import + else: + task.choice_flag = action.SKIP + elif isinstance(task, SingletonImportTask): + if not self.file_filter(task.item['path']): + task.choice_flag = action.SKIP + + def file_filter(self, full_path): + """Checks if the configured regular expressions allow the import of the + file given in full_path. + """ + import_config = dict(config['import']) + if 'singletons' not in import_config or not import_config[ + 'singletons']: + # Album + return self.path_album_regex.match(full_path) is not None + else: + # Singleton + return self.path_singleton_regex.match(full_path) is not None diff --git a/docs/plugins/index.rst b/docs/plugins/index.rst index a84222fa0..f8ecb8975 100644 --- a/docs/plugins/index.rst +++ b/docs/plugins/index.rst @@ -63,6 +63,7 @@ Each plugin has its own set of options that can be defined in a section bearing play plexupdate random + regexfilefilter replaygain rewrite scrub @@ -151,6 +152,7 @@ Miscellaneous * :doc:`mbcollection`: Maintain your MusicBrainz collection list. * :doc:`missing`: List missing tracks. * :doc:`random`: Randomly choose albums and tracks from your library. +* :doc:`regexfilefilter`: Automatically skip files during the import process based on regular expressions. * :doc:`spotify`: Create Spotify playlists from the Beets library. * :doc:`types`: Declare types for flexible attributes. * :doc:`web`: An experimental Web-based GUI for beets. diff --git a/docs/plugins/regexfilefilter.rst b/docs/plugins/regexfilefilter.rst new file mode 100644 index 000000000..dcec1e052 --- /dev/null +++ b/docs/plugins/regexfilefilter.rst @@ -0,0 +1,32 @@ +RegexFileFilter Plugin +====================== + +The ``regexfilefilter`` plugin allows you to skip files during import using +regular expressions. + +To use the ``regexfilefilter`` plugin, enable it in your configuration (see +:ref:`using-plugins`). + +Configuration +------------- + +To configure the plugin, make an ``regexfilefilter:`` section in your +configuration file. The available options are: + +- **path**: A regular expression to filter files based on its path and name. + Default: ``.*`` (everything) +- **album_path** and **singleton_path**: You may specify different regular + expressions used for imports of albums and singletons. This way, you can + automatically skip singletons when importing albums if the names (and paths) + of the files are distinguishable via a regex. The path regex defined here + take precedence over the global ``path`` option. + +Here's an example:: + + regexfilefilter: + path: .*\d\d[^/]+$ + # will only import files which names start with two digits + album_path: .*\d\d[^/]+$ + singleton_path: .*/(?!\d\d)[^/]+$ + + diff --git a/test/test_plugins.py b/test/test_plugins.py index 256fc2ff4..3771828b7 100644 --- a/test/test_plugins.py +++ b/test/test_plugins.py @@ -11,15 +11,21 @@ # # The above copyright notice and this permission notice shall be # included in all copies or substantial portions of the Software. +import os from mock import patch +import shutil from _common import unittest +from beets.importer import SingletonImportTask, SentinelImportTask, \ + ArchiveImportTask import helper -from beets import plugins +from beets import plugins, config from beets.library import Item from beets.dbcore import types from beets.mediafile import MediaFile +from test import _common +from test.test_importer import ImportHelper class TestHelper(helper.TestHelper): @@ -151,6 +157,95 @@ class ItemTypeConflictTest(unittest.TestCase, TestHelper): self.assertNotEqual(None, plugins.types(Item)) +class EventsTest(unittest.TestCase, ImportHelper, TestHelper): + + def setUp(self): + self.setup_plugin_loader() + self.setup_beets() + self.__create_import_dir(2) + config['import']['pretend'] = True + + def tearDown(self): + self.teardown_plugin_loader() + self.teardown_beets() + + def __copy_file(self, dest_path, metadata): + # Copy files + resource_path = os.path.join(_common.RSRC, 'full.mp3') + shutil.copy(resource_path, dest_path) + medium = MediaFile(dest_path) + # Set metadata + for attr in metadata: + setattr(medium, attr, metadata[attr]) + medium.save() + + def __create_import_dir(self, count): + self.import_dir = os.path.join(self.temp_dir, 'testsrcdir') + if os.path.isdir(self.import_dir): + shutil.rmtree(self.import_dir) + + self.album_path = os.path.join(self.import_dir, 'album') + os.makedirs(self.album_path) + + metadata = { + 'artist': 'Tag Artist', + 'album': 'Tag Album', + 'albumartist': None, + 'mb_trackid': None, + 'mb_albumid': None, + 'comp': None + } + self.file_paths = [] + for i in range(count): + metadata['track'] = i + 1 + metadata['title'] = 'Tag Title Album %d' % (i + 1) + dest_path = os.path.join(self.album_path, + '%02d - track.mp3' % (i + 1)) + self.__copy_file(dest_path, metadata) + self.file_paths.append(dest_path) + + def test_import_task_created(self): + class ToSingletonPlugin(plugins.BeetsPlugin): + def __init__(self): + super(ToSingletonPlugin, self).__init__() + + self.register_listener('import_task_created', + self.import_task_created_event) + + def import_task_created_event(self, session, task): + if isinstance(task, SingletonImportTask) \ + or isinstance(task, SentinelImportTask)\ + or isinstance(task, ArchiveImportTask): + return task + + new_tasks = [] + for item in task.items: + new_tasks.append(SingletonImportTask(task.toppath, item)) + + return new_tasks + + to_singleton_plugin = ToSingletonPlugin + self.register_plugin(to_singleton_plugin) + + import_files = [self.import_dir] + self._setup_import_session(singletons=False) + self.importer.paths = import_files + + with helper.capture_log() as logs: + self.importer.run() + self.unload_plugins() + + self.assertEqual(logs.count('Sending event: import_task_created'), 2, + 'Only two import_task_created events (one for the ' + 'album and one for the sentinel)') + logs = [line for line in logs if not line.startswith('Sending event:')] + + self.assertEqual(logs, [ + 'Singleton: %s' % self.file_paths[0], + 'Singleton: %s' % self.file_paths[1] + ]) + + class HelpersTest(unittest.TestCase): def test_sanitize_choices(self): diff --git a/test/test_regexfilefilter.py b/test/test_regexfilefilter.py new file mode 100644 index 000000000..18b077703 --- /dev/null +++ b/test/test_regexfilefilter.py @@ -0,0 +1,197 @@ +"""Tests for the 'regexfilefilter' plugin""" +import os +import shutil + +from _common import unittest +from beets import config +from beets.mediafile import MediaFile +from beets.util import displayable_path +from beetsplug.regexfilefilter import RegexFileFilterPlugin +from test import _common +from test.helper import capture_log +from test.test_importer import ImportHelper + + +class RegexFileFilterPluginTest(unittest.TestCase, ImportHelper): + def setUp(self): + self.setup_beets() + self.__create_import_dir(2) + self._setup_import_session() + config['import']['pretend'] = True + + def tearDown(self): + self.teardown_beets() + + def __copy_file(self, dest_path, metadata): + # Copy files + resource_path = os.path.join(_common.RSRC, 'full.mp3') + shutil.copy(resource_path, dest_path) + medium = MediaFile(dest_path) + # Set metadata + for attr in metadata: + setattr(medium, attr, metadata[attr]) + medium.save() + + def __create_import_dir(self, count): + self.import_dir = os.path.join(self.temp_dir, 'testsrcdir') + if os.path.isdir(self.import_dir): + shutil.rmtree(self.import_dir) + + self.artist_path = os.path.join(self.import_dir, 'artist') + self.album_path = os.path.join(self.artist_path, 'album') + self.misc_path = os.path.join(self.import_dir, 'misc') + os.makedirs(self.album_path) + os.makedirs(self.misc_path) + + metadata = { + 'artist': 'Tag Artist', + 'album': 'Tag Album', + 'albumartist': None, + 'mb_trackid': None, + 'mb_albumid': None, + 'comp': None + } + self.album_paths = [] + for i in range(count): + metadata['track'] = i + 1 + metadata['title'] = 'Tag Title Album %d' % (i + 1) + dest_path = os.path.join(self.album_path, + '%02d - track.mp3' % (i + 1)) + self.__copy_file(dest_path, metadata) + self.album_paths.append(dest_path) + + self.artist_paths = [] + metadata['album'] = None + for i in range(count): + metadata['track'] = i + 10 + metadata['title'] = 'Tag Title Artist %d' % (i + 1) + dest_path = os.path.join(self.artist_path, + 'track_%d.mp3' % (i + 1)) + self.__copy_file(dest_path, metadata) + self.artist_paths.append(dest_path) + + self.misc_paths = [] + for i in range(count): + metadata['artist'] = 'Artist %d' % (i + 42) + metadata['track'] = i + 5 + metadata['title'] = 'Tag Title Misc %d' % (i + 1) + dest_path = os.path.join(self.misc_path, 'track_%d.mp3' % (i + 1)) + self.__copy_file(dest_path, metadata) + self.misc_paths.append(dest_path) + + def __run(self, expected_lines, singletons=False): + self.load_plugins('regexfilefilter') + + import_files = [self.import_dir] + self._setup_import_session(singletons=singletons) + self.importer.paths = import_files + + with capture_log() as logs: + self.importer.run() + self.unload_plugins() + RegexFileFilterPlugin.listeners = None + + logs = [line for line in logs if not line.startswith('Sending event:')] + + self.assertEqual(logs, expected_lines) + + def __reset_config(self): + config['regexfilefilter'] = {} + + def test_import_default(self): + """ The default configuration should import everything. + """ + self.__reset_config() + self.__run([ + 'Album %s' % displayable_path(self.artist_path), + ' %s' % displayable_path(self.artist_paths[0]), + ' %s' % displayable_path(self.artist_paths[1]), + 'Album %s' % displayable_path(self.album_path), + ' %s' % displayable_path(self.album_paths[0]), + ' %s' % displayable_path(self.album_paths[1]), + 'Album %s' % displayable_path(self.misc_path), + ' %s' % displayable_path(self.misc_paths[0]), + ' %s' % displayable_path(self.misc_paths[1]) + ]) + + def test_import_nothing(self): + self.__reset_config() + config['regexfilefilter']['path'] = 'not_there' + self.__run(['No files imported from %s' % self.import_dir]) + + # Global options + def test_import_global(self): + self.__reset_config() + config['regexfilefilter']['path'] = '.*track_1.*\.mp3' + self.__run([ + 'Album %s' % displayable_path(self.artist_path), + ' %s' % displayable_path(self.artist_paths[0]), + 'Album %s' % displayable_path(self.misc_path), + ' %s' % displayable_path(self.misc_paths[0]), + ]) + self.__run([ + 'Singleton: %s' % displayable_path(self.artist_paths[0]), + 'Singleton: %s' % displayable_path(self.misc_paths[0]) + ], singletons=True) + + # Album options + def test_import_album(self): + self.__reset_config() + config['regexfilefilter']['album_path'] = '.*track_1.*\.mp3' + self.__run([ + 'Album %s' % displayable_path(self.artist_path), + ' %s' % displayable_path(self.artist_paths[0]), + 'Album %s' % displayable_path(self.misc_path), + ' %s' % displayable_path(self.misc_paths[0]), + ]) + self.__run([ + 'Singleton: %s' % displayable_path(self.artist_paths[0]), + 'Singleton: %s' % displayable_path(self.artist_paths[1]), + 'Singleton: %s' % displayable_path(self.album_paths[0]), + 'Singleton: %s' % displayable_path(self.album_paths[1]), + 'Singleton: %s' % displayable_path(self.misc_paths[0]), + 'Singleton: %s' % displayable_path(self.misc_paths[1]) + ], singletons=True) + + # Singleton options + def test_import_singleton(self): + self.__reset_config() + config['regexfilefilter']['singleton_path'] = '.*track_1.*\.mp3' + self.__run([ + 'Singleton: %s' % displayable_path(self.artist_paths[0]), + 'Singleton: %s' % displayable_path(self.misc_paths[0]) + ], singletons=True) + self.__run([ + 'Album %s' % displayable_path(self.artist_path), + ' %s' % displayable_path(self.artist_paths[0]), + ' %s' % displayable_path(self.artist_paths[1]), + 'Album %s' % displayable_path(self.album_path), + ' %s' % displayable_path(self.album_paths[0]), + ' %s' % displayable_path(self.album_paths[1]), + 'Album %s' % displayable_path(self.misc_path), + ' %s' % displayable_path(self.misc_paths[0]), + ' %s' % displayable_path(self.misc_paths[1]) + ]) + + # Album and singleton options + def test_import_both(self): + self.__reset_config() + config['regexfilefilter']['album_path'] = '.*track_1.*\.mp3' + config['regexfilefilter']['singleton_path'] = '.*track_2.*\.mp3' + self.__run([ + 'Album %s' % displayable_path(self.artist_path), + ' %s' % displayable_path(self.artist_paths[0]), + 'Album %s' % displayable_path(self.misc_path), + ' %s' % displayable_path(self.misc_paths[0]), + ]) + self.__run([ + 'Singleton: %s' % displayable_path(self.artist_paths[1]), + 'Singleton: %s' % displayable_path(self.misc_paths[1]) + ], singletons=True) + + +def suite(): + return unittest.TestLoader().loadTestsFromName(__name__) + +if __name__ == '__main__': + unittest.main(defaultTest='suite')