# -*- coding: utf-8 -*- # This file is part of beets. # Copyright 2016, Thomas Scholtes. # # Permission is hereby granted, free of charge, to any person obtaining # a copy of this software and associated documentation files (the # "Software"), to deal in the Software without restriction, including # without limitation the rights to use, copy, modify, merge, publish, # distribute, sublicense, and/or sell copies of the Software, and to # permit persons to whom the Software is furnished to do so, subject to # the following conditions: # # The above copyright notice and this permission notice shall be # included in all copies or substantial portions of the Software. from __future__ import division, absolute_import, print_function import os from mock import patch, Mock, ANY import shutil import itertools from beets.importer import SingletonImportTask, SentinelImportTask, \ ArchiveImportTask, action from beets import plugins, config, ui from beets.library import Item from beets.dbcore import types from beets.mediafile import MediaFile from test.test_importer import ImportHelper, AutotagStub from test.test_ui_importer import TerminalImportSessionSetup from test._common import unittest, RSRC from test import helper class TestHelper(helper.TestHelper): def setup_plugin_loader(self): # FIXME the mocking code is horrific, but this is the lowest and # earliest level of the plugin mechanism we can hook into. self.load_plugins() self._plugin_loader_patch = patch('beets.plugins.load_plugins') self._plugin_classes = set() load_plugins = self._plugin_loader_patch.start() def myload(names=()): plugins._classes.update(self._plugin_classes) load_plugins.side_effect = myload self.setup_beets() def teardown_plugin_loader(self): self._plugin_loader_patch.stop() self.unload_plugins() def register_plugin(self, plugin_class): self._plugin_classes.add(plugin_class) class ItemTypesTest(unittest.TestCase, TestHelper): def setUp(self): self.setup_plugin_loader() def tearDown(self): self.teardown_plugin_loader() self.teardown_beets() def test_flex_field_type(self): class RatingPlugin(plugins.BeetsPlugin): item_types = {'rating': types.Float()} self.register_plugin(RatingPlugin) self.config['plugins'] = 'rating' item = Item(path=u'apath', artist=u'aaa') item.add(self.lib) # Do not match unset values out = self.run_with_output(u'ls', u'rating:1..3') self.assertNotIn(u'aaa', out) self.run_command(u'modify', u'rating=2', u'--yes') # Match in range out = self.run_with_output(u'ls', u'rating:1..3') self.assertIn(u'aaa', out) # Don't match out of range out = self.run_with_output(u'ls', u'rating:3..5') self.assertNotIn(u'aaa', out) class ItemWriteTest(unittest.TestCase, TestHelper): def setUp(self): self.setup_plugin_loader() self.setup_beets() class EventListenerPlugin(plugins.BeetsPlugin): pass self.event_listener_plugin = EventListenerPlugin() self.register_plugin(EventListenerPlugin) def tearDown(self): self.teardown_plugin_loader() self.teardown_beets() def test_change_tags(self): def on_write(item=None, path=None, tags=None): if tags['artist'] == u'XXX': tags['artist'] = u'YYY' self.register_listener('write', on_write) item = self.add_item_fixture(artist=u'XXX') item.write() mediafile = MediaFile(item.path) self.assertEqual(mediafile.artist, u'YYY') def register_listener(self, event, func): self.event_listener_plugin.register_listener(event, func) class ItemTypeConflictTest(unittest.TestCase, TestHelper): def setUp(self): self.setup_plugin_loader() self.setup_beets() def tearDown(self): self.teardown_plugin_loader() self.teardown_beets() def test_mismatch(self): class EventListenerPlugin(plugins.BeetsPlugin): item_types = {'duplicate': types.INTEGER} class AdventListenerPlugin(plugins.BeetsPlugin): item_types = {'duplicate': types.FLOAT} self.event_listener_plugin = EventListenerPlugin self.advent_listener_plugin = AdventListenerPlugin self.register_plugin(EventListenerPlugin) self.register_plugin(AdventListenerPlugin) self.assertRaises(plugins.PluginConflictException, plugins.types, Item ) def test_match(self): class EventListenerPlugin(plugins.BeetsPlugin): item_types = {'duplicate': types.INTEGER} class AdventListenerPlugin(plugins.BeetsPlugin): item_types = {'duplicate': types.INTEGER} self.event_listener_plugin = EventListenerPlugin self.advent_listener_plugin = AdventListenerPlugin self.register_plugin(EventListenerPlugin) self.register_plugin(AdventListenerPlugin) self.assertNotEqual(None, plugins.types(Item)) class EventsTest(unittest.TestCase, ImportHelper, TestHelper): def setUp(self): self.setup_plugin_loader() self.setup_beets() self.__create_import_dir(2) config['import']['pretend'] = True def tearDown(self): self.teardown_plugin_loader() self.teardown_beets() def __copy_file(self, dest_path, metadata): # Copy files resource_path = os.path.join(RSRC, 'full.mp3') shutil.copy(resource_path, dest_path) medium = MediaFile(dest_path) # Set metadata for attr in metadata: setattr(medium, attr, metadata[attr]) medium.save() def __create_import_dir(self, count): self.import_dir = os.path.join(self.temp_dir, 'testsrcdir') if os.path.isdir(self.import_dir): shutil.rmtree(self.import_dir) self.album_path = os.path.join(self.import_dir, 'album') os.makedirs(self.album_path) metadata = { 'artist': u'Tag Artist', 'album': u'Tag Album', 'albumartist': None, 'mb_trackid': None, 'mb_albumid': None, 'comp': None } self.file_paths = [] for i in range(count): metadata['track'] = i + 1 metadata['title'] = u'Tag Title Album %d' % (i + 1) dest_path = os.path.join(self.album_path, '%02d - track.mp3' % (i + 1)) self.__copy_file(dest_path, metadata) self.file_paths.append(dest_path) def test_import_task_created(self): import_files = [self.import_dir] self._setup_import_session(singletons=False) self.importer.paths = import_files with helper.capture_log() as logs: self.importer.run() self.unload_plugins() # Exactly one event should have been imported (for the album). # Sentinels do not get emitted. self.assertEqual(logs.count(u'Sending event: import_task_created'), 1) logs = [line for line in logs if not line.startswith( u'Sending event:')] self.assertEqual(logs, [ u'Album: {0}'.format(os.path.join(self.import_dir, 'album')), u' {0}'.format(self.file_paths[0]), u' {0}'.format(self.file_paths[1]), ]) def test_import_task_created_with_plugin(self): class ToSingletonPlugin(plugins.BeetsPlugin): def __init__(self): super(ToSingletonPlugin, self).__init__() self.register_listener('import_task_created', self.import_task_created_event) def import_task_created_event(self, session, task): if isinstance(task, SingletonImportTask) \ or isinstance(task, SentinelImportTask)\ or isinstance(task, ArchiveImportTask): return task new_tasks = [] for item in task.items: new_tasks.append(SingletonImportTask(task.toppath, item)) return new_tasks to_singleton_plugin = ToSingletonPlugin self.register_plugin(to_singleton_plugin) import_files = [self.import_dir] self._setup_import_session(singletons=False) self.importer.paths = import_files with helper.capture_log() as logs: self.importer.run() self.unload_plugins() # Exactly one event should have been imported (for the album). # Sentinels do not get emitted. self.assertEqual(logs.count(u'Sending event: import_task_created'), 1) logs = [line for line in logs if not line.startswith( u'Sending event:')] self.assertEqual(logs, [ u'Singleton: {0}'.format(self.file_paths[0]), u'Singleton: {0}'.format(self.file_paths[1]), ]) class HelpersTest(unittest.TestCase): def test_sanitize_choices(self): self.assertEqual( plugins.sanitize_choices([u'A', u'Z'], (u'A', u'B')), [u'A']) self.assertEqual( plugins.sanitize_choices([u'A', u'A'], (u'A')), [u'A']) self.assertEqual( plugins.sanitize_choices([u'D', u'*', u'A'], (u'A', u'B', u'C', u'D')), [u'D', u'B', u'C', u'A']) class ListenersTest(unittest.TestCase, TestHelper): def setUp(self): self.setup_plugin_loader() def tearDown(self): self.teardown_plugin_loader() self.teardown_beets() def test_register(self): class DummyPlugin(plugins.BeetsPlugin): def __init__(self): super(DummyPlugin, self).__init__() self.register_listener('cli_exit', self.dummy) self.register_listener('cli_exit', self.dummy) def dummy(self): pass d = DummyPlugin() self.assertEqual(DummyPlugin._raw_listeners['cli_exit'], [d.dummy]) d2 = DummyPlugin() self.assertEqual(DummyPlugin._raw_listeners['cli_exit'], [d.dummy, d2.dummy]) d.register_listener('cli_exit', d2.dummy) self.assertEqual(DummyPlugin._raw_listeners['cli_exit'], [d.dummy, d2.dummy]) @patch('beets.plugins.find_plugins') @patch('beets.plugins.inspect') def test_events_called(self, mock_inspect, mock_find_plugins): mock_inspect.getargspec.return_value = None class DummyPlugin(plugins.BeetsPlugin): def __init__(self): super(DummyPlugin, self).__init__() self.foo = Mock(__name__=b'foo') self.register_listener('event_foo', self.foo) self.bar = Mock(__name__=b'bar') self.register_listener('event_bar', self.bar) d = DummyPlugin() mock_find_plugins.return_value = d, plugins.send('event') d.foo.assert_has_calls([]) d.bar.assert_has_calls([]) plugins.send('event_foo', var=u"tagada") d.foo.assert_called_once_with(var=u"tagada") d.bar.assert_has_calls([]) @patch('beets.plugins.find_plugins') def test_listener_params(self, mock_find_plugins): test = self class DummyPlugin(plugins.BeetsPlugin): def __init__(self): super(DummyPlugin, self).__init__() for i in itertools.count(1): try: meth = getattr(self, 'dummy{0}'.format(i)) except AttributeError: break self.register_listener('event{0}'.format(i), meth) def dummy1(self, foo): test.assertEqual(foo, 5) def dummy2(self, foo=None): test.assertEqual(foo, 5) def dummy3(self): # argument cut off pass def dummy4(self, bar=None): # argument cut off pass def dummy5(self, bar): test.assertFalse(True) # more complex exmaples def dummy6(self, foo, bar=None): test.assertEqual(foo, 5) test.assertEqual(bar, None) def dummy7(self, foo, **kwargs): test.assertEqual(foo, 5) test.assertEqual(kwargs, {}) def dummy8(self, foo, bar, **kwargs): test.assertFalse(True) def dummy9(self, **kwargs): test.assertEqual(kwargs, {"foo": 5}) d = DummyPlugin() mock_find_plugins.return_value = d, plugins.send('event1', foo=5) plugins.send('event2', foo=5) plugins.send('event3', foo=5) plugins.send('event4', foo=5) with self.assertRaises(TypeError): plugins.send('event5', foo=5) plugins.send('event6', foo=5) plugins.send('event7', foo=5) with self.assertRaises(TypeError): plugins.send('event8', foo=5) plugins.send('event9', foo=5) class PromptChoicesTest(TerminalImportSessionSetup, unittest.TestCase, ImportHelper, TestHelper): def setUp(self): self.setup_plugin_loader() self.setup_beets() self._create_import_dir(3) self._setup_import_session() self.matcher = AutotagStub().install() # keep track of ui.input_option() calls self.input_options_patcher = patch('beets.ui.input_options', side_effect=ui.input_options) self.mock_input_options = self.input_options_patcher.start() def tearDown(self): self.input_options_patcher.stop() self.teardown_plugin_loader() self.teardown_beets() self.matcher.restore() def test_plugin_choices_in_ui_input_options_album(self): """Test the presence of plugin choices on the prompt (album).""" class DummyPlugin(plugins.BeetsPlugin): def __init__(self): super(DummyPlugin, self).__init__() self.register_listener('before_choose_candidate', self.return_choices) def return_choices(self, session, task): return [ui.commands.PromptChoice('f', u'Foo', None), ui.commands.PromptChoice('r', u'baR', None)] self.register_plugin(DummyPlugin) # Default options + extra choices by the plugin ('Foo', 'Bar') opts = (u'Apply', u'More candidates', u'Skip', u'Use as-is', u'as Tracks', u'Group albums', u'Enter search', u'enter Id', u'aBort') + (u'Foo', u'baR') self.importer.add_choice(action.SKIP) self.importer.run() self.mock_input_options.assert_called_once_with(opts, default='a', require=ANY) def test_plugin_choices_in_ui_input_options_singleton(self): """Test the presence of plugin choices on the prompt (singleton).""" class DummyPlugin(plugins.BeetsPlugin): def __init__(self): super(DummyPlugin, self).__init__() self.register_listener('before_choose_candidate', self.return_choices) def return_choices(self, session, task): return [ui.commands.PromptChoice('f', u'Foo', None), ui.commands.PromptChoice('r', u'baR', None)] self.register_plugin(DummyPlugin) # Default options + extra choices by the plugin ('Foo', 'Bar') opts = (u'Apply', u'More candidates', u'Skip', u'Use as-is', u'Enter search', u'enter Id', u'aBort') + (u'Foo', u'baR') config['import']['singletons'] = True self.importer.add_choice(action.SKIP) self.importer.run() self.mock_input_options.assert_called_with(opts, default='a', require=ANY) def test_choices_conflicts(self): """Test the short letter conflict solving.""" class DummyPlugin(plugins.BeetsPlugin): def __init__(self): super(DummyPlugin, self).__init__() self.register_listener('before_choose_candidate', self.return_choices) def return_choices(self, session, task): return [ui.commands.PromptChoice('a', u'A foo', None), # dupe ui.commands.PromptChoice('z', u'baZ', None), # ok ui.commands.PromptChoice('z', u'Zupe', None), # dupe ui.commands.PromptChoice('z', u'Zoo', None)] # dupe self.register_plugin(DummyPlugin) # Default options + not dupe extra choices by the plugin ('baZ') opts = (u'Apply', u'More candidates', u'Skip', u'Use as-is', u'as Tracks', u'Group albums', u'Enter search', u'enter Id', u'aBort') + (u'baZ',) self.importer.add_choice(action.SKIP) self.importer.run() self.mock_input_options.assert_called_once_with(opts, default='a', require=ANY) def test_plugin_callback(self): """Test that plugin callbacks are being called upon user choice.""" class DummyPlugin(plugins.BeetsPlugin): def __init__(self): super(DummyPlugin, self).__init__() self.register_listener('before_choose_candidate', self.return_choices) def return_choices(self, session, task): return [ui.commands.PromptChoice('f', u'Foo', self.foo)] def foo(self, session, task): pass self.register_plugin(DummyPlugin) # Default options + extra choices by the plugin ('Foo', 'Bar') opts = (u'Apply', u'More candidates', u'Skip', u'Use as-is', u'as Tracks', u'Group albums', u'Enter search', u'enter Id', u'aBort') + (u'Foo',) # DummyPlugin.foo() should be called once with patch.object(DummyPlugin, 'foo', autospec=True) as mock_foo: with helper.control_stdin('\n'.join(['f', 's'])): self.importer.run() self.assertEqual(mock_foo.call_count, 1) # input_options should be called twice, as foo() returns None self.assertEqual(self.mock_input_options.call_count, 2) self.mock_input_options.assert_called_with(opts, default='a', require=ANY) def test_plugin_callback_return(self): """Test that plugin callbacks that return a value exit the loop.""" class DummyPlugin(plugins.BeetsPlugin): def __init__(self): super(DummyPlugin, self).__init__() self.register_listener('before_choose_candidate', self.return_choices) def return_choices(self, session, task): return [ui.commands.PromptChoice('f', u'Foo', self.foo)] def foo(self, session, task): return action.SKIP self.register_plugin(DummyPlugin) # Default options + extra choices by the plugin ('Foo', 'Bar') opts = (u'Apply', u'More candidates', u'Skip', u'Use as-is', u'as Tracks', u'Group albums', u'Enter search', u'enter Id', u'aBort') + (u'Foo',) # DummyPlugin.foo() should be called once with helper.control_stdin('f\n'): self.importer.run() # input_options should be called once, as foo() returns SKIP self.mock_input_options.assert_called_once_with(opts, default='a', require=ANY) def suite(): return unittest.TestLoader().loadTestsFromName(__name__) if __name__ == b'__main__': unittest.main(defaultTest='suite')