Add missing types to importer and pipeline

Šarūnas Nejus 2025-07-13 22:58:48 +01:00
parent 21459c70ee
commit d3c64d8506
4 changed files with 41 additions and 30 deletions


@@ -70,6 +70,7 @@ def query_tasks(session: ImportSession):
     Instead of finding files from the filesystem, a query is used to
     match items from the library.
     """
+    task: ImportTask
     if session.config["singletons"]:
        # Search for items.
        for item in session.lib.items(session.query):
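
The new `task: ImportTask` declaration gives the branches below a single declared type to assign into. A minimal sketch of the pattern with placeholder classes (not the beets code):

class Task: ...
class SingletonTask(Task): ...
class AlbumTask(Task): ...


def build_task(singleton: bool) -> Task:
    # Declaring the variable's type up front lets a checker accept both
    # branches; otherwise it may infer the narrower type of the first
    # assignment it sees.
    task: Task
    if singleton:
        task = SingletonTask()
    else:
        task = AlbumTask()
    return task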


@@ -22,7 +22,7 @@ import time
 from collections import defaultdict
 from enum import Enum
 from tempfile import mkdtemp
-from typing import TYPE_CHECKING, Callable, Iterable, Sequence
+from typing import TYPE_CHECKING, Any, Callable, Iterable, Sequence

 import mediafile
@@ -367,7 +367,7 @@ class ImportTask(BaseImportTask):
             autotag.tag_album(self.items, search_ids=search_ids)
         )

-    def find_duplicates(self, lib: library.Library):
+    def find_duplicates(self, lib: library.Library) -> list[library.Album]:
         """Return a list of albums from `lib` with the same artist and
         album name as the task.
         """
@@ -698,7 +698,7 @@ class SingletonImportTask(ImportTask):
             self.item, search_ids=search_ids
         )

-    def find_duplicates(self, lib):
+    def find_duplicates(self, lib: library.Library) -> list[library.Item]:  # type: ignore[override] # Need splitting Singleton and Album tasks into separate classes
         """Return a list of items from `lib` that have the same artist
         and title as the task.
         """
@@ -800,6 +800,11 @@ class SentinelImportTask(ImportTask):
         pass


+ArchiveHandler = tuple[
+    Callable[[util.StrPath], bool], Callable[[util.StrPath], Any]
+]
+
+
 class ArchiveImportTask(SentinelImportTask):
     """An import task that represents the processing of an archive.
@@ -825,13 +830,13 @@ class ArchiveImportTask(SentinelImportTask):
         if not os.path.isfile(path):
             return False

-        for path_test, _ in cls.handlers():
+        for path_test, _ in cls.handlers:
             if path_test(os.fsdecode(path)):
                 return True
         return False

-    @classmethod
-    def handlers(cls):
+    @util.cached_classproperty
+    def handlers(cls) -> list[ArchiveHandler]:
         """Returns a list of archive handlers.

         Each handler is a `(path_test, ArchiveClass)` tuple. `path_test`
@@ -839,28 +844,27 @@ class ArchiveImportTask(SentinelImportTask):
         handled by `ArchiveClass`. `ArchiveClass` is a class that
         implements the same interface as `tarfile.TarFile`.
         """
-        if not hasattr(cls, "_handlers"):
-            cls._handlers: list[tuple[Callable, ...]] = []
+        _handlers: list[ArchiveHandler] = []
         from zipfile import ZipFile, is_zipfile

-        cls._handlers.append((is_zipfile, ZipFile))
+        _handlers.append((is_zipfile, ZipFile))
         import tarfile

-        cls._handlers.append((tarfile.is_tarfile, tarfile.open))
+        _handlers.append((tarfile.is_tarfile, tarfile.open))
         try:
             from rarfile import RarFile, is_rarfile
         except ImportError:
             pass
         else:
-            cls._handlers.append((is_rarfile, RarFile))
+            _handlers.append((is_rarfile, RarFile))
         try:
             from py7zr import SevenZipFile, is_7zfile
         except ImportError:
             pass
         else:
-            cls._handlers.append((is_7zfile, SevenZipFile))
+            _handlers.append((is_7zfile, SevenZipFile))

-        return cls._handlers
+        return _handlers

     def cleanup(self, copy=False, delete=False, move=False):
         """Removes the temporary directory the archive was extracted to."""
@@ -877,7 +881,7 @@ class ArchiveImportTask(SentinelImportTask):
         """
         assert self.toppath is not None, "toppath must be set"

-        for path_test, handler_class in self.handlers():
+        for path_test, handler_class in self.handlers:
             if path_test(os.fsdecode(self.toppath)):
                 break
         else:
@@ -923,7 +927,7 @@ class ImportTaskFactory:
         self.imported = 0  # "Real" tasks created.
         self.is_archive = ArchiveImportTask.is_archive(util.syspath(toppath))

-    def tasks(self):
+    def tasks(self) -> Iterable[ImportTask]:
         """Yield all import tasks for music found in the user-specified
         path `self.toppath`. Any necessary sentinel tasks are also
         produced.
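
Annotating the generator method's return as `Iterable[ImportTask]` is enough for callers that only loop over the produced tasks. A tiny standalone illustration of the same annotation style:

from typing import Iterable


def numbers(limit: int) -> Iterable[int]:
    # A generator function may be annotated with the iterable it yields;
    # callers that only iterate never see the generator machinery.
    for n in range(limit):
        yield n


assert sum(numbers(4)) == 6
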
@@ -1112,7 +1116,10 @@ def albums_in_dir(path: util.PathBytes):
     a list of Items that is probably an album. Specifically, any folder
     containing any media files is an album.
     """
-    collapse_pat = collapse_paths = collapse_items = None
+    collapse_paths: list[util.PathBytes] = []
+    collapse_items: list[util.PathBytes] = []
+    collapse_pat = None
+
     ignore: list[str] = config["ignore"].as_str_seq()
     ignore_hidden: bool = config["ignore_hidden"].get(bool)
@@ -1137,7 +1144,7 @@ def albums_in_dir(path: util.PathBytes):
                 # proceed to process the current one.
                 if collapse_items:
                     yield collapse_paths, collapse_items
-                collapse_pat = collapse_paths = collapse_items = None
+                collapse_pat, collapse_paths, collapse_items = None, [], []

         # Check whether this directory looks like the *first* directory
         # in a multi-disc sequence. There are two indicators: the file
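
Giving `collapse_paths` and `collapse_items` annotated empty-list defaults, instead of sharing one `None` sentinel, lets the checker treat them as lists throughout; only `collapse_pat` keeps `None` as its "not collapsing" marker, and the reset now rebinds all three names in one tuple assignment. A condensed before/after sketch with simplified element types:

# Before: one chained assignment, so every later use needs a None check
# and the list element types are never stated.
collapse_pat = collapse_paths = collapse_items = None

# After: the lists always exist with a declared element type.
collapse_paths: list[bytes] = []
collapse_items: list[bytes] = []
collapse_pat = None

# Reset in a single statement once a collapsed multi-disc album is yielded.
collapse_pat, collapse_paths, collapse_items = None, [], []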


@@ -63,6 +63,7 @@ MAX_FILENAME_LENGTH = 200
 WINDOWS_MAGIC_PREFIX = "\\\\?\\"
 T = TypeVar("T")
 PathLike = Union[str, bytes, Path]
+StrPath = Union[str, Path]
 Replacements = Sequence[tuple[Pattern[str], str]]

 # Here for now to allow for a easy replace later on
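
`StrPath` sits alongside the existing `PathLike` alias: it accepts `str` or `pathlib.Path` but deliberately excludes raw `bytes`, matching values that have already gone through `os.fsdecode` (as in the archive `path_test` calls above). A small usage sketch, restating the alias locally so it runs on its own:

import os
from pathlib import Path
from typing import Union

StrPath = Union[str, Path]


def looks_like_zip(path: StrPath) -> bool:
    # os.fspath accepts both str and Path and returns the str form here.
    return os.fspath(path).lower().endswith(".zip")


assert looks_like_zip("album.zip")
assert looks_like_zip(Path("discs") / "album.ZIP")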


@@ -48,6 +48,8 @@ POISON = "__PIPELINE_POISON__"
 DEFAULT_QUEUE_SIZE = 16

+Tq = TypeVar("Tq")
+

 def _invalidate_queue(q, val=None, sync=True):
     """Breaks a Queue such that it never blocks, always has size 1,
@@ -91,7 +93,7 @@ def _invalidate_queue(q, val=None, sync=True):
         q.mutex.release()


-class CountedQueue(queue.Queue):
+class CountedQueue(queue.Queue[Tq]):
     """A queue that keeps track of the number of threads that are
     still feeding into it. The queue is poisoned when all threads are
     finished with the queue.
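
With `queue.Queue[Tq]` as the base class, `CountedQueue` stays generic over its element type, so the checker can follow what flows through `put` and `get`. A standalone sketch of the same pattern with a placeholder class name (subscripting `queue.Queue` at runtime needs Python 3.9+):

import queue
from typing import TypeVar

Tq = TypeVar("Tq")


class TrackedQueue(queue.Queue[Tq]):
    """A Queue subclass that remains generic over its element type."""


q: TrackedQueue[str] = TrackedQueue()
q.put("song.mp3")
item: str = q.get()  # the checker knows items are strings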