This commit is contained in:
Šarūnas Nejus 2026-03-23 20:30:56 +00:00 committed by GitHub
commit 517affc9f1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 438 additions and 108 deletions

25
beets/context.py Normal file
View file

@ -0,0 +1,25 @@
from contextlib import contextmanager
from contextvars import ContextVar
# Holds the music dir context
_music_dir_var: ContextVar[bytes] = ContextVar("music_dir", default=b"")
def get_music_dir() -> bytes:
"""Get the current music directory context."""
return _music_dir_var.get()
def set_music_dir(value: bytes) -> None:
"""Set the current music directory context."""
_music_dir_var.set(value)
@contextmanager
def music_dir(value: bytes):
"""Temporarily bind the active music directory for query parsing."""
token = _music_dir_var.set(value)
try:
yield
finally:
_music_dir_var.reset(token)

View file

@ -1058,6 +1058,8 @@ class Transaction:
class Migration(ABC):
"""Define a one-time data migration that runs during database startup."""
CHUNK_SIZE: ClassVar[int] = 1000
db: Database
@cached_classproperty

41
beets/dbcore/pathutils.py Normal file
View file

@ -0,0 +1,41 @@
from __future__ import annotations
import os
from typing import TypeVar
from beets import context, util
MaybeBytes = TypeVar("MaybeBytes", bytes, None)
def _is_same_path_or_child(path: bytes, music_dir: bytes) -> bool:
"""Check if path is the music directory itself or resides within it."""
path_cmp = os.path.normcase(os.fsdecode(path))
music_dir_cmp = os.path.normcase(os.fsdecode(music_dir))
return path_cmp == music_dir_cmp or path_cmp.startswith(
os.path.join(music_dir_cmp, "")
)
def normalize_path_for_db(path: MaybeBytes) -> MaybeBytes:
"""Convert an absolute library path to its database representation."""
if not path or not os.path.isabs(path):
return path
music_dir = context.get_music_dir()
if not music_dir:
return path
if _is_same_path_or_child(path, music_dir):
return os.path.relpath(path, music_dir)
return path
def expand_path_from_db(path: bytes) -> bytes:
"""Convert a stored database path to an absolute library path."""
music_dir = context.get_music_dir()
if path and not os.path.isabs(path) and music_dir:
return util.normpath(os.path.join(music_dir, path))
return path

View file

@ -27,9 +27,11 @@ from operator import mul, or_
from re import Pattern
from typing import TYPE_CHECKING, Any, ClassVar, Generic, TypeVar
from beets import util
from beets import context, util
from beets.util.units import raw_seconds_short
from . import pathutils
if TYPE_CHECKING:
from collections.abc import Iterator, MutableSequence
@ -289,10 +291,21 @@ class PathQuery(FieldQuery[bytes]):
`pattern` must be a path, either to a file or a directory.
"""
if not os.path.isabs(pattern) and (
music_dir := context.get_music_dir()
):
# Interpret relative `path:` queries relative to the library root.
if isinstance(pattern, str):
pattern = os.path.join(os.fsdecode(music_dir), pattern)
else:
pattern = os.path.join(music_dir, pattern)
path = util.normpath(pattern)
# Case sensitivity depends on the filesystem that the query path is located on.
self.case_sensitive = util.case_sensitive(path)
# Path queries compare against the DB representation, which is relative
# to the library root when the file lives inside it.
path = pathutils.normalize_path_for_db(path)
# Use a normalized-case pattern for case-insensitive matches.
if not self.case_sensitive:
@ -333,7 +346,9 @@ class PathQuery(FieldQuery[bytes]):
starts with the given directory path. Case sensitivity depends on the object's
filesystem as determined during initialization.
"""
path = obj.path if self.case_sensitive else obj.path.lower()
path = pathutils.normalize_path_for_db(obj.path)
if not self.case_sensitive:
path = path.lower()
return (path == self.pattern) or path.startswith(self.dir_path)
def col_clause(self) -> tuple[str, Sequence[SQLiteType]]:

View file

@ -26,7 +26,7 @@ import beets
from beets import util
from beets.util.units import human_seconds_short, raw_seconds_short
from . import query
from . import pathutils, query
SQLiteType = query.SQLiteType
BLOB_TYPE = query.BLOB_TYPE
@ -389,9 +389,10 @@ class BasePathType(Type[bytes, N]):
return value
def from_sql(self, sql_value):
return self.normalize(sql_value)
return pathutils.expand_path_from_db(self.normalize(sql_value))
def to_sql(self, value: bytes) -> BLOB_TYPE:
def to_sql(self, value: pathutils.MaybeBytes) -> BLOB_TYPE | None:
value = pathutils.normalize_path_for_db(value)
if isinstance(value, bytes):
value = BLOB_TYPE(value)
return value

