Added the import_task_created event

Improved the IHatePlugin to filter files based on file names
This commit is contained in:
Malte Ried 2014-12-28 16:18:51 +01:00
parent a892128996
commit 53c44d9a97
4 changed files with 461 additions and 3 deletions

View file

@ -277,6 +277,8 @@ class ImportSession(object):
else:
stages = [query_tasks(self)]
stages += [send_import_task_created_event(self)]
if self.config['pretend']:
# Only log the imported files and end the pipeline
stages += [log_files(self)]
@ -1299,6 +1301,9 @@ def manipulate_files(session, task):
def log_files(session, task):
"""A coroutine (pipeline stage) to log each file which will be imported
"""
if task.skip:
return
if isinstance(task, SingletonImportTask):
log.info(displayable_path(task.item['path']))
elif task.items:
@ -1306,6 +1311,16 @@ def log_files(session, task):
log.info(displayable_path(item['path']))
@pipeline.mutator_stage
def send_import_task_created_event(session, task):
"""A coroutine (pipeline stage) to send the import_task_created event
"""
if task.skip:
return
plugins.send('import_task_created', session=session, task=task)
def group_albums(session):
"""Group the items of a task by albumartist and album name and create a new
task for each album. Yield the tasks as a multi message.

View file

@ -15,8 +15,11 @@
"""Warns you about things you hate (or even blocks import)."""
import logging
import os
import re
from beets import config
from beets.plugins import BeetsPlugin
from beets.importer import action
from beets.importer import action, SingletonImportTask
from beets.library import parse_query_string
from beets.library import Item
from beets.library import Album
@ -43,11 +46,63 @@ class IHatePlugin(BeetsPlugin):
super(IHatePlugin, self).__init__()
self.register_listener('import_task_choice',
self.import_task_choice_event)
self.register_listener('import_task_created',
self.import_task_created_event)
self.config.add({
'warn': [],
'skip': [],
'regex_ignore_case': False,
'regex_invert_folder_result': False,
'regex_invert_file_result': False,
'regex_folder_name': '.*',
'regex_file_name': '.*'
})
flags = re.IGNORECASE if self.config['regex_ignore_case'].get() else 0
self.invert_folder_album_result = \
self.invert_folder_singleton_result = \
self.config['regex_invert_folder_result'].get()
self.invert_file_album_result = \
self.invert_file_singleton_result = \
self.config['regex_invert_file_result'].get()
self.folder_name_album_regex = \
self.folder_name_singleton_regex = \
re.compile(self.config['regex_folder_name'].get(), flags)
self.file_name_album_regex = \
self.file_name_singleton_regex = \
re.compile(self.config['regex_file_name'].get(), flags)
if 'album' in self.config:
album_config = self.config['album']
if 'regex_invert_folder_result' in album_config:
self.invert_folder_album_result = album_config[
'regex_invert_folder_result'].get()
if 'regex_invert_file_result' in album_config:
self.invert_file_album_result = album_config[
'regex_invert_file_result'].get()
if 'regex_folder_name' in album_config:
self.folder_name_album_regex = re.compile(
album_config['regex_folder_name'].get(), flags)
if 'regex_file_name' in album_config:
self.file_name_album_regex = re.compile(
album_config['regex_file_name'].get(), flags)
if 'singleton' in self.config:
singleton_config = self.config['singleton']
if 'regex_invert_folder_result' in singleton_config:
self.invert_folder_singleton_result = singleton_config[
'regex_invert_folder_result'].get()
if 'regex_invert_file_result' in singleton_config:
self.invert_file_singleton_result = singleton_config[
'regex_invert_file_result'].get()
if 'regex_folder_name' in singleton_config:
self.folder_name_singleton_regex = re.compile(
singleton_config['regex_folder_name'].get(), flags)
if 'regex_file_name' in singleton_config:
self.file_name_singleton_regex = re.compile(
singleton_config['regex_file_name'].get(), flags)
@classmethod
def do_i_hate_this(cls, task, action_patterns):
"""Process group of patterns (warn or skip) and returns True if
@ -82,3 +137,81 @@ class IHatePlugin(BeetsPlugin):
self._log.debug(u'[ihate] nothing to do')
else:
self._log.debug(u'[ihate] user made a decision, nothing to do')
def import_task_created_event(self, session, task):
if task.items and len(task.items) > 0:
items_to_import = []
for item in task.items:
if self.file_filter(item['path'], session.paths):
items_to_import.append(item)
if len(items_to_import) > 0:
task.items = items_to_import
else:
task.choice_flag = action.SKIP
elif isinstance(task, SingletonImportTask):
if not self.file_filter(task.item['path'], session.paths):
task.choice_flag = action.SKIP
def file_filter(self, full_path, base_paths):
"""Checks if the configured regular expressions allow the import of the
file given in full_path.
"""
# The folder regex only checks the folder names starting from the
# longest base path. Find this folder.
matched_base_path = ''
for base_path in base_paths:
if full_path.startswith(base_path) and len(base_path) > len(
matched_base_path):
matched_base_path = base_path
relative_path = full_path[len(matched_base_path):]
if os.path.isdir(full_path):
path = relative_path
file_name = None
else:
path, file_name = os.path.split(relative_path)
path, folder_name = os.path.split(path)
import_config = dict(config['import'])
if 'singletons' not in import_config or not import_config[
'singletons']:
# Album
# Folder
while len(folder_name) > 0:
matched = self.folder_name_album_regex.match(
folder_name) is not None
matched = not matched if self.invert_folder_album_result else \
matched
if not matched:
return False
path, folder_name = os.path.split(path)
# File
matched = self.file_name_album_regex.match(
file_name) is not None
matched = not matched if self.invert_file_album_result else matched
if not matched:
return False
return True
else:
# Singleton
# Folder
while len(folder_name) > 0:
matched = self.folder_name_singleton_regex.match(
folder_name) is not None
matched = not matched if \
self.invert_folder_singleton_result else matched
if not matched:
return False
path, folder_name = os.path.split(path)
# File
matched = self.file_name_singleton_regex.match(
file_name) is not None
matched = not matched if self.invert_file_singleton_result else \
matched
if not matched:
return False
return True

