move remaining generic Query types to dbcore.query

NumericQuery is still broken. This, of course, is the whole reason for the
change.
This commit is contained in:
Adrian Sampson 2014-01-20 16:40:50 -08:00
parent 680524197c
commit 9ee4adc5e1
11 changed files with 353 additions and 360 deletions

View file

@ -16,4 +16,4 @@
Library.
"""
from .db import Type, Model, Database
from .query import Query, FieldQuery, MatchQuery
from .query import Query, FieldQuery, MatchQuery, AndQuery

View file

@ -14,6 +14,10 @@
"""The Query type hierarchy for DBCore.
"""
import re
from beets import util
class Query(object):
"""An abstract class representing a query into the item database.
"""
@ -75,3 +79,252 @@ class MatchQuery(FieldQuery):
@classmethod
def value_match(cls, pattern, value):
return pattern == value
class StringFieldQuery(FieldQuery):
"""A FieldQuery that converts values to strings before matching
them.
"""
@classmethod
def value_match(cls, pattern, value):
"""Determine whether the value matches the pattern. The value
may have any type.
"""
return cls.string_match(pattern, util.as_string(value))
@classmethod
def string_match(cls, pattern, value):
"""Determine whether the value matches the pattern. Both
arguments are strings. Subclasses implement this method.
"""
raise NotImplementedError()
class SubstringQuery(StringFieldQuery):
"""A query that matches a substring in a specific item field."""
def col_clause(self):
search = '%' + (self.pattern.replace('\\','\\\\').replace('%','\\%')
.replace('_','\\_')) + '%'
clause = self.field + " like ? escape '\\'"
subvals = [search]
return clause, subvals
@classmethod
def string_match(cls, pattern, value):
return pattern.lower() in value.lower()
class RegexpQuery(StringFieldQuery):
"""A query that matches a regular expression in a specific item
field.
"""
@classmethod
def string_match(cls, pattern, value):
try:
res = re.search(pattern, value)
except re.error:
# Invalid regular expression.
return False
return res is not None
class BooleanQuery(MatchQuery):
"""Matches a boolean field. Pattern should either be a boolean or a
string reflecting a boolean.
"""
def __init__(self, field, pattern):
super(BooleanQuery, self).__init__(field, pattern)
if isinstance(pattern, basestring):
self.pattern = util.str2bool(pattern)
self.pattern = int(self.pattern)
class BytesQuery(MatchQuery):
"""Match a raw bytes field (i.e., a path). This is a necessary hack
to distinguish between the common case, matching Unicode strings,
and the special case in which we match bytes.
"""
def __init__(self, field, pattern):
super(BytesQuery, self).__init__(field, pattern)
# Use a buffer representation of the pattern for SQLite
# matching. This instructs SQLite to treat the blob as binary
# rather than encoded Unicode.
if isinstance(self.pattern, bytes):
self.buf_pattern = buffer(self.pattern)
elif isinstance(self.battern, buffer):
self.buf_pattern = self.pattern
self.pattern = bytes(self.pattern)
def col_clause(self):
return self.field + " = ?", [self.buf_pattern]
class NumericQuery(FieldQuery):
"""Matches numeric fields. A syntax using Ruby-style range ellipses
(``..``) lets users specify one- or two-sided ranges. For example,
``year:2001..`` finds music released since the turn of the century.
"""
# FIXME
# types = dict((r[0], r[1]) for r in ITEM_FIELDS)
@classmethod
def applies_to(cls, field):
"""Determine whether a field has numeric type. NumericQuery
should only be used with such fields.
"""
if field not in cls.types:
# This can happen when using album fields.
# FIXME should no longer be necessary with the new type system.
return False
return cls.types.get(field).py_type in (int, float)
def _convert(self, s):
"""Convert a string to the appropriate numeric type. If the
string cannot be converted, return None.
"""
try:
return self.numtype(s)
except ValueError:
return None
def __init__(self, field, pattern, fast=True):
super(NumericQuery, self).__init__(field, pattern, fast)
self.numtype = self.types[field].py_type
parts = pattern.split('..', 1)
if len(parts) == 1:
# No range.
self.point = self._convert(parts[0])
self.rangemin = None
self.rangemax = None
else:
# One- or two-sided range.
self.point = None
self.rangemin = self._convert(parts[0])
self.rangemax = self._convert(parts[1])
def match(self, item):
value = getattr(item, self.field)
if isinstance(value, basestring):
value = self._convert(value)
if self.point is not None:
return value == self.point
else:
if self.rangemin is not None and value < self.rangemin:
return False
if self.rangemax is not None and value > self.rangemax:
return False
return True
def col_clause(self):
if self.point is not None:
return self.field + '=?', (self.point,)
else:
if self.rangemin is not None and self.rangemax is not None:
return (u'{0} >= ? AND {0} <= ?'.format(self.field),
(self.rangemin, self.rangemax))
elif self.rangemin is not None:
return u'{0} >= ?'.format(self.field), (self.rangemin,)
elif self.rangemax is not None:
return u'{0} <= ?'.format(self.field), (self.rangemax,)
else:
return '1'
class CollectionQuery(Query):
"""An abstract query class that aggregates other queries. Can be
indexed like a list to access the sub-queries.
"""
def __init__(self, subqueries=()):
self.subqueries = subqueries
# Act like a sequence.
def __len__(self):
return len(self.subqueries)
def __getitem__(self, key):
return self.subqueries[key]
def __iter__(self):
return iter(self.subqueries)
def __contains__(self, item):
return item in self.subqueries
def clause_with_joiner(self, joiner):
"""Returns a clause created by joining together the clauses of
all subqueries with the string joiner (padded by spaces).
"""
clause_parts = []
subvals = []
for subq in self.subqueries:
subq_clause, subq_subvals = subq.clause()
if not subq_clause:
# Fall back to slow query.
return None, ()
clause_parts.append('(' + subq_clause + ')')
subvals += subq_subvals
clause = (' ' + joiner + ' ').join(clause_parts)
return clause, subvals
class AnyFieldQuery(CollectionQuery):
"""A query that matches if a given FieldQuery subclass matches in
any field. The individual field query class is provided to the
constructor.
"""
def __init__(self, pattern, fields, cls):
self.pattern = pattern
self.fields = fields
self.query_class = cls
subqueries = []
for field in self.fields:
subqueries.append(cls(field, pattern, True))
super(AnyFieldQuery, self).__init__(subqueries)
def clause(self):
return self.clause_with_joiner('or')
def match(self, item):
for subq in self.subqueries:
if subq.match(item):
return True
return False
class MutableCollectionQuery(CollectionQuery):
"""A collection query whose subqueries may be modified after the
query is initialized.
"""
def __setitem__(self, key, value):
self.subqueries[key] = value
def __delitem__(self, key):
del self.subqueries[key]
class AndQuery(MutableCollectionQuery):
"""A conjunction of a list of other queries."""
def clause(self):
return self.clause_with_joiner('and')
def match(self, item):
return all([q.match(item) for q in self.subqueries])
class TrueQuery(Query):
"""A query that always matches."""
def clause(self):
return '1', ()
def match(self, item):
return True
class FalseQuery(Query):
"""A query that never matches."""
def clause(self):
return '0', ()
def match(self, item):
return False

View file

@ -84,7 +84,7 @@ def _item_duplicate_check(lib, task):
artist, title = task.chosen_ident()
found_items = []
query = library.AndQuery((
query = dbcore.AndQuery((
dbcore.MatchQuery('artist', artist),
dbcore.MatchQuery('title', title),
))
@ -752,7 +752,7 @@ def apply_choices(session):
task.replaced_items = defaultdict(list)
for item in items:
dup_items = session.lib.items(
library.BytesQuery('path', item.path)
dbcore.query.BytesQuery('path', item.path)
)
for dup_item in dup_items:
task.replaced_items[item].append(dup_item)

View file

@ -478,7 +478,7 @@ class Item(LibModel):
for query, path_format in path_formats:
if query == PF_KEY_DEFAULT:
continue
query = AndQuery.from_string(query)
query = get_query(query, type(self))
if query.match(self):
# The query matches the item! Use the corresponding path
# format.
@ -707,158 +707,25 @@ class Album(LibModel):
# Query abstraction hierarchy.
# Library-specific query types.
class StringFieldQuery(dbcore.FieldQuery):
"""A FieldQuery that converts values to strings before matching
them.
"""
@classmethod
def value_match(cls, pattern, value):
"""Determine whether the value matches the pattern. The value
may have any type.
"""
return cls.string_match(pattern, util.as_string(value))
@classmethod
def string_match(cls, pattern, value):
"""Determine whether the value matches the pattern. Both
arguments are strings. Subclasses implement this method.
"""
raise NotImplementedError()
class SubstringQuery(StringFieldQuery):
"""A query that matches a substring in a specific item field."""
def col_clause(self):
search = '%' + (self.pattern.replace('\\','\\\\').replace('%','\\%')
.replace('_','\\_')) + '%'
clause = self.field + " like ? escape '\\'"
subvals = [search]
return clause, subvals
@classmethod
def string_match(cls, pattern, value):
return pattern.lower() in value.lower()
class RegexpQuery(StringFieldQuery):
"""A query that matches a regular expression in a specific item
field.
"""
@classmethod
def string_match(cls, pattern, value):
try:
res = re.search(pattern, value)
except re.error:
# Invalid regular expression.
return False
return res is not None
class BooleanQuery(dbcore.MatchQuery):
"""Matches a boolean field. Pattern should either be a boolean or a
string reflecting a boolean.
"""
def __init__(self, field, pattern):
super(BooleanQuery, self).__init__(field, pattern)
if isinstance(pattern, basestring):
self.pattern = util.str2bool(pattern)
self.pattern = int(self.pattern)
class BytesQuery(dbcore.MatchQuery):
"""Match a raw bytes field (i.e., a path). This is a necessary hack
to distinguish between the common case, matching Unicode strings,
and the special case in which we match bytes.
"""
def __init__(self, field, pattern):
super(BytesQuery, self).__init__(field, pattern)
# Use a buffer representation of the pattern for SQLite
# matching. This instructs SQLite to treat the blob as binary
# rather than encoded Unicode.
if isinstance(self.pattern, bytes):
self.buf_pattern = buffer(self.pattern)
elif isinstance(self.battern, buffer):
self.buf_pattern = self.pattern
self.pattern = bytes(self.pattern)
def col_clause(self):
return self.field + " = ?", [self.buf_pattern]
class NumericQuery(dbcore.FieldQuery):
"""Matches numeric fields. A syntax using Ruby-style range ellipses
(``..``) lets users specify one- or two-sided ranges. For example,
``year:2001..`` finds music released since the turn of the century.
"""
types = dict((r[0], r[1]) for r in ITEM_FIELDS)
@classmethod
def applies_to(cls, field):
"""Determine whether a field has numeric type. NumericQuery
should only be used with such fields.
"""
if field not in cls.types:
# This can happen when using album fields.
# FIXME should no longer be necessary with the new type system.
return False
return cls.types.get(field).py_type in (int, float)
def _convert(self, s):
"""Convert a string to the appropriate numeric type. If the
string cannot be converted, return None.
"""
try:
return self.numtype(s)
except ValueError:
return None
def __init__(self, field, pattern, fast=True):
super(NumericQuery, self).__init__(field, pattern, fast)
self.numtype = self.types[field].py_type
parts = pattern.split('..', 1)
if len(parts) == 1:
# No range.
self.point = self._convert(parts[0])
self.rangemin = None
self.rangemax = None
else:
# One- or two-sided range.
self.point = None
self.rangemin = self._convert(parts[0])
self.rangemax = self._convert(parts[1])
class PathQuery(dbcore.Query):
"""A query that matches all items under a given path."""
def __init__(self, path):
# Match the path as a single file.
self.file_path = util.bytestring_path(util.normpath(path))
# As a directory (prefix).
self.dir_path = util.bytestring_path(os.path.join(self.file_path, ''))
def match(self, item):
value = getattr(item, self.field)
if isinstance(value, basestring):
value = self._convert(value)
return (item.path == self.file_path) or \
item.path.startswith(self.dir_path)
if self.point is not None:
return value == self.point
else:
if self.rangemin is not None and value < self.rangemin:
return False
if self.rangemax is not None and value > self.rangemax:
return False
return True
def col_clause(self):
if self.point is not None:
return self.field + '=?', (self.point,)
else:
if self.rangemin is not None and self.rangemax is not None:
return (u'{0} >= ? AND {0} <= ?'.format(self.field),
(self.rangemin, self.rangemax))
elif self.rangemin is not None:
return u'{0} >= ?'.format(self.field), (self.rangemin,)
elif self.rangemax is not None:
return u'{0} <= ?'.format(self.field), (self.rangemax,)
else:
return '1'
def clause(self):
dir_pat = buffer(self.dir_path + '%')
file_blob = buffer(self.file_path)
return '(path = ?) || (path LIKE ?)', (file_blob, dir_pat)
class SingletonQuery(dbcore.Query):
@ -876,147 +743,6 @@ class SingletonQuery(dbcore.Query):
return (not item.album_id) == self.sense
class CollectionQuery(dbcore.Query):
"""An abstract query class that aggregates other queries. Can be
indexed like a list to access the sub-queries.
"""
def __init__(self, subqueries=()):
self.subqueries = subqueries
# is there a better way to do this?
def __len__(self): return len(self.subqueries)
def __getitem__(self, key): return self.subqueries[key]
def __iter__(self): return iter(self.subqueries)
def __contains__(self, item): return item in self.subqueries
def clause_with_joiner(self, joiner):
"""Returns a clause created by joining together the clauses of
all subqueries with the string joiner (padded by spaces).
"""
clause_parts = []
subvals = []
for subq in self.subqueries:
subq_clause, subq_subvals = subq.clause()
if not subq_clause:
# Fall back to slow query.
return None, ()
clause_parts.append('(' + subq_clause + ')')
subvals += subq_subvals
clause = (' ' + joiner + ' ').join(clause_parts)
return clause, subvals
@classmethod
def from_strings(cls, query_parts, default_fields, all_keys):
"""Creates a query from a list of strings in the format used by
parse_query_part. If default_fields are specified, they are the
fields to be searched by unqualified search terms. Otherwise,
all fields are searched for those terms.
"""
subqueries = []
for part in query_parts:
subq = construct_query_part(part, default_fields, all_keys)
if subq:
subqueries.append(subq)
if not subqueries: # No terms in query.
subqueries = [TrueQuery()]
return cls(subqueries)
@classmethod
def from_string(cls, query, default_fields=ITEM_DEFAULT_FIELDS,
all_keys=ITEM_KEYS):
"""Creates a query based on a single string. The string is split
into query parts using shell-style syntax.
"""
# A bug in Python < 2.7.3 prevents correct shlex splitting of
# Unicode strings.
# http://bugs.python.org/issue6988
if isinstance(query, unicode):
query = query.encode('utf8')
parts = [s.decode('utf8') for s in shlex.split(query)]
return cls.from_strings(parts, default_fields, all_keys)
class AnyFieldQuery(CollectionQuery):
"""A query that matches if a given FieldQuery subclass matches in
any field. The individual field query class is provided to the
constructor.
"""
def __init__(self, pattern, fields, cls):
self.pattern = pattern
self.fields = fields
self.query_class = cls
subqueries = []
for field in self.fields:
subqueries.append(cls(field, pattern, True))
super(AnyFieldQuery, self).__init__(subqueries)
def clause(self):
return self.clause_with_joiner('or')
def match(self, item):
for subq in self.subqueries:
if subq.match(item):
return True
return False
class MutableCollectionQuery(CollectionQuery):
"""A collection query whose subqueries may be modified after the
query is initialized.
"""
def __setitem__(self, key, value):
self.subqueries[key] = value
def __delitem__(self, key):
del self.subqueries[key]
class AndQuery(MutableCollectionQuery):
"""A conjunction of a list of other queries."""
def clause(self):
return self.clause_with_joiner('and')
def match(self, item):
return all([q.match(item) for q in self.subqueries])
class TrueQuery(dbcore.Query):
"""A query that always matches."""
def clause(self):
return '1', ()
def match(self, item):
return True
class FalseQuery(dbcore.Query):
"""A query that never matches."""
def clause(self):
return '0', ()
def match(self, item):
return False
class PathQuery(dbcore.Query):
"""A query that matches all items under a given path."""
def __init__(self, path):
# Match the path as a single file.
self.file_path = bytestring_path(normpath(path))
# As a directory (prefix).
self.dir_path = bytestring_path(os.path.join(self.file_path, ''))
def match(self, item):
return (item.path == self.file_path) or \
item.path.startswith(self.dir_path)
def clause(self):
dir_pat = buffer(self.dir_path + '%')
file_blob = buffer(self.file_path)
return '(path = ?) || (path LIKE ?)', (file_blob, dir_pat)
# Query construction and parsing helpers.
@ -1055,7 +781,7 @@ def parse_query_part(part):
part = part.strip()
match = PARSE_QUERY_PART_REGEX.match(part)
prefixes = {':': RegexpQuery}
prefixes = {':': dbcore.query.RegexpQuery}
prefixes.update(plugins.queries())
if match:
@ -1065,9 +791,9 @@ def parse_query_part(part):
for pre, query_class in prefixes.items():
if term.startswith(pre):
return key, term[len(pre):], query_class
if key and NumericQuery.applies_to(key):
return key, term, NumericQuery
return key, term, SubstringQuery # The default query type.
if key and dbcore.query.NumericQuery.applies_to(key):
return key, term, dbcore.query.NumericQuery
return key, term, dbcore.query.SubstringQuery # Default query type.
def construct_query_part(query_part, default_fields, all_keys):
@ -1089,7 +815,8 @@ def construct_query_part(query_part, default_fields, all_keys):
# The query type matches a specific field, but none was
# specified. So we use a version of the query that matches
# any field.
return AnyFieldQuery(pattern, default_fields, query_class)
return dbcore.query.AnyFieldQuery(pattern, default_fields,
query_class)
else:
# Other query type.
return query_class(pattern)
@ -1098,11 +825,11 @@ def construct_query_part(query_part, default_fields, all_keys):
# A boolean field.
if key.lower() == 'comp':
return BooleanQuery(key, pattern)
return dbcore.query.BooleanQuery(key, pattern)
# Path field.
elif key == 'path' and 'path' in all_keys:
if query_class is SubstringQuery:
if query_class is dbcore.query.SubstringQuery:
# By default, use special path matching logic.
return PathQuery(pattern)
else:
@ -1118,6 +845,22 @@ def construct_query_part(query_part, default_fields, all_keys):
return query_class(key.lower(), pattern, key in all_keys)
def query_from_strings(query_cls, query_parts, default_fields, all_keys):
"""Creates a collection query of type `query-cls` from a list of
strings in the format used by parse_query_part. If default_fields
are specified, they are the fields to be searched by unqualified
search terms. Otherwise, all fields are searched for those terms.
"""
subqueries = []
for part in query_parts:
subq = construct_query_part(part, default_fields, all_keys)
if subq:
subqueries.append(subq)
if not subqueries: # No terms in query.
subqueries = [dbcore.query.TrueQuery()]
return query_cls(subqueries)
def get_query(val, model_cls):
"""Takes a value which may be None, a query string, a query string
list, or a Query object, and returns a suitable Query object.
@ -1128,13 +871,19 @@ def get_query(val, model_cls):
# Convert a single string into a list of space-separated
# criteria.
if isinstance(val, basestring):
val = val.split()
# A bug in Python < 2.7.3 prevents correct shlex splitting of
# Unicode strings.
# http://bugs.python.org/issue6988
if isinstance(val, unicode):
val = val.encode('utf8')
val = [s.decode('utf8') for s in shlex.split(val)]
if val is None:
return TrueQuery()
return dbcore.query.TrueQuery()
elif isinstance(val, list) or isinstance(val, tuple):
return AndQuery.from_strings(val, model_cls._search_fields,
model_cls._fields.keys())
return query_from_strings(dbcore.AndQuery,
val, model_cls._search_fields,
model_cls._fields.keys())
elif isinstance(val, dbcore.Query):
return val
else:
@ -1372,7 +1121,7 @@ class DefaultTemplateFunctions(object):
for key in keys:
value = getattr(album, key)
subqueries.append(dbcore.MatchQuery(key, value))
albums = self.lib.albums(AndQuery(subqueries))
albums = self.lib.albums(dbcore.AndQuery(subqueries))
# If there's only one album to matching these details, then do
# nothing.

