Importer restructure (#5624)

## Description

Hello y'all, when working on the importer.py file in a previous
[PR](#5611) I noticed that this file grew quite large and badly needs a
restructuring. Restructuring should improve our ability to apply changes
to it in the future and isolate sub-functionalities within the importer.

### Overview

For now I only changed the structure keeping the code (mostly)
unchanged.

I split the functions and classes in the importer.py into the following
responsibilities:
- `importer/session.py` : Includes the `ImportSession` class.
- `importer/stages.py` : Includes all stage functions, I prefixed the
helper functions with a `_` to allow distinguishing between stages and
helper functions more easily.
- `importer/state.py` : Includes the logic for the `ImportState`
handling i.e. the resume feat.
- `importer/tasks.py` : Includes the `ImportTask` class and all derived
classes. Also includes the `Action` enum which I have renamed from
`action`.
- `importer/__init__.py` : Identified all public facing classes and
functions and added them to `__all__`

### Potential future changes

I don't want to add this to this PR but there are some places here where
I see possible improvements for our code:
- There are quite some config parsing related functions in the
ImportSession which could be isolated (see e.g. set_config,
want_resume). Maybe a mixin class which handles the config operations
could be useful?
- The ImportSession should be abstract if it is not used directly (I
think it shouldn't). The function definitions which raise NotImplemented
errors are quite weird imo and could be avoided by making the class
abstract.
- For me it was difficult to understand the flow of the importer as
stages call session function and it is not clear which function is
called by which stage and when. Maybe a naming convention for the stage
functions in conjunction with the session methods could help here. Not
sure how this will look in practice but right now it is quite hard to
follow imo. Alternatively splitting the session into a outfacing session
and a session context which is passed to the stages could help.
- The use of the stage decorator is highly inconsistent. Maybe a better
way to handle the stages could be found. This is more of a pipeline
related issue and not directly related to the restructuring but I think
it is worth mentioning.
- Similar to the ImportSession, I think the ImportTask should be
abstract as well, maybe we can put a bit more thought into the task
hierarchy. This might also automatically improve the flow of the
importer pipeline.

Am happy to tackle some of these issues in future PRs if you also think
they are worth it.

Best,
Sebastian


Note: This PR is based on #5611 and can only be merged once the typing
additions are accepted.
This commit is contained in:
Šarūnas Nejus 2025-05-17 14:54:55 +01:00 committed by GitHub
commit 050f8a5a5f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 1097 additions and 925 deletions

View file

@ -47,3 +47,5 @@ f36bc497c8c8f89004f3f6879908d3f0b25123e1
# 2025
# Fix formatting
c490ac5810b70f3cf5fd8649669838e8fdb19f4d
# Importer restructure
9147577b2b19f43ca827e9650261a86fb0450cef

View file

@ -0,0 +1,38 @@
# This file is part of beets.
# Copyright 2016, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Provides the basic, interface-agnostic workflow for importing and
autotagging music files.
"""
from .session import ImportAbortError, ImportSession
from .tasks import (
Action,
ArchiveImportTask,
ImportTask,
SentinelImportTask,
SingletonImportTask,
)
# Note: Stages are not exposed to the public API
__all__ = [
"ImportSession",
"ImportAbortError",
"Action",
"ImportTask",
"ArchiveImportTask",
"SentinelImportTask",
"SingletonImportTask",
]

306
beets/importer/session.py Normal file
View file

@ -0,0 +1,306 @@
# This file is part of beets.
# Copyright 2016, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
from __future__ import annotations
import os
import time
from typing import TYPE_CHECKING, Sequence
from beets import config, dbcore, library, logging, plugins, util
from beets.importer.tasks import Action
from beets.util import displayable_path, normpath, pipeline, syspath
from . import stages as stagefuncs
from .state import ImportState
if TYPE_CHECKING:
from beets.util import PathBytes
from .tasks import ImportTask
QUEUE_SIZE = 128
# Global logger.
log = logging.getLogger("beets")
class ImportAbortError(Exception):
"""Raised when the user aborts the tagging operation."""
pass
class ImportSession:
"""Controls an import action. Subclasses should implement methods to
communicate with the user or otherwise make decisions.
"""
logger: logging.Logger
paths: list[PathBytes]
lib: library.Library
_is_resuming: dict[bytes, bool]
_merged_items: set[PathBytes]
_merged_dirs: set[PathBytes]
def __init__(
self,
lib: library.Library,
loghandler: logging.Handler | None,
paths: Sequence[PathBytes] | None,
query: dbcore.Query | None,
):
"""Create a session.
Parameters
----------
lib : library.Library
The library instance to which items will be imported.
loghandler : logging.Handler or None
A logging handler to use for the session's logger. If None, a
NullHandler will be used.
paths : os.PathLike or None
The paths to be imported.
query : dbcore.Query or None
A query to filter items for import.
"""
self.lib = lib
self.logger = self._setup_logging(loghandler)
self.query = query
self._is_resuming = {}
self._merged_items = set()
self._merged_dirs = set()
# Normalize the paths.
self.paths = list(map(normpath, paths or []))
def _setup_logging(self, loghandler: logging.Handler | None):
logger = logging.getLogger(__name__)
logger.propagate = False
if not loghandler:
loghandler = logging.NullHandler()
logger.handlers = [loghandler]
return logger
def set_config(self, config):
"""Set `config` property from global import config and make
implied changes.
"""
# FIXME: Maybe this function should not exist and should instead
# provide "decision wrappers" like "should_resume()", etc.
iconfig = dict(config)
self.config = iconfig
# Incremental and progress are mutually exclusive.
if iconfig["incremental"]:
iconfig["resume"] = False
# When based on a query instead of directories, never
# save progress or try to resume.
if self.query is not None:
iconfig["resume"] = False
iconfig["incremental"] = False
if iconfig["reflink"]:
iconfig["reflink"] = iconfig["reflink"].as_choice(
["auto", True, False]
)
# Copy, move, reflink, link, and hardlink are mutually exclusive.
if iconfig["move"]:
iconfig["copy"] = False
iconfig["link"] = False
iconfig["hardlink"] = False
iconfig["reflink"] = False
elif iconfig["link"]:
iconfig["copy"] = False
iconfig["move"] = False
iconfig["hardlink"] = False
iconfig["reflink"] = False
elif iconfig["hardlink"]:
iconfig["copy"] = False
iconfig["move"] = False
iconfig["link"] = False
iconfig["reflink"] = False
elif iconfig["reflink"]:
iconfig["copy"] = False
iconfig["move"] = False
iconfig["link"] = False
iconfig["hardlink"] = False
# Only delete when copying.
if not iconfig["copy"]:
iconfig["delete"] = False
self.want_resume = config["resume"].as_choice([True, False, "ask"])
def tag_log(self, status, paths: Sequence[PathBytes]):
"""Log a message about a given album to the importer log. The status
should reflect the reason the album couldn't be tagged.
"""
self.logger.info("{0} {1}", status, displayable_path(paths))
def log_choice(self, task: ImportTask, duplicate=False):
"""Logs the task's current choice if it should be logged. If
``duplicate``, then this is a secondary choice after a duplicate was
detected and a decision was made.
"""
paths = task.paths
if duplicate:
# Duplicate: log all three choices (skip, keep both, and trump).
if task.should_remove_duplicates:
self.tag_log("duplicate-replace", paths)
elif task.choice_flag in (Action.ASIS, Action.APPLY):
self.tag_log("duplicate-keep", paths)
elif task.choice_flag is Action.SKIP:
self.tag_log("duplicate-skip", paths)
else:
# Non-duplicate: log "skip" and "asis" choices.
if task.choice_flag is Action.ASIS:
self.tag_log("asis", paths)
elif task.choice_flag is Action.SKIP:
self.tag_log("skip", paths)
def should_resume(self, path: PathBytes):
raise NotImplementedError
def choose_match(self, task: ImportTask):
raise NotImplementedError
def resolve_duplicate(self, task: ImportTask, found_duplicates):
raise NotImplementedError
def choose_item(self, task: ImportTask):
raise NotImplementedError
def run(self):
"""Run the import task."""
self.logger.info("import started {0}", time.asctime())
self.set_config(config["import"])
# Set up the pipeline.
if self.query is None:
stages = [stagefuncs.read_tasks(self)]
else:
stages = [stagefuncs.query_tasks(self)]
# In pretend mode, just log what would otherwise be imported.
if self.config["pretend"]:
stages += [stagefuncs.log_files(self)]
else:
if self.config["group_albums"] and not self.config["singletons"]:
# Split directory tasks into one task for each album.
stages += [stagefuncs.group_albums(self)]
# These stages either talk to the user to get a decision or,
# in the case of a non-autotagged import, just choose to
# import everything as-is. In *both* cases, these stages
# also add the music to the library database, so later
# stages need to read and write data from there.
if self.config["autotag"]:
stages += [
stagefuncs.lookup_candidates(self),
stagefuncs.user_query(self),
]
else:
stages += [stagefuncs.import_asis(self)]
# Plugin stages.
for stage_func in plugins.early_import_stages():
stages.append(stagefuncs.plugin_stage(self, stage_func))
for stage_func in plugins.import_stages():
stages.append(stagefuncs.plugin_stage(self, stage_func))
stages += [stagefuncs.manipulate_files(self)]
pl = pipeline.Pipeline(stages)
# Run the pipeline.
plugins.send("import_begin", session=self)
try:
if config["threaded"]:
pl.run_parallel(QUEUE_SIZE)
else:
pl.run_sequential()
except ImportAbortError:
# User aborted operation. Silently stop.
pass
# Incremental and resumed imports
def already_imported(self, toppath: PathBytes, paths: Sequence[PathBytes]):
"""Returns true if the files belonging to this task have already
been imported in a previous session.
"""
if self.is_resuming(toppath) and all(
[ImportState().progress_has_element(toppath, p) for p in paths]
):
return True
if self.config["incremental"] and tuple(paths) in self.history_dirs:
return True
return False
_history_dirs = None
@property
def history_dirs(self) -> set[tuple[PathBytes, ...]]:
# FIXME: This could be simplified to a cached property
if self._history_dirs is None:
self._history_dirs = ImportState().taghistory
return self._history_dirs
def already_merged(self, paths: Sequence[PathBytes]):
"""Returns true if all the paths being imported were part of a merge
during previous tasks.
"""
for path in paths:
if path not in self._merged_items and path not in self._merged_dirs:
return False
return True
def mark_merged(self, paths: Sequence[PathBytes]):
"""Mark paths and directories as merged for future reimport tasks."""
self._merged_items.update(paths)
dirs = {
os.path.dirname(path) if os.path.isfile(syspath(path)) else path
for path in paths
}
self._merged_dirs.update(dirs)
def is_resuming(self, toppath: PathBytes):
"""Return `True` if user wants to resume import of this path.
You have to call `ask_resume` first to determine the return value.
"""
return self._is_resuming.get(toppath, False)
def ask_resume(self, toppath: PathBytes):
"""If import of `toppath` was aborted in an earlier session, ask
user if they want to resume the import.
Determines the return value of `is_resuming(toppath)`.
"""
if self.want_resume and ImportState().progress_has(toppath):
# Either accept immediately or prompt for input to decide.
if self.want_resume is True or self.should_resume(toppath):
log.warning(
"Resuming interrupted import of {0}",
util.displayable_path(toppath),
)
self._is_resuming[toppath] = True
else:
# Clear progress; we're starting from the top.
ImportState().progress_reset(toppath)

396
beets/importer/stages.py Normal file
View file

@ -0,0 +1,396 @@
# This file is part of beets.
# Copyright 2016, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
from __future__ import annotations
import itertools
import logging
from typing import TYPE_CHECKING, Callable
from beets import config, plugins
from beets.util import MoveOperation, displayable_path, pipeline
from .tasks import (
Action,
ImportTask,
ImportTaskFactory,
SentinelImportTask,
SingletonImportTask,
)
if TYPE_CHECKING:
from beets import library
from .session import ImportSession
# Global logger.
log = logging.getLogger("beets")
# ---------------------------- Producer functions ---------------------------- #
# Functions that are called first i.e. they generate import tasks
def read_tasks(session: ImportSession):
"""A generator yielding all the albums (as ImportTask objects) found
in the user-specified list of paths. In the case of a singleton
import, yields single-item tasks instead.
"""
skipped = 0
for toppath in session.paths:
# Check whether we need to resume the import.
session.ask_resume(toppath)
# Generate tasks.
task_factory = ImportTaskFactory(toppath, session)
yield from task_factory.tasks()
skipped += task_factory.skipped
if not task_factory.imported:
log.warning("No files imported from {0}", displayable_path(toppath))
# Show skipped directories (due to incremental/resume).
if skipped:
log.info("Skipped {0} paths.", skipped)
def query_tasks(session: ImportSession):
"""A generator that works as a drop-in-replacement for read_tasks.
Instead of finding files from the filesystem, a query is used to
match items from the library.
"""
if session.config["singletons"]:
# Search for items.
for item in session.lib.items(session.query):
task = SingletonImportTask(None, item)
for task in task.handle_created(session):
yield task
else:
# Search for albums.
for album in session.lib.albums(session.query):
log.debug(
"yielding album {0}: {1} - {2}",
album.id,
album.albumartist,
album.album,
)
items = list(album.items())
_freshen_items(items)
task = ImportTask(None, [album.item_dir()], items)
for task in task.handle_created(session):
yield task
# ---------------------------------- Stages ---------------------------------- #
# Functions that process import tasks, may transform or filter them
# They are chained together in the pipeline e.g. stage2(stage1(task)) -> task
def group_albums(session: ImportSession):
"""A pipeline stage that groups the items of each task into albums
using their metadata.
Groups are identified using their artist and album fields. The
pipeline stage emits new album tasks for each discovered group.
"""
def group(item):
return (item.albumartist or item.artist, item.album)
task = None
while True:
task = yield task
if task.skip:
continue
tasks = []
sorted_items: list[library.Item] = sorted(task.items, key=group)
for _, items in itertools.groupby(sorted_items, group):
l_items = list(items)
task = ImportTask(task.toppath, [i.path for i in l_items], l_items)
tasks += task.handle_created(session)
tasks.append(SentinelImportTask(task.toppath, task.paths))
task = pipeline.multiple(tasks)
@pipeline.mutator_stage
def lookup_candidates(session: ImportSession, task: ImportTask):
"""A coroutine for performing the initial MusicBrainz lookup for an
album. It accepts lists of Items and yields
(items, cur_artist, cur_album, candidates, rec) tuples. If no match
is found, all of the yielded parameters (except items) are None.
"""
if task.skip:
# FIXME This gets duplicated a lot. We need a better
# abstraction.
return
plugins.send("import_task_start", session=session, task=task)
log.debug("Looking up: {0}", displayable_path(task.paths))
# Restrict the initial lookup to IDs specified by the user via the -m
# option. Currently all the IDs are passed onto the tasks directly.
task.search_ids = session.config["search_ids"].as_str_seq()
task.lookup_candidates()
@pipeline.stage
def user_query(session: ImportSession, task: ImportTask):
"""A coroutine for interfacing with the user about the tagging
process.
The coroutine accepts an ImportTask objects. It uses the
session's `choose_match` method to determine the `action` for
this task. Depending on the action additional stages are executed
and the processed task is yielded.
It emits the ``import_task_choice`` event for plugins. Plugins have
access to the choice via the ``task.choice_flag`` property and may
choose to change it.
"""
if task.skip:
return task
if session.already_merged(task.paths):
return pipeline.BUBBLE
# Ask the user for a choice.
task.choose_match(session)
plugins.send("import_task_choice", session=session, task=task)
# As-tracks: transition to singleton workflow.
if task.choice_flag is Action.TRACKS:
# Set up a little pipeline for dealing with the singletons.
def emitter(task):
for item in task.items:
task = SingletonImportTask(task.toppath, item)
yield from task.handle_created(session)
yield SentinelImportTask(task.toppath, task.paths)
return _extend_pipeline(
emitter(task), lookup_candidates(session), user_query(session)
)
# As albums: group items by albums and create task for each album
if task.choice_flag is Action.ALBUMS:
return _extend_pipeline(
[task],
group_albums(session),
lookup_candidates(session),
user_query(session),
)
_resolve_duplicates(session, task)
if task.should_merge_duplicates:
# Create a new task for tagging the current items
# and duplicates together
duplicate_items = task.duplicate_items(session.lib)
# Duplicates would be reimported so make them look "fresh"
_freshen_items(duplicate_items)
duplicate_paths = [item.path for item in duplicate_items]
# Record merged paths in the session so they are not reimported
session.mark_merged(duplicate_paths)
merged_task = ImportTask(
None, task.paths + duplicate_paths, task.items + duplicate_items
)
return _extend_pipeline(
[merged_task], lookup_candidates(session), user_query(session)
)
_apply_choice(session, task)
return task
@pipeline.mutator_stage
def import_asis(session: ImportSession, task: ImportTask):
"""Select the `action.ASIS` choice for all tasks.
This stage replaces the initial_lookup and user_query stages
when the importer is run without autotagging.
"""
if task.skip:
return
log.info("{}", displayable_path(task.paths))
task.set_choice(Action.ASIS)
_apply_choice(session, task)
@pipeline.mutator_stage
def plugin_stage(
session: ImportSession,
func: Callable[[ImportSession, ImportTask], None],
task: ImportTask,
):
"""A coroutine (pipeline stage) that calls the given function with
each non-skipped import task. These stages occur between applying
metadata changes and moving/copying/writing files.
"""
if task.skip:
return
func(session, task)
# Stage may modify DB, so re-load cached item data.
# FIXME Importer plugins should not modify the database but instead
# the albums and items attached to tasks.
task.reload()
@pipeline.stage
def log_files(session: ImportSession, task: ImportTask):
"""A coroutine (pipeline stage) to log each file to be imported."""
if isinstance(task, SingletonImportTask):
log.info("Singleton: {0}", displayable_path(task.item["path"]))
elif task.items:
log.info("Album: {0}", displayable_path(task.paths[0]))
for item in task.items:
log.info(" {0}", displayable_path(item["path"]))
# --------------------------------- Consumer --------------------------------- #
# Anything that should be placed last in the pipeline
# In theory every stage could be a consumer, but in practice there are some
# functions which are typically placed last in the pipeline
@pipeline.stage
def manipulate_files(session: ImportSession, task: ImportTask):
"""A coroutine (pipeline stage) that performs necessary file
manipulations *after* items have been added to the library and
finalizes each task.
"""
if not task.skip:
if task.should_remove_duplicates:
task.remove_duplicates(session.lib)
if session.config["move"]:
operation = MoveOperation.MOVE
elif session.config["copy"]:
operation = MoveOperation.COPY
elif session.config["link"]:
operation = MoveOperation.LINK
elif session.config["hardlink"]:
operation = MoveOperation.HARDLINK
elif session.config["reflink"] == "auto":
operation = MoveOperation.REFLINK_AUTO
elif session.config["reflink"]:
operation = MoveOperation.REFLINK
else:
operation = None
task.manipulate_files(
session=session,
operation=operation,
write=session.config["write"],
)
# Progress, cleanup, and event.
task.finalize(session)
# ---------------------------- Utility functions ----------------------------- #
# Private functions only used in the stages above
def _apply_choice(session: ImportSession, task: ImportTask):
"""Apply the task's choice to the Album or Item it contains and add
it to the library.
"""
if task.skip:
return
# Change metadata.
if task.apply:
task.apply_metadata()
plugins.send("import_task_apply", session=session, task=task)
task.add(session.lib)
# If ``set_fields`` is set, set those fields to the
# configured values.
# NOTE: This cannot be done before the ``task.add()`` call above,
# because then the ``ImportTask`` won't have an `album` for which
# it can set the fields.
if config["import"]["set_fields"]:
task.set_fields(session.lib)
def _resolve_duplicates(session: ImportSession, task: ImportTask):
"""Check if a task conflicts with items or albums already imported
and ask the session to resolve this.
"""
if task.choice_flag in (Action.ASIS, Action.APPLY, Action.RETAG):
found_duplicates = task.find_duplicates(session.lib)
if found_duplicates:
log.debug(
"found duplicates: {}".format([o.id for o in found_duplicates])
)
# Get the default action to follow from config.
duplicate_action = config["import"]["duplicate_action"].as_choice(
{
"skip": "s",
"keep": "k",
"remove": "r",
"merge": "m",
"ask": "a",
}
)
log.debug("default action for duplicates: {0}", duplicate_action)
if duplicate_action == "s":
# Skip new.
task.set_choice(Action.SKIP)
elif duplicate_action == "k":
# Keep both. Do nothing; leave the choice intact.
pass
elif duplicate_action == "r":
# Remove old.
task.should_remove_duplicates = True
elif duplicate_action == "m":
# Merge duplicates together
task.should_merge_duplicates = True
else:
# No default action set; ask the session.
session.resolve_duplicate(task, found_duplicates)
session.log_choice(task, True)
def _freshen_items(items):
# Clear IDs from re-tagged items so they appear "fresh" when
# we add them back to the library.
for item in items:
item.id = None
item.album_id = None
def _extend_pipeline(tasks, *stages):
# Return pipeline extension for stages with list of tasks
if isinstance(tasks, list):
task_iter = iter(tasks)
else:
task_iter = tasks
ipl = pipeline.Pipeline([task_iter] + list(stages))
return pipeline.multiple(ipl.pull())

142
beets/importer/state.py Normal file
View file

@ -0,0 +1,142 @@
# This file is part of beets.
# Copyright 2016, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
from __future__ import annotations
import logging
import os
import pickle
from bisect import bisect_left, insort
from dataclasses import dataclass
from typing import TYPE_CHECKING
from beets import config
if TYPE_CHECKING:
from beets.util import PathBytes
# Global logger.
log = logging.getLogger("beets")
@dataclass
class ImportState:
"""Representing the progress of an import task.
Opens the state file on creation of the class. If you want
to ensure the state is written to disk, you should use the
context manager protocol.
Tagprogress allows long tagging tasks to be resumed when they pause.
Taghistory is a utility for manipulating the "incremental" import log.
This keeps track of all directories that were ever imported, which
allows the importer to only import new stuff.
Usage
-----
```
# Readonly
progress = ImportState().tagprogress
# Read and write
with ImportState() as state:
state["key"] = "value"
```
"""
tagprogress: dict[PathBytes, list[PathBytes]]
taghistory: set[tuple[PathBytes, ...]]
path: PathBytes
def __init__(self, readonly=False, path: PathBytes | None = None):
self.path = path or os.fsencode(config["statefile"].as_filename())
self.tagprogress = {}
self.taghistory = set()
self._open()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._save()
def _open(
self,
):
try:
with open(self.path, "rb") as f:
state = pickle.load(f)
# Read the states
self.tagprogress = state.get("tagprogress", {})
self.taghistory = state.get("taghistory", set())
except Exception as exc:
# The `pickle` module can emit all sorts of exceptions during
# unpickling, including ImportError. We use a catch-all
# exception to avoid enumerating them all (the docs don't even have a
# full list!).
log.debug("state file could not be read: {0}", exc)
def _save(self):
try:
with open(self.path, "wb") as f:
pickle.dump(
{
"tagprogress": self.tagprogress,
"taghistory": self.taghistory,
},
f,
)
except OSError as exc:
log.error("state file could not be written: {0}", exc)
# -------------------------------- Tagprogress ------------------------------- #
def progress_add(self, toppath: PathBytes, *paths: PathBytes):
"""Record that the files under all of the `paths` have been imported
under `toppath`.
"""
with self as state:
imported = state.tagprogress.setdefault(toppath, [])
for path in paths:
if imported and imported[-1] <= path:
imported.append(path)
else:
insort(imported, path)
def progress_has_element(self, toppath: PathBytes, path: PathBytes) -> bool:
"""Return whether `path` has been imported in `toppath`."""
imported = self.tagprogress.get(toppath, [])
i = bisect_left(imported, path)
return i != len(imported) and imported[i] == path
def progress_has(self, toppath: PathBytes) -> bool:
"""Return `True` if there exist paths that have already been
imported under `toppath`.
"""
return toppath in self.tagprogress
def progress_reset(self, toppath: PathBytes | None):
"""Reset the progress for `toppath`."""
with self as state:
if toppath in state.tagprogress:
del state.tagprogress[toppath]
# -------------------------------- Taghistory -------------------------------- #
def history_add(self, paths: list[PathBytes]):
"""Add the paths to the history."""
with self as state:
state.taghistory.add(tuple(paths))

File diff suppressed because it is too large Load diff

View file

@ -659,9 +659,9 @@ class ImportSessionFixture(ImportSession):
>>> lib = Library(':memory:')
>>> importer = ImportSessionFixture(lib, paths=['/path/to/import'])
>>> importer.add_choice(importer.action.SKIP)
>>> importer.add_choice(importer.action.ASIS)
>>> importer.default_choice = importer.action.APPLY
>>> importer.add_choice(importer.Action.SKIP)
>>> importer.add_choice(importer.Action.ASIS)
>>> importer.default_choice = importer.Action.APPLY
>>> importer.run()
This imports ``/path/to/import`` into `lib`. It skips the first
@ -674,7 +674,7 @@ class ImportSessionFixture(ImportSession):
self._choices = []
self._resolutions = []
default_choice = importer.action.APPLY
default_choice = importer.Action.APPLY
def add_choice(self, choice):
self._choices.append(choice)
@ -688,7 +688,7 @@ class ImportSessionFixture(ImportSession):
except IndexError:
choice = self.default_choice
if choice == importer.action.APPLY:
if choice == importer.Action.APPLY:
return task.candidates[0]
elif isinstance(choice, int):
return task.candidates[choice - 1]
@ -708,7 +708,7 @@ class ImportSessionFixture(ImportSession):
res = self.default_resolution
if res == self.Resolution.SKIP:
task.set_choice(importer.action.SKIP)
task.set_choice(importer.Action.SKIP)
elif res == self.Resolution.REMOVE:
task.should_remove_duplicates = True
elif res == self.Resolution.MERGE:
@ -721,7 +721,7 @@ class TerminalImportSessionFixture(TerminalImportSession):
super().__init__(*args, **kwargs)
self._choices = []
default_choice = importer.action.APPLY
default_choice = importer.Action.APPLY
def add_choice(self, choice):
self._choices.append(choice)
@ -743,15 +743,15 @@ class TerminalImportSessionFixture(TerminalImportSession):
except IndexError:
choice = self.default_choice
if choice == importer.action.APPLY:
if choice == importer.Action.APPLY:
self.io.addinput("A")
elif choice == importer.action.ASIS:
elif choice == importer.Action.ASIS:
self.io.addinput("U")
elif choice == importer.action.ALBUMS:
elif choice == importer.Action.ALBUMS:
self.io.addinput("G")
elif choice == importer.action.TRACKS:
elif choice == importer.Action.TRACKS:
self.io.addinput("T")
elif choice == importer.action.SKIP:
elif choice == importer.Action.SKIP:
self.io.addinput("S")
else:
self.io.addinput("M")

View file

@ -811,12 +811,12 @@ def _summary_judgment(rec):
if config["import"]["quiet"]:
if rec == Recommendation.strong:
return importer.action.APPLY
return importer.Action.APPLY
else:
action = config["import"]["quiet_fallback"].as_choice(
{
"skip": importer.action.SKIP,
"asis": importer.action.ASIS,
"skip": importer.Action.SKIP,
"asis": importer.Action.ASIS,
}
)
elif config["import"]["timid"]:
@ -824,17 +824,17 @@ def _summary_judgment(rec):
elif rec == Recommendation.none:
action = config["import"]["none_rec_action"].as_choice(
{
"skip": importer.action.SKIP,
"asis": importer.action.ASIS,
"skip": importer.Action.SKIP,
"asis": importer.Action.ASIS,
"ask": None,
}
)
else:
return None
if action == importer.action.SKIP:
if action == importer.Action.SKIP:
print_("Skipping.")
elif action == importer.action.ASIS:
elif action == importer.Action.ASIS:
print_("Importing as-is.")
return action
@ -1064,7 +1064,7 @@ class TerminalImportSession(importer.ImportSession):
# Take immediate action if appropriate.
action = _summary_judgment(task.rec)
if action == importer.action.APPLY:
if action == importer.Action.APPLY:
match = task.candidates[0]
show_change(task.cur_artist, task.cur_album, match)
return match
@ -1074,7 +1074,7 @@ class TerminalImportSession(importer.ImportSession):
# Loop until we have a choice.
while True:
# Ask for a choice from the user. The result of
# `choose_candidate` may be an `importer.action`, an
# `choose_candidate` may be an `importer.Action`, an
# `AlbumMatch` object for a specific selection, or a
# `PromptChoice`.
choices = self._get_choices(task)
@ -1089,7 +1089,7 @@ class TerminalImportSession(importer.ImportSession):
)
# Basic choices that require no more action here.
if choice in (importer.action.SKIP, importer.action.ASIS):
if choice in (importer.Action.SKIP, importer.Action.ASIS):
# Pass selection to main control flow.
return choice
@ -1097,7 +1097,7 @@ class TerminalImportSession(importer.ImportSession):
# function.
elif choice in choices:
post_choice = choice.callback(self, task)
if isinstance(post_choice, importer.action):
if isinstance(post_choice, importer.Action):
return post_choice
elif isinstance(post_choice, autotag.Proposal):
# Use the new candidates and continue around the loop.
@ -1121,7 +1121,7 @@ class TerminalImportSession(importer.ImportSession):
# Take immediate action if appropriate.
action = _summary_judgment(task.rec)
if action == importer.action.APPLY:
if action == importer.Action.APPLY:
match = candidates[0]
show_item_change(task.item, match)
return match
@ -1135,12 +1135,12 @@ class TerminalImportSession(importer.ImportSession):
candidates, True, rec, item=task.item, choices=choices
)
if choice in (importer.action.SKIP, importer.action.ASIS):
if choice in (importer.Action.SKIP, importer.Action.ASIS):
return choice
elif choice in choices:
post_choice = choice.callback(self, task)
if isinstance(post_choice, importer.action):
if isinstance(post_choice, importer.Action):
return post_choice
elif isinstance(post_choice, autotag.Proposal):
candidates = post_choice.candidates
@ -1203,7 +1203,7 @@ class TerminalImportSession(importer.ImportSession):
if sel == "s":
# Skip new.
task.set_choice(importer.action.SKIP)
task.set_choice(importer.Action.SKIP)
elif sel == "k":
# Keep both. Do nothing; leave the choice intact.
pass
@ -1239,16 +1239,16 @@ class TerminalImportSession(importer.ImportSession):
"""
# Standard, built-in choices.
choices = [
PromptChoice("s", "Skip", lambda s, t: importer.action.SKIP),
PromptChoice("u", "Use as-is", lambda s, t: importer.action.ASIS),
PromptChoice("s", "Skip", lambda s, t: importer.Action.SKIP),
PromptChoice("u", "Use as-is", lambda s, t: importer.Action.ASIS),
]
if task.is_album:
choices += [
PromptChoice(
"t", "as Tracks", lambda s, t: importer.action.TRACKS
"t", "as Tracks", lambda s, t: importer.Action.TRACKS
),
PromptChoice(
"g", "Group albums", lambda s, t: importer.action.ALBUMS
"g", "Group albums", lambda s, t: importer.Action.ALBUMS
),
]
choices += [

View file

@ -69,6 +69,10 @@ BytesOrStr = Union[str, bytes]
PathLike = Union[BytesOrStr, Path]
Replacements: TypeAlias = "Sequence[tuple[Pattern[str], str]]"
# Here for now to allow for a easy replace later on
# once we can move to a PathLike (mainly used in importer)
PathBytes = bytes
class HumanReadableError(Exception):
"""An Exception that can include a human-readable error message to

View file

@ -194,7 +194,7 @@ class BadFiles(BeetsPlugin):
sel = ui.input_options(["aBort", "skip", "continue"])
if sel == "s":
return importer.action.SKIP
return importer.Action.SKIP
elif sel == "c":
return None
elif sel == "b":

View file

@ -24,7 +24,7 @@ import yaml
from beets import plugins, ui, util
from beets.dbcore import types
from beets.importer import action
from beets.importer import Action
from beets.ui.commands import PromptChoice, _do_query
# These "safe" types can avoid the format/parse cycle that most fields go
@ -380,9 +380,9 @@ class EditPlugin(plugins.BeetsPlugin):
# Save the new data.
if success:
# Return action.RETAG, which makes the importer write the tags
# Return Action.RETAG, which makes the importer write the tags
# to the files if needed without re-applying metadata.
return action.RETAG
return Action.RETAG
else:
# Edit cancelled / no edits made. Revert changes.
for obj in task.items:

View file

@ -1306,12 +1306,12 @@ class FetchArtPlugin(plugins.BeetsPlugin, RequestMixin):
):
# Album already has art (probably a re-import); skip it.
return
if task.choice_flag == importer.action.ASIS:
if task.choice_flag == importer.Action.ASIS:
# For as-is imports, don't search Web sources for art.
local = True
elif task.choice_flag in (
importer.action.APPLY,
importer.action.RETAG,
importer.Action.APPLY,
importer.Action.RETAG,
):
# Search everywhere for art.
local = False

View file

@ -15,7 +15,7 @@
"""Warns you about things you hate (or even blocks import)."""
from beets.importer import action
from beets.importer import Action
from beets.library import Album, Item, parse_query_string
from beets.plugins import BeetsPlugin
@ -65,11 +65,11 @@ class IHatePlugin(BeetsPlugin):
skip_queries = self.config["skip"].as_str_seq()
warn_queries = self.config["warn"].as_str_seq()
if task.choice_flag == action.APPLY:
if task.choice_flag == Action.APPLY:
if skip_queries or warn_queries:
self._log.debug("processing your hate")
if self.do_i_hate_this(task, skip_queries):
task.choice_flag = action.SKIP
task.choice_flag = Action.SKIP
self._log.info("skipped: {0}", summary(task))
return
if self.do_i_hate_this(task, warn_queries):

View file

@ -19,7 +19,7 @@ import re
import confuse
from mediafile import MediaFile
from beets.importer import action
from beets.importer import Action
from beets.plugins import BeetsPlugin
from beets.ui import Subcommand, decargs, input_yn
@ -105,7 +105,7 @@ class ZeroPlugin(BeetsPlugin):
self.fields_to_progs[field] = []
def import_task_choice_event(self, session, task):
if task.choice_flag == action.ASIS and not self.warned:
if task.choice_flag == Action.ASIS and not self.warned:
self._log.warning('cannot zero in "as-is" mode')
self.warned = True
# TODO request write in as-is mode

View file

@ -648,6 +648,6 @@ by the choices on the core importer prompt, and hence should not be used:
``a``, ``s``, ``u``, ``t``, ``g``, ``e``, ``i``, ``b``.
Additionally, the callback function can optionally specify the next action to
be performed by returning a ``importer.action`` value. It may also return a
be performed by returning a ``importer.Action`` value. It may also return a
``autotag.Proposal`` value to update the set of current proposals to be
considered.

View file

@ -57,7 +57,7 @@ class ImportAddedTest(PluginMixin, AutotagImportTestCase):
os.path.getmtime(mfile.path) for mfile in self.import_media
)
self.importer = self.setup_importer()
self.importer.add_choice(importer.action.APPLY)
self.importer.add_choice(importer.Action.APPLY)
def find_media_file(self, item):
"""Find the pre-import MediaFile for an Item"""

View file

@ -34,7 +34,7 @@ from mediafile import MediaFile
from beets import config, importer, logging, util
from beets.autotag import AlbumInfo, AlbumMatch, TrackInfo
from beets.importer import albums_in_dir
from beets.importer.tasks import albums_in_dir
from beets.test import _common
from beets.test.helper import (
NEEDS_REFLINK,
@ -273,52 +273,52 @@ class ImportSingletonTest(AutotagImportTestCase):
def test_apply_asis_adds_track(self):
assert self.lib.items().get() is None
self.importer.add_choice(importer.action.ASIS)
self.importer.add_choice(importer.Action.ASIS)
self.importer.run()
assert self.lib.items().get().title == "Tag Track 1"
def test_apply_asis_does_not_add_album(self):
assert self.lib.albums().get() is None
self.importer.add_choice(importer.action.ASIS)
self.importer.add_choice(importer.Action.ASIS)
self.importer.run()
assert self.lib.albums().get() is None
def test_apply_asis_adds_singleton_path(self):
self.assert_lib_dir_empty()
self.importer.add_choice(importer.action.ASIS)
self.importer.add_choice(importer.Action.ASIS)
self.importer.run()
self.assert_file_in_lib(b"singletons", b"Tag Track 1.mp3")
def test_apply_candidate_adds_track(self):
assert self.lib.items().get() is None
self.importer.add_choice(importer.action.APPLY)
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
assert self.lib.items().get().title == "Applied Track 1"
def test_apply_candidate_does_not_add_album(self):
self.importer.add_choice(importer.action.APPLY)
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
assert self.lib.albums().get() is None
def test_apply_candidate_adds_singleton_path(self):
self.assert_lib_dir_empty()
self.importer.add_choice(importer.action.APPLY)
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
self.assert_file_in_lib(b"singletons", b"Applied Track 1.mp3")
def test_skip_does_not_add_first_track(self):
self.importer.add_choice(importer.action.SKIP)
self.importer.add_choice(importer.Action.SKIP)
self.importer.run()
assert self.lib.items().get() is None
def test_skip_adds_other_tracks(self):
self.prepare_album_for_import(2)
self.importer.add_choice(importer.action.SKIP)
self.importer.add_choice(importer.action.ASIS)
self.importer.add_choice(importer.Action.SKIP)
self.importer.add_choice(importer.Action.ASIS)
self.importer.run()
assert len(self.lib.items()) == 1
@ -334,8 +334,8 @@ class ImportSingletonTest(AutotagImportTestCase):
self.setup_importer()
self.importer.paths = import_files
self.importer.add_choice(importer.action.ASIS)
self.importer.add_choice(importer.action.ASIS)
self.importer.add_choice(importer.Action.ASIS)
self.importer.add_choice(importer.Action.ASIS)
self.importer.run()
assert len(self.lib.items()) == 2
@ -355,7 +355,7 @@ class ImportSingletonTest(AutotagImportTestCase):
# As-is item import.
assert self.lib.albums().get() is None
self.importer.add_choice(importer.action.ASIS)
self.importer.add_choice(importer.Action.ASIS)
self.importer.run()
for item in self.lib.items():
@ -370,7 +370,7 @@ class ImportSingletonTest(AutotagImportTestCase):
# Autotagged.
assert self.lib.albums().get() is None
self.importer.clear_choices()
self.importer.add_choice(importer.action.APPLY)
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
for item in self.lib.items():
@ -392,41 +392,41 @@ class ImportTest(AutotagImportTestCase):
def test_apply_asis_adds_album(self):
assert self.lib.albums().get() is None
self.importer.add_choice(importer.action.ASIS)
self.importer.add_choice(importer.Action.ASIS)
self.importer.run()
assert self.lib.albums().get().album == "Tag Album"
def test_apply_asis_adds_tracks(self):
assert self.lib.items().get() is None
self.importer.add_choice(importer.action.ASIS)
self.importer.add_choice(importer.Action.ASIS)
self.importer.run()
assert self.lib.items().get().title == "Tag Track 1"
def test_apply_asis_adds_album_path(self):
self.assert_lib_dir_empty()
self.importer.add_choice(importer.action.ASIS)
self.importer.add_choice(importer.Action.ASIS)
self.importer.run()
self.assert_file_in_lib(b"Tag Artist", b"Tag Album", b"Tag Track 1.mp3")
def test_apply_candidate_adds_album(self):
assert self.lib.albums().get() is None
self.importer.add_choice(importer.action.APPLY)
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
assert self.lib.albums().get().album == "Applied Album"
def test_apply_candidate_adds_tracks(self):
assert self.lib.items().get() is None
self.importer.add_choice(importer.action.APPLY)
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
assert self.lib.items().get().title == "Applied Track 1"
def test_apply_candidate_adds_album_path(self):
self.assert_lib_dir_empty()
self.importer.add_choice(importer.action.APPLY)
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
self.assert_file_in_lib(
b"Applied Artist", b"Applied Album", b"Applied Track 1.mp3"
@ -439,14 +439,14 @@ class ImportTest(AutotagImportTestCase):
mediafile.genre = "Tag Genre"
mediafile.save()
self.importer.add_choice(importer.action.APPLY)
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
assert self.lib.items().get().genre == ""
def test_apply_from_scratch_keeps_format(self):
config["import"]["from_scratch"] = True
self.importer.add_choice(importer.action.APPLY)
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
assert self.lib.items().get().format == "MP3"
@ -454,7 +454,7 @@ class ImportTest(AutotagImportTestCase):
config["import"]["from_scratch"] = True
bitrate = 80000
self.importer.add_choice(importer.action.APPLY)
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
assert self.lib.items().get().bitrate == bitrate
@ -464,7 +464,7 @@ class ImportTest(AutotagImportTestCase):
import_file = os.path.join(self.import_dir, b"album", b"track_1.mp3")
self.assertExists(import_file)
self.importer.add_choice(importer.action.APPLY)
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
self.assertNotExists(import_file)
@ -474,26 +474,26 @@ class ImportTest(AutotagImportTestCase):
import_file = os.path.join(self.import_dir, b"album", b"track_1.mp3")
self.assertExists(import_file)
self.importer.add_choice(importer.action.APPLY)
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
self.assertNotExists(import_file)
def test_skip_does_not_add_track(self):
self.importer.add_choice(importer.action.SKIP)
self.importer.add_choice(importer.Action.SKIP)
self.importer.run()
assert self.lib.items().get() is None
def test_skip_non_album_dirs(self):
self.assertIsDir(os.path.join(self.import_dir, b"album"))
self.touch(b"cruft", dir=self.import_dir)
self.importer.add_choice(importer.action.APPLY)
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
assert len(self.lib.albums()) == 1
def test_unmatched_tracks_not_added(self):
self.prepare_album_for_import(2)
self.matcher.matching = self.matcher.MISSING
self.importer.add_choice(importer.action.APPLY)
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
assert len(self.lib.items()) == 1
@ -520,7 +520,7 @@ class ImportTest(AutotagImportTestCase):
def test_asis_no_data_source(self):
assert self.lib.items().get() is None
self.importer.add_choice(importer.action.ASIS)
self.importer.add_choice(importer.Action.ASIS)
self.importer.run()
with pytest.raises(AttributeError):
@ -542,7 +542,7 @@ class ImportTest(AutotagImportTestCase):
# As-is album import.
assert self.lib.albums().get() is None
self.importer.add_choice(importer.action.ASIS)
self.importer.add_choice(importer.Action.ASIS)
self.importer.run()
for album in self.lib.albums():
@ -564,7 +564,7 @@ class ImportTest(AutotagImportTestCase):
# Autotagged.
assert self.lib.albums().get() is None
self.importer.clear_choices()
self.importer.add_choice(importer.action.APPLY)
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
for album in self.lib.albums():
@ -594,9 +594,9 @@ class ImportTracksTest(AutotagImportTestCase):
assert self.lib.items().get() is None
assert self.lib.albums().get() is None
self.importer.add_choice(importer.action.TRACKS)
self.importer.add_choice(importer.action.APPLY)
self.importer.add_choice(importer.action.APPLY)
self.importer.add_choice(importer.Action.TRACKS)
self.importer.add_choice(importer.Action.APPLY)
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
assert self.lib.items().get().title == "Applied Track 1"
assert self.lib.albums().get() is None
@ -604,9 +604,9 @@ class ImportTracksTest(AutotagImportTestCase):
def test_apply_tracks_adds_singleton_path(self):
self.assert_lib_dir_empty()
self.importer.add_choice(importer.action.TRACKS)
self.importer.add_choice(importer.action.APPLY)
self.importer.add_choice(importer.action.APPLY)
self.importer.add_choice(importer.Action.TRACKS)
self.importer.add_choice(importer.Action.APPLY)
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
self.assert_file_in_lib(b"singletons", b"Applied Track 1.mp3")
@ -620,7 +620,7 @@ class ImportCompilationTest(AutotagImportTestCase):
self.setup_importer()
def test_asis_homogenous_sets_albumartist(self):
self.importer.add_choice(importer.action.ASIS)
self.importer.add_choice(importer.Action.ASIS)
self.importer.run()
assert self.lib.albums().get().albumartist == "Tag Artist"
for item in self.lib.items():
@ -632,7 +632,7 @@ class ImportCompilationTest(AutotagImportTestCase):
self.import_media[1].artist = "Another Artist"
self.import_media[1].save()
self.importer.add_choice(importer.action.ASIS)
self.importer.add_choice(importer.Action.ASIS)
self.importer.run()
assert self.lib.albums().get().albumartist == "Various Artists"
for item in self.lib.items():
@ -644,7 +644,7 @@ class ImportCompilationTest(AutotagImportTestCase):
self.import_media[1].artist = "Another Artist"
self.import_media[1].save()
self.importer.add_choice(importer.action.ASIS)
self.importer.add_choice(importer.Action.ASIS)
self.importer.run()
for item in self.lib.items():
assert item.comp
@ -655,7 +655,7 @@ class ImportCompilationTest(AutotagImportTestCase):
self.import_media[1].artist = "Other Artist"
self.import_media[1].save()
self.importer.add_choice(importer.action.ASIS)
self.importer.add_choice(importer.Action.ASIS)
self.importer.run()
assert self.lib.albums().get().albumartist == "Other Artist"
for item in self.lib.items():
@ -669,7 +669,7 @@ class ImportCompilationTest(AutotagImportTestCase):
mediafile.mb_albumartistid = "Album Artist ID"
mediafile.save()
self.importer.add_choice(importer.action.ASIS)
self.importer.add_choice(importer.Action.ASIS)
self.importer.run()
assert self.lib.albums().get().albumartist == "Album Artist"
assert self.lib.albums().get().mb_albumartistid == "Album Artist ID"
@ -688,7 +688,7 @@ class ImportCompilationTest(AutotagImportTestCase):
mediafile.mb_albumartistid = "Album Artist ID"
mediafile.save()
self.importer.add_choice(importer.action.ASIS)
self.importer.add_choice(importer.Action.ASIS)
self.importer.run()
assert self.lib.albums().get().albumartist == "Album Artist"
assert self.lib.albums().get().albumartists == [
@ -730,7 +730,7 @@ class ImportExistingTest(AutotagImportTestCase):
self.importer.run()
assert len(self.lib.items()) == 1
self.reimporter.add_choice(importer.action.APPLY)
self.reimporter.add_choice(importer.Action.APPLY)
self.reimporter.run()
assert len(self.lib.items()) == 1
@ -738,18 +738,18 @@ class ImportExistingTest(AutotagImportTestCase):
self.importer.run()
assert len(self.lib.albums()) == 1
self.reimporter.add_choice(importer.action.APPLY)
self.reimporter.add_choice(importer.Action.APPLY)
self.reimporter.run()
assert len(self.lib.albums()) == 1
def test_does_not_duplicate_singleton_track(self):
self.importer.add_choice(importer.action.TRACKS)
self.importer.add_choice(importer.action.APPLY)
self.importer.add_choice(importer.Action.TRACKS)
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
assert len(self.lib.items()) == 1
self.reimporter.add_choice(importer.action.TRACKS)
self.reimporter.add_choice(importer.action.APPLY)
self.reimporter.add_choice(importer.Action.TRACKS)
self.reimporter.add_choice(importer.Action.APPLY)
self.reimporter.run()
assert len(self.lib.items()) == 1
@ -759,7 +759,7 @@ class ImportExistingTest(AutotagImportTestCase):
medium.title = "New Title"
medium.save()
self.reimporter.add_choice(importer.action.ASIS)
self.reimporter.add_choice(importer.Action.ASIS)
self.reimporter.run()
assert self.lib.items().get().title == "New Title"
@ -774,7 +774,7 @@ class ImportExistingTest(AutotagImportTestCase):
)
self.assert_file_in_lib(old_path)
self.reimporter.add_choice(importer.action.ASIS)
self.reimporter.add_choice(importer.Action.ASIS)
self.reimporter.run()
self.assert_file_in_lib(
b"Applied Artist", b"Applied Album", b"New Title.mp3"
@ -793,7 +793,7 @@ class ImportExistingTest(AutotagImportTestCase):
self.assert_file_in_lib(old_path)
config["import"]["copy"] = False
self.reimporter.add_choice(importer.action.ASIS)
self.reimporter.add_choice(importer.Action.ASIS)
self.reimporter.run()
self.assert_file_not_in_lib(
b"Applied Artist", b"Applied Album", b"New Title.mp3"
@ -808,7 +808,7 @@ class ImportExistingTest(AutotagImportTestCase):
)
self.reimporter = self.setup_importer()
self.reimporter.add_choice(importer.action.APPLY)
self.reimporter.add_choice(importer.Action.APPLY)
self.reimporter.run()
new_path = os.path.join(
b"Applied Artist", b"Applied Album", b"Applied Track 1.mp3"
@ -827,7 +827,7 @@ class ImportExistingTest(AutotagImportTestCase):
)
self.reimporter = self.setup_importer(move=True)
self.reimporter.add_choice(importer.action.APPLY)
self.reimporter.add_choice(importer.Action.APPLY)
self.reimporter.run()
self.assertNotExists(self.import_media[0].path)
@ -841,9 +841,9 @@ class GroupAlbumsImportTest(AutotagImportTestCase):
self.setup_importer()
# Split tracks into two albums and use both as-is
self.importer.add_choice(importer.action.ALBUMS)
self.importer.add_choice(importer.action.ASIS)
self.importer.add_choice(importer.action.ASIS)
self.importer.add_choice(importer.Action.ALBUMS)
self.importer.add_choice(importer.Action.ASIS)
self.importer.add_choice(importer.Action.ASIS)
def test_add_album_for_different_artist_and_different_album(self):
self.import_media[0].artist = "Artist B"
@ -896,7 +896,7 @@ class GlobalGroupAlbumsImportTest(GroupAlbumsImportTest):
def setUp(self):
super().setUp()
self.importer.clear_choices()
self.importer.default_choice = importer.action.ASIS
self.importer.default_choice = importer.Action.ASIS
config["import"]["group_albums"] = True
@ -939,7 +939,7 @@ class InferAlbumDataTest(BeetsTestCase):
)
def test_asis_homogenous_single_artist(self):
self.task.set_choice(importer.action.ASIS)
self.task.set_choice(importer.Action.ASIS)
self.task.align_album_level_fields()
assert not self.items[0].comp
assert self.items[0].albumartist == self.items[2].artist
@ -947,7 +947,7 @@ class InferAlbumDataTest(BeetsTestCase):
def test_asis_heterogenous_va(self):
self.items[0].artist = "another artist"
self.items[1].artist = "some other artist"
self.task.set_choice(importer.action.ASIS)
self.task.set_choice(importer.Action.ASIS)
self.task.align_album_level_fields()
@ -957,7 +957,7 @@ class InferAlbumDataTest(BeetsTestCase):
def test_asis_comp_applied_to_all_items(self):
self.items[0].artist = "another artist"
self.items[1].artist = "some other artist"
self.task.set_choice(importer.action.ASIS)
self.task.set_choice(importer.Action.ASIS)
self.task.align_album_level_fields()
@ -967,7 +967,7 @@ class InferAlbumDataTest(BeetsTestCase):
def test_asis_majority_artist_single_artist(self):
self.items[0].artist = "another artist"
self.task.set_choice(importer.action.ASIS)
self.task.set_choice(importer.Action.ASIS)
self.task.align_album_level_fields()
@ -980,7 +980,7 @@ class InferAlbumDataTest(BeetsTestCase):
for item in self.items:
item.albumartist = "some album artist"
item.mb_albumartistid = "some album artist id"
self.task.set_choice(importer.action.ASIS)
self.task.set_choice(importer.Action.ASIS)
self.task.align_album_level_fields()
@ -1009,7 +1009,7 @@ class InferAlbumDataTest(BeetsTestCase):
def test_small_single_artist_album(self):
self.items = [self.items[0]]
self.task.items = self.items
self.task.set_choice(importer.action.ASIS)
self.task.set_choice(importer.Action.ASIS)
self.task.align_album_level_fields()
assert not self.items[0].comp
@ -1505,7 +1505,7 @@ class ReimportTest(AutotagImportTestCase):
def _setup_session(self, singletons=False):
self.setup_importer(import_dir=self.libdir, singletons=singletons)
self.importer.add_choice(importer.action.APPLY)
self.importer.add_choice(importer.Action.APPLY)
def _album(self):
return self.lib.albums().get()
@ -1697,7 +1697,7 @@ class ImportIdTest(ImportTestCase):
def test_one_mbid_one_album(self):
self.setup_importer(search_ids=[self.ID_RELEASE_0])
self.importer.add_choice(importer.action.APPLY)
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
assert self.lib.albums().get().album == "VALID_RELEASE_0"
@ -1705,14 +1705,14 @@ class ImportIdTest(ImportTestCase):
self.setup_importer(search_ids=[self.ID_RELEASE_0, self.ID_RELEASE_1])
self.importer.add_choice(2) # Pick the 2nd best match (release 1).
self.importer.add_choice(importer.action.APPLY)
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
assert self.lib.albums().get().album == "VALID_RELEASE_1"
def test_one_mbid_one_singleton(self):
self.setup_singleton_importer(search_ids=[self.ID_RECORDING_0])
self.importer.add_choice(importer.action.APPLY)
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
assert self.lib.items().get().title == "VALID_RECORDING_0"
@ -1722,7 +1722,7 @@ class ImportIdTest(ImportTestCase):
)
self.importer.add_choice(2) # Pick the 2nd best match (recording 1).
self.importer.add_choice(importer.action.APPLY)
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
assert self.lib.items().get().title == "VALID_RECORDING_1"

View file

@ -24,10 +24,10 @@ from mediafile import MediaFile
from beets import config, plugins, ui
from beets.dbcore import types
from beets.importer import (
Action,
ArchiveImportTask,
SentinelImportTask,
SingletonImportTask,
action,
)
from beets.library import Item
from beets.plugins import MetadataSourcePlugin
@ -389,7 +389,7 @@ class PromptChoicesTest(TerminalImportMixin, PluginImportTestCase):
"aBort",
) + ("Foo", "baR")
self.importer.add_choice(action.SKIP)
self.importer.add_choice(Action.SKIP)
self.importer.run()
self.mock_input_options.assert_called_once_with(
opts, default="a", require=ANY
@ -424,7 +424,7 @@ class PromptChoicesTest(TerminalImportMixin, PluginImportTestCase):
) + ("Foo", "baR")
config["import"]["singletons"] = True
self.importer.add_choice(action.SKIP)
self.importer.add_choice(Action.SKIP)
self.importer.run()
self.mock_input_options.assert_called_with(
opts, default="a", require=ANY
@ -461,7 +461,7 @@ class PromptChoicesTest(TerminalImportMixin, PluginImportTestCase):
"enter Id",
"aBort",
) + ("baZ",)
self.importer.add_choice(action.SKIP)
self.importer.add_choice(Action.SKIP)
self.importer.run()
self.mock_input_options.assert_called_once_with(
opts, default="a", require=ANY
@ -523,7 +523,7 @@ class PromptChoicesTest(TerminalImportMixin, PluginImportTestCase):
return [ui.commands.PromptChoice("f", "Foo", self.foo)]
def foo(self, session, task):
return action.SKIP
return Action.SKIP
self.register_plugin(DummyPlugin)
# Default options + extra choices by the plugin ('Foo', 'Bar')