View file

@ -1,11 +1,12 @@
from __future__ import annotations
from contextlib import contextmanager
from typing import TYPE_CHECKING
import platformdirs
import beets
from beets import dbcore
from beets import context, dbcore
from beets.util import normpath
from . import migrations
@ -23,6 +24,7 @@ class Library(dbcore.Database):
_migrations = (
(migrations.MultiGenreFieldMigration, (Item, Album)),
(migrations.LyricsMetadataInFlexFieldsMigration, (Item,)),
(migrations.RelativePathMigration, (Item, Album)),
)
def __init__(
@ -31,11 +33,14 @@ class Library(dbcore.Database):
directory: str | None = None,
path_formats=((PF_KEY_DEFAULT, "$artist/$album/$track $title"),),
replacements=None,
set_music_dir: bool = True,
):
timeout = beets.config["timeout"].as_number()
super().__init__(path, timeout=timeout)
self.directory = normpath(directory or platformdirs.user_music_path())
if set_music_dir:
context.set_music_dir(self.directory)
super().__init__(path, timeout=timeout)
self.path_formats = path_formats
self.replacements = replacements
@ -43,6 +48,12 @@ class Library(dbcore.Database):
# Used for template substitution performance.
self._memotable: dict[tuple[str, ...], str] = {}
@contextmanager
def music_dir_context(self):
"""Temporarily bind this library's directory to path conversion."""
with context.music_dir(self.directory):
yield self
# Adding objects to the database.
def add(self, obj):
@ -93,10 +104,13 @@ class Library(dbcore.Database):
# Parse the query, if necessary.
try:
parsed_sort = None
if isinstance(query, str):
query, parsed_sort = parse_query_string(query, model_cls)
elif isinstance(query, (list, tuple)):
query, parsed_sort = parse_query_parts(query, model_cls)
# Query parsing needs the library root, but keeping it scoped here
# avoids leaking one Library's directory into another's work.
with context.music_dir(self.directory):
if isinstance(query, str):
query, parsed_sort = parse_query_string(query, model_cls)
elif isinstance(query, (list, tuple)):
query, parsed_sort = parse_query_parts(query, model_cls)
except dbcore.query.InvalidQueryArgumentValueError as exc:
raise dbcore.InvalidQueryError(query, exc)

View file

