Copy paste query, types from library to dbcore

This commit is contained in:
Šarūnas Nejus 2025-05-12 12:17:35 +01:00
parent 2a896d48b9
commit 1a045c9166
No known key found for this signature in database
GPG key ID: DD28F6704DBE3435
17 changed files with 367 additions and 368 deletions

View file

@ -16,6 +16,7 @@
from __future__ import annotations
import os
import re
import unicodedata
from abc import ABC, abstractmethod
@ -36,6 +37,11 @@ if TYPE_CHECKING:
else:
P = TypeVar("P")
# To use the SQLite "blob" type, it doesn't suffice to provide a byte
# string; SQLite treats that as encoded text. Wrapping it in a
# `memoryview` tells it that we actually mean non-text data.
BLOB_TYPE = memoryview
class ParsingError(ValueError):
"""Abstract class for any unparsable user-requested album/query
@ -267,6 +273,97 @@ class SubstringQuery(StringFieldQuery[str]):
return pattern.lower() in value.lower()
class PathQuery(FieldQuery[bytes]):
"""A query that matches all items under a given path.
Matching can either be case-insensitive or case-sensitive. By
default, the behavior depends on the OS: case-insensitive on Windows
and case-sensitive otherwise.
"""
# For tests
force_implicit_query_detection = False
def __init__(self, field, pattern, fast=True, case_sensitive=None):
"""Create a path query.
`pattern` must be a path, either to a file or a directory.
`case_sensitive` can be a bool or `None`, indicating that the
behavior should depend on the filesystem.
"""
super().__init__(field, pattern, fast)
path = util.normpath(pattern)
# By default, the case sensitivity depends on the filesystem
# that the query path is located on.
if case_sensitive is None:
case_sensitive = util.case_sensitive(path)
self.case_sensitive = case_sensitive
# Use a normalized-case pattern for case-insensitive matches.
if not case_sensitive:
# We need to lowercase the entire path, not just the pattern.
# In particular, on Windows, the drive letter is otherwise not
# lowercased.
# This also ensures that the `match()` method below and the SQL
# from `col_clause()` do the same thing.
path = path.lower()
# Match the path as a single file.
self.file_path = path
# As a directory (prefix).
self.dir_path = os.path.join(path, b"")
@classmethod
def is_path_query(cls, query_part):
"""Try to guess whether a unicode query part is a path query.
Condition: separator precedes colon and the file exists.
"""
colon = query_part.find(":")
if colon != -1:
query_part = query_part[:colon]
# Test both `sep` and `altsep` (i.e., both slash and backslash on
# Windows).
if not (
os.sep in query_part or (os.altsep and os.altsep in query_part)
):
return False
if cls.force_implicit_query_detection:
return True
return os.path.exists(util.syspath(util.normpath(query_part)))
def match(self, item):
path = item.path if self.case_sensitive else item.path.lower()
return (path == self.file_path) or path.startswith(self.dir_path)
def col_clause(self):
file_blob = BLOB_TYPE(self.file_path)
dir_blob = BLOB_TYPE(self.dir_path)
if self.case_sensitive:
query_part = "({0} = ?) || (substr({0}, 1, ?) = ?)"
else:
query_part = "(BYTELOWER({0}) = BYTELOWER(?)) || \
(substr(BYTELOWER({0}), 1, ?) = BYTELOWER(?))"
return query_part.format(self.field), (
file_blob,
len(dir_blob),
dir_blob,
)
def __repr__(self) -> str:
return (
f"{self.__class__.__name__}({self.field!r}, {self.pattern!r}, "
f"fast={self.fast}, case_sensitive={self.case_sensitive})"
)
class RegexpQuery(StringFieldQuery[Pattern[str]]):
"""A query that matches a regular expression in a specific Model field.
@ -844,6 +941,24 @@ class DurationQuery(NumericQuery):
)
class SingletonQuery(FieldQuery[str]):
"""This query is responsible for the 'singleton' lookup.
It is based on the FieldQuery and constructs a SQL clause
'album_id is NULL' which yields the same result as the previous filter
in Python but is more performant since it's done in SQL.
Using util.str2bool ensures that lookups like singleton:true, singleton:1
and singleton:false, singleton:0 are handled consistently.
"""
def __new__(cls, field: str, value: str, *args, **kwargs):
query = NoneQuery("album_id")
if util.str2bool(value):
return query
return NotQuery(query)
# Sorting.

