Fix _legalize_path types and clean up path legalization logic (#5224)

**Refactor: Simplify Path Generation and Legalization**

This PR refactors the way destination paths for library items are
generated and made filesystem-safe. The goal is to simplify the process,
make it more robust, and centralize most of the path manipulation logic.

**Key Changes:**

*   **`Item.destination` Simplified:**
* The method now has a clearer interface, primarily controlled by the
`relative_to_libdir` flag (replacing the old `fragment` flag).
* It consistently returns the path as `bytes`, ready for filesystem
operations.
* Path legalization logic (sanitization, truncation, replacements) is
now delegated to `util.legalize_path`.

*   **`util.legalize_path` Enhanced:**
* Takes responsibility for the full legalization process, including
truncation based on filesystem limits.
* Uses new helper functions (`util.truncate_path`, `util.truncate_str`)
for robust truncation of path components while preserving extensions and
handling multi-byte characters correctly.
* Includes logic to handle potential conflicts where
sanitization/replacements might interfere with truncation, falling back
to default rules if necessary.

*   **Centralized Max Length:**
* A new cached function `util.get_max_filename_length` determines the
maximum filename length, checking the config first and then querying the
filesystem.

This refactoring leads to cleaner code by separating concerns:
`Item.destination` focuses on generating the *intended* path based on
metadata and formats, while `util.legalize_path` and its helpers handle
the complexities of making that path valid for the target filesystem.
This commit is contained in:
Benedikt 2025-05-06 14:49:51 +02:00 committed by GitHub
commit 9756a5d061
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 143 additions and 197 deletions

View file

@ -1075,26 +1075,19 @@ class Item(LibModel):
def destination(
self,
fragment=False,
relative_to_libdir=False,
basedir=None,
platform=None,
path_formats=None,
replacements=None,
):
"""Return the path in the library directory designated for the
item (i.e., where the file ought to be).
) -> bytes:
"""Return the path in the library directory designated for the item
(i.e., where the file ought to be).
fragment makes this method return just the path fragment underneath
the root library directory; the path is also returned as Unicode
instead of encoded as a bytestring. basedir can override the library's
base directory for the destination.
The path is returned as a bytestring. ``basedir`` can override the
library's base directory for the destination.
"""
db = self._check_db()
platform = platform or sys.platform
basedir = basedir or db.directory
path_formats = path_formats or db.path_formats
if replacements is None:
replacements = self._db.replacements
# Use a path format based on a query, falling back on the
# default.
@ -1122,7 +1115,7 @@ class Item(LibModel):
subpath = self.evaluate_template(subpath_tmpl, True)
# Prepare path for output: normalize Unicode characters.
if platform == "darwin":
if sys.platform == "darwin":
subpath = unicodedata.normalize("NFD", subpath)
else:
subpath = unicodedata.normalize("NFC", subpath)
@ -1132,19 +1125,10 @@ class Item(LibModel):
subpath, beets.config["path_sep_replace"].as_str()
)
maxlen = beets.config["max_filename_length"].get(int)
if not maxlen:
# When zero, try to determine from filesystem.
maxlen = util.max_filename_length(db.directory)
subpath, fellback = util.legalize_path(
subpath,
replacements,
maxlen,
os.path.splitext(self.path)[1],
fragment,
lib_path_str, fallback = util.legalize_path(
subpath, db.replacements, os.path.splitext(self.path)[1]
)
if fellback:
if fallback:
# Print an error message if legalization fell back to
# default replacements because of the maximum length.
log.warning(
@ -1153,11 +1137,12 @@ class Item(LibModel):
"the filename.",
subpath,
)
lib_path_bytes = util.bytestring_path(lib_path_str)
if fragment:
return util.as_string(subpath)
else:
return normpath(os.path.join(basedir, subpath))
if relative_to_libdir:
return lib_path_bytes
return normpath(os.path.join(basedir, lib_path_bytes))
class Album(LibModel):

View file

@ -30,6 +30,7 @@ import traceback
from collections import Counter
from contextlib import suppress
from enum import Enum
from functools import cache
from importlib import import_module
from multiprocessing.pool import ThreadPool
from pathlib import Path
@ -47,6 +48,7 @@ from typing import (
from unidecode import unidecode
import beets
from beets.util import hidden
if TYPE_CHECKING:
@ -694,105 +696,87 @@ def sanitize_path(path: str, replacements: Replacements | None = None) -> str:
return os.path.join(*comps)
def truncate_path(path: AnyStr, length: int = MAX_FILENAME_LENGTH) -> AnyStr:
"""Given a bytestring path or a Unicode path fragment, truncate the
components to a legal length. In the last component, the extension
is preserved.
def truncate_str(s: str, length: int) -> str:
"""Truncate the string to the given byte length.
If we end up truncating a unicode character in the middle (rendering it invalid),
it is removed:
>>> s = "🎹🎶" # 8 bytes
>>> truncate_str(s, 6)
'🎹'
"""
comps = components(path)
return os.fsencode(s)[:length].decode(sys.getfilesystemencoding(), "ignore")
out = [c[:length] for c in comps]
base, ext = os.path.splitext(comps[-1])
if ext:
# Last component has an extension.
base = base[: length - len(ext)]
out[-1] = base + ext
return os.path.join(*out)
def truncate_path(str_path: str) -> str:
"""Truncate each path part to a legal length preserving the extension."""
max_length = get_max_filename_length()
path = Path(str_path)
parent_parts = [truncate_str(p, max_length) for p in path.parts[:-1]]
stem = truncate_str(path.stem, max_length - len(path.suffix))
return str(Path(*parent_parts, stem).with_suffix(path.suffix))
def _legalize_stage(
path: str,
replacements: Replacements | None,
length: int,
extension: str,
fragment: bool,
) -> tuple[BytesOrStr, bool]:
path: str, replacements: Replacements | None, extension: str
) -> tuple[str, bool]:
"""Perform a single round of path legalization steps
(sanitation/replacement, encoding from Unicode to bytes,
extension-appending, and truncation). Return the path (Unicode if
`fragment` is set, `bytes` otherwise) and whether truncation was
required.
1. sanitation/replacement
2. appending the extension
3. truncation.
Return the path and whether truncation was required.
"""
# Perform an initial sanitization including user replacements.
path = sanitize_path(path, replacements)
# Encode for the filesystem.
if not fragment:
path = bytestring_path(path) # type: ignore
# Preserve extension.
path += extension.lower()
# Truncate too-long components.
pre_truncate_path = path
path = truncate_path(path, length)
path = truncate_path(path)
return path, path != pre_truncate_path
def legalize_path(
path: str,
replacements: Replacements | None,
length: int,
extension: bytes,
fragment: bool,
) -> tuple[BytesOrStr, bool]:
"""Given a path-like Unicode string, produce a legal path. Return
the path and a flag indicating whether some replacements had to be
ignored (see below).
path: str, replacements: Replacements | None, extension: str
) -> tuple[str, bool]:
"""Given a path-like Unicode string, produce a legal path. Return the path
and a flag indicating whether some replacements had to be ignored (see
below).
The legalization process (see `_legalize_stage`) consists of
applying the sanitation rules in `replacements`, encoding the string
to bytes (unless `fragment` is set), truncating components to
`length`, appending the `extension`.
This function uses `_legalize_stage` function to legalize the path, see its
documentation for the details of what this involves. It is called up to
three times in case truncation conflicts with replacements (as can happen
when truncation creates whitespace at the end of the string, for example).
This function performs up to three calls to `_legalize_stage` in
case truncation conflicts with replacements (as can happen when
truncation creates whitespace at the end of the string, for
example). The limited number of iterations avoids the
possibility of an infinite loop of sanitation and truncation
operations, which could be caused by replacement rules that make the
string longer. The flag returned from this function indicates that
the path has to be truncated twice (indicating that replacements
made the string longer again after it was truncated); the
application should probably log some sort of warning.
The limited number of iterations avoids the possibility of an infinite loop
of sanitation and truncation operations, which could be caused by
replacement rules that make the string longer.
The flag returned from this function indicates that the path has to be
truncated twice (indicating that replacements made the string longer again
after it was truncated); the application should probably log some sort of
warning.
"""
suffix = as_string(extension)
if fragment:
# Outputting Unicode.
extension = extension.decode("utf-8", "ignore")
first_stage_path, _ = _legalize_stage(
path, replacements, length, extension, fragment
first_stage, _ = os.path.splitext(
_legalize_stage(path, replacements, suffix)[0]
)
# Convert back to Unicode with extension removed.
first_stage_path, _ = os.path.splitext(displayable_path(first_stage_path))
# Re-sanitize following truncation (including user replacements).
second_stage_path, retruncated = _legalize_stage(
first_stage_path, replacements, length, extension, fragment
)
second_stage, truncated = _legalize_stage(first_stage, replacements, suffix)
# If the path was once again truncated, discard user replacements
if not truncated:
return second_stage, False
# If the path was truncated, discard user replacements
# and run through one last legalization stage.
if retruncated:
second_stage_path, _ = _legalize_stage(
first_stage_path, None, length, extension, fragment
)
return second_stage_path, retruncated
return _legalize_stage(first_stage, None, suffix)[0], True
def str2bool(value: str) -> bool:
@ -871,16 +855,21 @@ def command_output(cmd: list[BytesOrStr], shell: bool = False) -> CommandOutput:
return CommandOutput(stdout, stderr)
def max_filename_length(path: BytesOrStr, limit=MAX_FILENAME_LENGTH) -> int:
@cache
def get_max_filename_length() -> int:
"""Attempt to determine the maximum filename length for the
filesystem containing `path`. If the value is greater than `limit`,
then `limit` is used instead (to prevent errors when a filesystem
misreports its capacity). If it cannot be determined (e.g., on
Windows), return `limit`.
"""
if length := beets.config["max_filename_length"].get(int):
return length
limit = MAX_FILENAME_LENGTH
if hasattr(os, "statvfs"):
try:
res = os.statvfs(path)
res = os.statvfs(beets.config["directory"].as_str())
except OSError:
return limit
return min(res[9], limit)

View file

@ -49,7 +49,7 @@ def libtree(lib):
"""
root = Node({}, {})
for item in lib.items():
dest = item.destination(fragment=True)
parts = util.components(dest)
dest = item.destination(relative_to_libdir=True)
parts = util.components(util.as_string(dest))
_insert(root, parts, item.id)
return root

