diff --git a/beetsplug/duplicates.py b/beetsplug/duplicates.py index 904e19262..2d2e0d0c0 100644 --- a/beetsplug/duplicates.py +++ b/beetsplug/duplicates.py @@ -14,8 +14,11 @@ """List duplicate tracks or albums.""" +from __future__ import annotations + import os import shlex +from typing import TYPE_CHECKING, Any, TypeAlias, cast from beets.library import Album, Item from beets.plugins import BeetsPlugin @@ -28,8 +31,23 @@ from beets.util import ( subprocess, ) +if TYPE_CHECKING: + import optparse + from collections import defaultdict + from collections.abc import Iterator + + from beets.dbcore.db import Results + from beets.library.library import Library + PLUGIN = "duplicates" +# Value of `tiebreak` config item. +# Really the key is "item" or "album". +Tiebreak: TypeAlias = dict[str, list[str]] +# Attribute values; these are formed by taking the values of `keys` (a list of +# strings) on the items. These are used as dict keys in a bunch of places. +KeyValues: TypeAlias = tuple[Any] + class DuplicatesPlugin(BeetsPlugin): """List duplicate tracks or albums""" @@ -57,7 +75,9 @@ class DuplicatesPlugin(BeetsPlugin): } ) - self._command = Subcommand("duplicates", help=__doc__, aliases=["dup"]) + self._command = Subcommand( + "duplicates", help=cast(str, __doc__), aliases=["dup"] + ) self._command.parser.add_option( "-c", "--count", @@ -142,23 +162,23 @@ class DuplicatesPlugin(BeetsPlugin): self._command.parser.add_all_common_options() def commands(self): - def _dup(lib, opts, args): + def _dup(lib: Library, opts: optparse.Values, args: list[str]): self.config.set_args(opts) - album = self.config["album"].get(bool) - checksum = self.config["checksum"].get(str) - copy = bytestring_path(self.config["copy"].as_str()) - count = self.config["count"].get(bool) - delete = self.config["delete"].get(bool) - remove = self.config["remove"].get(bool) - fmt_tmpl = self.config["format"].get(str) - full = self.config["full"].get(bool) - keys = self.config["keys"].as_str_seq() - merge = self.config["merge"].get(bool) - move = bytestring_path(self.config["move"].as_str()) - path = self.config["path"].get(bool) - tiebreak = self.config["tiebreak"].get(dict) - strict = self.config["strict"].get(bool) - tag = self.config["tag"].get(str) + album: bool = self.config["album"].get(bool) # type: ignore + checksum: str = self.config["checksum"].get(str) # type: ignore + copy: bytes = bytestring_path(self.config["copy"].as_str()) # type: ignore + count: bool = self.config["count"].get(bool) # type: ignore + delete: bool = self.config["delete"].get(bool) # type: ignore + remove: bool = self.config["remove"].get(bool) # type: ignore + fmt_tmpl: str = self.config["format"].get(str) # type: ignore + full: bool = self.config["full"].get(bool) # type: ignore + keys: list[str] = self.config["keys"].as_str_seq() # type: ignore + merge: bool = self.config["merge"].get(bool) # type: ignore + move: bytes = bytestring_path(self.config["move"].as_str()) # type: ignore + path: bool = self.config["path"].get(bool) # type: ignore + tiebreak: Tiebreak = self.config["tiebreak"].get(dict) # type: ignore + strict: bool = self.config["strict"].get(bool) # type: ignore + tag: str = self.config["tag"].get(str) # type: ignore if album: if not keys: @@ -185,9 +205,11 @@ class DuplicatesPlugin(BeetsPlugin): fmt_tmpl = "$albumartist - $album - $title" if checksum: + k = None for i in items: k, _ = self._checksum(i, checksum) - keys = [k] + if k is not None: + keys = [k] for obj_id, obj_count, objs in self._duplicates( items, @@ -214,13 +236,13 @@ class DuplicatesPlugin(BeetsPlugin): def _process_item( self, - item, - copy=False, - move=False, - delete=False, - tag=False, - fmt="", - remove=False, + item: Item | Album, + copy: bytes | None = None, + move: bytes | None = None, + delete: bool = False, + tag: str | None = None, + fmt: str = "", + remove: bool = False, ): """Process Item `item`.""" print_(format(item, fmt)) @@ -242,16 +264,17 @@ class DuplicatesPlugin(BeetsPlugin): setattr(item, k, v) item.store() - def _checksum(self, item, prog): + def _checksum( + self, item: Item | Album, prog: str + ) -> tuple[str, bytes | None]: """Run external `prog` on file path associated with `item`, cache output as flexattr on a key that is the name of the program, and return the key, checksum tuple. """ - args = [ - p.format(file=os.fsdecode(item.path)) for p in shlex.split(prog) - ] + path = os.fsdecode(item.path) + args = [p.format(file=path) for p in shlex.split(prog)] key = args[0] - checksum = getattr(item, key, False) + checksum = cast(bytes | None, getattr(item, key, None)) if not checksum: self._log.debug( "key {} on item {.filepath} not cached:computing checksum", @@ -275,7 +298,14 @@ class DuplicatesPlugin(BeetsPlugin): ) return key, checksum - def _group_by(self, objs, keys, strict): + def _group_by( + self, + objs: Results[Album] | Results[Item], + keys: list[str], + strict: bool, + ) -> ( + defaultdict[KeyValues, list[Album]] | defaultdict[KeyValues, list[Item]] + ): """Return a dictionary with keys arbitrary concatenations of attributes and values lists of objects (Albums or Items) with those keys. @@ -305,7 +335,11 @@ class DuplicatesPlugin(BeetsPlugin): return counts - def _order(self, objs, tiebreak=None): + def _order( + self, + objs: list[Album] | list[Item], + tiebreak: dict[str, list[str]] | None = None, + ) -> list[Album] | list[Item]: """Return the objects (Items or Albums) sorted by descending order of priority. @@ -340,7 +374,7 @@ class DuplicatesPlugin(BeetsPlugin): def key(x): return len(x.items()) - return sorted(objs, key=key, reverse=True) + return sorted(objs, key=key, reverse=True) # type: ignore def _merge_items(self, objs): """Merge Item objs by copying missing fields from items in the tail to @@ -366,7 +400,7 @@ class DuplicatesPlugin(BeetsPlugin): break return objs - def _merge_albums(self, objs): + def _merge_albums(self, objs: list[Album]) -> list[Album]: """Merge Album objs by copying missing items from albums in the tail to the head album. @@ -400,12 +434,20 @@ class DuplicatesPlugin(BeetsPlugin): objs = self._merge_albums(objs) return objs - def _duplicates(self, objs, keys, full, strict, tiebreak, merge): + def _duplicates( + self, + objs: Results[Album] | Results[Item], + keys: list[str], + full: bool, + strict: bool, + tiebreak: dict[str, list[str]], + merge: bool, + ) -> Iterator[tuple[KeyValues, int, list[Album] | list[Item]]]: """Generate triples of keys, duplicate counts, and constituent objects.""" offset = 0 if full else 1 - for k, objs in self._group_by(objs, keys, strict).items(): - if len(objs) > 1: - objs = self._order(objs, tiebreak) + for k, grouped_objs in self._group_by(objs, keys, strict).items(): + if len(grouped_objs) > 1: + ordered_objs = self._order(grouped_objs, tiebreak) if merge: - objs = self._merge(objs) - yield (k, len(objs) - offset, objs[offset:]) + ordered_objs = self._merge(ordered_objs) + yield (k, len(ordered_objs) - offset, ordered_objs[offset:])