diff --git a/beetsplug/smartplaylist.py b/beetsplug/smartplaylist.py index 8203ce4ef..ed417f2b9 100644 --- a/beetsplug/smartplaylist.py +++ b/beetsplug/smartplaylist.py @@ -14,14 +14,16 @@ """Generates smart playlists based on beets queries.""" +from __future__ import annotations + import os +from typing import Any, TypeAlias from urllib.parse import quote from urllib.request import pathname2url from beets import ui -from beets.dbcore import OrQuery -from beets.dbcore.query import MultipleSort, ParsingError -from beets.library import Album, Item, parse_query_string +from beets.dbcore.query import ParsingError, Query, Sort +from beets.library import Album, Item, Library, parse_query_string from beets.plugins import BeetsPlugin from beets.plugins import send as send_event from beets.util import ( @@ -34,9 +36,17 @@ from beets.util import ( syspath, ) +QueryAndSort = tuple[Query, Sort] +PlaylistQuery = Query | tuple[QueryAndSort, ...] | None +PlaylistMatch: TypeAlias = tuple[ + str, + tuple[PlaylistQuery, Sort | None], + tuple[PlaylistQuery, Sort | None], +] + class SmartPlaylistPlugin(BeetsPlugin): - def __init__(self): + def __init__(self) -> None: super().__init__() self.config.add( { @@ -55,13 +65,13 @@ class SmartPlaylistPlugin(BeetsPlugin): ) self.config["prefix"].redact = True # May contain username/password. - self._matched_playlists = None - self._unmatched_playlists = None + self._matched_playlists: set[PlaylistMatch] = set() + self._unmatched_playlists: set[PlaylistMatch] = set() if self.config["auto"]: self.register_listener("database_change", self.db_change) - def commands(self): + def commands(self) -> list[ui.Subcommand]: spl_update = ui.Subcommand( "splupdate", help="update the smart playlists. Playlist names may be " @@ -124,18 +134,18 @@ class SmartPlaylistPlugin(BeetsPlugin): spl_update.func = self.update_cmd return [spl_update] - def update_cmd(self, lib, opts, args): + def update_cmd(self, lib: Library, opts: Any, args: list[str]) -> None: self.build_queries() if args: - args = set(args) - for a in list(args): + args_set = set(args) + for a in list(args_set): if not a.endswith(".m3u"): - args.add(f"{a}.m3u") + args_set.add(f"{a}.m3u") playlists = { (name, q, a_q) for name, q, a_q in self._unmatched_playlists - if name in args + if name in args_set } if not playlists: unmatched = [name for name, _, _ in self._unmatched_playlists] @@ -151,16 +161,32 @@ class SmartPlaylistPlugin(BeetsPlugin): self.__apply_opts_to_config(opts) self.update_playlists(lib, opts.pretend) - def __apply_opts_to_config(self, opts): + def __apply_opts_to_config(self, opts: Any) -> None: for k, v in opts.__dict__.items(): if v is not None and k in self.config: self.config[k] = v - def build_queries(self): + def _parse_one_query( + self, playlist: dict[str, Any], key: str, model_cls: type + ) -> tuple[PlaylistQuery, Sort | None]: + qs = playlist.get(key) + if qs is None: + return None, None + if isinstance(qs, str): + return parse_query_string(qs, model_cls) + if len(qs) == 1: + return parse_query_string(qs[0], model_cls) + + queries_and_sorts: tuple[QueryAndSort, ...] = tuple( + parse_query_string(q, model_cls) for q in qs + ) + return queries_and_sorts, None + + def build_queries(self) -> None: """ Instantiate queries for the playlists. - Each playlist has 2 queries: one or items one for albums, each with a + Each playlist has 2 queries: one for items, one for albums, each with a sort. We must also remember its name. _unmatched_playlists is a set of tuples (name, (q, q_sort), (album_q, album_q_sort)). @@ -169,7 +195,7 @@ class SmartPlaylistPlugin(BeetsPlugin): More precisely - it will be NullSort when a playlist query ('query' or 'album_query') is a single item or a list with 1 element - - it will be None when there are multiple items i a query + - it will be None when there are multiple items in a query """ self._unmatched_playlists = set() self._matched_playlists = set() @@ -179,55 +205,37 @@ class SmartPlaylistPlugin(BeetsPlugin): self._log.warning("playlist configuration is missing name") continue - playlist_data = (playlist["name"],) try: - for key, model_cls in (("query", Item), ("album_query", Album)): - qs = playlist.get(key) - if qs is None: - query_and_sort = None, None - elif isinstance(qs, str): - query_and_sort = parse_query_string(qs, model_cls) - elif len(qs) == 1: - query_and_sort = parse_query_string(qs[0], model_cls) - else: - # multiple queries and sorts - queries, sorts = zip( - *(parse_query_string(q, model_cls) for q in qs) - ) - query = OrQuery(queries) - final_sorts = [] - for s in sorts: - if s: - if isinstance(s, MultipleSort): - final_sorts += s.sorts - else: - final_sorts.append(s) - if not final_sorts: - sort = None - elif len(final_sorts) == 1: - (sort,) = final_sorts - else: - sort = MultipleSort(final_sorts) - query_and_sort = query, sort - - playlist_data += (query_and_sort,) - + q_match = self._parse_one_query(playlist, "query", Item) + a_match = self._parse_one_query(playlist, "album_query", Album) except ParsingError as exc: self._log.warning( "invalid query in playlist {}: {}", playlist["name"], exc ) continue - self._unmatched_playlists.add(playlist_data) + self._unmatched_playlists.add((playlist["name"], q_match, a_match)) - def matches(self, model, query, album_query): - if album_query and isinstance(model, Album): - return album_query.match(model) - if query and isinstance(model, Item): - return query.match(model) + def _matches_query(self, model: Item | Album, query: PlaylistQuery) -> bool: + if not query: + return False + if isinstance(query, (list, tuple)): + return any(q.match(model) for q, _ in query) + return query.match(model) + + def matches( + self, + model: Item | Album, + query: PlaylistQuery, + album_query: PlaylistQuery, + ) -> bool: + if isinstance(model, Album): + return self._matches_query(model, album_query) + if isinstance(model, Item): + return self._matches_query(model, query) return False - def db_change(self, lib, model): + def db_change(self, lib: Library, model: Item | Album) -> None: if self._unmatched_playlists is None: self.build_queries() @@ -240,7 +248,7 @@ class SmartPlaylistPlugin(BeetsPlugin): self._unmatched_playlists -= self._matched_playlists - def update_playlists(self, lib, pretend=False): + def update_playlists(self, lib: Library, pretend: bool = False) -> None: if pretend: self._log.info( "Showing query results for {} smart playlists...", @@ -260,7 +268,7 @@ class SmartPlaylistPlugin(BeetsPlugin): relative_to = normpath(relative_to) # Maps playlist filenames to lists of track filenames. - m3us = {} + m3us: dict[str, list[PlaylistItem]] = {} for playlist in self._matched_playlists: name, (query, q_sort), (album_query, a_q_sort) = playlist @@ -270,9 +278,28 @@ class SmartPlaylistPlugin(BeetsPlugin): self._log.info("Creating playlist {}", name) items = [] - if query: + # Handle tuple/list of queries (preserves order) + # Track seen items to avoid duplicates when an item matches + # multiple queries + seen_ids = set() + + if isinstance(query, (list, tuple)): + for q, sort in query: + for item in lib.items(q, sort): + if item.id not in seen_ids: + items.append(item) + seen_ids.add(item.id) + elif query: items.extend(lib.items(query, q_sort)) - if album_query: + + if isinstance(album_query, (list, tuple)): + for q, sort in album_query: + for album in lib.albums(q, sort): + for item in album.items(): + if item.id not in seen_ids: + items.append(item) + seen_ids.add(item.id) + elif album_query: for album in lib.albums(album_query, a_q_sort): items.extend(album.items()) @@ -292,7 +319,9 @@ class SmartPlaylistPlugin(BeetsPlugin): if self.config["forward_slash"].get(): item_uri = path_as_posix(item_uri) if self.config["urlencode"]: - item_uri = bytestring_path(pathname2url(item_uri)) + item_uri = bytestring_path( + pathname2url(os.fsdecode(item_uri)) + ) item_uri = prefix + item_uri if item_uri not in m3us[m3u_name]: @@ -336,7 +365,7 @@ class SmartPlaylistPlugin(BeetsPlugin): ) f.write(comment.encode("utf-8") + entry.uri + b"\n") # Send an event when playlists were updated. - send_event("smartplaylist_update") + send_event("smartplaylist_update") # type: ignore if pretend: self._log.info( @@ -348,6 +377,6 @@ class SmartPlaylistPlugin(BeetsPlugin): class PlaylistItem: - def __init__(self, item, uri): + def __init__(self, item: Item, uri: bytes) -> None: self.item = item self.uri = uri diff --git a/docs/changelog.rst b/docs/changelog.rst index b9a5c1f3f..475e56634 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -34,6 +34,10 @@ New features: Bug fixes: +- :doc:`/plugins/smartplaylist`: Fixed an issue where multiple queries in a + playlist configuration were not preserving their order, causing items to + appear in database order rather than the order specified in the config. + :bug:`6183` - :doc:`plugins/inline`: Fix recursion error when an inline field definition shadows a built-in item field (e.g., redefining ``track_no``). Inline expressions now skip self-references during evaluation to avoid infinite diff --git a/test/plugins/test_smartplaylist.py b/test/plugins/test_smartplaylist.py index d3569d836..8ec2c74ce 100644 --- a/test/plugins/test_smartplaylist.py +++ b/test/plugins/test_smartplaylist.py @@ -22,7 +22,6 @@ from unittest.mock import MagicMock, Mock, PropertyMock import pytest from beets import config -from beets.dbcore import OrQuery from beets.dbcore.query import FixedFieldSort, MultipleSort, NullSort from beets.library import Album, Item, parse_query_string from beets.test.helper import BeetsTestCase, PluginTestCase @@ -34,8 +33,8 @@ from beetsplug.smartplaylist import SmartPlaylistPlugin class SmartPlaylistTest(BeetsTestCase): def test_build_queries(self): spl = SmartPlaylistPlugin() - assert spl._matched_playlists is None - assert spl._unmatched_playlists is None + assert spl._matched_playlists == set() + assert spl._unmatched_playlists == set() config["smartplaylist"]["playlists"].set([]) spl.build_queries() @@ -54,16 +53,17 @@ class SmartPlaylistTest(BeetsTestCase): foo_foo = parse_query_string("FOO foo", Item) baz_baz = parse_query_string("BAZ baz", Item) baz_baz2 = parse_query_string("BAZ baz", Album) - bar_bar = OrQuery( - ( - parse_query_string("BAR bar1", Album)[0], - parse_query_string("BAR bar2", Album)[0], - ) + # Multiple queries are now stored as a tuple of (query, sort) tuples + bar_queries = tuple( + [ + parse_query_string("BAR bar1", Album), + parse_query_string("BAR bar2", Album), + ] ) assert spl._unmatched_playlists == { ("foo", foo_foo, (None, None)), ("baz", baz_baz, baz_baz2), - ("bar", (None, None), (bar_bar, None)), + ("bar", (None, None), (bar_queries, None)), } def test_build_queries_with_sorts(self): @@ -86,19 +86,28 @@ class SmartPlaylistTest(BeetsTestCase): ) spl.build_queries() - sorts = {name: sort for name, (_, sort), _ in spl._unmatched_playlists} + + # Multiple queries now return a tuple of (query, sort) tuples, not combined + sorts = {} + for name, (query_data, sort), _ in spl._unmatched_playlists: + if isinstance(query_data, tuple): + # Tuple of queries - each has its own sort + sorts[name] = [s for _, s in query_data] + else: + sorts[name] = sort sort = FixedFieldSort # short cut since we're only dealing with this assert sorts["no_sort"] == NullSort() assert sorts["one_sort"] == sort("year") - assert sorts["only_empty_sorts"] is None - assert sorts["one_non_empty_sort"] == sort("year") - assert sorts["multiple_sorts"] == MultipleSort( - [sort("year"), sort("genre", False)] - ) - assert sorts["mixed"] == MultipleSort( - [sort("year"), sort("genre"), sort("id", False)] - ) + # Multiple queries store individual sorts in the tuple + assert all(isinstance(x, NullSort) for x in sorts["only_empty_sorts"]) + assert sorts["one_non_empty_sort"] == [sort("year"), NullSort()] + assert sorts["multiple_sorts"] == [sort("year"), sort("genre", False)] + assert sorts["mixed"] == [ + sort("year"), + NullSort(), + MultipleSort([sort("genre"), sort("id", False)]), + ] def test_matches(self): spl = SmartPlaylistPlugin() @@ -122,6 +131,15 @@ class SmartPlaylistTest(BeetsTestCase): assert spl.matches(i, query, a_query) assert spl.matches(a, query, a_query) + # Test with list of queries + q1 = Mock() + q1.match.return_value = False + q2 = Mock() + q2.match.side_effect = {i: True}.__getitem__ + queries_list = [(q1, None), (q2, None)] + assert spl.matches(i, queries_list, None) + assert not spl.matches(a, queries_list, None) + def test_db_changes(self): spl = SmartPlaylistPlugin() @@ -164,7 +182,7 @@ class SmartPlaylistTest(BeetsTestCase): q = Mock() a_q = Mock() pl = b"$title-my.m3u", (q, None), (a_q, None) - spl._matched_playlists = [pl] + spl._matched_playlists = {pl} dir = mkdtemp() config["smartplaylist"]["relative_to"] = False @@ -206,7 +224,7 @@ class SmartPlaylistTest(BeetsTestCase): q = Mock() a_q = Mock() pl = b"$title-my.m3u", (q, None), (a_q, None) - spl._matched_playlists = [pl] + spl._matched_playlists = {pl} dir = mkdtemp() config["smartplaylist"]["output"] = "extm3u" @@ -256,7 +274,7 @@ class SmartPlaylistTest(BeetsTestCase): q = Mock() a_q = Mock() pl = b"$title-my.m3u", (q, None), (a_q, None) - spl._matched_playlists = [pl] + spl._matched_playlists = {pl} dir = mkdtemp() config["smartplaylist"]["output"] = "extm3u" @@ -301,7 +319,7 @@ class SmartPlaylistTest(BeetsTestCase): q = Mock() a_q = Mock() pl = b"$title-my.m3u", (q, None), (a_q, None) - spl._matched_playlists = [pl] + spl._matched_playlists = {pl} dir = mkdtemp() tpl = "http://beets:8337/item/$id/file" @@ -327,6 +345,118 @@ class SmartPlaylistTest(BeetsTestCase): assert content == b"http://beets:8337/item/3/file\n" + def test_playlist_update_multiple_queries_preserve_order(self): + """Test that multiple queries preserve their order in the playlist.""" + spl = SmartPlaylistPlugin() + + # Create three mock items + i1 = Mock(path=b"/item1.mp3", id=1) + i1.evaluate_template.return_value = "ordered.m3u" + i2 = Mock(path=b"/item2.mp3", id=2) + i2.evaluate_template.return_value = "ordered.m3u" + i3 = Mock(path=b"/item3.mp3", id=3) + i3.evaluate_template.return_value = "ordered.m3u" + + lib = Mock() + lib.replacements = CHAR_REPLACE + lib.albums.return_value = [] + + # Set up lib.items to return different items for different queries + q1 = Mock() + q2 = Mock() + q3 = Mock() + + def items_side_effect(query, sort): + if query == q1: + return [i1] + elif query == q2: + return [i2] + elif query == q3: + return [i3] + return [] + + lib.items.side_effect = items_side_effect + + # Create playlist with multiple queries (stored as tuple) + queries_and_sorts = ((q1, None), (q2, None), (q3, None)) + pl = "ordered.m3u", (queries_and_sorts, None), (None, None) + spl._matched_playlists = {pl} + + dir = mkdtemp() + config["smartplaylist"]["relative_to"] = False + config["smartplaylist"]["playlist_dir"] = str(dir) + try: + spl.update_playlists(lib) + except Exception: + rmtree(syspath(dir)) + raise + + # Verify that lib.items was called with queries in the correct order + assert lib.items.call_count == 3 + lib.items.assert_any_call(q1, None) + lib.items.assert_any_call(q2, None) + lib.items.assert_any_call(q3, None) + + m3u_filepath = Path(dir, "ordered.m3u") + assert m3u_filepath.exists() + content = m3u_filepath.read_bytes() + rmtree(syspath(dir)) + + # Items should be in order: i1, i2, i3 + assert content == b"/item1.mp3\n/item2.mp3\n/item3.mp3\n" + + def test_playlist_update_multiple_queries_no_duplicates(self): + """Test that items matching multiple queries only appear once.""" + spl = SmartPlaylistPlugin() + + # Create two mock items + i1 = Mock(path=b"/item1.mp3", id=1) + i1.evaluate_template.return_value = "dedup.m3u" + i2 = Mock(path=b"/item2.mp3", id=2) + i2.evaluate_template.return_value = "dedup.m3u" + + lib = Mock() + lib.replacements = CHAR_REPLACE + lib.albums.return_value = [] + + # Set up lib.items so both queries return overlapping items + q1 = Mock() + q2 = Mock() + + def items_side_effect(query, sort): + if query == q1: + return [i1, i2] # Both items match q1 + elif query == q2: + return [i2] # Only i2 matches q2 + return [] + + lib.items.side_effect = items_side_effect + + # Create playlist with multiple queries (stored as tuple) + queries_and_sorts = ((q1, None), (q2, None)) + pl = "dedup.m3u", (queries_and_sorts, None), (None, None) + spl._matched_playlists = {pl} + + dir = mkdtemp() + config["smartplaylist"]["relative_to"] = False + config["smartplaylist"]["playlist_dir"] = str(dir) + try: + spl.update_playlists(lib) + except Exception: + rmtree(syspath(dir)) + raise + + m3u_filepath = Path(dir, "dedup.m3u") + assert m3u_filepath.exists() + content = m3u_filepath.read_bytes() + rmtree(syspath(dir)) + + # i2 should only appear once even though it matches both queries + # Order should be: i1 (from q1), i2 (from q1, skipped in q2) + assert content == b"/item1.mp3\n/item2.mp3\n" + # Verify i2 is not duplicated + assert content.count(b"/item2.mp3") == 1 + class SmartPlaylistCLITest(PluginTestCase): plugin = "smartplaylist"