From 1a045c91668c771686f4c871c84f1680af2e944b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0ar=C5=ABnas=20Nejus?= Date: Mon, 12 May 2025 12:17:35 +0100 Subject: [PATCH 01/10] Copy paste query, types from library to dbcore --- beets/dbcore/query.py | 115 +++++++++++++++ beets/dbcore/types.py | 158 ++++++++++++++++++-- beets/library.py | 276 ++--------------------------------- beets/ui/__init__.py | 8 - beets/ui/commands.py | 10 +- beets/util/__init__.py | 8 + beetsplug/deezer.py | 3 +- beetsplug/metasync/amarok.py | 5 +- beetsplug/metasync/itunes.py | 7 +- beetsplug/mpdstats.py | 7 +- beetsplug/playlist.py | 3 +- beetsplug/spotify.py | 4 +- beetsplug/types.py | 3 +- beetsplug/web/__init__.py | 3 +- test/test_library.py | 54 ------- test/test_query.py | 12 +- test/test_types.py | 59 ++++++++ 17 files changed, 367 insertions(+), 368 deletions(-) create mode 100644 test/test_types.py diff --git a/beets/dbcore/query.py b/beets/dbcore/query.py index c7ca44452..9812a7528 100644 --- a/beets/dbcore/query.py +++ b/beets/dbcore/query.py @@ -16,6 +16,7 @@ from __future__ import annotations +import os import re import unicodedata from abc import ABC, abstractmethod @@ -36,6 +37,11 @@ if TYPE_CHECKING: else: P = TypeVar("P") +# To use the SQLite "blob" type, it doesn't suffice to provide a byte +# string; SQLite treats that as encoded text. Wrapping it in a +# `memoryview` tells it that we actually mean non-text data. +BLOB_TYPE = memoryview + class ParsingError(ValueError): """Abstract class for any unparsable user-requested album/query @@ -267,6 +273,97 @@ class SubstringQuery(StringFieldQuery[str]): return pattern.lower() in value.lower() +class PathQuery(FieldQuery[bytes]): + """A query that matches all items under a given path. + + Matching can either be case-insensitive or case-sensitive. By + default, the behavior depends on the OS: case-insensitive on Windows + and case-sensitive otherwise. + """ + + # For tests + force_implicit_query_detection = False + + def __init__(self, field, pattern, fast=True, case_sensitive=None): + """Create a path query. + + `pattern` must be a path, either to a file or a directory. + + `case_sensitive` can be a bool or `None`, indicating that the + behavior should depend on the filesystem. + """ + super().__init__(field, pattern, fast) + + path = util.normpath(pattern) + + # By default, the case sensitivity depends on the filesystem + # that the query path is located on. + if case_sensitive is None: + case_sensitive = util.case_sensitive(path) + self.case_sensitive = case_sensitive + + # Use a normalized-case pattern for case-insensitive matches. + if not case_sensitive: + # We need to lowercase the entire path, not just the pattern. + # In particular, on Windows, the drive letter is otherwise not + # lowercased. + # This also ensures that the `match()` method below and the SQL + # from `col_clause()` do the same thing. + path = path.lower() + + # Match the path as a single file. + self.file_path = path + # As a directory (prefix). + self.dir_path = os.path.join(path, b"") + + @classmethod + def is_path_query(cls, query_part): + """Try to guess whether a unicode query part is a path query. + + Condition: separator precedes colon and the file exists. + """ + colon = query_part.find(":") + if colon != -1: + query_part = query_part[:colon] + + # Test both `sep` and `altsep` (i.e., both slash and backslash on + # Windows). + if not ( + os.sep in query_part or (os.altsep and os.altsep in query_part) + ): + return False + + if cls.force_implicit_query_detection: + return True + return os.path.exists(util.syspath(util.normpath(query_part))) + + def match(self, item): + path = item.path if self.case_sensitive else item.path.lower() + return (path == self.file_path) or path.startswith(self.dir_path) + + def col_clause(self): + file_blob = BLOB_TYPE(self.file_path) + dir_blob = BLOB_TYPE(self.dir_path) + + if self.case_sensitive: + query_part = "({0} = ?) || (substr({0}, 1, ?) = ?)" + else: + query_part = "(BYTELOWER({0}) = BYTELOWER(?)) || \ + (substr(BYTELOWER({0}), 1, ?) = BYTELOWER(?))" + + return query_part.format(self.field), ( + file_blob, + len(dir_blob), + dir_blob, + ) + + def __repr__(self) -> str: + return ( + f"{self.__class__.__name__}({self.field!r}, {self.pattern!r}, " + f"fast={self.fast}, case_sensitive={self.case_sensitive})" + ) + + class RegexpQuery(StringFieldQuery[Pattern[str]]): """A query that matches a regular expression in a specific Model field. @@ -844,6 +941,24 @@ class DurationQuery(NumericQuery): ) +class SingletonQuery(FieldQuery[str]): + """This query is responsible for the 'singleton' lookup. + + It is based on the FieldQuery and constructs a SQL clause + 'album_id is NULL' which yields the same result as the previous filter + in Python but is more performant since it's done in SQL. + + Using util.str2bool ensures that lookups like singleton:true, singleton:1 + and singleton:false, singleton:0 are handled consistently. + """ + + def __new__(cls, field: str, value: str, *args, **kwargs): + query = NoneQuery("album_id") + if util.str2bool(value): + return query + return NotQuery(query) + + # Sorting. diff --git a/beets/dbcore/types.py b/beets/dbcore/types.py index 2a64b2ed9..27cd04b92 100644 --- a/beets/dbcore/types.py +++ b/beets/dbcore/types.py @@ -16,19 +16,18 @@ from __future__ import annotations +import re +import time import typing from abc import ABC from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast -from beets.util import str2bool +from beets import util -from .query import ( - BooleanQuery, - FieldQueryType, - NumericQuery, - SQLiteType, - SubstringQuery, -) +from . import query + +SQLiteType = query.SQLiteType +BLOB_TYPE = query.BLOB_TYPE class ModelType(typing.Protocol): @@ -61,7 +60,7 @@ class Type(ABC, Generic[T, N]): """The SQLite column type for the value. """ - query: FieldQueryType = SubstringQuery + query: query.FieldQueryType = query.SubstringQuery """The `Query` subclass to be used when querying the field. """ @@ -160,7 +159,7 @@ class BaseInteger(Type[int, N]): """A basic integer type.""" sql = "INTEGER" - query = NumericQuery + query = query.NumericQuery model_type = int def normalize(self, value: Any) -> int | N: @@ -241,7 +240,7 @@ class BaseFloat(Type[float, N]): """ sql = "REAL" - query: FieldQueryType = NumericQuery + query: query.FieldQueryType = query.NumericQuery model_type = float def __init__(self, digits: int = 1): @@ -271,7 +270,7 @@ class BaseString(Type[T, N]): """A Unicode string type.""" sql = "TEXT" - query = SubstringQuery + query = query.SubstringQuery def normalize(self, value: Any) -> T | N: if value is None: @@ -312,14 +311,144 @@ class Boolean(Type): """A boolean type.""" sql = "INTEGER" - query = BooleanQuery + query = query.BooleanQuery model_type = bool def format(self, value: bool) -> str: return str(bool(value)) def parse(self, string: str) -> bool: - return str2bool(string) + return util.str2bool(string) + + +class DateType(Float): + # TODO representation should be `datetime` object + # TODO distinguish between date and time types + query = query.DateQuery + + def format(self, value): + return time.strftime( + beets.config["time_format"].as_str(), time.localtime(value or 0) + ) + + def parse(self, string): + try: + # Try a formatted date string. + return time.mktime( + time.strptime(string, beets.config["time_format"].as_str()) + ) + except ValueError: + # Fall back to a plain timestamp number. + try: + return float(string) + except ValueError: + return self.null + + +class PathType(Type[bytes, bytes]): + """A dbcore type for filesystem paths. + + These are represented as `bytes` objects, in keeping with + the Unix filesystem abstraction. + """ + + sql = "BLOB" + query = query.PathQuery + model_type = bytes + + def __init__(self, nullable=False): + """Create a path type object. + + `nullable` controls whether the type may be missing, i.e., None. + """ + self.nullable = nullable + + @property + def null(self): + if self.nullable: + return None + else: + return b"" + + def format(self, value): + return util.displayable_path(value) + + def parse(self, string): + return util.normpath(util.bytestring_path(string)) + + def normalize(self, value): + if isinstance(value, str): + # Paths stored internally as encoded bytes. + return util.bytestring_path(value) + + elif isinstance(value, BLOB_TYPE): + # We unwrap buffers to bytes. + return bytes(value) + + else: + return value + + def from_sql(self, sql_value): + return self.normalize(sql_value) + + def to_sql(self, value): + if isinstance(value, bytes): + value = BLOB_TYPE(value) + return value + + +class MusicalKey(String): + """String representing the musical key of a song. + + The standard format is C, Cm, C#, C#m, etc. + """ + + ENHARMONIC = { + r"db": "c#", + r"eb": "d#", + r"gb": "f#", + r"ab": "g#", + r"bb": "a#", + } + + null = None + + def parse(self, key): + key = key.lower() + for flat, sharp in self.ENHARMONIC.items(): + key = re.sub(flat, sharp, key) + key = re.sub(r"[\W\s]+minor", "m", key) + key = re.sub(r"[\W\s]+major", "", key) + return key.capitalize() + + def normalize(self, key): + if key is None: + return None + else: + return self.parse(key) + + +class DurationType(Float): + """Human-friendly (M:SS) representation of a time interval.""" + + query = query.DurationQuery + + def format(self, value): + if not beets.config["format_raw_length"].get(bool): + return util.human_seconds_short(value or 0.0) + else: + return value + + def parse(self, string): + try: + # Try to format back hh:ss to seconds. + return util.raw_seconds_short(string) + except ValueError: + # Fall back to a plain float. + try: + return float(string) + except ValueError: + return self.null # Shared instances of common types. @@ -331,6 +460,7 @@ FLOAT = Float() NULL_FLOAT = NullFloat() STRING = String() BOOLEAN = Boolean() +DATE = DateType() SEMICOLON_SPACE_DSV = DelimitedString(delimiter="; ") # Will set the proper null char in mediafile diff --git a/beets/library.py b/beets/library.py index 271059c69..5a692ef1c 100644 --- a/beets/library.py +++ b/beets/library.py @@ -17,7 +17,6 @@ from __future__ import annotations import os -import re import shlex import string import sys @@ -46,259 +45,9 @@ from beets.util.functemplate import Template, template if TYPE_CHECKING: from .dbcore.query import FieldQuery, FieldQueryType -# To use the SQLite "blob" type, it doesn't suffice to provide a byte -# string; SQLite treats that as encoded text. Wrapping it in a -# `memoryview` tells it that we actually mean non-text data. -BLOB_TYPE = memoryview - log = logging.getLogger("beets") -# Library-specific query types. - - -class SingletonQuery(dbcore.FieldQuery[str]): - """This query is responsible for the 'singleton' lookup. - - It is based on the FieldQuery and constructs a SQL clause - 'album_id is NULL' which yields the same result as the previous filter - in Python but is more performant since it's done in SQL. - - Using util.str2bool ensures that lookups like singleton:true, singleton:1 - and singleton:false, singleton:0 are handled consistently. - """ - - def __new__(cls, field: str, value: str, *args, **kwargs): - query = dbcore.query.NoneQuery("album_id") - if util.str2bool(value): - return query - return dbcore.query.NotQuery(query) - - -class PathQuery(dbcore.FieldQuery[bytes]): - """A query that matches all items under a given path. - - Matching can either be case-insensitive or case-sensitive. By - default, the behavior depends on the OS: case-insensitive on Windows - and case-sensitive otherwise. - """ - - # For tests - force_implicit_query_detection = False - - def __init__(self, field, pattern, fast=True, case_sensitive=None): - """Create a path query. - - `pattern` must be a path, either to a file or a directory. - - `case_sensitive` can be a bool or `None`, indicating that the - behavior should depend on the filesystem. - """ - super().__init__(field, pattern, fast) - - path = util.normpath(pattern) - - # By default, the case sensitivity depends on the filesystem - # that the query path is located on. - if case_sensitive is None: - case_sensitive = util.case_sensitive(path) - self.case_sensitive = case_sensitive - - # Use a normalized-case pattern for case-insensitive matches. - if not case_sensitive: - # We need to lowercase the entire path, not just the pattern. - # In particular, on Windows, the drive letter is otherwise not - # lowercased. - # This also ensures that the `match()` method below and the SQL - # from `col_clause()` do the same thing. - path = path.lower() - - # Match the path as a single file. - self.file_path = path - # As a directory (prefix). - self.dir_path = os.path.join(path, b"") - - @classmethod - def is_path_query(cls, query_part): - """Try to guess whether a unicode query part is a path query. - - Condition: separator precedes colon and the file exists. - """ - colon = query_part.find(":") - if colon != -1: - query_part = query_part[:colon] - - # Test both `sep` and `altsep` (i.e., both slash and backslash on - # Windows). - if not ( - os.sep in query_part or (os.altsep and os.altsep in query_part) - ): - return False - - if cls.force_implicit_query_detection: - return True - return os.path.exists(syspath(normpath(query_part))) - - def match(self, item): - path = item.path if self.case_sensitive else item.path.lower() - return (path == self.file_path) or path.startswith(self.dir_path) - - def col_clause(self): - file_blob = BLOB_TYPE(self.file_path) - dir_blob = BLOB_TYPE(self.dir_path) - - if self.case_sensitive: - query_part = "({0} = ?) || (substr({0}, 1, ?) = ?)" - else: - query_part = "(BYTELOWER({0}) = BYTELOWER(?)) || \ - (substr(BYTELOWER({0}), 1, ?) = BYTELOWER(?))" - - return query_part.format(self.field), ( - file_blob, - len(dir_blob), - dir_blob, - ) - - def __repr__(self) -> str: - return ( - f"{self.__class__.__name__}({self.field!r}, {self.pattern!r}, " - f"fast={self.fast}, case_sensitive={self.case_sensitive})" - ) - - -# Library-specific field types. - - -class DateType(types.Float): - # TODO representation should be `datetime` object - # TODO distinguish between date and time types - query = dbcore.query.DateQuery - - def format(self, value): - return time.strftime( - beets.config["time_format"].as_str(), time.localtime(value or 0) - ) - - def parse(self, string): - try: - # Try a formatted date string. - return time.mktime( - time.strptime(string, beets.config["time_format"].as_str()) - ) - except ValueError: - # Fall back to a plain timestamp number. - try: - return float(string) - except ValueError: - return self.null - - -class PathType(types.Type[bytes, bytes]): - """A dbcore type for filesystem paths. - - These are represented as `bytes` objects, in keeping with - the Unix filesystem abstraction. - """ - - sql = "BLOB" - query = PathQuery - model_type = bytes - - def __init__(self, nullable=False): - """Create a path type object. - - `nullable` controls whether the type may be missing, i.e., None. - """ - self.nullable = nullable - - @property - def null(self): - if self.nullable: - return None - else: - return b"" - - def format(self, value): - return util.displayable_path(value) - - def parse(self, string): - return normpath(bytestring_path(string)) - - def normalize(self, value): - if isinstance(value, str): - # Paths stored internally as encoded bytes. - return bytestring_path(value) - - elif isinstance(value, BLOB_TYPE): - # We unwrap buffers to bytes. - return bytes(value) - - else: - return value - - def from_sql(self, sql_value): - return self.normalize(sql_value) - - def to_sql(self, value): - if isinstance(value, bytes): - value = BLOB_TYPE(value) - return value - - -class MusicalKey(types.String): - """String representing the musical key of a song. - - The standard format is C, Cm, C#, C#m, etc. - """ - - ENHARMONIC = { - r"db": "c#", - r"eb": "d#", - r"gb": "f#", - r"ab": "g#", - r"bb": "a#", - } - - null = None - - def parse(self, key): - key = key.lower() - for flat, sharp in self.ENHARMONIC.items(): - key = re.sub(flat, sharp, key) - key = re.sub(r"[\W\s]+minor", "m", key) - key = re.sub(r"[\W\s]+major", "", key) - return key.capitalize() - - def normalize(self, key): - if key is None: - return None - else: - return self.parse(key) - - -class DurationType(types.Float): - """Human-friendly (M:SS) representation of a time interval.""" - - query = dbcore.query.DurationQuery - - def format(self, value): - if not beets.config["format_raw_length"].get(bool): - return beets.ui.human_seconds_short(value or 0.0) - else: - return value - - def parse(self, string): - try: - # Try to format back hh:ss to seconds. - return util.raw_seconds_short(string) - except ValueError: - # Fall back to a plain float. - try: - return float(string) - except ValueError: - return self.null - - # Special path format key. PF_KEY_DEFAULT = "default" @@ -517,7 +266,7 @@ class Item(LibModel): _flex_table = "item_attributes" _fields = { "id": types.PRIMARY_ID, - "path": PathType(), + "path": types.PathType(), "album_id": types.FOREIGN_ID, "title": types.STRING, "artist": types.STRING, @@ -596,8 +345,8 @@ class Item(LibModel): "original_year": types.PaddedInt(4), "original_month": types.PaddedInt(2), "original_day": types.PaddedInt(2), - "initial_key": MusicalKey(), - "length": DurationType(), + "initial_key": types.MusicalKey(), + "length": types.DurationType(), "bitrate": types.ScaledInt(1000, "kbps"), "bitrate_mode": types.STRING, "encoder_info": types.STRING, @@ -606,8 +355,8 @@ class Item(LibModel): "samplerate": types.ScaledInt(1000, "kHz"), "bitdepth": types.INTEGER, "channels": types.INTEGER, - "mtime": DateType(), - "added": DateType(), + "mtime": types.DATE, + "added": types.DATE, } _search_fields = ( @@ -641,7 +390,7 @@ class Item(LibModel): _sorts = {"artist": dbcore.query.SmartArtistSort} - _queries = {"singleton": SingletonQuery} + _queries = {"singleton": dbcore.query.SingletonQuery} _format_config_key = "format_item" @@ -717,7 +466,7 @@ class Item(LibModel): if key == "path": if isinstance(value, str): value = bytestring_path(value) - elif isinstance(value, BLOB_TYPE): + elif isinstance(value, types.BLOB_TYPE): value = bytes(value) elif key == "album_id": self._cached_album = None @@ -1161,8 +910,8 @@ class Album(LibModel): _always_dirty = True _fields = { "id": types.PRIMARY_ID, - "artpath": PathType(True), - "added": DateType(), + "artpath": types.PathType(True), + "added": types.DATE, "albumartist": types.STRING, "albumartist_sort": types.STRING, "albumartist_credit": types.STRING, @@ -1208,7 +957,7 @@ class Album(LibModel): _search_fields = ("album", "albumartist", "genre") _types = { - "path": PathType(), + "path": types.PathType(), "data_source": types.STRING, } @@ -1563,7 +1312,10 @@ def parse_query_parts(parts, model_cls): # Special-case path-like queries, which are non-field queries # containing path separators (/). - parts = [f"path:{s}" if PathQuery.is_path_query(s) else s for s in parts] + parts = [ + f"path:{s}" if dbcore.query.PathQuery.is_path_query(s) else s + for s in parts + ] case_insensitive = beets.config["sort_case_insensitive"].get(bool) diff --git a/beets/ui/__init__.py b/beets/ui/__init__.py index a6f615b45..f1aac766f 100644 --- a/beets/ui/__init__.py +++ b/beets/ui/__init__.py @@ -477,14 +477,6 @@ def human_seconds(interval): return f"{interval:3.1f} {suffix}s" -def human_seconds_short(interval): - """Formats a number of seconds as a short human-readable M:SS - string. - """ - interval = int(interval) - return "%i:%02i" % (interval // 60, interval % 60) - - # Colorization. # ANSI terminal colorization code heavily inspired by pygments: diff --git a/beets/ui/commands.py b/beets/ui/commands.py index f42291019..fb9ca8b89 100755 --- a/beets/ui/commands.py +++ b/beets/ui/commands.py @@ -541,8 +541,8 @@ class ChangeRepresentation: cur_length0 = item.length if item.length else 0 new_length0 = track_info.length if track_info.length else 0 # format into string - cur_length = f"({ui.human_seconds_short(cur_length0)})" - new_length = f"({ui.human_seconds_short(new_length0)})" + cur_length = f"({util.human_seconds_short(cur_length0)})" + new_length = f"({util.human_seconds_short(new_length0)})" # colorize lhs_length = ui.colorize(highlight_color, cur_length) rhs_length = ui.colorize(highlight_color, new_length) @@ -706,14 +706,14 @@ class AlbumChange(ChangeRepresentation): for track_info in self.match.extra_tracks: line = f" ! {track_info.title} (#{self.format_index(track_info)})" if track_info.length: - line += f" ({ui.human_seconds_short(track_info.length)})" + line += f" ({util.human_seconds_short(track_info.length)})" print_(ui.colorize("text_warning", line)) if self.match.extra_items: print_(f"Unmatched tracks ({len(self.match.extra_items)}):") for item in self.match.extra_items: line = " ! {} (#{})".format(item.title, self.format_index(item)) if item.length: - line += " ({})".format(ui.human_seconds_short(item.length)) + line += " ({})".format(util.human_seconds_short(item.length)) print_(ui.colorize("text_warning", line)) @@ -795,7 +795,7 @@ def summarize_items(items, singleton): round(int(items[0].samplerate) / 1000, 1), items[0].bitdepth ) summary_parts.append(sample_bits) - summary_parts.append(ui.human_seconds_short(total_duration)) + summary_parts.append(util.human_seconds_short(total_duration)) summary_parts.append(ui.human_bytes(total_filesize)) return ", ".join(summary_parts) diff --git a/beets/util/__init__.py b/beets/util/__init__.py index 9bd7451f8..4572b27f9 100644 --- a/beets/util/__init__.py +++ b/beets/util/__init__.py @@ -1032,6 +1032,14 @@ def raw_seconds_short(string: str) -> float: return float(minutes * 60 + seconds) +def human_seconds_short(interval): + """Formats a number of seconds as a short human-readable M:SS + string. + """ + interval = int(interval) + return "%i:%02i" % (interval // 60, interval % 60) + + def asciify_path(path: str, sep_replace: str) -> str: """Decodes all unicode characters in a path into ASCII equivalents. diff --git a/beetsplug/deezer.py b/beetsplug/deezer.py index 2e5d8473a..89f7436f8 100644 --- a/beetsplug/deezer.py +++ b/beetsplug/deezer.py @@ -25,7 +25,6 @@ import unidecode from beets import ui from beets.autotag import AlbumInfo, TrackInfo from beets.dbcore import types -from beets.library import DateType from beets.plugins import BeetsPlugin, MetadataSourcePlugin @@ -35,7 +34,7 @@ class DeezerPlugin(MetadataSourcePlugin, BeetsPlugin): item_types = { "deezer_track_rank": types.INTEGER, "deezer_track_id": types.INTEGER, - "deezer_updated": DateType(), + "deezer_updated": types.DATE, } # Base URLs for the Deezer API diff --git a/beetsplug/metasync/amarok.py b/beetsplug/metasync/amarok.py index f8dcbe3f3..9afe6dbca 100644 --- a/beetsplug/metasync/amarok.py +++ b/beetsplug/metasync/amarok.py @@ -20,7 +20,6 @@ from time import mktime from xml.sax.saxutils import quoteattr from beets.dbcore import types -from beets.library import DateType from beets.util import displayable_path from beetsplug.metasync import MetaSource @@ -41,8 +40,8 @@ class Amarok(MetaSource): "amarok_score": types.FLOAT, "amarok_uid": types.STRING, "amarok_playcount": types.INTEGER, - "amarok_firstplayed": DateType(), - "amarok_lastplayed": DateType(), + "amarok_firstplayed": types.DATE, + "amarok_lastplayed": types.DATE, } query_xml = ' \ diff --git a/beetsplug/metasync/itunes.py b/beetsplug/metasync/itunes.py index 02f592fdc..f777d0d55 100644 --- a/beetsplug/metasync/itunes.py +++ b/beetsplug/metasync/itunes.py @@ -26,7 +26,6 @@ from confuse import ConfigValueError from beets import util from beets.dbcore import types -from beets.library import DateType from beets.util import bytestring_path, syspath from beetsplug.metasync import MetaSource @@ -63,9 +62,9 @@ class Itunes(MetaSource): "itunes_rating": types.INTEGER, # 0..100 scale "itunes_playcount": types.INTEGER, "itunes_skipcount": types.INTEGER, - "itunes_lastplayed": DateType(), - "itunes_lastskipped": DateType(), - "itunes_dateadded": DateType(), + "itunes_lastplayed": types.DATE, + "itunes_lastskipped": types.DATE, + "itunes_dateadded": types.DATE, } def __init__(self, config, log): diff --git a/beetsplug/mpdstats.py b/beetsplug/mpdstats.py index 6d4c269d1..20faf225f 100644 --- a/beetsplug/mpdstats.py +++ b/beetsplug/mpdstats.py @@ -18,8 +18,9 @@ import time import mpd -from beets import config, library, plugins, ui +from beets import config, plugins, ui from beets.dbcore import types +from beets.dbcore.query import PathQuery from beets.util import displayable_path # If we lose the connection, how many times do we want to retry and how @@ -160,7 +161,7 @@ class MPDStats: def get_item(self, path): """Return the beets item related to path.""" - query = library.PathQuery("path", path) + query = PathQuery("path", path) item = self.lib.items(query).get() if item: return item @@ -321,7 +322,7 @@ class MPDStatsPlugin(plugins.BeetsPlugin): item_types = { "play_count": types.INTEGER, "skip_count": types.INTEGER, - "last_played": library.DateType(), + "last_played": types.DATE, "rating": types.FLOAT, } diff --git a/beetsplug/playlist.py b/beetsplug/playlist.py index cf1d500e8..cb16fb5bc 100644 --- a/beetsplug/playlist.py +++ b/beetsplug/playlist.py @@ -18,8 +18,7 @@ import tempfile from collections.abc import Sequence import beets -from beets.dbcore.query import InQuery -from beets.library import BLOB_TYPE +from beets.dbcore.query import BLOB_TYPE, InQuery from beets.util import path_as_posix diff --git a/beetsplug/spotify.py b/beetsplug/spotify.py index 76ceeed68..595da4892 100644 --- a/beetsplug/spotify.py +++ b/beetsplug/spotify.py @@ -34,10 +34,10 @@ import unidecode from beets import ui from beets.autotag.hooks import AlbumInfo, TrackInfo from beets.dbcore import types -from beets.library import DateType, Library from beets.plugins import BeetsPlugin, MetadataSourcePlugin, Response if TYPE_CHECKING: + from beets.library import Library from beetsplug._typing import JSONDict DEFAULT_WAITING_TIME = 5 @@ -64,7 +64,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin): "spotify_tempo": types.FLOAT, "spotify_time_signature": types.INTEGER, "spotify_valence": types.FLOAT, - "spotify_updated": DateType(), + "spotify_updated": types.DATE, } # Base URLs for the Spotify API diff --git a/beetsplug/types.py b/beetsplug/types.py index 9ba3aac66..9bdfdecee 100644 --- a/beetsplug/types.py +++ b/beetsplug/types.py @@ -15,7 +15,6 @@ from confuse import ConfigValueError -from beets import library from beets.dbcore import types from beets.plugins import BeetsPlugin @@ -42,7 +41,7 @@ class TypesPlugin(BeetsPlugin): elif value.get() == "bool": mytypes[key] = types.BOOLEAN elif value.get() == "date": - mytypes[key] = library.DateType() + mytypes[key] = types.DATE else: raise ConfigValueError( "unknown type '{}' for the '{}' field".format(value, key) diff --git a/beetsplug/web/__init__.py b/beetsplug/web/__init__.py index 175cec4a9..c1b0b5029 100644 --- a/beetsplug/web/__init__.py +++ b/beetsplug/web/__init__.py @@ -25,6 +25,7 @@ from werkzeug.routing import BaseConverter, PathConverter import beets.library from beets import ui, util +from beets.dbcore.query import PathQuery from beets.plugins import BeetsPlugin # Utilities. @@ -342,7 +343,7 @@ def item_query(queries): @app.route("/item/path/") def item_at_path(path): - query = beets.library.PathQuery("path", path.encode("utf-8")) + query = PathQuery("path", path.encode("utf-8")) item = g.lib.items(query).get() if item: return flask.jsonify(_rep(item)) diff --git a/test/test_library.py b/test/test_library.py index 36322cfec..2d232c88f 100644 --- a/test/test_library.py +++ b/test/test_library.py @@ -19,7 +19,6 @@ import os.path import re import shutil import stat -import time import unicodedata import unittest from unittest.mock import patch @@ -1320,56 +1319,3 @@ class ParseQueryTest(unittest.TestCase): def test_parse_bytes(self): with pytest.raises(AssertionError): beets.library.parse_query_string(b"query", None) - - -class LibraryFieldTypesTest(unittest.TestCase): - """Test format() and parse() for library-specific field types""" - - def test_datetype(self): - t = beets.library.DateType() - - # format - time_format = beets.config["time_format"].as_str() - time_local = time.strftime(time_format, time.localtime(123456789)) - assert time_local == t.format(123456789) - # parse - assert 123456789.0 == t.parse(time_local) - assert 123456789.0 == t.parse("123456789.0") - assert t.null == t.parse("not123456789.0") - assert t.null == t.parse("1973-11-29") - - def test_pathtype(self): - t = beets.library.PathType() - - # format - assert "/tmp" == t.format("/tmp") - assert "/tmp/\xe4lbum" == t.format("/tmp/\u00e4lbum") - # parse - assert np(b"/tmp") == t.parse("/tmp") - assert np(b"/tmp/\xc3\xa4lbum") == t.parse("/tmp/\u00e4lbum/") - - def test_musicalkey(self): - t = beets.library.MusicalKey() - - # parse - assert "C#m" == t.parse("c#m") - assert "Gm" == t.parse("g minor") - assert "Not c#m" == t.parse("not C#m") - - def test_durationtype(self): - t = beets.library.DurationType() - - # format - assert "1:01" == t.format(61.23) - assert "60:01" == t.format(3601.23) - assert "0:00" == t.format(None) - # parse - assert 61.0 == t.parse("1:01") - assert 61.23 == t.parse("61.23") - assert 3601.0 == t.parse("60:01") - assert t.null == t.parse("1:00:01") - assert t.null == t.parse("not61.23") - # config format_raw_length - beets.config["format_raw_length"] = True - assert 61.23 == t.format(61.23) - assert 3601.23 == t.format(3601.23) diff --git a/test/test_query.py b/test/test_query.py index 22c2710de..6546cb4df 100644 --- a/test/test_query.py +++ b/test/test_query.py @@ -466,9 +466,9 @@ class PathQueryTest(ItemInDBTestCase, AssertsMixin): # Unadorned path queries with path separators in them are considered # path queries only when the path in question actually exists. So we # mock the existence check to return true. - beets.library.PathQuery.force_implicit_query_detection = True + beets.dbcore.query.PathQuery.force_implicit_query_detection = True yield - beets.library.PathQuery.force_implicit_query_detection = False + beets.dbcore.query.PathQuery.force_implicit_query_detection = False def test_path_exact_match(self): q = "path:/a/b/c.mp3" @@ -609,7 +609,7 @@ class PathQueryTest(ItemInDBTestCase, AssertsMixin): def test_case_sensitivity(self): self.add_album(path=b"/A/B/C2.mp3", title="caps path") - makeq = partial(beets.library.PathQuery, "path", "/A/B") + makeq = partial(beets.dbcore.query.PathQuery, "path", "/A/B") results = self.lib.items(makeq(case_sensitive=True)) self.assert_items_matched(results, ["caps path"]) @@ -621,7 +621,7 @@ class PathQueryTest(ItemInDBTestCase, AssertsMixin): # both os.sep and os.altsep @unittest.skipIf(sys.platform == "win32", "win32") def test_path_sep_detection(self): - is_path_query = beets.library.PathQuery.is_path_query + is_path_query = beets.dbcore.query.PathQuery.is_path_query with self.force_implicit_query_detection(): assert is_path_query("/foo/bar") @@ -641,7 +641,7 @@ class PathQueryTest(ItemInDBTestCase, AssertsMixin): Thus, don't use the `force_implicit_query_detection()` contextmanager which would disable the existence check. """ - is_path_query = beets.library.PathQuery.is_path_query + is_path_query = beets.dbcore.query.PathQuery.is_path_query path = self.touch(os.path.join(b"foo", b"bar")) assert os.path.isabs(util.syspath(path)) @@ -664,7 +664,7 @@ class PathQueryTest(ItemInDBTestCase, AssertsMixin): Thus, don't use the `force_implicit_query_detection()` contextmanager which would disable the existence check. """ - is_path_query = beets.library.PathQuery.is_path_query + is_path_query = beets.dbcore.query.PathQuery.is_path_query self.touch(os.path.join(b"foo", b"bar")) diff --git a/test/test_types.py b/test/test_types.py new file mode 100644 index 000000000..8a6acd0dc --- /dev/null +++ b/test/test_types.py @@ -0,0 +1,59 @@ +import time +import unittest + +import beets +from beets.dbcore import types +from beets.util import normpath + + +class LibraryFieldTypesTest(unittest.TestCase): + """Test format() and parse() for library-specific field types""" + + def test_datetype(self): + t = types.DATE + + # format + time_format = beets.config["time_format"].as_str() + time_local = time.strftime(time_format, time.localtime(123456789)) + assert time_local == t.format(123456789) + # parse + assert 123456789.0 == t.parse(time_local) + assert 123456789.0 == t.parse("123456789.0") + assert t.null == t.parse("not123456789.0") + assert t.null == t.parse("1973-11-29") + + def test_pathtype(self): + t = types.PathType() + + # format + assert "/tmp" == t.format("/tmp") + assert "/tmp/\xe4lbum" == t.format("/tmp/\u00e4lbum") + # parse + assert normpath(b"/tmp") == t.parse("/tmp") + assert normpath(b"/tmp/\xc3\xa4lbum") == t.parse("/tmp/\u00e4lbum/") + + def test_musicalkey(self): + t = types.MusicalKey() + + # parse + assert "C#m" == t.parse("c#m") + assert "Gm" == t.parse("g minor") + assert "Not c#m" == t.parse("not C#m") + + def test_durationtype(self): + t = types.DurationType() + + # format + assert "1:01" == t.format(61.23) + assert "60:01" == t.format(3601.23) + assert "0:00" == t.format(None) + # parse + assert 61.0 == t.parse("1:01") + assert 61.23 == t.parse("61.23") + assert 3601.0 == t.parse("60:01") + assert t.null == t.parse("1:00:01") + assert t.null == t.parse("not61.23") + # config format_raw_length + beets.config["format_raw_length"] = True + assert 61.23 == t.format(61.23) + assert 3601.23 == t.format(3601.23) From b40ce836d5b7cf57a1c4ac836b9eee252aeb70e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0ar=C5=ABnas=20Nejus?= Date: Mon, 12 May 2025 12:19:00 +0100 Subject: [PATCH 02/10] Add NullPathType and types to PathType --- .git-blame-ignore-revs | 4 +- beets/dbcore/query.py | 2 + beets/dbcore/types.py | 46 +++++++++++----------- beets/library.py | 2 +- test/test_types.py | 87 +++++++++++++++++++++--------------------- 5 files changed, 73 insertions(+), 68 deletions(-) diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs index 4703203ba..5441940a4 100644 --- a/.git-blame-ignore-revs +++ b/.git-blame-ignore-revs @@ -48,4 +48,6 @@ f36bc497c8c8f89004f3f6879908d3f0b25123e1 # Fix formatting c490ac5810b70f3cf5fd8649669838e8fdb19f4d # Importer restructure -9147577b2b19f43ca827e9650261a86fb0450cef \ No newline at end of file +9147577b2b19f43ca827e9650261a86fb0450cef +# Copy paste query, types from library to dbcore +1a045c91668c771686f4c871c84f1680af2e944b diff --git a/beets/dbcore/query.py b/beets/dbcore/query.py index 9812a7528..3243445cb 100644 --- a/beets/dbcore/query.py +++ b/beets/dbcore/query.py @@ -40,6 +40,8 @@ else: # To use the SQLite "blob" type, it doesn't suffice to provide a byte # string; SQLite treats that as encoded text. Wrapping it in a # `memoryview` tells it that we actually mean non-text data. +# needs to be defined in here due to circular import. +# TODO: remove it from this module and define it in dbcore/types.py instead BLOB_TYPE = memoryview diff --git a/beets/dbcore/types.py b/beets/dbcore/types.py index 27cd04b92..be28f6891 100644 --- a/beets/dbcore/types.py +++ b/beets/dbcore/types.py @@ -22,6 +22,7 @@ import typing from abc import ABC from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast +import beets from beets import util from . import query @@ -345,7 +346,7 @@ class DateType(Float): return self.null -class PathType(Type[bytes, bytes]): +class BasePathType(Type[bytes, N]): """A dbcore type for filesystem paths. These are represented as `bytes` objects, in keeping with @@ -356,27 +357,10 @@ class PathType(Type[bytes, bytes]): query = query.PathQuery model_type = bytes - def __init__(self, nullable=False): - """Create a path type object. + def parse(self, string: str) -> bytes: + return util.normpath(string) - `nullable` controls whether the type may be missing, i.e., None. - """ - self.nullable = nullable - - @property - def null(self): - if self.nullable: - return None - else: - return b"" - - def format(self, value): - return util.displayable_path(value) - - def parse(self, string): - return util.normpath(util.bytestring_path(string)) - - def normalize(self, value): + def normalize(self, value: Any) -> bytes | N: if isinstance(value, str): # Paths stored internally as encoded bytes. return util.bytestring_path(value) @@ -391,12 +375,30 @@ class PathType(Type[bytes, bytes]): def from_sql(self, sql_value): return self.normalize(sql_value) - def to_sql(self, value): + def to_sql(self, value: bytes) -> BLOB_TYPE: if isinstance(value, bytes): value = BLOB_TYPE(value) return value +class NullPathType(BasePathType[None]): + @property + def null(self) -> None: + return None + + def format(self, value: bytes | None) -> str: + return util.displayable_path(value or b"") + + +class PathType(BasePathType[bytes]): + @property + def null(self) -> bytes: + return b"" + + def format(self, value: bytes) -> str: + return util.displayable_path(value or b"") + + class MusicalKey(String): """String representing the musical key of a song. diff --git a/beets/library.py b/beets/library.py index 5a692ef1c..9223b3209 100644 --- a/beets/library.py +++ b/beets/library.py @@ -910,7 +910,7 @@ class Album(LibModel): _always_dirty = True _fields = { "id": types.PRIMARY_ID, - "artpath": types.PathType(True), + "artpath": types.NullPathType(), "added": types.DATE, "albumartist": types.STRING, "albumartist_sort": types.STRING, diff --git a/test/test_types.py b/test/test_types.py index 8a6acd0dc..6727917d8 100644 --- a/test/test_types.py +++ b/test/test_types.py @@ -1,59 +1,58 @@ import time -import unittest import beets from beets.dbcore import types from beets.util import normpath -class LibraryFieldTypesTest(unittest.TestCase): - """Test format() and parse() for library-specific field types""" +def test_datetype(): + t = types.DATE - def test_datetype(self): - t = types.DATE + # format + time_format = beets.config["time_format"].as_str() + time_local = time.strftime(time_format, time.localtime(123456789)) + assert time_local == t.format(123456789) + # parse + assert 123456789.0 == t.parse(time_local) + assert 123456789.0 == t.parse("123456789.0") + assert t.null == t.parse("not123456789.0") + assert t.null == t.parse("1973-11-29") - # format - time_format = beets.config["time_format"].as_str() - time_local = time.strftime(time_format, time.localtime(123456789)) - assert time_local == t.format(123456789) - # parse - assert 123456789.0 == t.parse(time_local) - assert 123456789.0 == t.parse("123456789.0") - assert t.null == t.parse("not123456789.0") - assert t.null == t.parse("1973-11-29") - def test_pathtype(self): - t = types.PathType() +def test_pathtype(): + t = types.PathType() - # format - assert "/tmp" == t.format("/tmp") - assert "/tmp/\xe4lbum" == t.format("/tmp/\u00e4lbum") - # parse - assert normpath(b"/tmp") == t.parse("/tmp") - assert normpath(b"/tmp/\xc3\xa4lbum") == t.parse("/tmp/\u00e4lbum/") + # format + assert "/tmp" == t.format("/tmp") + assert "/tmp/\xe4lbum" == t.format("/tmp/\u00e4lbum") + # parse + assert normpath(b"/tmp") == t.parse("/tmp") + assert normpath(b"/tmp/\xc3\xa4lbum") == t.parse("/tmp/\u00e4lbum/") - def test_musicalkey(self): - t = types.MusicalKey() - # parse - assert "C#m" == t.parse("c#m") - assert "Gm" == t.parse("g minor") - assert "Not c#m" == t.parse("not C#m") +def test_musicalkey(): + t = types.MusicalKey() - def test_durationtype(self): - t = types.DurationType() + # parse + assert "C#m" == t.parse("c#m") + assert "Gm" == t.parse("g minor") + assert "Not c#m" == t.parse("not C#m") - # format - assert "1:01" == t.format(61.23) - assert "60:01" == t.format(3601.23) - assert "0:00" == t.format(None) - # parse - assert 61.0 == t.parse("1:01") - assert 61.23 == t.parse("61.23") - assert 3601.0 == t.parse("60:01") - assert t.null == t.parse("1:00:01") - assert t.null == t.parse("not61.23") - # config format_raw_length - beets.config["format_raw_length"] = True - assert 61.23 == t.format(61.23) - assert 3601.23 == t.format(3601.23) + +def test_durationtype(): + t = types.DurationType() + + # format + assert "1:01" == t.format(61.23) + assert "60:01" == t.format(3601.23) + assert "0:00" == t.format(None) + # parse + assert 61.0 == t.parse("1:01") + assert 61.23 == t.parse("61.23") + assert 3601.0 == t.parse("60:01") + assert t.null == t.parse("1:00:01") + assert t.null == t.parse("not61.23") + # config format_raw_length + beets.config["format_raw_length"] = True + assert 61.23 == t.format(61.23) + assert 3601.23 == t.format(3601.23) From a38918380d3654e65cc7d19776e9c426050a7464 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0ar=C5=ABnas=20Nejus?= Date: Mon, 12 May 2025 09:53:01 +0100 Subject: [PATCH 03/10] Rewrite path query tests using pytest.mark.parametrize And remove `force_implicit_query_detection` attribute from `PathQuery` class. --- beets/dbcore/query.py | 5 - test/test_query.py | 350 +++++++++++++----------------------------- 2 files changed, 105 insertions(+), 250 deletions(-) diff --git a/beets/dbcore/query.py b/beets/dbcore/query.py index 3243445cb..e02ebb76a 100644 --- a/beets/dbcore/query.py +++ b/beets/dbcore/query.py @@ -283,9 +283,6 @@ class PathQuery(FieldQuery[bytes]): and case-sensitive otherwise. """ - # For tests - force_implicit_query_detection = False - def __init__(self, field, pattern, fast=True, case_sensitive=None): """Create a path query. @@ -335,8 +332,6 @@ class PathQuery(FieldQuery[bytes]): ): return False - if cls.force_implicit_query_detection: - return True return os.path.exists(util.syspath(util.normpath(query_part))) def match(self, item): diff --git a/test/test_query.py b/test/test_query.py index 6546cb4df..11537e039 100644 --- a/test/test_query.py +++ b/test/test_query.py @@ -14,26 +14,23 @@ """Various tests for querying the library database.""" -import os import sys import unittest -from contextlib import contextmanager -from functools import partial +from pathlib import Path import pytest from mock import patch -import beets.library -from beets import dbcore, util +from beets import dbcore from beets.dbcore import types from beets.dbcore.query import ( InvalidQueryArgumentValueError, NoneQuery, ParsingError, + PathQuery, ) from beets.test import _common -from beets.test.helper import BeetsTestCase, ItemInDBTestCase -from beets.util import syspath +from beets.test.helper import BeetsTestCase, TestHelper # Because the absolute path begins with something like C:, we # can't disambiguate it from an ordinary query. @@ -442,244 +439,6 @@ class MatchTest(unittest.TestCase): assert q3 != q4 -class PathQueryTest(ItemInDBTestCase, AssertsMixin): - def setUp(self): - super().setUp() - - # This is the item we'll try to match. - self.i.path = util.normpath("/a/b/c.mp3") - self.i.title = "path item" - self.i.album = "path album" - self.i.store() - self.lib.add_album([self.i]) - - # A second item for testing exclusion. - i2 = _common.item() - i2.path = util.normpath("/x/y/z.mp3") - i2.title = "another item" - i2.album = "another album" - self.lib.add(i2) - self.lib.add_album([i2]) - - @contextmanager - def force_implicit_query_detection(self): - # Unadorned path queries with path separators in them are considered - # path queries only when the path in question actually exists. So we - # mock the existence check to return true. - beets.dbcore.query.PathQuery.force_implicit_query_detection = True - yield - beets.dbcore.query.PathQuery.force_implicit_query_detection = False - - def test_path_exact_match(self): - q = "path:/a/b/c.mp3" - results = self.lib.items(q) - self.assert_items_matched(results, ["path item"]) - - results = self.lib.albums(q) - self.assert_albums_matched(results, ["path album"]) - - # FIXME: fails on windows - @unittest.skipIf(sys.platform == "win32", "win32") - def test_parent_directory_no_slash(self): - q = "path:/a" - results = self.lib.items(q) - self.assert_items_matched(results, ["path item"]) - - results = self.lib.albums(q) - self.assert_albums_matched(results, ["path album"]) - - # FIXME: fails on windows - @unittest.skipIf(sys.platform == "win32", "win32") - def test_parent_directory_with_slash(self): - q = "path:/a/" - results = self.lib.items(q) - self.assert_items_matched(results, ["path item"]) - - results = self.lib.albums(q) - self.assert_albums_matched(results, ["path album"]) - - def test_no_match(self): - q = "path:/xyzzy/" - results = self.lib.items(q) - self.assert_items_matched(results, []) - - results = self.lib.albums(q) - self.assert_albums_matched(results, []) - - def test_fragment_no_match(self): - q = "path:/b/" - results = self.lib.items(q) - self.assert_items_matched(results, []) - - results = self.lib.albums(q) - self.assert_albums_matched(results, []) - - def test_nonnorm_path(self): - q = "path:/x/../a/b" - results = self.lib.items(q) - self.assert_items_matched(results, ["path item"]) - - results = self.lib.albums(q) - self.assert_albums_matched(results, ["path album"]) - - @unittest.skipIf(sys.platform == "win32", WIN32_NO_IMPLICIT_PATHS) - def test_slashed_query_matches_path(self): - with self.force_implicit_query_detection(): - q = "/a/b" - results = self.lib.items(q) - self.assert_items_matched(results, ["path item"]) - - results = self.lib.albums(q) - self.assert_albums_matched(results, ["path album"]) - - @unittest.skipIf(sys.platform == "win32", WIN32_NO_IMPLICIT_PATHS) - def test_path_query_in_or_query(self): - with self.force_implicit_query_detection(): - q = "/a/b , /a/b" - results = self.lib.items(q) - self.assert_items_matched(results, ["path item"]) - - def test_non_slashed_does_not_match_path(self): - with self.force_implicit_query_detection(): - q = "c.mp3" - results = self.lib.items(q) - self.assert_items_matched(results, []) - - results = self.lib.albums(q) - self.assert_albums_matched(results, []) - - def test_slashes_in_explicit_field_does_not_match_path(self): - with self.force_implicit_query_detection(): - q = "title:/a/b" - results = self.lib.items(q) - self.assert_items_matched(results, []) - - def test_path_item_regex(self): - q = "path::c\\.mp3$" - results = self.lib.items(q) - self.assert_items_matched(results, ["path item"]) - - results = self.lib.albums(q) - self.assert_albums_matched(results, ["path album"]) - - def test_path_album_regex(self): - q = "path::b" - results = self.lib.albums(q) - self.assert_albums_matched(results, ["path album"]) - - def test_escape_underscore(self): - self.add_album( - path=b"/a/_/title.mp3", - title="with underscore", - album="album with underscore", - ) - q = "path:/a/_" - results = self.lib.items(q) - self.assert_items_matched(results, ["with underscore"]) - - results = self.lib.albums(q) - self.assert_albums_matched(results, ["album with underscore"]) - - def test_escape_percent(self): - self.add_album( - path=b"/a/%/title.mp3", - title="with percent", - album="album with percent", - ) - q = "path:/a/%" - results = self.lib.items(q) - self.assert_items_matched(results, ["with percent"]) - - results = self.lib.albums(q) - self.assert_albums_matched(results, ["album with percent"]) - - def test_escape_backslash(self): - self.add_album( - path=rb"/a/\x/title.mp3", - title="with backslash", - album="album with backslash", - ) - q = "path:/a/\\\\x" - results = self.lib.items(q) - self.assert_items_matched(results, ["with backslash"]) - - results = self.lib.albums(q) - self.assert_albums_matched(results, ["album with backslash"]) - - def test_case_sensitivity(self): - self.add_album(path=b"/A/B/C2.mp3", title="caps path") - - makeq = partial(beets.dbcore.query.PathQuery, "path", "/A/B") - - results = self.lib.items(makeq(case_sensitive=True)) - self.assert_items_matched(results, ["caps path"]) - - results = self.lib.items(makeq(case_sensitive=False)) - self.assert_items_matched(results, ["path item", "caps path"]) - - # FIXME: Also create a variant of this test for windows, which tests - # both os.sep and os.altsep - @unittest.skipIf(sys.platform == "win32", "win32") - def test_path_sep_detection(self): - is_path_query = beets.dbcore.query.PathQuery.is_path_query - - with self.force_implicit_query_detection(): - assert is_path_query("/foo/bar") - assert is_path_query("foo/bar") - assert is_path_query("foo/") - assert not is_path_query("foo") - assert is_path_query("foo/:bar") - assert not is_path_query("foo:bar/") - assert not is_path_query("foo:/bar") - - # FIXME: shouldn't this also work on windows? - @unittest.skipIf(sys.platform == "win32", WIN32_NO_IMPLICIT_PATHS) - def test_detect_absolute_path(self): - """Test detection of implicit path queries based on whether or - not the path actually exists, when using an absolute path query. - - Thus, don't use the `force_implicit_query_detection()` - contextmanager which would disable the existence check. - """ - is_path_query = beets.dbcore.query.PathQuery.is_path_query - - path = self.touch(os.path.join(b"foo", b"bar")) - assert os.path.isabs(util.syspath(path)) - path_str = path.decode("utf-8") - - # The file itself. - assert is_path_query(path_str) - - # The parent directory. - parent = os.path.dirname(path_str) - assert is_path_query(parent) - - # Some non-existent path. - assert not is_path_query(f"{path_str}baz") - - def test_detect_relative_path(self): - """Test detection of implicit path queries based on whether or - not the path actually exists, when using a relative path query. - - Thus, don't use the `force_implicit_query_detection()` - contextmanager which would disable the existence check. - """ - is_path_query = beets.dbcore.query.PathQuery.is_path_query - - self.touch(os.path.join(b"foo", b"bar")) - - # Temporarily change directory so relative paths work. - cur_dir = os.getcwd() - try: - os.chdir(syspath(self.temp_dir)) - assert is_path_query("foo/") - assert is_path_query("foo/bar") - assert is_path_query("foo/bar:tagada") - assert not is_path_query("bar") - finally: - os.chdir(cur_dir) - - class IntQueryTest(BeetsTestCase): def test_exact_value_match(self): item = self.add_item(bpm=120) @@ -1104,3 +863,104 @@ class RelatedQueriesTest(BeetsTestCase, AssertsMixin): q = "artpath::A Album1" results = self.lib.items(q) self.assert_items_matched(results, ["Album1 Item1", "Album1 Item2"]) + + +@pytest.fixture(scope="class") +def helper(): + helper = TestHelper() + helper.setup_beets() + + yield helper + + helper.teardown_beets() + + +class TestPathQuery: + _p = pytest.param + + @pytest.fixture(scope="class") + def lib(self, helper): + helper.add_item(path=b"/a/b/c.mp3", title="path item") + helper.add_item(path=b"/x/y/z.mp3", title="another item") + helper.add_item(path=b"/c/_/title.mp3", title="with underscore") + helper.add_item(path=b"/c/%/title.mp3", title="with percent") + helper.add_item(path=rb"/c/\x/title.mp3", title="with backslash") + helper.add_item(path=b"/A/B/C2.mp3", title="caps path") + + return helper.lib + + @pytest.mark.parametrize( + "q, expected_titles", + [ + _p("path:/a/b/c.mp3", ["path item"], id="exact-match"), + _p("path:/a", ["path item"], id="parent-dir-no-slash"), + _p("path:/a/", ["path item"], id="parent-dir-with-slash"), + _p("path:/xyzzy/", [], id="no-match"), + _p("path:/b/", [], id="fragment-no-match"), + _p("path:/x/../a/b", ["path item"], id="non-normalized"), + _p("path::c\\.mp3$", ["path item"], id="regex"), + _p("path:/c/_", ["with underscore"], id="underscore-escaped"), + _p("path:/c/%", ["with percent"], id="percent-escaped"), + _p("path:/c/\\\\x", ["with backslash"], id="backslash-escaped"), + ], + ) + def test_explicit(self, lib, q, expected_titles): + assert {i.title for i in lib.items(q)} == set(expected_titles) + + @pytest.mark.skipif(sys.platform == "win32", reason=WIN32_NO_IMPLICIT_PATHS) + @pytest.mark.parametrize( + "q, expected_titles", + [ + _p("/a/b", ["path item"], id="slashed-query"), + _p("/a/b , /a/b", ["path item"], id="path-in-or-query"), + _p("c.mp3", [], id="no-slash-no-match"), + _p("title:/a/b", [], id="slash-with-explicit-field-no-match"), + ], + ) + def test_implicit(self, monkeypatch, lib, q, expected_titles): + monkeypatch.setattr( + "beets.dbcore.query.PathQuery.is_path_query", lambda path: True + ) + + assert {i.title for i in lib.items(q)} == set(expected_titles) + + @pytest.mark.parametrize( + "case_sensitive, expected_titles", + [ + _p(True, [], id="non-caps-dont-match-caps"), + _p(False, ["caps path"], id="non-caps-match-caps"), + ], + ) + def test_case_sensitivity( + self, lib, monkeypatch, case_sensitive, expected_titles + ): + q = "path:/a/b/c2.mp3" + monkeypatch.setattr( + "beets.util.case_sensitive", lambda *_: case_sensitive + ) + + assert {i.title for i in lib.items(q)} == set(expected_titles) + + # FIXME: Also create a variant of this test for windows, which tests + # both os.sep and os.altsep + @pytest.mark.skipif(sys.platform == "win32", reason=WIN32_NO_IMPLICIT_PATHS) + @pytest.mark.parametrize( + "q, is_path_query", + [ + ("/foo/bar", True), + ("foo/bar", True), + ("foo/", True), + ("foo", False), + ("foo/:bar", True), + ("foo:bar/", False), + ("foo:/bar", False), + ], + ) + def test_path_sep_detection(self, monkeypatch, tmp_path, q, is_path_query): + monkeypatch.chdir(tmp_path) + (tmp_path / "foo").mkdir() + (tmp_path / "foo" / "bar").touch() + if Path(q).is_absolute(): + q = str(tmp_path / q[1:]) + + assert PathQuery.is_path_query(q) == is_path_query From 45f92ac6416e1200d941b410e42041dc147ecaa5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0ar=C5=ABnas=20Nejus?= Date: Mon, 12 May 2025 09:48:40 +0100 Subject: [PATCH 04/10] Remove case_sensitive from PathQuery.__init__ The case_sensitive parameter was only used in tests, which now use monkeypatch to control the behavior of util.case_sensitive() instead. This simplifies the PathQuery initialization logic while maintaining test coverage. --- beets/dbcore/query.py | 14 ++++---------- test/test_query.py | 4 +++- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/beets/dbcore/query.py b/beets/dbcore/query.py index e02ebb76a..c814c5966 100644 --- a/beets/dbcore/query.py +++ b/beets/dbcore/query.py @@ -283,26 +283,20 @@ class PathQuery(FieldQuery[bytes]): and case-sensitive otherwise. """ - def __init__(self, field, pattern, fast=True, case_sensitive=None): + def __init__(self, field, pattern, fast=True): """Create a path query. `pattern` must be a path, either to a file or a directory. - - `case_sensitive` can be a bool or `None`, indicating that the - behavior should depend on the filesystem. """ super().__init__(field, pattern, fast) path = util.normpath(pattern) - # By default, the case sensitivity depends on the filesystem - # that the query path is located on. - if case_sensitive is None: - case_sensitive = util.case_sensitive(path) - self.case_sensitive = case_sensitive + # Case sensitivity depends on the filesystem that the query path is located on. + self.case_sensitive = util.case_sensitive(path) # Use a normalized-case pattern for case-insensitive matches. - if not case_sensitive: + if not self.case_sensitive: # We need to lowercase the entire path, not just the pattern. # In particular, on Windows, the drive letter is otherwise not # lowercased. diff --git a/test/test_query.py b/test/test_query.py index 11537e039..a8646f1bb 100644 --- a/test/test_query.py +++ b/test/test_query.py @@ -904,7 +904,9 @@ class TestPathQuery: _p("path:/c/\\\\x", ["with backslash"], id="backslash-escaped"), ], ) - def test_explicit(self, lib, q, expected_titles): + def test_explicit(self, monkeypatch, lib, q, expected_titles): + monkeypatch.setattr("beets.util.case_sensitive", lambda *_: True) + assert {i.title for i in lib.items(q)} == set(expected_titles) @pytest.mark.skipif(sys.platform == "win32", reason=WIN32_NO_IMPLICIT_PATHS) From 8937978d5f607885609d6809aca23d84cee063db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0ar=C5=ABnas=20Nejus?= Date: Sat, 31 May 2025 18:57:09 +0100 Subject: [PATCH 05/10] Refactor PathQuery and add docs --- beets/dbcore/query.py | 90 ++++++++++++++++++++++--------------------- test/test_query.py | 15 ++++---- 2 files changed, 55 insertions(+), 50 deletions(-) diff --git a/beets/dbcore/query.py b/beets/dbcore/query.py index c814c5966..9cff082a3 100644 --- a/beets/dbcore/query.py +++ b/beets/dbcore/query.py @@ -22,7 +22,7 @@ import unicodedata from abc import ABC, abstractmethod from collections.abc import Iterator, MutableSequence, Sequence from datetime import datetime, timedelta -from functools import reduce +from functools import cached_property, reduce from operator import mul, or_ from re import Pattern from typing import TYPE_CHECKING, Any, Generic, TypeVar, Union @@ -30,8 +30,7 @@ from typing import TYPE_CHECKING, Any, Generic, TypeVar, Union from beets import util if TYPE_CHECKING: - from beets.dbcore import Model - from beets.dbcore.db import AnyModel + from beets.dbcore.db import AnyModel, Model P = TypeVar("P", default=Any) else: @@ -283,13 +282,11 @@ class PathQuery(FieldQuery[bytes]): and case-sensitive otherwise. """ - def __init__(self, field, pattern, fast=True): + def __init__(self, field: str, pattern: bytes, fast: bool = True) -> None: """Create a path query. `pattern` must be a path, either to a file or a directory. """ - super().__init__(field, pattern, fast) - path = util.normpath(pattern) # Case sensitivity depends on the filesystem that the query path is located on. @@ -304,50 +301,57 @@ class PathQuery(FieldQuery[bytes]): # from `col_clause()` do the same thing. path = path.lower() - # Match the path as a single file. - self.file_path = path - # As a directory (prefix). - self.dir_path = os.path.join(path, b"") + super().__init__(field, path, fast) - @classmethod - def is_path_query(cls, query_part): + @cached_property + def dir_path(self) -> bytes: + return os.path.join(self.pattern, b"") + + @staticmethod + def is_path_query(query_part: str) -> bool: """Try to guess whether a unicode query part is a path query. - Condition: separator precedes colon and the file exists. + The path query must + 1. precede the colon in the query, if a colon is present + 2. contain either ``os.sep`` or ``os.altsep`` (Windows) + 3. this path must exist on the filesystem. """ - colon = query_part.find(":") - if colon != -1: - query_part = query_part[:colon] + query_part = query_part.split(":")[0] - # Test both `sep` and `altsep` (i.e., both slash and backslash on - # Windows). - if not ( - os.sep in query_part or (os.altsep and os.altsep in query_part) - ): - return False - - return os.path.exists(util.syspath(util.normpath(query_part))) - - def match(self, item): - path = item.path if self.case_sensitive else item.path.lower() - return (path == self.file_path) or path.startswith(self.dir_path) - - def col_clause(self): - file_blob = BLOB_TYPE(self.file_path) - dir_blob = BLOB_TYPE(self.dir_path) - - if self.case_sensitive: - query_part = "({0} = ?) || (substr({0}, 1, ?) = ?)" - else: - query_part = "(BYTELOWER({0}) = BYTELOWER(?)) || \ - (substr(BYTELOWER({0}), 1, ?) = BYTELOWER(?))" - - return query_part.format(self.field), ( - file_blob, - len(dir_blob), - dir_blob, + return ( + # make sure the query part contains a path separator + bool(set(query_part) & {os.sep, os.altsep}) + and os.path.exists(util.normpath(query_part)) ) + def match(self, obj: Model) -> bool: + """Check whether a model object's path matches this query. + + Performs either an exact match against the pattern or checks if the path + starts with the given directory path. Case sensitivity depends on the object's + filesystem as determined during initialization. + """ + path = obj.path if self.case_sensitive else obj.path.lower() + return (path == self.pattern) or path.startswith(self.dir_path) + + def col_clause(self) -> tuple[str, Sequence[SQLiteType]]: + """Generate an SQL clause that implements path matching in the database. + + Returns a tuple of SQL clause string and parameter values list that matches + paths either exactly or by directory prefix. Handles case sensitivity + appropriately using BYTELOWER for case-insensitive matches. + """ + if self.case_sensitive: + left, right = self.field, "?" + else: + left, right = f"BYTELOWER({self.field})", "BYTELOWER(?)" + + return f"({left} = {right}) || (substr({left}, 1, ?) = {right})", [ + BLOB_TYPE(self.pattern), + len(dir_blob := BLOB_TYPE(self.dir_path)), + dir_blob, + ] + def __repr__(self) -> str: return ( f"{self.__class__.__name__}({self.field!r}, {self.pattern!r}, " diff --git a/test/test_query.py b/test/test_query.py index a8646f1bb..776bfd6f6 100644 --- a/test/test_query.py +++ b/test/test_query.py @@ -880,7 +880,7 @@ class TestPathQuery: @pytest.fixture(scope="class") def lib(self, helper): - helper.add_item(path=b"/a/b/c.mp3", title="path item") + helper.add_item(path=b"/aaa/bb/c.mp3", title="path item") helper.add_item(path=b"/x/y/z.mp3", title="another item") helper.add_item(path=b"/c/_/title.mp3", title="with underscore") helper.add_item(path=b"/c/%/title.mp3", title="with percent") @@ -892,12 +892,13 @@ class TestPathQuery: @pytest.mark.parametrize( "q, expected_titles", [ - _p("path:/a/b/c.mp3", ["path item"], id="exact-match"), - _p("path:/a", ["path item"], id="parent-dir-no-slash"), - _p("path:/a/", ["path item"], id="parent-dir-with-slash"), + _p("path:/aaa/bb/c.mp3", ["path item"], id="exact-match"), + _p("path:/aaa", ["path item"], id="parent-dir-no-slash"), + _p("path:/aaa/", ["path item"], id="parent-dir-with-slash"), + _p("path:/aa", [], id="no-match-does-not-match-parent-dir"), _p("path:/xyzzy/", [], id="no-match"), _p("path:/b/", [], id="fragment-no-match"), - _p("path:/x/../a/b", ["path item"], id="non-normalized"), + _p("path:/x/../aaa/bb", ["path item"], id="non-normalized"), _p("path::c\\.mp3$", ["path item"], id="regex"), _p("path:/c/_", ["with underscore"], id="underscore-escaped"), _p("path:/c/%", ["with percent"], id="percent-escaped"), @@ -913,8 +914,8 @@ class TestPathQuery: @pytest.mark.parametrize( "q, expected_titles", [ - _p("/a/b", ["path item"], id="slashed-query"), - _p("/a/b , /a/b", ["path item"], id="path-in-or-query"), + _p("/aaa/bb", ["path item"], id="slashed-query"), + _p("/aaa/bb , /aaa", ["path item"], id="path-in-or-query"), _p("c.mp3", [], id="no-slash-no-match"), _p("title:/a/b", [], id="slash-with-explicit-field-no-match"), ], From 9d088ab69f499dd54597dd8af941746acfee3e27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0ar=C5=ABnas=20Nejus?= Date: Sat, 5 Jul 2025 20:46:27 +0100 Subject: [PATCH 06/10] Move human formatting functions under beets.util.units --- beets/dbcore/types.py | 5 ++-- beets/ui/__init__.py | 42 ---------------------------- beets/ui/commands.py | 17 ++++++------ beets/util/__init__.py | 21 -------------- beets/util/units.py | 61 +++++++++++++++++++++++++++++++++++++++++ test/test_ui_init.py | 38 +------------------------ test/util/test_units.py | 43 +++++++++++++++++++++++++++++ 7 files changed, 117 insertions(+), 110 deletions(-) create mode 100644 beets/util/units.py create mode 100644 test/util/test_units.py diff --git a/beets/dbcore/types.py b/beets/dbcore/types.py index be28f6891..30cabf42f 100644 --- a/beets/dbcore/types.py +++ b/beets/dbcore/types.py @@ -24,6 +24,7 @@ from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast import beets from beets import util +from beets.util.units import human_seconds_short, raw_seconds_short from . import query @@ -437,14 +438,14 @@ class DurationType(Float): def format(self, value): if not beets.config["format_raw_length"].get(bool): - return util.human_seconds_short(value or 0.0) + return human_seconds_short(value or 0.0) else: return value def parse(self, string): try: # Try to format back hh:ss to seconds. - return util.raw_seconds_short(string) + return raw_seconds_short(string) except ValueError: # Fall back to a plain float. try: diff --git a/beets/ui/__init__.py b/beets/ui/__init__.py index f1aac766f..b7033e41b 100644 --- a/beets/ui/__init__.py +++ b/beets/ui/__init__.py @@ -435,48 +435,6 @@ def input_select_objects(prompt, objs, rep, prompt_all=None): return [] -# Human output formatting. - - -def human_bytes(size): - """Formats size, a number of bytes, in a human-readable way.""" - powers = ["", "K", "M", "G", "T", "P", "E", "Z", "Y", "H"] - unit = "B" - for power in powers: - if size < 1024: - return f"{size:3.1f} {power}{unit}" - size /= 1024.0 - unit = "iB" - return "big" - - -def human_seconds(interval): - """Formats interval, a number of seconds, as a human-readable time - interval using English words. - """ - units = [ - (1, "second"), - (60, "minute"), - (60, "hour"), - (24, "day"), - (7, "week"), - (52, "year"), - (10, "decade"), - ] - for i in range(len(units) - 1): - increment, suffix = units[i] - next_increment, _ = units[i + 1] - interval /= float(increment) - if interval < next_increment: - break - else: - # Last unit. - increment, suffix = units[-1] - interval /= float(increment) - - return f"{interval:3.1f} {suffix}s" - - # Colorization. # ANSI terminal colorization code heavily inspired by pygments: diff --git a/beets/ui/commands.py b/beets/ui/commands.py index fb9ca8b89..3117262f1 100755 --- a/beets/ui/commands.py +++ b/beets/ui/commands.py @@ -43,6 +43,7 @@ from beets.util import ( normpath, syspath, ) +from beets.util.units import human_bytes, human_seconds, human_seconds_short from . import _store_dict @@ -541,8 +542,8 @@ class ChangeRepresentation: cur_length0 = item.length if item.length else 0 new_length0 = track_info.length if track_info.length else 0 # format into string - cur_length = f"({util.human_seconds_short(cur_length0)})" - new_length = f"({util.human_seconds_short(new_length0)})" + cur_length = f"({human_seconds_short(cur_length0)})" + new_length = f"({human_seconds_short(new_length0)})" # colorize lhs_length = ui.colorize(highlight_color, cur_length) rhs_length = ui.colorize(highlight_color, new_length) @@ -706,14 +707,14 @@ class AlbumChange(ChangeRepresentation): for track_info in self.match.extra_tracks: line = f" ! {track_info.title} (#{self.format_index(track_info)})" if track_info.length: - line += f" ({util.human_seconds_short(track_info.length)})" + line += f" ({human_seconds_short(track_info.length)})" print_(ui.colorize("text_warning", line)) if self.match.extra_items: print_(f"Unmatched tracks ({len(self.match.extra_items)}):") for item in self.match.extra_items: line = " ! {} (#{})".format(item.title, self.format_index(item)) if item.length: - line += " ({})".format(util.human_seconds_short(item.length)) + line += " ({})".format(human_seconds_short(item.length)) print_(ui.colorize("text_warning", line)) @@ -795,8 +796,8 @@ def summarize_items(items, singleton): round(int(items[0].samplerate) / 1000, 1), items[0].bitdepth ) summary_parts.append(sample_bits) - summary_parts.append(util.human_seconds_short(total_duration)) - summary_parts.append(ui.human_bytes(total_filesize)) + summary_parts.append(human_seconds_short(total_duration)) + summary_parts.append(human_bytes(total_filesize)) return ", ".join(summary_parts) @@ -1906,7 +1907,7 @@ def show_stats(lib, query, exact): if item.album_id: albums.add(item.album_id) - size_str = "" + ui.human_bytes(total_size) + size_str = "" + human_bytes(total_size) if exact: size_str += f" ({total_size} bytes)" @@ -1918,7 +1919,7 @@ Artists: {} Albums: {} Album artists: {}""".format( total_items, - ui.human_seconds(total_time), + human_seconds(total_time), f" ({total_time:.2f} seconds)" if exact else "", "Total size" if exact else "Approximate total size", size_str, diff --git a/beets/util/__init__.py b/beets/util/__init__.py index 4572b27f9..c1c76c860 100644 --- a/beets/util/__init__.py +++ b/beets/util/__init__.py @@ -1019,27 +1019,6 @@ def case_sensitive(path: bytes) -> bool: return not os.path.samefile(lower_sys, upper_sys) -def raw_seconds_short(string: str) -> float: - """Formats a human-readable M:SS string as a float (number of seconds). - - Raises ValueError if the conversion cannot take place due to `string` not - being in the right format. - """ - match = re.match(r"^(\d+):([0-5]\d)$", string) - if not match: - raise ValueError("String not in M:SS format") - minutes, seconds = map(int, match.groups()) - return float(minutes * 60 + seconds) - - -def human_seconds_short(interval): - """Formats a number of seconds as a short human-readable M:SS - string. - """ - interval = int(interval) - return "%i:%02i" % (interval // 60, interval % 60) - - def asciify_path(path: str, sep_replace: str) -> str: """Decodes all unicode characters in a path into ASCII equivalents. diff --git a/beets/util/units.py b/beets/util/units.py new file mode 100644 index 000000000..d07d42546 --- /dev/null +++ b/beets/util/units.py @@ -0,0 +1,61 @@ +import re + + +def raw_seconds_short(string: str) -> float: + """Formats a human-readable M:SS string as a float (number of seconds). + + Raises ValueError if the conversion cannot take place due to `string` not + being in the right format. + """ + match = re.match(r"^(\d+):([0-5]\d)$", string) + if not match: + raise ValueError("String not in M:SS format") + minutes, seconds = map(int, match.groups()) + return float(minutes * 60 + seconds) + + +def human_seconds_short(interval): + """Formats a number of seconds as a short human-readable M:SS + string. + """ + interval = int(interval) + return "%i:%02i" % (interval // 60, interval % 60) + + +def human_bytes(size): + """Formats size, a number of bytes, in a human-readable way.""" + powers = ["", "K", "M", "G", "T", "P", "E", "Z", "Y", "H"] + unit = "B" + for power in powers: + if size < 1024: + return f"{size:3.1f} {power}{unit}" + size /= 1024.0 + unit = "iB" + return "big" + + +def human_seconds(interval): + """Formats interval, a number of seconds, as a human-readable time + interval using English words. + """ + units = [ + (1, "second"), + (60, "minute"), + (60, "hour"), + (24, "day"), + (7, "week"), + (52, "year"), + (10, "decade"), + ] + for i in range(len(units) - 1): + increment, suffix = units[i] + next_increment, _ = units[i + 1] + interval /= float(increment) + if interval < next_increment: + break + else: + # Last unit. + increment, suffix = units[-1] + interval /= float(increment) + + return f"{interval:3.1f} {suffix}s" diff --git a/test/test_ui_init.py b/test/test_ui_init.py index a6f06c494..df21b300c 100644 --- a/test/test_ui_init.py +++ b/test/test_ui_init.py @@ -21,7 +21,7 @@ from random import random from beets import config, ui from beets.test import _common -from beets.test.helper import BeetsTestCase, ItemInDBTestCase, control_stdin +from beets.test.helper import BeetsTestCase, control_stdin class InputMethodsTest(BeetsTestCase): @@ -88,42 +88,6 @@ class InputMethodsTest(BeetsTestCase): assert items == ["1", "3"] -class InitTest(ItemInDBTestCase): - def test_human_bytes(self): - tests = [ - (0, "0.0 B"), - (30, "30.0 B"), - (pow(2, 10), "1.0 KiB"), - (pow(2, 20), "1.0 MiB"), - (pow(2, 30), "1.0 GiB"), - (pow(2, 40), "1.0 TiB"), - (pow(2, 50), "1.0 PiB"), - (pow(2, 60), "1.0 EiB"), - (pow(2, 70), "1.0 ZiB"), - (pow(2, 80), "1.0 YiB"), - (pow(2, 90), "1.0 HiB"), - (pow(2, 100), "big"), - ] - for i, h in tests: - assert h == ui.human_bytes(i) - - def test_human_seconds(self): - tests = [ - (0, "0.0 seconds"), - (30, "30.0 seconds"), - (60, "1.0 minutes"), - (90, "1.5 minutes"), - (125, "2.1 minutes"), - (3600, "1.0 hours"), - (86400, "1.0 days"), - (604800, "1.0 weeks"), - (31449600, "1.0 years"), - (314496000, "1.0 decades"), - ] - for i, h in tests: - assert h == ui.human_seconds(i) - - class ParentalDirCreation(BeetsTestCase): def test_create_yes(self): non_exist_path = _common.os.fsdecode( diff --git a/test/util/test_units.py b/test/util/test_units.py new file mode 100644 index 000000000..26f4d3eca --- /dev/null +++ b/test/util/test_units.py @@ -0,0 +1,43 @@ +import pytest + +from beets.util.units import human_bytes, human_seconds + + +@pytest.mark.parametrize( + "input_bytes,expected", + [ + (0, "0.0 B"), + (30, "30.0 B"), + (pow(2, 10), "1.0 KiB"), + (pow(2, 20), "1.0 MiB"), + (pow(2, 30), "1.0 GiB"), + (pow(2, 40), "1.0 TiB"), + (pow(2, 50), "1.0 PiB"), + (pow(2, 60), "1.0 EiB"), + (pow(2, 70), "1.0 ZiB"), + (pow(2, 80), "1.0 YiB"), + (pow(2, 90), "1.0 HiB"), + (pow(2, 100), "big"), + ], +) +def test_human_bytes(input_bytes, expected): + assert human_bytes(input_bytes) == expected + + +@pytest.mark.parametrize( + "input_seconds,expected", + [ + (0, "0.0 seconds"), + (30, "30.0 seconds"), + (60, "1.0 minutes"), + (90, "1.5 minutes"), + (125, "2.1 minutes"), + (3600, "1.0 hours"), + (86400, "1.0 days"), + (604800, "1.0 weeks"), + (31449600, "1.0 years"), + (314496000, "1.0 decades"), + ], +) +def test_human_seconds(input_seconds, expected): + assert human_seconds(input_seconds) == expected From 2b306de0fef3e5778cf440096f650788d4b9d84e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0ar=C5=ABnas=20Nejus?= Date: Mon, 5 Aug 2024 18:08:17 +0100 Subject: [PATCH 07/10] Replace assertInResult and assertNotInResult --- test/test_query.py | 46 +++++++++++++++++++--------------------------- 1 file changed, 19 insertions(+), 27 deletions(-) diff --git a/test/test_query.py b/test/test_query.py index 776bfd6f6..ecaa19514 100644 --- a/test/test_query.py +++ b/test/test_query.py @@ -44,14 +44,6 @@ class AssertsMixin: def assert_albums_matched(self, results, albums): assert {a.album for a in results} == set(albums) - def assertInResult(self, item, results): - result_ids = [i.id for i in results] - assert item.id in result_ids - - def assertNotInResult(self, item, results): - result_ids = [i.id for i in results] - assert item.id not in result_ids - # A test case class providing a library with some dummy data and some # assertions involving that data. @@ -477,44 +469,44 @@ class BoolQueryTest(BeetsTestCase, AssertsMixin): item_true = self.add_item(comp=True) item_false = self.add_item(comp=False) matched = self.lib.items("comp:true") - self.assertInResult(item_true, matched) - self.assertNotInResult(item_false, matched) + assert item_true.id in {i.id for i in matched} + assert item_false.id not in {i.id for i in matched} def test_flex_parse_true(self): item_true = self.add_item(flexbool=True) item_false = self.add_item(flexbool=False) matched = self.lib.items("flexbool:true") - self.assertInResult(item_true, matched) - self.assertNotInResult(item_false, matched) + assert item_true.id in {i.id for i in matched} + assert item_false.id not in {i.id for i in matched} def test_flex_parse_false(self): item_true = self.add_item(flexbool=True) item_false = self.add_item(flexbool=False) matched = self.lib.items("flexbool:false") - self.assertInResult(item_false, matched) - self.assertNotInResult(item_true, matched) + assert item_false.id in {i.id for i in matched} + assert item_true.id not in {i.id for i in matched} def test_flex_parse_1(self): item_true = self.add_item(flexbool=True) item_false = self.add_item(flexbool=False) matched = self.lib.items("flexbool:1") - self.assertInResult(item_true, matched) - self.assertNotInResult(item_false, matched) + assert item_true.id in {i.id for i in matched} + assert item_false.id not in {i.id for i in matched} def test_flex_parse_0(self): item_true = self.add_item(flexbool=True) item_false = self.add_item(flexbool=False) matched = self.lib.items("flexbool:0") - self.assertInResult(item_false, matched) - self.assertNotInResult(item_true, matched) + assert item_false.id in {i.id for i in matched} + assert item_true.id not in {i.id for i in matched} def test_flex_parse_any_string(self): # TODO this should be the other way around item_true = self.add_item(flexbool=True) item_false = self.add_item(flexbool=False) matched = self.lib.items("flexbool:something") - self.assertInResult(item_false, matched) - self.assertNotInResult(item_true, matched) + assert item_false.id in {i.id for i in matched} + assert item_true.id not in {i.id for i in matched} class DefaultSearchFieldsTest(DummyDataTestCase): @@ -541,33 +533,33 @@ class NoneQueryTest(BeetsTestCase, AssertsMixin): album_item = self.add_album().items().get() matched = self.lib.items(NoneQuery("album_id")) - self.assertInResult(singleton, matched) - self.assertNotInResult(album_item, matched) + assert singleton.id in {i.id for i in matched} + assert album_item.id not in {i.id for i in matched} def test_match_after_set_none(self): item = self.add_item(rg_track_gain=0) matched = self.lib.items(NoneQuery("rg_track_gain")) - self.assertNotInResult(item, matched) + assert item.id not in {i.id for i in matched} item["rg_track_gain"] = None item.store() matched = self.lib.items(NoneQuery("rg_track_gain")) - self.assertInResult(item, matched) + assert item.id in {i.id for i in matched} def test_match_slow(self): item = self.add_item() matched = self.lib.items(NoneQuery("rg_track_peak", fast=False)) - self.assertInResult(item, matched) + assert item.id in {i.id for i in matched} def test_match_slow_after_set_none(self): item = self.add_item(rg_track_gain=0) matched = self.lib.items(NoneQuery("rg_track_gain", fast=False)) - self.assertNotInResult(item, matched) + assert item.id not in {i.id for i in matched} item["rg_track_gain"] = None item.store() matched = self.lib.items(NoneQuery("rg_track_gain", fast=False)) - self.assertInResult(item, matched) + assert item.id in {i.id for i in matched} class NotQueryMatchTest(unittest.TestCase): From 2c6f314f4febd37314ab333a1c4ce29a05cf736b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0ar=C5=ABnas=20Nejus?= Date: Wed, 7 Aug 2024 15:26:33 +0100 Subject: [PATCH 08/10] Replace assertNegationProperties --- test/test_query.py | 189 +++++++++++++++++++++------------------------ 1 file changed, 90 insertions(+), 99 deletions(-) diff --git a/test/test_query.py b/test/test_query.py index ecaa19514..252367c14 100644 --- a/test/test_query.py +++ b/test/test_query.py @@ -627,119 +627,110 @@ class NotQueryMatchTest(unittest.TestCase): dbcore.query.NotQuery(q) -class NotQueryTest(DummyDataTestCase): - """Test `query.NotQuery` against the dummy data: - - `test_type_xxx`: tests for the negation of a particular XxxQuery class. - - `test_get_yyy`: tests on query strings (similar to `GetTest`) - """ +class TestNotQuery: + """Test `query.NotQuery` against the dummy data.""" - def assertNegationProperties(self, q): - """Given a Query `q`, assert that: - - q OR not(q) == all items - - q AND not(q) == 0 - - not(not(q)) == q - """ + @pytest.fixture(autouse=True, scope="class") + def lib(self): + test_case = DummyDataTestCase() + test_case.setUp() + return test_case.lib + + @pytest.mark.parametrize( + "q, expected_results", + [ + ( + dbcore.query.BooleanQuery("comp", True), + {"beets 4 eva"}, + ), + ( + dbcore.query.DateQuery("added", "2000-01-01"), + {"foo bar", "baz qux", "beets 4 eva"}, + ), + ( + dbcore.query.FalseQuery(), + {"foo bar", "baz qux", "beets 4 eva"}, + ), + ( + dbcore.query.MatchQuery("year", "2003"), + {"foo bar", "baz qux"}, + ), + ( + dbcore.query.NoneQuery("rg_track_gain"), + set(), + ), + ( + dbcore.query.NumericQuery("year", "2001..2002"), + {"beets 4 eva"}, + ), + ( + dbcore.query.AnyFieldQuery( + "baz", ["album"], dbcore.query.MatchQuery + ), + {"beets 4 eva"}, + ), + ( + dbcore.query.AndQuery( + [ + dbcore.query.BooleanQuery("comp", True), + dbcore.query.NumericQuery("year", "2002"), + ] + ), + {"foo bar", "beets 4 eva"}, + ), + ( + dbcore.query.OrQuery( + [ + dbcore.query.BooleanQuery("comp", True), + dbcore.query.NumericQuery("year", "2002"), + ] + ), + {"beets 4 eva"}, + ), + ( + dbcore.query.RegexpQuery("artist", "^t"), + {"foo bar"}, + ), + ( + dbcore.query.SubstringQuery("album", "ba"), + {"beets 4 eva"}, + ), + ( + dbcore.query.TrueQuery(), + set(), + ), + ], + ids=lambda x: x.__class__ if isinstance(x, dbcore.query.Query) else "", + ) + def test_query_type(self, lib, q, expected_results): + def get_results(*args): + return {i.title for i in lib.items(*args)} + + # not(a and b) <-> not(a) or not(b) not_q = dbcore.query.NotQuery(q) + not_q_results = get_results(not_q) + assert not_q_results == expected_results + # assert using OrQuery, AndQuery q_or = dbcore.query.OrQuery([q, not_q]) + q_and = dbcore.query.AndQuery([q, not_q]) - self.assert_items_matched_all(self.lib.items(q_or)) - self.assert_items_matched(self.lib.items(q_and), []) + assert get_results(q_or) == {"foo bar", "baz qux", "beets 4 eva"} + assert get_results(q_and) == set() # assert manually checking the item titles - all_titles = {i.title for i in self.lib.items()} - q_results = {i.title for i in self.lib.items(q)} - not_q_results = {i.title for i in self.lib.items(not_q)} + all_titles = get_results() + q_results = get_results(q) assert q_results.union(not_q_results) == all_titles assert q_results.intersection(not_q_results) == set() # round trip not_not_q = dbcore.query.NotQuery(not_q) - assert {i.title for i in self.lib.items(q)} == { - i.title for i in self.lib.items(not_not_q) - } + assert get_results(q) == get_results(not_not_q) - def test_type_and(self): - # not(a and b) <-> not(a) or not(b) - q = dbcore.query.AndQuery( - [ - dbcore.query.BooleanQuery("comp", True), - dbcore.query.NumericQuery("year", "2002"), - ], - ) - not_results = self.lib.items(dbcore.query.NotQuery(q)) - self.assert_items_matched(not_results, ["foo bar", "beets 4 eva"]) - self.assertNegationProperties(q) - def test_type_boolean(self): - q = dbcore.query.BooleanQuery("comp", True) - not_results = self.lib.items(dbcore.query.NotQuery(q)) - self.assert_items_matched(not_results, ["beets 4 eva"]) - self.assertNegationProperties(q) - - def test_type_date(self): - q = dbcore.query.DateQuery("added", "2000-01-01") - not_results = self.lib.items(dbcore.query.NotQuery(q)) - # query date is in the past, thus the 'not' results should contain all - # items - self.assert_items_matched( - not_results, ["foo bar", "baz qux", "beets 4 eva"] - ) - self.assertNegationProperties(q) - - def test_type_false(self): - q = dbcore.query.FalseQuery() - not_results = self.lib.items(dbcore.query.NotQuery(q)) - self.assert_items_matched_all(not_results) - self.assertNegationProperties(q) - - def test_type_match(self): - q = dbcore.query.MatchQuery("year", "2003") - not_results = self.lib.items(dbcore.query.NotQuery(q)) - self.assert_items_matched(not_results, ["foo bar", "baz qux"]) - self.assertNegationProperties(q) - - def test_type_none(self): - q = dbcore.query.NoneQuery("rg_track_gain") - not_results = self.lib.items(dbcore.query.NotQuery(q)) - self.assert_items_matched(not_results, []) - self.assertNegationProperties(q) - - def test_type_numeric(self): - q = dbcore.query.NumericQuery("year", "2001..2002") - not_results = self.lib.items(dbcore.query.NotQuery(q)) - self.assert_items_matched(not_results, ["beets 4 eva"]) - self.assertNegationProperties(q) - - def test_type_or(self): - # not(a or b) <-> not(a) and not(b) - q = dbcore.query.OrQuery( - [ - dbcore.query.BooleanQuery("comp", True), - dbcore.query.NumericQuery("year", "2002"), - ] - ) - not_results = self.lib.items(dbcore.query.NotQuery(q)) - self.assert_items_matched(not_results, ["beets 4 eva"]) - self.assertNegationProperties(q) - - def test_type_regexp(self): - q = dbcore.query.RegexpQuery("artist", "^t") - not_results = self.lib.items(dbcore.query.NotQuery(q)) - self.assert_items_matched(not_results, ["foo bar"]) - self.assertNegationProperties(q) - - def test_type_substring(self): - q = dbcore.query.SubstringQuery("album", "ba") - not_results = self.lib.items(dbcore.query.NotQuery(q)) - self.assert_items_matched(not_results, ["beets 4 eva"]) - self.assertNegationProperties(q) - - def test_type_true(self): - q = dbcore.query.TrueQuery() - not_results = self.lib.items(dbcore.query.NotQuery(q)) - self.assert_items_matched(not_results, []) - self.assertNegationProperties(q) +class NegationPrefixTest(DummyDataTestCase): + """Tests negation prefixes.""" def test_get_prefixes_keyed(self): """Test both negation prefixes on a keyed query.""" From 09b22949c0c9c6d41b5869fd0b5bf58ee0a763b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0ar=C5=ABnas=20Nejus?= Date: Sun, 1 Jun 2025 13:53:34 +0100 Subject: [PATCH 09/10] Refactor test_query And rewrite test_query.py --- beets/test/_common.py | 5 +- test/conftest.py | 20 + test/test_query.py | 1206 +++++++++++++---------------------------- 3 files changed, 413 insertions(+), 818 deletions(-) diff --git a/beets/test/_common.py b/beets/test/_common.py index 86319c011..da81a587c 100644 --- a/beets/test/_common.py +++ b/beets/test/_common.py @@ -63,8 +63,8 @@ HAVE_SYMLINK = sys.platform != "win32" HAVE_HARDLINK = sys.platform != "win32" -def item(lib=None): - i = beets.library.Item( +def item(lib=None, **kwargs): + defaults = dict( title="the title", artist="the artist", albumartist="the album artist", @@ -99,6 +99,7 @@ def item(lib=None): album_id=None, mtime=12345, ) + i = beets.library.Item(**{**defaults, **kwargs}) if lib: lib.add(i) return i diff --git a/test/conftest.py b/test/conftest.py index 95509bdb6..3107ad690 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,7 +1,10 @@ +import inspect import os import pytest +from beets.dbcore.query import Query + def skip_marked_items(items: list[pytest.Item], marker_name: str, reason: str): for item in (i for i in items if i.get_closest_marker(marker_name)): @@ -21,3 +24,20 @@ def pytest_collection_modifyitems( skip_marked_items( items, "on_lyrics_update", "No change in lyrics source code" ) + + +def pytest_make_parametrize_id(config, val, argname): + """Generate readable test identifiers for pytest parametrized tests. + + Provides custom string representations for: + - Query classes/instances: use class name + - Lambda functions: show abbreviated source + - Other values: use standard repr() + """ + if inspect.isclass(val) and issubclass(val, Query): + return val.__name__ + + if inspect.isfunction(val) and val.__name__ == "": + return inspect.getsource(val).split("lambda")[-1][:30] + + return repr(val) diff --git a/test/test_query.py b/test/test_query.py index 252367c14..0ddf83e3a 100644 --- a/test/test_query.py +++ b/test/test_query.py @@ -15,837 +15,39 @@ """Various tests for querying the library database.""" import sys -import unittest +from functools import partial from pathlib import Path import pytest -from mock import patch -from beets import dbcore from beets.dbcore import types from beets.dbcore.query import ( - InvalidQueryArgumentValueError, + AndQuery, + BooleanQuery, + DateQuery, + FalseQuery, + MatchQuery, NoneQuery, + NotQuery, + NumericQuery, + OrQuery, ParsingError, PathQuery, + RegexpQuery, + StringFieldQuery, + StringQuery, + SubstringQuery, + TrueQuery, ) +from beets.library import Item from beets.test import _common -from beets.test.helper import BeetsTestCase, TestHelper +from beets.test.helper import TestHelper # Because the absolute path begins with something like C:, we # can't disambiguate it from an ordinary query. WIN32_NO_IMPLICIT_PATHS = "Implicit paths are not supported on Windows" - -class AssertsMixin: - def assert_items_matched(self, results, titles): - assert {i.title for i in results} == set(titles) - - def assert_albums_matched(self, results, albums): - assert {a.album for a in results} == set(albums) - - -# A test case class providing a library with some dummy data and some -# assertions involving that data. -class DummyDataTestCase(BeetsTestCase, AssertsMixin): - def setUp(self): - super().setUp() - items = [_common.item() for _ in range(3)] - items[0].title = "foo bar" - items[0].artist = "one" - items[0].artists = ["one", "eleven"] - items[0].album = "baz" - items[0].year = 2001 - items[0].comp = True - items[0].genre = "rock" - items[1].title = "baz qux" - items[1].artist = "two" - items[1].artists = ["two", "twelve"] - items[1].album = "baz" - items[1].year = 2002 - items[1].comp = True - items[1].genre = "Rock" - items[2].title = "beets 4 eva" - items[2].artist = "three" - items[2].artists = ["three", "one"] - items[2].album = "foo" - items[2].year = 2003 - items[2].comp = False - items[2].genre = "Hard Rock" - for item in items: - self.lib.add(item) - self.album = self.lib.add_album(items[:2]) - - def assert_items_matched_all(self, results): - self.assert_items_matched( - results, - [ - "foo bar", - "baz qux", - "beets 4 eva", - ], - ) - - -class GetTest(DummyDataTestCase): - def test_get_empty(self): - q = "" - results = self.lib.items(q) - self.assert_items_matched_all(results) - - def test_get_none(self): - q = None - results = self.lib.items(q) - self.assert_items_matched_all(results) - - def test_get_one_keyed_term(self): - q = "title:qux" - results = self.lib.items(q) - self.assert_items_matched(results, ["baz qux"]) - - def test_get_one_keyed_exact(self): - q = "genre:=rock" - results = self.lib.items(q) - self.assert_items_matched(results, ["foo bar"]) - q = "genre:=Rock" - results = self.lib.items(q) - self.assert_items_matched(results, ["baz qux"]) - q = 'genre:="Hard Rock"' - results = self.lib.items(q) - self.assert_items_matched(results, ["beets 4 eva"]) - - def test_get_one_keyed_exact_nocase(self): - q = 'genre:=~"hard rock"' - results = self.lib.items(q) - self.assert_items_matched(results, ["beets 4 eva"]) - - def test_get_one_keyed_regexp(self): - q = "artist::t.+r" - results = self.lib.items(q) - self.assert_items_matched(results, ["beets 4 eva"]) - - def test_get_one_unkeyed_term(self): - q = "three" - results = self.lib.items(q) - self.assert_items_matched(results, ["beets 4 eva"]) - - def test_get_one_unkeyed_exact(self): - q = "=rock" - results = self.lib.items(q) - self.assert_items_matched(results, ["foo bar"]) - - def test_get_one_unkeyed_exact_nocase(self): - q = '=~"hard rock"' - results = self.lib.items(q) - self.assert_items_matched(results, ["beets 4 eva"]) - - def test_get_one_unkeyed_regexp(self): - q = ":x$" - results = self.lib.items(q) - self.assert_items_matched(results, ["baz qux"]) - - def test_get_no_matches(self): - q = "popebear" - results = self.lib.items(q) - self.assert_items_matched(results, []) - - def test_invalid_key(self): - q = "pope:bear" - results = self.lib.items(q) - # Matches nothing since the flexattr is not present on the - # objects. - self.assert_items_matched(results, []) - - def test_get_no_matches_exact(self): - q = 'genre:="hard rock"' - results = self.lib.items(q) - self.assert_items_matched(results, []) - - def test_term_case_insensitive(self): - q = "oNE" - results = self.lib.items(q) - self.assert_items_matched(results, ["foo bar"]) - - def test_regexp_case_sensitive(self): - q = ":oNE" - results = self.lib.items(q) - self.assert_items_matched(results, []) - q = ":one" - results = self.lib.items(q) - self.assert_items_matched(results, ["foo bar"]) - - def test_term_case_insensitive_with_key(self): - q = "artist:thrEE" - results = self.lib.items(q) - self.assert_items_matched(results, ["beets 4 eva"]) - - def test_term_case_regex_with_multi_key_matches(self): - q = "artists::eleven" - results = self.lib.items(q) - self.assert_items_matched(results, ["foo bar"]) - - def test_term_case_regex_with_multi_key_matches_multiple_columns(self): - q = "artists::one" - results = self.lib.items(q) - self.assert_items_matched(results, ["foo bar", "beets 4 eva"]) - - def test_key_case_insensitive(self): - q = "ArTiST:three" - results = self.lib.items(q) - self.assert_items_matched(results, ["beets 4 eva"]) - - def test_keyed_matches_exact_nocase(self): - q = "genre:=~rock" - results = self.lib.items(q) - self.assert_items_matched( - results, - [ - "foo bar", - "baz qux", - ], - ) - - def test_unkeyed_term_matches_multiple_columns(self): - q = "baz" - results = self.lib.items(q) - self.assert_items_matched( - results, - [ - "foo bar", - "baz qux", - ], - ) - - def test_unkeyed_regexp_matches_multiple_columns(self): - q = ":z$" - results = self.lib.items(q) - self.assert_items_matched( - results, - [ - "foo bar", - "baz qux", - ], - ) - - def test_keyed_term_matches_only_one_column(self): - q = "title:baz" - results = self.lib.items(q) - self.assert_items_matched(results, ["baz qux"]) - - def test_keyed_regexp_matches_only_one_column(self): - q = "title::baz" - results = self.lib.items(q) - self.assert_items_matched( - results, - [ - "baz qux", - ], - ) - - def test_multiple_terms_narrow_search(self): - q = "qux baz" - results = self.lib.items(q) - self.assert_items_matched( - results, - [ - "baz qux", - ], - ) - - def test_multiple_regexps_narrow_search(self): - q = ":baz :qux" - results = self.lib.items(q) - self.assert_items_matched(results, ["baz qux"]) - - def test_mixed_terms_regexps_narrow_search(self): - q = ":baz qux" - results = self.lib.items(q) - self.assert_items_matched(results, ["baz qux"]) - - def test_single_year(self): - q = "year:2001" - results = self.lib.items(q) - self.assert_items_matched(results, ["foo bar"]) - - def test_year_range(self): - q = "year:2000..2002" - results = self.lib.items(q) - self.assert_items_matched( - results, - [ - "foo bar", - "baz qux", - ], - ) - - def test_singleton_true(self): - q = "singleton:true" - results = self.lib.items(q) - self.assert_items_matched(results, ["beets 4 eva"]) - - def test_singleton_1(self): - q = "singleton:1" - results = self.lib.items(q) - self.assert_items_matched(results, ["beets 4 eva"]) - - def test_singleton_false(self): - q = "singleton:false" - results = self.lib.items(q) - self.assert_items_matched(results, ["foo bar", "baz qux"]) - - def test_singleton_0(self): - q = "singleton:0" - results = self.lib.items(q) - self.assert_items_matched(results, ["foo bar", "baz qux"]) - - def test_compilation_true(self): - q = "comp:true" - results = self.lib.items(q) - self.assert_items_matched(results, ["foo bar", "baz qux"]) - - def test_compilation_false(self): - q = "comp:false" - results = self.lib.items(q) - self.assert_items_matched(results, ["beets 4 eva"]) - - def test_unknown_field_name_no_results(self): - q = "xyzzy:nonsense" - results = self.lib.items(q) - titles = [i.title for i in results] - assert titles == [] - - def test_unknown_field_name_no_results_in_album_query(self): - q = "xyzzy:nonsense" - results = self.lib.albums(q) - names = [a.album for a in results] - assert names == [] - - def test_item_field_name_matches_nothing_in_album_query(self): - q = "format:nonsense" - results = self.lib.albums(q) - names = [a.album for a in results] - assert names == [] - - def test_unicode_query(self): - item = self.lib.items().get() - item.title = "caf\xe9" - item.store() - - q = "title:caf\xe9" - results = self.lib.items(q) - self.assert_items_matched(results, ["caf\xe9"]) - - def test_numeric_search_positive(self): - q = dbcore.query.NumericQuery("year", "2001") - results = self.lib.items(q) - assert results - - def test_numeric_search_negative(self): - q = dbcore.query.NumericQuery("year", "1999") - results = self.lib.items(q) - assert not results - - def test_album_field_fallback(self): - self.album["albumflex"] = "foo" - self.album.store() - - q = "albumflex:foo" - results = self.lib.items(q) - self.assert_items_matched(results, ["foo bar", "baz qux"]) - - def test_invalid_query(self): - with pytest.raises(InvalidQueryArgumentValueError, match="not an int"): - dbcore.query.NumericQuery("year", "199a") - - msg_match = r"not a regular expression.*unterminated subpattern" - with pytest.raises(ParsingError, match=msg_match): - dbcore.query.RegexpQuery("year", "199(") - - -class MatchTest(unittest.TestCase): - def setUp(self): - super().setUp() - self.item = _common.item() - - def test_regex_match_positive(self): - q = dbcore.query.RegexpQuery("album", "^the album$") - assert q.match(self.item) - - def test_regex_match_negative(self): - q = dbcore.query.RegexpQuery("album", "^album$") - assert not q.match(self.item) - - def test_regex_match_non_string_value(self): - q = dbcore.query.RegexpQuery("disc", "^6$") - assert q.match(self.item) - - def test_substring_match_positive(self): - q = dbcore.query.SubstringQuery("album", "album") - assert q.match(self.item) - - def test_substring_match_negative(self): - q = dbcore.query.SubstringQuery("album", "ablum") - assert not q.match(self.item) - - def test_substring_match_non_string_value(self): - q = dbcore.query.SubstringQuery("disc", "6") - assert q.match(self.item) - - def test_exact_match_nocase_positive(self): - q = dbcore.query.StringQuery("genre", "the genre") - assert q.match(self.item) - q = dbcore.query.StringQuery("genre", "THE GENRE") - assert q.match(self.item) - - def test_exact_match_nocase_negative(self): - q = dbcore.query.StringQuery("genre", "genre") - assert not q.match(self.item) - - def test_year_match_positive(self): - q = dbcore.query.NumericQuery("year", "1") - assert q.match(self.item) - - def test_year_match_negative(self): - q = dbcore.query.NumericQuery("year", "10") - assert not q.match(self.item) - - def test_bitrate_range_positive(self): - q = dbcore.query.NumericQuery("bitrate", "100000..200000") - assert q.match(self.item) - - def test_bitrate_range_negative(self): - q = dbcore.query.NumericQuery("bitrate", "200000..300000") - assert not q.match(self.item) - - def test_open_range(self): - dbcore.query.NumericQuery("bitrate", "100000..") - - def test_eq(self): - q1 = dbcore.query.MatchQuery("foo", "bar") - q2 = dbcore.query.MatchQuery("foo", "bar") - q3 = dbcore.query.MatchQuery("foo", "baz") - q4 = dbcore.query.StringFieldQuery("foo", "bar") - assert q1 == q2 - assert q1 != q3 - assert q1 != q4 - assert q3 != q4 - - -class IntQueryTest(BeetsTestCase): - def test_exact_value_match(self): - item = self.add_item(bpm=120) - matched = self.lib.items("bpm:120").get() - assert item.id == matched.id - - def test_range_match(self): - item = self.add_item(bpm=120) - self.add_item(bpm=130) - - matched = self.lib.items("bpm:110..125") - assert 1 == len(matched) - assert item.id == matched.get().id - - @patch("beets.library.Item._types", {"myint": types.Integer()}) - def test_flex_range_match(self): - item = self.add_item(myint=2) - matched = self.lib.items("myint:2").get() - assert item.id == matched.id - - @patch("beets.library.Item._types", {"myint": types.Integer()}) - def test_flex_dont_match_missing(self): - self.add_item() - matched = self.lib.items("myint:2").get() - assert matched is None - - def test_no_substring_match(self): - self.add_item(bpm=120) - matched = self.lib.items("bpm:12").get() - assert matched is None - - -@patch("beets.library.Item._types", {"flexbool": types.Boolean()}) -class BoolQueryTest(BeetsTestCase, AssertsMixin): - def test_parse_true(self): - item_true = self.add_item(comp=True) - item_false = self.add_item(comp=False) - matched = self.lib.items("comp:true") - assert item_true.id in {i.id for i in matched} - assert item_false.id not in {i.id for i in matched} - - def test_flex_parse_true(self): - item_true = self.add_item(flexbool=True) - item_false = self.add_item(flexbool=False) - matched = self.lib.items("flexbool:true") - assert item_true.id in {i.id for i in matched} - assert item_false.id not in {i.id for i in matched} - - def test_flex_parse_false(self): - item_true = self.add_item(flexbool=True) - item_false = self.add_item(flexbool=False) - matched = self.lib.items("flexbool:false") - assert item_false.id in {i.id for i in matched} - assert item_true.id not in {i.id for i in matched} - - def test_flex_parse_1(self): - item_true = self.add_item(flexbool=True) - item_false = self.add_item(flexbool=False) - matched = self.lib.items("flexbool:1") - assert item_true.id in {i.id for i in matched} - assert item_false.id not in {i.id for i in matched} - - def test_flex_parse_0(self): - item_true = self.add_item(flexbool=True) - item_false = self.add_item(flexbool=False) - matched = self.lib.items("flexbool:0") - assert item_false.id in {i.id for i in matched} - assert item_true.id not in {i.id for i in matched} - - def test_flex_parse_any_string(self): - # TODO this should be the other way around - item_true = self.add_item(flexbool=True) - item_false = self.add_item(flexbool=False) - matched = self.lib.items("flexbool:something") - assert item_false.id in {i.id for i in matched} - assert item_true.id not in {i.id for i in matched} - - -class DefaultSearchFieldsTest(DummyDataTestCase): - def test_albums_matches_album(self): - albums = list(self.lib.albums("baz")) - assert len(albums) == 1 - - def test_albums_matches_albumartist(self): - albums = list(self.lib.albums(["album artist"])) - assert len(albums) == 1 - - def test_items_matches_title(self): - items = self.lib.items("beets") - self.assert_items_matched(items, ["beets 4 eva"]) - - def test_items_does_not_match_year(self): - items = self.lib.items("2001") - self.assert_items_matched(items, []) - - -class NoneQueryTest(BeetsTestCase, AssertsMixin): - def test_match_singletons(self): - singleton = self.add_item() - album_item = self.add_album().items().get() - - matched = self.lib.items(NoneQuery("album_id")) - assert singleton.id in {i.id for i in matched} - assert album_item.id not in {i.id for i in matched} - - def test_match_after_set_none(self): - item = self.add_item(rg_track_gain=0) - matched = self.lib.items(NoneQuery("rg_track_gain")) - assert item.id not in {i.id for i in matched} - - item["rg_track_gain"] = None - item.store() - matched = self.lib.items(NoneQuery("rg_track_gain")) - assert item.id in {i.id for i in matched} - - def test_match_slow(self): - item = self.add_item() - matched = self.lib.items(NoneQuery("rg_track_peak", fast=False)) - assert item.id in {i.id for i in matched} - - def test_match_slow_after_set_none(self): - item = self.add_item(rg_track_gain=0) - matched = self.lib.items(NoneQuery("rg_track_gain", fast=False)) - assert item.id not in {i.id for i in matched} - - item["rg_track_gain"] = None - item.store() - matched = self.lib.items(NoneQuery("rg_track_gain", fast=False)) - assert item.id in {i.id for i in matched} - - -class NotQueryMatchTest(unittest.TestCase): - """Test `query.NotQuery` matching against a single item, using the same - cases and assertions as on `MatchTest`, plus assertion on the negated - queries (ie. assert q -> assert not NotQuery(q)). - """ - - def setUp(self): - super().setUp() - self.item = _common.item() - - def test_regex_match_positive(self): - q = dbcore.query.RegexpQuery("album", "^the album$") - assert q.match(self.item) - assert not dbcore.query.NotQuery(q).match(self.item) - - def test_regex_match_negative(self): - q = dbcore.query.RegexpQuery("album", "^album$") - assert not q.match(self.item) - assert dbcore.query.NotQuery(q).match(self.item) - - def test_regex_match_non_string_value(self): - q = dbcore.query.RegexpQuery("disc", "^6$") - assert q.match(self.item) - assert not dbcore.query.NotQuery(q).match(self.item) - - def test_substring_match_positive(self): - q = dbcore.query.SubstringQuery("album", "album") - assert q.match(self.item) - assert not dbcore.query.NotQuery(q).match(self.item) - - def test_substring_match_negative(self): - q = dbcore.query.SubstringQuery("album", "ablum") - assert not q.match(self.item) - assert dbcore.query.NotQuery(q).match(self.item) - - def test_substring_match_non_string_value(self): - q = dbcore.query.SubstringQuery("disc", "6") - assert q.match(self.item) - assert not dbcore.query.NotQuery(q).match(self.item) - - def test_year_match_positive(self): - q = dbcore.query.NumericQuery("year", "1") - assert q.match(self.item) - assert not dbcore.query.NotQuery(q).match(self.item) - - def test_year_match_negative(self): - q = dbcore.query.NumericQuery("year", "10") - assert not q.match(self.item) - assert dbcore.query.NotQuery(q).match(self.item) - - def test_bitrate_range_positive(self): - q = dbcore.query.NumericQuery("bitrate", "100000..200000") - assert q.match(self.item) - assert not dbcore.query.NotQuery(q).match(self.item) - - def test_bitrate_range_negative(self): - q = dbcore.query.NumericQuery("bitrate", "200000..300000") - assert not q.match(self.item) - assert dbcore.query.NotQuery(q).match(self.item) - - def test_open_range(self): - q = dbcore.query.NumericQuery("bitrate", "100000..") - dbcore.query.NotQuery(q) - - -class TestNotQuery: - """Test `query.NotQuery` against the dummy data.""" - - @pytest.fixture(autouse=True, scope="class") - def lib(self): - test_case = DummyDataTestCase() - test_case.setUp() - return test_case.lib - - @pytest.mark.parametrize( - "q, expected_results", - [ - ( - dbcore.query.BooleanQuery("comp", True), - {"beets 4 eva"}, - ), - ( - dbcore.query.DateQuery("added", "2000-01-01"), - {"foo bar", "baz qux", "beets 4 eva"}, - ), - ( - dbcore.query.FalseQuery(), - {"foo bar", "baz qux", "beets 4 eva"}, - ), - ( - dbcore.query.MatchQuery("year", "2003"), - {"foo bar", "baz qux"}, - ), - ( - dbcore.query.NoneQuery("rg_track_gain"), - set(), - ), - ( - dbcore.query.NumericQuery("year", "2001..2002"), - {"beets 4 eva"}, - ), - ( - dbcore.query.AnyFieldQuery( - "baz", ["album"], dbcore.query.MatchQuery - ), - {"beets 4 eva"}, - ), - ( - dbcore.query.AndQuery( - [ - dbcore.query.BooleanQuery("comp", True), - dbcore.query.NumericQuery("year", "2002"), - ] - ), - {"foo bar", "beets 4 eva"}, - ), - ( - dbcore.query.OrQuery( - [ - dbcore.query.BooleanQuery("comp", True), - dbcore.query.NumericQuery("year", "2002"), - ] - ), - {"beets 4 eva"}, - ), - ( - dbcore.query.RegexpQuery("artist", "^t"), - {"foo bar"}, - ), - ( - dbcore.query.SubstringQuery("album", "ba"), - {"beets 4 eva"}, - ), - ( - dbcore.query.TrueQuery(), - set(), - ), - ], - ids=lambda x: x.__class__ if isinstance(x, dbcore.query.Query) else "", - ) - def test_query_type(self, lib, q, expected_results): - def get_results(*args): - return {i.title for i in lib.items(*args)} - - # not(a and b) <-> not(a) or not(b) - not_q = dbcore.query.NotQuery(q) - not_q_results = get_results(not_q) - assert not_q_results == expected_results - - # assert using OrQuery, AndQuery - q_or = dbcore.query.OrQuery([q, not_q]) - - q_and = dbcore.query.AndQuery([q, not_q]) - assert get_results(q_or) == {"foo bar", "baz qux", "beets 4 eva"} - assert get_results(q_and) == set() - - # assert manually checking the item titles - all_titles = get_results() - q_results = get_results(q) - assert q_results.union(not_q_results) == all_titles - assert q_results.intersection(not_q_results) == set() - - # round trip - not_not_q = dbcore.query.NotQuery(not_q) - assert get_results(q) == get_results(not_not_q) - - -class NegationPrefixTest(DummyDataTestCase): - """Tests negation prefixes.""" - - def test_get_prefixes_keyed(self): - """Test both negation prefixes on a keyed query.""" - q0 = "-title:qux" - q1 = "^title:qux" - results0 = self.lib.items(q0) - results1 = self.lib.items(q1) - self.assert_items_matched(results0, ["foo bar", "beets 4 eva"]) - self.assert_items_matched(results1, ["foo bar", "beets 4 eva"]) - - def test_get_prefixes_unkeyed(self): - """Test both negation prefixes on an unkeyed query.""" - q0 = "-qux" - q1 = "^qux" - results0 = self.lib.items(q0) - results1 = self.lib.items(q1) - self.assert_items_matched(results0, ["foo bar", "beets 4 eva"]) - self.assert_items_matched(results1, ["foo bar", "beets 4 eva"]) - - def test_get_one_keyed_regexp(self): - q = "-artist::t.+r" - results = self.lib.items(q) - self.assert_items_matched(results, ["foo bar", "baz qux"]) - - def test_get_one_unkeyed_regexp(self): - q = "-:x$" - results = self.lib.items(q) - self.assert_items_matched(results, ["foo bar", "beets 4 eva"]) - - def test_get_multiple_terms(self): - q = "baz -bar" - results = self.lib.items(q) - self.assert_items_matched(results, ["baz qux"]) - - def test_get_mixed_terms(self): - q = "baz -title:bar" - results = self.lib.items(q) - self.assert_items_matched(results, ["baz qux"]) - - def test_fast_vs_slow(self): - """Test that the results are the same regardless of the `fast` flag - for negated `FieldQuery`s. - - TODO: investigate NoneQuery(fast=False), as it is raising - AttributeError: type object 'NoneQuery' has no attribute 'field' - at NoneQuery.match() (due to being @classmethod, and no self?) - """ - classes = [ - (dbcore.query.DateQuery, ["added", "2001-01-01"]), - (dbcore.query.MatchQuery, ["artist", "one"]), - # (dbcore.query.NoneQuery, ['rg_track_gain']), - (dbcore.query.NumericQuery, ["year", "2002"]), - (dbcore.query.StringFieldQuery, ["year", "2001"]), - (dbcore.query.RegexpQuery, ["album", "^.a"]), - (dbcore.query.SubstringQuery, ["title", "x"]), - ] - - for klass, args in classes: - q_fast = dbcore.query.NotQuery(klass(*(args + [True]))) - q_slow = dbcore.query.NotQuery(klass(*(args + [False]))) - - try: - assert [i.title for i in self.lib.items(q_fast)] == [ - i.title for i in self.lib.items(q_slow) - ] - except NotImplementedError: - # ignore classes that do not provide `fast` implementation - pass - - -class RelatedQueriesTest(BeetsTestCase, AssertsMixin): - """Test album-level queries with track-level filters and vice-versa.""" - - def setUp(self): - super().setUp() - - albums = [] - for album_idx in range(1, 3): - album_name = f"Album{album_idx}" - album_items = [] - for item_idx in range(1, 3): - item = _common.item() - item.album = album_name - item.title = f"{album_name} Item{item_idx}" - self.lib.add(item) - album_items.append(item) - album = self.lib.add_album(album_items) - album.artpath = f"{album_name} Artpath" - album.catalognum = "ABC" - album.store() - albums.append(album) - - self.album, self.another_album = albums - - def test_get_albums_filter_by_track_field(self): - q = "title:Album1" - results = self.lib.albums(q) - self.assert_albums_matched(results, ["Album1"]) - - def test_get_items_filter_by_album_field(self): - q = "artpath::Album1" - results = self.lib.items(q) - self.assert_items_matched(results, ["Album1 Item1", "Album1 Item2"]) - - def test_filter_albums_by_common_field(self): - # title:Album1 ensures that the items table is joined for the query - q = "title:Album1 Album1" - results = self.lib.albums(q) - self.assert_albums_matched(results, ["Album1"]) - - def test_filter_items_by_common_field(self): - # artpath::A ensures that the albums table is joined for the query - q = "artpath::A Album1" - results = self.lib.items(q) - self.assert_items_matched(results, ["Album1 Item1", "Album1 Item2"]) +_p = pytest.param @pytest.fixture(scope="class") @@ -858,8 +60,228 @@ def helper(): helper.teardown_beets() +class TestGet: + @pytest.fixture(scope="class") + def lib(self, helper): + album_items = [ + helper.create_item( + title="first", + artist="one", + artists=["one", "eleven"], + album="baz", + year=2001, + comp=True, + genre="rock", + ), + helper.create_item( + title="second", + artist="two", + artists=["two", "twelve"], + album="baz", + year=2002, + comp=True, + genre="Rock", + ), + ] + album = helper.lib.add_album(album_items) + album.albumflex = "foo" + album.store() + + helper.add_item( + title="third", + artist="three", + artists=["three", "one"], + album="foo", + year=2003, + comp=False, + genre="Hard Rock", + comments="caf\xe9", + ) + + return helper.lib + + @pytest.mark.parametrize( + "q, expected_titles", + [ + ("", ["first", "second", "third"]), + (None, ["first", "second", "third"]), + (":oNE", []), + (":one", ["first"]), + (":sec :ond", ["second"]), + (":second", ["second"]), + ("=rock", ["first"]), + ('=~"hard rock"', ["third"]), + (":t$", ["first"]), + ("oNE", ["first"]), + ("baz", ["first", "second"]), + ("sec ond", ["second"]), + ("three", ["third"]), + ("albumflex:foo", ["first", "second"]), + ("artist::t.+r", ["third"]), + ("artist:thrEE", ["third"]), + ("artists::eleven", ["first"]), + ("artists::one", ["first", "third"]), + ("ArTiST:three", ["third"]), + ("comments:caf\xe9", ["third"]), + ("comp:true", ["first", "second"]), + ("comp:false", ["third"]), + ("genre:=rock", ["first"]), + ("genre:=Rock", ["second"]), + ('genre:="Hard Rock"', ["third"]), + ('genre:=~"hard rock"', ["third"]), + ("genre:=~rock", ["first", "second"]), + ('genre:="hard rock"', []), + ("popebear", []), + ("pope:bear", []), + ("singleton:true", ["third"]), + ("singleton:1", ["third"]), + ("singleton:false", ["first", "second"]), + ("singleton:0", ["first", "second"]), + ("title:ond", ["second"]), + ("title::sec", ["second"]), + ("year:2001", ["first"]), + ("year:2000..2002", ["first", "second"]), + ("xyzzy:nonsense", []), + ], + ) + def test_get_query(self, lib, q, expected_titles): + assert {i.title for i in lib.items(q)} == set(expected_titles) + + @pytest.mark.parametrize( + "q, expected_titles", + [ + (BooleanQuery("comp", True), ("third",)), + (DateQuery("added", "2000-01-01"), ("first", "second", "third")), + (FalseQuery(), ("first", "second", "third")), + (MatchQuery("year", "2003"), ("first", "second")), + (NoneQuery("rg_track_gain"), ()), + (NumericQuery("year", "2001..2002"), ("third",)), + ( + AndQuery( + [BooleanQuery("comp", True), NumericQuery("year", "2002")] + ), + ("first", "third"), + ), + ( + OrQuery( + [BooleanQuery("comp", True), NumericQuery("year", "2002")] + ), + ("third",), + ), + (RegexpQuery("artist", "^t"), ("first",)), + (SubstringQuery("album", "ba"), ("third",)), + (TrueQuery(), ()), + ], + ) + def test_query_logic(self, lib, q, expected_titles): + def get_results(*args): + return {i.title for i in lib.items(*args)} + + # not(a and b) <-> not(a) or not(b) + not_q = NotQuery(q) + not_q_results = get_results(not_q) + assert not_q_results == set(expected_titles) + + # assert using OrQuery, AndQuery + q_or = OrQuery([q, not_q]) + + q_and = AndQuery([q, not_q]) + assert get_results(q_or) == {"first", "second", "third"} + assert get_results(q_and) == set() + + # assert manually checking the item titles + all_titles = get_results() + q_results = get_results(q) + assert q_results.union(not_q_results) == all_titles + assert q_results.intersection(not_q_results) == set() + + # round trip + not_not_q = NotQuery(not_q) + assert get_results(q) == get_results(not_not_q) + + @pytest.mark.parametrize( + "q, expected_titles", + [ + ("-artist::t.+r", ["first", "second"]), + ("-:t$", ["second", "third"]), + ("sec -bar", ["second"]), + ("sec -title:bar", ["second"]), + ("-ond", ["first", "third"]), + ("^ond", ["first", "third"]), + ("^title:sec", ["first", "third"]), + ("-title:sec", ["first", "third"]), + ], + ) + def test_negation_prefix(self, lib, q, expected_titles): + actual_titles = {i.title for i in lib.items(q)} + assert actual_titles == set(expected_titles) + + @pytest.mark.parametrize( + "make_q", + [ + partial(DateQuery, "added", "2001-01-01"), + partial(MatchQuery, "artist", "one"), + partial(NoneQuery, "rg_track_gain"), + partial(NumericQuery, "year", "2002"), + partial(StringQuery, "year", "2001"), + partial(RegexpQuery, "album", "^.a"), + partial(SubstringQuery, "title", "x"), + ], + ) + def test_fast_vs_slow(self, lib, make_q): + """Test that the results are the same regardless of the `fast` flag + for negated `FieldQuery`s. + """ + q_fast = make_q(True) + q_slow = make_q(False) + + assert list(map(dict, lib.items(q_fast))) == list( + map(dict, lib.items(q_slow)) + ) + + +class TestMatch: + @pytest.fixture(scope="class") + def item(self): + return _common.item( + album="the album", + disc=6, + genre="the genre", + year=1, + bitrate=128000, + ) + + @pytest.mark.parametrize( + "q, should_match", + [ + (RegexpQuery("album", "^the album$"), True), + (RegexpQuery("album", "^album$"), False), + (RegexpQuery("disc", "^6$"), True), + (SubstringQuery("album", "album"), True), + (SubstringQuery("album", "ablum"), False), + (SubstringQuery("disc", "6"), True), + (StringQuery("genre", "the genre"), True), + (StringQuery("genre", "THE GENRE"), True), + (StringQuery("genre", "genre"), False), + (NumericQuery("year", "1"), True), + (NumericQuery("year", "10"), False), + (NumericQuery("bitrate", "100000..200000"), True), + (NumericQuery("bitrate", "200000..300000"), False), + (NumericQuery("bitrate", "100000.."), True), + ], + ) + def test_match(self, item, q, should_match): + assert q.match(item) == should_match + assert not NotQuery(q).match(item) == should_match + + class TestPathQuery: - _p = pytest.param + """Tests for path-based querying functionality in the database system. + + Verifies that path queries correctly match items by their file paths, + handling special characters, case sensitivity, parent directories, + and path separator detection across different platforms. + """ @pytest.fixture(scope="class") def lib(self, helper): @@ -889,6 +311,7 @@ class TestPathQuery: ], ) def test_explicit(self, monkeypatch, lib, q, expected_titles): + """Test explicit path queries with different path specifications.""" monkeypatch.setattr("beets.util.case_sensitive", lambda *_: True) assert {i.title for i in lib.items(q)} == set(expected_titles) @@ -904,6 +327,7 @@ class TestPathQuery: ], ) def test_implicit(self, monkeypatch, lib, q, expected_titles): + """Test implicit path detection when queries contain path separators.""" monkeypatch.setattr( "beets.dbcore.query.PathQuery.is_path_query", lambda path: True ) @@ -920,6 +344,7 @@ class TestPathQuery: def test_case_sensitivity( self, lib, monkeypatch, case_sensitive, expected_titles ): + """Test path matching with different case sensitivity settings.""" q = "path:/a/b/c2.mp3" monkeypatch.setattr( "beets.util.case_sensitive", lambda *_: case_sensitive @@ -943,6 +368,7 @@ class TestPathQuery: ], ) def test_path_sep_detection(self, monkeypatch, tmp_path, q, is_path_query): + """Test detection of path queries based on the presence of path separators.""" monkeypatch.chdir(tmp_path) (tmp_path / "foo").mkdir() (tmp_path / "foo" / "bar").touch() @@ -950,3 +376,151 @@ class TestPathQuery: q = str(tmp_path / q[1:]) assert PathQuery.is_path_query(q) == is_path_query + + +class TestQuery: + ALBUM = "album title" + SINGLE = "singleton" + + @pytest.fixture(scope="class") + def lib(self, helper): + helper.add_album( + title=self.ALBUM, + comp=True, + flexbool=True, + bpm=120, + flexint=2, + rg_track_gain=0, + ) + helper.add_item( + title=self.SINGLE, comp=False, flexbool=False, rg_track_gain=None + ) + + with pytest.MonkeyPatch.context() as monkeypatch: + monkeypatch.setattr( + Item, + "_types", + {"flexbool": types.Boolean(), "flexint": types.Integer()}, + ) + yield helper.lib + + @pytest.mark.parametrize("query_class", [MatchQuery, StringFieldQuery]) + def test_equality(self, query_class): + assert query_class("foo", "bar") == query_class("foo", "bar") + + @pytest.mark.parametrize( + "make_q, expected_msg", + [ + (lambda: NumericQuery("year", "199a"), "not an int"), + (lambda: RegexpQuery("year", "199("), r"not a regular expression.*unterminated subpattern"), # noqa: E501 + ] + ) # fmt: skip + def test_invalid_query(self, make_q, expected_msg): + with pytest.raises(ParsingError, match=expected_msg): + make_q() + + @pytest.mark.parametrize( + "q, expected_titles", + [ + # Boolean value + _p("comp:true", {ALBUM}, id="parse-true"), + _p("flexbool:true", {ALBUM}, id="flex-parse-true"), + _p("flexbool:false", {SINGLE}, id="flex-parse-false"), + _p("flexbool:1", {ALBUM}, id="flex-parse-1"), + _p("flexbool:0", {SINGLE}, id="flex-parse-0"), + # TODO: shouldn't this match 1 / true instead? + _p("flexbool:something", {SINGLE}, id="flex-parse-true"), + # Integer value + _p("bpm:120", {ALBUM}, id="int-exact-value"), + _p("bpm:110..125", {ALBUM}, id="int-range"), + _p("flexint:2", {ALBUM}, id="int-flex"), + _p("flexint:3", set(), id="int-no-match"), + _p("bpm:12", set(), id="int-dont-match-substring"), + # None value + _p(NoneQuery("album_id"), {SINGLE}, id="none-match-singleton"), + _p(NoneQuery("rg_track_gain"), {SINGLE}, id="none-value"), + ], + ) + def test_value_type(self, lib, q, expected_titles): + assert {i.title for i in lib.items(q)} == expected_titles + + +class TestDefaultSearchFields: + @pytest.fixture(scope="class") + def lib(self, helper): + helper.add_album( + title="title", + album="album", + albumartist="albumartist", + catalognum="catalognum", + year=2001, + ) + + return helper.lib + + @pytest.mark.parametrize( + "entity, q, should_match", + [ + _p("albums", "album", True, id="album-match-album"), + _p("albums", "albumartist", True, id="album-match-albumartist"), + _p("albums", "catalognum", False, id="album-dont-match-catalognum"), + _p("items", "title", True, id="item-match-title"), + _p("items", "2001", False, id="item-dont-match-year"), + ], + ) + def test_search(self, lib, entity, q, should_match): + assert bool(getattr(lib, entity)(q)) == should_match + + +class TestRelatedQueries: + """Test album-level queries with track-level filters and vice-versa.""" + + @pytest.fixture(scope="class") + def lib(self, helper): + for album_idx in range(1, 3): + album_name = f"Album{album_idx}" + items = [ + helper.create_item( + album=album_name, title=f"{album_name} Item{idx}" + ) + for idx in range(1, 3) + ] + album = helper.lib.add_album(items) + album.artpath = f"{album_name} Artpath" + album.catalognum = "ABC" + album.store() + + return helper.lib + + @pytest.mark.parametrize( + "q, expected_titles, expected_albums", + [ + _p( + "title:Album1", + ["Album1 Item1", "Album1 Item2"], + ["Album1"], + id="match-album-with-item-field-query", + ), + _p( + "title:Item2", + ["Album1 Item2", "Album2 Item2"], + ["Album1", "Album2"], + id="match-albums-with-item-field-query", + ), + _p( + "artpath::Album1", + ["Album1 Item1", "Album1 Item2"], + ["Album1"], + id="match-items-with-album-field-query", + ), + _p( + "catalognum:ABC Album1", + ["Album1 Item1", "Album1 Item2"], + ["Album1"], + id="query-field-common-to-album-and-item", + ), + ], + ) + def test_related_query(self, lib, q, expected_titles, expected_albums): + assert {i.album for i in lib.albums(q)} == set(expected_albums) + assert {i.title for i in lib.items(q)} == set(expected_titles) From 443ed578dcb717dcd51a354a4e48e7e9669f30bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0ar=C5=ABnas=20Nejus?= Date: Sun, 1 Jun 2025 13:56:15 +0100 Subject: [PATCH 10/10] Standardize abstract methods for coverage --- beets/dbcore/query.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/beets/dbcore/query.py b/beets/dbcore/query.py index 9cff082a3..7d9f0cee7 100644 --- a/beets/dbcore/query.py +++ b/beets/dbcore/query.py @@ -85,6 +85,7 @@ class Query(ABC): """Return a set with field names that this query operates on.""" return set() + @abstractmethod def clause(self) -> tuple[str | None, Sequence[Any]]: """Generate an SQLite expression implementing the query. @@ -95,14 +96,12 @@ class Query(ABC): The default implementation returns None, falling back to a slow query using `match()`. """ - return None, () @abstractmethod def match(self, obj: Model): """Check whether this query matches a given Model. Can be used to perform queries on arbitrary sets of Model. """ - ... def __and__(self, other: Query) -> AndQuery: return AndQuery([self, other]) @@ -152,7 +151,7 @@ class FieldQuery(Query, Generic[P]): self.fast = fast def col_clause(self) -> tuple[str, Sequence[SQLiteType]]: - return self.field, () + raise NotImplementedError def clause(self) -> tuple[str | None, Sequence[SQLiteType]]: if self.fast: @@ -164,7 +163,7 @@ class FieldQuery(Query, Generic[P]): @classmethod def value_match(cls, pattern: P, value: Any): """Determine whether the value matches the pattern.""" - raise NotImplementedError() + raise NotImplementedError def match(self, obj: Model) -> bool: return self.value_match(self.pattern, obj.get(self.field_name)) @@ -234,7 +233,7 @@ class StringFieldQuery(FieldQuery[P]): """Determine whether the value matches the pattern. Both arguments are strings. Subclasses implement this method. """ - raise NotImplementedError() + raise NotImplementedError class StringQuery(StringFieldQuery[str]):