View file

@ -16,19 +16,18 @@
from __future__ import annotations
import re
import time
import typing
from abc import ABC
from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast
from beets.util import str2bool
from beets import util
from .query import (
BooleanQuery,
FieldQueryType,
NumericQuery,
SQLiteType,
SubstringQuery,
)
from . import query
SQLiteType = query.SQLiteType
BLOB_TYPE = query.BLOB_TYPE
class ModelType(typing.Protocol):
@ -61,7 +60,7 @@ class Type(ABC, Generic[T, N]):
"""The SQLite column type for the value.
"""
query: FieldQueryType = SubstringQuery
query: query.FieldQueryType = query.SubstringQuery
"""The `Query` subclass to be used when querying the field.
"""
@ -160,7 +159,7 @@ class BaseInteger(Type[int, N]):
"""A basic integer type."""
sql = "INTEGER"
query = NumericQuery
query = query.NumericQuery
model_type = int
def normalize(self, value: Any) -> int | N:
@ -241,7 +240,7 @@ class BaseFloat(Type[float, N]):
"""
sql = "REAL"
query: FieldQueryType = NumericQuery
query: query.FieldQueryType = query.NumericQuery
model_type = float
def __init__(self, digits: int = 1):
@ -271,7 +270,7 @@ class BaseString(Type[T, N]):
"""A Unicode string type."""
sql = "TEXT"
query = SubstringQuery
query = query.SubstringQuery
def normalize(self, value: Any) -> T | N:
if value is None:
@ -312,14 +311,144 @@ class Boolean(Type):
"""A boolean type."""
sql = "INTEGER"
query = BooleanQuery
query = query.BooleanQuery
model_type = bool
def format(self, value: bool) -> str:
return str(bool(value))
def parse(self, string: str) -> bool:
return str2bool(string)
return util.str2bool(string)
class DateType(Float):
# TODO representation should be `datetime` object
# TODO distinguish between date and time types
query = query.DateQuery
def format(self, value):
return time.strftime(
beets.config["time_format"].as_str(), time.localtime(value or 0)
)
def parse(self, string):
try:
# Try a formatted date string.
return time.mktime(
time.strptime(string, beets.config["time_format"].as_str())
)
except ValueError:
# Fall back to a plain timestamp number.
try:
return float(string)
except ValueError:
return self.null
class PathType(Type[bytes, bytes]):
"""A dbcore type for filesystem paths.
These are represented as `bytes` objects, in keeping with
the Unix filesystem abstraction.
"""
sql = "BLOB"
query = query.PathQuery
model_type = bytes
def __init__(self, nullable=False):
"""Create a path type object.
`nullable` controls whether the type may be missing, i.e., None.
"""
self.nullable = nullable
@property
def null(self):
if self.nullable:
return None
else:
return b""
def format(self, value):
return util.displayable_path(value)
def parse(self, string):
return util.normpath(util.bytestring_path(string))
def normalize(self, value):
if isinstance(value, str):
# Paths stored internally as encoded bytes.
return util.bytestring_path(value)
elif isinstance(value, BLOB_TYPE):
# We unwrap buffers to bytes.
return bytes(value)
else:
return value
def from_sql(self, sql_value):
return self.normalize(sql_value)
def to_sql(self, value):
if isinstance(value, bytes):
value = BLOB_TYPE(value)
return value
class MusicalKey(String):
"""String representing the musical key of a song.
The standard format is C, Cm, C#, C#m, etc.
"""
ENHARMONIC = {
r"db": "c#",
r"eb": "d#",
r"gb": "f#",
r"ab": "g#",
r"bb": "a#",
}
null = None
def parse(self, key):
key = key.lower()
for flat, sharp in self.ENHARMONIC.items():
key = re.sub(flat, sharp, key)
key = re.sub(r"[\W\s]+minor", "m", key)
key = re.sub(r"[\W\s]+major", "", key)
return key.capitalize()
def normalize(self, key):
if key is None:
return None
else:
return self.parse(key)
class DurationType(Float):
"""Human-friendly (M:SS) representation of a time interval."""
query = query.DurationQuery
def format(self, value):
if not beets.config["format_raw_length"].get(bool):
return util.human_seconds_short(value or 0.0)
else:
return value
def parse(self, string):
try:
# Try to format back hh:ss to seconds.
return util.raw_seconds_short(string)
except ValueError:
# Fall back to a plain float.
try:
return float(string)
except ValueError:
return self.null
# Shared instances of common types.
@ -331,6 +460,7 @@ FLOAT = Float()
NULL_FLOAT = NullFloat()
STRING = String()
BOOLEAN = Boolean()
DATE = DateType()
SEMICOLON_SPACE_DSV = DelimitedString(delimiter="; ")
# Will set the proper null char in mediafile

View file

@ -17,7 +17,6 @@
from __future__ import annotations
import os
import re
import shlex
import string
import sys
@ -46,259 +45,9 @@ from beets.util.functemplate import Template, template
if TYPE_CHECKING:
from .dbcore.query import FieldQuery, FieldQueryType
# To use the SQLite "blob" type, it doesn't suffice to provide a byte
# string; SQLite treats that as encoded text. Wrapping it in a
# `memoryview` tells it that we actually mean non-text data.
BLOB_TYPE = memoryview
log = logging.getLogger("beets")
# Library-specific query types.
class SingletonQuery(dbcore.FieldQuery[str]):
"""This query is responsible for the 'singleton' lookup.
It is based on the FieldQuery and constructs a SQL clause
'album_id is NULL' which yields the same result as the previous filter
in Python but is more performant since it's done in SQL.
Using util.str2bool ensures that lookups like singleton:true, singleton:1
and singleton:false, singleton:0 are handled consistently.
"""
def __new__(cls, field: str, value: str, *args, **kwargs):
query = dbcore.query.NoneQuery("album_id")
if util.str2bool(value):
return query
return dbcore.query.NotQuery(query)
class PathQuery(dbcore.FieldQuery[bytes]):
"""A query that matches all items under a given path.
Matching can either be case-insensitive or case-sensitive. By
default, the behavior depends on the OS: case-insensitive on Windows
and case-sensitive otherwise.
"""
# For tests
force_implicit_query_detection = False
def __init__(self, field, pattern, fast=True, case_sensitive=None):
"""Create a path query.
`pattern` must be a path, either to a file or a directory.
`case_sensitive` can be a bool or `None`, indicating that the
behavior should depend on the filesystem.
"""
super().__init__(field, pattern, fast)
path = util.normpath(pattern)
# By default, the case sensitivity depends on the filesystem
# that the query path is located on.
if case_sensitive is None:
case_sensitive = util.case_sensitive(path)
self.case_sensitive = case_sensitive
# Use a normalized-case pattern for case-insensitive matches.
if not case_sensitive:
# We need to lowercase the entire path, not just the pattern.
# In particular, on Windows, the drive letter is otherwise not
# lowercased.
# This also ensures that the `match()` method below and the SQL
# from `col_clause()` do the same thing.
path = path.lower()
# Match the path as a single file.
self.file_path = path
# As a directory (prefix).
self.dir_path = os.path.join(path, b"")
@classmethod
def is_path_query(cls, query_part):
"""Try to guess whether a unicode query part is a path query.
Condition: separator precedes colon and the file exists.
"""
colon = query_part.find(":")
if colon != -1:
query_part = query_part[:colon]
# Test both `sep` and `altsep` (i.e., both slash and backslash on
# Windows).
if not (
os.sep in query_part or (os.altsep and os.altsep in query_part)
):
return False
if cls.force_implicit_query_detection:
return True
return os.path.exists(syspath(normpath(query_part)))
def match(self, item):
path = item.path if self.case_sensitive else item.path.lower()
return (path == self.file_path) or path.startswith(self.dir_path)
def col_clause(self):
file_blob = BLOB_TYPE(self.file_path)
dir_blob = BLOB_TYPE(self.dir_path)
if self.case_sensitive:
query_part = "({0} = ?) || (substr({0}, 1, ?) = ?)"
else:
query_part = "(BYTELOWER({0}) = BYTELOWER(?)) || \
(substr(BYTELOWER({0}), 1, ?) = BYTELOWER(?))"
return query_part.format(self.field), (
file_blob,
len(dir_blob),
dir_blob,
)
def __repr__(self) -> str:
return (
f"{self.__class__.__name__}({self.field!r}, {self.pattern!r}, "
f"fast={self.fast}, case_sensitive={self.case_sensitive})"
)
# Library-specific field types.
class DateType(types.Float):
# TODO representation should be `datetime` object
# TODO distinguish between date and time types
query = dbcore.query.DateQuery
def format(self, value):
return time.strftime(
beets.config["time_format"].as_str(), time.localtime(value or 0)
)
def parse(self, string):
try:
# Try a formatted date string.
return time.mktime(
time.strptime(string, beets.config["time_format"].as_str())
)
except ValueError:
# Fall back to a plain timestamp number.
try:
return float(string)
except ValueError:
return self.null
class PathType(types.Type[bytes, bytes]):
"""A dbcore type for filesystem paths.
These are represented as `bytes` objects, in keeping with
the Unix filesystem abstraction.
"""
sql = "BLOB"
query = PathQuery
model_type = bytes
def __init__(self, nullable=False):
"""Create a path type object.
`nullable` controls whether the type may be missing, i.e., None.
"""
self.nullable = nullable
@property
def null(self):
if self.nullable:
return None
else:
return b""
def format(self, value):
return util.displayable_path(value)
def parse(self, string):
return normpath(bytestring_path(string))
def normalize(self, value):
if isinstance(value, str):
# Paths stored internally as encoded bytes.
return bytestring_path(value)
elif isinstance(value, BLOB_TYPE):
# We unwrap buffers to bytes.
return bytes(value)
else:
return value
def from_sql(self, sql_value):
return self.normalize(sql_value)
def to_sql(self, value):
if isinstance(value, bytes):
value = BLOB_TYPE(value)
return value
class MusicalKey(types.String):
"""String representing the musical key of a song.
The standard format is C, Cm, C#, C#m, etc.
"""
ENHARMONIC = {
r"db": "c#",
r"eb": "d#",
r"gb": "f#",
r"ab": "g#",
r"bb": "a#",
}
null = None
def parse(self, key):
key = key.lower()
for flat, sharp in self.ENHARMONIC.items():
key = re.sub(flat, sharp, key)
key = re.sub(r"[\W\s]+minor", "m", key)
key = re.sub(r"[\W\s]+major", "", key)
return key.capitalize()
def normalize(self, key):
if key is None:
return None
else:
return self.parse(key)
class DurationType(types.Float):
"""Human-friendly (M:SS) representation of a time interval."""
query = dbcore.query.DurationQuery
def format(self, value):
if not beets.config["format_raw_length"].get(bool):
return beets.ui.human_seconds_short(value or 0.0)
else:
return value
def parse(self, string):
try:
# Try to format back hh:ss to seconds.
return util.raw_seconds_short(string)
except ValueError:
# Fall back to a plain float.
try:
return float(string)
except ValueError:
return self.null
# Special path format key.
PF_KEY_DEFAULT = "default"
@ -517,7 +266,7 @@ class Item(LibModel):
_flex_table = "item_attributes"
_fields = {
"id": types.PRIMARY_ID,
"path": PathType(),
"path": types.PathType(),
"album_id": types.FOREIGN_ID,
"title": types.STRING,
"artist": types.STRING,
@ -596,8 +345,8 @@ class Item(LibModel):
"original_year": types.PaddedInt(4),
"original_month": types.PaddedInt(2),
"original_day": types.PaddedInt(2),
"initial_key": MusicalKey(),
"length": DurationType(),
"initial_key": types.MusicalKey(),
"length": types.DurationType(),
"bitrate": types.ScaledInt(1000, "kbps"),
"bitrate_mode": types.STRING,
"encoder_info": types.STRING,
@ -606,8 +355,8 @@ class Item(LibModel):
"samplerate": types.ScaledInt(1000, "kHz"),
"bitdepth": types.INTEGER,
"channels": types.INTEGER,
"mtime": DateType(),
"added": DateType(),
"mtime": types.DATE,
"added": types.DATE,
}
_search_fields = (
@ -641,7 +390,7 @@ class Item(LibModel):
_sorts = {"artist": dbcore.query.SmartArtistSort}
_queries = {"singleton": SingletonQuery}
_queries = {"singleton": dbcore.query.SingletonQuery}
_format_config_key = "format_item"
@ -717,7 +466,7 @@ class Item(LibModel):
if key == "path":
if isinstance(value, str):
value = bytestring_path(value)
elif isinstance(value, BLOB_TYPE):
elif isinstance(value, types.BLOB_TYPE):
value = bytes(value)
elif key == "album_id":
self._cached_album = None
@ -1161,8 +910,8 @@ class Album(LibModel):
_always_dirty = True
_fields = {
"id": types.PRIMARY_ID,
"artpath": PathType(True),
"added": DateType(),
"artpath": types.PathType(True),
"added": types.DATE,
"albumartist": types.STRING,
"albumartist_sort": types.STRING,
"albumartist_credit": types.STRING,
@ -1208,7 +957,7 @@ class Album(LibModel):
_search_fields = ("album", "albumartist", "genre")
_types = {
"path": PathType(),
"path": types.PathType(),
"data_source": types.STRING,
}
@ -1563,7 +1312,10 @@ def parse_query_parts(parts, model_cls):
# Special-case path-like queries, which are non-field queries
# containing path separators (/).
parts = [f"path:{s}" if PathQuery.is_path_query(s) else s for s in parts]
parts = [
f"path:{s}" if dbcore.query.PathQuery.is_path_query(s) else s
for s in parts
]
case_insensitive = beets.config["sort_case_insensitive"].get(bool)

View file

@ -477,14 +477,6 @@ def human_seconds(interval):
return f"{interval:3.1f} {suffix}s"
def human_seconds_short(interval):
"""Formats a number of seconds as a short human-readable M:SS
string.
"""
interval = int(interval)
return "%i:%02i" % (interval // 60, interval % 60)
# Colorization.
# ANSI terminal colorization code heavily inspired by pygments:

View file

@ -541,8 +541,8 @@ class ChangeRepresentation:
cur_length0 = item.length if item.length else 0
new_length0 = track_info.length if track_info.length else 0
# format into string
cur_length = f"({ui.human_seconds_short(cur_length0)})"
new_length = f"({ui.human_seconds_short(new_length0)})"
cur_length = f"({util.human_seconds_short(cur_length0)})"
new_length = f"({util.human_seconds_short(new_length0)})"
# colorize
lhs_length = ui.colorize(highlight_color, cur_length)
rhs_length = ui.colorize(highlight_color, new_length)
@ -706,14 +706,14 @@ class AlbumChange(ChangeRepresentation):
for track_info in self.match.extra_tracks:
line = f" ! {track_info.title} (#{self.format_index(track_info)})"
if track_info.length:
line += f" ({ui.human_seconds_short(track_info.length)})"
line += f" ({util.human_seconds_short(track_info.length)})"
print_(ui.colorize("text_warning", line))
if self.match.extra_items:
print_(f"Unmatched tracks ({len(self.match.extra_items)}):")
for item in self.match.extra_items:
line = " ! {} (#{})".format(item.title, self.format_index(item))
if item.length:
line += " ({})".format(ui.human_seconds_short(item.length))
line += " ({})".format(util.human_seconds_short(item.length))
print_(ui.colorize("text_warning", line))
@ -795,7 +795,7 @@ def summarize_items(items, singleton):
round(int(items[0].samplerate) / 1000, 1), items[0].bitdepth
)
summary_parts.append(sample_bits)
summary_parts.append(ui.human_seconds_short(total_duration))
summary_parts.append(util.human_seconds_short(total_duration))
summary_parts.append(ui.human_bytes(total_filesize))
return ", ".join(summary_parts)

View file

@ -1032,6 +1032,14 @@ def raw_seconds_short(string: str) -> float:
return float(minutes * 60 + seconds)
def human_seconds_short(interval):
"""Formats a number of seconds as a short human-readable M:SS
string.
"""
interval = int(interval)
return "%i:%02i" % (interval // 60, interval % 60)
def asciify_path(path: str, sep_replace: str) -> str:
"""Decodes all unicode characters in a path into ASCII equivalents.

