refactor: moved some more imports that are only used in the commands

in their respective files. Also fixed some imports
This commit is contained in:
Sebastian Mohr 2025-11-03 14:03:25 +01:00
parent a59e41a883
commit 25ae330044
15 changed files with 186 additions and 172 deletions

View file

@ -1111,76 +1111,9 @@ def show_model_changes(
return bool(changes) return bool(changes)
def show_path_changes(path_changes):
"""Given a list of tuples (source, destination) that indicate the
path changes, log the changes as INFO-level output to the beets log.
The output is guaranteed to be unicode.
Every pair is shown on a single line if the terminal width permits it,
else it is split over two lines. E.g.,
Source -> Destination
vs.
Source
-> Destination
"""
sources, destinations = zip(*path_changes)
# Ensure unicode output
sources = list(map(util.displayable_path, sources))
destinations = list(map(util.displayable_path, destinations))
# Calculate widths for terminal split
col_width = (term_width() - len(" -> ")) // 2
max_width = len(max(sources + destinations, key=len))
if max_width > col_width:
# Print every change over two lines
for source, dest in zip(sources, destinations):
color_source, color_dest = colordiff(source, dest)
print_(f"{color_source} \n -> {color_dest}")
else:
# Print every change on a single line, and add a header
title_pad = max_width - len("Source ") + len(" -> ")
print_(f"Source {' ' * title_pad} Destination")
for source, dest in zip(sources, destinations):
pad = max_width - len(source)
color_source, color_dest = colordiff(source, dest)
print_(f"{color_source} {' ' * pad} -> {color_dest}")
# Helper functions for option parsing. # Helper functions for option parsing.
def _store_dict(option, opt_str, value, parser):
"""Custom action callback to parse options which have ``key=value``
pairs as values. All such pairs passed for this option are
aggregated into a dictionary.
"""
dest = option.dest
option_values = getattr(parser.values, dest, None)
if option_values is None:
# This is the first supplied ``key=value`` pair of option.
# Initialize empty dictionary and get a reference to it.
setattr(parser.values, dest, {})
option_values = getattr(parser.values, dest)
try:
key, value = value.split("=", 1)
if not (key and value):
raise ValueError
except ValueError:
raise UserError(
f"supplied argument `{value}' is not of the form `key=value'"
)
option_values[key] = value
class CommonOptionsParser(optparse.OptionParser): class CommonOptionsParser(optparse.OptionParser):
"""Offers a simple way to add common formatting options. """Offers a simple way to add common formatting options.
@ -1666,7 +1599,7 @@ def _raw_main(args: list[str], lib=None) -> None:
and subargs[0] == "config" and subargs[0] == "config"
and ("-e" in subargs or "--edit" in subargs) and ("-e" in subargs or "--edit" in subargs)
): ):
from beets.ui.commands import config_edit from beets.ui.commands.config import config_edit
return config_edit() return config_edit()

View file

