Fix a pair of regressions, types and enforce mypy in CI (#5870)

**Bug Fixes**:
   - Wrong `raw_seconds_short` module in `DurationQuery`.
   - Missing `byte_paths` variable in `import_func`.

Otherwise

- Addressed all static type checking errors in the codebase (except
`spotify` plugin which has already been fixed in a separate PR)
- Removed `continue-on-error` from in `mypy` CI job, which means any
errors in type checking will now fail the CI job.
This commit is contained in:
Šarūnas Nejus 2025-07-16 14:36:13 +01:00 committed by GitHub
commit 852cbd2650
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 111 additions and 147 deletions

View file

@ -33,7 +33,7 @@ jobs:
if: matrix.platform == 'ubuntu-latest' if: matrix.platform == 'ubuntu-latest'
run: | run: |
sudo apt update sudo apt update
sudo apt install ffmpeg gobject-introspection libcairo2-dev libgirepository-2.0-dev pandoc imagemagick sudo apt install --yes --no-install-recommends ffmpeg gobject-introspection gstreamer1.0-plugins-base python3-gst-1.0 libcairo2-dev libgirepository-2.0-dev pandoc imagemagick
- name: Get changed lyrics files - name: Get changed lyrics files
id: lyrics-update id: lyrics-update

View file

@ -105,7 +105,6 @@ jobs:
- name: Type check code - name: Type check code
uses: liskin/gh-problem-matcher-wrap@v3 uses: liskin/gh-problem-matcher-wrap@v3
continue-on-error: true
with: with:
linters: mypy linters: mypy
run: poe check-types --show-column-numbers --no-error-summary ${{ needs.changed-files.outputs.changed_python_files }} run: poe check-types --show-column-numbers --no-error-summary ${{ needs.changed-files.outputs.changed_python_files }}

View file

@ -28,6 +28,7 @@ from re import Pattern
from typing import TYPE_CHECKING, Any, Generic, TypeVar, Union from typing import TYPE_CHECKING, Any, Generic, TypeVar, Union
from beets import util from beets import util
from beets.util.units import raw_seconds_short
if TYPE_CHECKING: if TYPE_CHECKING:
from beets.dbcore.db import AnyModel, Model from beets.dbcore.db import AnyModel, Model
@ -892,7 +893,7 @@ class DurationQuery(NumericQuery):
if not s: if not s:
return None return None
try: try:
return util.raw_seconds_short(s) return raw_seconds_short(s)
except ValueError: except ValueError:
try: try:
return float(s) return float(s)

View file

@ -292,7 +292,7 @@ class DelimitedString(BaseString[list[str], list[str]]):
containing delimiter-separated values. containing delimiter-separated values.
""" """
model_type = list model_type = list[str]
def __init__(self, delimiter: str): def __init__(self, delimiter: str):
self.delimiter = delimiter self.delimiter = delimiter

View file

@ -70,6 +70,7 @@ def query_tasks(session: ImportSession):
Instead of finding files from the filesystem, a query is used to Instead of finding files from the filesystem, a query is used to
match items from the library. match items from the library.
""" """
task: ImportTask
if session.config["singletons"]: if session.config["singletons"]:
# Search for items. # Search for items.
for item in session.lib.items(session.query): for item in session.lib.items(session.query):
@ -143,9 +144,7 @@ def lookup_candidates(session: ImportSession, task: ImportTask):
# Restrict the initial lookup to IDs specified by the user via the -m # Restrict the initial lookup to IDs specified by the user via the -m
# option. Currently all the IDs are passed onto the tasks directly. # option. Currently all the IDs are passed onto the tasks directly.
task.search_ids = session.config["search_ids"].as_str_seq() task.lookup_candidates(session.config["search_ids"].as_str_seq())
task.lookup_candidates()
@pipeline.stage @pipeline.stage

View file

@ -22,7 +22,7 @@ import time
from collections import defaultdict from collections import defaultdict
from enum import Enum from enum import Enum
from tempfile import mkdtemp from tempfile import mkdtemp
from typing import TYPE_CHECKING, Callable, Iterable, Sequence from typing import TYPE_CHECKING, Any, Callable, Iterable, Sequence
import mediafile import mediafile
@ -32,6 +32,8 @@ from beets.dbcore.query import PathQuery
from .state import ImportState from .state import ImportState
if TYPE_CHECKING: if TYPE_CHECKING:
from beets.autotag.match import Recommendation
from .session import ImportSession from .session import ImportSession
# Global logger. # Global logger.
@ -159,6 +161,7 @@ class ImportTask(BaseImportTask):
cur_album: str | None = None cur_album: str | None = None
cur_artist: str | None = None cur_artist: str | None = None
candidates: Sequence[autotag.AlbumMatch | autotag.TrackMatch] = [] candidates: Sequence[autotag.AlbumMatch | autotag.TrackMatch] = []
rec: Recommendation | None = None
def __init__( def __init__(
self, self,
@ -167,11 +170,9 @@ class ImportTask(BaseImportTask):
items: Iterable[library.Item] | None, items: Iterable[library.Item] | None,
): ):
super().__init__(toppath, paths, items) super().__init__(toppath, paths, items)
self.rec = None
self.should_remove_duplicates = False self.should_remove_duplicates = False
self.should_merge_duplicates = False self.should_merge_duplicates = False
self.is_album = True self.is_album = True
self.search_ids = [] # user-supplied candidate IDs.
def set_choice( def set_choice(
self, choice: Action | autotag.AlbumMatch | autotag.TrackMatch self, choice: Action | autotag.AlbumMatch | autotag.TrackMatch
@ -356,20 +357,17 @@ class ImportTask(BaseImportTask):
tasks = [t for inner in tasks for t in inner] tasks = [t for inner in tasks for t in inner]
return tasks return tasks
def lookup_candidates(self): def lookup_candidates(self, search_ids: list[str]) -> None:
"""Retrieve and store candidates for this album. User-specified """Retrieve and store candidates for this album.
candidate IDs are stored in self.search_ids: if present, the
initial lookup is restricted to only those IDs.
"""
artist, album, prop = autotag.tag_album(
self.items, search_ids=self.search_ids
)
self.cur_artist = artist
self.cur_album = album
self.candidates = prop.candidates
self.rec = prop.recommendation
def find_duplicates(self, lib: library.Library): If User-specified ``search_ids`` list is not empty, the lookup is
restricted to only those IDs.
"""
self.cur_artist, self.cur_album, (self.candidates, self.rec) = (
autotag.tag_album(self.items, search_ids=search_ids)
)
def find_duplicates(self, lib: library.Library) -> list[library.Album]:
"""Return a list of albums from `lib` with the same artist and """Return a list of albums from `lib` with the same artist and
album name as the task. album name as the task.
""" """
@ -695,12 +693,12 @@ class SingletonImportTask(ImportTask):
for item in self.imported_items(): for item in self.imported_items():
plugins.send("item_imported", lib=lib, item=item) plugins.send("item_imported", lib=lib, item=item)
def lookup_candidates(self): def lookup_candidates(self, search_ids: list[str]) -> None:
prop = autotag.tag_item(self.item, search_ids=self.search_ids) self.candidates, self.rec = autotag.tag_item(
self.candidates = prop.candidates self.item, search_ids=search_ids
self.rec = prop.recommendation )
def find_duplicates(self, lib): def find_duplicates(self, lib: library.Library) -> list[library.Item]: # type: ignore[override] # Need splitting Singleton and Album tasks into separate classes
"""Return a list of items from `lib` that have the same artist """Return a list of items from `lib` that have the same artist
and title as the task. and title as the task.
""" """
@ -802,6 +800,11 @@ class SentinelImportTask(ImportTask):
pass pass
ArchiveHandler = tuple[
Callable[[util.StrPath], bool], Callable[[util.StrPath], Any]
]
class ArchiveImportTask(SentinelImportTask): class ArchiveImportTask(SentinelImportTask):
"""An import task that represents the processing of an archive. """An import task that represents the processing of an archive.
@ -827,13 +830,13 @@ class ArchiveImportTask(SentinelImportTask):
if not os.path.isfile(path): if not os.path.isfile(path):
return False return False
for path_test, _ in cls.handlers(): for path_test, _ in cls.handlers:
if path_test(os.fsdecode(path)): if path_test(os.fsdecode(path)):
return True return True
return False return False
@classmethod @util.cached_classproperty
def handlers(cls): def handlers(cls) -> list[ArchiveHandler]:
"""Returns a list of archive handlers. """Returns a list of archive handlers.
Each handler is a `(path_test, ArchiveClass)` tuple. `path_test` Each handler is a `(path_test, ArchiveClass)` tuple. `path_test`
@ -841,28 +844,27 @@ class ArchiveImportTask(SentinelImportTask):
handled by `ArchiveClass`. `ArchiveClass` is a class that handled by `ArchiveClass`. `ArchiveClass` is a class that
implements the same interface as `tarfile.TarFile`. implements the same interface as `tarfile.TarFile`.
""" """
if not hasattr(cls, "_handlers"): _handlers: list[ArchiveHandler] = []
cls._handlers: list[tuple[Callable, ...]] = [] from zipfile import ZipFile, is_zipfile
from zipfile import ZipFile, is_zipfile
cls._handlers.append((is_zipfile, ZipFile)) _handlers.append((is_zipfile, ZipFile))
import tarfile import tarfile
cls._handlers.append((tarfile.is_tarfile, tarfile.open)) _handlers.append((tarfile.is_tarfile, tarfile.open))
try: try:
from rarfile import RarFile, is_rarfile from rarfile import RarFile, is_rarfile
except ImportError: except ImportError:
pass pass
else: else:
cls._handlers.append((is_rarfile, RarFile)) _handlers.append((is_rarfile, RarFile))
try: try:
from py7zr import SevenZipFile, is_7zfile from py7zr import SevenZipFile, is_7zfile
except ImportError: except ImportError:
pass pass
else: else:
cls._handlers.append((is_7zfile, SevenZipFile)) _handlers.append((is_7zfile, SevenZipFile))
return cls._handlers return _handlers
def cleanup(self, copy=False, delete=False, move=False): def cleanup(self, copy=False, delete=False, move=False):
"""Removes the temporary directory the archive was extracted to.""" """Removes the temporary directory the archive was extracted to."""
@ -879,7 +881,7 @@ class ArchiveImportTask(SentinelImportTask):
""" """
assert self.toppath is not None, "toppath must be set" assert self.toppath is not None, "toppath must be set"
for path_test, handler_class in self.handlers(): for path_test, handler_class in self.handlers:
if path_test(os.fsdecode(self.toppath)): if path_test(os.fsdecode(self.toppath)):
break break
else: else:
@ -925,7 +927,7 @@ class ImportTaskFactory:
self.imported = 0 # "Real" tasks created. self.imported = 0 # "Real" tasks created.
self.is_archive = ArchiveImportTask.is_archive(util.syspath(toppath)) self.is_archive = ArchiveImportTask.is_archive(util.syspath(toppath))
def tasks(self): def tasks(self) -> Iterable[ImportTask]:
"""Yield all import tasks for music found in the user-specified """Yield all import tasks for music found in the user-specified
path `self.toppath`. Any necessary sentinel tasks are also path `self.toppath`. Any necessary sentinel tasks are also
produced. produced.
@ -1114,7 +1116,10 @@ def albums_in_dir(path: util.PathBytes):
a list of Items that is probably an album. Specifically, any folder a list of Items that is probably an album. Specifically, any folder
containing any media files is an album. containing any media files is an album.
""" """
collapse_pat = collapse_paths = collapse_items = None collapse_paths: list[util.PathBytes] = []
collapse_items: list[util.PathBytes] = []
collapse_pat = None
ignore: list[str] = config["ignore"].as_str_seq() ignore: list[str] = config["ignore"].as_str_seq()
ignore_hidden: bool = config["ignore_hidden"].get(bool) ignore_hidden: bool = config["ignore_hidden"].get(bool)
@ -1139,7 +1144,7 @@ def albums_in_dir(path: util.PathBytes):
# proceed to process the current one. # proceed to process the current one.
if collapse_items: if collapse_items:
yield collapse_paths, collapse_items yield collapse_paths, collapse_items
collapse_pat = collapse_paths = collapse_items = None collapse_pat, collapse_paths, collapse_items = None, [], []
# Check whether this directory looks like the *first* directory # Check whether this directory looks like the *first* directory
# in a multi-disc sequence. There are two indicators: the file # in a multi-disc sequence. There are two indicators: the file

View file

@ -1343,7 +1343,7 @@ def import_func(lib, opts, args: list[str]):
if opts.library: if opts.library:
query = args query = args
paths = [] byte_paths = []
else: else:
query = None query = None
paths = args paths = args

View file

@ -63,6 +63,7 @@ MAX_FILENAME_LENGTH = 200
WINDOWS_MAGIC_PREFIX = "\\\\?\\" WINDOWS_MAGIC_PREFIX = "\\\\?\\"
T = TypeVar("T") T = TypeVar("T")
PathLike = Union[str, bytes, Path] PathLike = Union[str, bytes, Path]
StrPath = Union[str, Path]
Replacements = Sequence[tuple[Pattern[str], str]] Replacements = Sequence[tuple[Pattern[str], str]]
# Here for now to allow for a easy replace later on # Here for now to allow for a easy replace later on

View file

@ -48,6 +48,8 @@ POISON = "__PIPELINE_POISON__"
DEFAULT_QUEUE_SIZE = 16 DEFAULT_QUEUE_SIZE = 16
Tq = TypeVar("Tq")
def _invalidate_queue(q, val=None, sync=True): def _invalidate_queue(q, val=None, sync=True):
"""Breaks a Queue such that it never blocks, always has size 1, """Breaks a Queue such that it never blocks, always has size 1,
@ -91,7 +93,7 @@ def _invalidate_queue(q, val=None, sync=True):
q.mutex.release() q.mutex.release()
class CountedQueue(queue.Queue): class CountedQueue(queue.Queue[Tq]):
"""A queue that keeps track of the number of threads that are """A queue that keeps track of the number of threads that are
still feeding into it. The queue is poisoned when all threads are still feeding into it. The queue is poisoned when all threads are
finished with the queue. finished with the queue.

View file

@ -15,10 +15,10 @@
from __future__ import annotations from __future__ import annotations
from collections.abc import Iterable
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
import librosa import librosa
import numpy as np
from beets.plugins import BeetsPlugin from beets.plugins import BeetsPlugin
from beets.ui import Subcommand, should_write from beets.ui import Subcommand, should_write
@ -76,7 +76,10 @@ class AutoBPMPlugin(BeetsPlugin):
self._log.error("Failed to measure BPM for {}: {}", path, exc) self._log.error("Failed to measure BPM for {}: {}", path, exc)
continue continue
bpm = round(tempo[0] if isinstance(tempo, Iterable) else tempo) bpm = round(
float(tempo[0] if isinstance(tempo, np.ndarray) else tempo)
)
item["bpm"] = bpm item["bpm"] = bpm
self._log.info("Computed BPM for {}: {}", path, bpm) self._log.info("Computed BPM for {}: {}", path, bpm)

View file

@ -30,7 +30,7 @@ from typing import TYPE_CHECKING
import beets import beets
import beets.ui import beets.ui
from beets import dbcore, vfs from beets import dbcore, logging, vfs
from beets.library import Item from beets.library import Item
from beets.plugins import BeetsPlugin from beets.plugins import BeetsPlugin
from beets.util import as_string, bluelet from beets.util import as_string, bluelet
@ -38,6 +38,17 @@ from beets.util import as_string, bluelet
if TYPE_CHECKING: if TYPE_CHECKING:
from beets.dbcore.query import Query from beets.dbcore.query import Query
log = logging.getLogger(__name__)
try:
from . import gstplayer
except ImportError as e:
raise ImportError(
"Gstreamer Python bindings not found."
' Install "gstreamer1.0" and "python-gi" or similar package to use BPD.'
) from e
PROTOCOL_VERSION = "0.16.0" PROTOCOL_VERSION = "0.16.0"
BUFSIZE = 1024 BUFSIZE = 1024
@ -94,11 +105,6 @@ SUBSYSTEMS = [
] ]
# Gstreamer import error.
class NoGstreamerError(Exception):
pass
# Error-handling, exceptions, parameter parsing. # Error-handling, exceptions, parameter parsing.
@ -1099,14 +1105,6 @@ class Server(BaseServer):
""" """
def __init__(self, library, host, port, password, ctrl_port, log): def __init__(self, library, host, port, password, ctrl_port, log):
try:
from beetsplug.bpd import gstplayer
except ImportError as e:
# This is a little hacky, but it's the best I know for now.
if e.args[0].endswith(" gst"):
raise NoGstreamerError()
else:
raise
log.info("Starting server...") log.info("Starting server...")
super().__init__(host, port, password, ctrl_port, log) super().__init__(host, port, password, ctrl_port, log)
self.lib = library self.lib = library
@ -1616,16 +1614,9 @@ class BPDPlugin(BeetsPlugin):
def start_bpd(self, lib, host, port, password, volume, ctrl_port): def start_bpd(self, lib, host, port, password, volume, ctrl_port):
"""Starts a BPD server.""" """Starts a BPD server."""
try: server = Server(lib, host, port, password, ctrl_port, self._log)
server = Server(lib, host, port, password, ctrl_port, self._log) server.cmd_setvol(None, volume)
server.cmd_setvol(None, volume) server.run()
server.run()
except NoGstreamerError:
self._log.error("Gstreamer Python bindings not found.")
self._log.error(
'Install "gstreamer1.0" and "python-gi"'
"or similar package to use BPD."
)
def commands(self): def commands(self):
cmd = beets.ui.Subcommand( cmd = beets.ui.Subcommand(

View file

@ -401,7 +401,7 @@ class LastGenrePlugin(plugins.BeetsPlugin):
label = "album" label = "album"
if not new_genres and "artist" in self.sources: if not new_genres and "artist" in self.sources:
new_genres = None new_genres = []
if isinstance(obj, library.Item): if isinstance(obj, library.Item):
new_genres = self.fetch_artist_genre(obj) new_genres = self.fetch_artist_genre(obj)
label = "artist" label = "artist"

View file

@ -1161,7 +1161,9 @@ class ExceptionWatcher(Thread):
Once an exception occurs, raise it and execute a callback. Once an exception occurs, raise it and execute a callback.
""" """
def __init__(self, queue: queue.Queue, callback: Callable[[], None]): def __init__(
self, queue: queue.Queue[Exception], callback: Callable[[], None]
):
self._queue = queue self._queue = queue
self._callback = callback self._callback = callback
self._stopevent = Event() self._stopevent = Event()
@ -1197,7 +1199,9 @@ BACKENDS: dict[str, type[Backend]] = {b.NAME: b for b in BACKEND_CLASSES}
class ReplayGainPlugin(BeetsPlugin): class ReplayGainPlugin(BeetsPlugin):
"""Provides ReplayGain analysis.""" """Provides ReplayGain analysis."""
def __init__(self): pool: ThreadPool | None = None
def __init__(self) -> None:
super().__init__() super().__init__()
# default backend is 'command' for backward-compatibility. # default backend is 'command' for backward-compatibility.
@ -1261,9 +1265,6 @@ class ReplayGainPlugin(BeetsPlugin):
except (ReplayGainError, FatalReplayGainError) as e: except (ReplayGainError, FatalReplayGainError) as e:
raise ui.UserError(f"replaygain initialization failed: {e}") raise ui.UserError(f"replaygain initialization failed: {e}")
# Start threadpool lazily.
self.pool = None
def should_use_r128(self, item: Item) -> bool: def should_use_r128(self, item: Item) -> bool:
"""Checks the plugin setting to decide whether the calculation """Checks the plugin setting to decide whether the calculation
should be done using the EBU R128 standard and use R128_ tags instead. should be done using the EBU R128 standard and use R128_ tags instead.
@ -1420,7 +1421,7 @@ class ReplayGainPlugin(BeetsPlugin):
"""Open a `ThreadPool` instance in `self.pool`""" """Open a `ThreadPool` instance in `self.pool`"""
if self.pool is None and self.backend_instance.do_parallel: if self.pool is None and self.backend_instance.do_parallel:
self.pool = ThreadPool(threads) self.pool = ThreadPool(threads)
self.exc_queue: queue.Queue = queue.Queue() self.exc_queue: queue.Queue[Exception] = queue.Queue()
signal.signal(signal.SIGINT, self._interrupt) signal.signal(signal.SIGINT, self._interrupt)

View file

@ -162,7 +162,7 @@ class SpotifyPlugin(
"""Get the path to the JSON file for storing the OAuth token.""" """Get the path to the JSON file for storing the OAuth token."""
return self.config["tokenfile"].get(confuse.Filename(in_app_dir=True)) return self.config["tokenfile"].get(confuse.Filename(in_app_dir=True))
def _authenticate(self): def _authenticate(self) -> None:
"""Request an access token via the Client Credentials Flow: """Request an access token via the Client Credentials Flow:
https://developer.spotify.com/documentation/general/guides/authorization-guide/#client-credentials-flow https://developer.spotify.com/documentation/general/guides/authorization-guide/#client-credentials-flow
""" """

View file

@ -1,5 +1,6 @@
# Don't post a comment on pull requests. comment:
comment: off layout: "condensed_header, condensed_files"
require_changes: true
# Sets non-blocking status checks # Sets non-blocking status checks
# https://docs.codecov.com/docs/commit-status#informational # https://docs.codecov.com/docs/commit-status#informational
@ -11,7 +12,7 @@ coverage:
patch: patch:
default: default:
informational: true informational: true
changes: no changes: false
github_checks: github_checks:
annotations: false annotations: false

View file

@ -124,7 +124,7 @@ aura = ["flask", "flask-cors", "Pillow"]
autobpm = ["librosa", "resampy"] autobpm = ["librosa", "resampy"]
# badfiles # mp3val and flac # badfiles # mp3val and flac
beatport = ["requests-oauthlib"] beatport = ["requests-oauthlib"]
bpd = ["PyGObject"] # python-gi and GStreamer 1.0+ bpd = ["PyGObject"] # gobject-introspection, gstreamer1.0-plugins-base, python3-gst-1.0
chroma = ["pyacoustid"] # chromaprint or fpcalc chroma = ["pyacoustid"] # chromaprint or fpcalc
# convert # ffmpeg # convert # ffmpeg
docs = ["pydata-sphinx-theme", "sphinx"] docs = ["pydata-sphinx-theme", "sphinx"]

View file

@ -15,7 +15,7 @@ markers =
data_file = .reports/coverage/data data_file = .reports/coverage/data
branch = true branch = true
relative_files = true relative_files = true
omit = omit =
beets/test/* beets/test/*
beetsplug/_typing.py beetsplug/_typing.py
@ -34,7 +34,6 @@ exclude_also =
show_contexts = true show_contexts = true
[mypy] [mypy]
files = beets,beetsplug,test,extra,docs
allow_any_generics = false allow_any_generics = false
# FIXME: Would be better to actually type the libraries (if under our control), # FIXME: Would be better to actually type the libraries (if under our control),
# or write our own stubs. For now, silence errors # or write our own stubs. For now, silence errors
@ -46,11 +45,8 @@ explicit_package_bases = true
# config for all files. # config for all files.
[[mypy-beets.plugins]] [[mypy-beets.plugins]]
disallow_untyped_decorators = true disallow_untyped_decorators = true
disallow_any_generics = true
check_untyped_defs = true check_untyped_defs = true
allow_redefinition = true
[[mypy-beets.metadata_plugins]] [[mypy-beets.metadata_plugins]]
disallow_untyped_decorators = true disallow_untyped_decorators = true
disallow_any_generics = true
check_untyped_defs = true check_untyped_defs = true

View file

@ -14,19 +14,15 @@
"""Tests for BPD's implementation of the MPD protocol.""" """Tests for BPD's implementation of the MPD protocol."""
import importlib.util
import multiprocessing as mp import multiprocessing as mp
import os import os
import socket import socket
import sys
import tempfile import tempfile
import threading import threading
import time import time
import unittest import unittest
from contextlib import contextmanager from contextlib import contextmanager
from unittest.mock import MagicMock, patch
# Mock GstPlayer so that the forked process doesn't attempt to import gi:
from unittest import mock
import confuse import confuse
import pytest import pytest
@ -34,43 +30,8 @@ import yaml
from beets.test.helper import PluginTestCase from beets.test.helper import PluginTestCase
from beets.util import bluelet from beets.util import bluelet
from beetsplug import bpd
gstplayer = importlib.util.module_from_spec( bpd = pytest.importorskip("beetsplug.bpd")
importlib.util.find_spec("beetsplug.bpd.gstplayer")
)
def _gstplayer_play(*_):
bpd.gstplayer._GstPlayer.playing = True
return mock.DEFAULT
gstplayer._GstPlayer = mock.MagicMock(
spec_set=[
"time",
"volume",
"playing",
"run",
"play_file",
"pause",
"stop",
"seek",
"play",
"get_decoders",
],
**{
"playing": False,
"volume": 0,
"time.return_value": (0, 0),
"play_file.side_effect": _gstplayer_play,
"play.side_effect": _gstplayer_play,
"get_decoders.return_value": {"default": ({"audio/mpeg"}, {"mp3"})},
},
)
gstplayer.GstPlayer = lambda _: gstplayer._GstPlayer
sys.modules["beetsplug.bpd.gstplayer"] = gstplayer
bpd.gstplayer = gstplayer
class CommandParseTest(unittest.TestCase): class CommandParseTest(unittest.TestCase):
@ -256,7 +217,7 @@ def implements(commands, fail=False):
bluelet_listener = bluelet.Listener bluelet_listener = bluelet.Listener
@mock.patch("beets.util.bluelet.Listener") @patch("beets.util.bluelet.Listener")
def start_server(args, assigned_port, listener_patch): def start_server(args, assigned_port, listener_patch):
"""Start the bpd server, writing the port to `assigned_port`.""" """Start the bpd server, writing the port to `assigned_port`."""
@ -938,7 +899,7 @@ class BPDPlaylistsTest(BPDTestHelper):
response = client.send_command("load", "anything") response = client.send_command("load", "anything")
self._assert_failed(response, bpd.ERROR_NO_EXIST) self._assert_failed(response, bpd.ERROR_NO_EXIST)
@unittest.skip @unittest.expectedFailure
def test_cmd_playlistadd(self): def test_cmd_playlistadd(self):
with self.run_bpd() as client: with self.run_bpd() as client:
self._bpd_add(client, self.item1, playlist="anything") self._bpd_add(client, self.item1, playlist="anything")
@ -1128,7 +1089,7 @@ class BPDConnectionTest(BPDTestHelper):
self._assert_ok(response) self._assert_ok(response)
assert self.TAGTYPES == set(response.data["tagtype"]) assert self.TAGTYPES == set(response.data["tagtype"])
@unittest.skip @unittest.expectedFailure
def test_tagtypes_mask(self): def test_tagtypes_mask(self):
with self.run_bpd() as client: with self.run_bpd() as client:
response = client.send_command("tagtypes", "clear") response = client.send_command("tagtypes", "clear")
@ -1169,6 +1130,10 @@ class BPDReflectionTest(BPDTestHelper):
fail=True, fail=True,
) )
@patch(
"beetsplug.bpd.gstplayer.GstPlayer.get_decoders",
MagicMock(return_value={"default": ({"audio/mpeg"}, {"mp3"})}),
)
def test_cmd_decoders(self): def test_cmd_decoders(self):
with self.run_bpd() as client: with self.run_bpd() as client:
response = client.send_command("decoders") response = client.send_command("decoders")

View file

@ -1627,9 +1627,9 @@ class ImportIdTest(ImportTestCase):
task = importer.ImportTask( task = importer.ImportTask(
paths=self.import_dir, toppath="top path", items=[_common.item()] paths=self.import_dir, toppath="top path", items=[_common.item()]
) )
task.search_ids = [self.ID_RELEASE_0, self.ID_RELEASE_1]
task.lookup_candidates() task.lookup_candidates([self.ID_RELEASE_0, self.ID_RELEASE_1])
assert {"VALID_RELEASE_0", "VALID_RELEASE_1"} == { assert {"VALID_RELEASE_0", "VALID_RELEASE_1"} == {
c.info.album for c in task.candidates c.info.album for c in task.candidates
} }
@ -1639,9 +1639,9 @@ class ImportIdTest(ImportTestCase):
task = importer.SingletonImportTask( task = importer.SingletonImportTask(
toppath="top path", item=_common.item() toppath="top path", item=_common.item()
) )
task.search_ids = [self.ID_RECORDING_0, self.ID_RECORDING_1]
task.lookup_candidates() task.lookup_candidates([self.ID_RECORDING_0, self.ID_RECORDING_1])
assert {"VALID_RECORDING_0", "VALID_RECORDING_1"} == { assert {"VALID_RECORDING_0", "VALID_RECORDING_1"} == {
c.info.title for c in task.candidates c.info.title for c in task.candidates
} }