Added typehints to ImportTask init and reverted some initial changes.

Sebastian Mohr 2025-02-15 13:20:36 +01:00
parent 0ee17c0b06
commit 9acb2b4ca3


@@ -29,7 +29,7 @@ from collections import defaultdict
from dataclasses import dataclass
from enum import Enum
from tempfile import mkdtemp
-from typing import Callable, Iterable, Sequence, Union, cast
+from typing import Callable, Iterable, Sequence
import mediafile
@@ -335,7 +335,7 @@ class ImportSession:
def resolve_duplicate(self, task: ImportTask, found_duplicates):
raise NotImplementedError
def choose_item(self, task: ImportTask):
raise NotImplementedError
@@ -407,7 +407,8 @@ class ImportSession:
_history_dirs = None
@property
-def history_dirs(self):
+def history_dirs(self) -> set[tuple[PathBytes, ...]]:
+    # FIXME: This could be simplified to a cached property
if self._history_dirs is None:
self._history_dirs = ImportState().taghistory
return self._history_dirs
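
The FIXME added above points at a simpler shape. A minimal sketch of that simplification using functools.cached_property (my illustration, not part of this commit; ImportState and PathBytes are the names used by the surrounding module):

from functools import cached_property

class ImportSession:
    @cached_property
    def history_dirs(self) -> set[tuple[PathBytes, ...]]:
        # Computed once on first access and cached on the instance,
        # replacing the manual _history_dirs = None sentinel.
        return ImportState().taghistory
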
@@ -536,7 +537,12 @@ class ImportTask(BaseImportTask):
cur_artist: str | None = None
candidates: Sequence[autotag.AlbumMatch | autotag.TrackMatch] = []
-def __init__(self, toppath, paths, items):
+def __init__(
+    self,
+    toppath: PathBytes | None,
+    paths: Iterable[PathBytes] | None,
+    items: Iterable[library.Item] | None,
+):
super().__init__(toppath, paths, items)
self.rec = None
self.should_remove_duplicates = False
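
A quick usage sketch of the newly annotated constructor (the values are invented; PathBytes is the module's bytes-based path alias, so paths are passed as bytes):

task = ImportTask(
    toppath=b"/music/incoming",        # hypothetical source directory
    paths=[b"/music/incoming/album"],  # hypothetical album directory
    items=None,                        # None is permitted by the new annotation
)
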
@@ -563,16 +569,12 @@ class ImportTask(BaseImportTask):
action.ALBUMS,
action.RETAG,
):
-# Cast needed as mypy can't infer the type
-self.choice_flag = cast(action, choice)
+# TODO: redesign to stricten the type
+self.choice_flag = choice # type: ignore[assignment]
self.match = None
else:
self.choice_flag = action.APPLY # Implicit choice.
-# Union is needed here for python 3.9 compatibility!
-# Cast needed as mypy can't infer the type
-self.match = cast(
-    Union[autotag.AlbumMatch, autotag.TrackMatch], choice
-)
+self.match = choice # type: ignore[assignment]
def save_progress(self):
"""Updates the progress state to indicate that this album has
@@ -583,8 +585,7 @@ class ImportTask(BaseImportTask):
def save_history(self):
"""Save the directory in the history for incremental imports."""
-if self.paths:
-    ImportState().history_add(self.paths)
+ImportState().history_add(self.paths)
# Logical decisions.
@@ -821,9 +822,9 @@ class ImportTask(BaseImportTask):
def manipulate_files(
self,
-session: ImportSession,
operation: MoveOperation | None = None,
write=False,
+session: ImportSession | None = None,
):
"""Copy, move, link, hardlink or reflink (depending on `operation`)
the files as well as write metadata.
@@ -831,8 +832,8 @@ class ImportTask(BaseImportTask):
`operation` should be an instance of `util.MoveOperation`.
If `write` is `True` metadata is written to the files.
# TODO: Introduce a MoveOperation.NONE or SKIP
"""
+assert session is not None, "session must be provided"
items = self.imported_items()
# Save the original paths of all items for deletion and pruning
@@ -1058,7 +1059,7 @@ class SingletonImportTask(ImportTask):
assert self.choice_flag in (action.ASIS, action.RETAG, action.APPLY)
if self.choice_flag in (action.ASIS, action.RETAG):
return dict(self.item)
-elif self.choice_flag is action.APPLY and self.match:
+elif self.choice_flag is action.APPLY:
return self.match.info.copy()
def imported_items(self):
@@ -1258,10 +1259,10 @@ class ArchiveImportTask(SentinelImportTask):
"""
assert self.toppath is not None, "toppath must be set"
for path_test, handler_class in self.handlers():
for path_test, handler_class in self.handlers():
if path_test(os.fsdecode(self.toppath)):
break
else:
else:
raise ValueError(f"No handler found for archive: {self.toppath}")
extract_to = mkdtemp()
archive = handler_class(os.fsdecode(self.toppath), mode="r")
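
The extraction code above relies on Python's for/else: the else block runs only when the loop finishes without hitting break, which is what allows raising when no archive handler matched. A tiny self-contained example of the pattern (toy handlers, not the beets ones):

handlers = [(str.isdigit, int), (str.isalpha, str)]
value = "???"
for path_test, handler_class in handlers:
    if path_test(value):
        break
else:
    raise ValueError(f"No handler found for: {value}")
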
@@ -1404,7 +1405,9 @@ class ImportTaskFactory:
dirs = list({os.path.dirname(p) for p in paths})
if self.session.already_imported(self.toppath, dirs):
log.debug("Skipping previously-imported path: {0}", displayable_path(dirs))
log.debug(
"Skipping previously-imported path: {0}", displayable_path(dirs)
)
self.skipped += 1
return None
@@ -1527,9 +1530,9 @@ def query_tasks(session: ImportSession):
if session.config["singletons"]:
# Search for items.
for item in session.lib.items(session.query):
-singleton_task = SingletonImportTask(None, item)
-for task in singleton_task.handle_created(session):
-    yield singleton_task
+task = SingletonImportTask(None, item)
+for task in task.handle_created(session):
+    yield task
else:
# Search for albums.
@@ -1604,9 +1607,7 @@ def user_query(session: ImportSession, task: ImportTask):
yield SentinelImportTask(task.toppath, task.paths)
return _extend_pipeline(
-emitter(task),
-lookup_candidates(session),
-user_query(session),
+emitter(task), lookup_candidates(session), user_query(session)
)
# As albums: group items by albums and create task for each album
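
The _extend_pipeline call above composes emitter, lookup_candidates and user_query into further pipeline stages. A rough, self-contained sketch of that stage-chaining idea (plain generators; not the actual beets pipeline machinery):

def emit(task):
    yield task

def lookup(tasks):
    for t in tasks:
        yield f"{t}+candidates"  # stand-in for candidate lookup

def ask_user(tasks):
    for t in tasks:
        yield f"{t}+choice"  # stand-in for the user's decision

for result in ask_user(lookup(emit("task-1"))):
    print(result)  # task-1+candidates+choice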