From 32cc47814e1f6329958b616b29b75cd86d0a9e7e Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Mon, 5 Jan 2026 22:21:18 +0100 Subject: [PATCH] Fixed an failing bpd test: The failing test stems from differences in how the patch decorator from unitest works in python 3.14 in compassion to earlier versions. It does not patch code running in a multiprocessing. Generally enhanced typehints in file. --- test/plugins/test_bpd.py | 195 +++++++++++++++++++++++++-------------- 1 file changed, 125 insertions(+), 70 deletions(-) diff --git a/test/plugins/test_bpd.py b/test/plugins/test_bpd.py index 16e424d7e..3798be6f1 100644 --- a/test/plugins/test_bpd.py +++ b/test/plugins/test_bpd.py @@ -21,7 +21,9 @@ import tempfile import threading import time import unittest +from collections.abc import Iterator from contextlib import contextmanager +from typing import Any, Literal, NamedTuple, overload from unittest.mock import MagicMock, patch import confuse @@ -34,7 +36,7 @@ from beets.util import bluelet bpd = pytest.importorskip("beetsplug.bpd") -class CommandParseTest(unittest.TestCase): +class TestCommandParse: def test_no_args(self): s = r"command" c = bpd.Command(s) @@ -76,14 +78,28 @@ class CommandParseTest(unittest.TestCase): assert c.args == ["hello \\ there"] +class ErrorData(NamedTuple): + code: int + pos: int + cmd: str + msg: str + + class MPCResponse: - def __init__(self, raw_response): + ok: bool + data: dict[str, str | list[str]] + err_data: None | ErrorData + + def __init__(self, raw_response: bytes): body = b"\n".join(raw_response.split(b"\n")[:-2]).decode("utf-8") self.data = self._parse_body(body) status = raw_response.split(b"\n")[-2].decode("utf-8") self.ok, self.err_data = self._parse_status(status) - def _parse_status(self, status): + @staticmethod + def _parse_status( + status: str, + ) -> tuple[Literal[True], None] | tuple[Literal[False], ErrorData]: """Parses the first response line, which contains the status.""" if status.startswith("OK") or status.startswith("list_OK"): return True, None @@ -91,17 +107,18 @@ class MPCResponse: code, rest = status[5:].split("@", 1) pos, rest = rest.split("]", 1) cmd, rest = rest[2:].split("}") - return False, (int(code), int(pos), cmd, rest[1:]) + return False, ErrorData(int(code), int(pos), cmd, rest[1:]) else: raise RuntimeError(f"Unexpected status: {status!r}") - def _parse_body(self, body): + @staticmethod + def _parse_body(body: str) -> dict[str, str | list[str]]: """Messages are generally in the format "header: content". Convert them into a dict, storing the values for repeated headers as lists of strings, and non-repeated ones as string. """ - data = {} - repeated_headers = set() + data: dict[str, str | list[str]] = {} + repeated_headers: set[str] = set() for line in body.split("\n"): if not line: continue @@ -110,9 +127,9 @@ class MPCResponse: header, content = line.split(":", 1) content = content.lstrip() if header in repeated_headers: - data[header].append(content) + data[header].append(content) # type: ignore elif header in data: - data[header] = [data[header], content] + data[header] = [data[header], content] # type: ignore[list-item] repeated_headers.add(header) else: data[header] = content @@ -120,7 +137,10 @@ class MPCResponse: class MPCClient: - def __init__(self, sock, do_hello=True): + sock: socket.socket + buf: bytes + + def __init__(self, sock: socket.socket, do_hello: bool = True): self.sock = sock self.buf = b"" if do_hello: @@ -128,19 +148,25 @@ class MPCClient: if not hello.ok: raise RuntimeError("Bad hello") - def get_response(self, force_multi=None): + @overload + def get_response(self, force_multi: None = None) -> MPCResponse: ... + @overload + def get_response(self, force_multi: int) -> list[MPCResponse | None]: ... + def get_response( + self, force_multi: int | None = None + ) -> MPCResponse | list[MPCResponse | None]: """Wait for a full server response and wrap it in a helper class. If the request was a batch request then this will return a list of `MPCResponse`s, one for each processed subcommand. """ - response = b"" - responses = [] + response: bytes = b"" + responses: list[MPCResponse | None] = [] while True: line = self.readline() response += line if line.startswith(b"OK") or line.startswith(b"ACK"): - if force_multi or any(responses): + if isinstance(force_multi, int): if line.startswith(b"ACK"): responses.append(MPCResponse(response)) n_remaining = force_multi - len(responses) @@ -154,7 +180,7 @@ class MPCClient: elif not line: raise RuntimeError(f"Unexpected response: {line!r}") - def serialise_command(self, command, *args): + def serialise_command(self, command: str, *args: str) -> bytes: cmd = [command.encode("utf-8")] for arg in [a.encode("utf-8") for a in args]: if b" " in arg: @@ -163,12 +189,12 @@ class MPCClient: cmd.append(arg) return b" ".join(cmd) + b"\n" - def send_command(self, command, *args): + def send_command(self, command: str, *args: str) -> MPCResponse: request = self.serialise_command(command, *args) self.sock.sendall(request) return self.get_response() - def send_commands(self, *commands): + def send_commands(self, *commands: str) -> list[MPCResponse]: """Use MPD command batching to send multiple commands at once. Each item of commands is a tuple containing a command followed by any arguments. @@ -183,9 +209,9 @@ class MPCClient: requests.append(b"command_list_end\n") request = b"".join(requests) self.sock.sendall(request) - return self.get_response(force_multi=len(commands)) + return list(filter(None, self.get_response(force_multi=len(commands)))) - def readline(self, terminator=b"\n", bufsize=1024): + def readline(self, terminator: bytes = b"\n", bufsize: int = 1024) -> bytes: """Reads a line of data from the socket.""" while True: @@ -203,7 +229,7 @@ class MPCClient: return line -def implements(commands, fail=False): +def implements(commands: set[str], fail=False): def _test(self): with self.run_bpd() as client: response = client.send_command("commands") @@ -221,8 +247,17 @@ bluelet_listener = bluelet.Listener def start_server(args, assigned_port, listener_patch): """Start the bpd server, writing the port to `assigned_port`.""" + # FIXME: This is used in the test_cmd_decoders test. Patch does not apply to + # code running in mp.Process anymore (change in 3.14) + # There might be a better way to fix this but as I have already spent + # way more time here than planned this seems like the easiest way forward + patch( + "beetsplug.bpd.gstplayer.GstPlayer.get_decoders", + MagicMock(return_value={"default": ({"audio/mpeg"}, {"mp3"})}), + ).start() + def listener_wrap(host, port): - """Wrap `bluelet.Listener`, writing the port to `assigend_port`.""" + """Wrap `bluelet.Listener`, writing the port to `assigned_port`.""" # `bluelet.Listener` has previously been saved to # `bluelet_listener` as this function will replace it at its # original location. @@ -259,19 +294,16 @@ class BPDTestHelper(PluginTestCase): self.lib.add_album([self.item1, self.item2]) @contextmanager - def run_bpd( + def bpd_server( self, - host="localhost", - password=None, - do_hello=True, - second_client=False, - ): + host: str = "localhost", + password: str | None = None, + ) -> Iterator[tuple[str, int]]: """Runs BPD in another process, configured with the same library - database as we created in the setUp method. Exposes a client that is - connected to the server, and kills the server at the end. + database as we created in the setUp method. Kills the server at the end. """ # Create a config file: - config = { + config: dict[str, Any] = { "pluginpath": [str(self.temp_dir_path)], "plugins": "bpd", # use port 0 to let the OS choose a free port @@ -291,7 +323,10 @@ class BPDTestHelper(PluginTestCase): config_file.close() # Fork and launch BPD in the new process: - assigned_port = mp.Queue(2) # 2 slots, `control_port` and `port` + assigned_port: mp.Queue[int] = mp.Queue( + 2 + ) # 2 slots, `control_port` and `port` + server = mp.Process( target=start_server, args=( @@ -310,32 +345,48 @@ class BPDTestHelper(PluginTestCase): server.start() try: - assigned_port.get(timeout=1) # skip control_port - port = assigned_port.get(timeout=0.5) # read port - - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - try: - sock.connect((host, port)) - - if second_client: - sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - try: - sock2.connect((host, port)) - yield ( - MPCClient(sock, do_hello), - MPCClient(sock2, do_hello), - ) - finally: - sock2.close() - - else: - yield MPCClient(sock, do_hello) - finally: - sock.close() + assigned_port.get(timeout=5) # skip control_port + port = assigned_port.get(timeout=2) # read port + yield (host, port) finally: server.terminate() server.join(timeout=0.2) + @contextmanager + def bpd_client( + self, + host: str, + port: int, + do_hello: bool = True, + ) -> Iterator[MPCClient]: + """Connects a BPD client to a given server""" + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + sock.connect((host, port)) + yield MPCClient(sock, do_hello) + finally: + sock.close() + + @contextmanager + def run_bpd( + self, + host: str = "localhost", + password: str | None = None, + do_hello: bool = True, + ) -> Iterator[MPCClient]: + """Runs BPD in another process, configured with the same library + database as we created in the setUp method. Exposes a client that is + connected to the server, and kills the server at the end. + """ + + with self.bpd_server( + host, + password, + ) as (host, port): + with self.bpd_client(host, port, do_hello) as client: + yield client + def _assert_ok(self, *responses): for response in responses: assert response is not None @@ -488,22 +539,30 @@ class BPDQueryTest(BPDTestHelper): if any(not r.ok for r in rs): raise RuntimeError("Toggler failed") - with self.run_bpd(second_client=True) as (client, client2): - self._bpd_add(client, self.item1, self.item2) - toggler = threading.Thread(target=_toggle, args=(client2,)) - toggler.start() - # Idling will hang until the toggler thread changes the play state. - # Since the client sockets have a 1s timeout set at worst this will - # raise a socket.timeout and fail the test if the toggler thread - # manages to finish before the idle command is sent here. - response = client.send_command("idle", "player") - toggler.join() + with self.bpd_server() as (host, port): + with ( + self.bpd_client(host, port) as client1, + self.bpd_client(host, port) as client2, + ): + self._bpd_add(client1, self.item1, self.item2) + toggler = threading.Thread(target=_toggle, args=(client2,)) + toggler.start() + # Idling will hang until the toggler thread changes the play state. + # Since the client sockets have a 1s timeout set at worst this will + # raise a socket.timeout and fail the test if the toggler thread + # manages to finish before the idle command is sent here. + response = client1.send_command("idle", "player") + toggler.join() self._assert_ok(response) def test_cmd_idle_with_pending(self): - with self.run_bpd(second_client=True) as (client, client2): - response1 = client.send_command("random", "1") - response2 = client2.send_command("idle") + with self.bpd_server() as (host, port): + with ( + self.bpd_client(host, port) as client1, + self.bpd_client(host, port) as client2, + ): + response1 = client1.send_command("random", "1") + response2 = client2.send_command("idle") self._assert_ok(response1, response2) assert "options" == response2.data["changed"] @@ -1130,10 +1189,6 @@ class BPDReflectionTest(BPDTestHelper): fail=True, ) - @patch( - "beetsplug.bpd.gstplayer.GstPlayer.get_decoders", - MagicMock(return_value={"default": ({"audio/mpeg"}, {"mp3"})}), - ) def test_cmd_decoders(self): with self.run_bpd() as client: response = client.send_command("decoders")