Fixed an failing bpd test:

The failing test stems from differences in how the patch decorator
from unitest works in python 3.14 in compassion to earlier versions. It
does not patch code running in a multiprocessing.

Generally enhanced typehints in file.
This commit is contained in:
Sebastian Mohr 2026-01-05 22:21:18 +01:00
parent 83efb44ffb
commit 32cc47814e

View file

@ -21,7 +21,9 @@ import tempfile
import threading
import time
import unittest
from collections.abc import Iterator
from contextlib import contextmanager
from typing import Any, Literal, NamedTuple, overload
from unittest.mock import MagicMock, patch
import confuse
@ -34,7 +36,7 @@ from beets.util import bluelet
bpd = pytest.importorskip("beetsplug.bpd")
class CommandParseTest(unittest.TestCase):
class TestCommandParse:
def test_no_args(self):
s = r"command"
c = bpd.Command(s)
@ -76,14 +78,28 @@ class CommandParseTest(unittest.TestCase):
assert c.args == ["hello \\ there"]
class ErrorData(NamedTuple):
code: int
pos: int
cmd: str
msg: str
class MPCResponse:
def __init__(self, raw_response):
ok: bool
data: dict[str, str | list[str]]
err_data: None | ErrorData
def __init__(self, raw_response: bytes):
body = b"\n".join(raw_response.split(b"\n")[:-2]).decode("utf-8")
self.data = self._parse_body(body)
status = raw_response.split(b"\n")[-2].decode("utf-8")
self.ok, self.err_data = self._parse_status(status)
def _parse_status(self, status):
@staticmethod
def _parse_status(
status: str,
) -> tuple[Literal[True], None] | tuple[Literal[False], ErrorData]:
"""Parses the first response line, which contains the status."""
if status.startswith("OK") or status.startswith("list_OK"):
return True, None
@ -91,17 +107,18 @@ class MPCResponse:
code, rest = status[5:].split("@", 1)
pos, rest = rest.split("]", 1)
cmd, rest = rest[2:].split("}")
return False, (int(code), int(pos), cmd, rest[1:])
return False, ErrorData(int(code), int(pos), cmd, rest[1:])
else:
raise RuntimeError(f"Unexpected status: {status!r}")
def _parse_body(self, body):
@staticmethod
def _parse_body(body: str) -> dict[str, str | list[str]]:
"""Messages are generally in the format "header: content".
Convert them into a dict, storing the values for repeated headers as
lists of strings, and non-repeated ones as string.
"""
data = {}
repeated_headers = set()
data: dict[str, str | list[str]] = {}
repeated_headers: set[str] = set()
for line in body.split("\n"):
if not line:
continue
@ -110,9 +127,9 @@ class MPCResponse:
header, content = line.split(":", 1)
content = content.lstrip()
if header in repeated_headers:
data[header].append(content)
data[header].append(content) # type: ignore
elif header in data:
data[header] = [data[header], content]
data[header] = [data[header], content] # type: ignore[list-item]
repeated_headers.add(header)
else:
data[header] = content
@ -120,7 +137,10 @@ class MPCResponse:
class MPCClient:
def __init__(self, sock, do_hello=True):
sock: socket.socket
buf: bytes
def __init__(self, sock: socket.socket, do_hello: bool = True):
self.sock = sock
self.buf = b""
if do_hello:
@ -128,19 +148,25 @@ class MPCClient:
if not hello.ok:
raise RuntimeError("Bad hello")
def get_response(self, force_multi=None):
@overload
def get_response(self, force_multi: None = None) -> MPCResponse: ...
@overload
def get_response(self, force_multi: int) -> list[MPCResponse | None]: ...
def get_response(
self, force_multi: int | None = None
) -> MPCResponse | list[MPCResponse | None]:
"""Wait for a full server response and wrap it in a helper class.
If the request was a batch request then this will return a list of
`MPCResponse`s, one for each processed subcommand.
"""
response = b""
responses = []
response: bytes = b""
responses: list[MPCResponse | None] = []
while True:
line = self.readline()
response += line
if line.startswith(b"OK") or line.startswith(b"ACK"):
if force_multi or any(responses):
if isinstance(force_multi, int):
if line.startswith(b"ACK"):
responses.append(MPCResponse(response))
n_remaining = force_multi - len(responses)
@ -154,7 +180,7 @@ class MPCClient:
elif not line:
raise RuntimeError(f"Unexpected response: {line!r}")
def serialise_command(self, command, *args):
def serialise_command(self, command: str, *args: str) -> bytes:
cmd = [command.encode("utf-8")]
for arg in [a.encode("utf-8") for a in args]:
if b" " in arg:
@ -163,12 +189,12 @@ class MPCClient:
cmd.append(arg)
return b" ".join(cmd) + b"\n"
def send_command(self, command, *args):
def send_command(self, command: str, *args: str) -> MPCResponse:
request = self.serialise_command(command, *args)
self.sock.sendall(request)
return self.get_response()
def send_commands(self, *commands):
def send_commands(self, *commands: str) -> list[MPCResponse]:
"""Use MPD command batching to send multiple commands at once.
Each item of commands is a tuple containing a command followed by
any arguments.
@ -183,9 +209,9 @@ class MPCClient:
requests.append(b"command_list_end\n")
request = b"".join(requests)
self.sock.sendall(request)
return self.get_response(force_multi=len(commands))
return list(filter(None, self.get_response(force_multi=len(commands))))
def readline(self, terminator=b"\n", bufsize=1024):
def readline(self, terminator: bytes = b"\n", bufsize: int = 1024) -> bytes:
"""Reads a line of data from the socket."""
while True:
@ -203,7 +229,7 @@ class MPCClient:
return line
def implements(commands, fail=False):
def implements(commands: set[str], fail=False):
def _test(self):
with self.run_bpd() as client:
response = client.send_command("commands")
@ -221,8 +247,17 @@ bluelet_listener = bluelet.Listener
def start_server(args, assigned_port, listener_patch):
"""Start the bpd server, writing the port to `assigned_port`."""
# FIXME: This is used in the test_cmd_decoders test. Patch does not apply to
# code running in mp.Process anymore (change in 3.14)
# There might be a better way to fix this but as I have already spent
# way more time here than planned this seems like the easiest way forward
patch(
"beetsplug.bpd.gstplayer.GstPlayer.get_decoders",
MagicMock(return_value={"default": ({"audio/mpeg"}, {"mp3"})}),
).start()
def listener_wrap(host, port):
"""Wrap `bluelet.Listener`, writing the port to `assigend_port`."""
"""Wrap `bluelet.Listener`, writing the port to `assigned_port`."""
# `bluelet.Listener` has previously been saved to
# `bluelet_listener` as this function will replace it at its
# original location.
@ -259,19 +294,16 @@ class BPDTestHelper(PluginTestCase):
self.lib.add_album([self.item1, self.item2])
@contextmanager
def run_bpd(
def bpd_server(
self,
host="localhost",
password=None,
do_hello=True,
second_client=False,
):
host: str = "localhost",
password: str | None = None,
) -> Iterator[tuple[str, int]]:
"""Runs BPD in another process, configured with the same library
database as we created in the setUp method. Exposes a client that is
connected to the server, and kills the server at the end.
database as we created in the setUp method. Kills the server at the end.
"""
# Create a config file:
config = {
config: dict[str, Any] = {
"pluginpath": [str(self.temp_dir_path)],
"plugins": "bpd",
# use port 0 to let the OS choose a free port
@ -291,7 +323,10 @@ class BPDTestHelper(PluginTestCase):
config_file.close()
# Fork and launch BPD in the new process:
assigned_port = mp.Queue(2) # 2 slots, `control_port` and `port`
assigned_port: mp.Queue[int] = mp.Queue(
2
) # 2 slots, `control_port` and `port`
server = mp.Process(
target=start_server,
args=(
@ -310,32 +345,48 @@ class BPDTestHelper(PluginTestCase):
server.start()
try:
assigned_port.get(timeout=1) # skip control_port
port = assigned_port.get(timeout=0.5) # read port
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect((host, port))
if second_client:
sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock2.connect((host, port))
yield (
MPCClient(sock, do_hello),
MPCClient(sock2, do_hello),
)
finally:
sock2.close()
else:
yield MPCClient(sock, do_hello)
finally:
sock.close()
assigned_port.get(timeout=5) # skip control_port
port = assigned_port.get(timeout=2) # read port
yield (host, port)
finally:
server.terminate()
server.join(timeout=0.2)
@contextmanager
def bpd_client(
self,
host: str,
port: int,
do_hello: bool = True,
) -> Iterator[MPCClient]:
"""Connects a BPD client to a given server"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect((host, port))
yield MPCClient(sock, do_hello)
finally:
sock.close()
@contextmanager
def run_bpd(
self,
host: str = "localhost",
password: str | None = None,
do_hello: bool = True,
) -> Iterator[MPCClient]:
"""Runs BPD in another process, configured with the same library
database as we created in the setUp method. Exposes a client that is
connected to the server, and kills the server at the end.
"""
with self.bpd_server(
host,
password,
) as (host, port):
with self.bpd_client(host, port, do_hello) as client:
yield client
def _assert_ok(self, *responses):
for response in responses:
assert response is not None
@ -488,22 +539,30 @@ class BPDQueryTest(BPDTestHelper):
if any(not r.ok for r in rs):
raise RuntimeError("Toggler failed")
with self.run_bpd(second_client=True) as (client, client2):
self._bpd_add(client, self.item1, self.item2)
toggler = threading.Thread(target=_toggle, args=(client2,))
toggler.start()
# Idling will hang until the toggler thread changes the play state.
# Since the client sockets have a 1s timeout set at worst this will
# raise a socket.timeout and fail the test if the toggler thread
# manages to finish before the idle command is sent here.
response = client.send_command("idle", "player")
toggler.join()
with self.bpd_server() as (host, port):
with (
self.bpd_client(host, port) as client1,
self.bpd_client(host, port) as client2,
):
self._bpd_add(client1, self.item1, self.item2)
toggler = threading.Thread(target=_toggle, args=(client2,))
toggler.start()
# Idling will hang until the toggler thread changes the play state.
# Since the client sockets have a 1s timeout set at worst this will
# raise a socket.timeout and fail the test if the toggler thread
# manages to finish before the idle command is sent here.
response = client1.send_command("idle", "player")
toggler.join()
self._assert_ok(response)
def test_cmd_idle_with_pending(self):
with self.run_bpd(second_client=True) as (client, client2):
response1 = client.send_command("random", "1")
response2 = client2.send_command("idle")
with self.bpd_server() as (host, port):
with (
self.bpd_client(host, port) as client1,
self.bpd_client(host, port) as client2,
):
response1 = client1.send_command("random", "1")
response2 = client2.send_command("idle")
self._assert_ok(response1, response2)
assert "options" == response2.data["changed"]
@ -1130,10 +1189,6 @@ class BPDReflectionTest(BPDTestHelper):
fail=True,
)
@patch(
"beetsplug.bpd.gstplayer.GstPlayer.get_decoders",
MagicMock(return_value={"default": ({"audio/mpeg"}, {"mp3"})}),
)
def test_cmd_decoders(self):
with self.run_bpd() as client:
response = client.send_command("decoders")