Move query parsing to new dbcore.queryparse

Fix #649.
This commit is contained in:
Adrian Sampson 2014-05-24 17:06:49 -07:00
parent 95a036d510
commit 70b5a44ef4
5 changed files with 208 additions and 163 deletions

View file

@ -18,5 +18,6 @@ Library.
from .db import Model, Database
from .query import Query, FieldQuery, MatchQuery, AndQuery, OrQuery
from .types import Type
from .queryparse import query_from_strings
# flake8: noqa

117
beets/dbcore/queryparse.py Normal file
View file

@ -0,0 +1,117 @@
# This file is part of beets.
# Copyright 2014, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Parsing of strings into DBCore queries.
"""
import re
from . import query
# Pattern that splits one query part into an optional field key and a
# search term:
#
#   (?:(\S+?)(?<!\\):)?   Optional, non-capturing "key:" segment. Group 1
#                         is the field key: the shortest run of
#                         non-whitespace that is followed by a colon not
#                         preceded by a backslash.
#   (.*)                  Group 2 is the term itself.
#
# Both segments are allowed to match nothing, so matching from the start
# of a string always succeeds.
PARSE_QUERY_PART_REGEX = re.compile(
    r'(?:(\S+?)(?<!\\):)?(.*)',
    re.I,  # Case-insensitive.
)
def parse_query_part(part, query_classes={}, prefixes={},
                     default_class=query.SubstringQuery):
    """Split a single query part into a `(key, term, cls)` tuple.

    `part` is a string of the form `key:term` or just `term`. In the
    returned tuple, `key` is the field name exactly as written, or None
    when the part carries no key (meaning any field may be matched);
    `term` is the search pattern; and `cls` is the query class that
    should be built from them.

    The query class is chosen in this order:

    1. If the term starts with one of the markers in `prefixes` (a
       dict mapping prefix marker to query class), that class is
       returned and the marker is stripped from the term.
    2. Otherwise, if the key has an entry in `query_classes` (a dict
       mapping field name to query class), that class is returned.
       The key is looked up as written first and then in lowercase, so
       that `Year:1999` picks the same class as `year:1999`.
    3. Otherwise, `default_class` is returned.

    For instance, with `:` registered as the prefix for `RegexpQuery`:

        'stapler'     -> (None, 'stapler', SubstringQuery)
        'color:red'   -> ('color', 'red', SubstringQuery)
        ':^Quiet'     -> (None, '^Quiet', RegexpQuery)
        'color::b..e' -> ('color', 'b..e', RegexpQuery)

    A colon preceded by a backslash is not treated as the key
    separator; the backslash is dropped from the returned term.
    """
    # The `{}` defaults are shared between calls, but they are only
    # ever read here, never mutated.
    part = part.strip()
    match = PARSE_QUERY_PART_REGEX.match(part)
    assert match  # Regex should always match.

    key = match.group(1)
    # Spell the backslash explicitly: a bare '\:' is an invalid escape
    # sequence that newer Python versions warn about.
    term = match.group(2).replace('\\:', ':')

    # A prefix marker on the term takes precedence over the field type.
    for pre, query_class in prefixes.items():
        if term.startswith(pre):
            return key, term[len(pre):], query_class

    # No matching prefix: use the type-based class for this field, or
    # the fallback/default query class.
    if key in query_classes:
        query_class = query_classes[key]
    elif key is not None and key.lower() in query_classes:
        # Field names are lowercased when the query is constructed, so
        # the type lookup should not depend on the case the user typed.
        query_class = query_classes[key.lower()]
    else:
        query_class = default_class
    return key, term, query_class
def construct_query_part(model_cls, prefixes, query_part):
    """Build a query object from one query string, `query_part`, for
    matching instances of `model_cls`.

    `prefixes` maps prefix markers to query classes and is passed
    through to `parse_query_part`. An empty `query_part` produces a
    `TrueQuery`.
    """
    # Nothing to parse.
    if not query_part:
        return query.TrueQuery()

    # Each model field type names the query class to use for it.
    field_queries = {}
    for name, field_type in model_cls._fields.items():
        field_queries[name] = field_type.query
    key, pattern, query_class = parse_query_part(
        query_part, field_queries, prefixes
    )

    # A key was given: query that one field, by its lowercase name.
    if key is not None:
        field = key.lower()
        in_model = field in model_cls._fields
        return query_class(field, pattern, in_model)

    # No key, and the query class is not field-based: it takes only
    # the pattern.
    if not issubclass(query_class, query.FieldQuery):
        return query_class(pattern)

    # No key, but the query class needs a field: match against any of
    # the model's search fields.
    return query.AnyFieldQuery(pattern, model_cls._search_fields,
                               query_class)
def query_from_strings(query_cls, model_cls, prefixes, query_parts):
    """Build a collection query of type `query_cls` out of
    `query_parts`, a list of strings in the format understood by
    `parse_query_part`. Each string becomes one subquery, constructed
    for `model_cls` with the given `prefixes`.
    """
    subqueries = [
        construct_query_part(model_cls, prefixes, part)
        for part in query_parts
    ]
    if not subqueries:
        # No terms at all: fall back to a single TrueQuery.
        subqueries = [query.TrueQuery()]
    return query_cls(subqueries)

View file

@ -15,7 +15,6 @@
"""The core data store and collection logic for beets.
"""
import os
import re
import sys
import logging
import shlex
@ -858,107 +857,7 @@ class Album(LibModel):
item.store()
# Query construction and parsing helpers.
# Pattern that splits one query part into an optional field key and a
# search term:
#
#   (?:(\S+?)(?<!\\):)?   Optional, non-capturing "key:" segment. Group 1
#                         is the field key: the shortest run of
#                         non-whitespace that is followed by a colon not
#                         preceded by a backslash.
#   (.*)                  Group 2 is the term itself.
#
# Both segments are allowed to match nothing, so matching from the start
# of a string always succeeds.
PARSE_QUERY_PART_REGEX = re.compile(
    r'(?:(\S+?)(?<!\\):)?(.*)',
    re.I,  # Case-insensitive.
)
def parse_query_part(part, query_classes={}, prefixes={},
                     default_class=dbcore.query.SubstringQuery):
    """Split a single query part into a `(key, term, cls)` tuple.

    `part` is a string of the form `key:term` or just `term`. In the
    returned tuple, `key` is the field name exactly as written, or None
    when the part carries no key (meaning any field may be matched);
    `term` is the search pattern; and `cls` is the query class that
    should be built from them.

    The query class is chosen in this order:

    1. If the term starts with one of the markers in `prefixes` (a
       dict mapping prefix marker to query class), that class is
       returned and the marker is stripped from the term.
    2. Otherwise, if the key has an entry in `query_classes` (a dict
       mapping field name to query class), that class is returned.
       The key is looked up as written first and then in lowercase, so
       that `Year:1999` picks the same class as `year:1999`.
    3. Otherwise, `default_class` is returned.

    For instance, with `:` registered as the prefix for `RegexpQuery`:

        'stapler'     -> (None, 'stapler', SubstringQuery)
        'color:red'   -> ('color', 'red', SubstringQuery)
        ':^Quiet'     -> (None, '^Quiet', RegexpQuery)
        'color::b..e' -> ('color', 'b..e', RegexpQuery)

    A colon preceded by a backslash is not treated as the key
    separator; the backslash is dropped from the returned term.
    """
    # The `{}` defaults are shared between calls, but they are only
    # ever read here, never mutated.
    part = part.strip()
    match = PARSE_QUERY_PART_REGEX.match(part)
    assert match  # Regex should always match.

    key = match.group(1)
    # Spell the backslash explicitly: a bare '\:' is an invalid escape
    # sequence that newer Python versions warn about.
    term = match.group(2).replace('\\:', ':')

    # A prefix marker on the term takes precedence over the field type.
    for pre, query_class in prefixes.items():
        if term.startswith(pre):
            return key, term[len(pre):], query_class

    # No matching prefix: use the type-based class for this field, or
    # the fallback/default query class.
    if key in query_classes:
        query_class = query_classes[key]
    elif key is not None and key.lower() in query_classes:
        # Field names are lowercased when the query is constructed, so
        # the type lookup should not depend on the case the user typed.
        query_class = query_classes[key.lower()]
    else:
        query_class = default_class
    return key, term, query_class
def construct_query_part(model_cls, prefixes, query_part):
    """Build a query object from one query string, `query_part`, for
    matching instances of `model_cls`.

    `prefixes` maps prefix markers to query classes and is passed
    through to `parse_query_part`. An empty `query_part` produces a
    `TrueQuery`.
    """
    # Nothing to parse.
    if not query_part:
        return dbcore.query.TrueQuery()

    # Each model field type names the query class to use for it.
    field_queries = {}
    for name, field_type in model_cls._fields.items():
        field_queries[name] = field_type.query
    key, pattern, query_class = parse_query_part(
        query_part, field_queries, prefixes
    )

    # A key was given: query that one field, by its lowercase name.
    if key is not None:
        field = key.lower()
        in_model = field in model_cls._fields
        return query_class(field, pattern, in_model)

    # No key, and the query class is not field-based: it takes only
    # the pattern.
    if not issubclass(query_class, dbcore.FieldQuery):
        return query_class(pattern)

    # No key, but the query class needs a field: match against any of
    # the model's search fields.
    return dbcore.query.AnyFieldQuery(pattern,
                                      model_cls._search_fields,
                                      query_class)
def query_from_strings(query_cls, model_cls, prefixes, query_parts):
    """Build a collection query of type `query_cls` out of
    `query_parts`, a list of strings in the format understood by
    `parse_query_part`. Each string becomes one subquery, constructed
    for `model_cls` with the given `prefixes`.
    """
    subqueries = [
        construct_query_part(model_cls, prefixes, part)
        for part in query_parts
    ]
    if not subqueries:
        # No terms at all: fall back to a single TrueQuery.
        subqueries = [dbcore.query.TrueQuery()]
    return query_cls(subqueries)
# Query construction helper.
def get_query(val, model_cls):
"""Take a value which may be None, a query string, a query string
@ -984,6 +883,7 @@ def get_query(val, model_cls):
if val is None:
return dbcore.query.TrueQuery()
elif isinstance(val, list) or isinstance(val, tuple):
# Special-case path-like queries, which are non-field queries
# containing path separators (/).
@ -1000,15 +900,19 @@ def get_query(val, model_cls):
path_parts = ()
non_path_parts = val
query = query_from_strings(dbcore.AndQuery, model_cls, prefixes,
non_path_parts)
# Parse remaining parts and construct an AndQuery.
query = dbcore.query_from_strings(
dbcore.AndQuery, model_cls, prefixes, non_path_parts
)
# Add path queries to aggregate query.
if path_parts:
query.subqueries += [PathQuery('path', s) for s in path_parts]
return query
elif isinstance(val, dbcore.Query):
return val
else:
raise ValueError('query must be None or have type Query or str')

