This commit is contained in:
Rebecca Turner 2026-02-02 17:43:00 +01:00 committed by GitHub
commit ed13c71cd5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -14,8 +14,11 @@
"""List duplicate tracks or albums."""
from __future__ import annotations
import os
import shlex
from typing import TYPE_CHECKING, Any, TypeAlias, cast
from beets.library import Album, Item
from beets.plugins import BeetsPlugin
@ -28,8 +31,23 @@ from beets.util import (
subprocess,
)
if TYPE_CHECKING:
import optparse
from collections import defaultdict
from collections.abc import Iterator
from beets.dbcore.db import Results
from beets.library.library import Library
PLUGIN = "duplicates"
# Value of `tiebreak` config item.
# Really the key is "item" or "album".
Tiebreak: TypeAlias = dict[str, list[str]]
# Attribute values; these are formed by taking the values of `keys` (a list of
# strings) on the items. These are used as dict keys in a bunch of places.
KeyValues: TypeAlias = tuple[Any]
class DuplicatesPlugin(BeetsPlugin):
"""List duplicate tracks or albums"""
@ -57,7 +75,9 @@ class DuplicatesPlugin(BeetsPlugin):
}
)
self._command = Subcommand("duplicates", help=__doc__, aliases=["dup"])
self._command = Subcommand(
"duplicates", help=cast(str, __doc__), aliases=["dup"]
)
self._command.parser.add_option(
"-c",
"--count",
@ -142,23 +162,23 @@ class DuplicatesPlugin(BeetsPlugin):
self._command.parser.add_all_common_options()
def commands(self):
def _dup(lib, opts, args):
def _dup(lib: Library, opts: optparse.Values, args: list[str]):
self.config.set_args(opts)
album = self.config["album"].get(bool)
checksum = self.config["checksum"].get(str)
copy = bytestring_path(self.config["copy"].as_str())
count = self.config["count"].get(bool)
delete = self.config["delete"].get(bool)
remove = self.config["remove"].get(bool)
fmt_tmpl = self.config["format"].get(str)
full = self.config["full"].get(bool)
keys = self.config["keys"].as_str_seq()
merge = self.config["merge"].get(bool)
move = bytestring_path(self.config["move"].as_str())
path = self.config["path"].get(bool)
tiebreak = self.config["tiebreak"].get(dict)
strict = self.config["strict"].get(bool)
tag = self.config["tag"].get(str)
album: bool = self.config["album"].get(bool) # type: ignore
checksum: str = self.config["checksum"].get(str) # type: ignore
copy: bytes = bytestring_path(self.config["copy"].as_str()) # type: ignore
count: bool = self.config["count"].get(bool) # type: ignore
delete: bool = self.config["delete"].get(bool) # type: ignore
remove: bool = self.config["remove"].get(bool) # type: ignore
fmt_tmpl: str = self.config["format"].get(str) # type: ignore
full: bool = self.config["full"].get(bool) # type: ignore
keys: list[str] = self.config["keys"].as_str_seq() # type: ignore
merge: bool = self.config["merge"].get(bool) # type: ignore
move: bytes = bytestring_path(self.config["move"].as_str()) # type: ignore
path: bool = self.config["path"].get(bool) # type: ignore
tiebreak: Tiebreak = self.config["tiebreak"].get(dict) # type: ignore
strict: bool = self.config["strict"].get(bool) # type: ignore
tag: str = self.config["tag"].get(str) # type: ignore
if album:
if not keys:
@ -185,9 +205,11 @@ class DuplicatesPlugin(BeetsPlugin):
fmt_tmpl = "$albumartist - $album - $title"
if checksum:
k = None
for i in items:
k, _ = self._checksum(i, checksum)
keys = [k]
if k is not None:
keys = [k]
for obj_id, obj_count, objs in self._duplicates(
items,
@ -214,13 +236,13 @@ class DuplicatesPlugin(BeetsPlugin):
def _process_item(
self,
item,
copy=False,
move=False,
delete=False,
tag=False,
fmt="",
remove=False,
item: Item | Album,
copy: bytes | None = None,
move: bytes | None = None,
delete: bool = False,
tag: str | None = None,
fmt: str = "",
remove: bool = False,
):
"""Process Item `item`."""
print_(format(item, fmt))
@ -242,16 +264,17 @@ class DuplicatesPlugin(BeetsPlugin):
setattr(item, k, v)
item.store()
def _checksum(self, item, prog):
def _checksum(
self, item: Item | Album, prog: str
) -> tuple[str, bytes | None]:
"""Run external `prog` on file path associated with `item`, cache
output as flexattr on a key that is the name of the program, and
return the key, checksum tuple.
"""
args = [
p.format(file=os.fsdecode(item.path)) for p in shlex.split(prog)
]
path = os.fsdecode(item.path)
args = [p.format(file=path) for p in shlex.split(prog)]
key = args[0]
checksum = getattr(item, key, False)
checksum = cast(bytes | None, getattr(item, key, None))
if not checksum:
self._log.debug(
"key {} on item {.filepath} not cached:computing checksum",
@ -275,7 +298,14 @@ class DuplicatesPlugin(BeetsPlugin):
)
return key, checksum
def _group_by(self, objs, keys, strict):
def _group_by(
self,
objs: Results[Album] | Results[Item],
keys: list[str],
strict: bool,
) -> (
defaultdict[KeyValues, list[Album]] | defaultdict[KeyValues, list[Item]]
):
"""Return a dictionary with keys arbitrary concatenations of attributes
and values lists of objects (Albums or Items) with those keys.
@ -305,7 +335,11 @@ class DuplicatesPlugin(BeetsPlugin):
return counts
def _order(self, objs, tiebreak=None):
def _order(
self,
objs: list[Album] | list[Item],
tiebreak: dict[str, list[str]] | None = None,
) -> list[Album] | list[Item]:
"""Return the objects (Items or Albums) sorted by descending
order of priority.
@ -340,7 +374,7 @@ class DuplicatesPlugin(BeetsPlugin):
def key(x):
return len(x.items())
return sorted(objs, key=key, reverse=True)
return sorted(objs, key=key, reverse=True) # type: ignore
def _merge_items(self, objs):
"""Merge Item objs by copying missing fields from items in the tail to
@ -366,7 +400,7 @@ class DuplicatesPlugin(BeetsPlugin):
break
return objs
def _merge_albums(self, objs):
def _merge_albums(self, objs: list[Album]) -> list[Album]:
"""Merge Album objs by copying missing items from albums in the tail
to the head album.
@ -400,12 +434,20 @@ class DuplicatesPlugin(BeetsPlugin):
objs = self._merge_albums(objs)
return objs
def _duplicates(self, objs, keys, full, strict, tiebreak, merge):
def _duplicates(
self,
objs: Results[Album] | Results[Item],
keys: list[str],
full: bool,
strict: bool,
tiebreak: dict[str, list[str]],
merge: bool,
) -> Iterator[tuple[KeyValues, int, list[Album] | list[Item]]]:
"""Generate triples of keys, duplicate counts, and constituent objects."""
offset = 0 if full else 1
for k, objs in self._group_by(objs, keys, strict).items():
if len(objs) > 1:
objs = self._order(objs, tiebreak)
for k, grouped_objs in self._group_by(objs, keys, strict).items():
if len(grouped_objs) > 1:
ordered_objs = self._order(grouped_objs, tiebreak)
if merge:
objs = self._merge(objs)
yield (k, len(objs) - offset, objs[offset:])
ordered_objs = self._merge(ordered_objs)
yield (k, len(ordered_objs) - offset, ordered_objs[offset:])