View file

@ -159,6 +159,9 @@ currently available are:
* *after_write*: called with an ``Item`` object after a file's metadata is
written to disk (i.e., just after the file on disk is closed).
* *import_task_created*: called after an import task has been created.
Parameters: ``task`` (an `ImportTask`) and ``session`` (an `ImportSession`).
* *import_task_start*: called when before an import task begins processing.
Parameters: ``task`` (an `ImportTask`) and ``session`` (an `ImportSession`).

View file

@ -1,12 +1,108 @@
"""Tests for the 'ihate' plugin"""
import os
import shutil
from _common import unittest
from beets import importer
from beets import importer, config
from beets.library import Item
from beets.mediafile import MediaFile
from beetsplug.ihate import IHatePlugin
from test import _common
from test.helper import capture_log
from test.test_importer import ImportHelper
class IHatePluginTest(unittest.TestCase):
class IHatePluginTest(_common.TestCase, ImportHelper):
def setUp(self):
super(IHatePluginTest, self).setUp()
self.setup_beets()
self.__create_import_dir(2)
self._setup_import_session()
config['import']['pretend'] = True
self.all_paths = [self.artist_paths[0], self.artist_paths[1],
self.album_paths[0], self.album_paths[1],
self.misc_paths[0], self.misc_paths[1]]
def tearDown(self):
self.teardown_beets()
def __copy_file(self, dest_path, metadata):
# Copy files
resource_path = os.path.join(_common.RSRC, 'full.mp3')
shutil.copy(resource_path, dest_path)
medium = MediaFile(dest_path)
# Set metadata
for attr in metadata:
setattr(medium, attr, metadata[attr])
medium.save()
def __create_import_dir(self, count):
self.import_dir = os.path.join(self.temp_dir, 'testsrcdir')
if os.path.isdir(self.import_dir):
shutil.rmtree(self.import_dir)
artist_path = os.path.join(self.import_dir, 'artist')
album_path = os.path.join(artist_path, 'album')
misc_path = os.path.join(self.import_dir, 'misc')
os.makedirs(album_path)
os.makedirs(misc_path)
metadata = {
'artist': 'Tag Artist',
'album': 'Tag Album',
'albumartist': None,
'mb_trackid': None,
'mb_albumid': None,
'comp': None
}
self.album_paths = []
for i in range(count):
metadata['track'] = i + 1
metadata['title'] = 'Tag Title Album %d' % (i + 1)
dest_path = os.path.join(album_path, '%02d - track.mp3' % (i + 1))
self.__copy_file(dest_path, metadata)
self.album_paths.append(dest_path)
self.artist_paths = []
metadata['album'] = None
for i in range(count):
metadata['track'] = i + 10
metadata['title'] = 'Tag Title Artist %d' % (i + 1)
dest_path = os.path.join(artist_path, 'track_%d.mp3' % (i + 1))
self.__copy_file(dest_path, metadata)
self.artist_paths.append(dest_path)
self.misc_paths = []
for i in range(count):
metadata['artist'] = 'Artist %d' % (i + 42)
metadata['track'] = i + 5
metadata['title'] = 'Tag Title Misc %d' % (i + 1)
dest_path = os.path.join(misc_path, 'track_%d.mp3' % (i + 1))
self.__copy_file(dest_path, metadata)
self.misc_paths.append(dest_path)
def __run(self, expected_lines, singletons=False):
import beetsplug
path = beetsplug.__path__
print path
self.load_plugins('ihate')
import_files = [self.import_dir]
self._setup_import_session(singletons=singletons)
self.importer.paths = import_files
with capture_log() as logs:
self.importer.run()
self.unload_plugins()
IHatePlugin.listeners = None
logs = [line for line in logs if not line.startswith('Sending event:')]
self.assertEqual(logs, expected_lines)
def __reset_config(self):
config['ihate'] = {}
def test_hate(self):
@ -42,6 +138,217 @@ class IHatePluginTest(unittest.TestCase):
"artist:testartist album:notthis"]
self.assertTrue(IHatePlugin.do_i_hate_this(task, match_pattern))
def test_import_default(self):
""" The default configuration should import everything.
"""
self.__reset_config()
self.__run(self.all_paths)
def test_import_nothing(self):
self.__reset_config()
config['ihate']['regex_invert_folder_result'] = True
config['ihate']['regex_invert_file_result'] = True
self.__run([])
# Global options
def test_import_global_match_folder(self):
self.__reset_config()
config['ihate']['regex_folder_name'] = 'artist'
self.__run([self.artist_paths[0],
self.artist_paths[1]])
def test_import_global_invert_folder(self):
self.__reset_config()
config['ihate']['regex_folder_name'] = 'artist'
config['ihate']['regex_invert_folder_result'] = True
self.__run([self.misc_paths[0],
self.misc_paths[1]])
def test_import_global_match_file(self):
self.__reset_config()
config['ihate']['regex_file_name'] = '.*2.*'
self.__run([self.artist_paths[1],
self.album_paths[1],
self.misc_paths[1]])
def test_import_global_invert_file(self):
self.__reset_config()
config['ihate']['regex_file_name'] = '.*2.*'
config['ihate']['regex_invert_file_result'] = True
self.__run([self.artist_paths[0],
self.album_paths[0],
self.misc_paths[0]])
def test_import_global_match_folder_case_sensitive(self):
self.__reset_config()
config['ihate']['regex_folder_name'] = 'Artist'
self.__run([])
def test_import_global_match_folder_ignore_case(self):
self.__reset_config()
config['ihate']['regex_ignore_case'] = True
config['ihate']['regex_folder_name'] = 'Artist'
self.__run([self.artist_paths[0],
self.artist_paths[1]])
# Album options
def test_import_album_match_folder(self):
self.__reset_config()
config['ihate']['album']['regex_folder_name'] = 'artist'
self.__run([self.artist_paths[0],
self.artist_paths[1]])
self.__run(self.all_paths, singletons=True)
def test_import_album_invert_folder(self):
self.__reset_config()
config['ihate']['album']['regex_folder_name'] = 'artist'
config['ihate']['album']['regex_invert_folder_result'] = True
self.__run([self.misc_paths[0],
self.misc_paths[1]])
self.__run(self.all_paths, singletons=True)
def test_import_album_match_file(self):
self.__reset_config()
config['ihate']['album']['regex_file_name'] = '.*2.*'
self.__run([self.artist_paths[1],
self.album_paths[1],
self.misc_paths[1]])
self.__run(self.all_paths, singletons=True)
def test_import_album_invert_file(self):
self.__reset_config()
config['ihate']['album']['regex_file_name'] = '.*2.*'
config['ihate']['album']['regex_invert_file_result'] = True
self.__run([self.artist_paths[0],
self.album_paths[0],
self.misc_paths[0]])
self.__run(self.all_paths, singletons=True)
def test_import_album_match_folder_case_sensitive(self):
self.__reset_config()
config['ihate']['album']['regex_folder_name'] = 'Artist'
self.__run([])
self.__run(self.all_paths, singletons=True)
def test_import_album_match_folder_ignore_case(self):
self.__reset_config()
config['ihate']['regex_ignore_case'] = True
config['ihate']['album']['regex_folder_name'] = 'Artist'
self.__run([self.artist_paths[0],
self.artist_paths[1]])
self.__run(self.all_paths, singletons=True)
# Singleton options
def test_import_singleton_match_folder(self):
self.__reset_config()
config['ihate']['singleton']['regex_folder_name'] = 'artist'
self.__run([self.artist_paths[0],
self.artist_paths[1]], singletons=True)
self.__run(self.all_paths)
def test_import_singleton_invert_folder(self):
self.__reset_config()
config['ihate']['singleton']['regex_folder_name'] = 'artist'
config['ihate']['singleton']['regex_invert_folder_result'] = True
self.__run([self.misc_paths[0],
self.misc_paths[1]], singletons=True)
self.__run(self.all_paths)
def test_import_singleton_match_file(self):
self.__reset_config()
config['ihate']['singleton']['regex_file_name'] = '.*2.*'
self.__run([self.artist_paths[1],
self.album_paths[1],
self.misc_paths[1]], singletons=True)
self.__run(self.all_paths)
def test_import_singleton_invert_file(self):
self.__reset_config()
config['ihate']['singleton']['regex_file_name'] = '.*2.*'
config['ihate']['singleton']['regex_invert_file_result'] = True
self.__run([self.artist_paths[0],
self.album_paths[0],
self.misc_paths[0]], singletons=True)
self.__run(self.all_paths)
def test_import_singleton_match_folder_case_sensitive(self):
self.__reset_config()
config['ihate']['singleton']['regex_folder_name'] = 'Artist'
self.__run([], singletons=True)
self.__run(self.all_paths)
def test_import_singleton_match_folder_ignore_case(self):
self.__reset_config()
config['ihate']['regex_ignore_case'] = True
config['ihate']['singleton']['regex_folder_name'] = 'Artist'
self.__run([self.artist_paths[0],
self.artist_paths[1]], singletons=True)
self.__run(self.all_paths)
# Album and singleton options
def test_import_both_match_folder(self):
self.__reset_config()
config['ihate']['album']['regex_folder_name'] = 'artist'
config['ihate']['singleton']['regex_folder_name'] = 'misc'
self.__run([self.artist_paths[0],
self.artist_paths[1]])
self.__run([self.misc_paths[0],
self.misc_paths[1]], singletons=True)
def test_import_both_invert_folder(self):
self.__reset_config()
config['ihate']['album']['regex_folder_name'] = 'artist'
config['ihate']['album']['regex_invert_folder_result'] = True
config['ihate']['singleton']['regex_folder_name'] = 'misc'
config['ihate']['singleton']['regex_invert_folder_result'] = True
self.__run([self.misc_paths[0],
self.misc_paths[1]])
self.__run([self.artist_paths[0],
self.artist_paths[1],
self.album_paths[0],
self.album_paths[1]], singletons=True)
def test_import_both_match_file(self):
self.__reset_config()
config['ihate']['album']['regex_file_name'] = '.*2.*'
config['ihate']['singleton']['regex_file_name'] = '.*1.*'
self.__run([self.artist_paths[1],
self.album_paths[1],
self.misc_paths[1]])
self.__run([self.artist_paths[0],
self.album_paths[0],
self.misc_paths[0]], singletons=True)
def test_import_both_invert_file(self):
self.__reset_config()
config['ihate']['album']['regex_file_name'] = '.*2.*'
config['ihate']['album']['regex_invert_file_result'] = True
config['ihate']['singleton']['regex_file_name'] = '.*1.*'
config['ihate']['singleton']['regex_invert_file_result'] = True
self.__run([self.artist_paths[0],
self.album_paths[0],
self.misc_paths[0]])
self.__run([self.artist_paths[1],
self.album_paths[1],
self.misc_paths[1]], singletons=True)
def test_import_both_match_folder_case_sensitive(self):
self.__reset_config()
config['ihate']['album']['regex_folder_name'] = 'Artist'
config['ihate']['singleton']['regex_folder_name'] = 'Misc'
self.__run([])
self.__run([], singletons=True)
def test_import_both_match_folder_ignore_case(self):
self.__reset_config()
config['ihate']['regex_ignore_case'] = True
config['ihate']['album']['regex_folder_name'] = 'Artist'
config['ihate']['singleton']['regex_folder_name'] = 'Misc'
self.__run([self.artist_paths[0],
self.artist_paths[1]])
self.__run([self.misc_paths[0],
self.misc_paths[1]], singletons=True)
def suite():
return unittest.TestLoader().loadTestsFromName(__name__)