@ -32,6 +32,21 @@ from .update import update_cmd
from .version import version_cmd from .version import version_cmd
from .write import write_cmd from .write import write_cmd
def __getattr__(name: str):
"""Handle deprecated imports."""
return deprecate_imports(
old_module=__name__,
new_module_by_name={
"TerminalImportSession": "beets.ui.commands.import_.session",
"PromptChoice": "beets.ui.commands.import_.session",
# TODO: We might want to add more deprecated imports here
},
name=name,
version="3.0.0",
)
# The list of default subcommands. This is populated with Subcommand # The list of default subcommands. This is populated with Subcommand
# objects that can be fed to a SubcommandsOptionParser. # objects that can be fed to a SubcommandsOptionParser.
default_commands = [ default_commands = [

View file

@ -1,67 +0,0 @@
"""Utility functions for beets UI commands."""
import os
from beets import ui
from beets.util import displayable_path, normpath, syspath
def do_query(lib, query, album, also_items=True):
"""For commands that operate on matched items, performs a query
and returns a list of matching items and a list of matching
albums. (The latter is only nonempty when album is True.) Raises
a UserError if no items match. also_items controls whether, when
fetching albums, the associated items should be fetched also.
"""
if album:
albums = list(lib.albums(query))
items = []
if also_items:
for al in albums:
items += al.items()
else:
albums = []
items = list(lib.items(query))
if album and not albums:
raise ui.UserError("No matching albums found.")
elif not album and not items:
raise ui.UserError("No matching items found.")
return items, albums
def paths_from_logfile(path):
"""Parse the logfile and yield skipped paths to pass to the `import`
command.
"""
with open(path, encoding="utf-8") as fp:
for i, line in enumerate(fp, start=1):
verb, sep, paths = line.rstrip("\n").partition(" ")
if not sep:
raise ValueError(f"line {i} is invalid")
# Ignore informational lines that don't need to be re-imported.
if verb in {"import", "duplicate-keep", "duplicate-replace"}:
continue
if verb not in {"asis", "skip", "duplicate-skip"}:
raise ValueError(f"line {i} contains unknown verb {verb}")
yield os.path.commonpath(paths.split("; "))
def parse_logfiles(logfiles):
"""Parse all `logfiles` and yield paths from it."""
for logfile in logfiles:
try:
yield from paths_from_logfile(syspath(normpath(logfile)))
except ValueError as err:
raise ui.UserError(
f"malformed logfile {displayable_path(logfile)}: {err}"
) from err
except OSError as err:
raise ui.UserError(
f"unreadable logfile {displayable_path(logfile)}: {err}"
) from err

View file

@ -48,7 +48,7 @@ def completion_script(commands):
completion data for. completion data for.
""" """
base_script = os.path.join( base_script = os.path.join(
os.path.dirname(__file__), "../completion_base.sh" os.path.dirname(__file__), "./completion_base.sh"
) )
with open(base_script) as base_script: with open(base_script) as base_script:
yield base_script.read() yield base_script.read()

View file

@ -2,7 +2,8 @@
import os import os
from beets import config, ui, util from beets import config, ui
from beets.util import displayable_path, editor_command, interactive_open
def config_func(lib, opts, args): def config_func(lib, opts, args):
@ -25,7 +26,7 @@ def config_func(lib, opts, args):
filenames.insert(0, user_path) filenames.insert(0, user_path)
for filename in filenames: for filename in filenames:
ui.print_(util.displayable_path(filename)) ui.print_(displayable_path(filename))
# Open in editor. # Open in editor.
elif opts.edit: elif opts.edit:
@ -45,11 +46,11 @@ def config_edit():
An empty config file is created if no existing config file exists. An empty config file is created if no existing config file exists.
""" """
path = config.user_config_path() path = config.user_config_path()
editor = util.editor_command() editor = editor_command()
try: try:
if not os.path.isfile(path): if not os.path.isfile(path):
open(path, "w+").close() open(path, "w+").close()
util.interactive_open([path], editor) interactive_open([path], editor)
except OSError as exc: except OSError as exc:
message = f"Could not edit configuration: {exc}" message = f"Could not edit configuration: {exc}"
if not editor: if not editor:

View file

@ -5,13 +5,47 @@ import os
from beets import config, logging, plugins, ui from beets import config, logging, plugins, ui
from beets.util import displayable_path, normpath, syspath from beets.util import displayable_path, normpath, syspath
from .._utils import parse_logfiles
from .session import TerminalImportSession from .session import TerminalImportSession
# Global logger. # Global logger.
log = logging.getLogger("beets") log = logging.getLogger("beets")
def paths_from_logfile(path):
"""Parse the logfile and yield skipped paths to pass to the `import`
command.
"""
with open(path, encoding="utf-8") as fp:
for i, line in enumerate(fp, start=1):
verb, sep, paths = line.rstrip("\n").partition(" ")
if not sep:
raise ValueError(f"line {i} is invalid")
# Ignore informational lines that don't need to be re-imported.
if verb in {"import", "duplicate-keep", "duplicate-replace"}:
continue
if verb not in {"asis", "skip", "duplicate-skip"}:
raise ValueError(f"line {i} contains unknown verb {verb}")
yield os.path.commonpath(paths.split("; "))
def parse_logfiles(logfiles):
"""Parse all `logfiles` and yield paths from it."""
for logfile in logfiles:
try:
yield from paths_from_logfile(syspath(normpath(logfile)))
except ValueError as err:
raise ui.UserError(
f"malformed logfile {displayable_path(logfile)}: {err}"
) from err
except OSError as err:
raise ui.UserError(
f"unreadable logfile {displayable_path(logfile)}: {err}"
) from err
def import_files(lib, paths: list[bytes], query): def import_files(lib, paths: list[bytes], query):
"""Import the files in the given list of paths or matching the """Import the files in the given list of paths or matching the
query. query.
@ -97,6 +131,32 @@ def import_func(lib, opts, args: list[str]):
import_files(lib, byte_paths, query) import_files(lib, byte_paths, query)
def _store_dict(option, opt_str, value, parser):
"""Custom action callback to parse options which have ``key=value``
pairs as values. All such pairs passed for this option are
aggregated into a dictionary.
"""
dest = option.dest
option_values = getattr(parser.values, dest, None)
if option_values is None:
# This is the first supplied ``key=value`` pair of option.
# Initialize empty dictionary and get a reference to it.
setattr(parser.values, dest, {})
option_values = getattr(parser.values, dest)
try:
key, value = value.split("=", 1)
if not (key and value):
raise ValueError
except ValueError:
raise ui.UserError(
f"supplied argument `{value}' is not of the form `key=value'"
)
option_values[key] = value
import_cmd = ui.Subcommand( import_cmd = ui.Subcommand(
"import", help="import new music", aliases=("imp", "im") "import", help="import new music", aliases=("imp", "im")
) )
@ -274,7 +334,7 @@ import_cmd.parser.add_option(
"--set", "--set",
dest="set_fields", dest="set_fields",
action="callback", action="callback",
callback=ui._store_dict, callback=_store_dict,
metavar="FIELD=VALUE", metavar="FIELD=VALUE",
help="set the given fields to the supplied values", help="set the given fields to the supplied values",
) )

View file

@ -2,16 +2,13 @@ import os
from collections.abc import Sequence from collections.abc import Sequence
from functools import cached_property from functools import cached_property
from beets import autotag, config, logging, ui from beets import autotag, config, ui
from beets.autotag import hooks from beets.autotag import hooks
from beets.util import displayable_path from beets.util import displayable_path
from beets.util.units import human_seconds_short from beets.util.units import human_seconds_short
VARIOUS_ARTISTS = "Various Artists" VARIOUS_ARTISTS = "Various Artists"
# Global logger.
log = logging.getLogger("beets")
class ChangeRepresentation: class ChangeRepresentation:
"""Keeps track of all information needed to generate a (colored) text """Keeps track of all information needed to generate a (colored) text

View file

@ -6,7 +6,6 @@ from beets import autotag, config, importer, logging, plugins, ui
from beets.autotag import Recommendation from beets.autotag import Recommendation
from beets.util import displayable_path from beets.util import displayable_path
from beets.util.units import human_bytes, human_seconds_short from beets.util.units import human_bytes, human_seconds_short
from beetsplug.bareasc import print_
from .display import ( from .display import (
disambig_string, disambig_string,
@ -415,8 +414,8 @@ def choose_candidate(
if singleton: if singleton:
ui.print_("No matching recordings found.") ui.print_("No matching recordings found.")
else: else:
print_(f"No matching release found for {itemcount} tracks.") ui.print_(f"No matching release found for {itemcount} tracks.")
print_( ui.print_(
"For help, see: " "For help, see: "
"https://beets.readthedocs.org/en/latest/faq.html#nomatch" "https://beets.readthedocs.org/en/latest/faq.html#nomatch"
) )
@ -461,17 +460,17 @@ def choose_candidate(
else: else:
metadata = ui.colorize("text_highlight_minor", metadata) metadata = ui.colorize("text_highlight_minor", metadata)
line1 = [index, distance, metadata] line1 = [index, distance, metadata]
print_(f" {' '.join(line1)}") ui.print_(f" {' '.join(line1)}")
# Penalties. # Penalties.
penalties = penalty_string(match.distance, 3) penalties = penalty_string(match.distance, 3)
if penalties: if penalties:
print_(f"{' ' * 13}{penalties}") ui.print_(f"{' ' * 13}{penalties}")
# Disambiguation # Disambiguation
disambig = disambig_string(match.info) disambig = disambig_string(match.info)
if disambig: if disambig:
print_(f"{' ' * 13}{disambig}") ui.print_(f"{' ' * 13}{disambig}")
# Ask the user for a choice. # Ask the user for a choice.
sel = ui.input_options(choice_opts, numrange=(1, len(candidates))) sel = ui.input_options(choice_opts, numrange=(1, len(candidates)))

View file

@ -3,7 +3,7 @@
from beets import library, ui from beets import library, ui
from beets.util import functemplate from beets.util import functemplate
from ._utils import do_query from .utils import do_query
def modify_items(lib, mods, dels, query, write, move, album, confirm, inherit): def modify_items(lib, mods, dels, query, write, move, album, confirm, inherit):

View file

@ -2,17 +2,65 @@
import os import os
from beets import logging, ui, util from beets import logging, ui
from beets.util import (
MoveOperation,
PathLike,
displayable_path,
normpath,
syspath,
)
from ._utils import do_query from .utils import do_query
# Global logger. # Global logger.
log = logging.getLogger("beets") log = logging.getLogger("beets")
def show_path_changes(path_changes):
"""Given a list of tuples (source, destination) that indicate the
path changes, log the changes as INFO-level output to the beets log.
The output is guaranteed to be unicode.
Every pair is shown on a single line if the terminal width permits it,
else it is split over two lines. E.g.,
Source -> Destination
vs.
Source
-> Destination
"""
sources, destinations = zip(*path_changes)
# Ensure unicode output
sources = list(map(displayable_path, sources))
destinations = list(map(displayable_path, destinations))
# Calculate widths for terminal split
col_width = (ui.term_width() - len(" -> ")) // 2
max_width = len(max(sources + destinations, key=len))
if max_width > col_width:
# Print every change over two lines
for source, dest in zip(sources, destinations):
color_source, color_dest = ui.colordiff(source, dest)
ui.print_(f"{color_source} \n -> {color_dest}")
else:
# Print every change on a single line, and add a header
title_pad = max_width - len("Source ") + len(" -> ")
ui.print_(f"Source {' ' * title_pad} Destination")
for source, dest in zip(sources, destinations):
pad = max_width - len(source)
color_source, color_dest = ui.colordiff(source, dest)
ui.print_(f"{color_source} {' ' * pad} -> {color_dest}")
def move_items( def move_items(
lib, lib,
dest_path: util.PathLike, dest_path: PathLike,
query, query,
copy, copy,
album, album,
@ -60,7 +108,7 @@ def move_items(
if pretend: if pretend:
if album: if album:
ui.show_path_changes( show_path_changes(
[ [
(item.path, item.destination(basedir=dest)) (item.path, item.destination(basedir=dest))
for obj in objs for obj in objs
@ -68,7 +116,7 @@ def move_items(
] ]
) )
else: else:
ui.show_path_changes( show_path_changes(
[(obj.path, obj.destination(basedir=dest)) for obj in objs] [(obj.path, obj.destination(basedir=dest)) for obj in objs]
) )
else: else:
@ -76,7 +124,7 @@ def move_items(
objs = ui.input_select_objects( objs = ui.input_select_objects(
f"Really {act}", f"Really {act}",
objs, objs,
lambda o: ui.show_path_changes( lambda o: show_path_changes(
[(o.path, o.destination(basedir=dest))] [(o.path, o.destination(basedir=dest))]
), ),
) )
@ -87,24 +135,22 @@ def move_items(
if export: if export:
# Copy without affecting the database. # Copy without affecting the database.
obj.move( obj.move(
operation=util.MoveOperation.COPY, basedir=dest, store=False operation=MoveOperation.COPY, basedir=dest, store=False
) )
else: else:
# Ordinary move/copy: store the new path. # Ordinary move/copy: store the new path.
if copy: if copy:
obj.move(operation=util.MoveOperation.COPY, basedir=dest) obj.move(operation=MoveOperation.COPY, basedir=dest)
else: else:
obj.move(operation=util.MoveOperation.MOVE, basedir=dest) obj.move(operation=MoveOperation.MOVE, basedir=dest)
def move_func(lib, opts, args): def move_func(lib, opts, args):
dest = opts.dest dest = opts.dest
if dest is not None: if dest is not None:
dest = util.normpath(dest) dest = normpath(dest)
if not os.path.isdir(util.syspath(dest)): if not os.path.isdir(syspath(dest)):
raise ui.UserError( raise ui.UserError(f"no such directory: {displayable_path(dest)}")
f"no such directory: {util.displayable_path(dest)}"
)
move_items( move_items(
lib, lib,

View file

@ -2,7 +2,7 @@
from beets import ui from beets import ui
from ._utils import do_query from .utils import do_query
def remove_items(lib, query, album, delete, force): def remove_items(lib, query, album, delete, force):

View file

@ -5,7 +5,7 @@ import os
from beets import library, logging, ui from beets import library, logging, ui
from beets.util import ancestry, syspath from beets.util import ancestry, syspath
from ._utils import do_query from .utils import do_query
# Global logger. # Global logger.
log = logging.getLogger("beets") log = logging.getLogger("beets")

View file

@ -0,0 +1,29 @@
"""Utility functions for beets UI commands."""
from beets import ui
def do_query(lib, query, album, also_items=True):
"""For commands that operate on matched items, performs a query
and returns a list of matching items and a list of matching
albums. (The latter is only nonempty when album is True.) Raises
a UserError if no items match. also_items controls whether, when
fetching albums, the associated items should be fetched also.
"""
if album:
albums = list(lib.albums(query))
items = []
if also_items:
for al in albums:
items += al.items()
else:
albums = []
items = list(lib.items(query))
if album and not albums:
raise ui.UserError("No matching albums found.")
elif not album and not items:
raise ui.UserError("No matching items found.")
return items, albums

View file

@ -5,7 +5,7 @@ import os
from beets import library, logging, ui from beets import library, logging, ui
from beets.util import syspath from beets.util import syspath
from ._utils import do_query from .utils import do_query
# Global logger. # Global logger.
log = logging.getLogger("beets") log = logging.getLogger("beets")

View file

@ -25,7 +25,8 @@ import yaml
from beets import plugins, ui, util from beets import plugins, ui, util
from beets.dbcore import types from beets.dbcore import types
from beets.importer import Action from beets.importer import Action
from beets.ui.commands import PromptChoice, _do_query from beets.ui.commands.import_.session import PromptChoice
from beets.ui.commands.utils import do_query
# These "safe" types can avoid the format/parse cycle that most fields go # These "safe" types can avoid the format/parse cycle that most fields go
# through: they are safe to edit with native YAML types. # through: they are safe to edit with native YAML types.
@ -176,7 +177,7 @@ class EditPlugin(plugins.BeetsPlugin):
def _edit_command(self, lib, opts, args): def _edit_command(self, lib, opts, args):
"""The CLI command function for the `beet edit` command.""" """The CLI command function for the `beet edit` command."""
# Get the objects to edit. # Get the objects to edit.
items, albums = _do_query(lib, args, opts.album, False) items, albums = do_query(lib, args, opts.album, False)
objs = albums if opts.album else items objs = albums if opts.album else items
if not objs: if not objs:
ui.print_("Nothing to edit.") ui.print_("Nothing to edit.")