beets/test/test_player.py
Carl Suster 767441d5d5 BPD tests: improve test helpers
Decode the bytes to strings: the MPD protocol specifies that the
communications are all in UTF-8.

Also parse the body into a dict since this is typically more convenient
than having to do it manually in each test.
2019-03-31 10:46:14 +11:00

423 lines
15 KiB
Python

# -*- coding: utf-8 -*-
# This file is part of beets.
# Copyright 2016, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Tests for BPD and music playing.
"""
from __future__ import division, absolute_import, print_function
import unittest
import sys
import imp
import multiprocessing as mp
import socket
import time
from test import _common
from test.helper import TestHelper
from beetsplug import bpd
# Intercept and mock the GstPlayer player: build a stand-in module whose
# `GstPlayer` class only has no-op methods and fixed attribute values, then
# register it under the name of the real module and on the `bpd` package.
# The module name passed to `new_module` must match the `sys.modules` key
# below so that the module's `__name__` agrees with where it is registered.
gstplayer = imp.new_module('beetsplug.bpd.gstplayer')
gstplayer.GstPlayer = type('GstPlayer', (), {
    '__init__': lambda self, callback: None,
    'playing': False,
    'volume': 0.0,
    'run': lambda self: None,
    'time': lambda self: (0, 0),
    'play': lambda self: None,
    'pause': lambda self: None,
    'play_file': lambda self, path: None,
    'seek': lambda self, pos: None,
    'stop': lambda self: None,
})
sys.modules['beetsplug.bpd.gstplayer'] = gstplayer
bpd.gstplayer = gstplayer
class CommandParseTest(unittest.TestCase):
def test_no_args(self):
s = r'command'
c = bpd.Command(s)
self.assertEqual(c.name, u'command')
self.assertEqual(c.args, [])
def test_one_unquoted_arg(self):
s = r'command hello'
c = bpd.Command(s)
self.assertEqual(c.name, u'command')
self.assertEqual(c.args, [u'hello'])
def test_two_unquoted_args(self):
s = r'command hello there'
c = bpd.Command(s)
self.assertEqual(c.name, u'command')
self.assertEqual(c.args, [u'hello', u'there'])
def test_one_quoted_arg(self):
s = r'command "hello there"'
c = bpd.Command(s)
self.assertEqual(c.name, u'command')
self.assertEqual(c.args, [u'hello there'])
def test_heterogenous_args(self):
s = r'command "hello there" sir'
c = bpd.Command(s)
self.assertEqual(c.name, u'command')
self.assertEqual(c.args, [u'hello there', u'sir'])
def test_quote_in_arg(self):
s = r'command "hello \" there"'
c = bpd.Command(s)
self.assertEqual(c.args, [u'hello " there'])
def test_backslash_in_arg(self):
s = r'command "hello \\ there"'
c = bpd.Command(s)
self.assertEqual(c.args, [u'hello \\ there'])
class MPCResponse(object):
    """One decoded server reply.

    Attributes set by the constructor:
      data   -- dict built from the "header: content" body lines
      status -- the second-to-last element of the reply split on newlines
                (i.e. the last line when the reply ends with a newline)
      ok     -- True when `status` starts with "OK" or "list_OK"
      err    -- True when `status` starts with "ACK"
    """

    def __init__(self, raw_response):
        """Decode `raw_response` (bytes) as UTF-8 and split it into the
        parsed body and the status line. A non-OK status is printed.
        """
        body_lines = raw_response.split(b'\n')[:-2]
        self.data = self._parse_body(b'\n'.join(body_lines).decode('utf-8'))
        status_line = raw_response.split(b'\n')[-2]
        self.status = status_line.decode('utf-8')
        self.ok = self.status.startswith(('OK', 'list_OK'))
        self.err = self.status.startswith('ACK')
        if not self.ok:
            print(self.status)

    def _parse_body(self, body):
        """ Turn "header: content" lines into a dict.

        A header seen once maps to its content string; a header seen
        several times maps to the list of its contents in order of
        appearance. Empty lines are skipped; a non-empty line without a
        colon raises `RuntimeError`.
        """
        parsed = {}
        for entry in body.split('\n'):
            if not entry:
                continue
            header, colon, content = entry.partition(':')
            if not colon:
                raise RuntimeError('Unexpected line: {!r}'.format(entry))
            content = content.lstrip()
            if header not in parsed:
                parsed[header] = content
            elif isinstance(parsed[header], list):
                # Third or later occurrence: the list already exists.
                parsed[header].append(content)
            else:
                # Second occurrence: promote the single string to a list.
                parsed[header] = [parsed[header], content]
        return parsed
class MPCClient(object):
    """Small blocking TCP client that sends newline-terminated commands and
    wraps the replies in `MPCResponse` objects.
    """

    def __init__(self, host, port, do_hello=True):
        """Connect to `(host, port)`.

        When `do_hello` is true the first server reply is read immediately
        and a `RuntimeError` is raised if its status is not OK.
        """
        self.buf = b''
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect((host, port))
        if do_hello:
            hello = self.get_response()
            if not hello.ok:
                raise RuntimeError('Bad hello: {}'.format(hello.status))

    def __del__(self):
        # `sock` is missing when construction failed before it was assigned;
        # do not raise from the finaliser in that case.
        sock = getattr(self, 'sock', None)
        if sock is not None:
            sock.close()

    def get_response(self):
        """ Wait for a full server response and wrap it in a helper class.

        If the request was a batch request then this will return a list of
        `MPCResponse`s, one for each processed subcommand. Note that when
        no `list_OK` line was seen before the terminating `OK`/`ACK` line,
        a single `MPCResponse` is returned instead of a list.

        Raises `RuntimeError` if the connection yields an empty line (end
        of stream) before a terminating status line arrives.
        """
        response = b''
        responses = []
        while True:
            line = self.readline()
            response += line
            if line.startswith((b'OK', b'ACK')):
                if not responses:
                    return MPCResponse(response)
                if line.startswith(b'ACK'):
                    # The failing subcommand gets its own entry; a plain
                    # OK only terminates the batch.
                    responses.append(MPCResponse(response))
                return responses
            if line.startswith(b'list_OK'):
                responses.append(MPCResponse(response))
                response = b''
            elif not line:
                # `line` is always empty here, so report what was received
                # so far to make the failure diagnosable.
                raise RuntimeError(
                    'Unexpected response: {!r}'.format(response))

    def serialise_command(self, command, *args):
        """Encode `command` and `args` as one UTF-8 request line.

        Arguments that are empty or contain a space, tab, double quote or
        backslash are wrapped in double quotes, with backslashes and double
        quotes escaped by a backslash. All other arguments are sent as-is.
        """
        cmd = [command.encode('utf-8')]
        for arg in args:
            raw = arg.encode('utf-8')
            needs_quoting = not raw or any(
                special in raw for special in (b' ', b'\t', b'"', b'\\'))
            if needs_quoting:
                # Escape backslashes first so the ones added for quotes
                # are not doubled.
                escaped = raw.replace(b'\\', b'\\\\').replace(b'"', b'\\"')
                cmd.append(b'"' + escaped + b'"')
            else:
                cmd.append(raw)
        return b' '.join(cmd) + b'\n'

    def send_command(self, command, *args):
        """Send a single command and return the server's response."""
        request = self.serialise_command(command, *args)
        self.sock.sendall(request)
        return self.get_response()

    def send_commands(self, *commands):
        """ Use MPD command batching to send multiple commands at once.
        Each item of commands is a tuple containing a command followed by
        any arguments.
        """
        requests = [b'command_list_ok_begin\n']
        for command_and_args in commands:
            command = command_and_args[0]
            args = command_and_args[1:]
            requests.append(self.serialise_command(command, *args))
        requests.append(b'command_list_end\n')
        self.sock.sendall(b''.join(requests))
        return self.get_response()

    def readline(self, terminator=b'\n', bufsize=1024):
        """ Reads a line of data from the socket.

        Returns the next chunk up to and including `terminator`. If the
        peer closes the connection first, whatever is buffered (possibly
        empty) is returned without a terminator.
        """
        while True:
            if terminator in self.buf:
                line, self.buf = self.buf.split(terminator, 1)
                line += terminator
                return line
            self.sock.settimeout(1)
            data = self.sock.recv(bufsize)
            if data:
                self.buf += data
            else:
                line = self.buf
                self.buf = b''
                return line