View file

@ -302,6 +302,88 @@ class ParseTest(_common.TestCase):
self.assertEqual(value, u'2')
class QueryParseTest(_common.TestCase):
    """Checks the `(key, term, cls)` tuples that `parse_query_part`
    returns for a range of query part strings.
    """

    def pqp(self, part):
        """Parse `part` with `year` typed as a numeric field and `:`
        registered as the regular-expression prefix.
        """
        field_queries = {'year': dbcore.query.NumericQuery}
        prefix_queries = {':': dbcore.query.RegexpQuery}
        return dbcore.queryparse.parse_query_part(
            part, field_queries, prefix_queries
        )

    def test_one_basic_term(self):
        expected = (None, 'test', dbcore.query.SubstringQuery)
        self.assertEqual(self.pqp('test'), expected)

    def test_one_keyed_term(self):
        expected = ('test', 'val', dbcore.query.SubstringQuery)
        self.assertEqual(self.pqp('test:val'), expected)

    def test_colon_at_end(self):
        expected = ('test', '', dbcore.query.SubstringQuery)
        self.assertEqual(self.pqp('test:'), expected)

    def test_one_basic_regexp(self):
        expected = (None, 'regexp', dbcore.query.RegexpQuery)
        self.assertEqual(self.pqp(r':regexp'), expected)

    def test_keyed_regexp(self):
        expected = ('test', 'regexp', dbcore.query.RegexpQuery)
        self.assertEqual(self.pqp(r'test::regexp'), expected)

    def test_escaped_colon(self):
        expected = (None, 'test:val', dbcore.query.SubstringQuery)
        self.assertEqual(self.pqp(r'test\:val'), expected)

    def test_escaped_colon_in_regexp(self):
        expected = (None, 'test:regexp', dbcore.query.RegexpQuery)
        self.assertEqual(self.pqp(r':test\:regexp'), expected)

    def test_single_year(self):
        expected = ('year', '1999', dbcore.query.NumericQuery)
        self.assertEqual(self.pqp('year:1999'), expected)

    def test_multiple_years(self):
        expected = ('year', '1999..2010', dbcore.query.NumericQuery)
        self.assertEqual(self.pqp('year:1999..2010'), expected)

    def test_empty_query_part(self):
        expected = (None, '', dbcore.query.SubstringQuery)
        self.assertEqual(self.pqp(''), expected)
