Merge branch 'master' into feature/add-artist-to-item-entry-template

This commit is contained in:
Šarūnas Nejus 2025-07-06 16:43:37 +01:00 committed by GitHub
commit 0de27cbfb3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 955 additions and 1511 deletions

View file

@ -48,4 +48,6 @@ f36bc497c8c8f89004f3f6879908d3f0b25123e1
# Fix formatting
c490ac5810b70f3cf5fd8649669838e8fdb19f4d
# Importer restructure
9147577b2b19f43ca827e9650261a86fb0450cef
9147577b2b19f43ca827e9650261a86fb0450cef
# Copy paste query, types from library to dbcore
1a045c91668c771686f4c871c84f1680af2e944b

View file

@ -16,12 +16,13 @@
from __future__ import annotations
import os
import re
import unicodedata
from abc import ABC, abstractmethod
from collections.abc import Iterator, MutableSequence, Sequence
from datetime import datetime, timedelta
from functools import reduce
from functools import cached_property, reduce
from operator import mul, or_
from re import Pattern
from typing import TYPE_CHECKING, Any, Generic, TypeVar, Union
@ -29,13 +30,19 @@ from typing import TYPE_CHECKING, Any, Generic, TypeVar, Union
from beets import util
if TYPE_CHECKING:
from beets.dbcore import Model
from beets.dbcore.db import AnyModel
from beets.dbcore.db import AnyModel, Model
P = TypeVar("P", default=Any)
else:
P = TypeVar("P")
# To use the SQLite "blob" type, it doesn't suffice to provide a byte
# string; SQLite treats that as encoded text. Wrapping it in a
# `memoryview` tells it that we actually mean non-text data.
# needs to be defined in here due to circular import.
# TODO: remove it from this module and define it in dbcore/types.py instead
BLOB_TYPE = memoryview
class ParsingError(ValueError):
"""Abstract class for any unparsable user-requested album/query
@ -78,6 +85,7 @@ class Query(ABC):
"""Return a set with field names that this query operates on."""
return set()
@abstractmethod
def clause(self) -> tuple[str | None, Sequence[Any]]:
"""Generate an SQLite expression implementing the query.
@ -88,14 +96,12 @@ class Query(ABC):
The default implementation returns None, falling back to a slow query
using `match()`.
"""
return None, ()
@abstractmethod
def match(self, obj: Model):
"""Check whether this query matches a given Model. Can be used to
perform queries on arbitrary sets of Model.
"""
...
def __and__(self, other: Query) -> AndQuery:
return AndQuery([self, other])
@ -145,7 +151,7 @@ class FieldQuery(Query, Generic[P]):
self.fast = fast
def col_clause(self) -> tuple[str, Sequence[SQLiteType]]:
return self.field, ()
raise NotImplementedError
def clause(self) -> tuple[str | None, Sequence[SQLiteType]]:
if self.fast:
@ -157,7 +163,7 @@ class FieldQuery(Query, Generic[P]):
@classmethod
def value_match(cls, pattern: P, value: Any):
"""Determine whether the value matches the pattern."""
raise NotImplementedError()
raise NotImplementedError
def match(self, obj: Model) -> bool:
return self.value_match(self.pattern, obj.get(self.field_name))
@ -227,7 +233,7 @@ class StringFieldQuery(FieldQuery[P]):
"""Determine whether the value matches the pattern. Both
arguments are strings. Subclasses implement this method.
"""
raise NotImplementedError()
raise NotImplementedError
class StringQuery(StringFieldQuery[str]):
@ -267,6 +273,91 @@ class SubstringQuery(StringFieldQuery[str]):
return pattern.lower() in value.lower()
class PathQuery(FieldQuery[bytes]):
"""A query that matches all items under a given path.
Matching can either be case-insensitive or case-sensitive. By
default, the behavior depends on the OS: case-insensitive on Windows
and case-sensitive otherwise.
"""
def __init__(self, field: str, pattern: bytes, fast: bool = True) -> None:
"""Create a path query.
`pattern` must be a path, either to a file or a directory.
"""
path = util.normpath(pattern)
# Case sensitivity depends on the filesystem that the query path is located on.
self.case_sensitive = util.case_sensitive(path)
# Use a normalized-case pattern for case-insensitive matches.
if not self.case_sensitive:
# We need to lowercase the entire path, not just the pattern.
# In particular, on Windows, the drive letter is otherwise not
# lowercased.
# This also ensures that the `match()` method below and the SQL
# from `col_clause()` do the same thing.
path = path.lower()
super().__init__(field, path, fast)
@cached_property
def dir_path(self) -> bytes:
return os.path.join(self.pattern, b"")
@staticmethod
def is_path_query(query_part: str) -> bool:
"""Try to guess whether a unicode query part is a path query.
The path query must
1. precede the colon in the query, if a colon is present
2. contain either ``os.sep`` or ``os.altsep`` (Windows)
3. this path must exist on the filesystem.
"""
query_part = query_part.split(":")[0]
return (
# make sure the query part contains a path separator
bool(set(query_part) & {os.sep, os.altsep})
and os.path.exists(util.normpath(query_part))
)
def match(self, obj: Model) -> bool:
"""Check whether a model object's path matches this query.
Performs either an exact match against the pattern or checks if the path
starts with the given directory path. Case sensitivity depends on the object's
filesystem as determined during initialization.
"""
path = obj.path if self.case_sensitive else obj.path.lower()
return (path == self.pattern) or path.startswith(self.dir_path)
def col_clause(self) -> tuple[str, Sequence[SQLiteType]]:
"""Generate an SQL clause that implements path matching in the database.
Returns a tuple of SQL clause string and parameter values list that matches
paths either exactly or by directory prefix. Handles case sensitivity
appropriately using BYTELOWER for case-insensitive matches.
"""
if self.case_sensitive:
left, right = self.field, "?"
else:
left, right = f"BYTELOWER({self.field})", "BYTELOWER(?)"
return f"({left} = {right}) || (substr({left}, 1, ?) = {right})", [
BLOB_TYPE(self.pattern),
len(dir_blob := BLOB_TYPE(self.dir_path)),
dir_blob,
]
def __repr__(self) -> str:
return (
f"{self.__class__.__name__}({self.field!r}, {self.pattern!r}, "
f"fast={self.fast}, case_sensitive={self.case_sensitive})"
)
class RegexpQuery(StringFieldQuery[Pattern[str]]):
"""A query that matches a regular expression in a specific Model field.
@ -844,6 +935,24 @@ class DurationQuery(NumericQuery):
)
class SingletonQuery(FieldQuery[str]):
"""This query is responsible for the 'singleton' lookup.
It is based on the FieldQuery and constructs a SQL clause
'album_id is NULL' which yields the same result as the previous filter
in Python but is more performant since it's done in SQL.
Using util.str2bool ensures that lookups like singleton:true, singleton:1
and singleton:false, singleton:0 are handled consistently.
"""
def __new__(cls, field: str, value: str, *args, **kwargs):
query = NoneQuery("album_id")
if util.str2bool(value):
return query
return NotQuery(query)
# Sorting.

