Replace custom unittest-like methods with assertions (#5854)

## Replace custom assertion methods with standard assertions

This PR is part of `unittest` -> `pytest` migration #5361 and removes
custom assertion methods from the test suite and replaces them with
standard Python assertions.

### Key Changes

- Removed custom assertion methods
- Updated path handling to use `pathlib.Path` wherever this was relevant
to the methods being replaced
- Simplified some of the tests structure
This commit is contained in:
Šarūnas Nejus 2025-07-09 13:24:50 +01:00 committed by GitHub
commit 8a43133bbe
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
24 changed files with 682 additions and 1069 deletions

View file

@ -52,7 +52,7 @@ jobs:
- if: ${{ env.IS_MAIN_PYTHON != 'true' }}
name: Test without coverage
run: |
poetry install --extras=autobpm --extras=lyrics
poetry install --extras=autobpm --extras=lyrics --extras=embedart
poe test
- if: ${{ env.IS_MAIN_PYTHON == 'true' }}

View file

@ -45,6 +45,11 @@ class LibModel(dbcore.Model["Library"]):
def writable_media_fields(cls) -> set[str]:
return set(MediaFile.fields()) & cls._fields.keys()
@property
def filepath(self) -> Path:
"""The path to the entity as pathlib.Path."""
return Path(os.fsdecode(self.path))
def _template_funcs(self):
funcs = DefaultTemplateFunctions(self, self._db).functions()
funcs.update(plugins.template_funcs())
@ -207,6 +212,8 @@ class Album(LibModel):
Reflects the library's "albums" table, including album art.
"""
artpath: bytes
_table = "albums"
_flex_table = "album_attributes"
_always_dirty = True
@ -331,6 +338,11 @@ class Album(LibModel):
f"ON {cls._table}.id = {cls._relation._table}.album_id"
)
@property
def art_filepath(self) -> Path | None:
"""The path to album's cover picture as pathlib.Path."""
return Path(os.fsdecode(self.artpath)) if self.artpath else None
@classmethod
def _getters(cls):
# In addition to plugin-provided computed fields, also expose
@ -748,11 +760,6 @@ class Item(LibModel):
f"ON {cls._table}.album_id = {cls._relation._table}.id"
)
@property
def filepath(self) -> Path:
"""The path to the item's file as pathlib.Path."""
return Path(os.fsdecode(self.path))
@property
def _cached_album(self):
"""The Album object that this item belongs to, if any, or

View file

@ -111,34 +111,6 @@ def import_session(lib=None, loghandler=None, paths=[], query=[], cli=False):
return cls(lib, loghandler, paths, query)
class Assertions:
"""A mixin with additional unit test assertions."""
def assertExists(self, path):
assert os.path.exists(syspath(path)), f"file does not exist: {path!r}"
def assertNotExists(self, path):
assert not os.path.exists(syspath(path)), f"file exists: {path!r}"
def assertIsFile(self, path):
self.assertExists(path)
assert os.path.isfile(syspath(path)), (
"path exists, but is not a regular file: {!r}".format(path)
)
def assertIsDir(self, path):
self.assertExists(path)
assert os.path.isdir(syspath(path)), (
"path exists, but is not a directory: {!r}".format(path)
)
def assert_equal_path(self, a, b):
"""Check that two paths are equal."""
a_bytes, b_bytes = util.normpath(a), util.normpath(b)
assert a_bytes == b_bytes, f"{a_bytes=} != {b_bytes=}"
# Mock I/O.

View file

@ -163,15 +163,49 @@ NEEDS_REFLINK = unittest.skipUnless(
)
class TestHelper(_common.Assertions, ConfigMixin):
class IOMixin:
@cached_property
def io(self) -> _common.DummyIO:
return _common.DummyIO()
def setUp(self):
super().setUp()
self.io.install()
def tearDown(self):
super().tearDown()
self.io.restore()
class TestHelper(ConfigMixin):
"""Helper mixin for high-level cli and plugin tests.
This mixin provides methods to isolate beets' global state provide
fixtures.
"""
resource_path = Path(os.fsdecode(_common.RSRC)) / "full.mp3"
db_on_disk: ClassVar[bool] = False
@cached_property
def temp_dir_path(self) -> Path:
return Path(self.create_temp_dir())
@cached_property
def temp_dir(self) -> bytes:
return util.bytestring_path(self.temp_dir_path)
@cached_property
def lib_path(self) -> Path:
lib_path = self.temp_dir_path / "libdir"
lib_path.mkdir(exist_ok=True)
return lib_path
@cached_property
def libdir(self) -> bytes:
return bytestring_path(self.lib_path)
# TODO automate teardown through hook registration
def setup_beets(self):
@ -194,8 +228,7 @@ class TestHelper(_common.Assertions, ConfigMixin):
Make sure you call ``teardown_beets()`` afterwards.
"""
self.create_temp_dir()
temp_dir_str = os.fsdecode(self.temp_dir)
temp_dir_str = str(self.temp_dir_path)
self.env_patcher = patch.dict(
"os.environ",
{
@ -205,9 +238,7 @@ class TestHelper(_common.Assertions, ConfigMixin):
)
self.env_patcher.start()
self.libdir = os.path.join(self.temp_dir, b"libdir")
os.mkdir(syspath(self.libdir))
self.config["directory"] = os.fsdecode(self.libdir)
self.config["directory"] = str(self.lib_path)
if self.db_on_disk:
dbpath = util.bytestring_path(self.config["library"].as_filename())
@ -215,12 +246,8 @@ class TestHelper(_common.Assertions, ConfigMixin):
dbpath = ":memory:"
self.lib = Library(dbpath, self.libdir)
# Initialize, but don't install, a DummyIO.
self.io = _common.DummyIO()
def teardown_beets(self):
self.env_patcher.stop()
self.io.restore()
self.lib._close()
self.remove_temp_dir()
@ -384,16 +411,12 @@ class TestHelper(_common.Assertions, ConfigMixin):
# Safe file operations
def create_temp_dir(self, **kwargs):
"""Create a temporary directory and assign it into
`self.temp_dir`. Call `remove_temp_dir` later to delete it.
"""
temp_dir = mkdtemp(**kwargs)
self.temp_dir = util.bytestring_path(temp_dir)
def create_temp_dir(self, **kwargs) -> str:
return mkdtemp(**kwargs)
def remove_temp_dir(self):
"""Delete the temporary directory created by `create_temp_dir`."""
shutil.rmtree(syspath(self.temp_dir))
shutil.rmtree(self.temp_dir_path)
def touch(self, path, dir=None, content=""):
"""Create a file at `path` with given content.
@ -514,7 +537,6 @@ class ImportHelper(TestHelper):
autotagging library and several assertions for the library.
"""
resource_path = syspath(os.path.join(_common.RSRC, b"full.mp3"))
default_import_config = {
"autotag": True,
"copy": True,
@ -531,7 +553,7 @@ class ImportHelper(TestHelper):
@cached_property
def import_path(self) -> Path:
import_path = Path(os.fsdecode(self.temp_dir)) / "import"
import_path = self.temp_dir_path / "import"
import_path.mkdir(exist_ok=True)
return import_path
@ -599,7 +621,7 @@ class ImportHelper(TestHelper):
]
def prepare_albums_for_import(self, count: int = 1) -> None:
album_dirs = Path(os.fsdecode(self.import_dir)).glob("album_*")
album_dirs = self.import_path.glob("album_*")
base_idx = int(str(max(album_dirs, default="0")).split("_")[-1]) + 1
for album_id in range(base_idx, count + base_idx):
@ -623,21 +645,6 @@ class ImportHelper(TestHelper):
def setup_singleton_importer(self, **kwargs) -> ImportSession:
return self.setup_importer(singletons=True, **kwargs)
def assert_file_in_lib(self, *segments):
"""Join the ``segments`` and assert that this path exists in the
library directory.
"""
self.assertExists(os.path.join(self.libdir, *segments))
def assert_file_not_in_lib(self, *segments):
"""Join the ``segments`` and assert that this path does not
exist in the library directory.
"""
self.assertNotExists(os.path.join(self.libdir, *segments))
def assert_lib_dir_empty(self):
assert not os.listdir(syspath(self.libdir))
class AsIsImporterMixin:
def setUp(self):
@ -759,7 +766,7 @@ class TerminalImportSessionFixture(TerminalImportSession):
self._add_choice_input()
class TerminalImportMixin(ImportHelper):
class TerminalImportMixin(IOMixin, ImportHelper):
"""Provides_a terminal importer for the import session."""
io: _common.DummyIO

View file

@ -2122,12 +2122,20 @@ default_commands.append(modify_cmd)
def move_items(
lib, dest, query, copy, album, pretend, confirm=False, export=False
lib,
dest_path: util.PathLike,
query,
copy,
album,
pretend,
confirm=False,
export=False,
):
"""Moves or copies items to a new base directory, given by dest. If
dest is None, then the library's base directory is used, making the
command "consolidate" files.
"""
dest = os.fsencode(dest_path) if dest_path else dest_path
items, albums = _do_query(lib, query, album, False)
objs = albums if album else items
num_objs = len(objs)

View file

@ -19,6 +19,7 @@ from __future__ import annotations
import os
import shutil
import unittest
from pathlib import Path
from typing import TYPE_CHECKING
from unittest.mock import patch
@ -244,13 +245,13 @@ class FetchImageTest(FetchImageTestCase):
self.mock_response(self.URL, "image/png")
self.source.fetch_image(self.candidate, self.settings)
assert os.path.splitext(self.candidate.path)[1] == b".png"
self.assertExists(self.candidate.path)
assert Path(os.fsdecode(self.candidate.path)).exists()
def test_does_not_rely_on_server_content_type(self):
self.mock_response(self.URL, "image/jpeg", "image/png")
self.source.fetch_image(self.candidate, self.settings)
assert os.path.splitext(self.candidate.path)[1] == b".png"
self.assertExists(self.candidate.path)
assert Path(os.fsdecode(self.candidate.path)).exists()
class FSArtTest(UseThePlugin):
@ -748,8 +749,8 @@ class ArtImporterTest(UseThePlugin):
super().setUp()
# Mock the album art fetcher to always return our test file.
self.art_file = os.path.join(self.temp_dir, b"tmpcover.jpg")
_common.touch(self.art_file)
self.art_file = self.temp_dir_path / "tmpcover.jpg"
self.art_file.touch()
self.old_afa = self.plugin.art_for_album
self.afa_response = fetchart.Candidate(
logger,
@ -804,12 +805,10 @@ class ArtImporterTest(UseThePlugin):
self.plugin.fetch_art(self.session, self.task)
self.plugin.assign_art(self.session, self.task)
artpath = self.lib.albums()[0].artpath
artpath = self.lib.albums()[0].art_filepath
if should_exist:
assert artpath == os.path.join(
os.path.dirname(self.i.path), b"cover.jpg"
)
self.assertExists(artpath)
assert artpath == self.i.filepath.parent / "cover.jpg"
assert artpath.exists()
else:
assert artpath is None
return artpath
@ -828,20 +827,20 @@ class ArtImporterTest(UseThePlugin):
def test_leave_original_file_in_place(self):
self._fetch_art(True)
self.assertExists(self.art_file)
assert self.art_file.exists()
def test_delete_original_file(self):
prev_move = config["import"]["move"].get()
try:
config["import"]["move"] = True
self._fetch_art(True)
self.assertNotExists(self.art_file)
assert not self.art_file.exists()
finally:
config["import"]["move"] = prev_move
def test_do_not_delete_original_if_already_in_place(self):
artdest = os.path.join(os.path.dirname(self.i.path), b"cover.jpg")
shutil.copyfile(syspath(self.art_file), syspath(artdest))
shutil.copyfile(self.art_file, syspath(artdest))
self.afa_response = fetchart.Candidate(
logger,
source_name="test",
@ -861,156 +860,135 @@ class ArtImporterTest(UseThePlugin):
self.plugin.batch_fetch_art(
self.lib, self.lib.albums(), force=False, quiet=False
)
self.assertExists(self.album.artpath)
assert self.album.art_filepath.exists()
class ArtForAlbumTest(UseThePlugin):
"""Tests that fetchart.art_for_album respects the scale & filesize
configurations (e.g., minwidth, enforce_ratio, max_filesize)
class AlbumArtOperationTestCase(UseThePlugin):
"""Base test case for album art operations.
Provides common setup for testing album art processing operations by setting
up a mock filesystem source that returns a predefined test image.
"""
IMG_225x225 = os.path.join(_common.RSRC, b"abbey.jpg")
IMG_348x348 = os.path.join(_common.RSRC, b"abbey-different.jpg")
IMG_500x490 = os.path.join(_common.RSRC, b"abbey-similar.jpg")
IMAGE_PATH = os.path.join(_common.RSRC, b"abbey-similar.jpg")
IMAGE_FILESIZE = os.stat(util.syspath(IMAGE_PATH)).st_size
IMAGE_WIDTH = 500
IMAGE_HEIGHT = 490
IMAGE_WIDTH_HEIGHT_DIFF = IMAGE_WIDTH - IMAGE_HEIGHT
IMG_225x225_SIZE = os.stat(util.syspath(IMG_225x225)).st_size
IMG_348x348_SIZE = os.stat(util.syspath(IMG_348x348)).st_size
RESIZE_OP = "resize"
DEINTERLACE_OP = "deinterlace"
REFORMAT_OP = "reformat"
def setUp(self):
super().setUp()
self.old_fs_source_get = fetchart.FileSystem.get
@classmethod
def setUpClass(cls):
super().setUpClass()
def fs_source_get(_self, album, settings, paths):
if paths:
yield fetchart.Candidate(
logger, source_name=_self.ID, path=self.image_file
logger, source_name=_self.ID, path=cls.IMAGE_PATH
)
fetchart.FileSystem.get = fs_source_get
patch("beetsplug.fetchart.FileSystem.get", fs_source_get).start()
cls.addClassCleanup(patch.stopall)
self.album = _common.Bag()
def get_album_art(self):
return self.plugin.art_for_album(_common.Bag(), [""], True)
def tearDown(self):
fetchart.FileSystem.get = self.old_fs_source_get
super().tearDown()
def assertImageIsValidArt(self, image_file, should_exist):
self.assertExists(image_file)
self.image_file = image_file
class AlbumArtOperationConfigurationTest(AlbumArtOperationTestCase):
"""Check that scale & filesize configuration is respected.
candidate = self.plugin.art_for_album(self.album, [""], True)
Depending on `minwidth`, `enforce_ratio`, `margin_px`, and `margin_percent`
configuration the plugin should or should not return an art candidate.
"""
if should_exist:
assert candidate is not None
assert candidate.path == self.image_file
self.assertExists(candidate.path)
else:
assert candidate is None
def test_minwidth(self):
self.plugin.minwidth = self.IMAGE_WIDTH / 2
assert self.get_album_art()
def _assert_image_operated(self, image_file, operation, should_operate):
self.image_file = image_file
with patch.object(
ArtResizer.shared, operation, return_value=self.image_file
) as mock_operation:
self.plugin.art_for_album(self.album, [""], True)
assert mock_operation.called == should_operate
self.plugin.minwidth = self.IMAGE_WIDTH * 2
assert not self.get_album_art()
def _require_backend(self):
"""Skip the test if the art resizer doesn't have ImageMagick or
PIL (so comparisons and measurements are unavailable).
"""
if not ArtResizer.shared.local:
self.skipTest("ArtResizer has no local imaging backend available")
def test_respect_minwidth(self):
self._require_backend()
self.plugin.minwidth = 300
self.assertImageIsValidArt(self.IMG_225x225, False)
self.assertImageIsValidArt(self.IMG_348x348, True)
def test_respect_enforce_ratio_yes(self):
self._require_backend()
def test_enforce_ratio(self):
self.plugin.enforce_ratio = True
self.assertImageIsValidArt(self.IMG_500x490, False)
self.assertImageIsValidArt(self.IMG_225x225, True)
assert not self.get_album_art()
def test_respect_enforce_ratio_no(self):
self.plugin.enforce_ratio = False
self.assertImageIsValidArt(self.IMG_500x490, True)
assert self.get_album_art()
def test_respect_enforce_ratio_px_above(self):
self._require_backend()
def test_enforce_ratio_with_px_margin(self):
self.plugin.enforce_ratio = True
self.plugin.margin_px = 5
self.assertImageIsValidArt(self.IMG_500x490, False)
def test_respect_enforce_ratio_px_below(self):
self._require_backend()
self.plugin.margin_px = self.IMAGE_WIDTH_HEIGHT_DIFF * 0.5
assert not self.get_album_art()
self.plugin.margin_px = self.IMAGE_WIDTH_HEIGHT_DIFF * 1.5
assert self.get_album_art()
def test_enforce_ratio_with_percent_margin(self):
self.plugin.enforce_ratio = True
self.plugin.margin_px = 15
self.assertImageIsValidArt(self.IMG_500x490, True)
diff_by_width = self.IMAGE_WIDTH_HEIGHT_DIFF / self.IMAGE_WIDTH
def test_respect_enforce_ratio_percent_above(self):
self._require_backend()
self.plugin.enforce_ratio = True
self.plugin.margin_percent = (500 - 490) / 500 * 0.5
self.assertImageIsValidArt(self.IMG_500x490, False)
self.plugin.margin_percent = diff_by_width * 0.5
assert not self.get_album_art()
def test_respect_enforce_ratio_percent_below(self):
self._require_backend()
self.plugin.enforce_ratio = True
self.plugin.margin_percent = (500 - 490) / 500 * 1.5
self.assertImageIsValidArt(self.IMG_500x490, True)
self.plugin.margin_percent = diff_by_width * 1.5
assert self.get_album_art()
def test_resize_if_necessary(self):
self._require_backend()
self.plugin.maxwidth = 300
self._assert_image_operated(self.IMG_225x225, self.RESIZE_OP, False)
self._assert_image_operated(self.IMG_348x348, self.RESIZE_OP, True)
def test_fileresize(self):
self._require_backend()
self.plugin.max_filesize = self.IMG_225x225_SIZE // 2
self._assert_image_operated(self.IMG_225x225, self.RESIZE_OP, True)
class AlbumArtPerformOperationTest(AlbumArtOperationTestCase):
"""Test that the art is resized and deinterlaced if necessary."""
def test_fileresize_if_necessary(self):
self._require_backend()
self.plugin.max_filesize = self.IMG_225x225_SIZE
self._assert_image_operated(self.IMG_225x225, self.RESIZE_OP, False)
self.assertImageIsValidArt(self.IMG_225x225, True)
def setUp(self):
super().setUp()
self.resizer_mock = patch.object(
ArtResizer.shared, "resize", return_value=self.IMAGE_PATH
).start()
self.deinterlacer_mock = patch.object(
ArtResizer.shared, "deinterlace", return_value=self.IMAGE_PATH
).start()
def test_fileresize_no_scale(self):
self._require_backend()
self.plugin.maxwidth = 300
self.plugin.max_filesize = self.IMG_225x225_SIZE // 2
self._assert_image_operated(self.IMG_225x225, self.RESIZE_OP, True)
def test_resize(self):
self.plugin.maxwidth = self.IMAGE_WIDTH / 2
assert self.get_album_art()
assert self.resizer_mock.called
def test_fileresize_and_scale(self):
self._require_backend()
self.plugin.maxwidth = 200
self.plugin.max_filesize = self.IMG_225x225_SIZE // 2
self._assert_image_operated(self.IMG_225x225, self.RESIZE_OP, True)
def test_file_resized(self):
self.plugin.max_filesize = self.IMAGE_FILESIZE // 2
assert self.get_album_art()
assert self.resizer_mock.called
def test_deinterlace(self):
self._require_backend()
def test_file_not_resized(self):
self.plugin.max_filesize = self.IMAGE_FILESIZE
assert self.get_album_art()
assert not self.resizer_mock.called
def test_file_resized_but_not_scaled(self):
self.plugin.maxwidth = self.IMAGE_WIDTH * 2
self.plugin.max_filesize = self.IMAGE_FILESIZE // 2
assert self.get_album_art()
assert self.resizer_mock.called
def test_file_resized_and_scaled(self):
self.plugin.maxwidth = self.IMAGE_WIDTH / 2
self.plugin.max_filesize = self.IMAGE_FILESIZE // 2
assert self.get_album_art()
assert self.resizer_mock.called
def test_deinterlaced(self):
self.plugin.deinterlace = True
self._assert_image_operated(self.IMG_225x225, self.DEINTERLACE_OP, True)
assert self.get_album_art()
assert self.deinterlacer_mock.called
def test_not_deinterlaced(self):
self.plugin.deinterlace = False
self._assert_image_operated(
self.IMG_225x225, self.DEINTERLACE_OP, False
)
assert self.get_album_art()
assert not self.deinterlacer_mock.called
def test_deinterlace_and_resize(self):
self._require_backend()
self.plugin.maxwidth = 300
def test_deinterlaced_and_resized(self):
self.plugin.maxwidth = self.IMAGE_WIDTH / 2
self.plugin.deinterlace = True
self._assert_image_operated(self.IMG_348x348, self.DEINTERLACE_OP, True)
self._assert_image_operated(self.IMG_348x348, self.RESIZE_OP, True)
assert self.get_album_art()
assert self.deinterlacer_mock.called
assert self.resizer_mock.called
class DeprecatedConfigTest(unittest.TestCase):

View file

@ -18,6 +18,7 @@ import os.path
import re
import sys
import unittest
from pathlib import Path
import pytest
from mediafile import MediaFile
@ -32,7 +33,6 @@ from beets.test.helper import (
capture_log,
control_stdin,
)
from beets.util import bytestring_path, displayable_path
from beetsplug import convert
@ -58,31 +58,11 @@ class ConvertMixin:
shell_quote(sys.executable), shell_quote(stub), tag
)
def assertFileTag(self, path, tag):
"""Assert that the path is a file and the files content ends
with `tag`.
"""
display_tag = tag
tag = tag.encode("utf-8")
self.assertIsFile(path)
with open(path, "rb") as f:
f.seek(-len(display_tag), os.SEEK_END)
assert f.read() == tag, (
f"{displayable_path(path)} is not tagged with {display_tag}"
)
def assertNoFileTag(self, path, tag):
"""Assert that the path is a file and the files content does not
end with `tag`.
"""
display_tag = tag
tag = tag.encode("utf-8")
self.assertIsFile(path)
with open(path, "rb") as f:
f.seek(-len(tag), os.SEEK_END)
assert f.read() != tag, (
f"{displayable_path(path)} is unexpectedly tagged with {display_tag}"
)
def file_endswith(self, path: Path, tag: str):
"""Check the path is a file and if its content ends with `tag`."""
assert path.exists()
assert path.is_file()
return path.read_bytes().endswith(tag.encode("utf-8"))
class ConvertTestCase(ConvertMixin, PluginTestCase):
@ -106,7 +86,7 @@ class ImportConvertTest(AsIsImporterMixin, ImportHelper, ConvertTestCase):
def test_import_converted(self):
self.run_asis_importer()
item = self.lib.items().get()
self.assertFileTag(item.path, "convert")
assert self.file_endswith(item.filepath, "convert")
# FIXME: fails on windows
@unittest.skipIf(sys.platform == "win32", "win32")
@ -117,7 +97,7 @@ class ImportConvertTest(AsIsImporterMixin, ImportHelper, ConvertTestCase):
item = self.lib.items().get()
assert item is not None
self.assertIsFile(item.path)
assert item.filepath.is_file()
def test_delete_originals(self):
self.config["convert"]["delete_originals"] = True
@ -159,11 +139,10 @@ class ConvertCliTest(ConvertTestCase, ConvertCommand):
self.album = self.add_album_fixture(ext="ogg")
self.item = self.album.items()[0]
self.convert_dest = bytestring_path(
os.path.join(self.temp_dir, b"convert_dest")
)
self.convert_dest = self.temp_dir_path / "convert_dest"
self.converted_mp3 = self.convert_dest / "converted.mp3"
self.config["convert"] = {
"dest": self.convert_dest,
"dest": str(self.convert_dest),
"paths": {"default": "converted"},
"format": "mp3",
"formats": {
@ -179,19 +158,16 @@ class ConvertCliTest(ConvertTestCase, ConvertCommand):
def test_convert(self):
with control_stdin("y"):
self.run_convert()
converted = os.path.join(self.convert_dest, b"converted.mp3")
self.assertFileTag(converted, "mp3")
assert self.file_endswith(self.converted_mp3, "mp3")
def test_convert_with_auto_confirmation(self):
self.run_convert("--yes")
converted = os.path.join(self.convert_dest, b"converted.mp3")
self.assertFileTag(converted, "mp3")
assert self.file_endswith(self.converted_mp3, "mp3")
def test_reject_confirmation(self):
with control_stdin("n"):
self.run_convert()
converted = os.path.join(self.convert_dest, b"converted.mp3")
self.assertNotExists(converted)
assert not self.converted_mp3.exists()
def test_convert_keep_new(self):
assert os.path.splitext(self.item.path)[1] == b".ogg"
@ -205,8 +181,7 @@ class ConvertCliTest(ConvertTestCase, ConvertCommand):
def test_format_option(self):
with control_stdin("y"):
self.run_convert("--format", "opus")
converted = os.path.join(self.convert_dest, b"converted.ops")
self.assertFileTag(converted, "opus")
assert self.file_endswith(self.convert_dest / "converted.ops", "opus")
def test_embed_album_art(self):
self.config["convert"]["embed"] = True
@ -218,12 +193,11 @@ class ConvertCliTest(ConvertTestCase, ConvertCommand):
with control_stdin("y"):
self.run_convert()
converted = os.path.join(self.convert_dest, b"converted.mp3")
mediafile = MediaFile(converted)
mediafile = MediaFile(self.converted_mp3)
assert mediafile.images[0].data == image_data
def test_skip_existing(self):
converted = os.path.join(self.convert_dest, b"converted.mp3")
converted = self.converted_mp3
self.touch(converted, content="XXX")
self.run_convert("--yes")
with open(converted) as f:
@ -231,8 +205,7 @@ class ConvertCliTest(ConvertTestCase, ConvertCommand):
def test_pretend(self):
self.run_convert("--pretend")
converted = os.path.join(self.convert_dest, b"converted.mp3")
self.assertNotExists(converted)
assert not self.converted_mp3.exists()
def test_empty_query(self):
with capture_log("beets.convert") as logs:
@ -243,55 +216,51 @@ class ConvertCliTest(ConvertTestCase, ConvertCommand):
self.config["convert"]["max_bitrate"] = 5000
with control_stdin("y"):
self.run_convert()
converted = os.path.join(self.convert_dest, b"converted.mp3")
self.assertFileTag(converted, "mp3")
assert self.file_endswith(self.converted_mp3, "mp3")
def test_transcode_when_maxbr_set_low_and_different_formats(self):
self.config["convert"]["max_bitrate"] = 5
with control_stdin("y"):
self.run_convert()
converted = os.path.join(self.convert_dest, b"converted.mp3")
self.assertFileTag(converted, "mp3")
assert self.file_endswith(self.converted_mp3, "mp3")
def test_transcode_when_maxbr_set_to_none_and_different_formats(self):
with control_stdin("y"):
self.run_convert()
converted = os.path.join(self.convert_dest, b"converted.mp3")
self.assertFileTag(converted, "mp3")
assert self.file_endswith(self.converted_mp3, "mp3")
def test_no_transcode_when_maxbr_set_high_and_same_formats(self):
self.config["convert"]["max_bitrate"] = 5000
self.config["convert"]["format"] = "ogg"
with control_stdin("y"):
self.run_convert()
converted = os.path.join(self.convert_dest, b"converted.ogg")
self.assertNoFileTag(converted, "ogg")
assert not self.file_endswith(
self.convert_dest / "converted.ogg", "ogg"
)
def test_transcode_when_maxbr_set_low_and_same_formats(self):
self.config["convert"]["max_bitrate"] = 5
self.config["convert"]["format"] = "ogg"
with control_stdin("y"):
self.run_convert()
converted = os.path.join(self.convert_dest, b"converted.ogg")
self.assertFileTag(converted, "ogg")
assert self.file_endswith(self.convert_dest / "converted.ogg", "ogg")
def test_transcode_when_maxbr_set_to_none_and_same_formats(self):
self.config["convert"]["format"] = "ogg"
with control_stdin("y"):
self.run_convert()
converted = os.path.join(self.convert_dest, b"converted.ogg")
self.assertNoFileTag(converted, "ogg")
assert not self.file_endswith(
self.convert_dest / "converted.ogg", "ogg"
)
def test_playlist(self):
with control_stdin("y"):
self.run_convert("--playlist", "playlist.m3u8")
m3u_created = os.path.join(self.convert_dest, b"playlist.m3u8")
assert os.path.exists(m3u_created)
assert (self.convert_dest / "playlist.m3u8").exists()
def test_playlist_pretend(self):
self.run_convert("--playlist", "playlist.m3u8", "--pretend")
m3u_created = os.path.join(self.convert_dest, b"playlist.m3u8")
assert not os.path.exists(m3u_created)
assert not (self.convert_dest / "playlist.m3u8").exists()
@_common.slow_test()
@ -301,9 +270,9 @@ class NeverConvertLossyFilesTest(ConvertTestCase, ConvertCommand):
def setUp(self):
super().setUp()
self.convert_dest = os.path.join(self.temp_dir, b"convert_dest")
self.convert_dest = self.temp_dir_path / "convert_dest"
self.config["convert"] = {
"dest": self.convert_dest,
"dest": str(self.convert_dest),
"paths": {"default": "converted"},
"never_convert_lossy_files": True,
"format": "mp3",
@ -316,23 +285,23 @@ class NeverConvertLossyFilesTest(ConvertTestCase, ConvertCommand):
[item] = self.add_item_fixtures(ext="flac")
with control_stdin("y"):
self.run_convert_path(item)
converted = os.path.join(self.convert_dest, b"converted.mp3")
self.assertFileTag(converted, "mp3")
converted = self.convert_dest / "converted.mp3"
assert self.file_endswith(converted, "mp3")
def test_transcode_from_lossy(self):
self.config["convert"]["never_convert_lossy_files"] = False
[item] = self.add_item_fixtures(ext="ogg")
with control_stdin("y"):
self.run_convert_path(item)
converted = os.path.join(self.convert_dest, b"converted.mp3")
self.assertFileTag(converted, "mp3")
converted = self.convert_dest / "converted.mp3"
assert self.file_endswith(converted, "mp3")
def test_transcode_from_lossy_prevented(self):
[item] = self.add_item_fixtures(ext="ogg")
with control_stdin("y"):
self.run_convert_path(item)
converted = os.path.join(self.convert_dest, b"converted.ogg")
self.assertNoFileTag(converted, "mp3")
converted = self.convert_dest / "converted.ogg"
assert not self.file_endswith(converted, "mp3")
class TestNoConvert:

View file

@ -134,22 +134,6 @@ class EditCommandTest(EditMixin, BeetsTestCase):
{f: item[f] for f in item._fields} for item in self.album.items()
]
def assertCounts(
self,
mock_write,
album_count=ALBUM_COUNT,
track_count=TRACK_COUNT,
write_call_count=TRACK_COUNT,
title_starts_with="",
):
"""Several common assertions on Album, Track and call counts."""
assert len(self.lib.albums()) == album_count
assert len(self.lib.items()) == track_count
assert mock_write.call_count == write_call_count
assert all(
i.title.startswith(title_starts_with) for i in self.lib.items()
)
def test_title_edit_discard(self, mock_write):
"""Edit title for all items in the library, then discard changes."""
# Edit track titles.
@ -159,9 +143,7 @@ class EditCommandTest(EditMixin, BeetsTestCase):
["c"],
)
self.assertCounts(
mock_write, write_call_count=0, title_starts_with="t\u00eftle"
)
assert mock_write.call_count == 0
self.assertItemFieldsModified(self.album.items(), self.items_orig, [])
def test_title_edit_apply(self, mock_write):
@ -173,11 +155,7 @@ class EditCommandTest(EditMixin, BeetsTestCase):
["a"],
)
self.assertCounts(
mock_write,
write_call_count=self.TRACK_COUNT,
title_starts_with="modified t\u00eftle",
)
assert mock_write.call_count == self.TRACK_COUNT
self.assertItemFieldsModified(
self.album.items(), self.items_orig, ["title", "mtime"]
)
@ -191,10 +169,7 @@ class EditCommandTest(EditMixin, BeetsTestCase):
["a"],
)
self.assertCounts(
mock_write,
write_call_count=1,
)
assert mock_write.call_count == 1
# No changes except on last item.
self.assertItemFieldsModified(
list(self.album.items())[:-1], self.items_orig[:-1], []
@ -210,9 +185,7 @@ class EditCommandTest(EditMixin, BeetsTestCase):
[],
)
self.assertCounts(
mock_write, write_call_count=0, title_starts_with="t\u00eftle"
)
assert mock_write.call_count == 0
self.assertItemFieldsModified(self.album.items(), self.items_orig, [])
def test_album_edit_apply(self, mock_write):
@ -226,7 +199,7 @@ class EditCommandTest(EditMixin, BeetsTestCase):
["a"],
)
self.assertCounts(mock_write, write_call_count=self.TRACK_COUNT)
assert mock_write.call_count == self.TRACK_COUNT
self.assertItemFieldsModified(
self.album.items(), self.items_orig, ["album", "mtime"]
)
@ -249,9 +222,7 @@ class EditCommandTest(EditMixin, BeetsTestCase):
# Even though a flexible attribute was written (which is not directly
# written to the tags), write should still be called since templates
# might use it.
self.assertCounts(
mock_write, write_call_count=1, title_starts_with="t\u00eftle"
)
assert mock_write.call_count == 1
def test_a_album_edit_apply(self, mock_write):
"""Album query (-a), edit album field, apply changes."""
@ -263,7 +234,7 @@ class EditCommandTest(EditMixin, BeetsTestCase):
)
self.album.load()
self.assertCounts(mock_write, write_call_count=self.TRACK_COUNT)
assert mock_write.call_count == self.TRACK_COUNT
assert self.album.album == "modified \u00e4lbum"
self.assertItemFieldsModified(
self.album.items(), self.items_orig, ["album", "mtime"]
@ -279,7 +250,7 @@ class EditCommandTest(EditMixin, BeetsTestCase):
)
self.album.load()
self.assertCounts(mock_write, write_call_count=self.TRACK_COUNT)
assert mock_write.call_count == self.TRACK_COUNT
assert self.album.albumartist == "the modified album artist"
self.assertItemFieldsModified(
self.album.items(), self.items_orig, ["albumartist", "mtime"]
@ -295,9 +266,7 @@ class EditCommandTest(EditMixin, BeetsTestCase):
["n"],
)
self.assertCounts(
mock_write, write_call_count=0, title_starts_with="t\u00eftle"
)
assert mock_write.call_count == 0
def test_invalid_yaml(self, mock_write):
"""Edit the yaml file incorrectly (resulting in a well-formed but
@ -309,9 +278,7 @@ class EditCommandTest(EditMixin, BeetsTestCase):
[],
)
self.assertCounts(
mock_write, write_call_count=0, title_starts_with="t\u00eftle"
)
assert mock_write.call_count == 0
@_common.slow_test()

View file

@ -13,6 +13,7 @@
# included in all copies or substantial portions of the Software.
import os
import os.path
import shutil
import tempfile
@ -24,7 +25,12 @@ from mediafile import MediaFile
from beets import art, config, logging, ui
from beets.test import _common
from beets.test.helper import BeetsTestCase, FetchImageHelper, PluginMixin
from beets.test.helper import (
BeetsTestCase,
FetchImageHelper,
IOMixin,
PluginMixin,
)
from beets.util import bytestring_path, displayable_path, syspath
from beets.util.artresizer import ArtResizer
from test.test_art_resize import DummyIMBackend
@ -68,17 +74,13 @@ def require_artresizer_compare(test):
return wrapper
class EmbedartCliTest(PluginMixin, FetchImageHelper, BeetsTestCase):
class EmbedartCliTest(IOMixin, PluginMixin, FetchImageHelper, BeetsTestCase):
plugin = "embedart"
small_artpath = os.path.join(_common.RSRC, b"image-2x3.jpg")
abbey_artpath = os.path.join(_common.RSRC, b"abbey.jpg")
abbey_similarpath = os.path.join(_common.RSRC, b"abbey-similar.jpg")
abbey_differentpath = os.path.join(_common.RSRC, b"abbey-different.jpg")
def setUp(self):
super().setUp() # Converter is threaded
self.io.install()
def _setup_data(self, artpath=None):
if not artpath:
artpath = self.small_artpath
@ -202,23 +204,21 @@ class EmbedartCliTest(PluginMixin, FetchImageHelper, BeetsTestCase):
resource_path = os.path.join(_common.RSRC, b"image.mp3")
album = self.add_album_fixture()
trackpath = album.items()[0].path
albumpath = album.path
shutil.copy(syspath(resource_path), syspath(trackpath))
self.run_command("extractart", "-n", "extracted")
self.assertExists(os.path.join(albumpath, b"extracted.png"))
assert (album.filepath / "extracted.png").exists()
def test_extracted_extension(self):
resource_path = os.path.join(_common.RSRC, b"image-jpeg.mp3")
album = self.add_album_fixture()
trackpath = album.items()[0].path
albumpath = album.path
shutil.copy(syspath(resource_path), syspath(trackpath))
self.run_command("extractart", "-n", "extracted")
self.assertExists(os.path.join(albumpath, b"extracted.jpg"))
assert (album.filepath / "extracted.jpg").exists()
def test_clear_art_with_yes_input(self):
self._setup_data()

View file

@ -15,7 +15,7 @@
from __future__ import annotations
import os.path
import os
import sys
import unittest
from contextlib import contextmanager
@ -74,8 +74,7 @@ class HookCommandTest(HookTestCase):
def setUp(self):
super().setUp()
temp_dir = os.fsdecode(self.temp_dir)
self.paths = [os.path.join(temp_dir, e) for e in self.events]
self.paths = [str(self.temp_dir_path / e) for e in self.events]
def _test_command(
self,

View file

@ -68,26 +68,23 @@ class ImportAddedTest(PluginMixin, AutotagImportTestCase):
"No MediaFile found for Item " + displayable_path(item.path)
)
def assertEqualTimes(self, first, second, msg=None):
"""For comparing file modification times at a sufficient precision"""
assert first == pytest.approx(second, rel=1e-4), msg
def assertAlbumImport(self):
def test_import_album_with_added_dates(self):
self.importer.run()
album = self.lib.albums().get()
assert album.added == self.min_mtime
for item in album.items():
assert item.added == self.min_mtime
def test_import_album_with_added_dates(self):
self.assertAlbumImport()
def test_import_album_inplace_with_added_dates(self):
self.config["import"]["copy"] = False
self.config["import"]["move"] = False
self.config["import"]["link"] = False
self.config["import"]["hardlink"] = False
self.assertAlbumImport()
self.importer.run()
album = self.lib.albums().get()
assert album.added == self.min_mtime
for item in album.items():
assert item.added == self.min_mtime
def test_import_album_with_preserved_mtimes(self):
self.config["importadded"]["preserve_mtimes"] = True
@ -95,10 +92,12 @@ class ImportAddedTest(PluginMixin, AutotagImportTestCase):
album = self.lib.albums().get()
assert album.added == self.min_mtime
for item in album.items():
self.assertEqualTimes(item.added, self.min_mtime)
assert item.added == pytest.approx(self.min_mtime, rel=1e-4)
mediafile_mtime = os.path.getmtime(self.find_media_file(item).path)
self.assertEqualTimes(item.mtime, mediafile_mtime)
self.assertEqualTimes(os.path.getmtime(item.path), mediafile_mtime)
assert item.mtime == pytest.approx(mediafile_mtime, rel=1e-4)
assert os.path.getmtime(item.path) == pytest.approx(
mediafile_mtime, rel=1e-4
)
def test_reimported_album_skipped(self):
# Import and record the original added dates
@ -113,22 +112,21 @@ class ImportAddedTest(PluginMixin, AutotagImportTestCase):
self.importer.run()
# Verify the reimported items
album = self.lib.albums().get()
self.assertEqualTimes(album.added, album_added_before)
assert album.added == pytest.approx(album_added_before, rel=1e-4)
items_added_after = {item.path: item.added for item in album.items()}
for item_path, added_after in items_added_after.items():
self.assertEqualTimes(
items_added_before[item_path],
added_after,
"reimport modified Item.added for "
+ displayable_path(item_path),
)
assert items_added_before[item_path] == pytest.approx(
added_after, rel=1e-4
), "reimport modified Item.added for " + displayable_path(item_path)
def test_import_singletons_with_added_dates(self):
self.config["import"]["singletons"] = True
self.importer.run()
for item in self.lib.items():
mfile = self.find_media_file(item)
self.assertEqualTimes(item.added, os.path.getmtime(mfile.path))
assert item.added == pytest.approx(
os.path.getmtime(mfile.path), rel=1e-4
)
def test_import_singletons_with_preserved_mtimes(self):
self.config["import"]["singletons"] = True
@ -136,9 +134,11 @@ class ImportAddedTest(PluginMixin, AutotagImportTestCase):
self.importer.run()
for item in self.lib.items():
mediafile_mtime = os.path.getmtime(self.find_media_file(item).path)
self.assertEqualTimes(item.added, mediafile_mtime)
self.assertEqualTimes(item.mtime, mediafile_mtime)
self.assertEqualTimes(os.path.getmtime(item.path), mediafile_mtime)
assert item.added == pytest.approx(mediafile_mtime, rel=1e-4)
assert item.mtime == pytest.approx(mediafile_mtime, rel=1e-4)
assert os.path.getmtime(item.path) == pytest.approx(
mediafile_mtime, rel=1e-4
)
def test_reimported_singletons_skipped(self):
self.config["import"]["singletons"] = True
@ -155,9 +155,6 @@ class ImportAddedTest(PluginMixin, AutotagImportTestCase):
# Verify the reimported items
items_added_after = {item.path: item.added for item in self.lib.items()}
for item_path, added_after in items_added_after.items():
self.assertEqualTimes(
items_added_before[item_path],
added_after,
"reimport modified Item.added for "
+ displayable_path(item_path),
)
assert items_added_before[item_path] == pytest.approx(
added_after, rel=1e-4
), "reimport modified Item.added for " + displayable_path(item_path)

View file

@ -12,8 +12,8 @@ class ImportfeedsTestTest(BeetsTestCase):
def setUp(self):
super().setUp()
self.importfeeds = ImportFeedsPlugin()
self.feeds_dir = os.path.join(os.fsdecode(self.temp_dir), "importfeeds")
config["importfeeds"]["dir"] = self.feeds_dir
self.feeds_dir = self.temp_dir_path / "importfeeds"
config["importfeeds"]["dir"] = str(self.feeds_dir)
def test_multi_format_album_playlist(self):
config["importfeeds"]["formats"] = "m3u_multi"
@ -24,10 +24,8 @@ class ImportfeedsTestTest(BeetsTestCase):
self.lib.add(item)
self.importfeeds.album_imported(self.lib, album)
playlist_path = os.path.join(
self.feeds_dir, os.listdir(self.feeds_dir)[0]
)
assert playlist_path.endswith("album_name.m3u")
playlist_path = self.feeds_dir / next(self.feeds_dir.iterdir())
assert str(playlist_path).endswith("album_name.m3u")
with open(playlist_path) as playlist:
assert item_path in playlist.read()
@ -43,9 +41,7 @@ class ImportfeedsTestTest(BeetsTestCase):
self.lib.add(item)
self.importfeeds.album_imported(self.lib, album)
playlist = os.path.join(
self.feeds_dir, config["importfeeds"]["m3u_name"].get()
)
playlist = self.feeds_dir / config["importfeeds"]["m3u_name"].get()
playlist_subdir = os.path.dirname(playlist)
assert os.path.isdir(playlist_subdir)
assert os.path.isfile(playlist)
@ -62,7 +58,7 @@ class ImportfeedsTestTest(BeetsTestCase):
self.importfeeds.import_begin(self)
self.importfeeds.album_imported(self.lib, album)
date = datetime.datetime.now().strftime("%Y%m%d_%Hh%M")
playlist = os.path.join(self.feeds_dir, f"imports_{date}.m3u")
playlist = self.feeds_dir / f"imports_{date}.m3u"
assert os.path.isfile(playlist)
with open(playlist) as playlist_contents:
assert item_path in playlist_contents.read()

View file

@ -6,7 +6,6 @@ from unittest.mock import Mock, patch
from beets.test._common import touch
from beets.test.helper import AsIsImporterMixin, ImportTestCase, PluginMixin
from beets.util import displayable_path
from beetsplug.permissions import (
check_permissions,
convert_perm,
@ -23,57 +22,25 @@ class PermissionsPluginTest(AsIsImporterMixin, PluginMixin, ImportTestCase):
self.config["permissions"] = {"file": "777", "dir": "777"}
def test_permissions_on_album_imported(self):
self.do_thing(True)
self.import_and_check_permissions()
def test_permissions_on_item_imported(self):
self.config["import"]["singletons"] = True
self.do_thing(True)
self.import_and_check_permissions()
@patch("os.chmod", Mock())
def test_failing_to_set_permissions(self):
self.do_thing(False)
def do_thing(self, expect_success):
def import_and_check_permissions(self):
if platform.system() == "Windows":
self.skipTest("permissions not available on Windows")
def get_stat(v):
return (
os.stat(os.path.join(self.temp_dir, b"import", *v)).st_mode
& 0o777
)
typs = ["file", "dir"]
track_file = (b"album", b"track_1.mp3")
self.exp_perms = {
True: {
k: convert_perm(self.config["permissions"][k].get())
for k in typs
},
False: {k: get_stat(v) for (k, v) in zip(typs, (track_file, ()))},
}
track_file = os.path.join(self.import_dir, b"album", b"track_1.mp3")
assert os.stat(track_file).st_mode & 0o777 != 511
self.run_asis_importer()
item = self.lib.items().get()
self.assertPerms(item.path, "file", expect_success)
for path in dirs_in_library(self.lib.directory, item.path):
self.assertPerms(path, "dir", expect_success)
def assertPerms(self, path, typ, expect_success):
for x in [
(True, self.exp_perms[expect_success][typ], "!="),
(False, self.exp_perms[not expect_success][typ], "=="),
]:
msg = "{} : {} {} {}".format(
displayable_path(path),
oct(os.stat(path).st_mode),
x[2],
oct(x[1]),
)
assert x[0] == check_permissions(path, x[1]), msg
paths = (item.path, *dirs_in_library(self.lib.directory, item.path))
for path in paths:
assert os.stat(path).st_mode & 0o777 == 511
def test_convert_perm_from_string(self):
assert convert_perm("10") == 8

View file

@ -311,7 +311,7 @@ class BPDTestHelper(PluginTestCase):
"""
# Create a config file:
config = {
"pluginpath": [os.fsdecode(self.temp_dir)],
"pluginpath": [str(self.temp_dir_path)],
"plugins": "bpd",
# use port 0 to let the OS choose a free port
"bpd": {"host": host, "port": 0, "control_port": 0},
@ -320,7 +320,7 @@ class BPDTestHelper(PluginTestCase):
config["bpd"]["password"] = password
config_file = tempfile.NamedTemporaryFile(
mode="wb",
dir=os.fsdecode(self.temp_dir),
dir=str(self.temp_dir_path),
suffix=".yaml",
delete=False,
)

View file

@ -72,12 +72,10 @@ class PlaylistTestCase(PluginTestCase):
self.lib.add(i3)
self.lib.add_album([i3])
self.playlist_dir = os.path.join(
os.fsdecode(self.temp_dir), "playlists"
)
os.makedirs(self.playlist_dir)
self.playlist_dir = self.temp_dir_path / "playlists"
self.playlist_dir.mkdir(parents=True, exist_ok=True)
self.config["directory"] = self.music_dir
self.config["playlist"]["playlist_dir"] = self.playlist_dir
self.config["playlist"]["playlist_dir"] = str(self.playlist_dir)
self.setup_test()
self.load_plugins()
@ -222,7 +220,7 @@ class PlaylistTestRelativeToPls(PlaylistQueryTest, PlaylistTestCase):
)
self.config["playlist"]["relative_to"] = "playlist"
self.config["playlist"]["playlist_dir"] = self.playlist_dir
self.config["playlist"]["playlist_dir"] = str(self.playlist_dir)
class PlaylistUpdateTest:

View file

@ -13,7 +13,8 @@
# included in all copies or substantial portions of the Software.
from os import fsdecode, path, remove
from os import path, remove
from pathlib import Path
from shutil import rmtree
from tempfile import mkdtemp
from unittest.mock import MagicMock, Mock, PropertyMock
@ -26,7 +27,7 @@ from beets.dbcore.query import FixedFieldSort, MultipleSort, NullSort
from beets.library import Album, Item, parse_query_string
from beets.test.helper import BeetsTestCase, PluginTestCase
from beets.ui import UserError
from beets.util import CHAR_REPLACE, bytestring_path, syspath
from beets.util import CHAR_REPLACE, syspath
from beetsplug.smartplaylist import SmartPlaylistPlugin
@ -165,9 +166,9 @@ class SmartPlaylistTest(BeetsTestCase):
pl = b"$title-my<playlist>.m3u", (q, None), (a_q, None)
spl._matched_playlists = [pl]
dir = bytestring_path(mkdtemp())
dir = mkdtemp()
config["smartplaylist"]["relative_to"] = False
config["smartplaylist"]["playlist_dir"] = fsdecode(dir)
config["smartplaylist"]["playlist_dir"] = str(dir)
try:
spl.update_playlists(lib)
except Exception:
@ -177,10 +178,9 @@ class SmartPlaylistTest(BeetsTestCase):
lib.items.assert_called_once_with(q, None)
lib.albums.assert_called_once_with(a_q, None)
m3u_filepath = path.join(dir, b"ta_ga_da-my_playlist_.m3u")
self.assertExists(m3u_filepath)
with open(syspath(m3u_filepath), "rb") as f:
content = f.read()
m3u_filepath = Path(dir, "ta_ga_da-my_playlist_.m3u")
assert m3u_filepath.exists()
content = m3u_filepath.read_bytes()
rmtree(syspath(dir))
assert content == b"/tagada.mp3\n"
@ -208,11 +208,11 @@ class SmartPlaylistTest(BeetsTestCase):
pl = b"$title-my<playlist>.m3u", (q, None), (a_q, None)
spl._matched_playlists = [pl]
dir = bytestring_path(mkdtemp())
dir = mkdtemp()
config["smartplaylist"]["output"] = "extm3u"
config["smartplaylist"]["prefix"] = "http://beets:8337/files"
config["smartplaylist"]["relative_to"] = False
config["smartplaylist"]["playlist_dir"] = fsdecode(dir)
config["smartplaylist"]["playlist_dir"] = str(dir)
try:
spl.update_playlists(lib)
except Exception:
@ -222,10 +222,9 @@ class SmartPlaylistTest(BeetsTestCase):
lib.items.assert_called_once_with(q, None)
lib.albums.assert_called_once_with(a_q, None)
m3u_filepath = path.join(dir, b"ta_ga_da-my_playlist_.m3u")
self.assertExists(m3u_filepath)
with open(syspath(m3u_filepath), "rb") as f:
content = f.read()
m3u_filepath = Path(dir, "ta_ga_da-my_playlist_.m3u")
assert m3u_filepath.exists()
content = m3u_filepath.read_bytes()
rmtree(syspath(dir))
assert (
@ -260,10 +259,10 @@ class SmartPlaylistTest(BeetsTestCase):
pl = b"$title-my<playlist>.m3u", (q, None), (a_q, None)
spl._matched_playlists = [pl]
dir = bytestring_path(mkdtemp())
dir = mkdtemp()
config["smartplaylist"]["output"] = "extm3u"
config["smartplaylist"]["relative_to"] = False
config["smartplaylist"]["playlist_dir"] = fsdecode(dir)
config["smartplaylist"]["playlist_dir"] = str(dir)
config["smartplaylist"]["fields"] = ["id", "genre"]
try:
spl.update_playlists(lib)
@ -274,10 +273,9 @@ class SmartPlaylistTest(BeetsTestCase):
lib.items.assert_called_once_with(q, None)
lib.albums.assert_called_once_with(a_q, None)
m3u_filepath = path.join(dir, b"ta_ga_da-my_playlist_.m3u")
self.assertExists(m3u_filepath)
with open(syspath(m3u_filepath), "rb") as f:
content = f.read()
m3u_filepath = Path(dir, "ta_ga_da-my_playlist_.m3u")
assert m3u_filepath.exists()
content = m3u_filepath.read_bytes()
rmtree(syspath(dir))
assert (
@ -307,10 +305,10 @@ class SmartPlaylistTest(BeetsTestCase):
pl = b"$title-my<playlist>.m3u", (q, None), (a_q, None)
spl._matched_playlists = [pl]
dir = bytestring_path(mkdtemp())
dir = mkdtemp()
tpl = "http://beets:8337/item/$id/file"
config["smartplaylist"]["uri_format"] = tpl
config["smartplaylist"]["playlist_dir"] = fsdecode(dir)
config["smartplaylist"]["playlist_dir"] = dir
# The following options should be ignored when uri_format is set
config["smartplaylist"]["relative_to"] = "/data"
config["smartplaylist"]["prefix"] = "/prefix"
@ -324,10 +322,9 @@ class SmartPlaylistTest(BeetsTestCase):
lib.items.assert_called_once_with(q, None)
lib.albums.assert_called_once_with(a_q, None)
m3u_filepath = path.join(dir, b"ta_ga_da-my_playlist_.m3u")
self.assertExists(m3u_filepath)
with open(syspath(m3u_filepath), "rb") as f:
content = f.read()
m3u_filepath = Path(dir, "ta_ga_da-my_playlist_.m3u")
assert m3u_filepath.exists()
content = m3u_filepath.read_bytes()
rmtree(syspath(dir))
assert content == b"http://beets:8337/item/3/file\n"
@ -346,22 +343,20 @@ class SmartPlaylistCLITest(PluginTestCase):
{"name": "all.m3u", "query": ""},
]
)
config["smartplaylist"]["playlist_dir"].set(fsdecode(self.temp_dir))
config["smartplaylist"]["playlist_dir"].set(str(self.temp_dir_path))
def test_splupdate(self):
with pytest.raises(UserError):
self.run_with_output("splupdate", "tagada")
self.run_with_output("splupdate", "my_playlist")
m3u_path = path.join(self.temp_dir, b"my_playlist.m3u")
self.assertExists(m3u_path)
with open(syspath(m3u_path), "rb") as f:
assert f.read() == self.item.path + b"\n"
m3u_path = self.temp_dir_path / "my_playlist.m3u"
assert m3u_path.exists()
assert m3u_path.read_bytes() == self.item.path + b"\n"
remove(syspath(m3u_path))
self.run_with_output("splupdate", "my_playlist.m3u")
with open(syspath(m3u_path), "rb") as f:
assert f.read() == self.item.path + b"\n"
assert m3u_path.read_bytes() == self.item.path + b"\n"
remove(syspath(m3u_path))
self.run_with_output("splupdate")

View file

@ -16,6 +16,7 @@
import os
import unittest
from pathlib import Path
from unittest.mock import patch
from beets.test import _common
@ -65,7 +66,7 @@ class ArtResizerFileSizeTest(CleanupModulesMixin, BeetsTestCase):
max_filesize=0,
)
# check valid path returned - max_filesize hasn't broken resize command
self.assertExists(im_95_qual)
assert Path(os.fsdecode(im_95_qual)).exists()
# Attempt a lower filesize with same quality
im_a = backend.resize(
@ -74,7 +75,7 @@ class ArtResizerFileSizeTest(CleanupModulesMixin, BeetsTestCase):
quality=95,
max_filesize=0.9 * os.stat(syspath(im_95_qual)).st_size,
)
self.assertExists(im_a)
assert Path(os.fsdecode(im_a)).exists()
# target size was achieved
assert (
os.stat(syspath(im_a)).st_size
@ -88,7 +89,7 @@ class ArtResizerFileSizeTest(CleanupModulesMixin, BeetsTestCase):
quality=75,
max_filesize=0,
)
self.assertExists(im_75_qual)
assert Path(os.fsdecode(im_75_qual)).exists()
im_b = backend.resize(
225,
@ -96,7 +97,7 @@ class ArtResizerFileSizeTest(CleanupModulesMixin, BeetsTestCase):
quality=95,
max_filesize=0.9 * os.stat(syspath(im_75_qual)).st_size,
)
self.assertExists(im_b)
assert Path(os.fsdecode(im_b)).exists()
# Check high (initial) quality still gives a smaller filesize
assert (
os.stat(syspath(im_b)).st_size

View file

@ -29,122 +29,68 @@ from beets.dbcore.query import (
from beets.test.helper import ItemInDBTestCase
def _date(string):
return datetime.strptime(string, "%Y-%m-%dT%H:%M:%S")
class TestDateInterval:
now = datetime.now().replace(microsecond=0, second=0).isoformat()
def _datepattern(datetimedate):
return datetimedate.strftime("%Y-%m-%dT%H:%M:%S")
class DateIntervalTest(unittest.TestCase):
def test_year_precision_intervals(self):
self.assertContains("2000..2001", "2000-01-01T00:00:00")
self.assertContains("2000..2001", "2001-06-20T14:15:16")
self.assertContains("2000..2001", "2001-12-31T23:59:59")
self.assertExcludes("2000..2001", "1999-12-31T23:59:59")
self.assertExcludes("2000..2001", "2002-01-01T00:00:00")
self.assertContains("2000..", "2000-01-01T00:00:00")
self.assertContains("2000..", "2099-10-11T00:00:00")
self.assertExcludes("2000..", "1999-12-31T23:59:59")
self.assertContains("..2001", "2001-12-31T23:59:59")
self.assertExcludes("..2001", "2002-01-01T00:00:00")
self.assertContains("-1d..1d", _datepattern(datetime.now()))
self.assertExcludes("-2d..-1d", _datepattern(datetime.now()))
def test_day_precision_intervals(self):
self.assertContains("2000-06-20..2000-06-20", "2000-06-20T00:00:00")
self.assertContains("2000-06-20..2000-06-20", "2000-06-20T10:20:30")
self.assertContains("2000-06-20..2000-06-20", "2000-06-20T23:59:59")
self.assertExcludes("2000-06-20..2000-06-20", "2000-06-19T23:59:59")
self.assertExcludes("2000-06-20..2000-06-20", "2000-06-21T00:00:00")
def test_month_precision_intervals(self):
self.assertContains("1999-12..2000-02", "1999-12-01T00:00:00")
self.assertContains("1999-12..2000-02", "2000-02-15T05:06:07")
self.assertContains("1999-12..2000-02", "2000-02-29T23:59:59")
self.assertExcludes("1999-12..2000-02", "1999-11-30T23:59:59")
self.assertExcludes("1999-12..2000-02", "2000-03-01T00:00:00")
def test_hour_precision_intervals(self):
# test with 'T' separator
self.assertExcludes(
"2000-01-01T12..2000-01-01T13", "2000-01-01T11:59:59"
)
self.assertContains(
"2000-01-01T12..2000-01-01T13", "2000-01-01T12:00:00"
)
self.assertContains(
"2000-01-01T12..2000-01-01T13", "2000-01-01T12:30:00"
)
self.assertContains(
"2000-01-01T12..2000-01-01T13", "2000-01-01T13:30:00"
)
self.assertContains(
"2000-01-01T12..2000-01-01T13", "2000-01-01T13:59:59"
)
self.assertExcludes(
"2000-01-01T12..2000-01-01T13", "2000-01-01T14:00:00"
)
self.assertExcludes(
"2000-01-01T12..2000-01-01T13", "2000-01-01T14:30:00"
)
# test non-range query
self.assertContains("2008-12-01T22", "2008-12-01T22:30:00")
self.assertExcludes("2008-12-01T22", "2008-12-01T23:30:00")
def test_minute_precision_intervals(self):
self.assertExcludes(
"2000-01-01T12:30..2000-01-01T12:31", "2000-01-01T12:29:59"
)
self.assertContains(
"2000-01-01T12:30..2000-01-01T12:31", "2000-01-01T12:30:00"
)
self.assertContains(
"2000-01-01T12:30..2000-01-01T12:31", "2000-01-01T12:30:30"
)
self.assertContains(
"2000-01-01T12:30..2000-01-01T12:31", "2000-01-01T12:31:59"
)
self.assertExcludes(
"2000-01-01T12:30..2000-01-01T12:31", "2000-01-01T12:32:00"
)
def test_second_precision_intervals(self):
self.assertExcludes(
"2000-01-01T12:30:50..2000-01-01T12:30:55", "2000-01-01T12:30:49"
)
self.assertContains(
"2000-01-01T12:30:50..2000-01-01T12:30:55", "2000-01-01T12:30:50"
)
self.assertContains(
"2000-01-01T12:30:50..2000-01-01T12:30:55", "2000-01-01T12:30:55"
)
self.assertExcludes(
"2000-01-01T12:30:50..2000-01-01T12:30:55", "2000-01-01T12:30:56"
)
def test_unbounded_endpoints(self):
self.assertContains("..", date=datetime.max)
self.assertContains("..", date=datetime.min)
self.assertContains("..", "1000-01-01T00:00:00")
def assertContains(self, interval_pattern, date_pattern=None, date=None):
if date is None:
date = _date(date_pattern)
(start, end) = _parse_periods(interval_pattern)
@pytest.mark.parametrize(
"pattern, datestr, include",
[
# year precision
("2000..2001", "2000-01-01T00:00:00", True),
("2000..2001", "2001-06-20T14:15:16", True),
("2000..2001", "2001-12-31T23:59:59", True),
("2000..2001", "1999-12-31T23:59:59", False),
("2000..2001", "2002-01-01T00:00:00", False),
("2000..", "2000-01-01T00:00:00", True),
("2000..", "2099-10-11T00:00:00", True),
("2000..", "1999-12-31T23:59:59", False),
("..2001", "2001-12-31T23:59:59", True),
("..2001", "2002-01-01T00:00:00", False),
("-1d..1d", now, True),
("-2d..-1d", now, False),
# month precision
("2000-06-20..2000-06-20", "2000-06-20T00:00:00", True),
("2000-06-20..2000-06-20", "2000-06-20T10:20:30", True),
("2000-06-20..2000-06-20", "2000-06-20T23:59:59", True),
("2000-06-20..2000-06-20", "2000-06-19T23:59:59", False),
("2000-06-20..2000-06-20", "2000-06-21T00:00:00", False),
# day precision
("1999-12..2000-02", "1999-12-01T00:00:00", True),
("1999-12..2000-02", "2000-02-15T05:06:07", True),
("1999-12..2000-02", "2000-02-29T23:59:59", True),
("1999-12..2000-02", "1999-11-30T23:59:59", False),
("1999-12..2000-02", "2000-03-01T00:00:00", False),
# hour precision with 'T' separator
("2000-01-01T12..2000-01-01T13", "2000-01-01T11:59:59", False),
("2000-01-01T12..2000-01-01T13", "2000-01-01T12:00:00", True),
("2000-01-01T12..2000-01-01T13", "2000-01-01T12:30:00", True),
("2000-01-01T12..2000-01-01T13", "2000-01-01T13:30:00", True),
("2000-01-01T12..2000-01-01T13", "2000-01-01T13:59:59", True),
("2000-01-01T12..2000-01-01T13", "2000-01-01T14:00:00", False),
("2000-01-01T12..2000-01-01T13", "2000-01-01T14:30:00", False),
# hour precision non-range query
("2008-12-01T22", "2008-12-01T22:30:00", True),
("2008-12-01T22", "2008-12-01T23:30:00", False),
# minute precision
("2000-01-01T12:30..2000-01-01T12:31", "2000-01-01T12:29:59", False),
("2000-01-01T12:30..2000-01-01T12:31", "2000-01-01T12:30:00", True),
("2000-01-01T12:30..2000-01-01T12:31", "2000-01-01T12:30:30", True),
("2000-01-01T12:30..2000-01-01T12:31", "2000-01-01T12:31:59", True),
("2000-01-01T12:30..2000-01-01T12:31", "2000-01-01T12:32:00", False),
# second precision
("2000-01-01T12:30:50..2000-01-01T12:30:55", "2000-01-01T12:30:49", False),
("2000-01-01T12:30:50..2000-01-01T12:30:55", "2000-01-01T12:30:50", True),
("2000-01-01T12:30:50..2000-01-01T12:30:55", "2000-01-01T12:30:55", True),
("2000-01-01T12:30:50..2000-01-01T12:30:55", "2000-01-01T12:30:56", False), # unbounded # noqa: E501
("..", datetime.max.isoformat(), True),
("..", datetime.min.isoformat(), True),
("..", "1000-01-01T00:00:00", True),
],
) # fmt: skip
def test_intervals(self, pattern, datestr, include):
(start, end) = _parse_periods(pattern)
interval = DateInterval.from_periods(start, end)
assert interval.contains(date)
def assertExcludes(self, interval_pattern, date_pattern):
date = _date(date_pattern)
(start, end) = _parse_periods(interval_pattern)
interval = DateInterval.from_periods(start, end)
assert not interval.contains(date)
assert interval.contains(datetime.fromisoformat(datestr)) == include
def _parsetime(s):

View file

@ -19,6 +19,7 @@ import shutil
import stat
import unittest
from os.path import join
from pathlib import Path
import pytest
@ -27,7 +28,7 @@ from beets import util
from beets.test import _common
from beets.test._common import item, touch
from beets.test.helper import NEEDS_REFLINK, BeetsTestCase
from beets.util import MoveOperation, bytestring_path, syspath
from beets.util import MoveOperation, syspath
class MoveTest(BeetsTestCase):
@ -35,11 +36,8 @@ class MoveTest(BeetsTestCase):
super().setUp()
# make a temporary file
self.path = join(self.temp_dir, b"temp.mp3")
shutil.copy(
syspath(join(_common.RSRC, b"full.mp3")),
syspath(self.path),
)
self.path = self.temp_dir_path / "temp.mp3"
shutil.copy(self.resource_path, self.path)
# add it to a temporary library
self.i = beets.library.Item.from_path(self.path)
@ -52,57 +50,57 @@ class MoveTest(BeetsTestCase):
self.i.artist = "one"
self.i.album = "two"
self.i.title = "three"
self.dest = join(self.libdir, b"one", b"two", b"three.mp3")
self.dest = self.lib_path / "one" / "two" / "three.mp3"
self.otherdir = join(self.temp_dir, b"testotherdir")
self.otherdir = self.temp_dir_path / "testotherdir"
def test_move_arrives(self):
self.i.move()
self.assertExists(self.dest)
assert self.dest.exists()
def test_move_to_custom_dir(self):
self.i.move(basedir=self.otherdir)
self.assertExists(join(self.otherdir, b"one", b"two", b"three.mp3"))
self.i.move(basedir=os.fsencode(self.otherdir))
assert (self.otherdir / "one" / "two" / "three.mp3").exists()
def test_move_departs(self):
self.i.move()
self.assertNotExists(self.path)
assert not self.path.exists()
def test_move_in_lib_prunes_empty_dir(self):
self.i.move()
old_path = self.i.path
self.assertExists(old_path)
old_path = self.i.filepath
assert old_path.exists()
self.i.artist = "newArtist"
self.i.move()
self.assertNotExists(old_path)
self.assertNotExists(os.path.dirname(old_path))
assert not old_path.exists()
assert not old_path.parent.exists()
def test_copy_arrives(self):
self.i.move(operation=MoveOperation.COPY)
self.assertExists(self.dest)
assert self.dest.exists()
def test_copy_does_not_depart(self):
self.i.move(operation=MoveOperation.COPY)
self.assertExists(self.path)
assert self.path.exists()
def test_reflink_arrives(self):
self.i.move(operation=MoveOperation.REFLINK_AUTO)
self.assertExists(self.dest)
assert self.dest.exists()
def test_reflink_does_not_depart(self):
self.i.move(operation=MoveOperation.REFLINK_AUTO)
self.assertExists(self.path)
assert self.path.exists()
@NEEDS_REFLINK
def test_force_reflink_arrives(self):
self.i.move(operation=MoveOperation.REFLINK)
self.assertExists(self.dest)
assert self.dest.exists()
@NEEDS_REFLINK
def test_force_reflink_does_not_depart(self):
self.i.move(operation=MoveOperation.REFLINK)
self.assertExists(self.path)
assert self.path.exists()
def test_move_changes_path(self):
self.i.move()
@ -164,14 +162,14 @@ class MoveTest(BeetsTestCase):
@unittest.skipUnless(_common.HAVE_SYMLINK, "need symlinks")
def test_link_arrives(self):
self.i.move(operation=MoveOperation.LINK)
self.assertExists(self.dest)
assert self.dest.exists()
assert os.path.islink(syspath(self.dest))
assert bytestring_path(os.readlink(syspath(self.dest))) == self.path
assert self.dest.resolve() == self.path
@unittest.skipUnless(_common.HAVE_SYMLINK, "need symlinks")
def test_link_does_not_depart(self):
self.i.move(operation=MoveOperation.LINK)
self.assertExists(self.path)
assert self.path.exists()
@unittest.skipUnless(_common.HAVE_SYMLINK, "need symlinks")
def test_link_changes_path(self):
@ -181,7 +179,7 @@ class MoveTest(BeetsTestCase):
@unittest.skipUnless(_common.HAVE_HARDLINK, "need hardlinks")
def test_hardlink_arrives(self):
self.i.move(operation=MoveOperation.HARDLINK)
self.assertExists(self.dest)
assert self.dest.exists()
s1 = os.stat(syspath(self.path))
s2 = os.stat(syspath(self.dest))
assert (s1[stat.ST_INO], s1[stat.ST_DEV]) == (
@ -192,7 +190,7 @@ class MoveTest(BeetsTestCase):
@unittest.skipUnless(_common.HAVE_HARDLINK, "need hardlinks")
def test_hardlink_does_not_depart(self):
self.i.move(operation=MoveOperation.HARDLINK)
self.assertExists(self.path)
assert self.path.exists()
@unittest.skipUnless(_common.HAVE_HARDLINK, "need hardlinks")
def test_hardlink_changes_path(self):
@ -264,24 +262,24 @@ class AlbumFileTest(BeetsTestCase):
assert b"newAlbumName" in self.i.path
def test_albuminfo_move_moves_file(self):
oldpath = self.i.path
oldpath = self.i.filepath
self.ai.album = "newAlbumName"
self.ai.move()
self.ai.store()
self.i.load()
self.assertNotExists(oldpath)
self.assertExists(self.i.path)
assert not oldpath.exists()
assert self.i.filepath.exists()
def test_albuminfo_move_copies_file(self):
oldpath = self.i.path
oldpath = self.i.filepath
self.ai.album = "newAlbumName"
self.ai.move(operation=MoveOperation.COPY)
self.ai.store()
self.i.load()
self.assertExists(oldpath)
self.assertExists(self.i.path)
assert oldpath.exists()
assert self.i.filepath.exists()
@NEEDS_REFLINK
def test_albuminfo_move_reflinks_file(self):
@ -314,29 +312,30 @@ class ArtFileTest(BeetsTestCase):
# Make an album.
self.ai = self.lib.add_album((self.i,))
# Make an art file too.
self.art = self.lib.get_album(self.i).art_destination("something.jpg")
touch(self.art)
self.ai.artpath = self.art
art_bytes = self.lib.get_album(self.i).art_destination("something.jpg")
self.art = Path(os.fsdecode(art_bytes))
self.art.touch()
self.ai.artpath = art_bytes
self.ai.store()
# Alternate destination dir.
self.otherdir = os.path.join(self.temp_dir, b"testotherdir")
def test_art_deleted_when_items_deleted(self):
self.assertExists(self.art)
assert self.art.exists()
self.ai.remove(True)
self.assertNotExists(self.art)
assert not self.art.exists()
def test_art_moves_with_album(self):
self.assertExists(self.art)
assert self.art.exists()
oldpath = self.i.path
self.ai.album = "newAlbum"
self.ai.move()
self.i.load()
assert self.i.path != oldpath
self.assertNotExists(self.art)
assert not self.art.exists()
newart = self.lib.get_album(self.i).art_destination(self.art)
self.assertExists(newart)
assert Path(os.fsdecode(newart)).exists()
def test_art_moves_with_album_to_custom_dir(self):
# Move the album to another directory.
@ -345,10 +344,10 @@ class ArtFileTest(BeetsTestCase):
self.i.load()
# Art should be in new directory.
self.assertNotExists(self.art)
newart = self.lib.get_album(self.i).artpath
self.assertExists(newart)
assert b"testotherdir" in newart
assert not self.art.exists()
newart = self.lib.get_album(self.i).art_filepath
assert newart.exists()
assert "testotherdir" in str(newart)
def test_setart_copies_image(self):
util.remove(self.art)
@ -363,7 +362,7 @@ class ArtFileTest(BeetsTestCase):
assert ai.artpath is None
ai.set_art(newart)
self.assertExists(ai.artpath)
assert ai.art_filepath.exists()
def test_setart_to_existing_art_works(self):
util.remove(self.art)
@ -380,7 +379,7 @@ class ArtFileTest(BeetsTestCase):
# Set the art again.
ai.set_art(ai.artpath)
self.assertExists(ai.artpath)
assert ai.art_filepath.exists()
def test_setart_to_existing_but_unset_art_works(self):
newart = os.path.join(self.libdir, b"newart.jpg")
@ -397,7 +396,7 @@ class ArtFileTest(BeetsTestCase):
# Set the art again.
ai.set_art(artdest)
self.assertExists(ai.artpath)
assert ai.art_filepath.exists()
def test_setart_to_conflicting_file_gets_new_path(self):
newart = os.path.join(self.libdir, b"newart.jpg")
@ -442,34 +441,34 @@ class ArtFileTest(BeetsTestCase):
os.chmod(syspath(ai.artpath), 0o777)
def test_move_last_file_moves_albumart(self):
oldartpath = self.lib.albums()[0].artpath
self.assertExists(oldartpath)
oldartpath = self.lib.albums()[0].art_filepath
assert oldartpath.exists()
self.ai.album = "different_album"
self.ai.store()
self.ai.items()[0].move()
artpath = self.lib.albums()[0].artpath
assert b"different_album" in artpath
self.assertExists(artpath)
self.assertNotExists(oldartpath)
artpath = self.lib.albums()[0].art_filepath
assert "different_album" in str(artpath)
assert artpath.exists()
assert not oldartpath.exists()
def test_move_not_last_file_does_not_move_albumart(self):
i2 = item()
i2.albumid = self.ai.id
self.lib.add(i2)
oldartpath = self.lib.albums()[0].artpath
self.assertExists(oldartpath)
oldartpath = self.lib.albums()[0].art_filepath
assert oldartpath.exists()
self.i.album = "different_album"
self.i.album_id = None # detach from album
self.i.move()
artpath = self.lib.albums()[0].artpath
assert b"different_album" not in artpath
artpath = self.lib.albums()[0].art_filepath
assert "different_album" not in str(artpath)
assert artpath == oldartpath
self.assertExists(oldartpath)
assert oldartpath.exists()
class RemoveTest(BeetsTestCase):
@ -486,37 +485,32 @@ class RemoveTest(BeetsTestCase):
self.ai = self.lib.add_album((self.i,))
def test_removing_last_item_prunes_empty_dir(self):
parent = os.path.dirname(self.i.path)
self.assertExists(parent)
assert self.i.filepath.parent.exists()
self.i.remove(True)
self.assertNotExists(parent)
assert not self.i.filepath.parent.exists()
def test_removing_last_item_preserves_nonempty_dir(self):
parent = os.path.dirname(self.i.path)
touch(os.path.join(parent, b"dummy.txt"))
(self.i.filepath.parent / "dummy.txt").touch()
self.i.remove(True)
self.assertExists(parent)
assert self.i.filepath.parent.exists()
def test_removing_last_item_prunes_dir_with_blacklisted_file(self):
parent = os.path.dirname(self.i.path)
touch(os.path.join(parent, b".DS_Store"))
(self.i.filepath.parent / ".DS_Store").touch()
self.i.remove(True)
self.assertNotExists(parent)
assert not self.i.filepath.parent.exists()
def test_removing_without_delete_leaves_file(self):
path = self.i.path
self.i.remove(False)
self.assertExists(path)
assert self.i.filepath.parent.exists()
def test_removing_last_item_preserves_library_dir(self):
self.i.remove(True)
self.assertExists(self.libdir)
assert self.lib_path.exists()
def test_removing_item_outside_of_library_deletes_nothing(self):
self.lib.directory = os.path.join(self.temp_dir, b"xxx")
parent = os.path.dirname(self.i.path)
self.i.remove(True)
self.assertExists(parent)
assert self.i.filepath.parent.exists()
def test_removing_last_item_in_album_with_albumart_prunes_dir(self):
artfile = os.path.join(self.temp_dir, b"testart.jpg")
@ -524,55 +518,54 @@ class RemoveTest(BeetsTestCase):
self.ai.set_art(artfile)
self.ai.store()
parent = os.path.dirname(self.i.path)
self.i.remove(True)
self.assertNotExists(parent)
assert not self.i.filepath.parent.exists()
# Tests that we can "delete" nonexistent files.
class SoftRemoveTest(BeetsTestCase):
class FilePathTestCase(BeetsTestCase):
def setUp(self):
super().setUp()
self.path = os.path.join(self.temp_dir, b"testfile")
touch(self.path)
self.path = self.temp_dir_path / "testfile"
self.path.touch()
# Tests that we can "delete" nonexistent files.
class SoftRemoveTest(FilePathTestCase):
def test_soft_remove_deletes_file(self):
util.remove(self.path, True)
self.assertNotExists(self.path)
assert not self.path.exists()
def test_soft_remove_silent_on_no_file(self):
try:
util.remove(self.path + b"XXX", True)
util.remove(self.path / "XXX", True)
except OSError:
self.fail("OSError when removing path")
class SafeMoveCopyTest(BeetsTestCase):
class SafeMoveCopyTest(FilePathTestCase):
def setUp(self):
super().setUp()
self.path = os.path.join(self.temp_dir, b"testfile")
touch(self.path)
self.otherpath = os.path.join(self.temp_dir, b"testfile2")
touch(self.otherpath)
self.dest = self.path + b".dest"
self.otherpath = self.temp_dir_path / "testfile2"
self.otherpath.touch()
self.dest = Path(f"{self.path}.dest")
def test_successful_move(self):
util.move(self.path, self.dest)
self.assertExists(self.dest)
self.assertNotExists(self.path)
assert self.dest.exists()
assert not self.path.exists()
def test_successful_copy(self):
util.copy(self.path, self.dest)
self.assertExists(self.dest)
self.assertExists(self.path)
assert self.dest.exists()
assert self.path.exists()
@NEEDS_REFLINK
def test_successful_reflink(self):
util.reflink(self.path, self.dest)
self.assertExists(self.dest)
self.assertExists(self.path)
assert self.dest.exists()
assert self.path.exists()
def test_unsuccessful_move(self):
with pytest.raises(util.FilesystemError):
@ -588,31 +581,31 @@ class SafeMoveCopyTest(BeetsTestCase):
def test_self_move(self):
util.move(self.path, self.path)
self.assertExists(self.path)
assert self.path.exists()
def test_self_copy(self):
util.copy(self.path, self.path)
self.assertExists(self.path)
assert self.path.exists()
class PruneTest(BeetsTestCase):
def setUp(self):
super().setUp()
self.base = os.path.join(self.temp_dir, b"testdir")
os.mkdir(syspath(self.base))
self.sub = os.path.join(self.base, b"subdir")
os.mkdir(syspath(self.sub))
self.base = self.temp_dir_path / "testdir"
self.base.mkdir()
self.sub = self.base / "subdir"
self.sub.mkdir()
def test_prune_existent_directory(self):
util.prune_dirs(self.sub, self.base)
self.assertExists(self.base)
self.assertNotExists(self.sub)
assert self.base.exists()
assert not self.sub.exists()
def test_prune_nonexistent_directory(self):
util.prune_dirs(os.path.join(self.sub, b"another"), self.base)
self.assertExists(self.base)
self.assertNotExists(self.sub)
util.prune_dirs(self.sub / "another", self.base)
assert self.base.exists()
assert not self.sub.exists()
class WalkTest(BeetsTestCase):
@ -678,12 +671,9 @@ class UniquePathTest(BeetsTestCase):
class MkDirAllTest(BeetsTestCase):
def test_parent_exists(self):
path = os.path.join(self.temp_dir, b"foo", b"bar", b"baz", b"qux.mp3")
util.mkdirall(path)
self.assertIsDir(os.path.join(self.temp_dir, b"foo", b"bar", b"baz"))
def test_child_does_not_exist(self):
path = os.path.join(self.temp_dir, b"foo", b"bar", b"baz", b"qux.mp3")
util.mkdirall(path)
self.assertNotExists(path)
def test_mkdirall(self):
child = self.temp_dir_path / "foo" / "bar" / "baz" / "quz.mp3"
util.mkdirall(child)
assert not child.exists()
assert child.parent.exists()
assert child.parent.is_dir()

View file

@ -15,6 +15,8 @@
"""Tests for the general importer functionality."""
from __future__ import annotations
import os
import re
import shutil
@ -22,6 +24,7 @@ import stat
import sys
import unicodedata
import unittest
from functools import cached_property
from io import StringIO
from pathlib import Path
from tarfile import TarFile
@ -43,6 +46,7 @@ from beets.test.helper import (
AutotagStub,
BeetsTestCase,
ImportTestCase,
IOMixin,
PluginMixin,
capture_log,
has_program,
@ -50,84 +54,71 @@ from beets.test.helper import (
from beets.util import bytestring_path, displayable_path, syspath
class PathsMixin:
import_media: list[MediaFile]
@cached_property
def track_import_path(self) -> Path:
return Path(self.import_media[0].path)
@cached_property
def album_path(self) -> Path:
return self.track_import_path.parent
@cached_property
def track_lib_path(self):
return self.lib_path / "Tag Artist" / "Tag Album" / "Tag Track 1.mp3"
@_common.slow_test()
class NonAutotaggedImportTest(AsIsImporterMixin, ImportTestCase):
class NonAutotaggedImportTest(PathsMixin, AsIsImporterMixin, ImportTestCase):
db_on_disk = True
def test_album_created_with_track_artist(self):
self.run_asis_importer()
albums = self.lib.albums()
assert len(albums) == 1
assert albums[0].albumartist == "Tag Artist"
def test_import_copy_arrives(self):
self.run_asis_importer()
for mediafile in self.import_media:
self.assert_file_in_lib(
b"Tag Artist",
b"Tag Album",
util.bytestring_path(f"{mediafile.title}.mp3"),
)
assert self.track_lib_path.exists()
def test_threaded_import_copy_arrives(self):
config["threaded"] = True
self.run_asis_importer()
for mediafile in self.import_media:
self.assert_file_in_lib(
b"Tag Artist",
b"Tag Album",
util.bytestring_path(f"{mediafile.title}.mp3"),
)
assert self.track_lib_path.exists()
def test_import_with_move_deletes_import_files(self):
for mediafile in self.import_media:
self.assertExists(mediafile.path)
self.run_asis_importer(move=True)
for mediafile in self.import_media:
self.assertNotExists(mediafile.path)
def test_import_with_move_prunes_directory_empty(self):
self.assertExists(os.path.join(self.import_dir, b"album"))
self.run_asis_importer(move=True)
self.assertNotExists(os.path.join(self.import_dir, b"album"))
def test_import_with_move_prunes_with_extra_clutter(self):
self.touch(os.path.join(self.import_dir, b"album", b"alog.log"))
assert self.album_path.exists()
assert self.track_import_path.exists()
(self.album_path / "alog.log").touch()
config["clutter"] = ["*.log"]
self.assertExists(os.path.join(self.import_dir, b"album"))
self.run_asis_importer(move=True)
self.assertNotExists(os.path.join(self.import_dir, b"album"))
assert not self.track_import_path.exists()
assert not self.album_path.exists()
def test_threaded_import_move_arrives(self):
self.run_asis_importer(move=True, threaded=True)
for mediafile in self.import_media:
self.assert_file_in_lib(
b"Tag Artist",
b"Tag Album",
util.bytestring_path(f"{mediafile.title}.mp3"),
)
def test_threaded_import_move_deletes_import(self):
self.run_asis_importer(move=True, threaded=True)
for mediafile in self.import_media:
self.assertNotExists(mediafile.path)
assert self.track_lib_path.exists()
assert not self.track_import_path.exists()
def test_import_without_delete_retains_files(self):
self.run_asis_importer(delete=False)
for mediafile in self.import_media:
self.assertExists(mediafile.path)
assert self.track_import_path.exists()
def test_import_with_delete_removes_files(self):
self.run_asis_importer(delete=True)
for mediafile in self.import_media:
self.assertNotExists(mediafile.path)
def test_import_with_delete_prunes_directory_empty(self):
self.assertExists(os.path.join(self.import_dir, b"album"))
self.run_asis_importer(delete=True)
self.assertNotExists(os.path.join(self.import_dir, b"album"))
assert not self.album_path.exists()
assert not self.track_import_path.exists()
def test_album_mb_albumartistids(self):
self.run_asis_importer()
@ -137,63 +128,38 @@ class NonAutotaggedImportTest(AsIsImporterMixin, ImportTestCase):
@unittest.skipUnless(_common.HAVE_SYMLINK, "need symlinks")
def test_import_link_arrives(self):
self.run_asis_importer(link=True)
for mediafile in self.import_media:
filename = os.path.join(
self.libdir,
b"Tag Artist",
b"Tag Album",
util.bytestring_path(f"{mediafile.title}.mp3"),
)
self.assertExists(filename)
assert os.path.islink(syspath(filename))
self.assert_equal_path(
util.bytestring_path(os.readlink(syspath(filename))),
mediafile.path,
)
assert self.track_lib_path.exists()
assert self.track_lib_path.is_symlink()
assert self.track_lib_path.resolve() == self.track_import_path
@unittest.skipUnless(_common.HAVE_HARDLINK, "need hardlinks")
def test_import_hardlink_arrives(self):
self.run_asis_importer(hardlink=True)
for mediafile in self.import_media:
filename = os.path.join(
self.libdir,
b"Tag Artist",
b"Tag Album",
util.bytestring_path(f"{mediafile.title}.mp3"),
)
self.assertExists(filename)
s1 = os.stat(syspath(mediafile.path))
s2 = os.stat(syspath(filename))
assert (s1[stat.ST_INO], s1[stat.ST_DEV]) == (
s2[stat.ST_INO],
s2[stat.ST_DEV],
)
assert self.track_lib_path.exists()
media_stat = self.track_import_path.stat()
lib_media_stat = self.track_lib_path.stat()
assert media_stat[stat.ST_INO] == lib_media_stat[stat.ST_INO]
assert media_stat[stat.ST_DEV] == lib_media_stat[stat.ST_DEV]
@NEEDS_REFLINK
def test_import_reflink_arrives(self):
# Detecting reflinks is currently tricky due to various fs
# implementations, we'll just check the file exists.
self.run_asis_importer(reflink=True)
for mediafile in self.import_media:
self.assert_file_in_lib(
b"Tag Artist",
b"Tag Album",
util.bytestring_path(f"{mediafile.title}.mp3"),
)
assert self.track_lib_path.exists()
def test_import_reflink_auto_arrives(self):
# Should pass regardless of reflink support due to fallback.
self.run_asis_importer(reflink="auto")
for mediafile in self.import_media:
self.assert_file_in_lib(
b"Tag Artist",
b"Tag Album",
util.bytestring_path(f"{mediafile.title}.mp3"),
)
assert self.track_lib_path.exists()
def create_archive(session):
(handle, path) = mkstemp(dir=os.fsdecode(session.temp_dir))
handle, path = mkstemp(dir=session.temp_dir_path)
path = bytestring_path(path)
os.close(handle)
archive = ZipFile(os.fsdecode(path), mode="w")
@ -218,10 +184,10 @@ class RmTempTest(BeetsTestCase):
zip_path = create_archive(self)
archive_task = importer.ArchiveImportTask(zip_path)
archive_task.extract()
tmp_path = archive_task.toppath
self.assertExists(tmp_path)
tmp_path = Path(os.fsdecode(archive_task.toppath))
assert tmp_path.exists()
archive_task.finalize(self)
self.assertNotExists(tmp_path)
assert not tmp_path.exists()
class ImportZipTest(AsIsImporterMixin, ImportTestCase):
@ -275,56 +241,36 @@ class ImportSingletonTest(AutotagImportTestCase):
self.prepare_album_for_import(1)
self.importer = self.setup_singleton_importer()
def test_apply_asis_adds_track(self):
assert self.lib.items().get() is None
def test_apply_asis_adds_only_singleton_track(self):
self.importer.add_choice(importer.Action.ASIS)
self.importer.run()
# album not added
assert not self.lib.albums()
assert self.lib.items().get().title == "Tag Track 1"
def test_apply_asis_does_not_add_album(self):
assert self.lib.albums().get() is None
self.importer.add_choice(importer.Action.ASIS)
self.importer.run()
assert self.lib.albums().get() is None
def test_apply_asis_adds_singleton_path(self):
self.assert_lib_dir_empty()
self.importer.add_choice(importer.Action.ASIS)
self.importer.run()
self.assert_file_in_lib(b"singletons", b"Tag Track 1.mp3")
assert (self.lib_path / "singletons" / "Tag Track 1.mp3").exists()
def test_apply_candidate_adds_track(self):
assert self.lib.items().get() is None
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
assert not self.lib.albums()
assert self.lib.items().get().title == "Applied Track 1"
assert (self.lib_path / "singletons" / "Applied Track 1.mp3").exists()
def test_apply_candidate_does_not_add_album(self):
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
assert self.lib.albums().get() is None
def test_apply_candidate_adds_singleton_path(self):
self.assert_lib_dir_empty()
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
self.assert_file_in_lib(b"singletons", b"Applied Track 1.mp3")
def test_skip_does_not_add_first_track(self):
def test_skip_does_not_add_track(self):
self.importer.add_choice(importer.Action.SKIP)
self.importer.run()
assert self.lib.items().get() is None
def test_skip_adds_other_tracks(self):
assert not self.lib.items()
def test_skip_first_add_second_asis(self):
self.prepare_album_for_import(2)
self.importer.add_choice(importer.Action.SKIP)
self.importer.add_choice(importer.Action.ASIS)
self.importer.run()
assert len(self.lib.items()) == 1
def test_import_single_files(self):
@ -373,7 +319,7 @@ class ImportSingletonTest(AutotagImportTestCase):
item.remove()
# Autotagged.
assert self.lib.albums().get() is None
assert not self.lib.albums()
self.importer.clear_choices()
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
@ -386,7 +332,7 @@ class ImportSingletonTest(AutotagImportTestCase):
assert item.disc == disc
class ImportTest(AutotagImportTestCase):
class ImportTest(PathsMixin, AutotagImportTestCase):
"""Test APPLY, ASIS and SKIP choices."""
def setUp(self):
@ -394,48 +340,23 @@ class ImportTest(AutotagImportTestCase):
self.prepare_album_for_import(1)
self.setup_importer()
def test_apply_asis_adds_album(self):
assert self.lib.albums().get() is None
def test_asis_moves_album_and_track(self):
self.importer.add_choice(importer.Action.ASIS)
self.importer.run()
assert self.lib.albums().get().album == "Tag Album"
item = self.lib.items().get()
assert item.title == "Tag Track 1"
assert item.filepath.exists()
def test_apply_asis_adds_tracks(self):
assert self.lib.items().get() is None
self.importer.add_choice(importer.Action.ASIS)
self.importer.run()
assert self.lib.items().get().title == "Tag Track 1"
def test_apply_asis_adds_album_path(self):
self.assert_lib_dir_empty()
self.importer.add_choice(importer.Action.ASIS)
self.importer.run()
self.assert_file_in_lib(b"Tag Artist", b"Tag Album", b"Tag Track 1.mp3")
def test_apply_candidate_adds_album(self):
assert self.lib.albums().get() is None
def test_apply_moves_album_and_track(self):
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
assert self.lib.albums().get().album == "Applied Album"
def test_apply_candidate_adds_tracks(self):
assert self.lib.items().get() is None
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
assert self.lib.items().get().title == "Applied Track 1"
def test_apply_candidate_adds_album_path(self):
self.assert_lib_dir_empty()
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
self.assert_file_in_lib(
b"Applied Artist", b"Applied Album", b"Applied Track 1.mp3"
)
item = self.lib.items().get()
assert item.title == "Applied Track 1"
assert item.filepath.exists()
def test_apply_from_scratch_removes_other_metadata(self):
config["import"]["from_scratch"] = True
@ -464,35 +385,35 @@ class ImportTest(AutotagImportTestCase):
assert self.lib.items().get().bitrate == bitrate
def test_apply_with_move_deletes_import(self):
assert self.track_import_path.exists()
config["import"]["move"] = True
import_file = os.path.join(self.import_dir, b"album", b"track_1.mp3")
self.assertExists(import_file)
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
self.assertNotExists(import_file)
assert not self.track_import_path.exists()
def test_apply_with_delete_deletes_import(self):
assert self.track_import_path.exists()
config["import"]["delete"] = True
import_file = os.path.join(self.import_dir, b"album", b"track_1.mp3")
self.assertExists(import_file)
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
self.assertNotExists(import_file)
assert not self.track_import_path.exists()
def test_skip_does_not_add_track(self):
self.importer.add_choice(importer.Action.SKIP)
self.importer.run()
assert self.lib.items().get() is None
assert not self.lib.items()
def test_skip_non_album_dirs(self):
self.assertIsDir(os.path.join(self.import_dir, b"album"))
assert (self.import_path / "album").exists()
self.touch(b"cruft", dir=self.import_dir)
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
assert len(self.lib.albums()) == 1
def test_unmatched_tracks_not_added(self):
@ -596,24 +517,21 @@ class ImportTracksTest(AutotagImportTestCase):
self.setup_importer()
def test_apply_tracks_adds_singleton_track(self):
assert self.lib.items().get() is None
assert self.lib.albums().get() is None
self.importer.add_choice(importer.Action.TRACKS)
self.importer.add_choice(importer.Action.APPLY)
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
assert self.lib.items().get().title == "Applied Track 1"
assert self.lib.albums().get() is None
assert not self.lib.albums()
def test_apply_tracks_adds_singleton_path(self):
self.assert_lib_dir_empty()
self.importer.add_choice(importer.Action.TRACKS)
self.importer.add_choice(importer.Action.APPLY)
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
self.assert_file_in_lib(b"singletons", b"Applied Track 1.mp3")
assert (self.lib_path / "singletons" / "Applied Track 1.mp3").exists()
class ImportCompilationTest(AutotagImportTestCase):
@ -721,7 +639,7 @@ class ImportCompilationTest(AutotagImportTestCase):
assert asserted_multi_artists_1
class ImportExistingTest(AutotagImportTestCase):
class ImportExistingTest(PathsMixin, AutotagImportTestCase):
"""Test importing files that are already in the library directory."""
def setUp(self):
@ -731,20 +649,23 @@ class ImportExistingTest(AutotagImportTestCase):
self.reimporter = self.setup_importer(import_dir=self.libdir)
self.importer = self.setup_importer()
def test_does_not_duplicate_item(self):
def tearDown(self):
super().tearDown()
self.matcher.restore()
@cached_property
def applied_track_path(self) -> Path:
return Path(str(self.track_lib_path).replace("Tag", "Applied"))
def test_does_not_duplicate_item_nor_album(self):
self.importer.run()
assert len(self.lib.items()) == 1
self.reimporter.add_choice(importer.Action.APPLY)
self.reimporter.run()
assert len(self.lib.items()) == 1
def test_does_not_duplicate_album(self):
self.importer.run()
assert len(self.lib.albums()) == 1
self.reimporter.add_choice(importer.Action.APPLY)
self.reimporter.run()
assert len(self.lib.items()) == 1
assert len(self.lib.albums()) == 1
def test_does_not_duplicate_singleton_track(self):
@ -758,33 +679,19 @@ class ImportExistingTest(AutotagImportTestCase):
self.reimporter.run()
assert len(self.lib.items()) == 1
def test_asis_updates_metadata(self):
def test_asis_updates_metadata_and_moves_file(self):
self.importer.run()
medium = MediaFile(self.lib.items().get().path)
medium.title = "New Title"
medium.save()
self.reimporter.add_choice(importer.Action.ASIS)
self.reimporter.run()
assert self.lib.items().get().title == "New Title"
def test_asis_updated_moves_file(self):
self.importer.run()
medium = MediaFile(self.lib.items().get().path)
medium.title = "New Title"
medium.save()
old_path = os.path.join(
b"Applied Artist", b"Applied Album", b"Applied Track 1.mp3"
)
self.assert_file_in_lib(old_path)
self.reimporter.add_choice(importer.Action.ASIS)
self.reimporter.run()
self.assert_file_in_lib(
b"Applied Artist", b"Applied Album", b"New Title.mp3"
)
self.assert_file_not_in_lib(old_path)
assert not self.applied_track_path.exists()
assert self.applied_track_path.with_name("New Title.mp3").exists()
def test_asis_updated_without_copy_does_not_move_file(self):
self.importer.run()
@ -792,49 +699,24 @@ class ImportExistingTest(AutotagImportTestCase):
medium.title = "New Title"
medium.save()
old_path = os.path.join(
b"Applied Artist", b"Applied Album", b"Applied Track 1.mp3"
)
self.assert_file_in_lib(old_path)
config["import"]["copy"] = False
self.reimporter.add_choice(importer.Action.ASIS)
self.reimporter.run()
self.assert_file_not_in_lib(
b"Applied Artist", b"Applied Album", b"New Title.mp3"
)
self.assert_file_in_lib(old_path)
assert self.applied_track_path.exists()
assert not self.applied_track_path.with_name("New Title.mp3").exists()
def test_outside_file_is_copied(self):
config["import"]["copy"] = False
self.importer.run()
self.assert_equal_path(
self.lib.items().get().path, self.import_media[0].path
)
assert self.lib.items().get().filepath == self.track_import_path
self.reimporter = self.setup_importer()
self.reimporter.add_choice(importer.Action.APPLY)
self.reimporter.run()
new_path = os.path.join(
b"Applied Artist", b"Applied Album", b"Applied Track 1.mp3"
)
self.assert_file_in_lib(new_path)
self.assert_equal_path(
self.lib.items().get().path, os.path.join(self.libdir, new_path)
)
def test_outside_file_is_moved(self):
config["import"]["copy"] = False
self.importer.run()
self.assert_equal_path(
self.lib.items().get().path, self.import_media[0].path
)
self.reimporter = self.setup_importer(move=True)
self.reimporter.add_choice(importer.Action.APPLY)
self.reimporter.run()
self.assertNotExists(self.import_media[0].path)
assert self.applied_track_path.exists()
assert self.lib.items().get().filepath == self.applied_track_path
class GroupAlbumsImportTest(AutotagImportTestCase):
@ -1050,12 +932,12 @@ class ImportDuplicateAlbumTest(PluginMixin, ImportTestCase):
def test_remove_duplicate_album(self):
item = self.lib.items().get()
assert item.title == "t\xeftle 0"
self.assertExists(item.path)
assert item.filepath.exists()
self.importer.default_resolution = self.importer.Resolution.REMOVE
self.importer.run()
self.assertNotExists(item.path)
assert not item.filepath.exists()
assert len(self.lib.albums()) == 1
assert len(self.lib.items()) == 1
item = self.lib.items().get()
@ -1065,7 +947,7 @@ class ImportDuplicateAlbumTest(PluginMixin, ImportTestCase):
config["import"]["autotag"] = False
item = self.lib.items().get()
assert item.title == "t\xeftle 0"
self.assertExists(item.path)
assert item.filepath.exists()
# Imported item has the same artist and album as the one in the
# library.
@ -1081,7 +963,7 @@ class ImportDuplicateAlbumTest(PluginMixin, ImportTestCase):
self.importer.default_resolution = self.importer.Resolution.REMOVE
self.importer.run()
self.assertExists(item.path)
assert item.filepath.exists()
assert len(self.lib.albums()) == 2
assert len(self.lib.items()) == 2
@ -1168,12 +1050,12 @@ class ImportDuplicateSingletonTest(ImportTestCase):
def test_remove_duplicate(self):
item = self.lib.items().get()
assert item.mb_trackid == "old trackid"
self.assertExists(item.path)
assert item.filepath.exists()
self.importer.default_resolution = self.importer.Resolution.REMOVE
self.importer.run()
self.assertNotExists(item.path)
assert not item.filepath.exists()
assert len(self.lib.items()) == 1
item = self.lib.items().get()
assert item.mb_trackid == "new trackid"
@ -1566,14 +1448,14 @@ class ReimportTest(AutotagImportTestCase):
replaced_album = self._album()
replaced_album.set_art(art_source)
replaced_album.store()
old_artpath = replaced_album.artpath
old_artpath = replaced_album.art_filepath
self.importer.run()
new_album = self._album()
new_artpath = new_album.art_destination(art_source)
assert new_album.artpath == new_artpath
self.assertExists(new_artpath)
assert new_album.art_filepath.exists()
if new_artpath != old_artpath:
self.assertNotExists(old_artpath)
assert not old_artpath.exists()
def test_reimported_album_has_new_flexattr(self):
self._setup_session()
@ -1588,13 +1470,11 @@ class ReimportTest(AutotagImportTestCase):
assert self._album().data_source == "match_source"
class ImportPretendTest(AutotagImportTestCase):
class ImportPretendTest(IOMixin, AutotagImportTestCase):
"""Test the pretend commandline option"""
def setUp(self):
super().setUp()
self.io.install()
self.album_track_path = self.prepare_album_for_import(1)[0]
self.single_path = self.prepare_track_for_import(2, self.import_path)
self.album_path = self.album_track_path.parent
@ -1624,7 +1504,7 @@ class ImportPretendTest(AutotagImportTestCase):
]
def test_import_pretend_empty(self):
empty_path = Path(os.fsdecode(self.temp_dir)) / "empty"
empty_path = self.temp_dir_path / "empty"
empty_path.mkdir()
importer = self.setup_importer(pretend=True, import_dir=empty_path)

View file

@ -194,7 +194,7 @@ class DestinationTest(BeetsTestCase):
def create_temp_dir(self, **kwargs):
kwargs["prefix"] = "."
super().create_temp_dir(**kwargs)
return super().create_temp_dir(**kwargs)
def setUp(self):
super().setUp()

View file

@ -21,6 +21,7 @@ import shutil
import subprocess
import sys
import unittest
from pathlib import Path
from unittest.mock import Mock, patch
import pytest
@ -32,6 +33,7 @@ from beets.autotag.match import distance
from beets.test import _common
from beets.test.helper import (
BeetsTestCase,
IOMixin,
PluginTestCase,
capture_stdout,
control_stdin,
@ -107,15 +109,12 @@ class ListTest(BeetsTestCase):
assert "the album" not in stdout.getvalue()
class RemoveTest(BeetsTestCase):
class RemoveTest(IOMixin, BeetsTestCase):
def setUp(self):
super().setUp()
self.io.install()
# Copy a file into the library.
self.item_path = os.path.join(_common.RSRC, b"full.mp3")
self.i = library.Item.from_path(self.item_path)
self.i = library.Item.from_path(self.resource_path)
self.lib.add(self.i)
self.i.move(operation=MoveOperation.COPY)
@ -124,29 +123,29 @@ class RemoveTest(BeetsTestCase):
commands.remove_items(self.lib, "", False, False, False)
items = self.lib.items()
assert len(list(items)) == 0
self.assertExists(self.i.path)
assert self.i.filepath.exists()
def test_remove_items_with_delete(self):
self.io.addinput("y")
commands.remove_items(self.lib, "", False, True, False)
items = self.lib.items()
assert len(list(items)) == 0
self.assertNotExists(self.i.path)
assert not self.i.filepath.exists()
def test_remove_items_with_force_no_delete(self):
commands.remove_items(self.lib, "", False, False, True)
items = self.lib.items()
assert len(list(items)) == 0
self.assertExists(self.i.path)
assert self.i.filepath.exists()
def test_remove_items_with_force_delete(self):
commands.remove_items(self.lib, "", False, True, True)
items = self.lib.items()
assert len(list(items)) == 0
self.assertNotExists(self.i.path)
assert not self.i.filepath.exists()
def test_remove_items_select_with_delete(self):
i2 = library.Item.from_path(self.item_path)
i2 = library.Item.from_path(self.resource_path)
self.lib.add(i2)
i2.move(operation=MoveOperation.COPY)
@ -444,21 +443,16 @@ class MoveTest(BeetsTestCase):
def setUp(self):
super().setUp()
self.io.install()
self.itempath = os.path.join(self.libdir, b"srcfile")
shutil.copy(
syspath(os.path.join(_common.RSRC, b"full.mp3")),
syspath(self.itempath),
)
self.initial_item_path = self.lib_path / "srcfile"
shutil.copy(self.resource_path, self.initial_item_path)
# Add a file to the library but don't copy it in yet.
self.i = library.Item.from_path(self.itempath)
self.i = library.Item.from_path(self.initial_item_path)
self.lib.add(self.i)
self.album = self.lib.add_album([self.i])
# Alternate destination directory.
self.otherdir = os.path.join(self.temp_dir, b"testotherdir")
self.otherdir = self.temp_dir_path / "testotherdir"
def _move(
self,
@ -477,79 +471,77 @@ class MoveTest(BeetsTestCase):
self._move()
self.i.load()
assert b"libdir" in self.i.path
self.assertExists(self.i.path)
self.assertNotExists(self.itempath)
assert self.i.filepath.exists()
assert not self.initial_item_path.exists()
def test_copy_item(self):
self._move(copy=True)
self.i.load()
assert b"libdir" in self.i.path
self.assertExists(self.i.path)
self.assertExists(self.itempath)
assert self.i.filepath.exists()
assert self.initial_item_path.exists()
def test_move_album(self):
self._move(album=True)
self.i.load()
assert b"libdir" in self.i.path
self.assertExists(self.i.path)
self.assertNotExists(self.itempath)
assert self.i.filepath.exists()
assert not self.initial_item_path.exists()
def test_copy_album(self):
self._move(copy=True, album=True)
self.i.load()
assert b"libdir" in self.i.path
self.assertExists(self.i.path)
self.assertExists(self.itempath)
assert self.i.filepath.exists()
assert self.initial_item_path.exists()
def test_move_item_custom_dir(self):
self._move(dest=self.otherdir)
self.i.load()
assert b"testotherdir" in self.i.path
self.assertExists(self.i.path)
self.assertNotExists(self.itempath)
assert self.i.filepath.exists()
assert not self.initial_item_path.exists()
def test_move_album_custom_dir(self):
self._move(dest=self.otherdir, album=True)
self.i.load()
assert b"testotherdir" in self.i.path
self.assertExists(self.i.path)
self.assertNotExists(self.itempath)
assert self.i.filepath.exists()
assert not self.initial_item_path.exists()
def test_pretend_move_item(self):
self._move(dest=self.otherdir, pretend=True)
self.i.load()
assert b"srcfile" in self.i.path
assert self.i.filepath == self.initial_item_path
def test_pretend_move_album(self):
self._move(album=True, pretend=True)
self.i.load()
assert b"srcfile" in self.i.path
assert self.i.filepath == self.initial_item_path
def test_export_item_custom_dir(self):
self._move(dest=self.otherdir, export=True)
self.i.load()
assert self.i.path == self.itempath
self.assertExists(self.otherdir)
assert self.i.filepath == self.initial_item_path
assert self.otherdir.exists()
def test_export_album_custom_dir(self):
self._move(dest=self.otherdir, album=True, export=True)
self.i.load()
assert self.i.path == self.itempath
self.assertExists(self.otherdir)
assert self.i.filepath == self.initial_item_path
assert self.otherdir.exists()
def test_pretend_export_item(self):
self._move(dest=self.otherdir, pretend=True, export=True)
self.i.load()
assert b"srcfile" in self.i.path
self.assertNotExists(self.otherdir)
assert self.i.filepath == self.initial_item_path
assert not self.otherdir.exists()
class UpdateTest(BeetsTestCase):
class UpdateTest(IOMixin, BeetsTestCase):
def setUp(self):
super().setUp()
self.io.install()
# Copy a file into the library.
item_path = os.path.join(_common.RSRC, b"full.mp3")
item_path_two = os.path.join(_common.RSRC, b"full.flac")
@ -606,12 +598,12 @@ class UpdateTest(BeetsTestCase):
assert not self.lib.albums()
def test_delete_removes_album_art(self):
artpath = self.album.artpath
self.assertExists(artpath)
art_filepath = self.album.art_filepath
assert art_filepath.exists()
util.remove(self.i.path)
util.remove(self.i2.path)
self._update()
self.assertNotExists(artpath)
assert not art_filepath.exists()
def test_modified_metadata_detected(self):
mf = MediaFile(syspath(self.i.path))
@ -742,11 +734,7 @@ class UpdateTest(BeetsTestCase):
assert item.lyrics != "new lyrics"
class PrintTest(BeetsTestCase):
def setUp(self):
super().setUp()
self.io.install()
class PrintTest(IOMixin, unittest.TestCase):
def test_print_without_locale(self):
lang = os.environ.get("LANG")
if lang:
@ -841,9 +829,7 @@ class ConfigTest(TestPluginTestCase):
del os.environ["BEETSDIR"]
# Also set APPDATA, the Windows equivalent of setting $HOME.
appdata_dir = os.fsdecode(
os.path.join(self.temp_dir, b"AppData", b"Roaming")
)
appdata_dir = self.temp_dir_path / "AppData" / "Roaming"
self._orig_cwd = os.getcwd()
self.test_cmd = self._make_test_cmd()
@ -851,27 +837,21 @@ class ConfigTest(TestPluginTestCase):
# Default user configuration
if platform.system() == "Windows":
self.user_config_dir = os.fsencode(
os.path.join(appdata_dir, "beets")
)
self.user_config_dir = appdata_dir / "beets"
else:
self.user_config_dir = os.path.join(
self.temp_dir, b".config", b"beets"
)
os.makedirs(syspath(self.user_config_dir))
self.user_config_path = os.path.join(
self.user_config_dir, b"config.yaml"
)
self.user_config_dir = self.temp_dir_path / ".config" / "beets"
self.user_config_dir.mkdir(parents=True, exist_ok=True)
self.user_config_path = self.user_config_dir / "config.yaml"
# Custom BEETSDIR
self.beetsdir = os.path.join(self.temp_dir, b"beetsdir")
self.cli_config_path = os.path.join(
os.fsdecode(self.temp_dir), "config.yaml"
)
os.makedirs(syspath(self.beetsdir))
self.beetsdir = self.temp_dir_path / "beetsdir"
self.beetsdir.mkdir(parents=True, exist_ok=True)
self.env_config_path = str(self.beetsdir / "config.yaml")
self.cli_config_path = str(self.temp_dir_path / "config.yaml")
self.env_patcher = patch(
"os.environ",
{"HOME": os.fsdecode(self.temp_dir), "APPDATA": appdata_dir},
{"HOME": str(self.temp_dir_path), "APPDATA": str(appdata_dir)},
)
self.env_patcher.start()
@ -970,9 +950,8 @@ class ConfigTest(TestPluginTestCase):
assert config["anoption"].get() == "cli overwrite"
def test_cli_config_file_overwrites_beetsdir_defaults(self):
os.environ["BEETSDIR"] = os.fsdecode(self.beetsdir)
env_config_path = os.path.join(self.beetsdir, b"config.yaml")
with open(env_config_path, "w") as file:
os.environ["BEETSDIR"] = str(self.beetsdir)
with open(self.env_config_path, "w") as file:
file.write("anoption: value")
with open(self.cli_config_path, "w") as file:
@ -1019,39 +998,25 @@ class ConfigTest(TestPluginTestCase):
file.write("statefile: state")
self.run_command("--config", self.cli_config_path, "test", lib=None)
self.assert_equal_path(
util.bytestring_path(config["library"].as_filename()),
os.path.join(self.user_config_dir, b"beets.db"),
)
self.assert_equal_path(
util.bytestring_path(config["statefile"].as_filename()),
os.path.join(self.user_config_dir, b"state"),
)
assert config["library"].as_path() == self.user_config_dir / "beets.db"
assert config["statefile"].as_path() == self.user_config_dir / "state"
def test_cli_config_paths_resolve_relative_to_beetsdir(self):
os.environ["BEETSDIR"] = os.fsdecode(self.beetsdir)
os.environ["BEETSDIR"] = str(self.beetsdir)
with open(self.cli_config_path, "w") as file:
file.write("library: beets.db\n")
file.write("statefile: state")
self.run_command("--config", self.cli_config_path, "test", lib=None)
self.assert_equal_path(
util.bytestring_path(config["library"].as_filename()),
os.path.join(self.beetsdir, b"beets.db"),
)
self.assert_equal_path(
util.bytestring_path(config["statefile"].as_filename()),
os.path.join(self.beetsdir, b"state"),
)
assert config["library"].as_path() == self.beetsdir / "beets.db"
assert config["statefile"].as_path() == self.beetsdir / "state"
def test_command_line_option_relative_to_working_dir(self):
config.read()
os.chdir(syspath(self.temp_dir))
self.run_command("--library", "foo.db", "test", lib=None)
self.assert_equal_path(
config["library"].as_filename(), os.path.join(os.getcwd(), "foo.db")
)
assert config["library"].as_path() == Path.cwd() / "foo.db"
def test_cli_config_file_loads_plugin_commands(self):
with open(self.cli_config_path, "w") as file:
@ -1063,24 +1028,23 @@ class ConfigTest(TestPluginTestCase):
self.unload_plugins()
def test_beetsdir_config(self):
os.environ["BEETSDIR"] = os.fsdecode(self.beetsdir)
os.environ["BEETSDIR"] = str(self.beetsdir)
env_config_path = os.path.join(self.beetsdir, b"config.yaml")
with open(env_config_path, "w") as file:
with open(self.env_config_path, "w") as file:
file.write("anoption: overwrite")
config.read()
assert config["anoption"].get() == "overwrite"
def test_beetsdir_points_to_file_error(self):
beetsdir = os.path.join(self.temp_dir, b"beetsfile")
beetsdir = str(self.temp_dir_path / "beetsfile")
open(beetsdir, "a").close()
os.environ["BEETSDIR"] = os.fsdecode(beetsdir)
os.environ["BEETSDIR"] = beetsdir
with pytest.raises(ConfigError):
self.run_command("test")
def test_beetsdir_config_does_not_load_default_user_config(self):
os.environ["BEETSDIR"] = os.fsdecode(self.beetsdir)
os.environ["BEETSDIR"] = str(self.beetsdir)
with open(self.user_config_path, "w") as file:
file.write("anoption: value")
@ -1089,41 +1053,27 @@ class ConfigTest(TestPluginTestCase):
assert not config["anoption"].exists()
def test_default_config_paths_resolve_relative_to_beetsdir(self):
os.environ["BEETSDIR"] = os.fsdecode(self.beetsdir)
os.environ["BEETSDIR"] = str(self.beetsdir)
config.read()
self.assert_equal_path(
util.bytestring_path(config["library"].as_filename()),
os.path.join(self.beetsdir, b"library.db"),
)
self.assert_equal_path(
util.bytestring_path(config["statefile"].as_filename()),
os.path.join(self.beetsdir, b"state.pickle"),
)
assert config["library"].as_path() == self.beetsdir / "library.db"
assert config["statefile"].as_path() == self.beetsdir / "state.pickle"
def test_beetsdir_config_paths_resolve_relative_to_beetsdir(self):
os.environ["BEETSDIR"] = os.fsdecode(self.beetsdir)
os.environ["BEETSDIR"] = str(self.beetsdir)
env_config_path = os.path.join(self.beetsdir, b"config.yaml")
with open(env_config_path, "w") as file:
with open(self.env_config_path, "w") as file:
file.write("library: beets.db\n")
file.write("statefile: state")
config.read()
self.assert_equal_path(
util.bytestring_path(config["library"].as_filename()),
os.path.join(self.beetsdir, b"beets.db"),
)
self.assert_equal_path(
util.bytestring_path(config["statefile"].as_filename()),
os.path.join(self.beetsdir, b"state"),
)
assert config["library"].as_path() == self.beetsdir / "beets.db"
assert config["statefile"].as_path() == self.beetsdir / "state"
class ShowModelChangeTest(BeetsTestCase):
class ShowModelChangeTest(IOMixin, unittest.TestCase):
def setUp(self):
super().setUp()
self.io.install()
self.a = _common.item()
self.b = _common.item()
self.a.path = self.b.path
@ -1172,10 +1122,9 @@ class ShowModelChangeTest(BeetsTestCase):
assert "bar" in out
class ShowChangeTest(BeetsTestCase):
class ShowChangeTest(IOMixin, unittest.TestCase):
def setUp(self):
super().setUp()
self.io.install()
self.items = [_common.item()]
self.items[0].track = 1
@ -1397,7 +1346,7 @@ class PluginTest(TestPluginTestCase):
os.environ.get("GITHUB_ACTIONS") == "true" and sys.platform == "linux",
reason="Completion is for some reason unhappy on Ubuntu 24.04 in CI",
)
class CompletionTest(TestPluginTestCase):
class CompletionTest(IOMixin, TestPluginTestCase):
def test_completion(self):
# Do not load any other bash completion scripts on the system.
env = dict(os.environ)
@ -1427,7 +1376,6 @@ class CompletionTest(TestPluginTestCase):
self.skipTest("could not read bash-completion script")
# Load completion script.
self.io.install()
self.run_command("completion", lib=None)
completion_script = self.io.getoutput().encode("utf-8")
self.io.restore()

View file

@ -21,7 +21,7 @@ import pytest
from beets import library, ui
from beets.test import _common
from beets.test.helper import BeetsTestCase, ItemInDBTestCase
from beets.test.helper import BeetsTestCase, IOMixin, ItemInDBTestCase
from beets.ui import commands
from beets.util import syspath
@ -75,16 +75,7 @@ class QueryTest(BeetsTestCase):
self.check_do_query(0, 2, album=True, also_items=False)
class FieldsTest(ItemInDBTestCase):
def setUp(self):
super().setUp()
self.io.install()
def tearDown(self):
super().tearDown()
self.io.restore()
class FieldsTest(IOMixin, ItemInDBTestCase):
def remove_keys(self, keys, text):
for i in text:
try:

View file

@ -16,19 +16,16 @@
import os
import shutil
import unittest
from copy import deepcopy
from random import random
from beets import config, ui
from beets.test import _common
from beets.test.helper import BeetsTestCase, control_stdin
from beets.test.helper import BeetsTestCase, IOMixin, control_stdin
class InputMethodsTest(BeetsTestCase):
def setUp(self):
super().setUp()
self.io.install()
class InputMethodsTest(IOMixin, unittest.TestCase):
def _print_helper(self, s):
print(s)