Fix lint issues

- Fix imports
- Fix pytest issues
- Do not assign lambda as variable
- Use isinstance instead of type to check type
- Rename ambiguously named variables
- Name custom errors with Error suffix
This commit is contained in:
Šarūnas Nejus 2024-09-17 12:57:12 +01:00
parent 85a17ee503
commit f36bc497c8
No known key found for this signature in database
GPG key ID: DD28F6704DBE3435
29 changed files with 204 additions and 167 deletions

View file

@ -20,15 +20,30 @@ from beets import config, logging
from beets.library import Album, Item
# Parts of external interface.
from .hooks import ( # noqa
AlbumInfo,
AlbumMatch,
Distance,
TrackInfo,
TrackMatch,
from .hooks import AlbumInfo, AlbumMatch, Distance, TrackInfo, TrackMatch
from .match import (
Proposal,
Recommendation,
current_metadata,
tag_album,
tag_item,
)
from .match import Recommendation # noqa
from .match import Proposal, current_metadata, tag_album, tag_item # noqa
__all__ = [
"AlbumInfo",
"AlbumMatch",
"Distance",
"TrackInfo",
"TrackMatch",
"Proposal",
"Recommendation",
"apply_album_metadata",
"apply_item_metadata",
"apply_metadata",
"current_metadata",
"tag_album",
"tag_item",
]
# Global logger.
log = logging.getLogger("beets")

View file

@ -54,7 +54,7 @@ FIELDS_TO_MB_KEYS = {
musicbrainzngs.set_useragent("beets", beets.__version__, "https://beets.io/")
class MusicBrainzAPIError(util.HumanReadableException):
class MusicBrainzAPIError(util.HumanReadableError):
"""An error while talking to MusicBrainz. The `query` field is the
parameter to the action and may have any type.
"""

View file

@ -74,7 +74,7 @@ REIMPORT_FRESH_FIELDS_ITEM = list(REIMPORT_FRESH_FIELDS_ALBUM)
log = logging.getLogger("beets")
class ImportAbort(Exception):
class ImportAbortError(Exception):
"""Raised when the user aborts the tagging operation."""
pass
@ -360,7 +360,7 @@ class ImportSession:
pl.run_parallel(QUEUE_SIZE)
else:
pl.run_sequential()
except ImportAbort:
except ImportAbortError:
# User aborted operation. Silently stop.
pass
@ -946,7 +946,7 @@ class ImportTask(BaseImportTask):
dup_item.remove()
log.debug(
"{0} of {1} items replaced",
sum(bool(l) for l in self.replaced_items.values()),
sum(bool(v) for v in self.replaced_items.values()),
len(self.imported_items()),
)

View file

@ -35,7 +35,7 @@ LASTFM_KEY = "2dc3914abf35f0d9c92d97d8f8e42b43"
log = logging.getLogger("beets")
class PluginConflictException(Exception):
class PluginConflictError(Exception):
"""Indicates that the services provided by one plugin conflict with
those of another.
@ -342,7 +342,7 @@ def types(model_cls):
plugin_types = getattr(plugin, attr_name, {})
for field in plugin_types:
if field in types and plugin_types[field] != types[field]:
raise PluginConflictException(
raise PluginConflictError(
"Plugin {} defines flexible field {} "
"which has already been defined with "
"another type.".format(plugin.name, field)
@ -446,13 +446,13 @@ def import_stages():
def _check_conflicts_and_merge(plugin, plugin_funcs, funcs):
"""Check the provided template functions for conflicts and merge into funcs.
Raises a `PluginConflictException` if a plugin defines template functions
Raises a `PluginConflictError` if a plugin defines template functions
for fields that another plugin has already defined template functions for.
"""
if plugin_funcs:
if not plugin_funcs.keys().isdisjoint(funcs.keys()):
conflicted_fields = ", ".join(plugin_funcs.keys() & funcs.keys())
raise PluginConflictException(
raise PluginConflictError(
f"Plugin {plugin.name} defines template functions for "
f"{conflicted_fields} that conflict with another plugin."
)

View file

@ -171,7 +171,7 @@ class Assertions:
# Mock I/O.
class InputException(Exception):
class InputError(Exception):
def __init__(self, output=None):
self.output = output
@ -218,9 +218,9 @@ class DummyIn:
def readline(self):
if not self.buf:
if self.out:
raise InputException(self.out.get())
raise InputError(self.out.get())
else:
raise InputException()
raise InputError()
self.reads += 1
return self.buf.pop(0)

View file

@ -1864,7 +1864,7 @@ def main(args=None):
message = exc.args[0] if exc.args else None
log.error("error: {0}", message)
sys.exit(1)
except util.HumanReadableException as exc:
except util.HumanReadableError as exc:
exc.log(log)
sys.exit(1)
except library.FileOperationError as exc:

View file

@ -1026,7 +1026,7 @@ def manual_id(session, task):
def abort_action(session, task):
"""A prompt choice callback that aborts the importer."""
raise importer.ImportAbort()
raise importer.ImportAbortError()
class TerminalImportSession(importer.ImportSession):
@ -1056,7 +1056,7 @@ class TerminalImportSession(importer.ImportSession):
if len(actions) == 1:
return actions[0]
elif len(actions) > 1:
raise plugins.PluginConflictException(
raise plugins.PluginConflictError(
"Only one handler for `import_task_before_choice` may return "
"an action."
)

View file

@ -67,7 +67,7 @@ PathLike = Union[BytesOrStr, Path]
Replacements: TypeAlias = "Sequence[tuple[Pattern[str], str]]"
class HumanReadableException(Exception):
class HumanReadableError(Exception):
"""An Exception that can include a human-readable error message to
be logged without a traceback. Can preserve a traceback for
debugging purposes as well.
@ -123,7 +123,7 @@ class HumanReadableException(Exception):
logger.error("{0}: {1}", self.error_kind, self.args[0])
class FilesystemError(HumanReadableException):
class FilesystemError(HumanReadableError):
"""An error that occurred while performing a filesystem manipulation
via a function in this module. The `paths` field is a sequence of
pathnames involved in the operation.

View file

@ -203,7 +203,7 @@ def _event_select(events):
return ready_events
class ThreadException(Exception):
class ThreadError(Exception):
def __init__(self, coro, exc_info):
self.coro = coro
self.exc_info = exc_info
@ -266,7 +266,7 @@ def run(root_coro):
"""After an event is fired, run a given coroutine associated with
it in the threads dict until it yields again. If the coroutine
exits, then the thread is removed from the pool. If the coroutine
raises an exception, it is reraised in a ThreadException. If
raises an exception, it is reraised in a ThreadError. If
is_exc is True, then the value must be an exc_info tuple and the
exception is thrown into the coroutine.
"""
@ -281,7 +281,7 @@ def run(root_coro):
except BaseException:
# Thread raised some other exception.
del threads[coro]
raise ThreadException(coro, sys.exc_info())
raise ThreadError(coro, sys.exc_info())
else:
if isinstance(next_event, types.GeneratorType):
# Automatically invoke sub-coroutines. (Shorthand for
@ -369,7 +369,7 @@ def run(root_coro):
else:
advance_thread(event2coro[event], value)
except ThreadException as te:
except ThreadError as te:
# Exception raised from inside a thread.
event = ExceptionEvent(te.exc_info)
if te.coro in delegators:

View file

@ -28,7 +28,7 @@ from beets.ui import Subcommand
from beets.util import displayable_path, par_map
class CheckerCommandException(Exception):
class CheckerCommandError(Exception):
"""Raised when running a checker failed.
Attributes:
@ -68,7 +68,7 @@ class BadFiles(BeetsPlugin):
errors = 1
status = e.returncode
except OSError as e:
raise CheckerCommandException(cmd, e)
raise CheckerCommandError(cmd, e)
output = output.decode(sys.getdefaultencoding(), "replace")
return status, errors, [line for line in output.split("\n") if line]
@ -126,7 +126,7 @@ class BadFiles(BeetsPlugin):
path = item.path.decode(sys.getfilesystemencoding())
try:
status, errors, output = checker(path)
except CheckerCommandException as e:
except CheckerCommandError as e:
if e.errno == errno.ENOENT:
self._log.error(
"command not found: {} when validating file: {}",
@ -198,7 +198,7 @@ class BadFiles(BeetsPlugin):
elif sel == "c":
return None
elif sel == "b":
raise importer.ImportAbort()
raise importer.ImportAbortError()
else:
raise Exception(f"Unexpected selection: {sel}")

View file

@ -167,13 +167,13 @@ def cast_arg(t, val):
raise ArgumentTypeError()
class BPDClose(Exception):
class BPDCloseError(Exception):
"""Raised by a command invocation to indicate that the connection
should be closed.
"""
class BPDIdle(Exception):
class BPDIdleError(Exception):
"""Raised by a command to indicate the client wants to enter the idle state
and should be notified when a relevant event happens.
"""
@ -348,7 +348,7 @@ class BaseServer:
for system in subsystems:
if system not in SUBSYSTEMS:
raise BPDError(ERROR_ARG, f"Unrecognised idle event: {system}")
raise BPDIdle(subsystems) # put the connection into idle mode
raise BPDIdleError(subsystems) # put the connection into idle mode
def cmd_kill(self, conn):
"""Exits the server process."""
@ -356,7 +356,7 @@ class BaseServer:
def cmd_close(self, conn):
"""Closes the connection."""
raise BPDClose()
raise BPDCloseError()
def cmd_password(self, conn, password):
"""Attempts password authentication."""
@ -772,8 +772,8 @@ class Connection:
if isinstance(lines, str):
lines = [lines]
out = NEWLINE.join(lines) + NEWLINE
for l in out.split(NEWLINE)[:-1]:
self.debug(l, kind=">")
for line in out.split(NEWLINE)[:-1]:
self.debug(line, kind=">")
if isinstance(out, str):
out = out.encode("utf-8")
return self.sock.sendall(out)
@ -852,8 +852,8 @@ class MPDConnection(Connection):
self.disconnect() # Client sent a blank line.
break
line = line.decode("utf8") # MPD protocol uses UTF-8.
for l in line.split(NEWLINE):
self.debug(l, kind="<")
for line in line.split(NEWLINE):
self.debug(line, kind="<")
if self.idle_subscriptions:
# The connection is in idle mode.
@ -887,12 +887,12 @@ class MPDConnection(Connection):
# Ordinary command.
try:
yield bluelet.call(self.do_command(Command(line)))
except BPDClose:
except BPDCloseError:
# Command indicates that the conn should close.
self.sock.close()
self.disconnect() # Client explicitly closed.
return
except BPDIdle as e:
except BPDIdleError as e:
self.idle_subscriptions = e.subsystems
self.debug(
"awaiting: {}".format(" ".join(e.subsystems)), kind="z"
@ -921,8 +921,8 @@ class ControlConnection(Connection):
if not line:
break # Client sent a blank line.
line = line.decode("utf8") # Protocol uses UTF-8.
for l in line.split(NEWLINE):
self.debug(l, kind="<")
for line in line.split(NEWLINE):
self.debug(line, kind="<")
command = Command(line)
try:
func = command.delegate("ctrl_", self)
@ -1045,12 +1045,12 @@ class Command:
e.cmd_name = self.name
raise e
except BPDClose:
except BPDCloseError:
# An indication that the connection should close. Send
# it on the Connection.
raise
except BPDIdle:
except BPDIdleError:
raise
except Exception:

View file

@ -95,8 +95,9 @@ def should_transcode(item, fmt):
query, _ = parse_query_string(query_string, Item)
if query.match(item):
return False
if config["convert"]["never_convert_lossy_files"] and not (
item.format.lower() in LOSSLESS_FORMATS
if (
config["convert"]["never_convert_lossy_files"]
and item.format.lower() not in LOSSLESS_FORMATS
):
return False
maxbr = config["convert"]["max_bitrate"].get(Optional(int))

View file

@ -303,7 +303,9 @@ class DuplicatesPlugin(BeetsPlugin):
kind = "items" if all(isinstance(o, Item) for o in objs) else "albums"
if tiebreak and kind in tiebreak.keys():
key = lambda x: tuple(getattr(x, k) for k in tiebreak[kind])
def key(x):
return tuple(getattr(x, k) for k in tiebreak[kind])
else:
if kind == "items":
@ -316,9 +318,13 @@ class DuplicatesPlugin(BeetsPlugin):
)
fields = Item.all_keys()
key = lambda x: sum(1 for f in fields if truthy(getattr(x, f)))
def key(x):
return sum(1 for f in fields if truthy(getattr(x, f)))
else:
key = lambda x: len(x.items())
def key(x):
return len(x.items())
return sorted(objs, key=key, reverse=True)

View file

@ -1061,7 +1061,7 @@ class LyricsPlugin(plugins.BeetsPlugin):
if any(lyrics):
break
lyrics = "\n\n---\n\n".join([l for l in lyrics if l])
lyrics = "\n\n---\n\n".join(filter(None, lyrics))
if lyrics:
self._log.info("fetched lyrics: {0}", item)

View file

@ -253,6 +253,7 @@ max-line-length = 88
[tool.ruff.lint.flake8-pytest-style]
fixture-parentheses = false
mark-parentheses = false
parametrize-names-type = "csv"
[tool.ruff.lint.flake8-unused-arguments]
ignore-variadic-names = true

View file

@ -15,10 +15,11 @@
import os
import sys
from test.plugins import test_lyrics
import requests
from test.plugins import test_lyrics
def mkdir_p(path):
try:

View file

@ -994,7 +994,7 @@ class DeprecatedConfigTest(BeetsTestCase):
self.plugin = fetchart.FetchArtPlugin()
def test_moves_filesystem_to_end(self):
assert type(self.plugin.sources[-1]) == fetchart.FileSystem
assert isinstance(self.plugin.sources[-1], fetchart.FileSystem)
class EnforceRatioConfigTest(BeetsTestCase):

View file

@ -17,7 +17,6 @@ import os.path
import shutil
import tempfile
import unittest
from test.test_art_resize import DummyIMBackend
from unittest.mock import MagicMock, patch
import pytest
@ -28,6 +27,7 @@ from beets.test import _common
from beets.test.helper import BeetsTestCase, FetchImageHelper, PluginMixin
from beets.util import bytestring_path, displayable_path, syspath
from beets.util.artresizer import ArtResizer
from test.test_art_resize import DummyIMBackend
def require_artresizer_compare(test):

View file

@ -82,14 +82,14 @@ class FtInTitlePluginFunctional(PluginTestCase):
item = self._ft_add_item("/", "Alice ft Bob", "Song 1", "Alice")
self.run_command("ftintitle")
item.load()
self.assertEqual(item["artist"], "Alice ft Bob")
self.assertEqual(item["title"], "Song 1 feat. Bob")
assert item["artist"] == "Alice ft Bob"
assert item["title"] == "Song 1 feat. Bob"
item = self._ft_add_item("/", "Alice ft Bob", "Song 1", "Alice")
self.run_command("ftintitle", "-d")
item.load()
self.assertEqual(item["artist"], "Alice ft Bob")
self.assertEqual(item["title"], "Song 1")
assert item["artist"] == "Alice ft Bob"
assert item["title"] == "Song 1"
class FtInTitlePluginTest(unittest.TestCase):

View file

@ -33,9 +33,9 @@ from beetsplug.thumbnails import (
class ThumbnailsTest(BeetsTestCase):
@patch("beetsplug.thumbnails.ArtResizer")
@patch("beetsplug.thumbnails.ThumbnailsPlugin._check_local_ok")
@patch("beetsplug.thumbnails.ThumbnailsPlugin._check_local_ok", Mock())
@patch("beetsplug.thumbnails.os.stat")
def test_add_tags(self, mock_stat, _, mock_artresizer):
def test_add_tags(self, mock_stat, mock_artresizer):
plugin = ThumbnailsPlugin()
plugin.get_uri = Mock(
side_effect={b"/path/to/cover": "COVER_URI"}.__getitem__
@ -98,13 +98,13 @@ class ThumbnailsTest(BeetsTestCase):
giouri_inst.available = False
assert ThumbnailsPlugin().get_uri.__self__.__class__ == PathlibURI
@patch("beetsplug.thumbnails.ThumbnailsPlugin._check_local_ok")
@patch("beetsplug.thumbnails.ThumbnailsPlugin._check_local_ok", Mock())
@patch("beetsplug.thumbnails.ArtResizer")
@patch("beetsplug.thumbnails.util")
@patch("beetsplug.thumbnails.os")
@patch("beetsplug.thumbnails.shutil")
def test_make_cover_thumbnail(
self, mock_shutils, mock_os, mock_util, mock_artresizer, _
self, mock_shutils, mock_os, mock_util, mock_artresizer
):
thumbnail_dir = os.path.normpath(b"/thumbnail/dir")
md5_file = os.path.join(thumbnail_dir, b"md5")
@ -166,8 +166,8 @@ class ThumbnailsTest(BeetsTestCase):
plugin.make_cover_thumbnail(album, 12345, thumbnail_dir)
mock_resize.assert_called_once_with(12345, path_to_art, md5_file)
@patch("beetsplug.thumbnails.ThumbnailsPlugin._check_local_ok")
def test_make_dolphin_cover_thumbnail(self, _):
@patch("beetsplug.thumbnails.ThumbnailsPlugin._check_local_ok", Mock())
def test_make_dolphin_cover_thumbnail(self):
plugin = ThumbnailsPlugin()
tmp = bytestring_path(mkdtemp())
album = Mock(path=tmp, artpath=os.path.join(tmp, b"cover.jpg"))
@ -189,9 +189,9 @@ class ThumbnailsTest(BeetsTestCase):
rmtree(syspath(tmp))
@patch("beetsplug.thumbnails.ThumbnailsPlugin._check_local_ok")
@patch("beetsplug.thumbnails.ThumbnailsPlugin._check_local_ok", Mock())
@patch("beetsplug.thumbnails.ArtResizer")
def test_process_album(self, mock_artresizer, _):
def test_process_album(self, mock_artresizer):
get_size = mock_artresizer.shared.get_size
plugin = ThumbnailsPlugin()
@ -234,9 +234,9 @@ class ThumbnailsTest(BeetsTestCase):
any_order=True,
)
@patch("beetsplug.thumbnails.ThumbnailsPlugin._check_local_ok")
@patch("beetsplug.thumbnails.ThumbnailsPlugin._check_local_ok", Mock())
@patch("beetsplug.thumbnails.decargs")
def test_invokations(self, mock_decargs, _):
def test_invokations(self, mock_decargs):
plugin = ThumbnailsPlugin()
plugin.process_album = Mock()
album = Mock()

View file

@ -47,7 +47,7 @@ class PluralityTest(BeetsTestCase):
assert freq == 2
def test_plurality_empty_sequence_raises_error(self):
with pytest.raises(ValueError):
with pytest.raises(ValueError, match="must be non-empty"):
plurality([])
def test_current_metadata_finds_pluralities(self):

View file

@ -112,10 +112,10 @@ class ConfigCommandTest(BeetsTestCase):
def test_config_editor_not_found(self):
msg_match = "Could not edit configuration.*here is problem"
with pytest.raises(ui.UserError, match=msg_match):
with patch("os.execlp") as execlp:
execlp.side_effect = OSError("here is problem")
self.run_command("config", "-e")
with patch(
"os.execlp", side_effect=OSError("here is problem")
), pytest.raises(ui.UserError, match=msg_match):
self.run_command("config", "-e")
def test_edit_invalid_config_file(self):
with open(self.config_path, "w") as file:

View file

@ -771,7 +771,8 @@ class ImportCompilationTest(ImportTestCase):
asserted_multi_artists_1 = True
assert item.artists == ["Another Artist", "Another Artist 2"]
assert asserted_multi_artists_0 and asserted_multi_artists_1
assert asserted_multi_artists_0
assert asserted_multi_artists_1
class ImportExistingTest(ImportTestCase):
@ -1320,7 +1321,7 @@ class ResumeImportTest(ImportTestCase):
# the first album in the second try.
def raise_exception(event, **kwargs):
if event == "album_imported":
raise importer.ImportAbort
raise importer.ImportAbortError
plugins_send.side_effect = raise_exception
@ -1343,7 +1344,7 @@ class ResumeImportTest(ImportTestCase):
# the first album in the second try.
def raise_exception(event, **kwargs):
if event == "item_imported":
raise importer.ImportAbort
raise importer.ImportAbortError
plugins_send.side_effect = raise_exception

View file

@ -39,14 +39,14 @@ class LoggingTest(BeetsTestCase):
assert l1 != l6
def test_str_format_logging(self):
l = blog.getLogger("baz123")
logger = blog.getLogger("baz123")
stream = StringIO()
handler = log.StreamHandler(stream)
l.addHandler(handler)
l.propagate = False
logger.addHandler(handler)
logger.propagate = False
l.warning("foo {0} {bar}", "oof", bar="baz")
logger.warning("foo {0} {bar}", "oof", bar="baz")
handler.flush()
assert stream.getvalue(), "foo oof baz"
@ -265,9 +265,9 @@ class ConcurrentEventsTest(AsIsImporterMixin, ImportTestCase):
blog.getLogger("beets").set_global_level(blog.INFO)
with helper.capture_log() as logs:
self.run_asis_importer()
for l in logs:
assert "import" in l
assert "album" in l
for line in logs:
assert "import" in line
assert "album" in line
blog.getLogger("beets").set_global_level(blog.DEBUG)
with helper.capture_log() as logs:

View file

@ -33,14 +33,14 @@ def _work():
i *= 2
def _consume(l):
def _consume(result):
while True:
i = yield
l.append(i)
result.append(i)
# A worker that raises an exception.
class ExceptionFixture(Exception):
class PipelineError(Exception):
pass
@ -49,7 +49,7 @@ def _exc_work(num=3):
while True:
i = yield i
if i == num:
raise ExceptionFixture()
raise PipelineError()
i *= 2
@ -74,16 +74,18 @@ def _multi_work():
class SimplePipelineTest(unittest.TestCase):
def setUp(self):
self.l = []
self.pl = pipeline.Pipeline((_produce(), _work(), _consume(self.l)))
self.result = []
self.pl = pipeline.Pipeline(
(_produce(), _work(), _consume(self.result))
)
def test_run_sequential(self):
self.pl.run_sequential()
assert self.l == [0, 2, 4, 6, 8]
assert self.result == [0, 2, 4, 6, 8]
def test_run_parallel(self):
self.pl.run_parallel()
assert self.l == [0, 2, 4, 6, 8]
assert self.result == [0, 2, 4, 6, 8]
def test_pull(self):
pl = pipeline.Pipeline((_produce(), _work()))
@ -97,19 +99,19 @@ class SimplePipelineTest(unittest.TestCase):
class ParallelStageTest(unittest.TestCase):
def setUp(self):
self.l = []
self.result = []
self.pl = pipeline.Pipeline(
(_produce(), (_work(), _work()), _consume(self.l))
(_produce(), (_work(), _work()), _consume(self.result))
)
def test_run_sequential(self):
self.pl.run_sequential()
assert self.l == [0, 2, 4, 6, 8]
assert self.result == [0, 2, 4, 6, 8]
def test_run_parallel(self):
self.pl.run_parallel()
# Order possibly not preserved; use set equality.
assert set(self.l) == {0, 2, 4, 6, 8}
assert set(self.result) == {0, 2, 4, 6, 8}
def test_pull(self):
pl = pipeline.Pipeline((_produce(), (_work(), _work())))
@ -118,15 +120,17 @@ class ParallelStageTest(unittest.TestCase):
class ExceptionTest(unittest.TestCase):
def setUp(self):
self.l = []
self.pl = pipeline.Pipeline((_produce(), _exc_work(), _consume(self.l)))
self.result = []
self.pl = pipeline.Pipeline(
(_produce(), _exc_work(), _consume(self.result))
)
def test_run_sequential(self):
with pytest.raises(ExceptionFixture):
with pytest.raises(PipelineError):
self.pl.run_sequential()
def test_run_parallel(self):
with pytest.raises(ExceptionFixture):
with pytest.raises(PipelineError):
self.pl.run_parallel()
def test_pull(self):
@ -134,59 +138,65 @@ class ExceptionTest(unittest.TestCase):
pull = pl.pull()
for i in range(3):
next(pull)
with pytest.raises(ExceptionFixture):
with pytest.raises(PipelineError):
next(pull)
class ParallelExceptionTest(unittest.TestCase):
def setUp(self):
self.l = []
self.result = []
self.pl = pipeline.Pipeline(
(_produce(), (_exc_work(), _exc_work()), _consume(self.l))
(_produce(), (_exc_work(), _exc_work()), _consume(self.result))
)
def test_run_parallel(self):
with pytest.raises(ExceptionFixture):
with pytest.raises(PipelineError):
self.pl.run_parallel()
class ConstrainedThreadedPipelineTest(unittest.TestCase):
def setUp(self):
self.result = []
def test_constrained(self):
l = []
# Do a "significant" amount of work...
pl = pipeline.Pipeline((_produce(1000), _work(), _consume(l)))
self.pl = pipeline.Pipeline(
(_produce(1000), _work(), _consume(self.result))
)
# ... with only a single queue slot.
pl.run_parallel(1)
assert l == [i * 2 for i in range(1000)]
self.pl.run_parallel(1)
assert self.result == [i * 2 for i in range(1000)]
def test_constrained_exception(self):
# Raise an exception in a constrained pipeline.
l = []
pl = pipeline.Pipeline((_produce(1000), _exc_work(), _consume(l)))
with pytest.raises(ExceptionFixture):
pl.run_parallel(1)
self.pl = pipeline.Pipeline(
(_produce(1000), _exc_work(), _consume(self.result))
)
with pytest.raises(PipelineError):
self.pl.run_parallel(1)
def test_constrained_parallel(self):
l = []
pl = pipeline.Pipeline(
(_produce(1000), (_work(), _work()), _consume(l))
self.pl = pipeline.Pipeline(
(_produce(1000), (_work(), _work()), _consume(self.result))
)
pl.run_parallel(1)
assert set(l) == {i * 2 for i in range(1000)}
self.pl.run_parallel(1)
assert set(self.result) == {i * 2 for i in range(1000)}
class BubbleTest(unittest.TestCase):
def setUp(self):
self.l = []
self.pl = pipeline.Pipeline((_produce(), _bub_work(), _consume(self.l)))
self.result = []
self.pl = pipeline.Pipeline(
(_produce(), _bub_work(), _consume(self.result))
)
def test_run_sequential(self):
self.pl.run_sequential()
assert self.l == [0, 2, 4, 8]
assert self.result == [0, 2, 4, 8]
def test_run_parallel(self):
self.pl.run_parallel()
assert self.l == [0, 2, 4, 8]
assert self.result == [0, 2, 4, 8]
def test_pull(self):
pl = pipeline.Pipeline((_produce(), _bub_work()))
@ -195,18 +205,18 @@ class BubbleTest(unittest.TestCase):
class MultiMessageTest(unittest.TestCase):
def setUp(self):
self.l = []
self.result = []
self.pl = pipeline.Pipeline(
(_produce(), _multi_work(), _consume(self.l))
(_produce(), _multi_work(), _consume(self.result))
)
def test_run_sequential(self):
self.pl.run_sequential()
assert self.l == [0, 0, 1, -1, 2, -2, 3, -3, 4, -4]
assert self.result == [0, 0, 1, -1, 2, -2, 3, -3, 4, -4]
def test_run_parallel(self):
self.pl.run_parallel()
assert self.l == [0, 0, 1, -1, 2, -2, 3, -3, 4, -4]
assert self.result == [0, 0, 1, -1, 2, -2, 3, -3, 4, -4]
def test_pull(self):
pl = pipeline.Pipeline((_produce(), _multi_work()))

View file

@ -32,9 +32,8 @@ from beets.importer import (
from beets.library import Item
from beets.plugins import MetadataSourcePlugin
from beets.test import helper
from beets.test.helper import AutotagStub, ImportHelper
from beets.test.helper import AutotagStub, ImportHelper, TerminalImportMixin
from beets.test.helper import PluginTestCase as BasePluginTestCase
from beets.test.helper import TerminalImportMixin
from beets.util import displayable_path, syspath
from beets.util.id_extractors import (
beatport_id_regex,
@ -142,7 +141,7 @@ class ItemTypeConflictTest(PluginLoaderTestCase):
self.advent_listener_plugin = AdventListenerPlugin
self.register_plugin(EventListenerPlugin)
self.register_plugin(AdventListenerPlugin)
with pytest.raises(plugins.PluginConflictException):
with pytest.raises(plugins.PluginConflictError):
plugins.types(Item)
def test_match(self):

View file

@ -1235,21 +1235,25 @@ class ShowChangeTest(BeetsTestCase):
def test_item_data_change(self):
self.items[0].title = "different"
msg = self._show_change()
assert "different" in msg and "the title" in msg
assert "different" in msg
assert "the title" in msg
def test_item_data_change_with_unicode(self):
self.items[0].title = "caf\xe9"
msg = self._show_change()
assert "caf\xe9" in msg and "the title" in msg
assert "caf\xe9" in msg
assert "the title" in msg
def test_album_data_change_with_unicode(self):
msg = self._show_change(cur_artist="caf\xe9", cur_album="another album")
assert "caf\xe9" in msg and "the artist" in msg
assert "caf\xe9" in msg
assert "the artist" in msg
def test_item_data_change_title_missing(self):
self.items[0].title = ""
msg = re.sub(r" +", " ", self._show_change())
assert "file.mp3" in msg and "the title" in msg
assert "file.mp3" in msg
assert "the title" in msg
def test_item_data_change_title_missing_with_unicode_filename(self):
self.items[0].title = ""
@ -1454,69 +1458,69 @@ class CommonOptionsParserCliTest(BeetsTestCase):
self.lib.add_album([self.item])
def test_base(self):
l = self.run_with_output("ls")
assert l == "the artist - the album - the title\n"
output = self.run_with_output("ls")
assert output == "the artist - the album - the title\n"
l = self.run_with_output("ls", "-a")
assert l == "the album artist - the album\n"
output = self.run_with_output("ls", "-a")
assert output == "the album artist - the album\n"
def test_path_option(self):
l = self.run_with_output("ls", "-p")
assert l == "xxx/yyy\n"
output = self.run_with_output("ls", "-p")
assert output == "xxx/yyy\n"
l = self.run_with_output("ls", "-a", "-p")
assert l == "xxx\n"
output = self.run_with_output("ls", "-a", "-p")
assert output == "xxx\n"
def test_format_option(self):
l = self.run_with_output("ls", "-f", "$artist")
assert l == "the artist\n"
output = self.run_with_output("ls", "-f", "$artist")
assert output == "the artist\n"
l = self.run_with_output("ls", "-a", "-f", "$albumartist")
assert l == "the album artist\n"
output = self.run_with_output("ls", "-a", "-f", "$albumartist")
assert output == "the album artist\n"
def test_format_option_unicode(self):
l = self.run_with_output(
output = self.run_with_output(
b"ls", b"-f", "caf\xe9".encode(util.arg_encoding())
)
assert l == "caf\xe9\n"
assert output == "caf\xe9\n"
def test_root_format_option(self):
l = self.run_with_output(
output = self.run_with_output(
"--format-item", "$artist", "--format-album", "foo", "ls"
)
assert l == "the artist\n"
assert output == "the artist\n"
l = self.run_with_output(
output = self.run_with_output(
"--format-item", "foo", "--format-album", "$albumartist", "ls", "-a"
)
assert l == "the album artist\n"
assert output == "the album artist\n"
def test_help(self):
l = self.run_with_output("help")
assert "Usage:" in l
output = self.run_with_output("help")
assert "Usage:" in output
l = self.run_with_output("help", "list")
assert "Usage:" in l
output = self.run_with_output("help", "list")
assert "Usage:" in output
with pytest.raises(ui.UserError):
self.run_command("help", "this.is.not.a.real.command")
def test_stats(self):
l = self.run_with_output("stats")
assert "Approximate total size:" in l
output = self.run_with_output("stats")
assert "Approximate total size:" in output
# # Need to have more realistic library setup for this to work
# l = self.run_with_output('stats', '-e')
# assert 'Total size:' in l
# output = self.run_with_output('stats', '-e')
# assert 'Total size:' in output
def test_version(self):
l = self.run_with_output("version")
assert "Python version" in l
assert "no plugins loaded" in l
output = self.run_with_output("version")
assert "Python version" in output
assert "no plugins loaded" in output
# # Need to have plugin loaded
# l = self.run_with_output('version')
# assert 'plugins: ' in l
# output = self.run_with_output('version')
# assert 'plugins: ' in output
class CommonOptionsParserTest(BeetsTestCase):

View file

@ -85,10 +85,10 @@ class FieldsTest(ItemInDBTestCase):
super().tearDown()
self.io.restore()
def remove_keys(self, l, text):
def remove_keys(self, keys, text):
for i in text:
try:
l.remove(i)
keys.remove(i)
except ValueError:
pass

View file

@ -18,9 +18,8 @@ test_importer module. But here the test importer inherits from
``TerminalImportSession``. So we test this class, too.
"""
from test import test_importer
from beets.test.helper import TerminalImportMixin
from test import test_importer
class NonAutotaggedImportTest(