View file

@ -16,19 +16,20 @@
from __future__ import annotations
import re
import time
import typing
from abc import ABC
from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast
from beets.util import str2bool
import beets
from beets import util
from beets.util.units import human_seconds_short, raw_seconds_short
from .query import (
BooleanQuery,
FieldQueryType,
NumericQuery,
SQLiteType,
SubstringQuery,
)
from . import query
SQLiteType = query.SQLiteType
BLOB_TYPE = query.BLOB_TYPE
class ModelType(typing.Protocol):
@ -61,7 +62,7 @@ class Type(ABC, Generic[T, N]):
"""The SQLite column type for the value.
"""
query: FieldQueryType = SubstringQuery
query: query.FieldQueryType = query.SubstringQuery
"""The `Query` subclass to be used when querying the field.
"""
@ -160,7 +161,7 @@ class BaseInteger(Type[int, N]):
"""A basic integer type."""
sql = "INTEGER"
query = NumericQuery
query = query.NumericQuery
model_type = int
def normalize(self, value: Any) -> int | N:
@ -241,7 +242,7 @@ class BaseFloat(Type[float, N]):
"""
sql = "REAL"
query: FieldQueryType = NumericQuery
query: query.FieldQueryType = query.NumericQuery
model_type = float
def __init__(self, digits: int = 1):
@ -271,7 +272,7 @@ class BaseString(Type[T, N]):
"""A Unicode string type."""
sql = "TEXT"
query = SubstringQuery
query = query.SubstringQuery
def normalize(self, value: Any) -> T | N:
if value is None:
@ -312,14 +313,145 @@ class Boolean(Type):
"""A boolean type."""
sql = "INTEGER"
query = BooleanQuery
query = query.BooleanQuery
model_type = bool
def format(self, value: bool) -> str:
return str(bool(value))
def parse(self, string: str) -> bool:
return str2bool(string)
return util.str2bool(string)
class DateType(Float):
# TODO representation should be `datetime` object
# TODO distinguish between date and time types
query = query.DateQuery
def format(self, value):
return time.strftime(
beets.config["time_format"].as_str(), time.localtime(value or 0)
)
def parse(self, string):
try:
# Try a formatted date string.
return time.mktime(
time.strptime(string, beets.config["time_format"].as_str())
)
except ValueError:
# Fall back to a plain timestamp number.
try:
return float(string)
except ValueError:
return self.null
class BasePathType(Type[bytes, N]):
"""A dbcore type for filesystem paths.
These are represented as `bytes` objects, in keeping with
the Unix filesystem abstraction.
"""
sql = "BLOB"
query = query.PathQuery
model_type = bytes
def parse(self, string: str) -> bytes:
return util.normpath(string)
def normalize(self, value: Any) -> bytes | N:
if isinstance(value, str):
# Paths stored internally as encoded bytes.
return util.bytestring_path(value)
elif isinstance(value, BLOB_TYPE):
# We unwrap buffers to bytes.
return bytes(value)
else:
return value
def from_sql(self, sql_value):
return self.normalize(sql_value)
def to_sql(self, value: bytes) -> BLOB_TYPE:
if isinstance(value, bytes):
value = BLOB_TYPE(value)
return value
class NullPathType(BasePathType[None]):
@property
def null(self) -> None:
return None
def format(self, value: bytes | None) -> str:
return util.displayable_path(value or b"")
class PathType(BasePathType[bytes]):
@property
def null(self) -> bytes:
return b""
def format(self, value: bytes) -> str:
return util.displayable_path(value or b"")
class MusicalKey(String):
"""String representing the musical key of a song.
The standard format is C, Cm, C#, C#m, etc.
"""
ENHARMONIC = {
r"db": "c#",
r"eb": "d#",
r"gb": "f#",
r"ab": "g#",
r"bb": "a#",
}
null = None
def parse(self, key):
key = key.lower()
for flat, sharp in self.ENHARMONIC.items():
key = re.sub(flat, sharp, key)
key = re.sub(r"[\W\s]+minor", "m", key)
key = re.sub(r"[\W\s]+major", "", key)
return key.capitalize()
def normalize(self, key):
if key is None:
return None
else:
return self.parse(key)
class DurationType(Float):
"""Human-friendly (M:SS) representation of a time interval."""
query = query.DurationQuery
def format(self, value):
if not beets.config["format_raw_length"].get(bool):
return human_seconds_short(value or 0.0)
else:
return value
def parse(self, string):
try:
# Try to format back hh:ss to seconds.
return raw_seconds_short(string)
except ValueError:
# Fall back to a plain float.
try:
return float(string)
except ValueError:
return self.null
# Shared instances of common types.
@ -331,6 +463,7 @@ FLOAT = Float()
NULL_FLOAT = NullFloat()
STRING = String()
BOOLEAN = Boolean()
DATE = DateType()
SEMICOLON_SPACE_DSV = DelimitedString(delimiter="; ")
# Will set the proper null char in mediafile

View file

@ -17,7 +17,6 @@
from __future__ import annotations
import os
import re
import shlex
import string
import sys
@ -46,259 +45,9 @@ from beets.util.functemplate import Template, template
if TYPE_CHECKING:
from .dbcore.query import FieldQuery, FieldQueryType
# To use the SQLite "blob" type, it doesn't suffice to provide a byte
# string; SQLite treats that as encoded text. Wrapping it in a
# `memoryview` tells it that we actually mean non-text data.
BLOB_TYPE = memoryview
log = logging.getLogger("beets")
# Library-specific query types.
class SingletonQuery(dbcore.FieldQuery[str]):
"""This query is responsible for the 'singleton' lookup.
It is based on the FieldQuery and constructs a SQL clause
'album_id is NULL' which yields the same result as the previous filter
in Python but is more performant since it's done in SQL.
Using util.str2bool ensures that lookups like singleton:true, singleton:1
and singleton:false, singleton:0 are handled consistently.
"""
def __new__(cls, field: str, value: str, *args, **kwargs):
query = dbcore.query.NoneQuery("album_id")
if util.str2bool(value):
return query
return dbcore.query.NotQuery(query)
class PathQuery(dbcore.FieldQuery[bytes]):
"""A query that matches all items under a given path.
Matching can either be case-insensitive or case-sensitive. By
default, the behavior depends on the OS: case-insensitive on Windows
and case-sensitive otherwise.
"""
# For tests
force_implicit_query_detection = False
def __init__(self, field, pattern, fast=True, case_sensitive=None):
"""Create a path query.
`pattern` must be a path, either to a file or a directory.
`case_sensitive` can be a bool or `None`, indicating that the
behavior should depend on the filesystem.
"""
super().__init__(field, pattern, fast)
path = util.normpath(pattern)
# By default, the case sensitivity depends on the filesystem
# that the query path is located on.
if case_sensitive is None:
case_sensitive = util.case_sensitive(path)
self.case_sensitive = case_sensitive
# Use a normalized-case pattern for case-insensitive matches.
if not case_sensitive:
# We need to lowercase the entire path, not just the pattern.
# In particular, on Windows, the drive letter is otherwise not
# lowercased.
# This also ensures that the `match()` method below and the SQL
# from `col_clause()` do the same thing.
path = path.lower()
# Match the path as a single file.
self.file_path = path
# As a directory (prefix).
self.dir_path = os.path.join(path, b"")
@classmethod
def is_path_query(cls, query_part):
"""Try to guess whether a unicode query part is a path query.
Condition: separator precedes colon and the file exists.
"""
colon = query_part.find(":")
if colon != -1:
query_part = query_part[:colon]
# Test both `sep` and `altsep` (i.e., both slash and backslash on
# Windows).
if not (
os.sep in query_part or (os.altsep and os.altsep in query_part)
):
return False
if cls.force_implicit_query_detection:
return True
return os.path.exists(syspath(normpath(query_part)))
def match(self, item):
path = item.path if self.case_sensitive else item.path.lower()
return (path == self.file_path) or path.startswith(self.dir_path)
def col_clause(self):
file_blob = BLOB_TYPE(self.file_path)
dir_blob = BLOB_TYPE(self.dir_path)
if self.case_sensitive:
query_part = "({0} = ?) || (substr({0}, 1, ?) = ?)"
else:
query_part = "(BYTELOWER({0}) = BYTELOWER(?)) || \
(substr(BYTELOWER({0}), 1, ?) = BYTELOWER(?))"
return query_part.format(self.field), (
file_blob,
len(dir_blob),
dir_blob,
)
def __repr__(self) -> str:
return (
f"{self.__class__.__name__}({self.field!r}, {self.pattern!r}, "
f"fast={self.fast}, case_sensitive={self.case_sensitive})"
)
# Library-specific field types.
class DateType(types.Float):
# TODO representation should be `datetime` object
# TODO distinguish between date and time types
query = dbcore.query.DateQuery
def format(self, value):
return time.strftime(
beets.config["time_format"].as_str(), time.localtime(value or 0)
)
def parse(self, string):
try:
# Try a formatted date string.
return time.mktime(
time.strptime(string, beets.config["time_format"].as_str())
)
except ValueError:
# Fall back to a plain timestamp number.
try:
return float(string)
except ValueError:
return self.null
class PathType(types.Type[bytes, bytes]):
"""A dbcore type for filesystem paths.
These are represented as `bytes` objects, in keeping with
the Unix filesystem abstraction.
"""
sql = "BLOB"
query = PathQuery
model_type = bytes
def __init__(self, nullable=False):
"""Create a path type object.
`nullable` controls whether the type may be missing, i.e., None.
"""
self.nullable = nullable
@property
def null(self):
if self.nullable:
return None
else:
return b""
def format(self, value):
return util.displayable_path(value)
def parse(self, string):
return normpath(bytestring_path(string))
def normalize(self, value):
if isinstance(value, str):
# Paths stored internally as encoded bytes.
return bytestring_path(value)
elif isinstance(value, BLOB_TYPE):
# We unwrap buffers to bytes.
return bytes(value)
else:
return value
def from_sql(self, sql_value):
return self.normalize(sql_value)
def to_sql(self, value):
if isinstance(value, bytes):
value = BLOB_TYPE(value)
return value
class MusicalKey(types.String):
"""String representing the musical key of a song.
The standard format is C, Cm, C#, C#m, etc.
"""
ENHARMONIC = {
r"db": "c#",
r"eb": "d#",
r"gb": "f#",
r"ab": "g#",
r"bb": "a#",
}
null = None
def parse(self, key):
key = key.lower()
for flat, sharp in self.ENHARMONIC.items():
key = re.sub(flat, sharp, key)
key = re.sub(r"[\W\s]+minor", "m", key)
key = re.sub(r"[\W\s]+major", "", key)
return key.capitalize()
def normalize(self, key):
if key is None:
return None
else:
return self.parse(key)
class DurationType(types.Float):
"""Human-friendly (M:SS) representation of a time interval."""
query = dbcore.query.DurationQuery
def format(self, value):
if not beets.config["format_raw_length"].get(bool):
return beets.ui.human_seconds_short(value or 0.0)
else:
return value
def parse(self, string):
try:
# Try to format back hh:ss to seconds.
return util.raw_seconds_short(string)
except ValueError:
# Fall back to a plain float.
try:
return float(string)
except ValueError:
return self.null
# Special path format key.
PF_KEY_DEFAULT = "default"
@ -517,7 +266,7 @@ class Item(LibModel):
_flex_table = "item_attributes"
_fields = {
"id": types.PRIMARY_ID,
"path": PathType(),
"path": types.PathType(),
"album_id": types.FOREIGN_ID,
"title": types.STRING,
"artist": types.STRING,
@ -596,8 +345,8 @@ class Item(LibModel):
"original_year": types.PaddedInt(4),
"original_month": types.PaddedInt(2),
"original_day": types.PaddedInt(2),
"initial_key": MusicalKey(),
"length": DurationType(),
"initial_key": types.MusicalKey(),
"length": types.DurationType(),
"bitrate": types.ScaledInt(1000, "kbps"),
"bitrate_mode": types.STRING,
"encoder_info": types.STRING,
@ -606,8 +355,8 @@ class Item(LibModel):
"samplerate": types.ScaledInt(1000, "kHz"),
"bitdepth": types.INTEGER,
"channels": types.INTEGER,
"mtime": DateType(),
"added": DateType(),
"mtime": types.DATE,
"added": types.DATE,
}
_search_fields = (
@ -641,7 +390,7 @@ class Item(LibModel):
_sorts = {"artist": dbcore.query.SmartArtistSort}
_queries = {"singleton": SingletonQuery}
_queries = {"singleton": dbcore.query.SingletonQuery}
_format_config_key = "format_item"
@ -717,7 +466,7 @@ class Item(LibModel):
if key == "path":
if isinstance(value, str):
value = bytestring_path(value)
elif isinstance(value, BLOB_TYPE):
elif isinstance(value, types.BLOB_TYPE):
value = bytes(value)
elif key == "album_id":
self._cached_album = None
@ -1161,8 +910,8 @@ class Album(LibModel):
_always_dirty = True
_fields = {
"id": types.PRIMARY_ID,
"artpath": PathType(True),
"added": DateType(),
"artpath": types.NullPathType(),
"added": types.DATE,
"albumartist": types.STRING,
"albumartist_sort": types.STRING,
"albumartist_credit": types.STRING,
@ -1208,7 +957,7 @@ class Album(LibModel):
_search_fields = ("album", "albumartist", "genre")
_types = {
"path": PathType(),
"path": types.PathType(),
"data_source": types.STRING,
}
@ -1563,7 +1312,10 @@ def parse_query_parts(parts, model_cls):
# Special-case path-like queries, which are non-field queries
# containing path separators (/).
parts = [f"path:{s}" if PathQuery.is_path_query(s) else s for s in parts]
parts = [
f"path:{s}" if dbcore.query.PathQuery.is_path_query(s) else s
for s in parts
]
case_insensitive = beets.config["sort_case_insensitive"].get(bool)

View file

@ -63,8 +63,8 @@ HAVE_SYMLINK = sys.platform != "win32"
HAVE_HARDLINK = sys.platform != "win32"
def item(lib=None):
i = beets.library.Item(
def item(lib=None, **kwargs):
defaults = dict(
title="the title",
artist="the artist",
albumartist="the album artist",
@ -99,6 +99,7 @@ def item(lib=None):
album_id=None,
mtime=12345,
)
i = beets.library.Item(**{**defaults, **kwargs})
if lib:
lib.add(i)
return i

View file

@ -435,56 +435,6 @@ def input_select_objects(prompt, objs, rep, prompt_all=None):
return []
# Human output formatting.
def human_bytes(size):
"""Formats size, a number of bytes, in a human-readable way."""
powers = ["", "K", "M", "G", "T", "P", "E", "Z", "Y", "H"]
unit = "B"
for power in powers:
if size < 1024:
return f"{size:3.1f} {power}{unit}"
size /= 1024.0
unit = "iB"
return "big"
def human_seconds(interval):
"""Formats interval, a number of seconds, as a human-readable time
interval using English words.
"""
units = [
(1, "second"),
(60, "minute"),
(60, "hour"),
(24, "day"),
(7, "week"),
(52, "year"),
(10, "decade"),
]
for i in range(len(units) - 1):
increment, suffix = units[i]
next_increment, _ = units[i + 1]
interval /= float(increment)
if interval < next_increment:
break
else:
# Last unit.
increment, suffix = units[-1]
interval /= float(increment)
return f"{interval:3.1f} {suffix}s"
def human_seconds_short(interval):
"""Formats a number of seconds as a short human-readable M:SS
string.
"""
interval = int(interval)
return "%i:%02i" % (interval // 60, interval % 60)
# Colorization.
# ANSI terminal colorization code heavily inspired by pygments:

View file

@ -43,6 +43,7 @@ from beets.util import (
normpath,
syspath,
)
from beets.util.units import human_bytes, human_seconds, human_seconds_short
from . import _store_dict
@ -541,8 +542,8 @@ class ChangeRepresentation:
cur_length0 = item.length if item.length else 0
new_length0 = track_info.length if track_info.length else 0
# format into string
cur_length = f"({ui.human_seconds_short(cur_length0)})"
new_length = f"({ui.human_seconds_short(new_length0)})"
cur_length = f"({human_seconds_short(cur_length0)})"
new_length = f"({human_seconds_short(new_length0)})"
# colorize
lhs_length = ui.colorize(highlight_color, cur_length)
rhs_length = ui.colorize(highlight_color, new_length)
@ -706,14 +707,14 @@ class AlbumChange(ChangeRepresentation):
for track_info in self.match.extra_tracks:
line = f" ! {track_info.title} (#{self.format_index(track_info)})"
if track_info.length:
line += f" ({ui.human_seconds_short(track_info.length)})"
line += f" ({human_seconds_short(track_info.length)})"
print_(ui.colorize("text_warning", line))
if self.match.extra_items:
print_(f"Unmatched tracks ({len(self.match.extra_items)}):")
for item in self.match.extra_items:
line = " ! {} (#{})".format(item.title, self.format_index(item))
if item.length:
line += " ({})".format(ui.human_seconds_short(item.length))
line += " ({})".format(human_seconds_short(item.length))
print_(ui.colorize("text_warning", line))
@ -795,8 +796,8 @@ def summarize_items(items, singleton):
round(int(items[0].samplerate) / 1000, 1), items[0].bitdepth
)
summary_parts.append(sample_bits)
summary_parts.append(ui.human_seconds_short(total_duration))
summary_parts.append(ui.human_bytes(total_filesize))
summary_parts.append(human_seconds_short(total_duration))
summary_parts.append(human_bytes(total_filesize))
return ", ".join(summary_parts)
@ -1906,7 +1907,7 @@ def show_stats(lib, query, exact):
if item.album_id:
albums.add(item.album_id)
size_str = "" + ui.human_bytes(total_size)
size_str = "" + human_bytes(total_size)
if exact:
size_str += f" ({total_size} bytes)"
@ -1918,7 +1919,7 @@ Artists: {}
Albums: {}
Album artists: {}""".format(
total_items,
ui.human_seconds(total_time),
human_seconds(total_time),
f" ({total_time:.2f} seconds)" if exact else "",
"Total size" if exact else "Approximate total size",
size_str,

View file

@ -1019,19 +1019,6 @@ def case_sensitive(path: bytes) -> bool:
return not os.path.samefile(lower_sys, upper_sys)
def raw_seconds_short(string: str) -> float:
"""Formats a human-readable M:SS string as a float (number of seconds).
Raises ValueError if the conversion cannot take place due to `string` not
being in the right format.
"""
match = re.match(r"^(\d+):([0-5]\d)$", string)
if not match:
raise ValueError("String not in M:SS format")
minutes, seconds = map(int, match.groups())
return float(minutes * 60 + seconds)
def asciify_path(path: str, sep_replace: str) -> str:
"""Decodes all unicode characters in a path into ASCII equivalents.

61
beets/util/units.py Normal file
View file

@ -0,0 +1,61 @@
import re
def raw_seconds_short(string: str) -> float:
"""Formats a human-readable M:SS string as a float (number of seconds).
Raises ValueError if the conversion cannot take place due to `string` not
being in the right format.
"""
match = re.match(r"^(\d+):([0-5]\d)$", string)
if not match:
raise ValueError("String not in M:SS format")
minutes, seconds = map(int, match.groups())
return float(minutes * 60 + seconds)
def human_seconds_short(interval):
"""Formats a number of seconds as a short human-readable M:SS
string.
"""
interval = int(interval)
return "%i:%02i" % (interval // 60, interval % 60)
def human_bytes(size):
"""Formats size, a number of bytes, in a human-readable way."""
powers = ["", "K", "M", "G", "T", "P", "E", "Z", "Y", "H"]
unit = "B"
for power in powers:
if size < 1024:
return f"{size:3.1f} {power}{unit}"
size /= 1024.0
unit = "iB"
return "big"
def human_seconds(interval):
"""Formats interval, a number of seconds, as a human-readable time
interval using English words.
"""
units = [
(1, "second"),
(60, "minute"),
(60, "hour"),
(24, "day"),
(7, "week"),
(52, "year"),
(10, "decade"),
]
for i in range(len(units) - 1):
increment, suffix = units[i]
next_increment, _ = units[i + 1]
interval /= float(increment)
if interval < next_increment:
break
else:
# Last unit.
increment, suffix = units[-1]
interval /= float(increment)
return f"{interval:3.1f} {suffix}s"

View file

@ -25,7 +25,6 @@ import unidecode
from beets import ui
from beets.autotag import AlbumInfo, TrackInfo
from beets.dbcore import types
from beets.library import DateType
from beets.plugins import BeetsPlugin, MetadataSourcePlugin
@ -35,7 +34,7 @@ class DeezerPlugin(MetadataSourcePlugin, BeetsPlugin):
item_types = {
"deezer_track_rank": types.INTEGER,
"deezer_track_id": types.INTEGER,
"deezer_updated": DateType(),
"deezer_updated": types.DATE,
}
# Base URLs for the Deezer API

View file

@ -20,7 +20,6 @@ from time import mktime
from xml.sax.saxutils import quoteattr
from beets.dbcore import types
from beets.library import DateType
from beets.util import displayable_path
from beetsplug.metasync import MetaSource
@ -41,8 +40,8 @@ class Amarok(MetaSource):
"amarok_score": types.FLOAT,
"amarok_uid": types.STRING,
"amarok_playcount": types.INTEGER,
"amarok_firstplayed": DateType(),
"amarok_lastplayed": DateType(),
"amarok_firstplayed": types.DATE,
"amarok_lastplayed": types.DATE,
}
query_xml = '<query version="1.0"> \

View file

@ -26,7 +26,6 @@ from confuse import ConfigValueError
from beets import util
from beets.dbcore import types
from beets.library import DateType
from beets.util import bytestring_path, syspath
from beetsplug.metasync import MetaSource
@ -63,9 +62,9 @@ class Itunes(MetaSource):
"itunes_rating": types.INTEGER, # 0..100 scale
"itunes_playcount": types.INTEGER,
"itunes_skipcount": types.INTEGER,
"itunes_lastplayed": DateType(),
"itunes_lastskipped": DateType(),
"itunes_dateadded": DateType(),
"itunes_lastplayed": types.DATE,
"itunes_lastskipped": types.DATE,
"itunes_dateadded": types.DATE,
}
def __init__(self, config, log):

View file

@ -18,8 +18,9 @@ import time
import mpd
from beets import config, library, plugins, ui
from beets import config, plugins, ui
from beets.dbcore import types
from beets.dbcore.query import PathQuery
from beets.util import displayable_path
# If we lose the connection, how many times do we want to retry and how
@ -160,7 +161,7 @@ class MPDStats:
def get_item(self, path):
"""Return the beets item related to path."""
query = library.PathQuery("path", path)
query = PathQuery("path", path)
item = self.lib.items(query).get()
if item:
return item
@ -321,7 +322,7 @@ class MPDStatsPlugin(plugins.BeetsPlugin):
item_types = {
"play_count": types.INTEGER,
"skip_count": types.INTEGER,
"last_played": library.DateType(),
"last_played": types.DATE,
"rating": types.FLOAT,
}

View file

@ -18,8 +18,7 @@ import tempfile
from collections.abc import Sequence
import beets
from beets.dbcore.query import InQuery
from beets.library import BLOB_TYPE
from beets.dbcore.query import BLOB_TYPE, InQuery
from beets.util import path_as_posix

View file

@ -34,10 +34,10 @@ import unidecode
from beets import ui
from beets.autotag.hooks import AlbumInfo, TrackInfo
from beets.dbcore import types
from beets.library import DateType, Library
from beets.plugins import BeetsPlugin, MetadataSourcePlugin, Response
if TYPE_CHECKING:
from beets.library import Library
from beetsplug._typing import JSONDict
DEFAULT_WAITING_TIME = 5
@ -64,7 +64,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin):
"spotify_tempo": types.FLOAT,
"spotify_time_signature": types.INTEGER,
"spotify_valence": types.FLOAT,
"spotify_updated": DateType(),
"spotify_updated": types.DATE,
}
# Base URLs for the Spotify API

View file

@ -15,7 +15,6 @@
from confuse import ConfigValueError
from beets import library
from beets.dbcore import types
from beets.plugins import BeetsPlugin
@ -42,7 +41,7 @@ class TypesPlugin(BeetsPlugin):
elif value.get() == "bool":
mytypes[key] = types.BOOLEAN
elif value.get() == "date":
mytypes[key] = library.DateType()
mytypes[key] = types.DATE
else:
raise ConfigValueError(
"unknown type '{}' for the '{}' field".format(value, key)

View file

@ -25,6 +25,7 @@ from werkzeug.routing import BaseConverter, PathConverter
import beets.library
from beets import ui, util
from beets.dbcore.query import PathQuery
from beets.plugins import BeetsPlugin
# Utilities.
@ -342,7 +343,7 @@ def item_query(queries):
@app.route("/item/path/<everything:path>")
def item_at_path(path):
query = beets.library.PathQuery("path", path.encode("utf-8"))
query = PathQuery("path", path.encode("utf-8"))
item = g.lib.items(query).get()
if item:
return flask.jsonify(_rep(item))

View file

@ -1,7 +1,10 @@
import inspect
import os
import pytest
from beets.dbcore.query import Query
def skip_marked_items(items: list[pytest.Item], marker_name: str, reason: str):
for item in (i for i in items if i.get_closest_marker(marker_name)):
@ -21,3 +24,20 @@ def pytest_collection_modifyitems(
skip_marked_items(
items, "on_lyrics_update", "No change in lyrics source code"
)
def pytest_make_parametrize_id(config, val, argname):
"""Generate readable test identifiers for pytest parametrized tests.
Provides custom string representations for:
- Query classes/instances: use class name
- Lambda functions: show abbreviated source
- Other values: use standard repr()
"""
if inspect.isclass(val) and issubclass(val, Query):
return val.__name__
if inspect.isfunction(val) and val.__name__ == "<lambda>":
return inspect.getsource(val).split("lambda")[-1][:30]
return repr(val)

View file

@ -19,7 +19,6 @@ import os.path
import re
import shutil
import stat
import time
import unicodedata
import unittest
from unittest.mock import patch
@ -1320,56 +1319,3 @@ class ParseQueryTest(unittest.TestCase):
def test_parse_bytes(self):
with pytest.raises(AssertionError):
beets.library.parse_query_string(b"query", None)
class LibraryFieldTypesTest(unittest.TestCase):
"""Test format() and parse() for library-specific field types"""
def test_datetype(self):
t = beets.library.DateType()
# format
time_format = beets.config["time_format"].as_str()
time_local = time.strftime(time_format, time.localtime(123456789))
assert time_local == t.format(123456789)
# parse
assert 123456789.0 == t.parse(time_local)
assert 123456789.0 == t.parse("123456789.0")
assert t.null == t.parse("not123456789.0")
assert t.null == t.parse("1973-11-29")
def test_pathtype(self):
t = beets.library.PathType()
# format
assert "/tmp" == t.format("/tmp")
assert "/tmp/\xe4lbum" == t.format("/tmp/\u00e4lbum")
# parse
assert np(b"/tmp") == t.parse("/tmp")
assert np(b"/tmp/\xc3\xa4lbum") == t.parse("/tmp/\u00e4lbum/")
def test_musicalkey(self):
t = beets.library.MusicalKey()
# parse
assert "C#m" == t.parse("c#m")
assert "Gm" == t.parse("g minor")
assert "Not c#m" == t.parse("not C#m")
def test_durationtype(self):
t = beets.library.DurationType()
# format
assert "1:01" == t.format(61.23)
assert "60:01" == t.format(3601.23)
assert "0:00" == t.format(None)
# parse
assert 61.0 == t.parse("1:01")
assert 61.23 == t.parse("61.23")
assert 3601.0 == t.parse("60:01")
assert t.null == t.parse("1:00:01")
assert t.null == t.parse("not61.23")
# config format_raw_length
beets.config["format_raw_length"] = True
assert 61.23 == t.format(61.23)
assert 3601.23 == t.format(3601.23)

File diff suppressed because it is too large Load diff

58
test/test_types.py Normal file
View file

@ -0,0 +1,58 @@
import time
import beets
from beets.dbcore import types
from beets.util import normpath
def test_datetype():
t = types.DATE
# format
time_format = beets.config["time_format"].as_str()
time_local = time.strftime(time_format, time.localtime(123456789))
assert time_local == t.format(123456789)
# parse
assert 123456789.0 == t.parse(time_local)
assert 123456789.0 == t.parse("123456789.0")
assert t.null == t.parse("not123456789.0")
assert t.null == t.parse("1973-11-29")
def test_pathtype():
t = types.PathType()
# format
assert "/tmp" == t.format("/tmp")
assert "/tmp/\xe4lbum" == t.format("/tmp/\u00e4lbum")
# parse
assert normpath(b"/tmp") == t.parse("/tmp")
assert normpath(b"/tmp/\xc3\xa4lbum") == t.parse("/tmp/\u00e4lbum/")
def test_musicalkey():
t = types.MusicalKey()
# parse
assert "C#m" == t.parse("c#m")
assert "Gm" == t.parse("g minor")
assert "Not c#m" == t.parse("not C#m")
def test_durationtype():
t = types.DurationType()
# format
assert "1:01" == t.format(61.23)
assert "60:01" == t.format(3601.23)
assert "0:00" == t.format(None)
# parse
assert 61.0 == t.parse("1:01")
assert 61.23 == t.parse("61.23")
assert 3601.0 == t.parse("60:01")
assert t.null == t.parse("1:00:01")
assert t.null == t.parse("not61.23")
# config format_raw_length
beets.config["format_raw_length"] = True
assert 61.23 == t.format(61.23)
assert 3601.23 == t.format(3601.23)

View file

@ -21,7 +21,7 @@ from random import random
from beets import config, ui
from beets.test import _common
from beets.test.helper import BeetsTestCase, ItemInDBTestCase, control_stdin
from beets.test.helper import BeetsTestCase, control_stdin
class InputMethodsTest(BeetsTestCase):
@ -88,42 +88,6 @@ class InputMethodsTest(BeetsTestCase):
assert items == ["1", "3"]
class InitTest(ItemInDBTestCase):
def test_human_bytes(self):
tests = [
(0, "0.0 B"),
(30, "30.0 B"),
(pow(2, 10), "1.0 KiB"),
(pow(2, 20), "1.0 MiB"),
(pow(2, 30), "1.0 GiB"),
(pow(2, 40), "1.0 TiB"),
(pow(2, 50), "1.0 PiB"),
(pow(2, 60), "1.0 EiB"),
(pow(2, 70), "1.0 ZiB"),
(pow(2, 80), "1.0 YiB"),
(pow(2, 90), "1.0 HiB"),
(pow(2, 100), "big"),
]
for i, h in tests:
assert h == ui.human_bytes(i)
def test_human_seconds(self):
tests = [
(0, "0.0 seconds"),
(30, "30.0 seconds"),
(60, "1.0 minutes"),
(90, "1.5 minutes"),
(125, "2.1 minutes"),
(3600, "1.0 hours"),
(86400, "1.0 days"),
(604800, "1.0 weeks"),
(31449600, "1.0 years"),
(314496000, "1.0 decades"),
]
for i, h in tests:
assert h == ui.human_seconds(i)
class ParentalDirCreation(BeetsTestCase):
def test_create_yes(self):
non_exist_path = _common.os.fsdecode(

43
test/util/test_units.py Normal file
View file

@ -0,0 +1,43 @@
import pytest
from beets.util.units import human_bytes, human_seconds
@pytest.mark.parametrize(
"input_bytes,expected",
[
(0, "0.0 B"),
(30, "30.0 B"),
(pow(2, 10), "1.0 KiB"),
(pow(2, 20), "1.0 MiB"),
(pow(2, 30), "1.0 GiB"),
(pow(2, 40), "1.0 TiB"),
(pow(2, 50), "1.0 PiB"),
(pow(2, 60), "1.0 EiB"),
(pow(2, 70), "1.0 ZiB"),
(pow(2, 80), "1.0 YiB"),
(pow(2, 90), "1.0 HiB"),
(pow(2, 100), "big"),
],
)
def test_human_bytes(input_bytes, expected):
assert human_bytes(input_bytes) == expected
@pytest.mark.parametrize(
"input_seconds,expected",
[
(0, "0.0 seconds"),
(30, "30.0 seconds"),
(60, "1.0 minutes"),
(90, "1.5 minutes"),
(125, "2.1 minutes"),
(3600, "1.0 hours"),
(86400, "1.0 days"),
(604800, "1.0 weeks"),
(31449600, "1.0 years"),
(314496000, "1.0 decades"),
],
)
def test_human_seconds(input_seconds, expected):
assert human_seconds(input_seconds) == expected