def implements(commands, expectedFailure=False):  # noqa: N803
    """Build a test method that sends the `commands` request and asserts
    that every name in the set `commands` appears in the reply's `command`
    entries.

    When `expectedFailure` is true the returned method is wrapped with
    `unittest.expectedFailure`.
    """
    def _test(self):
        reply = self.client.send_command('commands')
        available = reply.data['command']
        self.assertEqual(commands.intersection(available), commands)

    if expectedFailure:
        return unittest.expectedFailure(_test)
    return _test
@_common.slow_test()
class BPDTest(unittest.TestCase, TestHelper):
    """Tests that run a `bpd.Server` in a child process and talk to it
    through an `MPCClient` socket connection.

    The `test_implements_*` attributes are generated by `implements` and
    check which command names the server advertises.
    """

    def setUp(self):
        self.setup_beets()
        self.load_plugins('bpd')
        self.item1 = self.add_item(title='Track One Title',
                                   album='Album Title', artist='Artist Name',
                                   track=1)
        self.item2 = self.add_item(title='Track Two Title',
                                   album='Album Title', artist='Artist Name',
                                   track=2)
        self.lib.add_album([self.item1, self.item2])
        self.server_proc = None
        self.client = self.make_server_client()

    def tearDown(self):
        self._stop_server()
        self.teardown_beets()
        self.unload_plugins()

    def _stop_server(self):
        # Terminate the current server process (if any) and wait for it to
        # exit, so the child is reaped and has released its listening port
        # before another server is started on the same port.
        if self.server_proc is None:
            return
        self.server_proc.terminate()
        # Bounded wait: never hang the test run on a stuck child.
        self.server_proc.join(timeout=1)
        self.server_proc = None

    def make_server(self, host, port, password=None):
        # Create a server and run it in a fresh child process, replacing
        # any server process started earlier by this test.
        bpd_server = bpd.Server(self.lib, host, port, password)
        self.server = bpd_server
        self._stop_server()
        self.server_proc = mp.Process(target=bpd_server.run)
        self.server_proc.start()

    def make_client(self, host='localhost', port=9876, do_hello=True):
        return MPCClient(host, port, do_hello)

    def make_server_client(self, host='localhost', port=9876, password=None):
        self.make_server(host, port, password)
        time.sleep(0.1)  # wait for the server to start
        client = self.make_client(host, port)
        return client

    def test_server_hello(self):
        alt_client = self.make_client(do_hello=False)
        self.assertEqual(alt_client.readline(), b'OK MPD 0.13.0\n')

    # Command group: query

    test_implements_query = implements({
        'clearerror', 'currentsong', 'idle', 'status', 'stats',
    }, expectedFailure=True)

    # Command group: playback

    test_implements_playback = implements({
        'consume', 'crossfade', 'mixrampd', 'mixrampdelay', 'random',
        'repeat', 'setvol', 'single', 'replay_gain_mode',
        'replay_gain_status', 'volume',
    }, expectedFailure=True)

    # Command group: control

    test_implements_control = implements({
        'next', 'pause', 'play', 'playid', 'previous', 'seek',
        'seekid', 'seekcur', 'stop',
    }, expectedFailure=True)

    def test_cmd_play(self):
        responses = self.client.send_commands(
            ('add', 'Artist Name/Album Title/01 Track One Title.mp3'),
            ('status',),
            ('play',),
            ('status',))
        self.assertEqual('stop', responses[1].data['state'])
        self.assertTrue(responses[2].ok)
        self.assertEqual('play', responses[3].data['state'])

    # Command group: queue

    test_implements_queue = implements({
        'add', 'addid', 'clear', 'delete', 'deleteid', 'move',
        'moveid', 'playlist', 'playlistfind', 'playlistid',
        'playlistinfo', 'playlistsearch', 'plchanges',
        'plchangesposid', 'prio', 'prioid', 'rangeid', 'shuffle',
        'swap', 'swapid', 'addtagid', 'cleartagid',
    }, expectedFailure=True)

    def test_cmd_add(self):
        response = self.client.send_command(
            'add',
            'Artist Name/Album Title/01 Track One Title.mp3')
        self.assertTrue(response.ok)

    def test_cmd_playlistinfo(self):
        responses = self.client.send_commands(
            ('add', 'Artist Name/Album Title/01 Track One Title.mp3'),
            ('playlistinfo',),
            ('playlistinfo', '0'))
        self.assertTrue(responses[1].ok)
        self.assertTrue(responses[2].ok)
        self.assertEqual(responses[1].data, responses[2].data)

        response = self.client.send_command('playlistinfo', '1')
        self.assertTrue(response.err)
        self.assertEqual(
            'ACK [2@0] {playlistinfo} argument out of range',
            response.status)

    # Command group: playlists

    test_implements_playlists = implements({
        'listplaylist', 'listplaylistinfo', 'listplaylists', 'load',
        'playlistadd', 'playlistclear', 'playlistdelete',
        'playlistmove', 'rename', 'rm', 'save',
    }, expectedFailure=True)

    # Command group: database

    test_implements_database = implements({
        'albumart', 'count', 'find', 'findadd', 'list', 'listall',
        'listallinfo', 'listfiles', 'lsinfo', 'readcomments',
        'search', 'searchadd', 'searchaddpl', 'update', 'rescan',
    }, expectedFailure=True)

    def test_cmd_search(self):
        response = self.client.send_command('search', 'track', '1')
        self.assertEqual(
            'Artist Name/Album Title/01 Track One Title.mp3',
            response.data['file'])

    def test_cmd_list_simple(self):
        response = self.client.send_command('list', 'album')
        self.assertEqual('Album Title', response.data['Album'])

        response = self.client.send_command('list', 'track')
        self.assertEqual(['1', '2'], response.data['Track'])

    def test_cmd_count(self):
        response = self.client.send_command('count', 'track', '1')
        self.assertEqual('1', response.data['songs'])
        self.assertEqual('0', response.data['playtime'])

    # Command group: mounts

    test_implements_mounts = implements({
        'mount', 'unmount', 'listmounts', 'listneighbors',
    }, expectedFailure=True)

    # Command group: stickers

    test_implements_stickers = implements({
        'sticker',
    }, expectedFailure=True)

    # Command group: connection

    test_implements_connection = implements({
        'close', 'kill', 'password', 'ping', 'tagtypes',
    })

    def test_cmd_password(self):
        # Restart the server with a password; commands are rejected until
        # the correct password has been sent.
        self.client = self.make_server_client(password='abc123')

        response = self.client.send_command('status')
        self.assertTrue(response.err)
        self.assertEqual(response.status,
                         'ACK [4@0] {} insufficient privileges')

        response = self.client.send_command('password', 'wrong')
        self.assertTrue(response.err)
        self.assertEqual(response.status,
                         'ACK [3@0] {password} incorrect password')

        response = self.client.send_command('password', 'abc123')
        self.assertTrue(response.ok)
        response = self.client.send_command('status')
        self.assertTrue(response.ok)

    def test_cmd_ping(self):
        response = self.client.send_command('ping')
        self.assertTrue(response.ok)

    @unittest.expectedFailure
    def test_cmd_tagtypes(self):
        response = self.client.send_command('tagtypes')
        types = {tag.lower() for tag in response.data['tag']}
        self.assertEqual({
            'artist', 'artistsort', 'album', 'albumsort', 'albumartist',
            'albumartistsort', 'title', 'track', 'name', 'genre', 'date',
            'composer', 'performer', 'comment', 'disc', 'label',
            'musicbrainz_artistid', 'musicbrainz_albumid',
            'musicbrainz_albumartistid', 'musicbrainz_trackid',
            'musicbrainz_releasetrackid', 'musicbrainz_workid',
        }, types)

    @unittest.expectedFailure
    def test_tagtypes_mask(self):
        response = self.client.send_command('tagtypes', 'clear')
        self.assertTrue(response.ok)

    # Command group: partitions

    test_implements_partitions = implements({
        'partition', 'listpartitions', 'newpartition',
    }, expectedFailure=True)

    # Command group: devices

    test_implements_devices = implements({
        'disableoutput', 'enableoutput', 'toggleoutput', 'outputs',
    }, expectedFailure=True)

    # Command group: reflection

    test_implements_reflection = implements({
        'config', 'commands', 'notcommands', 'urlhandlers',
        'decoders',
    }, expectedFailure=True)

    # Command group: peers

    test_implements_peers = implements({
        'subscribe', 'unsubscribe', 'channels', 'readmessages',
        'sendmessage',
    }, expectedFailure=True)
def suite():
    """Return the test suite obtained by loading this module by name."""
    loader = unittest.TestLoader()
    return loader.loadTestsFromName(__name__)


if __name__ == '__main__':
    unittest.main(defaultTest='suite')