Simplify plugin loading (#5887)

This PR centralises plugin loading logic inside `beets.plugins` module.


- Removed intermediatery `_classes` variable by initialising plugins
immediately.
- Simplified listeners registration by defining listener variables in
the base
`BeetsPlugin` class, and making the `register_listener` method a
`@classmethod`.
- Simplified plugin test setup accordingly.
This commit is contained in:
Šarūnas Nejus 2025-08-10 13:46:53 +01:00 committed by GitHub
commit 52cf74125e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 274 additions and 359 deletions

View file

@ -64,6 +64,12 @@ jobs:
poe docs
poe test-with-coverage
- if: ${{ !cancelled() }}
name: Upload test results to Codecov
uses: codecov/test-results-action@v1
with:
token: ${{ secrets.CODECOV_TOKEN }}
- if: ${{ env.IS_MAIN_PYTHON == 'true' }}
name: Store the coverage report
uses: actions/upload-artifact@v4
@ -86,7 +92,7 @@ jobs:
name: coverage-report
- name: Upload code coverage
uses: codecov/codecov-action@v4
uses: codecov/codecov-action@v5
with:
files: ./coverage.xml
use_oidc: ${{ !(github.event_name == 'pull_request' && github.event.pull_request.head.repo.fork) }}

View file

@ -1,33 +0,0 @@
from typing import Literal
EventType = Literal[
"pluginload",
"import",
"album_imported",
"album_removed",
"item_copied",
"item_imported",
"before_item_moved",
"item_moved",
"item_linked",
"item_hardlinked",
"item_reflinked",
"item_removed",
"write",
"after_write",
"import_task_created",
"import_task_start",
"import_task_apply",
"import_task_before_choice",
"import_task_choice",
"import_task_files",
"library_opened",
"database_change",
"cli_exit",
"import_begin",
"trackinfo_received",
"albuminfo_received",
"before_choose_candidate",
"mb_track_extract",
"mb_album_extract",
]

View file

@ -19,24 +19,22 @@ from __future__ import annotations
import abc
import inspect
import re
import traceback
import sys
from collections import defaultdict
from functools import wraps
from pathlib import Path
from types import GenericAlias
from typing import TYPE_CHECKING, Any, Callable, Sequence, TypeVar
from typing import TYPE_CHECKING, Any, ClassVar, Literal, TypeVar
import mediafile
from typing_extensions import ParamSpec
import beets
from beets import logging
from beets.util import unique_list
if TYPE_CHECKING:
from beets.event_types import EventType
if TYPE_CHECKING:
from collections.abc import Iterable
from collections.abc import Callable, Iterable, Sequence
from confuse import ConfigView
@ -58,7 +56,7 @@ if TYPE_CHECKING:
P = ParamSpec("P")
Ret = TypeVar("Ret", bound=Any)
Listener = Callable[..., None]
Listener = Callable[..., Any]
IterF = Callable[P, Iterable[Ret]]
@ -67,6 +65,37 @@ PLUGIN_NAMESPACE = "beetsplug"
# Plugins using the Last.fm API can share the same API key.
LASTFM_KEY = "2dc3914abf35f0d9c92d97d8f8e42b43"
EventType = Literal[
"after_write",
"album_imported",
"album_removed",
"albuminfo_received",
"before_choose_candidate",
"before_item_moved",
"cli_exit",
"database_change",
"import",
"import_begin",
"import_task_apply",
"import_task_before_choice",
"import_task_choice",
"import_task_created",
"import_task_files",
"import_task_start",
"item_copied",
"item_hardlinked",
"item_imported",
"item_linked",
"item_moved",
"item_reflinked",
"item_removed",
"library_opened",
"mb_album_extract",
"mb_track_extract",
"pluginload",
"trackinfo_received",
"write",
]
# Global logger.
log = logging.getLogger("beets")
@ -79,6 +108,17 @@ class PluginConflictError(Exception):
"""
class PluginImportError(ImportError):
"""Indicates that a plugin could not be imported.
This is a subclass of ImportError so that it can be caught separately
from other errors.
"""
def __init__(self, name: str):
super().__init__(f"Could not import plugin {name}")
class PluginLogFilter(logging.Filter):
"""A logging filter that identifies the plugin that emitted a log
message.
@ -105,6 +145,14 @@ class BeetsPlugin(metaclass=abc.ABCMeta):
the abstract methods defined here.
"""
_raw_listeners: ClassVar[dict[EventType, list[Listener]]] = defaultdict(
list
)
listeners: ClassVar[dict[EventType, list[Listener]]] = defaultdict(list)
template_funcs: TFuncMap[str] | None = None
template_fields: TFuncMap[Item] | None = None
album_template_fields: TFuncMap[Album] | None = None
name: str
config: ConfigView
early_import_stages: list[ImportStageFunc]
@ -218,25 +266,13 @@ class BeetsPlugin(metaclass=abc.ABCMeta):
mediafile.MediaFile.add_field(name, descriptor)
library.Item._media_fields.add(name)
_raw_listeners: dict[str, list[Listener]] | None = None
listeners: dict[str, list[Listener]] | None = None
def register_listener(self, event: "EventType", func: Listener):
def register_listener(self, event: EventType, func: Listener) -> None:
"""Add a function as a listener for the specified event."""
wrapped_func = self._set_log_level_and_params(logging.WARNING, func)
cls = self.__class__
if cls.listeners is None or cls._raw_listeners is None:
cls._raw_listeners = defaultdict(list)
cls.listeners = defaultdict(list)
if func not in cls._raw_listeners[event]:
cls._raw_listeners[event].append(func)
cls.listeners[event].append(wrapped_func)
template_funcs: TFuncMap[str] | None = None
template_fields: TFuncMap[Item] | None = None
album_template_fields: TFuncMap[Album] | None = None
if func not in self._raw_listeners[event]:
self._raw_listeners[event].append(func)
self.listeners[event].append(
self._set_log_level_and_params(logging.WARNING, func)
)
@classmethod
def template_func(cls, name: str) -> Callable[[TFunc[str]], TFunc[str]]:
@ -270,69 +306,92 @@ class BeetsPlugin(metaclass=abc.ABCMeta):
return helper
_classes: set[type[BeetsPlugin]] = set()
def get_plugin_names() -> list[str]:
"""Discover and return the set of plugin names to be loaded.
def load_plugins(names: Sequence[str] = ()) -> None:
"""Imports the modules for a sequence of plugin names. Each name
must be the name of a Python module under the "beetsplug" namespace
package in sys.path; the module indicated should contain the
BeetsPlugin subclasses desired.
Configures the plugin search paths and resolves the final set of plugins
based on configuration settings, inclusion filters, and exclusion rules.
Automatically includes the musicbrainz plugin when enabled in configuration.
"""
for name in names:
modname = f"{PLUGIN_NAMESPACE}.{name}"
paths = [
str(Path(p).expanduser().absolute())
for p in beets.config["pluginpath"].as_str_seq(split=False)
]
log.debug("plugin paths: {}", paths)
# Extend the `beetsplug` package to include the plugin paths.
import beetsplug
beetsplug.__path__ = paths + list(beetsplug.__path__)
# For backwards compatibility, also support plugin paths that
# *contain* a `beetsplug` package.
sys.path += paths
plugins = unique_list(beets.config["plugins"].as_str_seq())
# TODO: Remove in v3.0.0
if (
"musicbrainz" not in plugins
and "musicbrainz" in beets.config
and beets.config["musicbrainz"].get().get("enabled")
):
plugins.append("musicbrainz")
beets.config.add({"disabled_plugins": []})
disabled_plugins = set(beets.config["disabled_plugins"].as_str_seq())
return [p for p in plugins if p not in disabled_plugins]
def _get_plugin(name: str) -> BeetsPlugin | None:
"""Dynamically load and instantiate a plugin class by name.
Attempts to import the plugin module, locate the appropriate plugin class
within it, and return an instance. Handles import failures gracefully and
logs warnings for missing plugins or loading errors.
"""
try:
try:
try:
namespace = __import__(modname, None, None)
except ImportError as exc:
# Again, this is hacky:
if exc.args[0].endswith(" " + name):
log.warning("** plugin {0} not found", name)
else:
raise
else:
for obj in getattr(namespace, name).__dict__.values():
if (
inspect.isclass(obj)
and not isinstance(
obj, GenericAlias
) # seems to be needed for python <= 3.9 only
and issubclass(obj, BeetsPlugin)
and obj != BeetsPlugin
and not inspect.isabstract(obj)
and obj not in _classes
):
_classes.add(obj)
namespace = __import__(f"{PLUGIN_NAMESPACE}.{name}", None, None)
except Exception as exc:
raise PluginImportError(name) from exc
except Exception:
log.warning(
"** error loading plugin {}:\n{}",
name,
traceback.format_exc(),
)
for obj in getattr(namespace, name).__dict__.values():
if (
inspect.isclass(obj)
and not isinstance(
obj, GenericAlias
) # seems to be needed for python <= 3.9 only
and issubclass(obj, BeetsPlugin)
and obj != BeetsPlugin
and not inspect.isabstract(obj)
):
return obj()
except Exception:
log.warning("** error loading plugin {}", name, exc_info=True)
return None
_instances: dict[type[BeetsPlugin], BeetsPlugin] = {}
_instances: list[BeetsPlugin] = []
def find_plugins() -> list[BeetsPlugin]:
"""Returns a list of BeetsPlugin subclass instances from all
currently loaded beets plugins. Loads the default plugin set
first.
def load_plugins() -> None:
"""Initialize the plugin system by loading all configured plugins.
Performs one-time plugin discovery and instantiation, storing loaded plugin
instances globally. Emits a pluginload event after successful initialization
to notify other components.
"""
if _instances:
# After the first call, use cached instances for performance reasons.
# See https://github.com/beetbox/beets/pull/3810
return list(_instances.values())
if not _instances:
names = get_plugin_names()
log.info("Loading plugins: {}", ", ".join(sorted(names)))
_instances.extend(filter(None, map(_get_plugin, names)))
load_plugins()
plugins = []
for cls in _classes:
# Only instantiate each plugin class once.
if cls not in _instances:
_instances[cls] = cls()
plugins.append(_instances[cls])
return plugins
send("pluginload")
def find_plugins() -> Iterable[BeetsPlugin]:
return _instances
# Communication with plugins.
@ -383,7 +442,9 @@ def named_queries(model_cls: type[AnyModel]) -> dict[str, FieldQueryType]:
}
def notify_info_yielded(event: str) -> Callable[[IterF[P, Ret]], IterF[P, Ret]]:
def notify_info_yielded(
event: EventType,
) -> Callable[[IterF[P, Ret]], IterF[P, Ret]]:
"""Makes a generator send the event 'event' every time it yields.
This decorator is supposed to decorate a generator, but any function
returning an iterable should work.
@ -474,19 +535,7 @@ def album_field_getters() -> TFuncMap[Album]:
# Event dispatch.
def event_handlers() -> dict[str, list[Listener]]:
"""Find all event handlers from plugins as a dictionary mapping
event names to sequences of callables.
"""
all_handlers: dict[str, list[Listener]] = defaultdict(list)
for plugin in find_plugins():
if plugin.listeners:
for event, handlers in plugin.listeners.items():
all_handlers[event] += handlers
return all_handlers
def send(event: str, **arguments: Any) -> list[Any]:
def send(event: EventType, **arguments: Any) -> list[Any]:
"""Send an event to all assigned event listeners.
`event` is the name of the event to send, all other named arguments
@ -495,12 +544,11 @@ def send(event: str, **arguments: Any) -> list[Any]:
Return a list of non-None values returned from the handlers.
"""
log.debug("Sending event: {0}", event)
results: list[Any] = []
for handler in event_handlers()[event]:
result = handler(**arguments)
if result is not None:
results.append(result)
return results
return [
r
for handler in BeetsPlugin.listeners[event]
if (r := handler(**arguments)) is not None
]
def feat_tokens(for_artist: bool = True) -> str:

View file

@ -481,6 +481,11 @@ class PluginMixin(ConfigMixin):
super().teardown_beets()
self.unload_plugins()
def register_plugin(
self, plugin_class: type[beets.plugins.BeetsPlugin]
) -> None:
beets.plugins._instances.append(plugin_class())
def load_plugins(self, *plugins: str) -> None:
"""Load and initialize plugins by names.
@ -491,18 +496,15 @@ class PluginMixin(ConfigMixin):
plugins = (self.plugin,) if hasattr(self, "plugin") else plugins
self.config["plugins"] = plugins
cached_classproperty.cache.clear()
beets.plugins.load_plugins(plugins)
beets.plugins.send("pluginload")
beets.plugins.find_plugins()
beets.plugins.load_plugins()
def unload_plugins(self) -> None:
"""Unload all plugins and remove them from the configuration."""
# FIXME this should eventually be handled by a plugin manager
for plugin_class in beets.plugins._instances:
plugin_class.listeners = None
beets.plugins.BeetsPlugin.listeners.clear()
beets.plugins.BeetsPlugin._raw_listeners.clear()
self.config["plugins"] = []
beets.plugins._classes = set()
beets.plugins._instances = {}
beets.plugins._instances.clear()
@contextmanager
def configure_plugin(self, config: Any):

View file

@ -30,7 +30,7 @@ import textwrap
import traceback
import warnings
from difflib import SequenceMatcher
from typing import TYPE_CHECKING, Any, Callable
from typing import Any, Callable
import confuse
@ -40,9 +40,6 @@ from beets.dbcore import query as db_query
from beets.util import as_string
from beets.util.functemplate import template
if TYPE_CHECKING:
from types import ModuleType
# On Windows platforms, use colorama to support "ANSI" terminal colors.
if sys.platform == "win32":
try:
@ -1573,59 +1570,16 @@ optparse.Option.ALWAYS_TYPED_ACTIONS += ("callback",)
# The main entry point and bootstrapping.
def _load_plugins(
options: optparse.Values, config: confuse.LazyConfig
) -> ModuleType:
"""Load the plugins specified on the command line or in the configuration."""
paths = config["pluginpath"].as_str_seq(split=False)
paths = [util.normpath(p) for p in paths]
log.debug("plugin paths: {0}", util.displayable_path(paths))
# On Python 3, the search paths need to be unicode.
paths = [os.fsdecode(p) for p in paths]
# Extend the `beetsplug` package to include the plugin paths.
import beetsplug
beetsplug.__path__ = paths + list(beetsplug.__path__)
# For backwards compatibility, also support plugin paths that
# *contain* a `beetsplug` package.
sys.path += paths
# If we were given any plugins on the command line, use those.
if options.plugins is not None:
plugin_list = (
options.plugins.split(",") if len(options.plugins) > 0 else []
)
else:
plugin_list = config["plugins"].as_str_seq()
# TODO: Remove in v2.4 or v3
if "musicbrainz" in config and config["musicbrainz"].get().get(
"enabled"
):
plugin_list.append("musicbrainz")
# Exclude any plugins that were specified on the command line
if options.exclude is not None:
plugin_list = [
p for p in plugin_list if p not in options.exclude.split(",")
]
plugins.load_plugins(plugin_list)
return plugins
def _setup(options, lib=None):
def _setup(
options: optparse.Values, lib: library.Library | None
) -> tuple[list[Subcommand], library.Library]:
"""Prepare and global state and updates it with command line options.
Returns a list of subcommands, a list of plugins, and a library instance.
"""
config = _configure(options)
plugins = _load_plugins(options, config)
plugins.send("pluginload")
plugins.load_plugins()
# Get the default subcommands.
from beets.ui.commands import default_commands
@ -1637,7 +1591,7 @@ def _setup(options, lib=None):
lib = _open_library(config)
plugins.send("library_opened", lib=lib)
return subcommands, plugins, lib
return subcommands, lib
def _configure(options):
@ -1691,7 +1645,7 @@ def _ensure_db_directory_exists(path):
os.makedirs(newpath)
def _open_library(config):
def _open_library(config: confuse.LazyConfig) -> library.Library:
"""Create a new library instance from the configuration."""
dbpath = util.bytestring_path(config["library"].as_filename())
_ensure_db_directory_exists(dbpath)
@ -1718,7 +1672,7 @@ def _open_library(config):
return lib
def _raw_main(args, lib=None):
def _raw_main(args: list[str], lib=None) -> None:
"""A helper function for `main` without top-level exception
handling.
"""
@ -1744,16 +1698,31 @@ def _raw_main(args, lib=None):
parser.add_option(
"-c", "--config", dest="config", help="path to configuration file"
)
def parse_csl_callback(
option: optparse.Option, _, value: str, parser: SubcommandsOptionParser
):
"""Parse a comma-separated list of values."""
setattr(
parser.values,
option.dest, # type: ignore[arg-type]
list(filter(None, value.split(","))),
)
parser.add_option(
"-p",
"--plugins",
dest="plugins",
action="callback",
callback=parse_csl_callback,
help="a comma-separated list of plugins to load",
)
parser.add_option(
"-P",
"--disable-plugins",
dest="exclude",
dest="disabled_plugins",
action="callback",
callback=parse_csl_callback,
help="a comma-separated list of plugins to disable",
)
parser.add_option(
@ -1785,7 +1754,7 @@ def _raw_main(args, lib=None):
return config_edit()
test_lib = bool(lib)
subcommands, plugins, lib = _setup(options, lib)
subcommands, lib = _setup(options, lib)
parser.add_subcommand(*subcommands)
subcommand, suboptions, subargs = parser.parse_subcommand(subargs)

View file

@ -25,7 +25,7 @@ class LoadExtPlugin(BeetsPlugin):
super().__init__()
if not Database.supports_extensions:
self._log.warn(
self._log.warning(
"loadext is enabled but the current SQLite "
"installation does not support extensions"
)

View file

@ -1,5 +1,5 @@
comment:
layout: "condensed_header, condensed_files"
layout: "header, diff, files"
require_changes: true
# Sets non-blocking status checks
@ -13,6 +13,3 @@ coverage:
default:
informational: true
changes: false
github_checks:
annotations: false

View file

@ -92,10 +92,14 @@ For plugin developers:
Old imports are now deprecated and will be removed in version ``3.0.0``.
* ``beets.ui.decargs`` is deprecated and will be removed in version ``3.0.0``.
* Beets is now pep 561 compliant, which means that it provides type hints
* Beets is now PEP 561 compliant, which means that it provides type hints
for all public APIs. This allows IDEs to provide better autocompletion and
type checking for downstream users of the beets API.
* ``plugins.find_plugins`` function does not anymore load plugins. You need to
explicitly call ``plugins.load_plugins()`` to load them.
* ``plugins.load_plugins`` function does not anymore accept the list of plugins
to load. Instead, it loads all plugins that are configured by
:ref:`plugins-config` configuration.
Other changes:

View file

@ -3,10 +3,13 @@
cache_dir = /tmp/pytest_cache
# slightly more verbose output
console_output_style = count
# pretty-print test names in the Codecov U
junit_family = legacy
addopts =
# show all skipped/failed/xfailed tests in the summary except passed
-ra
--strict-config
--junitxml=.reports/pytest.xml
markers =
on_lyrics_update: mark a test to run only after lyrics source code is updated
integration_test: mark a test as an integration test

View file

@ -37,12 +37,14 @@ class HookTestCase(PluginTestCase):
class HookLogsTest(HookTestCase):
HOOK: plugins.EventType = "write"
@contextmanager
def _configure_logs(self, command: str) -> Iterator[list[str]]:
config = {"hooks": [self._get_hook("test_event", command)]}
config = {"hooks": [self._get_hook(self.HOOK, command)]}
with self.configure_plugin(config), capture_log("beets.hook") as logs:
plugins.send("test_event")
plugins.send(self.HOOK)
yield logs
def test_hook_empty_command(self):
@ -53,13 +55,13 @@ class HookLogsTest(HookTestCase):
@unittest.skipIf(sys.platform == "win32", "win32")
def test_hook_non_zero_exit(self):
with self._configure_logs('sh -c "exit 1"') as logs:
assert "hook: hook for test_event exited with status 1" in logs
assert f"hook: hook for {self.HOOK} exited with status 1" in logs
def test_hook_non_existent_command(self):
with self._configure_logs("non-existent-command") as logs:
logs = "\n".join(logs)
assert "hook: hook for test_event failed: " in logs
assert f"hook: hook for {self.HOOK} failed: " in logs
# The error message is different for each OS. Unfortunately the text is
# different in each case, where the only shared text is the string
# 'file' and substring 'Err'
@ -68,13 +70,11 @@ class HookLogsTest(HookTestCase):
class HookCommandTest(HookTestCase):
TEST_HOOK_COUNT = 2
events = [f"test_event_{i}" for i in range(TEST_HOOK_COUNT)]
EVENTS: list[plugins.EventType] = ["write", "after_write"]
def setUp(self):
super().setUp()
self.paths = [str(self.temp_dir_path / e) for e in self.events]
self.paths = [str(self.temp_dir_path / e) for e in self.EVENTS]
def _test_command(
self,
@ -93,13 +93,14 @@ class HookCommandTest(HookTestCase):
2. Assert that a file has been created under the original path, which proves
that the configured hook command has been executed.
"""
events_with_paths = list(zip(self.EVENTS, self.paths))
hooks = [
self._get_hook(e, f"touch {make_test_path(e, p)}")
for e, p in zip(self.events, self.paths)
for e, p in events_with_paths
]
with self.configure_plugin({"hooks": hooks}):
for event, path in zip(self.events, self.paths):
for event, path in events_with_paths:
if send_path_kwarg:
plugins.send(event, path=path)
else:

View file

@ -38,132 +38,59 @@ from beets.test.helper import (
AutotagStub,
ImportHelper,
PluginMixin,
PluginTestCase,
TerminalImportMixin,
)
from beets.test.helper import PluginTestCase as BasePluginTestCase
from beets.util import displayable_path, syspath
class PluginLoaderTestCase(BasePluginTestCase):
def setup_plugin_loader(self):
# FIXME the mocking code is horrific, but this is the lowest and
# earliest level of the plugin mechanism we can hook into.
self._plugin_loader_patch = patch("beets.plugins.load_plugins")
self._plugin_classes = set()
load_plugins = self._plugin_loader_patch.start()
class TestPluginRegistration(PluginTestCase):
class RatingPlugin(plugins.BeetsPlugin):
item_types = {"rating": types.Float()}
def myload(names=()):
plugins._classes.update(self._plugin_classes)
def __init__(self):
super().__init__()
self.register_listener("write", self.on_write)
load_plugins.side_effect = myload
def teardown_plugin_loader(self):
self._plugin_loader_patch.stop()
def register_plugin(self, plugin_class):
self._plugin_classes.add(plugin_class)
@staticmethod
def on_write(item=None, path=None, tags=None):
if tags["artist"] == "XXX":
tags["artist"] = "YYY"
def setUp(self):
self.setup_plugin_loader()
super().setUp()
def tearDown(self):
self.teardown_plugin_loader()
super().tearDown()
self.register_plugin(self.RatingPlugin)
def test_field_type_registered(self):
assert isinstance(Item._types.get("rating"), types.Float)
def test_duplicate_type(self):
class DuplicateTypePlugin(plugins.BeetsPlugin):
item_types = {"rating": types.INTEGER}
self.register_plugin(DuplicateTypePlugin)
with pytest.raises(
plugins.PluginConflictError, match="already been defined"
):
Item._types
def test_listener_registered(self):
self.RatingPlugin()
item = self.add_item_fixture(artist="XXX")
item.write()
assert MediaFile(syspath(item.path)).artist == "YYY"
class PluginImportTestCase(ImportHelper, PluginLoaderTestCase):
class PluginImportTestCase(ImportHelper, PluginTestCase):
def setUp(self):
super().setUp()
self.prepare_album_for_import(2)
class ItemTypesTest(PluginLoaderTestCase):
def test_flex_field_type(self):
class RatingPlugin(plugins.BeetsPlugin):
item_types = {"rating": types.Float()}
self.register_plugin(RatingPlugin)
self.config["plugins"] = "rating"
item = Item(path="apath", artist="aaa")
item.add(self.lib)
# Do not match unset values
out = self.run_with_output("ls", "rating:1..3")
assert "aaa" not in out
self.run_command("modify", "rating=2", "--yes")
# Match in range
out = self.run_with_output("ls", "rating:1..3")
assert "aaa" in out
# Don't match out of range
out = self.run_with_output("ls", "rating:3..5")
assert "aaa" not in out
class ItemWriteTest(PluginLoaderTestCase):
def setUp(self):
super().setUp()
class EventListenerPlugin(plugins.BeetsPlugin):
pass
self.event_listener_plugin = EventListenerPlugin()
self.register_plugin(EventListenerPlugin)
def test_change_tags(self):
def on_write(item=None, path=None, tags=None):
if tags["artist"] == "XXX":
tags["artist"] = "YYY"
self.register_listener("write", on_write)
item = self.add_item_fixture(artist="XXX")
item.write()
mediafile = MediaFile(syspath(item.path))
assert mediafile.artist == "YYY"
def register_listener(self, event, func):
self.event_listener_plugin.register_listener(event, func)
class ItemTypeConflictTest(PluginLoaderTestCase):
def test_mismatch(self):
class EventListenerPlugin(plugins.BeetsPlugin):
item_types = {"duplicate": types.INTEGER}
class AdventListenerPlugin(plugins.BeetsPlugin):
item_types = {"duplicate": types.FLOAT}
self.event_listener_plugin = EventListenerPlugin
self.advent_listener_plugin = AdventListenerPlugin
self.register_plugin(EventListenerPlugin)
self.register_plugin(AdventListenerPlugin)
with pytest.raises(plugins.PluginConflictError):
plugins.types(Item)
def test_match(self):
class EventListenerPlugin(plugins.BeetsPlugin):
item_types = {"duplicate": types.INTEGER}
class AdventListenerPlugin(plugins.BeetsPlugin):
item_types = {"duplicate": types.INTEGER}
self.event_listener_plugin = EventListenerPlugin
self.advent_listener_plugin = AdventListenerPlugin
self.register_plugin(EventListenerPlugin)
self.register_plugin(AdventListenerPlugin)
assert plugins.types(Item) is not None
class EventsTest(PluginImportTestCase):
def setUp(self):
super().setUp()
def test_import_task_created(self):
self.importer = self.setup_importer(pretend=True)
@ -223,7 +150,7 @@ class EventsTest(PluginImportTestCase):
]
class ListenersTest(PluginLoaderTestCase):
class ListenersTest(PluginTestCase):
def test_register(self):
class DummyPlugin(plugins.BeetsPlugin):
def __init__(self):
@ -243,15 +170,7 @@ class ListenersTest(PluginLoaderTestCase):
d.register_listener("cli_exit", d2.dummy)
assert DummyPlugin._raw_listeners["cli_exit"] == [d.dummy, d2.dummy]
@patch("beets.plugins.find_plugins")
@patch("inspect.getfullargspec")
def test_events_called(self, mock_gfa, mock_find_plugins):
mock_gfa.return_value = Mock(
args=(),
varargs="args",
varkw="kwargs",
)
def test_events_called(self):
class DummyPlugin(plugins.BeetsPlugin):
def __init__(self):
super().__init__()
@ -261,7 +180,6 @@ class ListenersTest(PluginLoaderTestCase):
self.register_listener("event_bar", self.bar)
d = DummyPlugin()
mock_find_plugins.return_value = (d,)
plugins.send("event")
d.foo.assert_has_calls([])
@ -271,8 +189,7 @@ class ListenersTest(PluginLoaderTestCase):
d.foo.assert_called_once_with(var="tagada")
d.bar.assert_has_calls([])
@patch("beets.plugins.find_plugins")
def test_listener_params(self, mock_find_plugins):
def test_listener_params(self):
class DummyPlugin(plugins.BeetsPlugin):
def __init__(self):
super().__init__()
@ -316,8 +233,7 @@ class ListenersTest(PluginLoaderTestCase):
def dummy9(self, **kwargs):
assert kwargs == {"foo": 5}
d = DummyPlugin()
mock_find_plugins.return_value = (d,)
DummyPlugin()
plugins.send("event1", foo=5)
plugins.send("event2", foo=5)
@ -553,10 +469,22 @@ def get_available_plugins():
]
class TestImportAllPlugins(PluginMixin):
def unimport_plugins(self):
class TestImportPlugin(PluginMixin):
@pytest.fixture(params=get_available_plugins())
def plugin_name(self, request):
"""Fixture to provide the name of each available plugin."""
name = request.param
# skip gstreamer plugins on windows
gstreamer_plugins = {"bpd", "replaygain"}
if sys.platform == "win32" and name in gstreamer_plugins:
pytest.skip(f"GStreamer is not available on Windows: {name}")
return name
def unload_plugins(self):
"""Unimport plugins before each test to avoid conflicts."""
self.unload_plugins()
super().unload_plugins()
for mod in list(sys.modules):
if mod.startswith("beetsplug."):
del sys.modules[mod]
@ -564,32 +492,22 @@ class TestImportAllPlugins(PluginMixin):
@pytest.fixture(autouse=True)
def cleanup(self):
"""Ensure plugins are unimported before and after each test."""
self.unimport_plugins()
self.unload_plugins()
yield
self.unimport_plugins()
self.unload_plugins()
@pytest.mark.skipif(
os.environ.get("GITHUB_ACTIONS") != "true",
reason="Requires all dependencies to be installed, "
+ "which we can't guarantee in the local environment.",
reason=(
"Requires all dependencies to be installed, which we can't"
" guarantee in the local environment."
),
)
@pytest.mark.parametrize("plugin_name", get_available_plugins())
def test_import_plugin(self, caplog, plugin_name): #
"""Test that a plugin is importable without an error using the
load_plugins function."""
# skip gstreamer plugins on windows
gstreamer_plugins = ["bpd", "replaygain"]
if sys.platform == "win32" and plugin_name in gstreamer_plugins:
pytest.xfail("GStreamer is not available on Windows: {plugin_name}")
def test_import_plugin(self, caplog, plugin_name):
"""Test that a plugin is importable without an error."""
caplog.set_level(logging.WARNING)
caplog.clear()
plugins.load_plugins([plugin_name])
self.load_plugins(plugin_name)
# Check for warnings, is a bit hacky but we can make full use of the beets
# load_plugins code that way
assert len(caplog.records) == 0, (
f"Plugin '{plugin_name}' has issues during import. ",
caplog.records,
assert "PluginImportError" not in caplog.text, (
f"Plugin '{plugin_name}' has issues during import."
)