View file

@ -58,8 +58,7 @@ class BeetsPlugin(object):
return ()
def queries(self):
"""Should return a dict mapping prefixes to PluginQuery
subclasses.
"""Should return a dict mapping prefixes to Query subclasses.
"""
return {}
@ -232,8 +231,8 @@ def commands():
return out
def queries():
"""Returns a dict mapping prefix strings to beet.library.PluginQuery
subclasses all loaded plugins.
"""Returns a dict mapping prefix strings to Query subclasses all loaded
plugins.
"""
out = {}
for plugin in find_plugins():

View file

@ -31,6 +31,7 @@ import beets.ui
from beets import vfs
from beets.util import bluelet
from beets.library import ITEM_KEYS_WRITABLE
from beets import dbcore
PROTOCOL_VERSION = '0.13.0'
BUFSIZE = 1024
@ -1003,21 +1004,21 @@ class Server(BaseServer):
else:
_, key = self._tagtype_lookup(tag)
queries.append(query_type(key, value))
return beets.library.AndQuery(queries)
return dbcore.query.AndQuery(queries)
else: # No key-value pairs.
return beets.library.TrueQuery()
return dbcore.query.TrueQuery()
def cmd_search(self, conn, *kv):
"""Perform a substring match for items."""
query = self._metadata_query(beets.library.SubstringQuery,
beets.library.AnyFieldQuery,
query = self._metadata_query(dbcore.query.SubstringQuery,
dbcore.query.AnyFieldQuery,
kv)
for item in self.lib.items(query):
yield self._item_info(item)
def cmd_find(self, conn, *kv):
"""Perform an exact match for items."""
query = self._metadata_query(beets.dbcore.MatchQuery,
query = self._metadata_query(dbcore.query.MatchQuery,
None,
kv)
for item in self.lib.items(query):
@ -1028,7 +1029,7 @@ class Server(BaseServer):
filtered by matching match_tag to match_term.
"""
show_tag_canon, show_key = self._tagtype_lookup(show_tag)
query = self._metadata_query(beets.dbcore.MatchQuery, None, kv)
query = self._metadata_query(dbcore.query.MatchQuery, None, kv)
clause, subvals = query.clause()
statement = 'SELECT DISTINCT ' + show_key + \
@ -1047,7 +1048,7 @@ class Server(BaseServer):
_, key = self._tagtype_lookup(tag)
songs = 0
playtime = 0.0
for item in self.lib.items(beets.dbcore.MatchQuery(key, value)):
for item in self.lib.items(dbcore.query.MatchQuery(key, value)):
songs += 1
playtime += item.length
yield u'songs: ' + unicode(songs)

View file

@ -16,7 +16,7 @@
"""
from beets.plugins import BeetsPlugin
from beets.library import StringFieldQuery
from beets.dbcore.query import StringFieldQuery
import beets
import difflib

View file

@ -23,7 +23,7 @@ import os
from beets import ui
from beets import config
from beets import plugins
from beets import library
from beets import dbcore
from beets.util import displayable_path
log = logging.getLogger('beets')
@ -165,7 +165,7 @@ class MPDStats(object):
def get_item(self, path):
"""Return the beets item related to path.
"""
query = library.BytesQuery('path', path)
query = dbcore.query.BytesQuery('path', path)
item = self.lib.items(query).get()
if item:
return item

View file

@ -35,7 +35,7 @@ def update_playlists(lib):
relative_to = normpath(relative_to)
for playlist in playlists:
items = lib.items(library.AndQuery.from_string(playlist['query']))
items = lib.items(library.get_query(playlist['query'], library.Item))
m3us = {}
basename = playlist['name'].encode('utf8')
# As we allow tags in the m3u names, we'll need to iterate through

View file

@ -362,9 +362,9 @@ You can add new kinds of queries to beets' :doc:`query syntax
supports regular expression queries, which are indicated by a colon
prefix---plugins can do the same.
To do so, define a subclass of the ``Query`` type from the ``beets.library``
module. Then, in the ``queries`` method of your plugin class, return a
dictionary mapping prefix strings to query classes.
To do so, define a subclass of the ``Query`` type from the
``beets.dbcore.query`` module. Then, in the ``queries`` method of your plugin
class, return a dictionary mapping prefix strings to query classes.
One simple kind of query you can extend is the ``FieldQuery``, which
implements string comparisons on fields. To use it, create a subclass
@ -375,7 +375,7 @@ plugin will be used if we issue a command like ``beet ls @something`` or
``beet ls artist:@something``::
from beets.plugins import BeetsPlugin
from beets.library import FieldQuery
from beets.dbcore import FieldQuery
class ExactMatchQuery(FieldQuery):
@classmethod

View file

@ -17,6 +17,7 @@
import _common
from _common import unittest
import beets.library
from beets import dbcore
pqp = beets.library.parse_query_part
@ -24,64 +25,64 @@ pqp = beets.library.parse_query_part
class QueryParseTest(_common.TestCase):
def test_one_basic_term(self):
q = 'test'
r = (None, 'test', beets.library.SubstringQuery)
r = (None, 'test', dbcore.query.SubstringQuery)
self.assertEqual(pqp(q), r)
def test_one_keyed_term(self):
q = 'test:val'
r = ('test', 'val', beets.library.SubstringQuery)
r = ('test', 'val', dbcore.query.SubstringQuery)
self.assertEqual(pqp(q), r)
def test_colon_at_end(self):
q = 'test:'
r = (None, 'test:', beets.library.SubstringQuery)
r = (None, 'test:', dbcore.query.SubstringQuery)
self.assertEqual(pqp(q), r)
def test_one_basic_regexp(self):
q = r':regexp'
r = (None, 'regexp', beets.library.RegexpQuery)
r = (None, 'regexp', dbcore.query.RegexpQuery)
self.assertEqual(pqp(q), r)
def test_keyed_regexp(self):
q = r'test::regexp'
r = ('test', 'regexp', beets.library.RegexpQuery)
r = ('test', 'regexp', dbcore.query.RegexpQuery)
self.assertEqual(pqp(q), r)
def test_escaped_colon(self):
q = r'test\:val'
r = (None, 'test:val', beets.library.SubstringQuery)
r = (None, 'test:val', dbcore.query.SubstringQuery)
self.assertEqual(pqp(q), r)
def test_escaped_colon_in_regexp(self):
q = r':test\:regexp'
r = (None, 'test:regexp', beets.library.RegexpQuery)
r = (None, 'test:regexp', dbcore.query.RegexpQuery)
self.assertEqual(pqp(q), r)
def test_single_year(self):
q = 'year:1999'
r = ('year', '1999', beets.library.NumericQuery)
r = ('year', '1999', dbcore.query.NumericQuery)
self.assertEqual(pqp(q), r)
def test_multiple_years(self):
q = 'year:1999..2010'
r = ('year', '1999..2010', beets.library.NumericQuery)
r = ('year', '1999..2010', dbcore.query.NumericQuery)
self.assertEqual(pqp(q), r)
class AnyFieldQueryTest(_common.LibTestCase):
def test_no_restriction(self):
q = beets.library.AnyFieldQuery('title', beets.library.ITEM_KEYS,
beets.library.SubstringQuery)
q = dbcore.query.AnyFieldQuery('title', beets.library.ITEM_KEYS,
dbcore.query.SubstringQuery)
self.assertEqual(self.lib.items(q).get().title, 'the title')
def test_restriction_completeness(self):
q = beets.library.AnyFieldQuery('title', ['title'],
beets.library.SubstringQuery)
q = dbcore.query.AnyFieldQuery('title', ['title'],
dbcore.query.SubstringQuery)
self.assertEqual(self.lib.items(q).get().title, 'the title')
def test_restriction_soundness(self):
q = beets.library.AnyFieldQuery('title', ['artist'],
beets.library.SubstringQuery)
q = dbcore.query.AnyFieldQuery('title', ['artist'],
dbcore.query.SubstringQuery)
self.assertEqual(self.lib.items(q).get(), None)
@ -300,12 +301,12 @@ class GetTest(DummyDataTestCase):
self.assert_matched(results, [u'caf\xe9'])
def test_numeric_search_positive(self):
q = beets.library.NumericQuery('year', '2001')
q = dbcore.query.NumericQuery('year', '2001')
results = self.lib.items(q)
self.assertTrue(results)
def test_numeric_search_negative(self):
q = beets.library.NumericQuery('year', '1999')
q = dbcore.query.NumericQuery('year', '1999')
results = self.lib.items(q)
self.assertFalse(results)
@ -316,43 +317,43 @@ class MatchTest(_common.TestCase):
self.item = _common.item()
def test_regex_match_positive(self):
q = beets.library.RegexpQuery('album', '^the album$')
q = dbcore.query.RegexpQuery('album', '^the album$')
self.assertTrue(q.match(self.item))
def test_regex_match_negative(self):
q = beets.library.RegexpQuery('album', '^album$')
q = dbcore.query.RegexpQuery('album', '^album$')
self.assertFalse(q.match(self.item))
def test_regex_match_non_string_value(self):
q = beets.library.RegexpQuery('disc', '^6$')
q = dbcore.query.RegexpQuery('disc', '^6$')
self.assertTrue(q.match(self.item))
def test_substring_match_positive(self):
q = beets.library.SubstringQuery('album', 'album')
q = dbcore.query.SubstringQuery('album', 'album')
self.assertTrue(q.match(self.item))
def test_substring_match_negative(self):
q = beets.library.SubstringQuery('album', 'ablum')
q = dbcore.query.SubstringQuery('album', 'ablum')
self.assertFalse(q.match(self.item))
def test_substring_match_non_string_value(self):
q = beets.library.SubstringQuery('disc', '6')
q = dbcore.query.SubstringQuery('disc', '6')
self.assertTrue(q.match(self.item))
def test_year_match_positive(self):
q = beets.library.NumericQuery('year', '1')
q = dbcore.query.NumericQuery('year', '1')
self.assertTrue(q.match(self.item))
def test_year_match_negative(self):
q = beets.library.NumericQuery('year', '10')
q = dbcore.query.NumericQuery('year', '10')
self.assertFalse(q.match(self.item))
def test_bitrate_range_positive(self):
q = beets.library.NumericQuery('bitrate', '100000..200000')
q = dbcore.query.NumericQuery('bitrate', '100000..200000')
self.assertTrue(q.match(self.item))
def test_bitrate_range_negative(self):
q = beets.library.NumericQuery('bitrate', '200000..300000')
q = dbcore.query.NumericQuery('bitrate', '200000..300000')
self.assertFalse(q.match(self.item))
@ -432,16 +433,6 @@ class DefaultSearchFieldsTest(DummyDataTestCase):
self.assert_matched(items, [])
class StringParseTest(_common.TestCase):
def test_single_field_query(self):
q = beets.library.AndQuery.from_string(u'albumtype:soundtrack')
self.assertEqual(len(q.subqueries), 1)
subq = q.subqueries[0]
self.assertTrue(isinstance(subq, beets.library.SubstringQuery))
self.assertEqual(subq.field, 'albumtype')
self.assertEqual(subq.pattern, 'soundtrack')
def suite():
return unittest.TestLoader().loadTestsFromName(__name__)