BPD tests: fork and launch beets

This commit is contained in:
Carl Suster 2019-03-29 11:22:58 +11:00
parent c42c0c06bc
commit 35bf041ad0

View file

@ -13,25 +13,36 @@
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Tests for BPD and music playing.
"""Tests for BPD's implementation of the MPD protocol.
"""
from __future__ import division, absolute_import, print_function
import unittest
import os
import sys
import subprocess
import multiprocessing as mp
import socket
import time
import yaml
import tempfile
from contextlib import contextmanager
from test import _common
from beets.util import confit, py3_path
from beetsplug import bpd
from test.helper import TestHelper
from beets.util import confit
from beetsplug import bpd
# Mock GstPlayer so that the forked process doesn't attempt to import gi:
import mock
import imp
gstplayer = imp.new_module("beetsplug.bpg.gstplayer")
gstplayer._GstPlayer = mock.MagicMock(spec_set=[
"time", "volume", "playing", "run", "play_file", "pause", "stop", "seek"])
gstplayer._GstPlayer.time.return_value = (0, 0)
gstplayer._GstPlayer.volume = 0.0
gstplayer._GstPlayer.playing = False
gstplayer.GstPlayer = lambda _: gstplayer._GstPlayer
sys.modules["beetsplug.bpd.gstplayer"] = gstplayer
bpd.gstplayer = gstplayer
class CommandParseTest(unittest.TestCase):
@ -193,6 +204,11 @@ class MPCClient(object):
return line
def start_beets(*args):
import beets.ui
beets.ui.main(list(args))
def implements(commands, expectedFailure=False): # noqa: N803
def _test(self):
with self.run_bpd() as client:
@ -202,7 +218,6 @@ def implements(commands, expectedFailure=False): # noqa: N803
return unittest.expectedFailure(_test) if expectedFailure else _test
@_common.slow_test()
class BPDTest(unittest.TestCase, TestHelper):
def setUp(self):
self.setup_beets(disk=True)
@ -215,23 +230,6 @@ class BPDTest(unittest.TestCase, TestHelper):
track=2)
self.lib.add_album([self.item1, self.item2])
with open(os.path.join(self.temp_dir, b'bpd_mock.py'), 'wb') as f:
f.write(b'import mock\n')
f.write(b'import sys, imp\n')
f.write(b'gstplayer = imp.new_module("beetsplug.bpg.gstplayer")\n')
f.write(b'gstplayer._GstPlayer = mock.MagicMock(spec_set=["time",')
f.write(b'"volume", "playing", "run", "play_file", "pause", "stop')
f.write(b'", "seek"])\n')
f.write(b'gstplayer._GstPlayer.time.return_value = (0, 0)\n')
f.write(b'gstplayer._GstPlayer.volume = 0.0\n')
f.write(b'gstplayer._GstPlayer.playing = False\n')
f.write(b'gstplayer.GstPlayer = lambda _: gstplayer._GstPlayer\n')
f.write(b'sys.modules["beetsplug.bpd.gstplayer"] = gstplayer\n')
f.write(b'import beetsplug, beetsplug.bpd\n')
f.write(b'beetsplug.bpd.gstplayer = gstplayer\n')
f.write(b'sys.modules["beetsplug.bpd_mock"] = beetsplug.bpd\n')
f.write(b'beetsplug.bpd_mock = beetsplug.bpd\n')
def tearDown(self):
self.teardown_beets()
self.unload_plugins()
@ -246,7 +244,7 @@ class BPDTest(unittest.TestCase, TestHelper):
# Create a config file:
config = {
'pluginpath': [self.temp_dir.decode('utf-8')],
'plugins': 'bpd_mock',
'plugins': 'bpd',
'bpd': {'host': host, 'port': port},
}
if password:
@ -257,19 +255,16 @@ class BPDTest(unittest.TestCase, TestHelper):
yaml.dump(config, Dumper=confit.Dumper, encoding='utf-8'))
config_file.close()
# Launch BPD in a new process:
env = dict(os.environ.items())
env['PYTHONPATH'] = ':'.join(
[self.temp_dir.decode('utf-8')] + sys.path[1:]).encode('utf-8')
beet = os.path.join(os.path.dirname(__file__), '..', 'beet')
server = subprocess.Popen([
beet,
# Fork and launch BPD in the new process:
args = (
'--library', self.config['library'].as_filename(),
'--directory', self.libdir,
'--config', config_file.name,
'--directory', py3_path(self.libdir),
'--config', py3_path(config_file.name),
'--verbose', '--verbose',
'bpd', '--debug',
], env=env)
'bpd', '--debug'
)
server = mp.Process(target=start_beets, args=args)
server.start()
# Wait until the socket is bound:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@ -286,7 +281,7 @@ class BPDTest(unittest.TestCase, TestHelper):
yield MPCClient(host, port, do_hello)
finally:
server.terminate()
server.wait(timeout=0.1)
server.join(timeout=0.1)
server.kill()
def test_server_hello(self):