Added the last missing type hints in importer and fixed some type hint issues
in util. Also ran `poe format`.
Sebastian Mohr 2025-02-01 15:52:11 +01:00
parent 09b15aaf52
commit 04aa1b8ddb
4 changed files with 153 additions and 68 deletions
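
Most of the type-hint changes below follow one pattern: confuse config accessors are typed as returning Any, so call sites narrow the value with typing.cast. A minimal sketch of that pattern, assuming confuse's stock API (the keys and values here are illustrative example data, not beets' real defaults):

    from typing import cast

    import confuse

    # Stand-in for beets' global config view; keys/values are example data.
    config = confuse.Configuration("example", read=False)
    config.set({"ignore": [".DS_Store", "Thumbs.db"], "ignore_hidden": True})

    # as_str_seq() and get() are typed as returning Any; cast() narrows the
    # static type for the checker without doing any runtime conversion.
    ignore = cast(list[str], config["ignore"].as_str_seq())
    ignore_hidden = cast(bool, config["ignore_hidden"].get(bool))

    assert ignore == [".DS_Store", "Thumbs.db"]
    assert ignore_hidden is True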

beets/importer.py

@@ -2,19 +2,20 @@
 autotagging music files.
 """

+from __future__ import annotations
+
 import itertools
 import os
 import pickle
 import re
 import shutil
 import time
 from abc import ABC, abstractmethod
 from bisect import bisect_left, insort
 from collections import defaultdict
 from dataclasses import dataclass
 from enum import Enum
 from tempfile import mkdtemp
-from typing import Dict, Iterable, Optional, Sequence, Union
+from typing import Callable, Dict, Iterable, Optional, Sequence, Union, cast

 import mediafile
@@ -98,7 +99,7 @@ class ImportState:
     path: PathLike

     def __init__(self, readonly=False, path: Union[PathLike, None] = None):
-        self.path = path or config["statefile"].as_filename()
+        self.path = path or cast(PathLike, config["statefile"].as_filename())
         self.tagprogress = {}
         self.taghistory = set()
         self._open()
@@ -140,7 +141,7 @@ class ImportState:

     # -------------------------------- Tagprogress ------------------------------- #

-    def progress_add(self, toppath: PathLike, *paths: list[PathLike]):
+    def progress_add(self, toppath: PathLike, *paths: PathLike):
         """Record that the files under all of the `paths` have been imported
         under `toppath`.
         """
@@ -253,7 +254,9 @@ class ImportSession:
             iconfig["incremental"] = False

         if iconfig["reflink"]:
-            iconfig["reflink"] = iconfig["reflink"].as_choice(["auto", True, False])
+            iconfig["reflink"] = iconfig["reflink"].as_choice(
+                ["auto", True, False]
+            )

         # Copy, move, reflink, link, and hardlink are mutually exclusive.
         if iconfig["move"]:
@@ -311,16 +314,24 @@ class ImportSession:
         self.tag_log("skip", paths)

     def should_resume(self, path):
-        raise NotImplementedError("Inheriting class must implement `should_resume`")
+        raise NotImplementedError(
+            "Inheriting class must implement `should_resume`"
+        )

     def choose_match(self, task):
-        raise NotImplementedError("Inheriting class must implement `choose_match`")
+        raise NotImplementedError(
+            "Inheriting class must implement `choose_match`"
+        )

     def resolve_duplicate(self, task, found_duplicates):
-        raise NotImplementedError("Inheriting class must implement `resolve_duplicate`")
+        raise NotImplementedError(
+            "Inheriting class must implement `resolve_duplicate`"
+        )

     def choose_item(self, task):
-        raise NotImplementedError("Inheriting class must implement `choose_item`")
+        raise NotImplementedError(
+            "Inheriting class must implement `choose_item`"
+        )

     def run(self):
         """Run the import task."""
@@ -448,7 +459,16 @@ class BaseImportTask:
     Tasks flow through the importer pipeline. Each stage can update
     them."""

-    def __init__(self, toppath, paths, items):
+    toppath: Optional[PathLike]
+    paths: list[PathLike]
+    items: list[library.Item]
+
+    def __init__(
+        self,
+        toppath: Optional[PathLike],
+        paths: Optional[Iterable[PathLike]],
+        items: Optional[Iterable[library.Item]],
+    ):
         """Create a task. The primary fields that define a task are:

         * `toppath`: The user-specified base directory that contains the
@@ -466,8 +486,8 @@ class BaseImportTask:
         These fields should not change after initialization.
         """
         self.toppath = toppath
-        self.paths = paths
-        self.items = items
+        self.paths = list(paths) if paths is not None else []
+        self.items = list(items) if items is not None else []


 class ImportTask(BaseImportTask):
@@ -514,9 +534,14 @@ class ImportTask(BaseImportTask):
         self.is_album = True
         self.search_ids = []  # user-supplied candidate IDs.

-    def set_choice(self, choice):
+    def set_choice(
+        self, choice: Union[action, autotag.AlbumMatch, autotag.TrackMatch]
+    ):
         """Given an AlbumMatch or TrackMatch object or an action constant,
         indicates that an action has been selected for this task.
+
+        Album and trackmatch are implemented as tuples, so we can't
+        use isinstance to check for them.
         """
         # Not part of the task structure:
         assert choice != action.APPLY  # Only used internally.
@@ -566,7 +591,7 @@ class ImportTask(BaseImportTask):
         if self.choice_flag in (action.ASIS, action.RETAG):
             likelies, consensus = autotag.current_metadata(self.items)
             return likelies
-        elif self.choice_flag is action.APPLY:
+        elif self.choice_flag is action.APPLY and self.match:
             return self.match.info.copy()
         assert False
@@ -578,13 +603,19 @@ class ImportTask(BaseImportTask):
         """
         if self.choice_flag in (action.ASIS, action.RETAG):
             return list(self.items)
-        elif self.choice_flag == action.APPLY:
+        elif self.choice_flag == action.APPLY and isinstance(
+            self.match, autotag.AlbumMatch
+        ):
             return list(self.match.mapping.keys())
         else:
             assert False

     def apply_metadata(self):
         """Copy metadata from match info to the items."""
+        assert isinstance(
+            self.match, autotag.AlbumMatch
+        ), "apply_metadata() only works for albums"
+
         if config["import"]["from_scratch"]:
             for item in self.match.mapping:
                 item.clear()
@@ -603,7 +634,9 @@ class ImportTask(BaseImportTask):
         for item in duplicate_items:
             item.remove()
             if lib.directory in util.ancestry(item.path):
-                log.debug("deleting duplicate {0}", util.displayable_path(item.path))
+                log.debug(
+                    "deleting duplicate {0}", util.displayable_path(item.path)
+                )
                 util.remove(item.path)
                 util.prune_dirs(os.path.dirname(item.path), lib.directory)
@@ -635,8 +668,7 @@ class ImportTask(BaseImportTask):
             self.save_progress()
         if session.config["incremental"] and not (
             # Should we skip recording to incremental list?
-            self.skip
-            and session.config["incremental_skip_later"]
+            self.skip and session.config["incremental_skip_later"]
         ):
             self.save_history()
@@ -663,7 +695,7 @@ class ImportTask(BaseImportTask):
             for old_path in self.old_paths:
                 # Only delete files that were actually copied.
                 if old_path not in new_paths:
-                    util.remove(syspath(old_path), False)
+                    util.remove(old_path, False)
                     self.prune(old_path)

         # When moving, prune empty directories containing the original files.
@@ -671,7 +703,7 @@ class ImportTask(BaseImportTask):
             for old_path in self.old_paths:
                 self.prune(old_path)

-    def _emit_imported(self, lib):
+    def _emit_imported(self, lib: library.Library):
         plugins.send("album_imported", lib=lib, album=self.album)

     def handle_created(self, session):
@@ -693,7 +725,9 @@ class ImportTask(BaseImportTask):
         candidate IDs are stored in self.search_ids: if present, the
         initial lookup is restricted to only those IDs.
         """
-        artist, album, prop = autotag.tag_album(self.items, search_ids=self.search_ids)
+        artist, album, prop = autotag.tag_album(
+            self.items, search_ids=self.search_ids
+        )
         self.cur_artist = artist
         self.cur_album = album
         self.candidates = prop.candidates
@@ -713,7 +747,9 @@ class ImportTask(BaseImportTask):
         # Construct a query to find duplicates with this metadata. We
         # use a temporary Album object to generate any computed fields.
         tmp_album = library.Album(lib, **info)
-        keys = config["import"]["duplicate_keys"]["album"].as_str_seq()
+        keys = cast(
+            list[str], config["import"]["duplicate_keys"]["album"].as_str_seq()
+        )
         dup_query = tmp_album.duplicates_query(keys)

         # Don't count albums with the same files as duplicates.
@@ -744,7 +780,8 @@ class ImportTask(BaseImportTask):
             [i.albumartist or i.artist for i in self.items]
         )
         if freq == len(self.items) or (
-            freq > 1 and float(freq) / len(self.items) >= SINGLE_ARTIST_THRESH
+            freq > 1
+            and float(freq) / len(self.items) >= SINGLE_ARTIST_THRESH
         ):
             # Single-artist album.
             changes["albumartist"] = plur_albumartist
@@ -770,7 +807,12 @@ class ImportTask(BaseImportTask):
             for item in self.items:
                 item.update(changes)

-    def manipulate_files(self, operation=None, write=False, session=None):
+    def manipulate_files(
+        self,
+        operation: Optional[MoveOperation] = None,
+        write=False,
+        session: Optional[ImportSession] = None,
+    ):
         """Copy, move, link, hardlink or reflink (depending on `operation`)
         the files as well as write metadata.
@@ -778,11 +820,12 @@ class ImportTask(BaseImportTask):
         If `write` is `True` metadata is written to the files.
         """
+        assert session is not None, "session must be provided"

         items = self.imported_items()
         # Save the original paths of all items for deletion and pruning
         # in the next step (finalization).
-        self.old_paths = [item.path for item in items]
+        self.old_paths: list[PathLike] = [item.path for item in items]

         for item in items:
             if operation is not None:
                 # In copy and link modes, treat re-imports specially:
@@ -820,7 +863,9 @@ class ImportTask(BaseImportTask):
             self.remove_replaced(lib)

             self.album = lib.add_album(self.imported_items())
-            if self.choice_flag == action.APPLY:
+            if self.choice_flag == action.APPLY and isinstance(
+                self.match, autotag.AlbumMatch
+            ):
                 # Copy album flexible fields to the DB
                 # TODO: change the flow so we create the `Album` object earlier,
                 # and we can move this into `self.apply_metadata`, just like
@@ -830,25 +875,30 @@ class ImportTask(BaseImportTask):

             self.reimport_metadata(lib)

-    def record_replaced(self, lib):
+    def record_replaced(self, lib: library.Library):
         """Records the replaced items and albums in the `replaced_items`
         and `replaced_albums` dictionaries.
         """
         self.replaced_items = defaultdict(list)
-        self.replaced_albums = defaultdict(list)
+        self.replaced_albums: dict[PathLike, library.Album] = defaultdict()
         replaced_album_ids = set()
         for item in self.imported_items():
-            dup_items = list(lib.items(dbcore.query.BytesQuery("path", item.path)))
+            dup_items = list(
+                lib.items(dbcore.query.BytesQuery("path", item.path))
+            )
             self.replaced_items[item] = dup_items
             for dup_item in dup_items:
-                if not dup_item.album_id or dup_item.album_id in replaced_album_ids:
+                if (
+                    not dup_item.album_id
+                    or dup_item.album_id in replaced_album_ids
+                ):
                     continue
                 replaced_album = dup_item._cached_album
                 if replaced_album:
                     replaced_album_ids.add(dup_item.album_id)
                     self.replaced_albums[replaced_album.path] = replaced_album

-    def reimport_metadata(self, lib):
+    def reimport_metadata(self, lib: library.Library):
         """For reimports, preserves metadata for reimported items and
         albums.
         """
@@ -894,7 +944,8 @@ class ImportTask(BaseImportTask):
                 self.album.artpath = replaced_album.artpath
                 self.album.store()
                 log.debug(
-                    "Reimported album {}. Preserving attribute ['added']. " "Path: {}",
+                    "Reimported album {}. Preserving attribute ['added']. "
+                    "Path: {}",
                     self.album.id,
                     displayable_path(self.album.path),
                 )
@@ -973,14 +1024,14 @@ class ImportTask(BaseImportTask):
             util.prune_dirs(
                 os.path.dirname(filename),
                 self.toppath,
-                clutter=config["clutter"].as_str_seq(),
+                clutter=cast(list[str], config["clutter"].as_str_seq()),
             )


 class SingletonImportTask(ImportTask):
     """ImportTask for a single track that is not associated to an album."""

-    def __init__(self, toppath, item):
+    def __init__(self, toppath: Optional[PathLike], item: library.Item):
         super().__init__(toppath, [item.path], [item])
         self.item = item
         self.is_album = False
@@ -995,13 +1046,17 @@ class SingletonImportTask(ImportTask):
         assert self.choice_flag in (action.ASIS, action.RETAG, action.APPLY)
         if self.choice_flag in (action.ASIS, action.RETAG):
             return dict(self.item)
-        elif self.choice_flag is action.APPLY:
+        elif self.choice_flag is action.APPLY and self.match:
             return self.match.info.copy()
         assert False

     def imported_items(self):
         return [self.item]

     def apply_metadata(self):
+        assert isinstance(
+            self.match, autotag.TrackMatch
+        ), "apply_metadata() only works for tracks"
         autotag.apply_item_metadata(self.item, self.match.info)

     def _emit_imported(self, lib):
@@ -1022,7 +1077,9 @@ class SingletonImportTask(ImportTask):
         # Query for existing items using the same metadata. We use a
         # temporary `Item` object to generate any computed fields.
         tmp_item = library.Item(lib, **info)
-        keys = config["import"]["duplicate_keys"]["item"].as_str_seq()
+        keys = cast(
+            list[str], config["import"]["duplicate_keys"]["item"].as_str_seq()
+        )
         dup_query = tmp_item.duplicates_query(keys)

         found_items = []
@@ -1092,23 +1149,24 @@ class SentinelImportTask(ImportTask):
         pass

     def save_progress(self):
-        if self.paths is None:
+        if self.paths is None and self.toppath:
             # "Done" sentinel.
             ImportState().progress_reset(self.toppath)
-        else:
+        elif self.toppath:
             # "Directory progress" sentinel for singletons
             ImportState().progress_add(self.toppath, *self.paths)

-    def skip(self):
+    @property
+    def skip(self) -> bool:
         return True

     def set_choice(self, choice):
         raise NotImplementedError

-    def cleanup(self, **kwargs):
+    def cleanup(self, copy=False, delete=False, move=False):
         pass

-    def _emit_imported(self, session):
+    def _emit_imported(self, lib):
         pass
@@ -1152,7 +1210,7 @@ class ArchiveImportTask(SentinelImportTask):
         implements the same interface as `tarfile.TarFile`.
         """
         if not hasattr(cls, "_handlers"):
-            cls._handlers = []
+            cls._handlers: list[tuple[Callable, ...]] = []
             from zipfile import ZipFile, is_zipfile

             cls._handlers.append((is_zipfile, ZipFile))
@@ -1174,9 +1232,9 @@ class ArchiveImportTask(SentinelImportTask):
         return cls._handlers

-    def cleanup(self, **kwargs):
+    def cleanup(self, copy=False, delete=False, move=False):
         """Removes the temporary directory the archive was extracted to."""
-        if self.extracted:
+        if self.extracted and self.toppath:
             log.debug(
                 "Removing extracted directory: {0}",
                 displayable_path(self.toppath),
@@ -1187,10 +1245,18 @@ class ArchiveImportTask(SentinelImportTask):
         """Extracts the archive to a temporary directory and sets
         `toppath` to that directory.
         """
+        assert self.toppath is not None, "toppath must be set"
+
+        handler_class = None
         for path_test, handler_class in self.handlers():
             if path_test(os.fsdecode(self.toppath)):
                 break

+        if handler_class is None:
+            raise ValueError(
+                "No handler found for archive: {0}".format(self.toppath)
+            )
+
         extract_to = mkdtemp()
         archive = handler_class(os.fsdecode(self.toppath), mode="r")
         try:
@@ -1268,7 +1334,7 @@ class ImportTaskFactory:
             # for archive imports, send the archive task instead (to remove
             # the extracted directory).
             if self.is_archive:
-                yield archive_task
+                yield archive_task  # type: ignore
             else:
                 yield self.sentinel()
@@ -1308,7 +1374,9 @@ class ImportTaskFactory:
     def singleton(self, path):
         """Return a `SingletonImportTask` for the music file."""
         if self.session.already_imported(self.toppath, [path]):
-            log.debug("Skipping previously-imported path: {0}", displayable_path(path))
+            log.debug(
+                "Skipping previously-imported path: {0}", displayable_path(path)
+            )
             self.skipped += 1
             return None
@@ -1331,7 +1399,9 @@ class ImportTaskFactory:
         dirs = list({os.path.dirname(p) for p in paths})

         if self.session.already_imported(self.toppath, dirs):
-            log.debug("Skipping previously-imported path: {0}", displayable_path(dirs))
+            log.debug(
+                "Skipping previously-imported path: {0}", displayable_path(dirs)
+            )
             self.skipped += 1
             return None
@@ -1360,7 +1430,8 @@ class ImportTaskFactory:
         if not (self.session.config["move"] or self.session.config["copy"]):
             log.warning(
-                "Archive importing requires either " "'copy' or 'move' to be enabled."
+                "Archive importing requires either "
+                "'copy' or 'move' to be enabled."
             )
             return
@@ -1473,7 +1544,7 @@ def query_tasks(session):

 @pipeline.mutator_stage
-def lookup_candidates(session, task):
+def lookup_candidates(session: ImportSession, task: ImportTask):
     """A coroutine for performing the initial MusicBrainz lookup for an
     album. It accepts lists of Items and yields
     (items, cur_artist, cur_album, candidates, rec) tuples. If no match
@@ -1495,7 +1566,7 @@ def lookup_candidates(session, task):

 @pipeline.stage
-def user_query(session, task):
+def user_query(session: ImportSession, task: ImportTask):
     """A coroutine for interfacing with the user about the tagging
     process.
@@ -1528,7 +1599,9 @@ def user_query(session, task):
             yield SentinelImportTask(task.toppath, task.paths)

         return _extend_pipeline(
-            emitter(task), lookup_candidates(session), user_query(session)
+            emitter(task),
+            lookup_candidates(session),
+            user_query(session),  # type: ignore # Recursion in decorators
         )

     # As albums: group items by albums and create task for each album
@@ -1537,7 +1610,7 @@ def user_query(session, task):
             [task],
             group_albums(session),
             lookup_candidates(session),
-            user_query(session),
+            user_query(session),  # type: ignore # Recursion in decorators
         )

     resolve_duplicates(session, task)
@@ -1559,7 +1632,9 @@ def user_query(session, task):
         )

         return _extend_pipeline(
-            [merged_task], lookup_candidates(session), user_query(session)
+            [merged_task],
+            lookup_candidates(session),
+            user_query(session),  # type: ignore # Recursion in decorators
         )

     apply_choice(session, task)
@@ -1573,7 +1648,9 @@ def resolve_duplicates(session, task):
     if task.choice_flag in (action.ASIS, action.APPLY, action.RETAG):
         found_duplicates = task.find_duplicates(session.lib)
         if found_duplicates:
-            log.debug("found duplicates: {}".format([o.id for o in found_duplicates]))
+            log.debug(
+                "found duplicates: {}".format([o.id for o in found_duplicates])
+            )

             # Get the default action to follow from config.
             duplicate_action = config["import"]["duplicate_action"].as_choice(
@@ -1697,7 +1774,7 @@ def manipulate_files(session, task):

 @pipeline.stage
-def log_files(session, task):
+def log_files(session: ImportSession, task: ImportTask):
     """A coroutine (pipeline stage) to log each file to be imported."""
     if isinstance(task, SingletonImportTask):
         log.info("Singleton: {0}", displayable_path(task.item["path"]))
@@ -1753,8 +1830,8 @@ def albums_in_dir(path):
     containing any media files is an album.
     """
     collapse_pat = collapse_paths = collapse_items = None
-    ignore = config["ignore"].as_str_seq()
-    ignore_hidden = config["ignore_hidden"].get(bool)
+    ignore = cast(list[str], config["ignore"].as_str_seq())
+    ignore_hidden = cast(bool, config["ignore_hidden"].get(bool))

     for root, dirs, files in sorted_walk(
         path, ignore=ignore, ignore_hidden=ignore_hidden, logger=log
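
The skip change in this file's diff turns SentinelImportTask's skip into a read-only property that always reports True through plain attribute access. A minimal sketch of the idea, with stand-in class names rather than the real importer classes:

    class Task:
        def __init__(self) -> None:
            self.skip = False  # ordinary writable attribute on regular tasks


    class SentinelTask(Task):
        # Deliberately no assignment to self.skip here: assigning to a
        # read-only property would raise AttributeError.
        def __init__(self) -> None:
            pass

        @property
        def skip(self) -> bool:
            # Sentinels only mark pipeline progress and are always skipped.
            return True


    assert Task().skip is False
    assert SentinelTask().skip is True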

beets/util/__init__.py

@@ -198,8 +198,8 @@ def ancestry(path: AnyStr) -> list[AnyStr]:


 def sorted_walk(
-    path: AnyStr,
-    ignore: Sequence[bytes] = (),
+    path: PathLike,
+    ignore: Sequence[PathLike] = (),
     ignore_hidden: bool = False,
     logger: Logger | None = None,
 ) -> Iterator[tuple[bytes, Sequence[bytes], Sequence[bytes]]]:
@@ -297,8 +297,8 @@ def fnmatch_all(names: Sequence[bytes], patterns: Sequence[bytes]) -> bool:


 def prune_dirs(
-    path: bytes,
-    root: bytes | None = None,
+    path: PathLike,
+    root: PathLike | None = None,
     clutter: Sequence[str] = (".DS_Store", "Thumbs.db"),
 ):
     """If path is an empty directory, then remove it. Recursively remove
@@ -419,12 +419,13 @@ PATH_SEP: bytes = bytestring_path(os.sep)


 def displayable_path(
-    path: BytesOrStr | tuple[BytesOrStr, ...], separator: str = "; "
+    path: PathLike | Iterable[PathLike], separator: str = "; "
 ) -> str:
     """Attempts to decode a bytestring path to a unicode object for the
     purpose of displaying it to the user. If the `path` argument is a
     list or a tuple, the elements are joined with `separator`.
     """
+
     if isinstance(path, (list, tuple)):
         return separator.join(displayable_path(p) for p in path)
     elif isinstance(path, str):
@@ -472,7 +473,7 @@ def samefile(p1: bytes, p2: bytes) -> bool:
         return False


-def remove(path: bytes, soft: bool = True):
+def remove(path: PathLike, soft: bool = True):
     """Remove the file. If `soft`, then no error will be raised if the
     file does not exist.
     """

beets/util/pipeline.py

@@ -31,7 +31,6 @@ To do so, pass an iterable of coroutines to the Pipeline constructor
 in place of any single coroutine.
 """

 import queue
-import sys
 from threading import Lock, Thread
@@ -420,7 +419,9 @@ class Pipeline:
         for i in range(1, queue_count):
             for coro in self.stages[i]:
                 threads.append(
-                    MiddlePipelineThread(coro, queues[i - 1], queues[i], threads)
+                    MiddlePipelineThread(
+                        coro, queues[i - 1], queues[i], threads
+                    )
                 )

         # Last stage.

test/plugins/test_lyrics.py

@@ -74,7 +74,9 @@ class TestLyricsUtils:
             ("横山克", "Masaru Yokoyama", ["Masaru Yokoyama"]),
         ],
     )
-    def test_search_pairs_artists(self, artist, artist_sort, expected_extra_artists):
+    def test_search_pairs_artists(
+        self, artist, artist_sort, expected_extra_artists
+    ):
         item = Item(artist=artist, artist_sort=artist_sort, title="song")

         actual_artists = [a for a, _ in lyrics.search_pairs(item)]
@@ -99,7 +101,9 @@ class TestLyricsUtils:
     def test_search_pairs_titles(self, title, expected_extra_titles):
         item = Item(title=title, artist="A")

-        actual_titles = {t: None for _, tit in lyrics.search_pairs(item) for t in tit}
+        actual_titles = {
+            t: None for _, tit in lyrics.search_pairs(item) for t in tit
+        }

         assert list(actual_titles) == [title, *expected_extra_titles]
@@ -241,7 +245,9 @@ class LyricsBackendTest(LyricsPluginMixin):

     @pytest.fixture
     def lyrics_html(self, lyrics_root_dir, file_name):
-        return (lyrics_root_dir / f"{file_name}.txt").read_text(encoding="utf-8")
+        return (lyrics_root_dir / f"{file_name}.txt").read_text(
+            encoding="utf-8"
+        )

     @pytest.mark.on_lyrics_update