Configure plugins using PluginMixin.configure_plugin

This commit is contained in:
Šarūnas Nejus 2024-07-29 15:33:12 +01:00
parent 199f3079f2
commit 5f395ab4f4
No known key found for this signature in database
GPG key ID: DD28F6704DBE3435
6 changed files with 234 additions and 321 deletions

View file

@ -43,7 +43,7 @@ from functools import cached_property
from io import StringIO
from pathlib import Path
from tempfile import mkdtemp, mkstemp
from typing import ClassVar
from typing import Any, ClassVar
from unittest.mock import patch
import responses
@ -498,6 +498,19 @@ class PluginMixin:
Item._queries = getattr(Item, "_original_queries", {})
Album._queries = getattr(Album, "_original_queries", {})
@contextmanager
def configure_plugin(self, config: list[Any] | dict[str, Any]):
if isinstance(config, list):
beets.config[self.plugin] = config
else:
for key, value in config.items():
beets.config[self.plugin][key] = value
self.load_plugins(self.plugin)
yield
self.unload_plugins()
class PluginTestCase(PluginMixin, BeetsTestCase):
pass

View file

@ -16,139 +16,106 @@
"""
from beets.test.helper import BeetsTestCase, PluginMixin
from beets.test.helper import PluginTestCase
from beets.ui import UserError
PLUGIN_NAME = "advancedrewrite"
class AdvancedRewritePluginTest(PluginMixin, BeetsTestCase):
class AdvancedRewritePluginTest(PluginTestCase):
plugin = "advancedrewrite"
preload_plugin = False
def test_simple_rewrite_example(self):
self.config[PLUGIN_NAME] = [
{"artist ODD EYE CIRCLE": "이달의 소녀 오드아이써클"},
]
self.load_plugins(PLUGIN_NAME)
with self.configure_plugin(
[{"artist ODD EYE CIRCLE": "이달의 소녀 오드아이써클"}]
):
item = self.add_item(
artist="ODD EYE CIRCLE",
albumartist="ODD EYE CIRCLE",
)
item = self.add_item(
title="Uncover",
artist="ODD EYE CIRCLE",
albumartist="ODD EYE CIRCLE",
album="Mix & Match",
)
self.assertEqual(item.artist, "이달의 소녀 오드아이써클")
self.assertEqual(item.artist, "이달의 소녀 오드아이써클")
def test_advanced_rewrite_example(self):
self.config[PLUGIN_NAME] = [
{
"match": "mb_artistid:dec0f331-cb08-4c8e-9c9f-aeb1f0f6d88c year:..2022",
"replacements": {
"artist": "이달의 소녀 오드아이써클",
"artist_sort": "LOONA / ODD EYE CIRCLE",
with self.configure_plugin(
[
{
"match": "mb_artistid:dec0f331-cb08-4c8e-9c9f-aeb1f0f6d88c year:..2022", # noqa: E501
"replacements": {
"artist": "이달의 소녀 오드아이써클",
"artist_sort": "LOONA / ODD EYE CIRCLE",
},
},
},
]
self.load_plugins(PLUGIN_NAME)
]
):
item_a = self.add_item(
artist="ODD EYE CIRCLE",
artist_sort="ODD EYE CIRCLE",
mb_artistid="dec0f331-cb08-4c8e-9c9f-aeb1f0f6d88c",
year=2017,
)
item_b = self.add_item(
artist="ODD EYE CIRCLE",
artist_sort="ODD EYE CIRCLE",
mb_artistid="dec0f331-cb08-4c8e-9c9f-aeb1f0f6d88c",
year=2023,
)
item_a = self.add_item(
title="Uncover",
artist="ODD EYE CIRCLE",
albumartist="ODD EYE CIRCLE",
artist_sort="ODD EYE CIRCLE",
albumartist_sort="ODD EYE CIRCLE",
album="Mix & Match",
mb_artistid="dec0f331-cb08-4c8e-9c9f-aeb1f0f6d88c",
year=2017,
)
item_b = self.add_item(
title="Air Force One",
artist="ODD EYE CIRCLE",
albumartist="ODD EYE CIRCLE",
artist_sort="ODD EYE CIRCLE",
albumartist_sort="ODD EYE CIRCLE",
album="ODD EYE CIRCLE <Version Up>",
mb_artistid="dec0f331-cb08-4c8e-9c9f-aeb1f0f6d88c",
year=2023,
)
# Assert that all replacements were applied to item_a
self.assertEqual("이달의 소녀 오드아이써클", item_a.artist)
self.assertEqual("LOONA / ODD EYE CIRCLE", item_a.artist_sort)
self.assertEqual("LOONA / ODD EYE CIRCLE", item_a.albumartist_sort)
# Assert that all replacements were applied to item_a
self.assertEqual("이달의 소녀 오드아이써클", item_a.artist)
self.assertEqual("LOONA / ODD EYE CIRCLE", item_a.artist_sort)
self.assertEqual("LOONA / ODD EYE CIRCLE", item_a.albumartist_sort)
# Assert that no replacements were applied to item_b
self.assertEqual("ODD EYE CIRCLE", item_b.artist)
# Assert that no replacements were applied to item_b
self.assertEqual("ODD EYE CIRCLE", item_b.artist)
def test_advanced_rewrite_example_with_multi_valued_field(self):
self.config[PLUGIN_NAME] = [
{
"match": "artist:배유빈 feat. 김미현",
"replacements": {
"artists": ["유빈", "미미"],
with self.configure_plugin(
[
{
"match": "artist:배유빈 feat. 김미현",
"replacements": {"artists": ["유빈", "미미"]},
},
},
]
self.load_plugins(PLUGIN_NAME)
]
):
item = self.add_item(
artist="배유빈 feat. 김미현",
artists=["배유빈", "김미현"],
)
item = self.add_item(
artist="배유빈 feat. 김미현",
artists=["배유빈", "김미현"],
)
self.assertEqual(item.artists, ["유빈", "미미"])
self.assertEqual(item.artists, ["유빈", "미미"])
def test_fail_when_replacements_empty(self):
self.config[PLUGIN_NAME] = [
{
"match": "artist:A",
"replacements": {},
},
]
with self.assertRaises(
UserError,
msg="Advanced rewrites must have at least one replacement",
):
self.load_plugins(PLUGIN_NAME)
), self.configure_plugin([{"match": "artist:A", "replacements": {}}]):
pass
def test_fail_when_rewriting_single_valued_field_with_list(self):
self.config[PLUGIN_NAME] = [
{
"match": "artist:'A & B'",
"replacements": {
"artist": ["C", "D"],
},
},
]
with self.assertRaises(
UserError,
msg="Field artist is not a multi-valued field but a list was given: C, D",
), self.configure_plugin(
[
{
"match": "artist:'A & B'",
"replacements": {"artist": ["C", "D"]},
},
]
):
self.load_plugins(PLUGIN_NAME)
pass
def test_combined_rewrite_example(self):
self.config[PLUGIN_NAME] = [
{"artist A": "B"},
{
"match": "album:'C'",
"replacements": {
"artist": "D",
},
},
]
self.load_plugins(PLUGIN_NAME)
with self.configure_plugin(
[
{"artist A": "B"},
{"match": "album:'C'", "replacements": {"artist": "D"}},
]
):
item = self.add_item(artist="A", albumartist="A")
self.assertEqual(item.artist, "B")
item = self.add_item(
artist="A",
albumartist="A",
)
self.assertEqual(item.artist, "B")
item = self.add_item(
artist="C",
albumartist="C",
album="C",
)
self.assertEqual(item.artist, "D")
item = self.add_item(artist="C", albumartist="C", album="C")
self.assertEqual(item.artist, "D")

View file

@ -14,7 +14,6 @@
"""Tests for the `filefilter` plugin.
"""
from beets import config
from beets.test.helper import ImportTestCase, PluginMixin
from beets.util import bytestring_path
@ -42,10 +41,9 @@ class FileFilterPluginMixin(PluginMixin, ImportTestCase):
self.single_track,
}
def _run(self, expected_album_count, expected_paths):
self.load_plugins("filefilter")
self.importer.run()
def _run(self, config, expected_album_count, expected_paths):
with self.configure_plugin(config):
self.importer.run()
self.assertEqual(len(self.lib.albums()), expected_album_count)
self.assertEqual({i.path for i in self.lib.items()}, expected_paths)
@ -58,24 +56,28 @@ class FileFilterPluginNonSingletonTest(FileFilterPluginMixin):
def test_import_default(self):
"""The default configuration should import everything."""
self._run(3, self.all_tracks)
self._run({}, 3, self.all_tracks)
def test_import_nothing(self):
config["filefilter"]["path"] = "not_there"
self._run(0, set())
self._run({"path": "not_there"}, 0, set())
def test_global_config(self):
config["filefilter"]["path"] = ".*album.*"
self._run(2, {self.album_track, self.other_album_track})
self._run(
{"path": ".*album.*"},
2,
{self.album_track, self.other_album_track},
)
def test_album_config(self):
config["filefilter"]["album_path"] = ".*other_album.*"
self._run(1, {self.other_album_track})
self._run(
{"album_path": ".*other_album.*"},
1,
{self.other_album_track},
)
def test_singleton_config(self):
"""Check that singleton configuration is ignored for album import."""
config["filefilter"]["singleton_path"] = ".*other_album.*"
self._run(3, self.all_tracks)
self._run({"singleton_path": ".*other_album.*"}, 3, self.all_tracks)
class FileFilterPluginSingletonTest(FileFilterPluginMixin):
@ -84,14 +86,15 @@ class FileFilterPluginSingletonTest(FileFilterPluginMixin):
self.importer = self.setup_singleton_importer(autotag=False, copy=False)
def test_global_config(self):
config["filefilter"]["path"] = ".*album.*"
self._run(0, {self.album_track, self.other_album_track})
self._run(
{"path": ".*album.*"}, 0, {self.album_track, self.other_album_track}
)
def test_album_config(self):
"""Check that album configuration is ignored for singleton import."""
config["filefilter"]["album_path"] = ".*other_album.*"
self._run(0, self.all_tracks)
self._run({"album_path": ".*other_album.*"}, 0, self.all_tracks)
def test_singleton_config(self):
config["filefilter"]["singleton_path"] = ".*other_album.*"
self._run(0, {self.other_album_track})
self._run(
{"singleton_path": ".*other_album.*"}, 0, {self.other_album_track}
)

View file

@ -13,148 +13,112 @@
# included in all copies or substantial portions of the Software.
from __future__ import annotations
import os.path
import sys
import tempfile
import unittest
from contextlib import contextmanager
from typing import Callable, Iterator
from beets import config, plugins
from beets.test.helper import BeetsTestCase, PluginMixin, capture_log
from beets import plugins
from beets.test.helper import PluginTestCase, capture_log
def get_temporary_path():
temporary_directory = tempfile._get_default_tempdir()
temporary_name = next(tempfile._get_candidate_names())
return os.path.join(temporary_directory, temporary_name)
class HookTest(PluginMixin, BeetsTestCase):
class HookTestCase(PluginTestCase):
plugin = "hook"
preload_plugin = False
TEST_HOOK_COUNT = 5
def _add_hook(self, event, command):
hook = {"event": event, "command": command}
def _get_hook(self, event: str, command: str) -> dict[str, str]:
return {"event": event, "command": command}
hooks = config["hook"]["hooks"].get(list) if "hook" in config else []
hooks.append(hook)
config["hook"]["hooks"] = hooks
class HookLogsTest(HookTestCase):
@contextmanager
def _configure_logs(self, command: str) -> Iterator[list[str]]:
config = {"hooks": [self._get_hook("test_event", command)]}
with self.configure_plugin(config), capture_log("beets.hook") as logs:
plugins.send("test_event")
yield logs
def test_hook_empty_command(self):
self._add_hook("test_event", "")
self.load_plugins("hook")
with capture_log("beets.hook") as logs:
plugins.send("test_event")
self.assertIn('hook: invalid command ""', logs)
with self._configure_logs("") as logs:
self.assertIn('hook: invalid command ""', logs)
# FIXME: fails on windows
@unittest.skipIf(sys.platform == "win32", "win32")
def test_hook_non_zero_exit(self):
self._add_hook("test_event", 'sh -c "exit 1"')
self.load_plugins("hook")
with capture_log("beets.hook") as logs:
plugins.send("test_event")
self.assertIn("hook: hook for test_event exited with status 1", logs)
with self._configure_logs('sh -c "exit 1"') as logs:
self.assertIn(
"hook: hook for test_event exited with status 1", logs
)
def test_hook_non_existent_command(self):
self._add_hook("test_event", "non-existent-command")
with self._configure_logs("non-existent-command") as logs:
logs = "\n".join(logs)
self.load_plugins("hook")
self.assertIn("hook: hook for test_event failed: ", logs)
# The error message is different for each OS. Unfortunately the text is
# different in each case, where the only shared text is the string
# 'file' and substring 'Err'
self.assertIn("Err", logs)
self.assertIn("file", logs)
with capture_log("beets.hook") as logs:
plugins.send("test_event")
self.assertTrue(
any(
message.startswith("hook: hook for test_event failed: ")
for message in logs
)
)
class HookCommandTest(HookTestCase):
TEST_HOOK_COUNT = 2
events = [f"test_event_{i}" for i in range(TEST_HOOK_COUNT)]
def setUp(self):
super().setUp()
temp_dir = os.fsdecode(self.temp_dir)
self.paths = [os.path.join(temp_dir, e) for e in self.events]
def _test_command(
self,
make_test_path: Callable[[str, str], str],
send_path_kwarg: bool = False,
) -> None:
"""Check that each of the configured hooks is executed.
Configure hooks for each event:
1. Use the given 'make_test_path' callable to create a test path from the event
and the original path.
2. Configure a hook with a command to touch this path.
For each of the original paths:
1. Send a test event
2. Assert that a file has been created under the original path, which proves
that the configured hook command has been executed.
"""
hooks = [
self._get_hook(e, f"touch {make_test_path(e, p)}")
for e, p in zip(self.events, self.paths)
]
with self.configure_plugin({"hooks": hooks}):
for event, path in zip(self.events, self.paths):
if send_path_kwarg:
plugins.send(event, path=path)
else:
plugins.send(event)
self.assertTrue(os.path.isfile(path))
# FIXME: fails on windows
@unittest.skipIf(sys.platform == "win32", "win32")
def test_hook_no_arguments(self):
temporary_paths = [
get_temporary_path() for i in range(self.TEST_HOOK_COUNT)
]
self._test_command(lambda _, p: p)
for index, path in enumerate(temporary_paths):
self._add_hook(f"test_no_argument_event_{index}", f'touch "{path}"')
self.load_plugins("hook")
for index in range(len(temporary_paths)):
plugins.send(f"test_no_argument_event_{index}")
for path in temporary_paths:
self.assertTrue(os.path.isfile(path))
os.remove(path)
# FIXME: fails on windows
@unittest.skipIf(sys.platform == "win32", "win32")
def test_hook_event_substitution(self):
temporary_directory = tempfile._get_default_tempdir()
event_names = [
f"test_event_event_{i}" for i in range(self.TEST_HOOK_COUNT)
]
self._test_command(lambda e, p: p.replace(e, "{event}"))
for event in event_names:
self._add_hook(event, f'touch "{temporary_directory}/{{event}}"')
self.load_plugins("hook")
for event in event_names:
plugins.send(event)
for event in event_names:
path = os.path.join(temporary_directory, event)
self.assertTrue(os.path.isfile(path))
os.remove(path)
# FIXME: fails on windows
@unittest.skipIf(sys.platform == "win32", "win32")
def test_hook_argument_substitution(self):
temporary_paths = [
get_temporary_path() for i in range(self.TEST_HOOK_COUNT)
]
self._test_command(lambda *_: "{path}", send_path_kwarg=True)
for index, path in enumerate(temporary_paths):
self._add_hook(f"test_argument_event_{index}", 'touch "{path}"')
self.load_plugins("hook")
for index, path in enumerate(temporary_paths):
plugins.send(f"test_argument_event_{index}", path=path)
for path in temporary_paths:
self.assertTrue(os.path.isfile(path))
os.remove(path)
# FIXME: fails on windows
@unittest.skipIf(sys.platform == "win32", "win32")
def test_hook_bytes_interpolation(self):
temporary_paths = [
get_temporary_path().encode("utf-8")
for i in range(self.TEST_HOOK_COUNT)
]
for index, path in enumerate(temporary_paths):
self._add_hook(f"test_bytes_event_{index}", 'touch "{path}"')
self.load_plugins("hook")
for index, path in enumerate(temporary_paths):
plugins.send(f"test_bytes_event_{index}", path=path)
for path in temporary_paths:
self.assertTrue(os.path.isfile(path))
os.remove(path)
self.paths = [p.encode() for p in self.paths]
self._test_command(lambda *_: "{path}", send_path_kwarg=True)

View file

@ -3,26 +3,16 @@
from mediafile import MediaFile
from beets.library import Item
from beets.test.helper import BeetsTestCase, PluginMixin, control_stdin
from beets.test.helper import PluginTestCase, control_stdin
from beets.util import syspath
from beetsplug.zero import ZeroPlugin
class ZeroPluginTest(PluginMixin, BeetsTestCase):
class ZeroPluginTest(PluginTestCase):
plugin = "zero"
preload_plugin = False
def setUp(self):
super().setUp()
self.config["zero"] = {
"fields": [],
"keep_fields": [],
"update_database": False,
}
def test_no_patterns(self):
self.config["zero"]["fields"] = ["comments", "month"]
item = self.add_item_fixture(
comments="test comment",
title="Title",
@ -31,8 +21,8 @@ class ZeroPluginTest(PluginMixin, BeetsTestCase):
)
item.write()
self.load_plugins("zero")
item.write()
with self.configure_plugin({"fields": ["comments", "month"]}):
item.write()
mf = MediaFile(syspath(item.path))
self.assertIsNone(mf.comments)
@ -41,76 +31,67 @@ class ZeroPluginTest(PluginMixin, BeetsTestCase):
self.assertEqual(mf.year, 2000)
def test_pattern_match(self):
self.config["zero"]["fields"] = ["comments"]
self.config["zero"]["comments"] = ["encoded by"]
item = self.add_item_fixture(comments="encoded by encoder")
item.write()
self.load_plugins("zero")
item.write()
with self.configure_plugin(
{"fields": ["comments"], "comments": ["encoded by"]}
):
item.write()
mf = MediaFile(syspath(item.path))
self.assertIsNone(mf.comments)
def test_pattern_nomatch(self):
self.config["zero"]["fields"] = ["comments"]
self.config["zero"]["comments"] = ["encoded by"]
item = self.add_item_fixture(comments="recorded at place")
item.write()
self.load_plugins("zero")
item.write()
with self.configure_plugin(
{"fields": ["comments"], "comments": ["encoded_by"]}
):
item.write()
mf = MediaFile(syspath(item.path))
self.assertEqual(mf.comments, "recorded at place")
def test_do_not_change_database(self):
self.config["zero"]["fields"] = ["year"]
item = self.add_item_fixture(year=2000)
item.write()
self.load_plugins("zero")
item.write()
with self.configure_plugin({"fields": ["year"]}):
item.write()
self.assertEqual(item["year"], 2000)
def test_change_database(self):
self.config["zero"]["fields"] = ["year"]
self.config["zero"]["update_database"] = True
item = self.add_item_fixture(year=2000)
item.write()
self.load_plugins("zero")
item.write()
with self.configure_plugin(
{"fields": ["year"], "update_database": True}
):
item.write()
self.assertEqual(item["year"], 0)
def test_album_art(self):
self.config["zero"]["fields"] = ["images"]
path = self.create_mediafile_fixture(images=["jpg"])
item = Item.from_path(path)
self.load_plugins("zero")
item.write()
with self.configure_plugin({"fields": ["images"]}):
item.write()
mf = MediaFile(syspath(path))
self.assertFalse(mf.images)
def test_auto_false(self):
self.config["zero"]["fields"] = ["year"]
self.config["zero"]["update_database"] = True
self.config["zero"]["auto"] = False
item = self.add_item_fixture(year=2000)
item.write()
self.load_plugins("zero")
item.write()
with self.configure_plugin(
{"fields": ["year"], "update_database": True, "auto": False}
):
item.write()
self.assertEqual(item["year"], 2000)
@ -120,12 +101,10 @@ class ZeroPluginTest(PluginMixin, BeetsTestCase):
)
item.write()
item_id = item.id
self.config["zero"]["fields"] = ["comments"]
self.config["zero"]["update_database"] = True
self.config["zero"]["auto"] = False
self.load_plugins("zero")
with control_stdin("y"):
with self.configure_plugin(
{"fields": ["comments"], "update_database": True, "auto": False}
), control_stdin("y"):
self.run_command("zero")
mf = MediaFile(syspath(item.path))
@ -143,12 +122,9 @@ class ZeroPluginTest(PluginMixin, BeetsTestCase):
item.write()
item_id = item.id
self.config["zero"]["fields"] = ["comments"]
self.config["zero"]["update_database"] = False
self.config["zero"]["auto"] = False
self.load_plugins("zero")
with control_stdin("y"):
with self.configure_plugin(
{"fields": ["comments"], "update_database": False, "auto": False}
), control_stdin("y"):
self.run_command("zero")
mf = MediaFile(syspath(item.path))
@ -166,12 +142,10 @@ class ZeroPluginTest(PluginMixin, BeetsTestCase):
item.write()
self.config["zero"]["fields"] = ["comments"]
self.config["zero"]["update_database"] = False
self.config["zero"]["auto"] = False
self.load_plugins("zero")
self.run_command("zero", "year: 2016")
with self.configure_plugin(
{"fields": ["comments"], "update_database": False, "auto": False}
):
self.run_command("zero", "year: 2016")
mf = MediaFile(syspath(item.path))
@ -185,12 +159,10 @@ class ZeroPluginTest(PluginMixin, BeetsTestCase):
item.write()
self.config["zero"]["fields"] = ["comments"]
self.config["zero"]["update_database"] = False
self.config["zero"]["auto"] = False
self.load_plugins("zero")
self.run_command("zero", "year: 0000")
with self.configure_plugin(
{"fields": ["comments"], "update_database": False, "auto": False}
):
self.run_command("zero", "year: 0000")
mf = MediaFile(syspath(item.path))
@ -205,8 +177,7 @@ class ZeroPluginTest(PluginMixin, BeetsTestCase):
item_id = item.id
self.load_plugins("zero")
with control_stdin("y"):
with self.configure_plugin({"fields": []}), control_stdin("y"):
self.run_command("zero")
item = self.lib.get_item(item_id)
@ -221,11 +192,10 @@ class ZeroPluginTest(PluginMixin, BeetsTestCase):
self.assertEqual(mf.year, 2016)
item_id = item.id
self.config["zero"]["fields"] = ["year"]
self.config["zero"]["keep_fields"] = ["comments"]
self.load_plugins("zero")
with control_stdin("y"):
with self.configure_plugin(
{"fields": ["year"], "keep_fields": ["comments"]}
), control_stdin("y"):
self.run_command("zero")
item = self.lib.get_item(item_id)
@ -235,18 +205,17 @@ class ZeroPluginTest(PluginMixin, BeetsTestCase):
def test_keep_fields(self):
item = self.add_item_fixture(year=2016, comments="test comment")
self.config["zero"]["keep_fields"] = ["year"]
self.config["zero"]["fields"] = None
self.config["zero"]["update_database"] = True
tags = {
"comments": "test comment",
"year": 2016,
}
self.load_plugins("zero")
z = ZeroPlugin()
z.write_event(item, item.path, tags)
with self.configure_plugin(
{"fields": None, "keep_fields": ["year"], "update_database": True}
):
z = ZeroPlugin()
z.write_event(item, item.path, tags)
self.assertIsNone(tags["comments"])
self.assertEqual(tags["year"], 2016)
@ -273,12 +242,9 @@ class ZeroPluginTest(PluginMixin, BeetsTestCase):
)
item.write()
item_id = item.id
self.config["zero"]["fields"] = ["comments"]
self.config["zero"]["update_database"] = True
self.config["zero"]["auto"] = False
self.load_plugins("zero")
with control_stdin("n"):
with self.configure_plugin(
{"fields": ["comments"], "update_database": True, "auto": False}
), control_stdin("n"):
self.run_command("zero")
mf = MediaFile(syspath(item.path))

View file

@ -1687,7 +1687,7 @@ class ImportPretendTest(ImportTestCase):
self.matcher = AutotagStub().install()
self.io.install()
self.album_track_path, *_ = self.prepare_album_for_import(1)
self.album_track_path = self.prepare_album_for_import(1)[0]
self.single_path = self.prepare_track_for_import(2, self.import_path)
self.album_path = self.album_track_path.parent