From 98377ab5f6fc1829d79211b376bfd8d82bafaf33 Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Mon, 7 Jul 2025 15:45:55 +0200 Subject: [PATCH] Split library file into different files inside library folder. --- beets/library/__init__.py | 16 + beets/library/exceptions.py | 38 + beets/library/library.py | 148 ++++ beets/{library.py => library/models.py} | 1048 +++++++++-------------- beets/library/queries.py | 61 ++ 5 files changed, 663 insertions(+), 648 deletions(-) create mode 100644 beets/library/__init__.py create mode 100644 beets/library/exceptions.py create mode 100644 beets/library/library.py rename beets/{library.py => library/models.py} (86%) create mode 100644 beets/library/queries.py diff --git a/beets/library/__init__.py b/beets/library/__init__.py new file mode 100644 index 000000000..286b84189 --- /dev/null +++ b/beets/library/__init__.py @@ -0,0 +1,16 @@ +from .exceptions import FileOperationError, ReadError, WriteError +from .library import Library +from .models import Album, Item, LibModel +from .queries import parse_query_parts, parse_query_string + +__all__ = [ + "Library", + "LibModel", + "Album", + "Item", + "parse_query_parts", + "parse_query_string", + "FileOperationError", + "ReadError", + "WriteError", +] diff --git a/beets/library/exceptions.py b/beets/library/exceptions.py new file mode 100644 index 000000000..7f117a2fe --- /dev/null +++ b/beets/library/exceptions.py @@ -0,0 +1,38 @@ +from beets import util + + +class FileOperationError(Exception): + """Indicate an error when interacting with a file on disk. + + Possibilities include an unsupported media type, a permissions + error, and an unhandled Mutagen exception. + """ + + def __init__(self, path, reason): + """Create an exception describing an operation on the file at + `path` with the underlying (chained) exception `reason`. + """ + super().__init__(path, reason) + self.path = path + self.reason = reason + + def __str__(self): + """Get a string representing the error. + + Describe both the underlying reason and the file path in question. + """ + return f"{util.displayable_path(self.path)}: {self.reason}" + + +class ReadError(FileOperationError): + """An error while reading a file (i.e. in `Item.read`).""" + + def __str__(self): + return "error reading " + str(super()) + + +class WriteError(FileOperationError): + """An error while writing a file (i.e. in `Item.write`).""" + + def __str__(self): + return "error writing " + str(super()) diff --git a/beets/library/library.py b/beets/library/library.py new file mode 100644 index 000000000..7370f7ecd --- /dev/null +++ b/beets/library/library.py @@ -0,0 +1,148 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import platformdirs + +import beets +from beets import dbcore +from beets.util import normpath + +from .models import Album, Item +from .queries import PF_KEY_DEFAULT, parse_query_parts, parse_query_string + +if TYPE_CHECKING: + from beets.dbcore import Results + + +class Library(dbcore.Database): + """A database of music containing songs and albums.""" + + _models = (Item, Album) + + def __init__( + self, + path="library.blb", + directory: str | None = None, + path_formats=((PF_KEY_DEFAULT, "$artist/$album/$track $title"),), + replacements=None, + ): + timeout = beets.config["timeout"].as_number() + super().__init__(path, timeout=timeout) + + self.directory = normpath(directory or platformdirs.user_music_path()) + + self.path_formats = path_formats + self.replacements = replacements + + # Used for template substitution performance. + self._memotable: dict[tuple[str, ...], str] = {} + + # Adding objects to the database. + + def add(self, obj): + """Add the :class:`Item` or :class:`Album` object to the library + database. + + Return the object's new id. + """ + obj.add(self) + self._memotable = {} + return obj.id + + def add_album(self, items): + """Create a new album consisting of a list of items. + + The items are added to the database if they don't yet have an + ID. Return a new :class:`Album` object. The list items must not + be empty. + """ + if not items: + raise ValueError("need at least one item") + + # Create the album structure using metadata from the first item. + values = {key: items[0][key] for key in Album.item_keys} + album = Album(self, **values) + + # Add the album structure and set the items' album_id fields. + # Store or add the items. + with self.transaction(): + album.add(self) + for item in items: + item.album_id = album.id + if item.id is None: + item.add(self) + else: + item.store() + + return album + + # Querying. + + def _fetch(self, model_cls, query, sort=None): + """Parse a query and fetch. + + If an order specification is present in the query string + the `sort` argument is ignored. + """ + # Parse the query, if necessary. + try: + parsed_sort = None + if isinstance(query, str): + query, parsed_sort = parse_query_string(query, model_cls) + elif isinstance(query, (list, tuple)): + query, parsed_sort = parse_query_parts(query, model_cls) + except dbcore.query.InvalidQueryArgumentValueError as exc: + raise dbcore.InvalidQueryError(query, exc) + + # Any non-null sort specified by the parsed query overrides the + # provided sort. + if parsed_sort and not isinstance(parsed_sort, dbcore.query.NullSort): + sort = parsed_sort + + return super()._fetch(model_cls, query, sort) + + @staticmethod + def get_default_album_sort(): + """Get a :class:`Sort` object for albums from the config option.""" + return dbcore.sort_from_strings( + Album, beets.config["sort_album"].as_str_seq() + ) + + @staticmethod + def get_default_item_sort(): + """Get a :class:`Sort` object for items from the config option.""" + return dbcore.sort_from_strings( + Item, beets.config["sort_item"].as_str_seq() + ) + + def albums(self, query=None, sort=None) -> Results[Album]: + """Get :class:`Album` objects matching the query.""" + return self._fetch(Album, query, sort or self.get_default_album_sort()) + + def items(self, query=None, sort=None) -> Results[Item]: + """Get :class:`Item` objects matching the query.""" + return self._fetch(Item, query, sort or self.get_default_item_sort()) + + # Convenience accessors. + + def get_item(self, id): + """Fetch a :class:`Item` by its ID. + + Return `None` if no match is found. + """ + return self._get(Item, id) + + def get_album(self, item_or_id): + """Given an album ID or an item associated with an album, return + a :class:`Album` object for the album. + + If no such album exists, return `None`. + """ + if isinstance(item_or_id, int): + album_id = item_or_id + else: + album_id = item_or_id.album_id + if album_id is None: + return None + return self._get(Album, album_id) diff --git a/beets/library.py b/beets/library/models.py similarity index 86% rename from beets/library.py rename to beets/library/models.py index 9223b3209..efa0f9694 100644 --- a/beets/library.py +++ b/beets/library/models.py @@ -1,23 +1,6 @@ -# This file is part of beets. -# Copyright 2016, Adrian Sampson. -# -# Permission is hereby granted, free of charge, to any person obtaining -# a copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, -# distribute, sublicense, and/or sell copies of the Software, and to -# permit persons to whom the Software is furnished to do so, subject to -# the following conditions: -# -# The above copyright notice and this permission notice shall be -# included in all copies or substantial portions of the Software. - -"""The core data store and collection logic for beets.""" - from __future__ import annotations import os -import shlex import string import sys import time @@ -26,12 +9,11 @@ from functools import cached_property from pathlib import Path from typing import TYPE_CHECKING -import platformdirs from mediafile import MediaFile, UnreadableFileError import beets from beets import dbcore, logging, plugins, util -from beets.dbcore import Results, types +from beets.dbcore import types from beets.util import ( MoveOperation, bytestring_path, @@ -42,57 +24,16 @@ from beets.util import ( ) from beets.util.functemplate import Template, template +from .exceptions import FileOperationError, ReadError, WriteError +from .queries import PF_KEY_DEFAULT, parse_query_string + if TYPE_CHECKING: - from .dbcore.query import FieldQuery, FieldQueryType + from ..dbcore.query import FieldQuery, FieldQueryType + from .library import Library # noqa: F401 log = logging.getLogger("beets") -# Special path format key. -PF_KEY_DEFAULT = "default" - - -# Exceptions. -class FileOperationError(Exception): - """Indicate an error when interacting with a file on disk. - - Possibilities include an unsupported media type, a permissions - error, and an unhandled Mutagen exception. - """ - - def __init__(self, path, reason): - """Create an exception describing an operation on the file at - `path` with the underlying (chained) exception `reason`. - """ - super().__init__(path, reason) - self.path = path - self.reason = reason - - def __str__(self): - """Get a string representing the error. - - Describe both the underlying reason and the file path in question. - """ - return f"{util.displayable_path(self.path)}: {self.reason}" - - -class ReadError(FileOperationError): - """An error while reading a file (i.e. in `Item.read`).""" - - def __str__(self): - return "error reading " + str(super()) - - -class WriteError(FileOperationError): - """An error while writing a file (i.e. in `Item.write`).""" - - def __str__(self): - return "error writing " + str(super()) - - -# Item and Album model classes. - - class LibModel(dbcore.Model["Library"]): """Shared concrete functionality for Items and Albums.""" @@ -259,6 +200,400 @@ class FormattedItemMapping(dbcore.db.FormattedMapping): return len(self.all_keys) +class Album(LibModel): + """Provide access to information about albums stored in a + library. + + Reflects the library's "albums" table, including album art. + """ + + _table = "albums" + _flex_table = "album_attributes" + _always_dirty = True + _fields = { + "id": types.PRIMARY_ID, + "artpath": types.NullPathType(), + "added": types.DATE, + "albumartist": types.STRING, + "albumartist_sort": types.STRING, + "albumartist_credit": types.STRING, + "albumartists": types.MULTI_VALUE_DSV, + "albumartists_sort": types.MULTI_VALUE_DSV, + "albumartists_credit": types.MULTI_VALUE_DSV, + "album": types.STRING, + "genre": types.STRING, + "style": types.STRING, + "discogs_albumid": types.INTEGER, + "discogs_artistid": types.INTEGER, + "discogs_labelid": types.INTEGER, + "year": types.PaddedInt(4), + "month": types.PaddedInt(2), + "day": types.PaddedInt(2), + "disctotal": types.PaddedInt(2), + "comp": types.BOOLEAN, + "mb_albumid": types.STRING, + "mb_albumartistid": types.STRING, + "mb_albumartistids": types.MULTI_VALUE_DSV, + "albumtype": types.STRING, + "albumtypes": types.SEMICOLON_SPACE_DSV, + "label": types.STRING, + "barcode": types.STRING, + "mb_releasegroupid": types.STRING, + "release_group_title": types.STRING, + "asin": types.STRING, + "catalognum": types.STRING, + "script": types.STRING, + "language": types.STRING, + "country": types.STRING, + "albumstatus": types.STRING, + "albumdisambig": types.STRING, + "releasegroupdisambig": types.STRING, + "rg_album_gain": types.NULL_FLOAT, + "rg_album_peak": types.NULL_FLOAT, + "r128_album_gain": types.NULL_FLOAT, + "original_year": types.PaddedInt(4), + "original_month": types.PaddedInt(2), + "original_day": types.PaddedInt(2), + } + + _search_fields = ("album", "albumartist", "genre") + + _types = { + "path": types.PathType(), + "data_source": types.STRING, + } + + _sorts = { + "albumartist": dbcore.query.SmartArtistSort, + "artist": dbcore.query.SmartArtistSort, + } + + # List of keys that are set on an album's items. + item_keys = [ + "added", + "albumartist", + "albumartists", + "albumartist_sort", + "albumartists_sort", + "albumartist_credit", + "albumartists_credit", + "album", + "genre", + "style", + "discogs_albumid", + "discogs_artistid", + "discogs_labelid", + "year", + "month", + "day", + "disctotal", + "comp", + "mb_albumid", + "mb_albumartistid", + "mb_albumartistids", + "albumtype", + "albumtypes", + "label", + "barcode", + "mb_releasegroupid", + "asin", + "catalognum", + "script", + "language", + "country", + "albumstatus", + "albumdisambig", + "releasegroupdisambig", + "release_group_title", + "rg_album_gain", + "rg_album_peak", + "r128_album_gain", + "original_year", + "original_month", + "original_day", + ] + + _format_config_key = "format_album" + + @cached_classproperty + def _relation(cls) -> type[Item]: + return Item + + @cached_classproperty + def relation_join(cls) -> str: + """Return FROM clause which joins on related album items. + + Use LEFT join to select all albums, including those that do not have + any items. + """ + return ( + f"LEFT JOIN {cls._relation._table} " + f"ON {cls._table}.id = {cls._relation._table}.album_id" + ) + + @classmethod + def _getters(cls): + # In addition to plugin-provided computed fields, also expose + # the album's directory as `path`. + getters = plugins.album_field_getters() + getters["path"] = Album.item_dir + getters["albumtotal"] = Album._albumtotal + return getters + + def items(self): + """Return an iterable over the items associated with this + album. + + This method conflicts with :meth:`LibModel.items`, which is + inherited from :meth:`beets.dbcore.Model.items`. + Since :meth:`Album.items` predates these methods, and is + likely to be used by plugins, we keep this interface as-is. + """ + return self._db.items(dbcore.MatchQuery("album_id", self.id)) + + def remove(self, delete=False, with_items=True): + """Remove this album and all its associated items from the + library. + + If delete, then the items' files are also deleted from disk, + along with any album art. The directories containing the album are + also removed (recursively) if empty. + + Set with_items to False to avoid removing the album's items. + """ + super().remove() + + # Send a 'album_removed' signal to plugins + plugins.send("album_removed", album=self) + + # Delete art file. + if delete: + artpath = self.artpath + if artpath: + util.remove(artpath) + + # Remove (and possibly delete) the constituent items. + if with_items: + for item in self.items(): + item.remove(delete, False) + + def move_art(self, operation=MoveOperation.MOVE): + """Move, copy, link or hardlink (depending on `operation`) any + existing album art so that it remains in the same directory as + the items. + + `operation` should be an instance of `util.MoveOperation`. + """ + old_art = self.artpath + if not old_art: + return + + if not os.path.exists(syspath(old_art)): + log.error( + "removing reference to missing album art file {}", + util.displayable_path(old_art), + ) + self.artpath = None + return + + new_art = self.art_destination(old_art) + if new_art == old_art: + return + + new_art = util.unique_path(new_art) + log.debug( + "moving album art {0} to {1}", + util.displayable_path(old_art), + util.displayable_path(new_art), + ) + if operation == MoveOperation.MOVE: + util.move(old_art, new_art) + util.prune_dirs(os.path.dirname(old_art), self._db.directory) + elif operation == MoveOperation.COPY: + util.copy(old_art, new_art) + elif operation == MoveOperation.LINK: + util.link(old_art, new_art) + elif operation == MoveOperation.HARDLINK: + util.hardlink(old_art, new_art) + elif operation == MoveOperation.REFLINK: + util.reflink(old_art, new_art, fallback=False) + elif operation == MoveOperation.REFLINK_AUTO: + util.reflink(old_art, new_art, fallback=True) + else: + assert False, "unknown MoveOperation" + self.artpath = new_art + + def move(self, operation=MoveOperation.MOVE, basedir=None, store=True): + """Move, copy, link or hardlink (depending on `operation`) + all items to their destination. Any album art moves along with them. + + `basedir` overrides the library base directory for the destination. + + `operation` should be an instance of `util.MoveOperation`. + + By default, the album is stored to the database, persisting any + modifications to its metadata. If `store` is `False` however, + the album is not stored automatically, and it will have to be manually + stored after invoking this method. + """ + basedir = basedir or self._db.directory + + # Ensure new metadata is available to items for destination + # computation. + if store: + self.store() + + # Move items. + items = list(self.items()) + for item in items: + item.move(operation, basedir=basedir, with_album=False, store=store) + + # Move art. + self.move_art(operation) + if store: + self.store() + + def item_dir(self): + """Return the directory containing the album's first item, + provided that such an item exists. + """ + item = self.items().get() + if not item: + raise ValueError("empty album for album id %d" % self.id) + return os.path.dirname(item.path) + + def _albumtotal(self): + """Return the total number of tracks on all discs on the album.""" + if self.disctotal == 1 or not beets.config["per_disc_numbering"]: + return self.items()[0].tracktotal + + counted = [] + total = 0 + + for item in self.items(): + if item.disc in counted: + continue + + total += item.tracktotal + counted.append(item.disc) + + if len(counted) == self.disctotal: + break + + return total + + def art_destination(self, image, item_dir=None): + """Return a path to the destination for the album art image + for the album. + + `image` is the path of the image that will be + moved there (used for its extension). + + The path construction uses the existing path of the album's + items, so the album must contain at least one item or + item_dir must be provided. + """ + image = bytestring_path(image) + item_dir = item_dir or self.item_dir() + + filename_tmpl = template(beets.config["art_filename"].as_str()) + subpath = self.evaluate_template(filename_tmpl, True) + if beets.config["asciify_paths"]: + subpath = util.asciify_path( + subpath, beets.config["path_sep_replace"].as_str() + ) + subpath = util.sanitize_path( + subpath, replacements=self._db.replacements + ) + subpath = bytestring_path(subpath) + + _, ext = os.path.splitext(image) + dest = os.path.join(item_dir, subpath + ext) + + return bytestring_path(dest) + + def set_art(self, path, copy=True): + """Set the album's cover art to the image at the given path. + + The image is copied (or moved) into place, replacing any + existing art. + + Send an 'art_set' event with `self` as the sole argument. + """ + path = bytestring_path(path) + oldart = self.artpath + artdest = self.art_destination(path) + + if oldart and samefile(path, oldart): + # Art already set. + return + elif samefile(path, artdest): + # Art already in place. + self.artpath = path + return + + # Normal operation. + if oldart == artdest: + util.remove(oldart) + artdest = util.unique_path(artdest) + if copy: + util.copy(path, artdest) + else: + util.move(path, artdest) + self.artpath = artdest + + plugins.send("art_set", album=self) + + def store(self, fields=None, inherit=True): + """Update the database with the album information. + + `fields` represents the fields to be stored. If not specified, + all fields will be. + + The album's tracks are also updated when the `inherit` flag is enabled. + This applies to fixed attributes as well as flexible ones. The `id` + attribute of the album will never be inherited. + """ + # Get modified track fields. + track_updates = {} + track_deletes = set() + for key in self._dirty: + if inherit: + if key in self.item_keys: # is a fixed attribute + track_updates[key] = self[key] + elif key not in self: # is a fixed or a flexible attribute + track_deletes.add(key) + elif key != "id": # is a flexible attribute + track_updates[key] = self[key] + + with self._db.transaction(): + super().store(fields) + if track_updates: + for item in self.items(): + for key, value in track_updates.items(): + item[key] = value + item.store() + if track_deletes: + for item in self.items(): + for key in track_deletes: + if key in item: + del item[key] + item.store() + + def try_sync(self, write, move, inherit=True): + """Synchronize the album and its items with the database. + Optionally, also write any new tags into the files and update + their paths. + + `write` indicates whether to write tags to the item files, and + `move` controls whether files (both audio and album art) are + moved. + """ + self.store(inherit=inherit) + for item in self.items(): + item.try_sync(write, move) + + class Item(LibModel): """Represent a song or track.""" @@ -898,589 +1233,6 @@ class Item(LibModel): return normpath(os.path.join(basedir, lib_path_bytes)) -class Album(LibModel): - """Provide access to information about albums stored in a - library. - - Reflects the library's "albums" table, including album art. - """ - - _table = "albums" - _flex_table = "album_attributes" - _always_dirty = True - _fields = { - "id": types.PRIMARY_ID, - "artpath": types.NullPathType(), - "added": types.DATE, - "albumartist": types.STRING, - "albumartist_sort": types.STRING, - "albumartist_credit": types.STRING, - "albumartists": types.MULTI_VALUE_DSV, - "albumartists_sort": types.MULTI_VALUE_DSV, - "albumartists_credit": types.MULTI_VALUE_DSV, - "album": types.STRING, - "genre": types.STRING, - "style": types.STRING, - "discogs_albumid": types.INTEGER, - "discogs_artistid": types.INTEGER, - "discogs_labelid": types.INTEGER, - "year": types.PaddedInt(4), - "month": types.PaddedInt(2), - "day": types.PaddedInt(2), - "disctotal": types.PaddedInt(2), - "comp": types.BOOLEAN, - "mb_albumid": types.STRING, - "mb_albumartistid": types.STRING, - "mb_albumartistids": types.MULTI_VALUE_DSV, - "albumtype": types.STRING, - "albumtypes": types.SEMICOLON_SPACE_DSV, - "label": types.STRING, - "barcode": types.STRING, - "mb_releasegroupid": types.STRING, - "release_group_title": types.STRING, - "asin": types.STRING, - "catalognum": types.STRING, - "script": types.STRING, - "language": types.STRING, - "country": types.STRING, - "albumstatus": types.STRING, - "albumdisambig": types.STRING, - "releasegroupdisambig": types.STRING, - "rg_album_gain": types.NULL_FLOAT, - "rg_album_peak": types.NULL_FLOAT, - "r128_album_gain": types.NULL_FLOAT, - "original_year": types.PaddedInt(4), - "original_month": types.PaddedInt(2), - "original_day": types.PaddedInt(2), - } - - _search_fields = ("album", "albumartist", "genre") - - _types = { - "path": types.PathType(), - "data_source": types.STRING, - } - - _sorts = { - "albumartist": dbcore.query.SmartArtistSort, - "artist": dbcore.query.SmartArtistSort, - } - - # List of keys that are set on an album's items. - item_keys = [ - "added", - "albumartist", - "albumartists", - "albumartist_sort", - "albumartists_sort", - "albumartist_credit", - "albumartists_credit", - "album", - "genre", - "style", - "discogs_albumid", - "discogs_artistid", - "discogs_labelid", - "year", - "month", - "day", - "disctotal", - "comp", - "mb_albumid", - "mb_albumartistid", - "mb_albumartistids", - "albumtype", - "albumtypes", - "label", - "barcode", - "mb_releasegroupid", - "asin", - "catalognum", - "script", - "language", - "country", - "albumstatus", - "albumdisambig", - "releasegroupdisambig", - "release_group_title", - "rg_album_gain", - "rg_album_peak", - "r128_album_gain", - "original_year", - "original_month", - "original_day", - ] - - _format_config_key = "format_album" - - @cached_classproperty - def _relation(cls) -> type[Item]: - return Item - - @cached_classproperty - def relation_join(cls) -> str: - """Return FROM clause which joins on related album items. - - Use LEFT join to select all albums, including those that do not have - any items. - """ - return ( - f"LEFT JOIN {cls._relation._table} " - f"ON {cls._table}.id = {cls._relation._table}.album_id" - ) - - @classmethod - def _getters(cls): - # In addition to plugin-provided computed fields, also expose - # the album's directory as `path`. - getters = plugins.album_field_getters() - getters["path"] = Album.item_dir - getters["albumtotal"] = Album._albumtotal - return getters - - def items(self): - """Return an iterable over the items associated with this - album. - - This method conflicts with :meth:`LibModel.items`, which is - inherited from :meth:`beets.dbcore.Model.items`. - Since :meth:`Album.items` predates these methods, and is - likely to be used by plugins, we keep this interface as-is. - """ - return self._db.items(dbcore.MatchQuery("album_id", self.id)) - - def remove(self, delete=False, with_items=True): - """Remove this album and all its associated items from the - library. - - If delete, then the items' files are also deleted from disk, - along with any album art. The directories containing the album are - also removed (recursively) if empty. - - Set with_items to False to avoid removing the album's items. - """ - super().remove() - - # Send a 'album_removed' signal to plugins - plugins.send("album_removed", album=self) - - # Delete art file. - if delete: - artpath = self.artpath - if artpath: - util.remove(artpath) - - # Remove (and possibly delete) the constituent items. - if with_items: - for item in self.items(): - item.remove(delete, False) - - def move_art(self, operation=MoveOperation.MOVE): - """Move, copy, link or hardlink (depending on `operation`) any - existing album art so that it remains in the same directory as - the items. - - `operation` should be an instance of `util.MoveOperation`. - """ - old_art = self.artpath - if not old_art: - return - - if not os.path.exists(syspath(old_art)): - log.error( - "removing reference to missing album art file {}", - util.displayable_path(old_art), - ) - self.artpath = None - return - - new_art = self.art_destination(old_art) - if new_art == old_art: - return - - new_art = util.unique_path(new_art) - log.debug( - "moving album art {0} to {1}", - util.displayable_path(old_art), - util.displayable_path(new_art), - ) - if operation == MoveOperation.MOVE: - util.move(old_art, new_art) - util.prune_dirs(os.path.dirname(old_art), self._db.directory) - elif operation == MoveOperation.COPY: - util.copy(old_art, new_art) - elif operation == MoveOperation.LINK: - util.link(old_art, new_art) - elif operation == MoveOperation.HARDLINK: - util.hardlink(old_art, new_art) - elif operation == MoveOperation.REFLINK: - util.reflink(old_art, new_art, fallback=False) - elif operation == MoveOperation.REFLINK_AUTO: - util.reflink(old_art, new_art, fallback=True) - else: - assert False, "unknown MoveOperation" - self.artpath = new_art - - def move(self, operation=MoveOperation.MOVE, basedir=None, store=True): - """Move, copy, link or hardlink (depending on `operation`) - all items to their destination. Any album art moves along with them. - - `basedir` overrides the library base directory for the destination. - - `operation` should be an instance of `util.MoveOperation`. - - By default, the album is stored to the database, persisting any - modifications to its metadata. If `store` is `False` however, - the album is not stored automatically, and it will have to be manually - stored after invoking this method. - """ - basedir = basedir or self._db.directory - - # Ensure new metadata is available to items for destination - # computation. - if store: - self.store() - - # Move items. - items = list(self.items()) - for item in items: - item.move(operation, basedir=basedir, with_album=False, store=store) - - # Move art. - self.move_art(operation) - if store: - self.store() - - def item_dir(self): - """Return the directory containing the album's first item, - provided that such an item exists. - """ - item = self.items().get() - if not item: - raise ValueError("empty album for album id %d" % self.id) - return os.path.dirname(item.path) - - def _albumtotal(self): - """Return the total number of tracks on all discs on the album.""" - if self.disctotal == 1 or not beets.config["per_disc_numbering"]: - return self.items()[0].tracktotal - - counted = [] - total = 0 - - for item in self.items(): - if item.disc in counted: - continue - - total += item.tracktotal - counted.append(item.disc) - - if len(counted) == self.disctotal: - break - - return total - - def art_destination(self, image, item_dir=None): - """Return a path to the destination for the album art image - for the album. - - `image` is the path of the image that will be - moved there (used for its extension). - - The path construction uses the existing path of the album's - items, so the album must contain at least one item or - item_dir must be provided. - """ - image = bytestring_path(image) - item_dir = item_dir or self.item_dir() - - filename_tmpl = template(beets.config["art_filename"].as_str()) - subpath = self.evaluate_template(filename_tmpl, True) - if beets.config["asciify_paths"]: - subpath = util.asciify_path( - subpath, beets.config["path_sep_replace"].as_str() - ) - subpath = util.sanitize_path( - subpath, replacements=self._db.replacements - ) - subpath = bytestring_path(subpath) - - _, ext = os.path.splitext(image) - dest = os.path.join(item_dir, subpath + ext) - - return bytestring_path(dest) - - def set_art(self, path, copy=True): - """Set the album's cover art to the image at the given path. - - The image is copied (or moved) into place, replacing any - existing art. - - Send an 'art_set' event with `self` as the sole argument. - """ - path = bytestring_path(path) - oldart = self.artpath - artdest = self.art_destination(path) - - if oldart and samefile(path, oldart): - # Art already set. - return - elif samefile(path, artdest): - # Art already in place. - self.artpath = path - return - - # Normal operation. - if oldart == artdest: - util.remove(oldart) - artdest = util.unique_path(artdest) - if copy: - util.copy(path, artdest) - else: - util.move(path, artdest) - self.artpath = artdest - - plugins.send("art_set", album=self) - - def store(self, fields=None, inherit=True): - """Update the database with the album information. - - `fields` represents the fields to be stored. If not specified, - all fields will be. - - The album's tracks are also updated when the `inherit` flag is enabled. - This applies to fixed attributes as well as flexible ones. The `id` - attribute of the album will never be inherited. - """ - # Get modified track fields. - track_updates = {} - track_deletes = set() - for key in self._dirty: - if inherit: - if key in self.item_keys: # is a fixed attribute - track_updates[key] = self[key] - elif key not in self: # is a fixed or a flexible attribute - track_deletes.add(key) - elif key != "id": # is a flexible attribute - track_updates[key] = self[key] - - with self._db.transaction(): - super().store(fields) - if track_updates: - for item in self.items(): - for key, value in track_updates.items(): - item[key] = value - item.store() - if track_deletes: - for item in self.items(): - for key in track_deletes: - if key in item: - del item[key] - item.store() - - def try_sync(self, write, move, inherit=True): - """Synchronize the album and its items with the database. - Optionally, also write any new tags into the files and update - their paths. - - `write` indicates whether to write tags to the item files, and - `move` controls whether files (both audio and album art) are - moved. - """ - self.store(inherit=inherit) - for item in self.items(): - item.try_sync(write, move) - - -# Query construction helpers. - - -def parse_query_parts(parts, model_cls): - """Given a beets query string as a list of components, return the - `Query` and `Sort` they represent. - - Like `dbcore.parse_sorted_query`, with beets query prefixes and - ensuring that implicit path queries are made explicit with 'path::' - """ - # Get query types and their prefix characters. - prefixes = { - ":": dbcore.query.RegexpQuery, - "=~": dbcore.query.StringQuery, - "=": dbcore.query.MatchQuery, - } - prefixes.update(plugins.queries()) - - # Special-case path-like queries, which are non-field queries - # containing path separators (/). - parts = [ - f"path:{s}" if dbcore.query.PathQuery.is_path_query(s) else s - for s in parts - ] - - case_insensitive = beets.config["sort_case_insensitive"].get(bool) - - query, sort = dbcore.parse_sorted_query( - model_cls, parts, prefixes, case_insensitive - ) - log.debug("Parsed query: {!r}", query) - log.debug("Parsed sort: {!r}", sort) - return query, sort - - -def parse_query_string(s, model_cls): - """Given a beets query string, return the `Query` and `Sort` they - represent. - - The string is split into components using shell-like syntax. - """ - message = f"Query is not unicode: {s!r}" - assert isinstance(s, str), message - try: - parts = shlex.split(s) - except ValueError as exc: - raise dbcore.InvalidQueryError(s, exc) - return parse_query_parts(parts, model_cls) - - -# The Library: interface to the database. - - -class Library(dbcore.Database): - """A database of music containing songs and albums.""" - - _models = (Item, Album) - - def __init__( - self, - path="library.blb", - directory: str | None = None, - path_formats=((PF_KEY_DEFAULT, "$artist/$album/$track $title"),), - replacements=None, - ): - timeout = beets.config["timeout"].as_number() - super().__init__(path, timeout=timeout) - - self.directory = normpath(directory or platformdirs.user_music_path()) - - self.path_formats = path_formats - self.replacements = replacements - - # Used for template substitution performance. - self._memotable: dict[tuple[str, ...], str] = {} - - # Adding objects to the database. - - def add(self, obj): - """Add the :class:`Item` or :class:`Album` object to the library - database. - - Return the object's new id. - """ - obj.add(self) - self._memotable = {} - return obj.id - - def add_album(self, items): - """Create a new album consisting of a list of items. - - The items are added to the database if they don't yet have an - ID. Return a new :class:`Album` object. The list items must not - be empty. - """ - if not items: - raise ValueError("need at least one item") - - # Create the album structure using metadata from the first item. - values = {key: items[0][key] for key in Album.item_keys} - album = Album(self, **values) - - # Add the album structure and set the items' album_id fields. - # Store or add the items. - with self.transaction(): - album.add(self) - for item in items: - item.album_id = album.id - if item.id is None: - item.add(self) - else: - item.store() - - return album - - # Querying. - - def _fetch(self, model_cls, query, sort=None): - """Parse a query and fetch. - - If an order specification is present in the query string - the `sort` argument is ignored. - """ - # Parse the query, if necessary. - try: - parsed_sort = None - if isinstance(query, str): - query, parsed_sort = parse_query_string(query, model_cls) - elif isinstance(query, (list, tuple)): - query, parsed_sort = parse_query_parts(query, model_cls) - except dbcore.query.InvalidQueryArgumentValueError as exc: - raise dbcore.InvalidQueryError(query, exc) - - # Any non-null sort specified by the parsed query overrides the - # provided sort. - if parsed_sort and not isinstance(parsed_sort, dbcore.query.NullSort): - sort = parsed_sort - - return super()._fetch(model_cls, query, sort) - - @staticmethod - def get_default_album_sort(): - """Get a :class:`Sort` object for albums from the config option.""" - return dbcore.sort_from_strings( - Album, beets.config["sort_album"].as_str_seq() - ) - - @staticmethod - def get_default_item_sort(): - """Get a :class:`Sort` object for items from the config option.""" - return dbcore.sort_from_strings( - Item, beets.config["sort_item"].as_str_seq() - ) - - def albums(self, query=None, sort=None) -> Results[Album]: - """Get :class:`Album` objects matching the query.""" - return self._fetch(Album, query, sort or self.get_default_album_sort()) - - def items(self, query=None, sort=None) -> Results[Item]: - """Get :class:`Item` objects matching the query.""" - return self._fetch(Item, query, sort or self.get_default_item_sort()) - - # Convenience accessors. - - def get_item(self, id): - """Fetch a :class:`Item` by its ID. - - Return `None` if no match is found. - """ - return self._get(Item, id) - - def get_album(self, item_or_id): - """Given an album ID or an item associated with an album, return - a :class:`Album` object for the album. - - If no such album exists, return `None`. - """ - if isinstance(item_or_id, int): - album_id = item_or_id - else: - album_id = item_or_id.album_id - if album_id is None: - return None - return self._get(Album, album_id) - - -# Default path template resources. - - def _int_arg(s): """Convert a string argument to an integer for use in a template function. diff --git a/beets/library/queries.py b/beets/library/queries.py new file mode 100644 index 000000000..7c9d688cd --- /dev/null +++ b/beets/library/queries.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +import shlex + +import beets +from beets import dbcore, logging, plugins + +log = logging.getLogger("beets") + + +# Special path format key. +PF_KEY_DEFAULT = "default" + +# Query construction helpers. + + +def parse_query_parts(parts, model_cls): + """Given a beets query string as a list of components, return the + `Query` and `Sort` they represent. + + Like `dbcore.parse_sorted_query`, with beets query prefixes and + ensuring that implicit path queries are made explicit with 'path::' + """ + # Get query types and their prefix characters. + prefixes = { + ":": dbcore.query.RegexpQuery, + "=~": dbcore.query.StringQuery, + "=": dbcore.query.MatchQuery, + } + prefixes.update(plugins.queries()) + + # Special-case path-like queries, which are non-field queries + # containing path separators (/). + parts = [ + f"path:{s}" if dbcore.query.PathQuery.is_path_query(s) else s + for s in parts + ] + + case_insensitive = beets.config["sort_case_insensitive"].get(bool) + + query, sort = dbcore.parse_sorted_query( + model_cls, parts, prefixes, case_insensitive + ) + log.debug("Parsed query: {!r}", query) + log.debug("Parsed sort: {!r}", sort) + return query, sort + + +def parse_query_string(s, model_cls): + """Given a beets query string, return the `Query` and `Sort` they + represent. + + The string is split into components using shell-like syntax. + """ + message = f"Query is not unicode: {s!r}" + assert isinstance(s, str), message + try: + parts = shlex.split(s) + except ValueError as exc: + raise dbcore.InvalidQueryError(s, exc) + return parse_query_parts(parts, model_cls)