diff --git a/test/test_player.py b/test/test_player.py index 9ea079b3c..5060b3322 100644 --- a/test/test_player.py +++ b/test/test_player.py @@ -101,10 +101,21 @@ class MPCResponse(object): def __init__(self, raw_response): body = b'\n'.join(raw_response.split(b'\n')[:-2]).decode('utf-8') self.data = self._parse_body(body) - self.status = raw_response.split(b'\n')[-2].decode('utf-8') - self.ok = (self.status.startswith('OK') or - self.status.startswith('list_OK')) - self.err = self.status.startswith('ACK') + status = raw_response.split(b'\n')[-2].decode('utf-8') + self.ok, self.err_data = self._parse_status(status) + + def _parse_status(self, status): + """ Parses the first response line, which contains the status. + """ + if status.startswith('OK') or status.startswith('list_OK'): + return True, None + elif status.startswith('ACK'): + code, rest = status[5:].split('@', 1) + pos, rest = rest.split(']', 1) + cmd, rest = rest[2:].split('}') + return False, (int(code), int(pos), cmd, rest[1:]) + else: + raise RuntimeError('Unexpected status: {!r}'.format(status)) def _parse_body(self, body): """ Messages are generally in the format "header: content". @@ -137,7 +148,7 @@ class MPCClient(object): if do_hello: hello = self.get_response() if not hello.ok: - raise RuntimeError('Bad hello: {}'.format(hello.status)) + raise RuntimeError('Bad hello') def get_response(self, force_multi=None): """ Wait for a full server response and wrap it in a helper class. @@ -154,6 +165,8 @@ class MPCClient(object): if force_multi or any(responses): if line.startswith(b'ACK'): responses.append(MPCResponse(response)) + n_remaining = force_multi - len(responses) + responses.extend([None] * n_remaining) return responses else: return MPCResponse(response) @@ -192,7 +205,7 @@ class MPCClient(object): requests.append(b'command_list_end\n') request = b''.join(requests) self.sock.sendall(request) - return self.get_response(force_multi=True) + return self.get_response(force_multi=len(commands)) def readline(self, terminator=b'\n', bufsize=1024): """ Reads a line of data from the socket. @@ -222,6 +235,7 @@ def implements(commands, expectedFailure=False): # noqa: N803 def _test(self): with self.run_bpd() as client: response = client.send_command('commands') + self._assert_ok(response) implemented = response.data['command'] self.assertEqual(commands.intersection(implemented), commands) return unittest.expectedFailure(_test) if expectedFailure else _test @@ -294,6 +308,25 @@ class BPDTest(unittest.TestCase, TestHelper): server.terminate() server.join(timeout=0.2) + def _assert_ok(self, *responses): + for response in responses: + self.assertTrue(response is not None) + self.assertTrue(response.ok, 'Response failed: {}'.format( + response.err_data)) + + def _assert_failed(self, response, code, pos=None): + """ Check that a command failed with a specific error code. If this + is a list of responses, first check all preceding commands were OK. + """ + if pos is not None: + previous_commands = response[0:pos] + self._assert_ok(*previous_commands) + response = response[pos] + self.assertEqual(pos, response.err_data[1]) + self.assertFalse(response.ok) + if code is not None: + self.assertEqual(code, response.err_data[0]) + def test_server_hello(self): with self.run_bpd(do_hello=False) as client: self.assertEqual(client.readline(), b'OK MPD 0.13.0\n') @@ -313,23 +346,29 @@ class BPDTest(unittest.TestCase, TestHelper): 'seekid', 'seekcur', 'stop', }, expectedFailure=True) - def bpd_add_item(self, client, item): + def _bpd_add(self, client, *items): """ Add the given item to the BPD playlist """ - name = py3_path(os.path.basename(item.path)) - path = '/'.join([item.artist, item.album, name]) - response = client.send_command('add', path) - self.assertTrue(response.ok) + paths = ['/'.join([ + item.artist, item.album, + py3_path(os.path.basename(item.path))]) for item in items] + responses = client.send_commands(*[('add', path) for path in paths]) + self._assert_ok(*responses) + + def test_unknown_cmd(self): + with self.run_bpd() as client: + response = client.send_command('notacommand') + self._assert_failed(response, bpd.ERROR_UNKNOWN) def test_cmd_play(self): with self.run_bpd() as client: - self.bpd_add_item(client, self.item1) + self._bpd_add(client, self.item1) responses = client.send_commands( ('status',), ('play',), ('status',)) + self._assert_ok(*responses) self.assertEqual('stop', responses[0].data['state']) - self.assertTrue(responses[1].ok) self.assertEqual('play', responses[2].data['state']) test_implements_queue = implements({ @@ -342,23 +381,17 @@ class BPDTest(unittest.TestCase, TestHelper): def test_cmd_add(self): with self.run_bpd() as client: - self.bpd_add_item(client, self.item1) + self._bpd_add(client, self.item1) def test_cmd_playlistinfo(self): with self.run_bpd() as client: - self.bpd_add_item(client, self.item1) + self._bpd_add(client, self.item1) responses = client.send_commands( ('playlistinfo',), - ('playlistinfo', '0')) - response = client.send_command('playlistinfo', '200') + ('playlistinfo', '0'), + ('playlistinfo', '200')) - self.assertTrue(responses[0].ok) - self.assertTrue(responses[1].ok) - - self.assertTrue(response.err) - self.assertEqual( - 'ACK [2@0] {playlistinfo} argument out of range', - response.status) + self._assert_failed(responses, bpd.ERROR_ARG, pos=2) test_implements_playlists = implements({ 'listplaylist', 'listplaylistinfo', 'listplaylists', 'load', @@ -375,27 +408,34 @@ class BPDTest(unittest.TestCase, TestHelper): def test_cmd_search(self): with self.run_bpd() as client: response = client.send_command('search', 'track', '1') + self._assert_ok(response) self.assertEqual(self.item1.title, response.data['Title']) def test_cmd_list_simple(self): with self.run_bpd() as client: - response1 = client.send_command('list', 'album') - response2 = client.send_command('list', 'track') - self.assertEqual('Album Title', response1.data['Album']) - self.assertEqual(['1', '2'], response2.data['Track']) + responses = client.send_commands( + ('list', 'album'), + ('list', 'track')) + self._assert_ok(*responses) + self.assertEqual('Album Title', responses[0].data['Album']) + self.assertEqual(['1', '2'], responses[1].data['Track']) def test_cmd_lsinfo(self): with self.run_bpd() as client: response1 = client.send_command('lsinfo') + self._assert_ok(response1) response2 = client.send_command( 'lsinfo', response1.data['directory']) + self._assert_ok(response2) response3 = client.send_command( 'lsinfo', response2.data['directory']) + self._assert_ok(response3) self.assertIn(self.item1.title, response3.data['Title']) def test_cmd_count(self): with self.run_bpd() as client: response = client.send_command('count', 'track', '1') + self._assert_ok(response) self.assertEqual('1', response.data['songs']) self.assertEqual('0', response.data['playtime']) @@ -414,29 +454,26 @@ class BPDTest(unittest.TestCase, TestHelper): def test_cmd_password(self): with self.run_bpd(password='abc123') as client: response = client.send_command('status') - self.assertTrue(response.err) - self.assertEqual(response.status, - 'ACK [4@0] {} insufficient privileges') + self._assert_failed(response, bpd.ERROR_PERMISSION) response = client.send_command('password', 'wrong') - self.assertTrue(response.err) - self.assertEqual(response.status, - 'ACK [3@0] {password} incorrect password') + self._assert_failed(response, bpd.ERROR_PASSWORD) - response = client.send_command('password', 'abc123') - self.assertTrue(response.ok) - response = client.send_command('status') - self.assertTrue(response.ok) + responses = client.send_commands( + ('password', 'abc123'), + ('status',)) + self._assert_ok(*responses) def test_cmd_ping(self): with self.run_bpd() as client: response = client.send_command('ping') - self.assertTrue(response.ok) + self._assert_ok(response) @unittest.expectedFailure def test_cmd_tagtypes(self): with self.run_bpd() as client: response = client.send_command('tagtypes') + self._assert_ok(response) self.assertEqual({ 'Artist', 'ArtistSort', 'Album', 'AlbumSort', 'AlbumArtist', 'AlbumArtistSort', 'Title', 'Track', 'Name', 'Genre', 'Date', @@ -450,7 +487,7 @@ class BPDTest(unittest.TestCase, TestHelper): def test_tagtypes_mask(self): with self.run_bpd() as client: response = client.send_command('tagtypes', 'clear') - self.assertTrue(response.ok) + self._assert_ok(response) test_implements_partitions = implements({ 'partition', 'listpartitions', 'newpartition',