View file

@ -25,7 +25,6 @@ import unidecode
from beets import ui
from beets.autotag import AlbumInfo, TrackInfo
from beets.dbcore import types
from beets.library import DateType
from beets.plugins import BeetsPlugin, MetadataSourcePlugin
@ -35,7 +34,7 @@ class DeezerPlugin(MetadataSourcePlugin, BeetsPlugin):
item_types = {
"deezer_track_rank": types.INTEGER,
"deezer_track_id": types.INTEGER,
"deezer_updated": DateType(),
"deezer_updated": types.DATE,
}
# Base URLs for the Deezer API

View file

@ -20,7 +20,6 @@ from time import mktime
from xml.sax.saxutils import quoteattr
from beets.dbcore import types
from beets.library import DateType
from beets.util import displayable_path
from beetsplug.metasync import MetaSource
@ -41,8 +40,8 @@ class Amarok(MetaSource):
"amarok_score": types.FLOAT,
"amarok_uid": types.STRING,
"amarok_playcount": types.INTEGER,
"amarok_firstplayed": DateType(),
"amarok_lastplayed": DateType(),
"amarok_firstplayed": types.DATE,
"amarok_lastplayed": types.DATE,
}
query_xml = '<query version="1.0"> \

View file

@ -26,7 +26,6 @@ from confuse import ConfigValueError
from beets import util
from beets.dbcore import types
from beets.library import DateType
from beets.util import bytestring_path, syspath
from beetsplug.metasync import MetaSource
@ -63,9 +62,9 @@ class Itunes(MetaSource):
"itunes_rating": types.INTEGER, # 0..100 scale
"itunes_playcount": types.INTEGER,
"itunes_skipcount": types.INTEGER,
"itunes_lastplayed": DateType(),
"itunes_lastskipped": DateType(),
"itunes_dateadded": DateType(),
"itunes_lastplayed": types.DATE,
"itunes_lastskipped": types.DATE,
"itunes_dateadded": types.DATE,
}
def __init__(self, config, log):