View file

@ -33,7 +33,7 @@ import beets.ui
from beets import dbcore, vfs
from beets.library import Item
from beets.plugins import BeetsPlugin
from beets.util import bluelet
from beets.util import as_string, bluelet
if TYPE_CHECKING:
from beets.dbcore.query import Query
@ -1130,7 +1130,7 @@ class Server(BaseServer):
def _item_info(self, item):
info_lines = [
"file: " + item.destination(fragment=True),
"file: " + as_string(item.destination(relative_to_libdir=True)),
"Time: " + str(int(item.length)),
"duration: " + f"{item.length:.3f}",
"Id: " + str(item.id),

View file

@ -624,13 +624,7 @@ class ConvertPlugin(BeetsPlugin):
# strings we get from item.destination to bytes.
items_paths = [
os.path.relpath(
util.bytestring_path(
item.destination(
basedir=dest,
path_formats=path_formats,
fragment=False,
)
),
item.destination(basedir=dest, path_formats=path_formats),
pl_dir,
)
for item in items

View file

@ -263,6 +263,7 @@ select = [
]
[tool.ruff.lint.per-file-ignores]
"beets/**" = ["PT"]
"test/test_util.py" = ["E501"]
[tool.ruff.lint.isort]
split-on-trailing-comma = false

View file

@ -19,10 +19,10 @@ import os.path
import re
import shutil
import stat
import sys
import time
import unicodedata
import unittest
from unittest.mock import patch
import pytest
from mediafile import MediaFile, UnreadableFileError
@ -34,7 +34,7 @@ from beets.library import Album
from beets.test import _common
from beets.test._common import item
from beets.test.helper import BeetsTestCase, ItemInDBTestCase
from beets.util import bytestring_path, syspath
from beets.util import as_string, bytestring_path, syspath
# Shortcut to path normalization.
np = util.normpath
@ -411,33 +411,23 @@ class DestinationTest(BeetsTestCase):
def test_unicode_normalized_nfd_on_mac(self):
instr = unicodedata.normalize("NFC", "caf\xe9")
self.lib.path_formats = [("default", instr)]
dest = self.i.destination(platform="darwin", fragment=True)
assert dest == unicodedata.normalize("NFD", instr)
with patch("sys.platform", "darwin"):
dest = self.i.destination(relative_to_libdir=True)
assert as_string(dest) == unicodedata.normalize("NFD", instr)
def test_unicode_normalized_nfc_on_linux(self):
instr = unicodedata.normalize("NFD", "caf\xe9")
self.lib.path_formats = [("default", instr)]
dest = self.i.destination(platform="linux", fragment=True)
assert dest == unicodedata.normalize("NFC", instr)
def test_non_mbcs_characters_on_windows(self):
oldfunc = sys.getfilesystemencoding
sys.getfilesystemencoding = lambda: "mbcs"
try:
self.i.title = "h\u0259d"
self.lib.path_formats = [("default", "$title")]
p = self.i.destination()
assert b"?" not in p
# We use UTF-8 to encode Windows paths now.
assert "h\u0259d".encode() in p
finally:
sys.getfilesystemencoding = oldfunc
with patch("sys.platform", "linux"):
dest = self.i.destination(relative_to_libdir=True)
assert as_string(dest) == unicodedata.normalize("NFC", instr)
def test_unicode_extension_in_fragment(self):
self.lib.path_formats = [("default", "foo")]
self.i.path = util.bytestring_path("bar.caf\xe9")
dest = self.i.destination(platform="linux", fragment=True)
assert dest == "foo.caf\xe9"
with patch("sys.platform", "linux"):
dest = self.i.destination(relative_to_libdir=True)
assert as_string(dest) == "foo.caf\xe9"
def test_asciify_and_replace(self):
config["asciify_paths"] = True
@ -462,17 +452,6 @@ class DestinationTest(BeetsTestCase):
self.i.album = "bar"
assert self.i.destination() == np("base/ber/foo")
def test_destination_with_replacements_argument(self):
self.lib.directory = b"base"
self.lib.replacements = [(re.compile(r"a"), "f")]
self.lib.path_formats = [("default", "$album/$title")]
self.i.title = "foo"
self.i.album = "bar"
replacements = [(re.compile(r"a"), "e")]
assert self.i.destination(replacements=replacements) == np(
"base/ber/foo"
)
@unittest.skip("unimplemented: #359")
def test_destination_with_empty_component(self):
self.lib.directory = b"base"
@ -494,37 +473,6 @@ class DestinationTest(BeetsTestCase):
self.i.path = "foo.mp3"
assert self.i.destination() == np("base/one/_.mp3")
def test_legalize_path_one_for_one_replacement(self):
# Use a replacement that should always replace the last X in any
# path component with a Z.
self.lib.replacements = [
(re.compile(r"X$"), "Z"),
]
# Construct an item whose untruncated path ends with a Y but whose
# truncated version ends with an X.
self.i.title = "X" * 300 + "Y"
# The final path should reflect the replacement.
dest = self.i.destination()
assert dest[-2:] == b"XZ"
def test_legalize_path_one_for_many_replacement(self):
# Use a replacement that should always replace the last X in any
# path component with four Zs.
self.lib.replacements = [
(re.compile(r"X$"), "ZZZZ"),
]
# Construct an item whose untruncated path ends with a Y but whose
# truncated version ends with an X.
self.i.title = "X" * 300 + "Y"
# The final path should ignore the user replacement and create a path
# of the correct length, containing Xs.
dest = self.i.destination()
assert dest[-2:] == b"XX"
def test_album_field_query(self):
self.lib.directory = b"one"
self.lib.path_formats = [("default", "two"), ("flex:foo", "three")]
@ -611,8 +559,13 @@ class PathFormattingMixin:
def _assert_dest(self, dest, i=None):
if i is None:
i = self.i
with _common.platform_posix():
actual = i.destination()
if os.path.sep != "/":
dest = dest.replace(b"/", os.path.sep.encode())
dest = b"D:" + dest
actual = i.destination()
assert actual == dest

View file

@ -156,13 +156,8 @@ class PathConversionTest(BeetsTestCase):
assert path == outpath
def _windows_bytestring_path(self, path):
old_gfse = sys.getfilesystemencoding
sys.getfilesystemencoding = lambda: "mbcs"
try:
with _common.platform_windows():
return util.bytestring_path(path)
finally:
sys.getfilesystemencoding = old_gfse
with _common.platform_windows():
return util.bytestring_path(path)
def test_bytestring_path_windows_encodes_utf8(self):
path = "caf\xe9"
@ -175,18 +170,47 @@ class PathConversionTest(BeetsTestCase):
assert outpath == "C:\\caf\xe9".encode()
class PathTruncationTest(BeetsTestCase):
def test_truncate_bytestring(self):
with _common.platform_posix():
p = util.truncate_path(b"abcde/fgh", 4)
assert p == b"abcd/fgh"
class TestPathLegalization:
@pytest.fixture(autouse=True)
def _patch_max_filename_length(self, monkeypatch):
monkeypatch.setattr("beets.util.get_max_filename_length", lambda: 5)
def test_truncate_unicode(self):
with _common.platform_posix():
p = util.truncate_path("abcde/fgh", 4)
assert p == "abcd/fgh"
@pytest.mark.parametrize(
"path, expected",
[
("abcdeX/fgh", "abcde/fgh"),
("abcde/fXX.ext", "abcde/f.ext"),
("a🎹/a.ext", "a🎹/a.ext"),
("ab🎹/a.ext", "ab/a.ext"),
],
)
def test_truncate(self, path, expected):
path = path.replace("/", os.path.sep)
expected = expected.replace("/", os.path.sep)
def test_truncate_preserves_extension(self):
with _common.platform_posix():
p = util.truncate_path("abcde/fgh.ext", 5)
assert p == "abcde/f.ext"
assert util.truncate_path(path) == expected
_p = pytest.param
@pytest.mark.parametrize(
"replacements, expected_path, expected_truncated",
[ # [ repl before truncation, repl after truncation ]
_p([ ], "_abcd", False, id="default"),
_p([(r"abcdX$", "1ST"), ], ":1ST", False, id="1st_valid"),
_p([(r"abcdX$", "TOO_LONG"), ], ":TOO_", False, id="1st_truncated"),
_p([(r"abcdX$", "1ST"), (r"1ST$", "2ND") ], ":2ND", False, id="both_valid"),
_p([(r"abcdX$", "TOO_LONG"), (r"TOO_$", "2ND") ], ":2ND", False, id="1st_truncated_2nd_valid"),
_p([(r"abcdX$", "1ST"), (r"1ST$", "TOO_LONG") ], ":TOO_", False, id="1st_valid_2nd_truncated"),
# if the logic truncates the path twice, it ends up applying the default replacements
_p([(r"abcdX$", "TOO_LONG"), (r"TOO_$", "TOO_LONG") ], "_TOO_", True, id="both_truncated_default_repl_applied"),
]
) # fmt: skip
def test_replacements(
self, replacements, expected_path, expected_truncated
):
replacements = [(re.compile(pat), repl) for pat, repl in replacements]
assert util.legalize_path(":abcdX", replacements, "") == (
expected_path,
expected_truncated,
)