class QueryFromStringsTest(_common.TestCase):
    """Checks the collection queries that `query_from_strings` builds
    from lists of query part strings.
    """

    def qfs(self, strings):
        """Build an `AndQuery` over `TestModel1` from `strings`, with
        `:` registered as the regular-expression prefix.
        """
        prefix_queries = {':': dbcore.query.RegexpQuery}
        return dbcore.queryparse.query_from_strings(
            dbcore.query.AndQuery, TestModel1, prefix_queries, strings
        )

    def test_zero_parts(self):
        # An empty list of parts still yields one TrueQuery subquery.
        combined = self.qfs([])
        self.assertIsInstance(combined, dbcore.query.AndQuery)
        self.assertEqual(len(combined.subqueries), 1)
        only = combined.subqueries[0]
        self.assertIsInstance(only, dbcore.query.TrueQuery)

    def test_two_parts(self):
        # One unkeyed part and one keyed part.
        combined = self.qfs(['foo', 'bar:baz'])
        self.assertIsInstance(combined, dbcore.query.AndQuery)
        self.assertEqual(len(combined.subqueries), 2)
        unkeyed = combined.subqueries[0]
        keyed = combined.subqueries[1]
        self.assertIsInstance(unkeyed, dbcore.query.AnyFieldQuery)
        self.assertIsInstance(keyed, dbcore.query.SubstringQuery)
