From f2c8e9ff07846e4578bf9d9f185b136334843645 Mon Sep 17 00:00:00 2001 From: Diego Moreda Date: Mon, 16 Nov 2015 21:15:48 +0100 Subject: [PATCH] Add "not" query operator, initial draft * Add support for user friendly "not" operator in queries without requiring to use regular expressions, via adding NotQuery to beets.dbcore.query. * Update the prefix list at library.parse_query_parts() and the query parsing mechanisms in order to create a NotQuery when the prefix is found. * Add two TestCases, NotQueryMatchTest as the negated version of MatchTest, and the more generic NotQueryTest for testing the integration of the NotQuery with the rest of the existing Query subclasses. --- beets/dbcore/query.py | 18 ++++ beets/dbcore/queryparse.py | 22 +++- beets/library.py | 4 +- test/test_query.py | 200 +++++++++++++++++++++++++++++++++++++ 4 files changed, 240 insertions(+), 4 deletions(-) diff --git a/beets/dbcore/query.py b/beets/dbcore/query.py index c04e734b8..4681f2044 100644 --- a/beets/dbcore/query.py +++ b/beets/dbcore/query.py @@ -447,6 +447,24 @@ class OrQuery(MutableCollectionQuery): return any([q.match(item) for q in self.subqueries]) +class NotQuery(MutableCollectionQuery): + """A query that matches the negation of its `subquery`, as a shorcut for + performing `not(subquery)` without using regular expressions. + + TODO: revise class hierarchy, probably limiting to one subquery + """ + def clause(self): + clause, subvals = self.clause_with_joiner('TODO') + if clause: + return 'not ({0})'.format(clause), subvals + else: + # special case for RegexpQuery, (None, ()) + return clause, subvals + + def match(self, item): + return not all([q.match(item) for q in self.subqueries]) + + class TrueQuery(Query): """A query that always matches.""" def clause(self): diff --git a/beets/dbcore/queryparse.py b/beets/dbcore/queryparse.py index 89b7bd64a..88bcbd601 100644 --- a/beets/dbcore/queryparse.py +++ b/beets/dbcore/queryparse.py @@ -96,17 +96,33 @@ def construct_query_part(model_cls, prefixes, query_part): key, pattern, query_class = \ parse_query_part(query_part, query_classes, prefixes) + # Handle negation: set negation flag, and recover the original query_class. + negate = query_class is query.NotQuery + if negate: + query_class = query_classes.get(key, query.SubstringQuery) + # No key specified. if key is None: if issubclass(query_class, query.FieldQuery): # The query type matches a specific field, but none was # specified. So we use a version of the query that matches # any field. - return query.AnyFieldQuery(pattern, model_cls._search_fields, - query_class) + q = query.AnyFieldQuery(pattern, model_cls._search_fields, + query_class) + if negate: + return query.NotQuery([q]) + else: + return q else: # Other query type. - return query_class(pattern) + if negate: + return query.NotQuery([query_class(pattern)]) + else: + return query_class(pattern) + else: + if negate: + return query.NotQuery([query_class(key.lower(), pattern, + key in model_cls._fields)]) key = key.lower() return query_class(key.lower(), pattern, key in model_cls._fields) diff --git a/beets/library.py b/beets/library.py index cad39c232..37ab005b8 100644 --- a/beets/library.py +++ b/beets/library.py @@ -1105,7 +1105,9 @@ def parse_query_parts(parts, model_cls): special path query detection. """ # Get query types and their prefix characters. - prefixes = {':': dbcore.query.RegexpQuery} + prefixes = {':': dbcore.query.RegexpQuery, + # TODO: decide on a better symbol, or other syntax + u'\u00ac': dbcore.query.NotQuery} prefixes.update(plugins.queries()) # Special-case path-like queries, which are non-field queries diff --git a/test/test_query.py b/test/test_query.py index 0515c2910..68558c054 100644 --- a/test/test_query.py +++ b/test/test_query.py @@ -714,6 +714,206 @@ class NoneQueryTest(unittest.TestCase, TestHelper): self.assertInResult(item, matched) +class NotQueryMatchTest(_common.TestCase): + """Test `query.NotQuery` matching against a single item, using the same + cases and assertions as on `MatchTest`, plus assertion on the negated + queries (ie. assertTrue(q) -> assertFalse(NotQuery(q))). + """ + def setUp(self): + super(NotQueryMatchTest, self).setUp() + self.item = _common.item() + + def test_regex_match_positive(self): + q = dbcore.query.RegexpQuery('album', '^the album$') + self.assertTrue(q.match(self.item)) + self.assertFalse(dbcore.query.NotQuery((q,)).match(self.item)) + + def test_regex_match_negative(self): + q = dbcore.query.RegexpQuery('album', '^album$') + self.assertFalse(q.match(self.item)) + self.assertTrue(dbcore.query.NotQuery((q,)).match(self.item)) + + def test_regex_match_non_string_value(self): + q = dbcore.query.RegexpQuery('disc', '^6$') + self.assertTrue(q.match(self.item)) + self.assertFalse(dbcore.query.NotQuery((q,)).match(self.item)) + + def test_substring_match_positive(self): + q = dbcore.query.SubstringQuery('album', 'album') + self.assertTrue(q.match(self.item)) + self.assertFalse(dbcore.query.NotQuery((q,)).match(self.item)) + + def test_substring_match_negative(self): + q = dbcore.query.SubstringQuery('album', 'ablum') + self.assertFalse(q.match(self.item)) + self.assertTrue(dbcore.query.NotQuery((q,)).match(self.item)) + + def test_substring_match_non_string_value(self): + q = dbcore.query.SubstringQuery('disc', '6') + self.assertTrue(q.match(self.item)) + self.assertFalse(dbcore.query.NotQuery((q,)).match(self.item)) + + def test_year_match_positive(self): + q = dbcore.query.NumericQuery('year', '1') + self.assertTrue(q.match(self.item)) + self.assertFalse(dbcore.query.NotQuery((q,)).match(self.item)) + + def test_year_match_negative(self): + q = dbcore.query.NumericQuery('year', '10') + self.assertFalse(q.match(self.item)) + self.assertTrue(dbcore.query.NotQuery((q,)).match(self.item)) + + def test_bitrate_range_positive(self): + q = dbcore.query.NumericQuery('bitrate', '100000..200000') + self.assertTrue(q.match(self.item)) + self.assertFalse(dbcore.query.NotQuery((q,)).match(self.item)) + + def test_bitrate_range_negative(self): + q = dbcore.query.NumericQuery('bitrate', '200000..300000') + self.assertFalse(q.match(self.item)) + self.assertTrue(dbcore.query.NotQuery((q,)).match(self.item)) + + def test_open_range(self): + q = dbcore.query.NumericQuery('bitrate', '100000..') + dbcore.query.NotQuery((q,)) + + +class NotQueryTest(DummyDataTestCase): + """Test `query.NotQuery` against the dummy data: + - `test_type_xxx`: tests for the negation of a particular XxxQuery class. + + TODO: add test_type_bytes, for ByteQuery? + """ + def assertNegationProperties(self, q): + """Given a Query `q`, assert that: + - q OR not(q) == all items + - q AND not(q) == 0 + - not(not(q)) == q + """ + not_q = dbcore.query.NotQuery([q]) + # assert using OrQuery, AndQuery + q_or = dbcore.query.OrQuery([q, not_q]) + q_and = dbcore.query.AndQuery([q, not_q]) + self.assert_items_matched_all(self.lib.items(q_or)) + self.assert_items_matched(self.lib.items(q_and), []) + + # assert manually checking the item titles + all_titles = set([i.title for i in self.lib.items()]) + q_results = set([i.title for i in self.lib.items(q)]) + not_q_results = set([i.title for i in self.lib.items(not_q)]) + self.assertEqual(q_results.union(not_q_results), all_titles) + self.assertEqual(q_results.intersection(not_q_results), set()) + + # round trip + not_not_q = dbcore.query.NotQuery([not_q]) + self.assertEqual([i.title for i in self.lib.items(q)], + [i.title for i in self.lib.items(not_not_q)]) + + def test_type_and(self): + # not(a and b) <-> not(a) or not(b) + q = dbcore.query.AndQuery([dbcore.query.BooleanQuery('comp', True), + dbcore.query.NumericQuery('year', '2002')]) + not_results = self.lib.items(dbcore.query.NotQuery([q])) + self.assert_items_matched(not_results, ['foo bar', 'beets 4 eva']) + self.assertNegationProperties(q) + + def test_type_anyfield(self): + q = dbcore.query.AnyFieldQuery('foo', ['title', 'artist', 'album'], + dbcore.query.SubstringQuery) + not_results = self.lib.items(dbcore.query.NotQuery([q])) + self.assert_items_matched(not_results, ['baz qux']) + self.assertNegationProperties(q) + + def test_type_boolean(self): + q = dbcore.query.BooleanQuery('comp', True) + not_results = self.lib.items(dbcore.query.NotQuery([q])) + self.assert_items_matched(not_results, ['beets 4 eva']) + self.assertNegationProperties(q) + + def test_type_date(self): + q = dbcore.query.DateQuery('mtime', '0.0') + not_results = self.lib.items(dbcore.query.NotQuery([q])) + self.assert_items_matched(not_results, []) + self.assertNegationProperties(q) + + def test_type_false(self): + q = dbcore.query.FalseQuery() + not_results = self.lib.items(dbcore.query.NotQuery([q])) + self.assert_items_matched_all(not_results) + self.assertNegationProperties(q) + + def test_type_match(self): + q = dbcore.query.MatchQuery('year', '2003') + not_results = self.lib.items(dbcore.query.NotQuery([q])) + self.assert_items_matched(not_results, ['foo bar', 'baz qux']) + self.assertNegationProperties(q) + + def test_type_none(self): + q = dbcore.query.NoneQuery('rg_track_gain') + not_results = self.lib.items(dbcore.query.NotQuery([q])) + self.assert_items_matched(not_results, []) + self.assertNegationProperties(q) + + def test_type_numeric(self): + q = dbcore.query.NumericQuery('year', '2001..2002') + not_results = self.lib.items(dbcore.query.NotQuery([q])) + self.assert_items_matched(not_results, ['beets 4 eva']) + self.assertNegationProperties(q) + + def test_type_or(self): + # not(a or b) <-> not(a) and not(b) + q = dbcore.query.OrQuery([dbcore.query.BooleanQuery('comp', True), + dbcore.query.NumericQuery('year', '2002')]) + not_results = self.lib.items(dbcore.query.NotQuery([q])) + self.assert_items_matched(not_results, ['beets 4 eva']) + self.assertNegationProperties(q) + + def test_type_regexp(self): + q = dbcore.query.RegexpQuery('artist', '^t') + not_results = self.lib.items(dbcore.query.NotQuery([q])) + self.assert_items_matched(not_results, ['foo bar']) + self.assertNegationProperties(q) + + def test_type_substring(self): + q = dbcore.query.SubstringQuery('album', 'ba') + not_results = self.lib.items(dbcore.query.NotQuery([q])) + self.assert_items_matched(not_results, ['beets 4 eva']) + self.assertNegationProperties(q) + + def test_type_true(self): + q = dbcore.query.TrueQuery() + not_results = self.lib.items(dbcore.query.NotQuery([q])) + self.assert_items_matched(not_results, []) + self.assertNegationProperties(q) + + def test_fast_vs_slow(self): + """Test that the results are the same regardless of the `fast` flag + for negated `FieldQuery`s. + + TODO: investigate NoneQuery(fast=False), as it is raising + AttributeError: type object 'NoneQuery' has no attribute 'field' + at NoneQuery.match() (due to being @classmethod, and no self?) + """ + classes = [(dbcore.query.DateQuery, ['mtime', '0.0']), + (dbcore.query.MatchQuery, ['artist', 'one']), + # (dbcore.query.NoneQuery, ['rg_track_gain']), + (dbcore.query.NumericQuery, ['year', '2002']), + (dbcore.query.StringFieldQuery, ['year', '2001']), + (dbcore.query.RegexpQuery, ['album', '^.a']), + (dbcore.query.SubstringQuery, ['title', 'x'])] + + for klass, args in classes: + q_fast = dbcore.query.NotQuery([klass(*(args + [True]))]) + q_slow = dbcore.query.NotQuery([klass(*(args + [False]))]) + + try: + self.assertEqual([i.title for i in self.lib.items(q_fast)], + [i.title for i in self.lib.items(q_slow)]) + except NotImplementedError: + # ignore classes that do not provide `fast` implementation + pass + + def suite(): return unittest.TestLoader().loadTestsFromName(__name__)