View file

@ -18,8 +18,9 @@ import time
import mpd
from beets import config, library, plugins, ui
from beets import config, plugins, ui
from beets.dbcore import types
from beets.dbcore.query import PathQuery
from beets.util import displayable_path
# If we lose the connection, how many times do we want to retry and how
@ -160,7 +161,7 @@ class MPDStats:
def get_item(self, path):
"""Return the beets item related to path."""
query = library.PathQuery("path", path)
query = PathQuery("path", path)
item = self.lib.items(query).get()
if item:
return item
@ -321,7 +322,7 @@ class MPDStatsPlugin(plugins.BeetsPlugin):
item_types = {
"play_count": types.INTEGER,
"skip_count": types.INTEGER,
"last_played": library.DateType(),
"last_played": types.DATE,
"rating": types.FLOAT,
}

View file

@ -18,8 +18,7 @@ import tempfile
from collections.abc import Sequence
import beets
from beets.dbcore.query import InQuery
from beets.library import BLOB_TYPE
from beets.dbcore.query import BLOB_TYPE, InQuery
from beets.util import path_as_posix

View file

@ -34,10 +34,10 @@ import unidecode
from beets import ui
from beets.autotag.hooks import AlbumInfo, TrackInfo
from beets.dbcore import types
from beets.library import DateType, Library
from beets.plugins import BeetsPlugin, MetadataSourcePlugin, Response
if TYPE_CHECKING:
from beets.library import Library
from beetsplug._typing import JSONDict
DEFAULT_WAITING_TIME = 5
@ -64,7 +64,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin):
"spotify_tempo": types.FLOAT,
"spotify_time_signature": types.INTEGER,
"spotify_valence": types.FLOAT,
"spotify_updated": DateType(),
"spotify_updated": types.DATE,
}
# Base URLs for the Spotify API

