diff --git a/beets/importer.py b/beets/importer.py index a81c97df0..8f64e25df 100644 --- a/beets/importer.py +++ b/beets/importer.py @@ -185,7 +185,7 @@ class ImportSession: """ logger: logging.Logger - paths: Union[list[bytes], None] + paths: Union[list[bytes], None] = None lib: library.Library _is_resuming: Dict[bytes, bool] @@ -223,8 +223,6 @@ class ImportSession: # Normalize the paths. if paths is not None: self.paths = list(map(normpath, paths)) - else: - self.paths = None def _setup_logging(self, loghandler: Optional[logging.Handler]): logger = logging.getLogger(__name__) @@ -286,13 +284,13 @@ class ImportSession: self.want_resume = config["resume"].as_choice([True, False, "ask"]) - def tag_log(self, status, paths): + def tag_log(self, status, paths: Sequence[PathLike]): """Log a message about a given album to the importer log. The status should reflect the reason the album couldn't be tagged. """ self.logger.info("{0} {1}", status, displayable_path(paths)) - def log_choice(self, task, duplicate=False): + def log_choice(self, task: ImportTask, duplicate=False): """Logs the task's current choice if it should be logged. If ``duplicate``, then this is a secondary choice after a duplicate was detected and a decision was made. @@ -313,22 +311,22 @@ class ImportSession: elif task.choice_flag is action.SKIP: self.tag_log("skip", paths) - def should_resume(self, path): + def should_resume(self, path: PathLike): raise NotImplementedError( "Inheriting class must implement `should_resume`" ) - def choose_match(self, task): + def choose_match(self, task: ImportTask): raise NotImplementedError( "Inheriting class must implement `choose_match`" ) - def resolve_duplicate(self, task, found_duplicates): + def resolve_duplicate(self, task: ImportTask, found_duplicates): raise NotImplementedError( "Inheriting class must implement `resolve_duplicate`" ) - def choose_item(self, task): + def choose_item(self, task: ImportTask): raise NotImplementedError( "Inheriting class must implement `choose_item`" ) @@ -522,12 +520,16 @@ class ImportTask(BaseImportTask): system. """ + choice_flag: Optional[action] = None + match: Union[autotag.AlbumMatch, autotag.TrackMatch, None] = None + + # Keep track of the current task item + cur_album: Optional[str] = None + cur_artist: Optional[str] = None + candidates: Sequence[autotag.AlbumMatch | autotag.TrackMatch] = [] + def __init__(self, toppath, paths, items): super().__init__(toppath, paths, items) - self.choice_flag = None - self.cur_album = None - self.cur_artist = None - self.candidates = [] self.rec = None self.should_remove_duplicates = False self.should_merge_duplicates = False @@ -622,13 +624,13 @@ class ImportTask(BaseImportTask): autotag.apply_metadata(self.match.info, self.match.mapping) - def duplicate_items(self, lib): + def duplicate_items(self, lib: library.Library): duplicate_items = [] for album in self.find_duplicates(lib): duplicate_items += album.items() return duplicate_items - def remove_duplicates(self, lib): + def remove_duplicates(self, lib: library.Library): duplicate_items = self.duplicate_items(lib) log.debug("removing {0} old duplicated items", len(duplicate_items)) for item in duplicate_items: @@ -640,7 +642,7 @@ class ImportTask(BaseImportTask): util.remove(item.path) util.prune_dirs(os.path.dirname(item.path), lib.directory) - def set_fields(self, lib): + def set_fields(self, lib: library.Library): """Sets the fields given at CLI or configuration to the specified values, for both the album and all its items. """ @@ -661,7 +663,7 @@ class ImportTask(BaseImportTask): item.store() self.album.store() - def finalize(self, session): + def finalize(self, session: ImportSession): """Save progress, clean up files, and emit plugin event.""" # Update progress. if session.want_resume: @@ -706,7 +708,7 @@ class ImportTask(BaseImportTask): def _emit_imported(self, lib: library.Library): plugins.send("album_imported", lib=lib, album=self.album) - def handle_created(self, session): + def handle_created(self, session: ImportSession): """Send the `import_task_created` event for this task. Return a list of tasks that should continue through the pipeline. By default, this is a list containing only the task itself, but plugins can replace the task @@ -733,7 +735,7 @@ class ImportTask(BaseImportTask): self.candidates = prop.candidates self.rec = prop.recommendation - def find_duplicates(self, lib): + def find_duplicates(self, lib: library.Library): """Return a list of albums from `lib` with the same artist and album name as the task. """ @@ -855,7 +857,7 @@ class ImportTask(BaseImportTask): plugins.send("import_task_files", session=session, task=self) - def add(self, lib): + def add(self, lib: library.Library): """Add the items as an album to the library and remove replaced items.""" self.align_album_level_fields() with lib.transaction(): @@ -1101,7 +1103,7 @@ class SingletonImportTask(ImportTask): def infer_album_fields(self): raise NotImplementedError - def choose_match(self, session): + def choose_match(self, session: ImportSession): """Ask the session which match should apply and apply it.""" choice = session.choose_item(self) self.set_choice(choice) @@ -1285,7 +1287,7 @@ class ImportTaskFactory: indicated by a path. """ - def __init__(self, toppath, session): + def __init__(self, toppath: PathLike, session: ImportSession): """Create a new task factory. `toppath` is the user-specified path to search for music to @@ -1338,7 +1340,7 @@ class ImportTaskFactory: else: yield self.sentinel() - def _create(self, task): + def _create(self, task: Optional[ImportTask]): """Handle a new task to be emitted by the factory. Emit the `import_task_created` event and increment the @@ -1371,7 +1373,7 @@ class ImportTaskFactory: for dirs, paths in albums_in_dir(self.toppath): yield dirs, paths - def singleton(self, path): + def singleton(self, path: PathLike): """Return a `SingletonImportTask` for the music file.""" if self.session.already_imported(self.toppath, [path]): log.debug( @@ -1386,7 +1388,7 @@ class ImportTaskFactory: else: return None - def album(self, paths, dirs=None): + def album(self, paths: Iterable[PathLike], dirs=None): """Return a `ImportTask` with all media files from paths. `dirs` is a list of parent directories used to record already @@ -1413,7 +1415,7 @@ class ImportTaskFactory: else: return None - def sentinel(self, paths=None): + def sentinel(self, paths: Optional[Iterable[PathLike]] = None): """Return a `SentinelImportTask` indicating the end of a top-level directory import. """ @@ -1448,7 +1450,7 @@ class ImportTaskFactory: log.debug("Archive extracted to: {0}", self.toppath) return archive_task - def read_item(self, path): + def read_item(self, path: PathLike): """Return an `Item` read from the path. If an item cannot be read, return `None` instead and log an @@ -1491,12 +1493,16 @@ def _extend_pipeline(tasks, *stages): # Full-album pipeline stages. -def read_tasks(session): +def read_tasks(session: ImportSession): """A generator yielding all the albums (as ImportTask objects) found in the user-specified list of paths. In the case of a singleton import, yields single-item tasks instead. """ skipped = 0 + if session.paths is None: + log.warning("No path specified in session.") + return + for toppath in session.paths: # Check whether we need to resume the import. session.ask_resume(toppath) @@ -1514,7 +1520,7 @@ def read_tasks(session): log.info("Skipped {0} paths.", skipped) -def query_tasks(session): +def query_tasks(session: ImportSession): """A generator that works as a drop-in-replacement for read_tasks. Instead of finding files from the filesystem, a query is used to match items from the library. @@ -1641,7 +1647,7 @@ def user_query(session: ImportSession, task: ImportTask): return task -def resolve_duplicates(session, task): +def resolve_duplicates(session: ImportSession, task: ImportTask): """Check if a task conflicts with items or albums already imported and ask the session to resolve this. """ @@ -1684,7 +1690,7 @@ def resolve_duplicates(session, task): @pipeline.mutator_stage -def import_asis(session, task): +def import_asis(session: ImportSession, task: ImportTask): """Select the `action.ASIS` choice for all tasks. This stage replaces the initial_lookup and user_query stages @@ -1698,7 +1704,7 @@ def import_asis(session, task): apply_choice(session, task) -def apply_choice(session, task): +def apply_choice(session: ImportSession, task: ImportTask): """Apply the task's choice to the Album or Item it contains and add it to the library. """ @@ -1722,7 +1728,11 @@ def apply_choice(session, task): @pipeline.mutator_stage -def plugin_stage(session, func, task): +def plugin_stage( + session: ImportSession, + func: Callable[[ImportSession, ImportTask]], + task: ImportTask, +): """A coroutine (pipeline stage) that calls the given function with each non-skipped import task. These stages occur between applying metadata changes and moving/copying/writing files. @@ -1739,7 +1749,7 @@ def plugin_stage(session, func, task): @pipeline.stage -def manipulate_files(session, task): +def manipulate_files(session: ImportSession, task: ImportTask): """A coroutine (pipeline stage) that performs necessary file manipulations *after* items have been added to the library and finalizes each task. @@ -1784,7 +1794,7 @@ def log_files(session: ImportSession, task: ImportTask): log.info(" {0}", displayable_path(item["path"])) -def group_albums(session): +def group_albums(session: ImportSession): """A pipeline stage that groups the items of each task into albums using their metadata. @@ -1823,7 +1833,7 @@ def is_subdir_of_any_in_list(path, dirs): return any(d in ancestors for d in dirs) -def albums_in_dir(path): +def albums_in_dir(path: PathLike): """Recursively searches the given directory and returns an iterable of (paths, items) where paths is a list of directories and items is a list of Items that is probably an album. Specifically, any folder