diff --git a/beets/library.py b/beets/library.py index d4ec63200..1fe253c18 100644 --- a/beets/library.py +++ b/beets/library.py @@ -1075,26 +1075,19 @@ class Item(LibModel): def destination( self, - fragment=False, + relative_to_libdir=False, basedir=None, - platform=None, path_formats=None, - replacements=None, - ): - """Return the path in the library directory designated for the - item (i.e., where the file ought to be). + ) -> bytes: + """Return the path in the library directory designated for the item + (i.e., where the file ought to be). - fragment makes this method return just the path fragment underneath - the root library directory; the path is also returned as Unicode - instead of encoded as a bytestring. basedir can override the library's - base directory for the destination. + The path is returned as a bytestring. ``basedir`` can override the + library's base directory for the destination. """ db = self._check_db() - platform = platform or sys.platform basedir = basedir or db.directory path_formats = path_formats or db.path_formats - if replacements is None: - replacements = self._db.replacements # Use a path format based on a query, falling back on the # default. @@ -1122,7 +1115,7 @@ class Item(LibModel): subpath = self.evaluate_template(subpath_tmpl, True) # Prepare path for output: normalize Unicode characters. - if platform == "darwin": + if sys.platform == "darwin": subpath = unicodedata.normalize("NFD", subpath) else: subpath = unicodedata.normalize("NFC", subpath) @@ -1132,19 +1125,10 @@ class Item(LibModel): subpath, beets.config["path_sep_replace"].as_str() ) - maxlen = beets.config["max_filename_length"].get(int) - if not maxlen: - # When zero, try to determine from filesystem. - maxlen = util.max_filename_length(db.directory) - - subpath, fellback = util.legalize_path( - subpath, - replacements, - maxlen, - os.path.splitext(self.path)[1], - fragment, + lib_path_str, fallback = util.legalize_path( + subpath, db.replacements, os.path.splitext(self.path)[1] ) - if fellback: + if fallback: # Print an error message if legalization fell back to # default replacements because of the maximum length. log.warning( @@ -1153,11 +1137,12 @@ class Item(LibModel): "the filename.", subpath, ) + lib_path_bytes = util.bytestring_path(lib_path_str) - if fragment: - return util.as_string(subpath) - else: - return normpath(os.path.join(basedir, subpath)) + if relative_to_libdir: + return lib_path_bytes + + return normpath(os.path.join(basedir, lib_path_bytes)) class Album(LibModel): diff --git a/beets/util/__init__.py b/beets/util/__init__.py index 68dbaee65..ff0a5d273 100644 --- a/beets/util/__init__.py +++ b/beets/util/__init__.py @@ -30,6 +30,7 @@ import traceback from collections import Counter from contextlib import suppress from enum import Enum +from functools import cache from importlib import import_module from multiprocessing.pool import ThreadPool from pathlib import Path @@ -47,6 +48,7 @@ from typing import ( from unidecode import unidecode +import beets from beets.util import hidden if TYPE_CHECKING: @@ -694,105 +696,87 @@ def sanitize_path(path: str, replacements: Replacements | None = None) -> str: return os.path.join(*comps) -def truncate_path(path: AnyStr, length: int = MAX_FILENAME_LENGTH) -> AnyStr: - """Given a bytestring path or a Unicode path fragment, truncate the - components to a legal length. In the last component, the extension - is preserved. +def truncate_str(s: str, length: int) -> str: + """Truncate the string to the given byte length. + + If we end up truncating a unicode character in the middle (rendering it invalid), + it is removed: + + >>> s = "🎹🎶" # 8 bytes + >>> truncate_str(s, 6) + '🎹' """ - comps = components(path) + return os.fsencode(s)[:length].decode(sys.getfilesystemencoding(), "ignore") - out = [c[:length] for c in comps] - base, ext = os.path.splitext(comps[-1]) - if ext: - # Last component has an extension. - base = base[: length - len(ext)] - out[-1] = base + ext - return os.path.join(*out) +def truncate_path(str_path: str) -> str: + """Truncate each path part to a legal length preserving the extension.""" + max_length = get_max_filename_length() + path = Path(str_path) + parent_parts = [truncate_str(p, max_length) for p in path.parts[:-1]] + stem = truncate_str(path.stem, max_length - len(path.suffix)) + return str(Path(*parent_parts, stem).with_suffix(path.suffix)) def _legalize_stage( - path: str, - replacements: Replacements | None, - length: int, - extension: str, - fragment: bool, -) -> tuple[BytesOrStr, bool]: + path: str, replacements: Replacements | None, extension: str +) -> tuple[str, bool]: """Perform a single round of path legalization steps - (sanitation/replacement, encoding from Unicode to bytes, - extension-appending, and truncation). Return the path (Unicode if - `fragment` is set, `bytes` otherwise) and whether truncation was - required. + 1. sanitation/replacement + 2. appending the extension + 3. truncation. + + Return the path and whether truncation was required. """ # Perform an initial sanitization including user replacements. path = sanitize_path(path, replacements) - # Encode for the filesystem. - if not fragment: - path = bytestring_path(path) # type: ignore - # Preserve extension. path += extension.lower() # Truncate too-long components. pre_truncate_path = path - path = truncate_path(path, length) + path = truncate_path(path) return path, path != pre_truncate_path def legalize_path( - path: str, - replacements: Replacements | None, - length: int, - extension: bytes, - fragment: bool, -) -> tuple[BytesOrStr, bool]: - """Given a path-like Unicode string, produce a legal path. Return - the path and a flag indicating whether some replacements had to be - ignored (see below). + path: str, replacements: Replacements | None, extension: str +) -> tuple[str, bool]: + """Given a path-like Unicode string, produce a legal path. Return the path + and a flag indicating whether some replacements had to be ignored (see + below). - The legalization process (see `_legalize_stage`) consists of - applying the sanitation rules in `replacements`, encoding the string - to bytes (unless `fragment` is set), truncating components to - `length`, appending the `extension`. + This function uses `_legalize_stage` function to legalize the path, see its + documentation for the details of what this involves. It is called up to + three times in case truncation conflicts with replacements (as can happen + when truncation creates whitespace at the end of the string, for example). - This function performs up to three calls to `_legalize_stage` in - case truncation conflicts with replacements (as can happen when - truncation creates whitespace at the end of the string, for - example). The limited number of iterations iterations avoids the - possibility of an infinite loop of sanitation and truncation - operations, which could be caused by replacement rules that make the - string longer. The flag returned from this function indicates that - the path has to be truncated twice (indicating that replacements - made the string longer again after it was truncated); the - application should probably log some sort of warning. + The limited number of iterations avoids the possibility of an infinite loop + of sanitation and truncation operations, which could be caused by + replacement rules that make the string longer. + + The flag returned from this function indicates that the path has to be + truncated twice (indicating that replacements made the string longer again + after it was truncated); the application should probably log some sort of + warning. """ + suffix = as_string(extension) - if fragment: - # Outputting Unicode. - extension = extension.decode("utf-8", "ignore") - - first_stage_path, _ = _legalize_stage( - path, replacements, length, extension, fragment + first_stage, _ = os.path.splitext( + _legalize_stage(path, replacements, suffix)[0] ) - # Convert back to Unicode with extension removed. - first_stage_path, _ = os.path.splitext(displayable_path(first_stage_path)) - # Re-sanitize following truncation (including user replacements). - second_stage_path, retruncated = _legalize_stage( - first_stage_path, replacements, length, extension, fragment - ) + second_stage, truncated = _legalize_stage(first_stage, replacements, suffix) - # If the path was once again truncated, discard user replacements + if not truncated: + return second_stage, False + + # If the path was truncated, discard user replacements # and run through one last legalization stage. - if retruncated: - second_stage_path, _ = _legalize_stage( - first_stage_path, None, length, extension, fragment - ) - - return second_stage_path, retruncated + return _legalize_stage(first_stage, None, suffix)[0], True def str2bool(value: str) -> bool: @@ -871,16 +855,21 @@ def command_output(cmd: list[BytesOrStr], shell: bool = False) -> CommandOutput: return CommandOutput(stdout, stderr) -def max_filename_length(path: BytesOrStr, limit=MAX_FILENAME_LENGTH) -> int: +@cache +def get_max_filename_length() -> int: """Attempt to determine the maximum filename length for the filesystem containing `path`. If the value is greater than `limit`, then `limit` is used instead (to prevent errors when a filesystem misreports its capacity). If it cannot be determined (e.g., on Windows), return `limit`. """ + if length := beets.config["max_filename_length"].get(int): + return length + + limit = MAX_FILENAME_LENGTH if hasattr(os, "statvfs"): try: - res = os.statvfs(path) + res = os.statvfs(beets.config["directory"].as_str()) except OSError: return limit return min(res[9], limit) diff --git a/beets/vfs.py b/beets/vfs.py index cdbf197a6..4fd133f5a 100644 --- a/beets/vfs.py +++ b/beets/vfs.py @@ -49,7 +49,7 @@ def libtree(lib): """ root = Node({}, {}) for item in lib.items(): - dest = item.destination(fragment=True) - parts = util.components(dest) + dest = item.destination(relative_to_libdir=True) + parts = util.components(util.as_string(dest)) _insert(root, parts, item.id) return root diff --git a/beetsplug/bpd/__init__.py b/beetsplug/bpd/__init__.py index 9d8b4142b..435368e35 100644 --- a/beetsplug/bpd/__init__.py +++ b/beetsplug/bpd/__init__.py @@ -33,7 +33,7 @@ import beets.ui from beets import dbcore, vfs from beets.library import Item from beets.plugins import BeetsPlugin -from beets.util import bluelet +from beets.util import as_string, bluelet if TYPE_CHECKING: from beets.dbcore.query import Query @@ -1130,7 +1130,7 @@ class Server(BaseServer): def _item_info(self, item): info_lines = [ - "file: " + item.destination(fragment=True), + "file: " + as_string(item.destination(relative_to_libdir=True)), "Time: " + str(int(item.length)), "duration: " + f"{item.length:.3f}", "Id: " + str(item.id), diff --git a/beetsplug/convert.py b/beetsplug/convert.py index a8c32e955..7586c2a1b 100644 --- a/beetsplug/convert.py +++ b/beetsplug/convert.py @@ -624,13 +624,7 @@ class ConvertPlugin(BeetsPlugin): # strings we get from item.destination to bytes. items_paths = [ os.path.relpath( - util.bytestring_path( - item.destination( - basedir=dest, - path_formats=path_formats, - fragment=False, - ) - ), + item.destination(basedir=dest, path_formats=path_formats), pl_dir, ) for item in items diff --git a/pyproject.toml b/pyproject.toml index d985c54ea..6819d6624 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -263,6 +263,7 @@ select = [ ] [tool.ruff.lint.per-file-ignores] "beets/**" = ["PT"] +"test/test_util.py" = ["E501"] [tool.ruff.lint.isort] split-on-trailing-comma = false diff --git a/test/test_library.py b/test/test_library.py index b5e6d4eeb..39f1d0b9e 100644 --- a/test/test_library.py +++ b/test/test_library.py @@ -19,10 +19,10 @@ import os.path import re import shutil import stat -import sys import time import unicodedata import unittest +from unittest.mock import patch import pytest from mediafile import MediaFile, UnreadableFileError @@ -34,7 +34,7 @@ from beets.library import Album from beets.test import _common from beets.test._common import item from beets.test.helper import BeetsTestCase, ItemInDBTestCase -from beets.util import bytestring_path, syspath +from beets.util import as_string, bytestring_path, syspath # Shortcut to path normalization. np = util.normpath @@ -411,33 +411,23 @@ class DestinationTest(BeetsTestCase): def test_unicode_normalized_nfd_on_mac(self): instr = unicodedata.normalize("NFC", "caf\xe9") self.lib.path_formats = [("default", instr)] - dest = self.i.destination(platform="darwin", fragment=True) - assert dest == unicodedata.normalize("NFD", instr) + with patch("sys.platform", "darwin"): + dest = self.i.destination(relative_to_libdir=True) + assert as_string(dest) == unicodedata.normalize("NFD", instr) def test_unicode_normalized_nfc_on_linux(self): instr = unicodedata.normalize("NFD", "caf\xe9") self.lib.path_formats = [("default", instr)] - dest = self.i.destination(platform="linux", fragment=True) - assert dest == unicodedata.normalize("NFC", instr) - - def test_non_mbcs_characters_on_windows(self): - oldfunc = sys.getfilesystemencoding - sys.getfilesystemencoding = lambda: "mbcs" - try: - self.i.title = "h\u0259d" - self.lib.path_formats = [("default", "$title")] - p = self.i.destination() - assert b"?" not in p - # We use UTF-8 to encode Windows paths now. - assert "h\u0259d".encode() in p - finally: - sys.getfilesystemencoding = oldfunc + with patch("sys.platform", "linux"): + dest = self.i.destination(relative_to_libdir=True) + assert as_string(dest) == unicodedata.normalize("NFC", instr) def test_unicode_extension_in_fragment(self): self.lib.path_formats = [("default", "foo")] self.i.path = util.bytestring_path("bar.caf\xe9") - dest = self.i.destination(platform="linux", fragment=True) - assert dest == "foo.caf\xe9" + with patch("sys.platform", "linux"): + dest = self.i.destination(relative_to_libdir=True) + assert as_string(dest) == "foo.caf\xe9" def test_asciify_and_replace(self): config["asciify_paths"] = True @@ -462,17 +452,6 @@ class DestinationTest(BeetsTestCase): self.i.album = "bar" assert self.i.destination() == np("base/ber/foo") - def test_destination_with_replacements_argument(self): - self.lib.directory = b"base" - self.lib.replacements = [(re.compile(r"a"), "f")] - self.lib.path_formats = [("default", "$album/$title")] - self.i.title = "foo" - self.i.album = "bar" - replacements = [(re.compile(r"a"), "e")] - assert self.i.destination(replacements=replacements) == np( - "base/ber/foo" - ) - @unittest.skip("unimplemented: #359") def test_destination_with_empty_component(self): self.lib.directory = b"base" @@ -494,37 +473,6 @@ class DestinationTest(BeetsTestCase): self.i.path = "foo.mp3" assert self.i.destination() == np("base/one/_.mp3") - def test_legalize_path_one_for_one_replacement(self): - # Use a replacement that should always replace the last X in any - # path component with a Z. - self.lib.replacements = [ - (re.compile(r"X$"), "Z"), - ] - - # Construct an item whose untruncated path ends with a Y but whose - # truncated version ends with an X. - self.i.title = "X" * 300 + "Y" - - # The final path should reflect the replacement. - dest = self.i.destination() - assert dest[-2:] == b"XZ" - - def test_legalize_path_one_for_many_replacement(self): - # Use a replacement that should always replace the last X in any - # path component with four Zs. - self.lib.replacements = [ - (re.compile(r"X$"), "ZZZZ"), - ] - - # Construct an item whose untruncated path ends with a Y but whose - # truncated version ends with an X. - self.i.title = "X" * 300 + "Y" - - # The final path should ignore the user replacement and create a path - # of the correct length, containing Xs. - dest = self.i.destination() - assert dest[-2:] == b"XX" - def test_album_field_query(self): self.lib.directory = b"one" self.lib.path_formats = [("default", "two"), ("flex:foo", "three")] @@ -611,8 +559,13 @@ class PathFormattingMixin: def _assert_dest(self, dest, i=None): if i is None: i = self.i - with _common.platform_posix(): - actual = i.destination() + + if os.path.sep != "/": + dest = dest.replace(b"/", os.path.sep.encode()) + dest = b"D:" + dest + + actual = i.destination() + assert actual == dest diff --git a/test/test_util.py b/test/test_util.py index f5b4fd102..3a5e55c49 100644 --- a/test/test_util.py +++ b/test/test_util.py @@ -156,13 +156,8 @@ class PathConversionTest(BeetsTestCase): assert path == outpath def _windows_bytestring_path(self, path): - old_gfse = sys.getfilesystemencoding - sys.getfilesystemencoding = lambda: "mbcs" - try: - with _common.platform_windows(): - return util.bytestring_path(path) - finally: - sys.getfilesystemencoding = old_gfse + with _common.platform_windows(): + return util.bytestring_path(path) def test_bytestring_path_windows_encodes_utf8(self): path = "caf\xe9" @@ -175,18 +170,47 @@ class PathConversionTest(BeetsTestCase): assert outpath == "C:\\caf\xe9".encode() -class PathTruncationTest(BeetsTestCase): - def test_truncate_bytestring(self): - with _common.platform_posix(): - p = util.truncate_path(b"abcde/fgh", 4) - assert p == b"abcd/fgh" +class TestPathLegalization: + @pytest.fixture(autouse=True) + def _patch_max_filename_length(self, monkeypatch): + monkeypatch.setattr("beets.util.get_max_filename_length", lambda: 5) - def test_truncate_unicode(self): - with _common.platform_posix(): - p = util.truncate_path("abcde/fgh", 4) - assert p == "abcd/fgh" + @pytest.mark.parametrize( + "path, expected", + [ + ("abcdeX/fgh", "abcde/fgh"), + ("abcde/fXX.ext", "abcde/f.ext"), + ("a🎹/a.ext", "a🎹/a.ext"), + ("ab🎹/a.ext", "ab/a.ext"), + ], + ) + def test_truncate(self, path, expected): + path = path.replace("/", os.path.sep) + expected = expected.replace("/", os.path.sep) - def test_truncate_preserves_extension(self): - with _common.platform_posix(): - p = util.truncate_path("abcde/fgh.ext", 5) - assert p == "abcde/f.ext" + assert util.truncate_path(path) == expected + + _p = pytest.param + + @pytest.mark.parametrize( + "replacements, expected_path, expected_truncated", + [ # [ repl before truncation, repl after truncation ] + _p([ ], "_abcd", False, id="default"), + _p([(r"abcdX$", "1ST"), ], ":1ST", False, id="1st_valid"), + _p([(r"abcdX$", "TOO_LONG"), ], ":TOO_", False, id="1st_truncated"), + _p([(r"abcdX$", "1ST"), (r"1ST$", "2ND") ], ":2ND", False, id="both_valid"), + _p([(r"abcdX$", "TOO_LONG"), (r"TOO_$", "2ND") ], ":2ND", False, id="1st_truncated_2nd_valid"), + _p([(r"abcdX$", "1ST"), (r"1ST$", "TOO_LONG") ], ":TOO_", False, id="1st_valid_2nd_truncated"), + # if the logic truncates the path twice, it ends up applying the default replacements + _p([(r"abcdX$", "TOO_LONG"), (r"TOO_$", "TOO_LONG") ], "_TOO_", True, id="both_truncated_default_repl_applied"), + ] + ) # fmt: skip + def test_replacements( + self, replacements, expected_path, expected_truncated + ): + replacements = [(re.compile(pat), repl) for pat, repl in replacements] + + assert util.legalize_path(":abcdX", replacements, "") == ( + expected_path, + expected_truncated, + )