View file

@ -15,7 +15,6 @@
from confuse import ConfigValueError
from beets import library
from beets.dbcore import types
from beets.plugins import BeetsPlugin
@ -42,7 +41,7 @@ class TypesPlugin(BeetsPlugin):
elif value.get() == "bool":
mytypes[key] = types.BOOLEAN
elif value.get() == "date":
mytypes[key] = library.DateType()
mytypes[key] = types.DATE
else:
raise ConfigValueError(
"unknown type '{}' for the '{}' field".format(value, key)

View file

@ -25,6 +25,7 @@ from werkzeug.routing import BaseConverter, PathConverter
import beets.library
from beets import ui, util
from beets.dbcore.query import PathQuery
from beets.plugins import BeetsPlugin
# Utilities.
@ -342,7 +343,7 @@ def item_query(queries):
@app.route("/item/path/<everything:path>")
def item_at_path(path):
query = beets.library.PathQuery("path", path.encode("utf-8"))
query = PathQuery("path", path.encode("utf-8"))
item = g.lib.items(query).get()
if item:
return flask.jsonify(_rep(item))

View file

@ -19,7 +19,6 @@ import os.path
import re
import shutil
import stat
import time
import unicodedata
import unittest
from unittest.mock import patch
@ -1320,56 +1319,3 @@ class ParseQueryTest(unittest.TestCase):
def test_parse_bytes(self):
with pytest.raises(AssertionError):
beets.library.parse_query_string(b"query", None)
class LibraryFieldTypesTest(unittest.TestCase):
"""Test format() and parse() for library-specific field types"""
def test_datetype(self):
t = beets.library.DateType()
# format
time_format = beets.config["time_format"].as_str()
time_local = time.strftime(time_format, time.localtime(123456789))
assert time_local == t.format(123456789)
# parse
assert 123456789.0 == t.parse(time_local)
assert 123456789.0 == t.parse("123456789.0")
assert t.null == t.parse("not123456789.0")
assert t.null == t.parse("1973-11-29")
def test_pathtype(self):
t = beets.library.PathType()
# format
assert "/tmp" == t.format("/tmp")
assert "/tmp/\xe4lbum" == t.format("/tmp/\u00e4lbum")
# parse
assert np(b"/tmp") == t.parse("/tmp")
assert np(b"/tmp/\xc3\xa4lbum") == t.parse("/tmp/\u00e4lbum/")
def test_musicalkey(self):
t = beets.library.MusicalKey()
# parse
assert "C#m" == t.parse("c#m")
assert "Gm" == t.parse("g minor")
assert "Not c#m" == t.parse("not C#m")
def test_durationtype(self):
t = beets.library.DurationType()
# format
assert "1:01" == t.format(61.23)
assert "60:01" == t.format(3601.23)
assert "0:00" == t.format(None)
# parse
assert 61.0 == t.parse("1:01")
assert 61.23 == t.parse("61.23")
assert 3601.0 == t.parse("60:01")
assert t.null == t.parse("1:00:01")
assert t.null == t.parse("not61.23")
# config format_raw_length
beets.config["format_raw_length"] = True
assert 61.23 == t.format(61.23)
assert 3601.23 == t.format(3601.23)

View file

@ -466,9 +466,9 @@ class PathQueryTest(ItemInDBTestCase, AssertsMixin):
# Unadorned path queries with path separators in them are considered
# path queries only when the path in question actually exists. So we
# mock the existence check to return true.
beets.library.PathQuery.force_implicit_query_detection = True
beets.dbcore.query.PathQuery.force_implicit_query_detection = True
yield
beets.library.PathQuery.force_implicit_query_detection = False
beets.dbcore.query.PathQuery.force_implicit_query_detection = False
def test_path_exact_match(self):
q = "path:/a/b/c.mp3"
@ -609,7 +609,7 @@ class PathQueryTest(ItemInDBTestCase, AssertsMixin):
def test_case_sensitivity(self):
self.add_album(path=b"/A/B/C2.mp3", title="caps path")
makeq = partial(beets.library.PathQuery, "path", "/A/B")
makeq = partial(beets.dbcore.query.PathQuery, "path", "/A/B")
results = self.lib.items(makeq(case_sensitive=True))
self.assert_items_matched(results, ["caps path"])
@ -621,7 +621,7 @@ class PathQueryTest(ItemInDBTestCase, AssertsMixin):
# both os.sep and os.altsep
@unittest.skipIf(sys.platform == "win32", "win32")
def test_path_sep_detection(self):
is_path_query = beets.library.PathQuery.is_path_query
is_path_query = beets.dbcore.query.PathQuery.is_path_query
with self.force_implicit_query_detection():
assert is_path_query("/foo/bar")
@ -641,7 +641,7 @@ class PathQueryTest(ItemInDBTestCase, AssertsMixin):
Thus, don't use the `force_implicit_query_detection()`
contextmanager which would disable the existence check.
"""
is_path_query = beets.library.PathQuery.is_path_query
is_path_query = beets.dbcore.query.PathQuery.is_path_query
path = self.touch(os.path.join(b"foo", b"bar"))
assert os.path.isabs(util.syspath(path))
@ -664,7 +664,7 @@ class PathQueryTest(ItemInDBTestCase, AssertsMixin):
Thus, don't use the `force_implicit_query_detection()`
contextmanager which would disable the existence check.
"""
is_path_query = beets.library.PathQuery.is_path_query
is_path_query = beets.dbcore.query.PathQuery.is_path_query
self.touch(os.path.join(b"foo", b"bar"))

59
test/test_types.py Normal file
View file

@ -0,0 +1,59 @@
import time
import unittest
import beets
from beets.dbcore import types
from beets.util import normpath
class LibraryFieldTypesTest(unittest.TestCase):
"""Test format() and parse() for library-specific field types"""
def test_datetype(self):
t = types.DATE
# format
time_format = beets.config["time_format"].as_str()
time_local = time.strftime(time_format, time.localtime(123456789))
assert time_local == t.format(123456789)
# parse
assert 123456789.0 == t.parse(time_local)
assert 123456789.0 == t.parse("123456789.0")
assert t.null == t.parse("not123456789.0")
assert t.null == t.parse("1973-11-29")
def test_pathtype(self):
t = types.PathType()
# format
assert "/tmp" == t.format("/tmp")
assert "/tmp/\xe4lbum" == t.format("/tmp/\u00e4lbum")
# parse
assert normpath(b"/tmp") == t.parse("/tmp")
assert normpath(b"/tmp/\xc3\xa4lbum") == t.parse("/tmp/\u00e4lbum/")
def test_musicalkey(self):
t = types.MusicalKey()
# parse
assert "C#m" == t.parse("c#m")
assert "Gm" == t.parse("g minor")
assert "Not c#m" == t.parse("not C#m")
def test_durationtype(self):
t = types.DurationType()
# format
assert "1:01" == t.format(61.23)
assert "60:01" == t.format(3601.23)
assert "0:00" == t.format(None)
# parse
assert 61.0 == t.parse("1:01")
assert 61.23 == t.parse("61.23")
assert 3601.0 == t.parse("60:01")
assert t.null == t.parse("1:00:01")
assert t.null == t.parse("not61.23")
# config format_raw_length
beets.config["format_raw_length"] = True
assert 61.23 == t.format(61.23)
assert 3601.23 == t.format(3601.23)