def suite():
    """Return a test suite loaded from this module, looked up by its
    own `__name__`.
    """
    loader = unittest.TestLoader()
    return loader.loadTestsFromName(__name__)

View file

@ -20,65 +20,6 @@ import beets.library
from beets import dbcore
class QueryParseTest(_common.TestCase):
    """Checks the `(key, term, cls)` tuples that `parse_query_part`
    returns for a range of query part strings.
    """

    def pqp(self, part):
        """Parse `part` with `year` typed as a numeric field and `:`
        registered as the regular-expression prefix.
        """
        field_queries = {'year': dbcore.query.NumericQuery}
        prefix_queries = {':': dbcore.query.RegexpQuery}
        return beets.library.parse_query_part(
            part, field_queries, prefix_queries
        )

    def test_one_basic_term(self):
        expected = (None, 'test', dbcore.query.SubstringQuery)
        self.assertEqual(self.pqp('test'), expected)

    def test_one_keyed_term(self):
        expected = ('test', 'val', dbcore.query.SubstringQuery)
        self.assertEqual(self.pqp('test:val'), expected)

    def test_colon_at_end(self):
        expected = ('test', '', dbcore.query.SubstringQuery)
        self.assertEqual(self.pqp('test:'), expected)

    def test_one_basic_regexp(self):
        expected = (None, 'regexp', dbcore.query.RegexpQuery)
        self.assertEqual(self.pqp(r':regexp'), expected)

    def test_keyed_regexp(self):
        expected = ('test', 'regexp', dbcore.query.RegexpQuery)
        self.assertEqual(self.pqp(r'test::regexp'), expected)

    def test_escaped_colon(self):
        expected = (None, 'test:val', dbcore.query.SubstringQuery)
        self.assertEqual(self.pqp(r'test\:val'), expected)

    def test_escaped_colon_in_regexp(self):
        expected = (None, 'test:regexp', dbcore.query.RegexpQuery)
        self.assertEqual(self.pqp(r':test\:regexp'), expected)

    def test_single_year(self):
        expected = ('year', '1999', dbcore.query.NumericQuery)
        self.assertEqual(self.pqp('year:1999'), expected)

    def test_multiple_years(self):
        expected = ('year', '1999..2010', dbcore.query.NumericQuery)
        self.assertEqual(self.pqp('year:1999..2010'), expected)

    def test_empty_query_part(self):
        expected = (None, '', dbcore.query.SubstringQuery)
        self.assertEqual(self.pqp(''), expected)
class AnyFieldQueryTest(_common.LibTestCase):
def test_no_restriction(self):
q = dbcore.query.AnyFieldQuery(