@ -1,5 +1,6 @@
from __future__ import annotations
import os
from contextlib import suppress
from functools import cached_property
from typing import TYPE_CHECKING, NamedTuple, TypeVar
@ -9,6 +10,7 @@ from confuse.exceptions import ConfigError
import beets
from beets import ui
from beets.dbcore.db import Migration
from beets.dbcore.pathutils import normalize_path_for_db
from beets.dbcore.types import MULTI_VALUE_DELIMITER
from beets.util import unique_list
from beets.util.lyrics import Lyrics
@ -17,6 +19,7 @@ if TYPE_CHECKING:
from collections.abc import Iterator
from beets.dbcore.db import Model
from beets.library import Library
T = TypeVar("T")
@ -81,7 +84,7 @@ class MultiGenreFieldMigration(Migration):
migrated = total - len(to_migrate)
ui.print_(f"Migrating genres for {total} {table}...")
for batch in chunks(to_migrate, 1000):
for batch in chunks(to_migrate, self.CHUNK_SIZE):
with self.db.transaction() as tx:
tx.mutate_many(
f"UPDATE {table} SET genres = ? WHERE id = ?",
@ -106,6 +109,8 @@ class LyricsRow(NamedTuple):
class LyricsMetadataInFlexFieldsMigration(Migration):
"""Move legacy inline lyrics metadata into dedicated flexible fields."""
CHUNK_SIZE = 100
def _migrate_data(self, model_cls: type[Model], _: set[str]) -> None:
"""Migrate legacy lyrics to move metadata to flex attributes."""
table = model_cls._table
@ -140,7 +145,7 @@ class LyricsMetadataInFlexFieldsMigration(Migration):
ui.print_(f"Migrating lyrics for {total} {table}...")
lyr_fields = ["backend", "url", "language", "translation_language"]
for batch in chunks(to_migrate, 100):
for batch in chunks(to_migrate, self.CHUNK_SIZE):
lyrics_batch = [Lyrics.from_legacy_text(r.lyrics) for r in batch]
ids_with_lyrics = [
(lyr, r.id) for lyr, r in zip(lyrics_batch, batch)
@ -181,3 +186,44 @@ class LyricsMetadataInFlexFieldsMigration(Migration):
)
ui.print_(f"Migration complete: {migrated} of {total} {table} updated")
class RelativePathMigration(Migration):
"""Migrate path field to contain value relative to the music directory."""
db: Library
def _migrate_field(self, model_cls: type[Model], field: str) -> None:
table = model_cls._table
with self.db.transaction() as tx:
rows = tx.query(f"SELECT id, {field} FROM {table}") # type: ignore[assignment]
total = len(rows)
to_migrate = [r for r in rows if r[field] and os.path.isabs(r[field])]
if not to_migrate:
return
migrated = total - len(to_migrate)
ui.print_(f"Migrating {field} for {total} {table}...")
for batch in chunks(to_migrate, self.CHUNK_SIZE):
with self.db.transaction() as tx:
tx.mutate_many(
f"UPDATE {table} SET {field} = ? WHERE id = ?",
[(normalize_path_for_db(r[field]), r["id"]) for r in batch],
)
migrated += len(batch)
ui.print_(
f" Migrated {migrated} {table} "
f"({migrated}/{total} processed)..."
)
ui.print_(f"Migration complete: {migrated} of {total} {table} updated")
def _migrate_data(
self, model_cls: type[Model], current_fields: set[str]
) -> None:
for field in {"path", "artpath"} & current_fields:
self._migrate_field(model_cls, field)

View file

@ -14,6 +14,7 @@ from mediafile import MediaFile, UnreadableFileError
import beets
from beets import dbcore, logging, plugins, util
from beets.dbcore import types
from beets.dbcore.pathutils import normalize_path_for_db
from beets.util import (
MoveOperation,
bytestring_path,
@ -101,6 +102,18 @@ class LibModel(dbcore.Model["Library"]):
) -> FieldQuery:
"""Get a `FieldQuery` for the given field on this model."""
fast = field in cls.all_db_fields
if (
cls._type(field).query is dbcore.query.PathQuery
and query_cls is not dbcore.query.PathQuery
):
# Regex, exact, and string queries operate on the raw DB value, so
# strip the library prefix to match the stored relative path.
if isinstance(pattern, bytes):
pattern = normalize_path_for_db(pattern)
else:
pattern = os.fsdecode(
normalize_path_for_db(util.bytestring_path(pattern))
)
if field in cls.shared_db_fields:
# This field exists in both tables, so SQLite will encounter
# an OperationalError if we try to use it in a query.

View file

@ -16,6 +16,7 @@
from __future__ import annotations
import contextvars
import errno
import fnmatch
import os
@ -1048,17 +1049,21 @@ def asciify_path(path: str, sep_replace: str) -> str:
def par_map(transform: Callable[[T], Any], items: Sequence[T]) -> None:
"""Apply the function `transform` to all the elements in the
iterable `items`, like `map(transform, items)` but with no return
value.
"""Apply a transformation to each item concurrently using a thread pool.
The parallelism uses threads (not processes), so this is only useful
for IO-bound `transform`s.
Propagates the calling thread's context variables into each worker,
ensuring that context-dependent state is available during parallel
execution.
"""
pool = ThreadPool()
pool.map(transform, items)
pool.close()
pool.join()
ctx = contextvars.copy_context() # snapshot parent context at call time
def _worker(item: T) -> Any:
# ThreadPool workers may run concurrently, so each task needs its own
# child context rather than sharing one Context instance.
return ctx.copy().run(transform, item)
with ThreadPool() as pool:
pool.map(_worker, items)
class cached_classproperty(Generic[T]):

View file

@ -33,6 +33,7 @@ in place of any single coroutine.
from __future__ import annotations
import contextvars
import queue
import sys
from threading import Lock, Thread
@ -237,12 +238,18 @@ def _allmsgs(obj):
class PipelineThread(Thread):
"""Abstract base class for pipeline-stage threads."""
def __init__(self, all_threads):
def __init__(self, all_threads, ctx: contextvars.Context | None = None):
super().__init__()
self.abort_lock = Lock()
self.abort_flag = False
self.all_threads = all_threads
self.exc_info = None
self.ctx = ctx
def _run_in_context(self, func, *args):
if self.ctx is None:
return func(*args)
return self.ctx.run(func, *args)
def abort(self):
"""Shut down the thread at the next chance possible."""
@ -267,8 +274,8 @@ class FirstPipelineThread(PipelineThread):
The coroutine should just be a generator.
"""
def __init__(self, coro, out_queue, all_threads):
super().__init__(all_threads)
def __init__(self, coro, out_queue, all_threads, ctx=None):
super().__init__(all_threads, ctx)
self.coro = coro
self.out_queue = out_queue
self.out_queue.acquire()
@ -282,7 +289,7 @@ class FirstPipelineThread(PipelineThread):
# Get the value from the generator.
try:
msg = next(self.coro)
msg = self._run_in_context(next, self.coro)
except StopIteration:
break
@ -306,8 +313,8 @@ class MiddlePipelineThread(PipelineThread):
last.
"""
def __init__(self, coro, in_queue, out_queue, all_threads):
super().__init__(all_threads)
def __init__(self, coro, in_queue, out_queue, all_threads, ctx=None):
super().__init__(all_threads, ctx)
self.coro = coro
self.in_queue = in_queue
self.out_queue = out_queue
@ -316,7 +323,7 @@ class MiddlePipelineThread(PipelineThread):
def run(self):
try:
# Prime the coroutine.
next(self.coro)
self._run_in_context(next, self.coro)
while True:
with self.abort_lock:
@ -333,7 +340,7 @@ class MiddlePipelineThread(PipelineThread):
return
# Invoke the current stage.
out = self.coro.send(msg)
out = self._run_in_context(self.coro.send, msg)
# Send messages to next stage.
for msg in _allmsgs(out):
@ -355,14 +362,14 @@ class LastPipelineThread(PipelineThread):
should yield nothing.
"""
def __init__(self, coro, in_queue, all_threads):
super().__init__(all_threads)
def __init__(self, coro, in_queue, all_threads, ctx=None):
super().__init__(all_threads, ctx)
self.coro = coro
self.in_queue = in_queue
def run(self):
# Prime the coroutine.
next(self.coro)
self._run_in_context(next, self.coro)
try:
while True:
@ -380,7 +387,7 @@ class LastPipelineThread(PipelineThread):
return
# Send to consumer.
self.coro.send(msg)
self._run_in_context(self.coro.send, msg)
except BaseException:
self.abort_all(sys.exc_info())
@ -419,26 +426,37 @@ class Pipeline:
messages between the stages are stored in queues of the given
size.
"""
base_ctx = contextvars.copy_context()
queue_count = len(self.stages) - 1
queues = [CountedQueue(queue_size) for i in range(queue_count)]
threads = []
# Set up first stage.
for coro in self.stages[0]:
threads.append(FirstPipelineThread(coro, queues[0], threads))
# Each worker needs its own copy because Context objects cannot be
# entered concurrently from multiple threads.
threads.append(
FirstPipelineThread(coro, queues[0], threads, base_ctx.copy())
)
# Middle stages.
for i in range(1, queue_count):
for coro in self.stages[i]:
threads.append(
MiddlePipelineThread(
coro, queues[i - 1], queues[i], threads
coro,
queues[i - 1],
queues[i],
threads,
base_ctx.copy(),
)
)
# Last stage.
for coro in self.stages[-1]:
threads.append(LastPipelineThread(coro, queues[-1], threads))
threads.append(
LastPipelineThread(coro, queues[-1], threads, base_ctx.copy())
)
# Start threads.
for thread in threads:

View file

@ -281,13 +281,16 @@ class IPFSPlugin(BeetsPlugin):
def ipfs_added_albums(self, rlib, tmpname):
"""Returns a new library with only albums/items added to ipfs"""
tmplib = library.Library(tmpname)
for album in rlib.albums():
try:
if album.ipfs:
self.create_new_album(album, tmplib)
except AttributeError:
pass
tmplib = library.Library(
tmpname, directory="/ipfs/", set_music_dir=False
)
with tmplib.music_dir_context():
for album in rlib.albums():
try:
if album.ipfs:
self.create_new_album(album, tmplib)
except AttributeError:
pass
return tmplib
def create_new_album(self, album, tmplib):
@ -300,7 +303,7 @@ class IPFSPlugin(BeetsPlugin):
pass
item_path = os.fsdecode(os.path.basename(item.path))
# Clear current path from item
item.path = f"/ipfs/{album.ipfs}/{item_path}"
item.path = f"{album.ipfs}/{item_path}"
item.id = None
items.append(item)

View file

@ -16,6 +16,7 @@
from __future__ import annotations
import collections
import contextvars
import enum
import math
import os
@ -1427,6 +1428,9 @@ class ReplayGainPlugin(BeetsPlugin):
callback: Callable[[AnyRgTask], Any],
):
if self.pool is not None:
# Apply the caller's context to both the worker and its callbacks
# so lazy path expansion keeps the library root in pool threads.
ctx = contextvars.copy_context()
def handle_exc(exc):
"""Handle exceptions in the async work."""
@ -1435,8 +1439,19 @@ class ReplayGainPlugin(BeetsPlugin):
else:
self.exc_queue.put(exc)
def run_func():
return ctx.run(func, *args, **kwds)
def run_callback(task: AnyRgTask):
return ctx.run(callback, task)
def run_handle_exc(exc):
return ctx.run(handle_exc, exc)
self.pool.apply_async(
func, args, kwds, callback, error_callback=handle_exc
run_func,
callback=run_callback,
error_callback=run_handle_exc,
)
else:
callback(func(*args, **kwds))

View file

@ -32,6 +32,10 @@ New features
Bug fixes
~~~~~~~~~
- Item and album-art paths are now stored relative to the library root in the
database while remaining absolute in the rest of beets. Path queries now match
both library-relative paths and absolute paths under the currently configured
music directory under the new storage model. :bug:`133`
- :doc:`plugins/missing`: Fix ``--album`` mode incorrectly reporting albums
already in the library as missing. The comparison now correctly uses
``mb_releasegroupid``.

View file

@ -17,8 +17,9 @@ long-path support (Windows) are automatically managed by ``pathlib``.
When storing paths in the database, however, convert them to bytes with
``bytestring_path()``. Paths in Beets are currently stored as bytes, although
there are plans to eventually store ``pathlib.Path`` objects directly. To access
media file paths in their stored form, use the ``.path`` property on ``Item``
and ``Album``.
media file paths from library objects, use ``.path`` for the absolute path as
``bytes`` or ``.filepath`` for the absolute path as a ``pathlib.Path``. The
database still stores these paths relative to the configured library root.
Legacy utilities
----------------

View file

@ -386,6 +386,8 @@ Sometimes it's useful to find all the items in your library that are
::
$ beet list path:/my/music/directory
$ beet list path:Artist/Album
$ beet list path:Artist/Album/track.mp3
In fact, beets automatically recognizes any query term containing a path
separator (``/`` on POSIX systems) as a path query if that path exists, so this
@ -395,6 +397,9 @@ command is equivalent as long as ``/my/music/directory`` exist:
$ beet list /my/music/directory
The ``path:`` field accepts either an absolute path under the configured music
directory or a path relative to the library root.
Note that this only matches items that are *already in your library*, so a path
query won't necessarily find *all* the audio files in a directory---just the
ones you've already added to your beets library.

View file

@ -1,3 +1,4 @@
import os
import textwrap
import pytest
@ -139,3 +140,52 @@ class TestLyricsMetadataInFlexFieldsMigration:
assert helper.lib.migration_exists(
"lyrics_metadata_in_flex_fields", "items"
)
class TestRelativePathMigration:
@pytest.fixture
def helper(self, monkeypatch):
# do not apply migrations upon library initialization
monkeypatch.setattr("beets.library.library.Library._migrations", ())
helper = TestHelper()
helper.setup_beets()
# and now configure the migrations to be tested
monkeypatch.setattr(
"beets.library.library.Library._migrations",
((migrations.RelativePathMigration, (Item,)),),
)
yield helper
helper.teardown_beets()
def test_migrate(self, helper: TestHelper):
relative_path = os.path.join("foo", "bar", "baz.mp3")
absolute_path = os.fsencode(helper.lib_path / relative_path)
# need to insert the path directly into the database to bypass the path setter
helper.lib._connection().execute(
"INSERT INTO items (id, path) VALUES (?, ?)", (1, absolute_path)
)
old_stored_path = (
helper.lib._connection()
.execute("select path from items where id=?", (1,))
.fetchone()[0]
)
assert old_stored_path == absolute_path
helper.lib._migrate()
item = helper.lib.get_item(1)
assert item
# and now we have a relative path
stored_path = (
helper.lib._connection()
.execute("select path from items where id=?", (item.id,))
.fetchone()[0]
)
assert stored_path == os.fsencode(relative_path)
# and the item.path property still returns an absolute path
assert item.path == absolute_path

View file

@ -15,9 +15,9 @@
import os
from unittest.mock import Mock, patch
from beets import util
from beets.test import _common
from beets.test.helper import PluginTestCase
from beets.util import bytestring_path
from beetsplug.ipfs import IPFSPlugin
@ -29,26 +29,30 @@ class IPFSPluginTest(PluginTestCase):
test_album = self.mk_test_album()
ipfs = IPFSPlugin()
added_albums = ipfs.ipfs_added_albums(self.lib, self.lib.path)
added_album = added_albums.get_album(1)
assert added_album.ipfs == test_album.ipfs
found = False
want_item = test_album.items()[2]
for check_item in added_album.items():
try:
if check_item.get("ipfs", with_album=False):
ipfs_item = os.fsdecode(os.path.basename(want_item.path))
want_path = f"/ipfs/{test_album.ipfs}/{ipfs_item}"
want_path = bytestring_path(want_path)
assert check_item.path == want_path
assert (
check_item.get("ipfs", with_album=False)
== want_item.ipfs
)
assert check_item.title == want_item.title
found = True
except AttributeError:
pass
assert found
with added_albums.music_dir_context():
added_album = added_albums.get_album(1)
assert added_album.ipfs == test_album.ipfs
found = False
want_item = test_album.items()[2]
for check_item in added_album.items():
try:
if check_item.get("ipfs", with_album=False):
ipfs_item = os.fsdecode(
os.path.basename(want_item.path)
)
want_path = util.normpath(
os.path.join("/ipfs", test_album.ipfs, ipfs_item)
)
assert check_item.path == want_path
assert (
check_item.get("ipfs", with_album=False)
== want_item.ipfs
)
assert check_item.title == want_item.title
found = True
except AttributeError:
pass
assert found
def mk_test_album(self):
items = [_common.item() for _ in range(3)]

View file

@ -523,11 +523,6 @@ class RemoveTest(BeetsTestCase):
self.i.remove(True)
assert self.lib_path.exists()
def test_removing_item_outside_of_library_deletes_nothing(self):
self.lib.directory = os.path.join(self.temp_dir, b"xxx")
self.i.remove(True)
assert self.i.filepath.parent.exists()
def test_removing_last_item_in_album_with_albumart_prunes_dir(self):
artfile = os.path.join(self.temp_dir, b"testart.jpg")
touch(artfile)

View file

@ -925,10 +925,10 @@ class AlbumInfoTest(BeetsTestCase):
def test_albuminfo_stores_art(self):
ai = self.lib.get_album(self.i)
ai.artpath = "/my/great/art"
ai.artpath = os.fsdecode(np("/my/great/art"))
ai.store()
new_ai = self.lib.get_album(self.i)
assert new_ai.artpath == b"/my/great/art"
assert new_ai.artpath == np("/my/great/art")
def test_albuminfo_for_two_items_doesnt_duplicate_row(self):
i2 = item(self.lib)
@ -1071,7 +1071,7 @@ class PathStringTest(BeetsTestCase):
self.i.path = path
self.i.store()
i = next(iter(self.lib.items()))
assert i.path == path
assert i.path == os.path.join(self.libdir, path)
def test_special_char_path_added_to_database(self):
self.i.remove()
@ -1080,7 +1080,7 @@ class PathStringTest(BeetsTestCase):
i.path = path
self.lib.add(i)
i = next(iter(self.lib.items()))
assert i.path == path
assert i.path == os.path.join(self.libdir, path)
def test_destination_returns_bytestring(self):
self.i.artist = "b\xe1r"
@ -1094,12 +1094,18 @@ class PathStringTest(BeetsTestCase):
assert isinstance(dest, bytes)
def test_artpath_stores_special_chars(self):
path = b"b\xe1r"
path = bytestring_path("b\xe1r")
alb = self.lib.add_album([self.i])
alb.artpath = path
alb.store()
stored_path = (
self.lib._connection()
.execute("select artpath from albums where id=?", (alb.id,))
.fetchone()[0]
)
alb = self.lib.get_album(self.i)
assert path == alb.artpath
assert stored_path == path
assert alb.artpath == os.path.join(self.libdir, path)
def test_sanitize_path_with_special_chars(self):
path = "b\xe1r?"
@ -1124,6 +1130,22 @@ class PathStringTest(BeetsTestCase):
alb = self.lib.get_album(alb.id)
assert isinstance(alb.artpath, bytes)
def test_relative_path_is_stored(self):
relative_path = os.path.join(b"abc", b"foo.mp3")
absolute_path = os.path.join(self.libdir, relative_path)
self.i.path = absolute_path
self.i.store()
stored_path = (
self.lib._connection()
.execute("select path from items where id=?", (self.i.id,))
.fetchone()[0]
)
album = self.lib.add_album([self.i])
assert self.i.path == absolute_path
assert stored_path == relative_path
assert album.path == os.path.dirname(absolute_path)
class MtimeTest(BeetsTestCase):
def setUp(self):

View file

@ -14,12 +14,14 @@
"""Various tests for querying the library database."""
import os
import sys
from functools import partial
from pathlib import Path
import pytest
from beets import util
from beets.dbcore import types
from beets.dbcore.query import (
AndQuery,
@ -277,6 +279,23 @@ class TestPathQuery:
and path separator detection across different platforms.
"""
@staticmethod
def abs_query_path(path: str, trailing_sep: bool = False) -> str:
"""Build a platform-correct absolute query path without normalizing it.
On Windows, leading-slash paths are drive-rooted but Python 3.13 no
longer treats them as absolute. Prefix the current drive so explicit
path queries stay absolute while preserving raw segments such as ``..``.
"""
if os.path.__name__ == "ntpath" and path.startswith("/"):
drive, _ = os.path.splitdrive(os.fsdecode(util.normpath(os.sep)))
path = drive + path
path = path.replace("/", os.sep)
if trailing_sep:
path = os.path.join(path, "")
return path.replace("\\", "\\\\")
@pytest.fixture(scope="class")
def lib(self, helper):
helper.add_item(path=b"/aaa/bb/c.mp3", title="path item")
@ -289,27 +308,54 @@ class TestPathQuery:
return helper.lib
@pytest.mark.parametrize(
"q, expected_titles",
"path, expected_titles, trailing_sep",
[
_p("path:/aaa/bb/c.mp3", ["path item"], id="exact-match"),
_p("path:/aaa", ["path item"], id="parent-dir-no-slash"),
_p("path:/aaa/", ["path item"], id="parent-dir-with-slash"),
_p("path:/aa", [], id="no-match-does-not-match-parent-dir"),
_p("path:/xyzzy/", [], id="no-match"),
_p("path:/b/", [], id="fragment-no-match"),
_p("path:/x/../aaa/bb", ["path item"], id="non-normalized"),
_p("path::c\\.mp3$", ["path item"], id="regex"),
_p("path:/c/_", ["with underscore"], id="underscore-escaped"),
_p("path:/c/%", ["with percent"], id="percent-escaped"),
_p("path:/c/\\\\x", ["with backslash"], id="backslash-escaped"),
_p("/aaa/bb/c.mp3", ["path item"], False, id="exact-match"),
_p("/aaa", ["path item"], False, id="parent-dir-no-slash"),
_p("/aaa", ["path item"], True, id="parent-dir-with-slash"),
_p("/aa", [], False, id="no-match-does-not-match-parent-dir"),
_p("/xyzzy", [], True, id="no-match"),
_p("/b", [], True, id="fragment-no-match"),
_p("/x/../aaa/bb", ["path item"], False, id="non-normalized"),
_p(r"c\.mp3$", ["path item"], False, id="regex"),
_p("/c/_", ["with underscore"], False, id="underscore-escaped"),
_p("/c/%", ["with percent"], False, id="percent-escaped"),
_p(r"/c/\x", ["with backslash"], False, id="backslash-escaped"),
],
)
def test_explicit(self, monkeypatch, lib, q, expected_titles):
def test_explicit(
self, monkeypatch, lib, path, expected_titles, trailing_sep
):
"""Test explicit path queries with different path specifications."""
monkeypatch.setattr("beets.util.case_sensitive", lambda *_: True)
if path == r"c\.mp3$":
q = f"path::{path}"
elif path == r"/c/\x" and os.path.__name__ != "ntpath":
q = r"path:/c/\\x"
else:
q = f"path:{self.abs_query_path(path, trailing_sep=trailing_sep)}"
assert {i.title for i in lib.items(q)} == set(expected_titles)
@pytest.mark.parametrize(
"query", ["path:", "path::"], ids=["path", "regex"]
)
def test_absolute(self, lib, helper, query):
item_path = helper.lib_path / "item.mp3"
bytes_path = os.fsencode(item_path)
helper.add_item(path=bytes_path, title="absolute item")
q = f"{query}{item_path}".replace("\\", "\\\\")
assert {i.title for i in lib.items(q)} == {"absolute item"}
def test_relative(self, lib, helper):
item_path = helper.lib_path / "relative" / "item.mp3"
bytes_path = os.fsencode(item_path)
helper.add_item(path=bytes_path, title="relative item")
q = "path:relative/item.mp3"
assert {i.title for i in lib.items(q)} == {"relative item"}
@pytest.mark.skipif(sys.platform == "win32", reason=WIN32_NO_IMPLICIT_PATHS)
@pytest.mark.parametrize(
"q, expected_titles",
@ -339,7 +385,7 @@ class TestPathQuery:
self, lib, monkeypatch, case_sensitive, expected_titles
):
"""Test path matching with different case sensitivity settings."""
q = "path:/a/b/c2.mp3"
q = f"path:{self.abs_query_path('/a/b/c2.mp3')}"
monkeypatch.setattr(
"beets.util.case_sensitive", lambda *_: case_sensitive
)

View file

@ -14,16 +14,21 @@
"""Various tests for querying the library database."""
import os
from unittest.mock import patch
import beets.library
from beets import config, dbcore
from beets import config, dbcore, util
from beets.dbcore import types
from beets.library import Album
from beets.test import _common
from beets.test.helper import BeetsTestCase
def abs_test_path(path: str) -> str:
return os.fsdecode(util.normpath(path))
# A test case class providing a library with some dummy data and some
# assertions involving that data.
class DummyDataTestCase(BeetsTestCase):
@ -69,7 +74,7 @@ class DummyDataTestCase(BeetsTestCase):
items[0].flex2 = "Flex2-A"
items[0].album_id = albums[0].id
items[0].artist_sort = None
items[0].path = "/path0.mp3"
items[0].path = abs_test_path("/path0.mp3")
items[0].track = 1
items[1].title = "Baz qux"
items[1].artist = "Two"
@ -80,7 +85,7 @@ class DummyDataTestCase(BeetsTestCase):
items[1].flex2 = "Flex2-A"
items[1].album_id = albums[0].id
items[1].artist_sort = None
items[1].path = "/patH1.mp3"
items[1].path = abs_test_path("/patH1.mp3")
items[1].track = 2
items[2].title = "Beets 4 eva"
items[2].artist = "Three"
@ -91,7 +96,7 @@ class DummyDataTestCase(BeetsTestCase):
items[2].flex2 = "Flex1-B"
items[2].album_id = albums[1].id
items[2].artist_sort = None
items[2].path = "/paTH2.mp3"
items[2].path = abs_test_path("/paTH2.mp3")
items[2].track = 3
items[3].title = "Beets 4 eva"
items[3].artist = "Three"
@ -102,7 +107,7 @@ class DummyDataTestCase(BeetsTestCase):
items[3].flex2 = "Flex1-C"
items[3].album_id = albums[2].id
items[3].artist_sort = None
items[3].path = "/PATH3.mp3"
items[3].path = abs_test_path("/PATH3.mp3")
items[3].track = 4
for item in items:
self.lib.add(item)
@ -156,10 +161,10 @@ class SortFixedFieldTest(DummyDataTestCase):
q = ""
sort = dbcore.query.FixedFieldSort("path", True)
results = self.lib.items(q, sort)
assert results[0]["path"] == b"/path0.mp3"
assert results[1]["path"] == b"/patH1.mp3"
assert results[2]["path"] == b"/paTH2.mp3"
assert results[3]["path"] == b"/PATH3.mp3"
assert results[0]["path"] == util.normpath("/path0.mp3")
assert results[1]["path"] == util.normpath("/patH1.mp3")
assert results[2]["path"] == util.normpath("/paTH2.mp3")
assert results[3]["path"] == util.normpath("/PATH3.mp3")
class SortFlexFieldTest(DummyDataTestCase):

View file

@ -30,7 +30,7 @@ class ListTest(IOMixin, BeetsTestCase):
def test_list_item_path(self):
stdout = self._run_list(fmt="$path")
assert stdout.strip() == "xxx/yyy"
assert stdout.strip() == str(self.lib_path / "xxx/yyy")
def test_list_album_outputs_something(self):
stdout = self._run_list(album=True)
@ -38,7 +38,7 @@ class ListTest(IOMixin, BeetsTestCase):
def test_list_album_path(self):
stdout = self._run_list(album=True, fmt="$path")
assert stdout.strip() == "xxx"
assert stdout.strip() == str(self.lib_path / "xxx")
def test_list_album_omits_title(self):
stdout = self._run_list(album=True)

View file

@ -382,10 +382,10 @@ class CommonOptionsParserCliTest(IOMixin, BeetsTestCase):
def test_path_option(self):
output = self.run_with_output("ls", "-p")
assert output == "xxx/yyy\n"
assert output == f"{self.lib_path / 'xxx/yyy'}\n"
output = self.run_with_output("ls", "-a", "-p")
assert output == "xxx\n"
assert output == f"{self.lib_path / 'xxx'}\n"
def test_format_option(self):
output = self.run_with_output("ls", "-f", "$artist")