Refactor ui/commands.py monolith into modular structure (#6119)

## Description

This one’s a big one 🎣 Proceed with care and a bit
of time ;)

The `ui/commands.py` file had grown into an unwieldy monolith (2000+
lines) over time, so this PR breaks it apart into a modular structure
i.e. **one file per command**, plus some cleanup and reorganization
along the way.

---

###  What changed

* **Commands modularized:**
Every command (`help`, `list`, `move`, `update`, `remove`, etc.) now
lives in its own file under `ui/commands/`.
* **Support code reorganized:**
  * Utility functions moved into a separate helper module.
* `commands.py` converted into `commands/__init__.py` for better import
handling.
* The `import` command (and related helpers) moved into its own folder:
    * `importer/session.py` for import session logic
    * `importer/display.py` for display-related functions
* **Tests cleaned up:**
  * Each command’s tests now live in their own file.
  * All UI-related tests were moved into a dedicated folder for clarity.
This commit is contained in:
Sebastian Mohr 2025-11-05 16:01:10 +01:00 committed by GitHub
commit 61a4c737ee
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
42 changed files with 4324 additions and 4191 deletions

View file

@ -73,3 +73,9 @@ d93ddf8dd43e4f9ed072a03829e287c78d2570a2
33f1a5d0bef8ca08be79ee7a0d02a018d502680d
# Moved art.py utility module from beets into beetsplug
28aee0fde463f1e18dfdba1994e2bdb80833722f
# Refactor `ui/commands.py` into multiple modules
59c93e70139f70e9fd1c6f3c1bceb005945bec33
# Moved ui.commands._utils into ui.commands.utils
25ae330044abf04045e3f378f72bbaed739fb30d
# Refactor test_ui_command.py into multiple modules
a59e41a88365e414db3282658d2aa456e0b3468a

View file

@ -107,7 +107,11 @@ def item(lib=None, **kwargs):
# Dummy import session.
def import_session(lib=None, loghandler=None, paths=[], query=[], cli=False):
cls = commands.TerminalImportSession if cli else importer.ImportSession
cls = (
commands.import_.session.TerminalImportSession
if cli
else importer.ImportSession
)
return cls(lib, loghandler, paths, query)

View file

@ -54,7 +54,7 @@ from beets.autotag.hooks import AlbumInfo, TrackInfo
from beets.importer import ImportSession
from beets.library import Item, Library
from beets.test import _common
from beets.ui.commands import TerminalImportSession
from beets.ui.commands.import_.session import TerminalImportSession
from beets.util import (
MoveOperation,
bytestring_path,

View file

@ -1111,76 +1111,9 @@ def show_model_changes(
return bool(changes)
def show_path_changes(path_changes):
"""Given a list of tuples (source, destination) that indicate the
path changes, log the changes as INFO-level output to the beets log.
The output is guaranteed to be unicode.
Every pair is shown on a single line if the terminal width permits it,
else it is split over two lines. E.g.,
Source -> Destination
vs.
Source
-> Destination
"""
sources, destinations = zip(*path_changes)
# Ensure unicode output
sources = list(map(util.displayable_path, sources))
destinations = list(map(util.displayable_path, destinations))
# Calculate widths for terminal split
col_width = (term_width() - len(" -> ")) // 2
max_width = len(max(sources + destinations, key=len))
if max_width > col_width:
# Print every change over two lines
for source, dest in zip(sources, destinations):
color_source, color_dest = colordiff(source, dest)
print_(f"{color_source} \n -> {color_dest}")
else:
# Print every change on a single line, and add a header
title_pad = max_width - len("Source ") + len(" -> ")
print_(f"Source {' ' * title_pad} Destination")
for source, dest in zip(sources, destinations):
pad = max_width - len(source)
color_source, color_dest = colordiff(source, dest)
print_(f"{color_source} {' ' * pad} -> {color_dest}")
# Helper functions for option parsing.
def _store_dict(option, opt_str, value, parser):
"""Custom action callback to parse options which have ``key=value``
pairs as values. All such pairs passed for this option are
aggregated into a dictionary.
"""
dest = option.dest
option_values = getattr(parser.values, dest, None)
if option_values is None:
# This is the first supplied ``key=value`` pair of option.
# Initialize empty dictionary and get a reference to it.
setattr(parser.values, dest, {})
option_values = getattr(parser.values, dest)
try:
key, value = value.split("=", 1)
if not (key and value):
raise ValueError
except ValueError:
raise UserError(
f"supplied argument `{value}' is not of the form `key=value'"
)
option_values[key] = value
class CommonOptionsParser(optparse.OptionParser):
"""Offers a simple way to add common formatting options.
@ -1666,7 +1599,7 @@ def _raw_main(args: list[str], lib=None) -> None:
and subargs[0] == "config"
and ("-e" in subargs or "--edit" in subargs)
):
from beets.ui.commands import config_edit
from beets.ui.commands.config import config_edit
return config_edit()

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,69 @@
# This file is part of beets.
# Copyright 2016, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""This module provides the default commands for beets' command-line
interface.
"""
from beets.util import deprecate_imports
from .completion import completion_cmd
from .config import config_cmd
from .fields import fields_cmd
from .help import HelpCommand
from .import_ import import_cmd
from .list import list_cmd
from .modify import modify_cmd
from .move import move_cmd
from .remove import remove_cmd
from .stats import stats_cmd
from .update import update_cmd
from .version import version_cmd
from .write import write_cmd
def __getattr__(name: str):
"""Handle deprecated imports."""
return deprecate_imports(
old_module=__name__,
new_module_by_name={
"TerminalImportSession": "beets.ui.commands.import_.session",
"PromptChoice": "beets.ui.commands.import_.session",
# TODO: We might want to add more deprecated imports here
},
name=name,
version="3.0.0",
)
# The list of default subcommands. This is populated with Subcommand
# objects that can be fed to a SubcommandsOptionParser.
default_commands = [
fields_cmd,
HelpCommand(),
import_cmd,
list_cmd,
update_cmd,
remove_cmd,
stats_cmd,
version_cmd,
modify_cmd,
move_cmd,
write_cmd,
config_cmd,
completion_cmd,
]
__all__ = ["default_commands"]

View file

@ -0,0 +1,117 @@
"""The 'completion' command: print shell script for command line completion."""
import os
import re
from beets import library, logging, plugins, ui
from beets.util import syspath
# Global logger.
log = logging.getLogger("beets")
def print_completion(*args):
from beets.ui.commands import default_commands
for line in completion_script(default_commands + plugins.commands()):
ui.print_(line, end="")
if not any(os.path.isfile(syspath(p)) for p in BASH_COMPLETION_PATHS):
log.warning(
"Warning: Unable to find the bash-completion package. "
"Command line completion might not work."
)
completion_cmd = ui.Subcommand(
"completion",
help="print shell script that provides command line completion",
)
completion_cmd.func = print_completion
completion_cmd.hide = True
BASH_COMPLETION_PATHS = [
b"/etc/bash_completion",
b"/usr/share/bash-completion/bash_completion",
b"/usr/local/share/bash-completion/bash_completion",
# SmartOS
b"/opt/local/share/bash-completion/bash_completion",
# Homebrew (before bash-completion2)
b"/usr/local/etc/bash_completion",
]
def completion_script(commands):
"""Yield the full completion shell script as strings.
``commands`` is alist of ``ui.Subcommand`` instances to generate
completion data for.
"""
base_script = os.path.join(
os.path.dirname(__file__), "./completion_base.sh"
)
with open(base_script) as base_script:
yield base_script.read()
options = {}
aliases = {}
command_names = []
# Collect subcommands
for cmd in commands:
name = cmd.name
command_names.append(name)
for alias in cmd.aliases:
if re.match(r"^\w+$", alias):
aliases[alias] = name
options[name] = {"flags": [], "opts": []}
for opts in cmd.parser._get_all_options()[1:]:
if opts.action in ("store_true", "store_false"):
option_type = "flags"
else:
option_type = "opts"
options[name][option_type].extend(
opts._short_opts + opts._long_opts
)
# Add global options
options["_global"] = {
"flags": ["-v", "--verbose"],
"opts": "-l --library -c --config -d --directory -h --help".split(" "),
}
# Add flags common to all commands
options["_common"] = {"flags": ["-h", "--help"]}
# Start generating the script
yield "_beet() {\n"
# Command names
yield f" local commands={' '.join(command_names)!r}\n"
yield "\n"
# Command aliases
yield f" local aliases={' '.join(aliases.keys())!r}\n"
for alias, cmd in aliases.items():
yield f" local alias__{alias.replace('-', '_')}={cmd}\n"
yield "\n"
# Fields
fields = library.Item._fields.keys() | library.Album._fields.keys()
yield f" fields={' '.join(fields)!r}\n"
# Command options
for cmd, opts in options.items():
for option_type, option_list in opts.items():
if option_list:
option_list = " ".join(option_list)
yield (
" local"
f" {option_type}__{cmd.replace('-', '_')}='{option_list}'\n"
)
yield " _beet_dispatch\n"
yield "}\n"

View file

@ -0,0 +1,90 @@
"""The 'config' command: show and edit user configuration."""
import os
from beets import config, ui
from beets.util import displayable_path, editor_command, interactive_open
def config_func(lib, opts, args):
# Make sure lazy configuration is loaded
config.resolve()
# Print paths.
if opts.paths:
filenames = []
for source in config.sources:
if not opts.defaults and source.default:
continue
if source.filename:
filenames.append(source.filename)
# In case the user config file does not exist, prepend it to the
# list.
user_path = config.user_config_path()
if user_path not in filenames:
filenames.insert(0, user_path)
for filename in filenames:
ui.print_(displayable_path(filename))
# Open in editor.
elif opts.edit:
config_edit()
# Dump configuration.
else:
config_out = config.dump(full=opts.defaults, redact=opts.redact)
if config_out.strip() != "{}":
ui.print_(config_out)
else:
print("Empty configuration")
def config_edit():
"""Open a program to edit the user configuration.
An empty config file is created if no existing config file exists.
"""
path = config.user_config_path()
editor = editor_command()
try:
if not os.path.isfile(path):
open(path, "w+").close()
interactive_open([path], editor)
except OSError as exc:
message = f"Could not edit configuration: {exc}"
if not editor:
message += (
". Please set the VISUAL (or EDITOR) environment variable"
)
raise ui.UserError(message)
config_cmd = ui.Subcommand("config", help="show or edit the user configuration")
config_cmd.parser.add_option(
"-p",
"--paths",
action="store_true",
help="show files that configuration was loaded from",
)
config_cmd.parser.add_option(
"-e",
"--edit",
action="store_true",
help="edit user configuration with $VISUAL (or $EDITOR)",
)
config_cmd.parser.add_option(
"-d",
"--defaults",
action="store_true",
help="include the default configuration",
)
config_cmd.parser.add_option(
"-c",
"--clear",
action="store_false",
dest="redact",
default=True,
help="do not redact sensitive fields",
)
config_cmd.func = config_func

View file

@ -0,0 +1,41 @@
"""The `fields` command: show available fields for queries and format strings."""
import textwrap
from beets import library, ui
def _print_keys(query):
"""Given a SQLite query result, print the `key` field of each
returned row, with indentation of 2 spaces.
"""
for row in query:
ui.print_(f" {row['key']}")
def fields_func(lib, opts, args):
def _print_rows(names):
names.sort()
ui.print_(textwrap.indent("\n".join(names), " "))
ui.print_("Item fields:")
_print_rows(library.Item.all_keys())
ui.print_("Album fields:")
_print_rows(library.Album.all_keys())
with lib.transaction() as tx:
# The SQL uses the DISTINCT to get unique values from the query
unique_fields = "SELECT DISTINCT key FROM ({})"
ui.print_("Item flexible attributes:")
_print_keys(tx.query(unique_fields.format(library.Item._flex_table)))
ui.print_("Album flexible attributes:")
_print_keys(tx.query(unique_fields.format(library.Album._flex_table)))
fields_cmd = ui.Subcommand(
"fields", help="show fields available for queries and format strings"
)
fields_cmd.func = fields_func

22
beets/ui/commands/help.py Normal file
View file

@ -0,0 +1,22 @@
"""The 'help' command: show help information for commands."""
from beets import ui
class HelpCommand(ui.Subcommand):
def __init__(self):
super().__init__(
"help",
aliases=("?",),
help="give detailed help on a specific sub-command",
)
def func(self, lib, opts, args):
if args:
cmdname = args[0]
helpcommand = self.root_parser._subcommand_for_name(cmdname)
if not helpcommand:
raise ui.UserError(f"unknown command '{cmdname}'")
helpcommand.print_help()
else:
self.root_parser.print_help()

View file

@ -0,0 +1,341 @@
"""The `import` command: import new music into the library."""
import os
from beets import config, logging, plugins, ui
from beets.util import displayable_path, normpath, syspath
from .session import TerminalImportSession
# Global logger.
log = logging.getLogger("beets")
def paths_from_logfile(path):
"""Parse the logfile and yield skipped paths to pass to the `import`
command.
"""
with open(path, encoding="utf-8") as fp:
for i, line in enumerate(fp, start=1):
verb, sep, paths = line.rstrip("\n").partition(" ")
if not sep:
raise ValueError(f"line {i} is invalid")
# Ignore informational lines that don't need to be re-imported.
if verb in {"import", "duplicate-keep", "duplicate-replace"}:
continue
if verb not in {"asis", "skip", "duplicate-skip"}:
raise ValueError(f"line {i} contains unknown verb {verb}")
yield os.path.commonpath(paths.split("; "))
def parse_logfiles(logfiles):
"""Parse all `logfiles` and yield paths from it."""
for logfile in logfiles:
try:
yield from paths_from_logfile(syspath(normpath(logfile)))
except ValueError as err:
raise ui.UserError(
f"malformed logfile {displayable_path(logfile)}: {err}"
) from err
except OSError as err:
raise ui.UserError(
f"unreadable logfile {displayable_path(logfile)}: {err}"
) from err
def import_files(lib, paths: list[bytes], query):
"""Import the files in the given list of paths or matching the
query.
"""
# Check parameter consistency.
if config["import"]["quiet"] and config["import"]["timid"]:
raise ui.UserError("can't be both quiet and timid")
# Open the log.
if config["import"]["log"].get() is not None:
logpath = syspath(config["import"]["log"].as_filename())
try:
loghandler = logging.FileHandler(logpath, encoding="utf-8")
except OSError:
raise ui.UserError(
"Could not open log file for writing:"
f" {displayable_path(logpath)}"
)
else:
loghandler = None
# Never ask for input in quiet mode.
if config["import"]["resume"].get() == "ask" and config["import"]["quiet"]:
config["import"]["resume"] = False
session = TerminalImportSession(lib, loghandler, paths, query)
session.run()
# Emit event.
plugins.send("import", lib=lib, paths=paths)
def import_func(lib, opts, args: list[str]):
config["import"].set_args(opts)
# Special case: --copy flag suppresses import_move (which would
# otherwise take precedence).
if opts.copy:
config["import"]["move"] = False
if opts.library:
query = args
byte_paths = []
else:
query = None
paths = args
# The paths from the logfiles go into a separate list to allow handling
# errors differently from user-specified paths.
paths_from_logfiles = list(parse_logfiles(opts.from_logfiles or []))
if not paths and not paths_from_logfiles:
raise ui.UserError("no path specified")
byte_paths = [os.fsencode(p) for p in paths]
paths_from_logfiles = [os.fsencode(p) for p in paths_from_logfiles]
# Check the user-specified directories.
for path in byte_paths:
if not os.path.exists(syspath(normpath(path))):
raise ui.UserError(
f"no such file or directory: {displayable_path(path)}"
)
# Check the directories from the logfiles, but don't throw an error in
# case those paths don't exist. Maybe some of those paths have already
# been imported and moved separately, so logging a warning should
# suffice.
for path in paths_from_logfiles:
if not os.path.exists(syspath(normpath(path))):
log.warning(
"No such file or directory: {}", displayable_path(path)
)
continue
byte_paths.append(path)
# If all paths were read from a logfile, and none of them exist, throw
# an error
if not paths:
raise ui.UserError("none of the paths are importable")
import_files(lib, byte_paths, query)
def _store_dict(option, opt_str, value, parser):
"""Custom action callback to parse options which have ``key=value``
pairs as values. All such pairs passed for this option are
aggregated into a dictionary.
"""
dest = option.dest
option_values = getattr(parser.values, dest, None)
if option_values is None:
# This is the first supplied ``key=value`` pair of option.
# Initialize empty dictionary and get a reference to it.
setattr(parser.values, dest, {})
option_values = getattr(parser.values, dest)
try:
key, value = value.split("=", 1)
if not (key and value):
raise ValueError
except ValueError:
raise ui.UserError(
f"supplied argument `{value}' is not of the form `key=value'"
)
option_values[key] = value
import_cmd = ui.Subcommand(
"import", help="import new music", aliases=("imp", "im")
)
import_cmd.parser.add_option(
"-c",
"--copy",
action="store_true",
default=None,
help="copy tracks into library directory (default)",
)
import_cmd.parser.add_option(
"-C",
"--nocopy",
action="store_false",
dest="copy",
help="don't copy tracks (opposite of -c)",
)
import_cmd.parser.add_option(
"-m",
"--move",
action="store_true",
dest="move",
help="move tracks into the library (overrides -c)",
)
import_cmd.parser.add_option(
"-w",
"--write",
action="store_true",
default=None,
help="write new metadata to files' tags (default)",
)
import_cmd.parser.add_option(
"-W",
"--nowrite",
action="store_false",
dest="write",
help="don't write metadata (opposite of -w)",
)
import_cmd.parser.add_option(
"-a",
"--autotag",
action="store_true",
dest="autotag",
help="infer tags for imported files (default)",
)
import_cmd.parser.add_option(
"-A",
"--noautotag",
action="store_false",
dest="autotag",
help="don't infer tags for imported files (opposite of -a)",
)
import_cmd.parser.add_option(
"-p",
"--resume",
action="store_true",
default=None,
help="resume importing if interrupted",
)
import_cmd.parser.add_option(
"-P",
"--noresume",
action="store_false",
dest="resume",
help="do not try to resume importing",
)
import_cmd.parser.add_option(
"-q",
"--quiet",
action="store_true",
dest="quiet",
help="never prompt for input: skip albums instead",
)
import_cmd.parser.add_option(
"--quiet-fallback",
type="string",
dest="quiet_fallback",
help="decision in quiet mode when no strong match: skip or asis",
)
import_cmd.parser.add_option(
"-l",
"--log",
dest="log",
help="file to log untaggable albums for later review",
)
import_cmd.parser.add_option(
"-s",
"--singletons",
action="store_true",
help="import individual tracks instead of full albums",
)
import_cmd.parser.add_option(
"-t",
"--timid",
dest="timid",
action="store_true",
help="always confirm all actions",
)
import_cmd.parser.add_option(
"-L",
"--library",
dest="library",
action="store_true",
help="retag items matching a query",
)
import_cmd.parser.add_option(
"-i",
"--incremental",
dest="incremental",
action="store_true",
help="skip already-imported directories",
)
import_cmd.parser.add_option(
"-I",
"--noincremental",
dest="incremental",
action="store_false",
help="do not skip already-imported directories",
)
import_cmd.parser.add_option(
"-R",
"--incremental-skip-later",
action="store_true",
dest="incremental_skip_later",
help="do not record skipped files during incremental import",
)
import_cmd.parser.add_option(
"-r",
"--noincremental-skip-later",
action="store_false",
dest="incremental_skip_later",
help="record skipped files during incremental import",
)
import_cmd.parser.add_option(
"--from-scratch",
dest="from_scratch",
action="store_true",
help="erase existing metadata before applying new metadata",
)
import_cmd.parser.add_option(
"--flat",
dest="flat",
action="store_true",
help="import an entire tree as a single album",
)
import_cmd.parser.add_option(
"-g",
"--group-albums",
dest="group_albums",
action="store_true",
help="group tracks in a folder into separate albums",
)
import_cmd.parser.add_option(
"--pretend",
dest="pretend",
action="store_true",
help="just print the files to import",
)
import_cmd.parser.add_option(
"-S",
"--search-id",
dest="search_ids",
action="append",
metavar="ID",
help="restrict matching to a specific metadata backend ID",
)
import_cmd.parser.add_option(
"--from-logfile",
dest="from_logfiles",
action="append",
metavar="PATH",
help="read skipped paths from an existing logfile",
)
import_cmd.parser.add_option(
"--set",
dest="set_fields",
action="callback",
callback=_store_dict,
metavar="FIELD=VALUE",
help="set the given fields to the supplied values",
)
import_cmd.func = import_func

View file

@ -0,0 +1,570 @@
import os
from collections.abc import Sequence
from functools import cached_property
from beets import autotag, config, ui
from beets.autotag import hooks
from beets.util import displayable_path
from beets.util.units import human_seconds_short
VARIOUS_ARTISTS = "Various Artists"
class ChangeRepresentation:
"""Keeps track of all information needed to generate a (colored) text
representation of the changes that will be made if an album or singleton's
tags are changed according to `match`, which must be an AlbumMatch or
TrackMatch object, accordingly.
"""
@cached_property
def changed_prefix(self) -> str:
return ui.colorize("changed", "\u2260")
cur_artist = None
# cur_album set if album, cur_title set if singleton
cur_album = None
cur_title = None
match = None
indent_header = ""
indent_detail = ""
def __init__(self):
# Read match header indentation width from config.
match_header_indent_width = config["ui"]["import"]["indentation"][
"match_header"
].as_number()
self.indent_header = ui.indent(match_header_indent_width)
# Read match detail indentation width from config.
match_detail_indent_width = config["ui"]["import"]["indentation"][
"match_details"
].as_number()
self.indent_detail = ui.indent(match_detail_indent_width)
# Read match tracklist indentation width from config
match_tracklist_indent_width = config["ui"]["import"]["indentation"][
"match_tracklist"
].as_number()
self.indent_tracklist = ui.indent(match_tracklist_indent_width)
self.layout = config["ui"]["import"]["layout"].as_choice(
{
"column": 0,
"newline": 1,
}
)
def print_layout(
self, indent, left, right, separator=" -> ", max_width=None
):
if not max_width:
# If no max_width provided, use terminal width
max_width = ui.term_width()
if self.layout == 0:
ui.print_column_layout(indent, left, right, separator, max_width)
else:
ui.print_newline_layout(indent, left, right, separator, max_width)
def show_match_header(self):
"""Print out a 'header' identifying the suggested match (album name,
artist name,...) and summarizing the changes that would be made should
the user accept the match.
"""
# Print newline at beginning of change block.
ui.print_("")
# 'Match' line and similarity.
ui.print_(
f"{self.indent_header}Match ({dist_string(self.match.distance)}):"
)
if isinstance(self.match.info, autotag.hooks.AlbumInfo):
# Matching an album - print that
artist_album_str = (
f"{self.match.info.artist} - {self.match.info.album}"
)
else:
# Matching a single track
artist_album_str = (
f"{self.match.info.artist} - {self.match.info.title}"
)
ui.print_(
self.indent_header
+ dist_colorize(artist_album_str, self.match.distance)
)
# Penalties.
penalties = penalty_string(self.match.distance)
if penalties:
ui.print_(f"{self.indent_header}{penalties}")
# Disambiguation.
disambig = disambig_string(self.match.info)
if disambig:
ui.print_(f"{self.indent_header}{disambig}")
# Data URL.
if self.match.info.data_url:
url = ui.colorize("text_faint", f"{self.match.info.data_url}")
ui.print_(f"{self.indent_header}{url}")
def show_match_details(self):
"""Print out the details of the match, including changes in album name
and artist name.
"""
# Artist.
artist_l, artist_r = self.cur_artist or "", self.match.info.artist
if artist_r == VARIOUS_ARTISTS:
# Hide artists for VA releases.
artist_l, artist_r = "", ""
if artist_l != artist_r:
artist_l, artist_r = ui.colordiff(artist_l, artist_r)
left = {
"prefix": f"{self.changed_prefix} Artist: ",
"contents": artist_l,
"suffix": "",
}
right = {"prefix": "", "contents": artist_r, "suffix": ""}
self.print_layout(self.indent_detail, left, right)
else:
ui.print_(f"{self.indent_detail}*", "Artist:", artist_r)
if self.cur_album:
# Album
album_l, album_r = self.cur_album or "", self.match.info.album
if (
self.cur_album != self.match.info.album
and self.match.info.album != VARIOUS_ARTISTS
):
album_l, album_r = ui.colordiff(album_l, album_r)
left = {
"prefix": f"{self.changed_prefix} Album: ",
"contents": album_l,
"suffix": "",
}
right = {"prefix": "", "contents": album_r, "suffix": ""}
self.print_layout(self.indent_detail, left, right)
else:
ui.print_(f"{self.indent_detail}*", "Album:", album_r)
elif self.cur_title:
# Title - for singletons
title_l, title_r = self.cur_title or "", self.match.info.title
if self.cur_title != self.match.info.title:
title_l, title_r = ui.colordiff(title_l, title_r)
left = {
"prefix": f"{self.changed_prefix} Title: ",
"contents": title_l,
"suffix": "",
}
right = {"prefix": "", "contents": title_r, "suffix": ""}
self.print_layout(self.indent_detail, left, right)
else:
ui.print_(f"{self.indent_detail}*", "Title:", title_r)
def make_medium_info_line(self, track_info):
"""Construct a line with the current medium's info."""
track_media = track_info.get("media", "Media")
# Build output string.
if self.match.info.mediums > 1 and track_info.disctitle:
return (
f"* {track_media} {track_info.medium}: {track_info.disctitle}"
)
elif self.match.info.mediums > 1:
return f"* {track_media} {track_info.medium}"
elif track_info.disctitle:
return f"* {track_media}: {track_info.disctitle}"
else:
return ""
def format_index(self, track_info):
"""Return a string representing the track index of the given
TrackInfo or Item object.
"""
if isinstance(track_info, hooks.TrackInfo):
index = track_info.index
medium_index = track_info.medium_index
medium = track_info.medium
mediums = self.match.info.mediums
else:
index = medium_index = track_info.track
medium = track_info.disc
mediums = track_info.disctotal
if config["per_disc_numbering"]:
if mediums and mediums > 1:
return f"{medium}-{medium_index}"
else:
return str(medium_index if medium_index is not None else index)
else:
return str(index)
def make_track_numbers(self, item, track_info):
"""Format colored track indices."""
cur_track = self.format_index(item)
new_track = self.format_index(track_info)
changed = False
# Choose color based on change.
if cur_track != new_track:
changed = True
if item.track in (track_info.index, track_info.medium_index):
highlight_color = "text_highlight_minor"
else:
highlight_color = "text_highlight"
else:
highlight_color = "text_faint"
lhs_track = ui.colorize(highlight_color, f"(#{cur_track})")
rhs_track = ui.colorize(highlight_color, f"(#{new_track})")
return lhs_track, rhs_track, changed
@staticmethod
def make_track_titles(item, track_info):
"""Format colored track titles."""
new_title = track_info.title
if not item.title.strip():
# If there's no title, we use the filename. Don't colordiff.
cur_title = displayable_path(os.path.basename(item.path))
return cur_title, new_title, True
else:
# If there is a title, highlight differences.
cur_title = item.title.strip()
cur_col, new_col = ui.colordiff(cur_title, new_title)
return cur_col, new_col, cur_title != new_title
@staticmethod
def make_track_lengths(item, track_info):
"""Format colored track lengths."""
changed = False
if (
item.length
and track_info.length
and abs(item.length - track_info.length)
>= config["ui"]["length_diff_thresh"].as_number()
):
highlight_color = "text_highlight"
changed = True
else:
highlight_color = "text_highlight_minor"
# Handle nonetype lengths by setting to 0
cur_length0 = item.length if item.length else 0
new_length0 = track_info.length if track_info.length else 0
# format into string
cur_length = f"({human_seconds_short(cur_length0)})"
new_length = f"({human_seconds_short(new_length0)})"
# colorize
lhs_length = ui.colorize(highlight_color, cur_length)
rhs_length = ui.colorize(highlight_color, new_length)
return lhs_length, rhs_length, changed
def make_line(self, item, track_info):
"""Extract changes from item -> new TrackInfo object, and colorize
appropriately. Returns (lhs, rhs) for column printing.
"""
# Track titles.
lhs_title, rhs_title, diff_title = self.make_track_titles(
item, track_info
)
# Track number change.
lhs_track, rhs_track, diff_track = self.make_track_numbers(
item, track_info
)
# Length change.
lhs_length, rhs_length, diff_length = self.make_track_lengths(
item, track_info
)
changed = diff_title or diff_track or diff_length
# Construct lhs and rhs dicts.
# Previously, we printed the penalties, however this is no longer
# the case, thus the 'info' dictionary is unneeded.
# penalties = penalty_string(self.match.distance.tracks[track_info])
lhs = {
"prefix": f"{self.changed_prefix if changed else '*'} {lhs_track} ",
"contents": lhs_title,
"suffix": f" {lhs_length}",
}
rhs = {"prefix": "", "contents": "", "suffix": ""}
if not changed:
# Only return the left side, as nothing changed.
return (lhs, rhs)
else:
# Construct a dictionary for the "changed to" side
rhs = {
"prefix": f"{rhs_track} ",
"contents": rhs_title,
"suffix": f" {rhs_length}",
}
return (lhs, rhs)
def print_tracklist(self, lines):
"""Calculates column widths for tracks stored as line tuples:
(left, right). Then prints each line of tracklist.
"""
if len(lines) == 0:
# If no lines provided, e.g. details not required, do nothing.
return
def get_width(side):
"""Return the width of left or right in uncolorized characters."""
try:
return len(
ui.uncolorize(
" ".join(
[side["prefix"], side["contents"], side["suffix"]]
)
)
)
except KeyError:
# An empty dictionary -> Nothing to report
return 0
# Check how to fit content into terminal window
indent_width = len(self.indent_tracklist)
terminal_width = ui.term_width()
joiner_width = len("".join(["* ", " -> "]))
col_width = (terminal_width - indent_width - joiner_width) // 2
max_width_l = max(get_width(line_tuple[0]) for line_tuple in lines)
max_width_r = max(get_width(line_tuple[1]) for line_tuple in lines)
if (
(max_width_l <= col_width)
and (max_width_r <= col_width)
or (
((max_width_l > col_width) or (max_width_r > col_width))
and ((max_width_l + max_width_r) <= col_width * 2)
)
):
# All content fits. Either both maximum widths are below column
# widths, or one of the columns is larger than allowed but the
# other is smaller than allowed.
# In this case we can afford to shrink the columns to fit their
# largest string
col_width_l = max_width_l
col_width_r = max_width_r
else:
# Not all content fits - stick with original half/half split
col_width_l = col_width
col_width_r = col_width
# Print out each line, using the calculated width from above.
for left, right in lines:
left["width"] = col_width_l
right["width"] = col_width_r
self.print_layout(self.indent_tracklist, left, right)
class AlbumChange(ChangeRepresentation):
"""Album change representation, setting cur_album"""
def __init__(self, cur_artist, cur_album, match):
super().__init__()
self.cur_artist = cur_artist
self.cur_album = cur_album
self.match = match
def show_match_tracks(self):
"""Print out the tracks of the match, summarizing changes the match
suggests for them.
"""
# Tracks.
# match is an AlbumMatch NamedTuple, mapping is a dict
# Sort the pairs by the track_info index (at index 1 of the NamedTuple)
pairs = list(self.match.mapping.items())
pairs.sort(key=lambda item_and_track_info: item_and_track_info[1].index)
# Build up LHS and RHS for track difference display. The `lines` list
# contains `(left, right)` tuples.
lines = []
medium = disctitle = None
for item, track_info in pairs:
# If the track is the first on a new medium, show medium
# number and title.
if medium != track_info.medium or disctitle != track_info.disctitle:
# Create header for new medium
header = self.make_medium_info_line(track_info)
if header != "":
# Print tracks from previous medium
self.print_tracklist(lines)
lines = []
ui.print_(f"{self.indent_detail}{header}")
# Save new medium details for future comparison.
medium, disctitle = track_info.medium, track_info.disctitle
# Construct the line tuple for the track.
left, right = self.make_line(item, track_info)
if right["contents"] != "":
lines.append((left, right))
else:
if config["import"]["detail"]:
lines.append((left, right))
self.print_tracklist(lines)
# Missing and unmatched tracks.
if self.match.extra_tracks:
ui.print_(
"Missing tracks"
f" ({len(self.match.extra_tracks)}/{len(self.match.info.tracks)} -"
f" {len(self.match.extra_tracks) / len(self.match.info.tracks):.1%}):"
)
for track_info in self.match.extra_tracks:
line = f" ! {track_info.title} (#{self.format_index(track_info)})"
if track_info.length:
line += f" ({human_seconds_short(track_info.length)})"
ui.print_(ui.colorize("text_warning", line))
if self.match.extra_items:
ui.print_(f"Unmatched tracks ({len(self.match.extra_items)}):")
for item in self.match.extra_items:
line = f" ! {item.title} (#{self.format_index(item)})"
if item.length:
line += f" ({human_seconds_short(item.length)})"
ui.print_(ui.colorize("text_warning", line))
class TrackChange(ChangeRepresentation):
"""Track change representation, comparing item with match."""
def __init__(self, cur_artist, cur_title, match):
super().__init__()
self.cur_artist = cur_artist
self.cur_title = cur_title
self.match = match
def show_change(cur_artist, cur_album, match):
"""Print out a representation of the changes that will be made if an
album's tags are changed according to `match`, which must be an AlbumMatch
object.
"""
change = AlbumChange(
cur_artist=cur_artist, cur_album=cur_album, match=match
)
# Print the match header.
change.show_match_header()
# Print the match details.
change.show_match_details()
# Print the match tracks.
change.show_match_tracks()
def show_item_change(item, match):
"""Print out the change that would occur by tagging `item` with the
metadata from `match`, a TrackMatch object.
"""
change = TrackChange(
cur_artist=item.artist, cur_title=item.title, match=match
)
# Print the match header.
change.show_match_header()
# Print the match details.
change.show_match_details()
def disambig_string(info):
"""Generate a string for an AlbumInfo or TrackInfo object that
provides context that helps disambiguate similar-looking albums and
tracks.
"""
if isinstance(info, hooks.AlbumInfo):
disambig = get_album_disambig_fields(info)
elif isinstance(info, hooks.TrackInfo):
disambig = get_singleton_disambig_fields(info)
else:
return ""
return ", ".join(disambig)
def get_singleton_disambig_fields(info: hooks.TrackInfo) -> Sequence[str]:
out = []
chosen_fields = config["match"]["singleton_disambig_fields"].as_str_seq()
calculated_values = {
"index": f"Index {info.index}",
"track_alt": f"Track {info.track_alt}",
"album": (
f"[{info.album}]"
if (
config["import"]["singleton_album_disambig"].get()
and info.get("album")
)
else ""
),
}
for field in chosen_fields:
if field in calculated_values:
out.append(str(calculated_values[field]))
else:
try:
out.append(str(info[field]))
except (AttributeError, KeyError):
print(f"Disambiguation string key {field} does not exist.")
return out
def get_album_disambig_fields(info: hooks.AlbumInfo) -> Sequence[str]:
out = []
chosen_fields = config["match"]["album_disambig_fields"].as_str_seq()
calculated_values = {
"media": (
f"{info.mediums}x{info.media}"
if (info.mediums and info.mediums > 1)
else info.media
),
}
for field in chosen_fields:
if field in calculated_values:
out.append(str(calculated_values[field]))
else:
try:
out.append(str(info[field]))
except (AttributeError, KeyError):
print(f"Disambiguation string key {field} does not exist.")
return out
def dist_colorize(string, dist):
"""Formats a string as a colorized similarity string according to
a distance.
"""
if dist <= config["match"]["strong_rec_thresh"].as_number():
string = ui.colorize("text_success", string)
elif dist <= config["match"]["medium_rec_thresh"].as_number():
string = ui.colorize("text_warning", string)
else:
string = ui.colorize("text_error", string)
return string
def dist_string(dist):
"""Formats a distance (a float) as a colorized similarity percentage
string.
"""
string = f"{(1 - dist) * 100:.1f}%"
return dist_colorize(string, dist)
def penalty_string(distance, limit=None):
"""Returns a colorized string that indicates all the penalties
applied to a distance object.
"""
penalties = []
for key in distance.keys():
key = key.replace("album_", "")
key = key.replace("track_", "")
key = key.replace("_", " ")
penalties.append(key)
if penalties:
if limit and len(penalties) > limit:
penalties = penalties[:limit] + ["..."]
# Prefix penalty string with U+2260: Not Equal To
penalty_string = f"\u2260 {', '.join(penalties)}"
return ui.colorize("changed", penalty_string)

View file

@ -0,0 +1,557 @@
from collections import Counter
from itertools import chain
from typing import Any, NamedTuple
from beets import autotag, config, importer, logging, plugins, ui
from beets.autotag import Recommendation
from beets.util import displayable_path
from beets.util.units import human_bytes, human_seconds_short
from .display import (
disambig_string,
dist_colorize,
penalty_string,
show_change,
show_item_change,
)
# Global logger.
log = logging.getLogger("beets")
class TerminalImportSession(importer.ImportSession):
"""An import session that runs in a terminal."""
def choose_match(self, task):
"""Given an initial autotagging of items, go through an interactive
dance with the user to ask for a choice of metadata. Returns an
AlbumMatch object, ASIS, or SKIP.
"""
# Show what we're tagging.
ui.print_()
path_str0 = displayable_path(task.paths, "\n")
path_str = ui.colorize("import_path", path_str0)
items_str0 = f"({len(task.items)} items)"
items_str = ui.colorize("import_path_items", items_str0)
ui.print_(" ".join([path_str, items_str]))
# Let plugins display info or prompt the user before we go through the
# process of selecting candidate.
results = plugins.send(
"import_task_before_choice", session=self, task=task
)
actions = [action for action in results if action]
if len(actions) == 1:
return actions[0]
elif len(actions) > 1:
raise plugins.PluginConflictError(
"Only one handler for `import_task_before_choice` may return "
"an action."
)
# Take immediate action if appropriate.
action = _summary_judgment(task.rec)
if action == importer.Action.APPLY:
match = task.candidates[0]
show_change(task.cur_artist, task.cur_album, match)
return match
elif action is not None:
return action
# Loop until we have a choice.
while True:
# Ask for a choice from the user. The result of
# `choose_candidate` may be an `importer.Action`, an
# `AlbumMatch` object for a specific selection, or a
# `PromptChoice`.
choices = self._get_choices(task)
choice = choose_candidate(
task.candidates,
False,
task.rec,
task.cur_artist,
task.cur_album,
itemcount=len(task.items),
choices=choices,
)
# Basic choices that require no more action here.
if choice in (importer.Action.SKIP, importer.Action.ASIS):
# Pass selection to main control flow.
return choice
# Plugin-provided choices. We invoke the associated callback
# function.
elif choice in choices:
post_choice = choice.callback(self, task)
if isinstance(post_choice, importer.Action):
return post_choice
elif isinstance(post_choice, autotag.Proposal):
# Use the new candidates and continue around the loop.
task.candidates = post_choice.candidates
task.rec = post_choice.recommendation
# Otherwise, we have a specific match selection.
else:
# We have a candidate! Finish tagging. Here, choice is an
# AlbumMatch object.
assert isinstance(choice, autotag.AlbumMatch)
return choice
def choose_item(self, task):
"""Ask the user for a choice about tagging a single item. Returns
either an action constant or a TrackMatch object.
"""
ui.print_()
ui.print_(displayable_path(task.item.path))
candidates, rec = task.candidates, task.rec
# Take immediate action if appropriate.
action = _summary_judgment(task.rec)
if action == importer.Action.APPLY:
match = candidates[0]
show_item_change(task.item, match)
return match
elif action is not None:
return action
while True:
# Ask for a choice.
choices = self._get_choices(task)
choice = choose_candidate(
candidates, True, rec, item=task.item, choices=choices
)
if choice in (importer.Action.SKIP, importer.Action.ASIS):
return choice
elif choice in choices:
post_choice = choice.callback(self, task)
if isinstance(post_choice, importer.Action):
return post_choice
elif isinstance(post_choice, autotag.Proposal):
candidates = post_choice.candidates
rec = post_choice.recommendation
else:
# Chose a candidate.
assert isinstance(choice, autotag.TrackMatch)
return choice
def resolve_duplicate(self, task, found_duplicates):
"""Decide what to do when a new album or item seems similar to one
that's already in the library.
"""
log.warning(
"This {} is already in the library!",
("album" if task.is_album else "item"),
)
if config["import"]["quiet"]:
# In quiet mode, don't prompt -- just skip.
log.info("Skipping.")
sel = "s"
else:
# Print some detail about the existing and new items so the
# user can make an informed decision.
for duplicate in found_duplicates:
ui.print_(
"Old: "
+ summarize_items(
(
list(duplicate.items())
if task.is_album
else [duplicate]
),
not task.is_album,
)
)
if config["import"]["duplicate_verbose_prompt"]:
if task.is_album:
for dup in duplicate.items():
print(f" {dup}")
else:
print(f" {duplicate}")
ui.print_(
"New: "
+ summarize_items(
task.imported_items(),
not task.is_album,
)
)
if config["import"]["duplicate_verbose_prompt"]:
for item in task.imported_items():
print(f" {item}")
sel = ui.input_options(
("Skip new", "Keep all", "Remove old", "Merge all")
)
if sel == "s":
# Skip new.
task.set_choice(importer.Action.SKIP)
elif sel == "k":
# Keep both. Do nothing; leave the choice intact.
pass
elif sel == "r":
# Remove old.
task.should_remove_duplicates = True
elif sel == "m":
task.should_merge_duplicates = True
else:
assert False
def should_resume(self, path):
return ui.input_yn(
f"Import of the directory:\n{displayable_path(path)}\n"
"was interrupted. Resume (Y/n)?"
)
def _get_choices(self, task):
"""Get the list of prompt choices that should be presented to the
user. This consists of both built-in choices and ones provided by
plugins.
The `before_choose_candidate` event is sent to the plugins, with
session and task as its parameters. Plugins are responsible for
checking the right conditions and returning a list of `PromptChoice`s,
which is flattened and checked for conflicts.
If two or more choices have the same short letter, a warning is
emitted and all but one choices are discarded, giving preference
to the default importer choices.
Returns a list of `PromptChoice`s.
"""
# Standard, built-in choices.
choices = [
PromptChoice("s", "Skip", lambda s, t: importer.Action.SKIP),
PromptChoice("u", "Use as-is", lambda s, t: importer.Action.ASIS),
]
if task.is_album:
choices += [
PromptChoice(
"t", "as Tracks", lambda s, t: importer.Action.TRACKS
),
PromptChoice(
"g", "Group albums", lambda s, t: importer.Action.ALBUMS
),
]
choices += [
PromptChoice("e", "Enter search", manual_search),
PromptChoice("i", "enter Id", manual_id),
PromptChoice("b", "aBort", abort_action),
]
# Send the before_choose_candidate event and flatten list.
extra_choices = list(
chain(
*plugins.send(
"before_choose_candidate", session=self, task=task
)
)
)
# Add a "dummy" choice for the other baked-in option, for
# duplicate checking.
all_choices = (
[
PromptChoice("a", "Apply", None),
]
+ choices
+ extra_choices
)
# Check for conflicts.
short_letters = [c.short for c in all_choices]
if len(short_letters) != len(set(short_letters)):
# Duplicate short letter has been found.
duplicates = [
i for i, count in Counter(short_letters).items() if count > 1
]
for short in duplicates:
# Keep the first of the choices, removing the rest.
dup_choices = [c for c in all_choices if c.short == short]
for c in dup_choices[1:]:
log.warning(
"Prompt choice '{0.long}' removed due to conflict "
"with '{1[0].long}' (short letter: '{0.short}')",
c,
dup_choices,
)
extra_choices.remove(c)
return choices + extra_choices
def summarize_items(items, singleton):
"""Produces a brief summary line describing a set of items. Used for
manually resolving duplicates during import.
`items` is a list of `Item` objects. `singleton` indicates whether
this is an album or single-item import (if the latter, them `items`
should only have one element).
"""
summary_parts = []
if not singleton:
summary_parts.append(f"{len(items)} items")
format_counts = {}
for item in items:
format_counts[item.format] = format_counts.get(item.format, 0) + 1
if len(format_counts) == 1:
# A single format.
summary_parts.append(items[0].format)
else:
# Enumerate all the formats by decreasing frequencies:
for fmt, count in sorted(
format_counts.items(),
key=lambda fmt_and_count: (-fmt_and_count[1], fmt_and_count[0]),
):
summary_parts.append(f"{fmt} {count}")
if items:
average_bitrate = sum([item.bitrate for item in items]) / len(items)
total_duration = sum([item.length for item in items])
total_filesize = sum([item.filesize for item in items])
summary_parts.append(f"{int(average_bitrate / 1000)}kbps")
if items[0].format == "FLAC":
sample_bits = (
f"{round(int(items[0].samplerate) / 1000, 1)}kHz"
f"/{items[0].bitdepth} bit"
)
summary_parts.append(sample_bits)
summary_parts.append(human_seconds_short(total_duration))
summary_parts.append(human_bytes(total_filesize))
return ", ".join(summary_parts)
def _summary_judgment(rec):
"""Determines whether a decision should be made without even asking
the user. This occurs in quiet mode and when an action is chosen for
NONE recommendations. Return None if the user should be queried.
Otherwise, returns an action. May also print to the console if a
summary judgment is made.
"""
if config["import"]["quiet"]:
if rec == Recommendation.strong:
return importer.Action.APPLY
else:
action = config["import"]["quiet_fallback"].as_choice(
{
"skip": importer.Action.SKIP,
"asis": importer.Action.ASIS,
}
)
elif config["import"]["timid"]:
return None
elif rec == Recommendation.none:
action = config["import"]["none_rec_action"].as_choice(
{
"skip": importer.Action.SKIP,
"asis": importer.Action.ASIS,
"ask": None,
}
)
else:
return None
if action == importer.Action.SKIP:
ui.print_("Skipping.")
elif action == importer.Action.ASIS:
ui.print_("Importing as-is.")
return action
class PromptChoice(NamedTuple):
short: str
long: str
callback: Any
def choose_candidate(
candidates,
singleton,
rec,
cur_artist=None,
cur_album=None,
item=None,
itemcount=None,
choices=[],
):
"""Given a sorted list of candidates, ask the user for a selection
of which candidate to use. Applies to both full albums and
singletons (tracks). Candidates are either AlbumMatch or TrackMatch
objects depending on `singleton`. for albums, `cur_artist`,
`cur_album`, and `itemcount` must be provided. For singletons,
`item` must be provided.
`choices` is a list of `PromptChoice`s to be used in each prompt.
Returns one of the following:
* the result of the choice, which may be SKIP or ASIS
* a candidate (an AlbumMatch/TrackMatch object)
* a chosen `PromptChoice` from `choices`
"""
# Sanity check.
if singleton:
assert item is not None
else:
assert cur_artist is not None
assert cur_album is not None
# Build helper variables for the prompt choices.
choice_opts = tuple(c.long for c in choices)
choice_actions = {c.short: c for c in choices}
# Zero candidates.
if not candidates:
if singleton:
ui.print_("No matching recordings found.")
else:
ui.print_(f"No matching release found for {itemcount} tracks.")
ui.print_(
"For help, see: "
"https://beets.readthedocs.org/en/latest/faq.html#nomatch"
)
sel = ui.input_options(choice_opts)
if sel in choice_actions:
return choice_actions[sel]
else:
assert False
# Is the change good enough?
bypass_candidates = False
if rec != Recommendation.none:
match = candidates[0]
bypass_candidates = True
while True:
# Display and choose from candidates.
require = rec <= Recommendation.low
if not bypass_candidates:
# Display list of candidates.
ui.print_("")
ui.print_(
f"Finding tags for {'track' if singleton else 'album'} "
f'"{item.artist if singleton else cur_artist} -'
f' {item.title if singleton else cur_album}".'
)
ui.print_(" Candidates:")
for i, match in enumerate(candidates):
# Index, metadata, and distance.
index0 = f"{i + 1}."
index = dist_colorize(index0, match.distance)
dist = f"({(1 - match.distance) * 100:.1f}%)"
distance = dist_colorize(dist, match.distance)
metadata = (
f"{match.info.artist} -"
f" {match.info.title if singleton else match.info.album}"
)
if i == 0:
metadata = dist_colorize(metadata, match.distance)
else:
metadata = ui.colorize("text_highlight_minor", metadata)
line1 = [index, distance, metadata]
ui.print_(f" {' '.join(line1)}")
# Penalties.
penalties = penalty_string(match.distance, 3)
if penalties:
ui.print_(f"{' ' * 13}{penalties}")
# Disambiguation
disambig = disambig_string(match.info)
if disambig:
ui.print_(f"{' ' * 13}{disambig}")
# Ask the user for a choice.
sel = ui.input_options(choice_opts, numrange=(1, len(candidates)))
if sel == "m":
pass
elif sel in choice_actions:
return choice_actions[sel]
else: # Numerical selection.
match = candidates[sel - 1]
if sel != 1:
# When choosing anything but the first match,
# disable the default action.
require = True
bypass_candidates = False
# Show what we're about to do.
if singleton:
show_item_change(item, match)
else:
show_change(cur_artist, cur_album, match)
# Exact match => tag automatically if we're not in timid mode.
if rec == Recommendation.strong and not config["import"]["timid"]:
return match
# Ask for confirmation.
default = config["import"]["default_action"].as_choice(
{
"apply": "a",
"skip": "s",
"asis": "u",
"none": None,
}
)
if default is None:
require = True
# Bell ring when user interaction is needed.
if config["import"]["bell"]:
ui.print_("\a", end="")
sel = ui.input_options(
("Apply", "More candidates") + choice_opts,
require=require,
default=default,
)
if sel == "a":
return match
elif sel in choice_actions:
return choice_actions[sel]
def manual_search(session, task):
"""Get a new `Proposal` using manual search criteria.
Input either an artist and album (for full albums) or artist and
track name (for singletons) for manual search.
"""
artist = ui.input_("Artist:").strip()
name = ui.input_("Album:" if task.is_album else "Track:").strip()
if task.is_album:
_, _, prop = autotag.tag_album(task.items, artist, name)
return prop
else:
return autotag.tag_item(task.item, artist, name)
def manual_id(session, task):
"""Get a new `Proposal` using a manually-entered ID.
Input an ID, either for an album ("release") or a track ("recording").
"""
prompt = f"Enter {'release' if task.is_album else 'recording'} ID:"
search_id = ui.input_(prompt).strip()
if task.is_album:
_, _, prop = autotag.tag_album(task.items, search_ids=search_id.split())
return prop
else:
return autotag.tag_item(task.item, search_ids=search_id.split())
def abort_action(session, task):
"""A prompt choice callback that aborts the importer."""
raise importer.ImportAbortError()

25
beets/ui/commands/list.py Normal file
View file

@ -0,0 +1,25 @@
"""The 'list' command: query and show library contents."""
from beets import ui
def list_items(lib, query, album, fmt=""):
"""Print out items in lib matching query. If album, then search for
albums instead of single items.
"""
if album:
for album in lib.albums(query):
ui.print_(format(album, fmt))
else:
for item in lib.items(query):
ui.print_(format(item, fmt))
def list_func(lib, opts, args):
list_items(lib, args, opts.album)
list_cmd = ui.Subcommand("list", help="query the library", aliases=("ls",))
list_cmd.parser.usage += "\nExample: %prog -f '$album: $title' artist:beatles"
list_cmd.parser.add_all_common_options()
list_cmd.func = list_func

162
beets/ui/commands/modify.py Normal file
View file

@ -0,0 +1,162 @@
"""The `modify` command: change metadata fields."""
from beets import library, ui
from beets.util import functemplate
from .utils import do_query
def modify_items(lib, mods, dels, query, write, move, album, confirm, inherit):
"""Modifies matching items according to user-specified assignments and
deletions.
`mods` is a dictionary of field and value pairse indicating
assignments. `dels` is a list of fields to be deleted.
"""
# Parse key=value specifications into a dictionary.
model_cls = library.Album if album else library.Item
# Get the items to modify.
items, albums = do_query(lib, query, album, False)
objs = albums if album else items
# Apply changes *temporarily*, preview them, and collect modified
# objects.
ui.print_(f"Modifying {len(objs)} {'album' if album else 'item'}s.")
changed = []
templates = {
key: functemplate.template(value) for key, value in mods.items()
}
for obj in objs:
obj_mods = {
key: model_cls._parse(key, obj.evaluate_template(templates[key]))
for key in mods.keys()
}
if print_and_modify(obj, obj_mods, dels) and obj not in changed:
changed.append(obj)
# Still something to do?
if not changed:
ui.print_("No changes to make.")
return
# Confirm action.
if confirm:
if write and move:
extra = ", move and write tags"
elif write:
extra = " and write tags"
elif move:
extra = " and move"
else:
extra = ""
changed = ui.input_select_objects(
f"Really modify{extra}",
changed,
lambda o: print_and_modify(o, mods, dels),
)
# Apply changes to database and files
with lib.transaction():
for obj in changed:
obj.try_sync(write, move, inherit)
def print_and_modify(obj, mods, dels):
"""Print the modifications to an item and return a bool indicating
whether any changes were made.
`mods` is a dictionary of fields and values to update on the object;
`dels` is a sequence of fields to delete.
"""
obj.update(mods)
for field in dels:
try:
del obj[field]
except KeyError:
pass
return ui.show_model_changes(obj)
def modify_parse_args(args):
"""Split the arguments for the modify subcommand into query parts,
assignments (field=value), and deletions (field!). Returns the result as
a three-tuple in that order.
"""
mods = {}
dels = []
query = []
for arg in args:
if arg.endswith("!") and "=" not in arg and ":" not in arg:
dels.append(arg[:-1]) # Strip trailing !.
elif "=" in arg and ":" not in arg.split("=", 1)[0]:
key, val = arg.split("=", 1)
mods[key] = val
else:
query.append(arg)
return query, mods, dels
def modify_func(lib, opts, args):
query, mods, dels = modify_parse_args(args)
if not mods and not dels:
raise ui.UserError("no modifications specified")
modify_items(
lib,
mods,
dels,
query,
ui.should_write(opts.write),
ui.should_move(opts.move),
opts.album,
not opts.yes,
opts.inherit,
)
modify_cmd = ui.Subcommand(
"modify", help="change metadata fields", aliases=("mod",)
)
modify_cmd.parser.add_option(
"-m",
"--move",
action="store_true",
dest="move",
help="move files in the library directory",
)
modify_cmd.parser.add_option(
"-M",
"--nomove",
action="store_false",
dest="move",
help="don't move files in library",
)
modify_cmd.parser.add_option(
"-w",
"--write",
action="store_true",
default=None,
help="write new metadata to files' tags (default)",
)
modify_cmd.parser.add_option(
"-W",
"--nowrite",
action="store_false",
dest="write",
help="don't write metadata (opposite of -w)",
)
modify_cmd.parser.add_album_option()
modify_cmd.parser.add_format_option(target="item")
modify_cmd.parser.add_option(
"-y", "--yes", action="store_true", help="skip confirmation"
)
modify_cmd.parser.add_option(
"-I",
"--noinherit",
action="store_false",
dest="inherit",
default=True,
help="when modifying albums, don't also change item data",
)
modify_cmd.func = modify_func

200
beets/ui/commands/move.py Normal file
View file

@ -0,0 +1,200 @@
"""The 'move' command: Move/copy files to the library or a new base directory."""
import os
from beets import logging, ui
from beets.util import (
MoveOperation,
PathLike,
displayable_path,
normpath,
syspath,
)
from .utils import do_query
# Global logger.
log = logging.getLogger("beets")
def show_path_changes(path_changes):
"""Given a list of tuples (source, destination) that indicate the
path changes, log the changes as INFO-level output to the beets log.
The output is guaranteed to be unicode.
Every pair is shown on a single line if the terminal width permits it,
else it is split over two lines. E.g.,
Source -> Destination
vs.
Source
-> Destination
"""
sources, destinations = zip(*path_changes)
# Ensure unicode output
sources = list(map(displayable_path, sources))
destinations = list(map(displayable_path, destinations))
# Calculate widths for terminal split
col_width = (ui.term_width() - len(" -> ")) // 2
max_width = len(max(sources + destinations, key=len))
if max_width > col_width:
# Print every change over two lines
for source, dest in zip(sources, destinations):
color_source, color_dest = ui.colordiff(source, dest)
ui.print_(f"{color_source} \n -> {color_dest}")
else:
# Print every change on a single line, and add a header
title_pad = max_width - len("Source ") + len(" -> ")
ui.print_(f"Source {' ' * title_pad} Destination")
for source, dest in zip(sources, destinations):
pad = max_width - len(source)
color_source, color_dest = ui.colordiff(source, dest)
ui.print_(f"{color_source} {' ' * pad} -> {color_dest}")
def move_items(
lib,
dest_path: PathLike,
query,
copy,
album,
pretend,
confirm=False,
export=False,
):
"""Moves or copies items to a new base directory, given by dest. If
dest is None, then the library's base directory is used, making the
command "consolidate" files.
"""
dest = os.fsencode(dest_path) if dest_path else dest_path
items, albums = do_query(lib, query, album, False)
objs = albums if album else items
num_objs = len(objs)
# Filter out files that don't need to be moved.
def isitemmoved(item):
return item.path != item.destination(basedir=dest)
def isalbummoved(album):
return any(isitemmoved(i) for i in album.items())
objs = [o for o in objs if (isalbummoved if album else isitemmoved)(o)]
num_unmoved = num_objs - len(objs)
# Report unmoved files that match the query.
unmoved_msg = ""
if num_unmoved > 0:
unmoved_msg = f" ({num_unmoved} already in place)"
copy = copy or export # Exporting always copies.
action = "Copying" if copy else "Moving"
act = "copy" if copy else "move"
entity = "album" if album else "item"
log.info(
"{} {} {}{}{}.",
action,
len(objs),
entity,
"s" if len(objs) != 1 else "",
unmoved_msg,
)
if not objs:
return
if pretend:
if album:
show_path_changes(
[
(item.path, item.destination(basedir=dest))
for obj in objs
for item in obj.items()
]
)
else:
show_path_changes(
[(obj.path, obj.destination(basedir=dest)) for obj in objs]
)
else:
if confirm:
objs = ui.input_select_objects(
f"Really {act}",
objs,
lambda o: show_path_changes(
[(o.path, o.destination(basedir=dest))]
),
)
for obj in objs:
log.debug("moving: {.filepath}", obj)
if export:
# Copy without affecting the database.
obj.move(
operation=MoveOperation.COPY, basedir=dest, store=False
)
else:
# Ordinary move/copy: store the new path.
if copy:
obj.move(operation=MoveOperation.COPY, basedir=dest)
else:
obj.move(operation=MoveOperation.MOVE, basedir=dest)
def move_func(lib, opts, args):
dest = opts.dest
if dest is not None:
dest = normpath(dest)
if not os.path.isdir(syspath(dest)):
raise ui.UserError(f"no such directory: {displayable_path(dest)}")
move_items(
lib,
dest,
args,
opts.copy,
opts.album,
opts.pretend,
opts.timid,
opts.export,
)
move_cmd = ui.Subcommand("move", help="move or copy items", aliases=("mv",))
move_cmd.parser.add_option(
"-d", "--dest", metavar="DIR", dest="dest", help="destination directory"
)
move_cmd.parser.add_option(
"-c",
"--copy",
default=False,
action="store_true",
help="copy instead of moving",
)
move_cmd.parser.add_option(
"-p",
"--pretend",
default=False,
action="store_true",
help="show how files would be moved, but don't touch anything",
)
move_cmd.parser.add_option(
"-t",
"--timid",
dest="timid",
action="store_true",
help="always confirm all actions",
)
move_cmd.parser.add_option(
"-e",
"--export",
default=False,
action="store_true",
help="copy without changing the database path",
)
move_cmd.parser.add_album_option()
move_cmd.func = move_func

View file

@ -0,0 +1,84 @@
"""The `remove` command: remove items from the library (and optionally delete files)."""
from beets import ui
from .utils import do_query
def remove_items(lib, query, album, delete, force):
"""Remove items matching query from lib. If album, then match and
remove whole albums. If delete, also remove files from disk.
"""
# Get the matching items.
items, albums = do_query(lib, query, album)
objs = albums if album else items
# Confirm file removal if not forcing removal.
if not force:
# Prepare confirmation with user.
album_str = (
f" in {len(albums)} album{'s' if len(albums) > 1 else ''}"
if album
else ""
)
if delete:
fmt = "$path - $title"
prompt = "Really DELETE"
prompt_all = (
"Really DELETE"
f" {len(items)} file{'s' if len(items) > 1 else ''}{album_str}"
)
else:
fmt = ""
prompt = "Really remove from the library?"
prompt_all = (
"Really remove"
f" {len(items)} item{'s' if len(items) > 1 else ''}{album_str}"
" from the library?"
)
# Helpers for printing affected items
def fmt_track(t):
ui.print_(format(t, fmt))
def fmt_album(a):
ui.print_()
for i in a.items():
fmt_track(i)
fmt_obj = fmt_album if album else fmt_track
# Show all the items.
for o in objs:
fmt_obj(o)
# Confirm with user.
objs = ui.input_select_objects(
prompt, objs, fmt_obj, prompt_all=prompt_all
)
if not objs:
return
# Remove (and possibly delete) items.
with lib.transaction():
for obj in objs:
obj.remove(delete)
def remove_func(lib, opts, args):
remove_items(lib, args, opts.album, opts.delete, opts.force)
remove_cmd = ui.Subcommand(
"remove", help="remove matching items from the library", aliases=("rm",)
)
remove_cmd.parser.add_option(
"-d", "--delete", action="store_true", help="also remove files from disk"
)
remove_cmd.parser.add_option(
"-f", "--force", action="store_true", help="do not ask when removing items"
)
remove_cmd.parser.add_album_option()
remove_cmd.func = remove_func

View file

@ -0,0 +1,62 @@
"""The 'stats' command: show library statistics."""
import os
from beets import logging, ui
from beets.util import syspath
from beets.util.units import human_bytes, human_seconds
# Global logger.
log = logging.getLogger("beets")
def show_stats(lib, query, exact):
"""Shows some statistics about the matched items."""
items = lib.items(query)
total_size = 0
total_time = 0.0
total_items = 0
artists = set()
albums = set()
album_artists = set()
for item in items:
if exact:
try:
total_size += os.path.getsize(syspath(item.path))
except OSError as exc:
log.info("could not get size of {.path}: {}", item, exc)
else:
total_size += int(item.length * item.bitrate / 8)
total_time += item.length
total_items += 1
artists.add(item.artist)
album_artists.add(item.albumartist)
if item.album_id:
albums.add(item.album_id)
size_str = human_bytes(total_size)
if exact:
size_str += f" ({total_size} bytes)"
ui.print_(f"""Tracks: {total_items}
Total time: {human_seconds(total_time)}
{f" ({total_time:.2f} seconds)" if exact else ""}
{"Total size" if exact else "Approximate total size"}: {size_str}
Artists: {len(artists)}
Albums: {len(albums)}
Album artists: {len(album_artists)}""")
def stats_func(lib, opts, args):
show_stats(lib, args, opts.exact)
stats_cmd = ui.Subcommand(
"stats", help="show statistics about the library or a query"
)
stats_cmd.parser.add_option(
"-e", "--exact", action="store_true", help="exact size and time"
)
stats_cmd.func = stats_func

196
beets/ui/commands/update.py Normal file
View file

@ -0,0 +1,196 @@
"""The `update` command: Update library contents according to on-disk tags."""
import os
from beets import library, logging, ui
from beets.util import ancestry, syspath
from .utils import do_query
# Global logger.
log = logging.getLogger("beets")
def update_items(lib, query, album, move, pretend, fields, exclude_fields=None):
"""For all the items matched by the query, update the library to
reflect the item's embedded tags.
:param fields: The fields to be stored. If not specified, all fields will
be.
:param exclude_fields: The fields to not be stored. If not specified, all
fields will be.
"""
with lib.transaction():
items, _ = do_query(lib, query, album)
if move and fields is not None and "path" not in fields:
# Special case: if an item needs to be moved, the path field has to
# updated; otherwise the new path will not be reflected in the
# database.
fields.append("path")
if fields is None:
# no fields were provided, update all media fields
item_fields = fields or library.Item._media_fields
if move and "path" not in item_fields:
# move is enabled, add 'path' to the list of fields to update
item_fields.add("path")
else:
# fields was provided, just update those
item_fields = fields
# get all the album fields to update
album_fields = fields or library.Album._fields.keys()
if exclude_fields:
# remove any excluded fields from the item and album sets
item_fields = [f for f in item_fields if f not in exclude_fields]
album_fields = [f for f in album_fields if f not in exclude_fields]
# Walk through the items and pick up their changes.
affected_albums = set()
for item in items:
# Item deleted?
if not item.path or not os.path.exists(syspath(item.path)):
ui.print_(format(item))
ui.print_(ui.colorize("text_error", " deleted"))
if not pretend:
item.remove(True)
affected_albums.add(item.album_id)
continue
# Did the item change since last checked?
if item.current_mtime() <= item.mtime:
log.debug(
"skipping {0.filepath} because mtime is up to date ({0.mtime})",
item,
)
continue
# Read new data.
try:
item.read()
except library.ReadError as exc:
log.error("error reading {.filepath}: {}", item, exc)
continue
# Special-case album artist when it matches track artist. (Hacky
# but necessary for preserving album-level metadata for non-
# autotagged imports.)
if not item.albumartist:
old_item = lib.get_item(item.id)
if old_item.albumartist == old_item.artist == item.artist:
item.albumartist = old_item.albumartist
item._dirty.discard("albumartist")
# Check for and display changes.
changed = ui.show_model_changes(item, fields=item_fields)
# Save changes.
if not pretend:
if changed:
# Move the item if it's in the library.
if move and lib.directory in ancestry(item.path):
item.move(store=False)
item.store(fields=item_fields)
affected_albums.add(item.album_id)
else:
# The file's mtime was different, but there were no
# changes to the metadata. Store the new mtime,
# which is set in the call to read(), so we don't
# check this again in the future.
item.store(fields=item_fields)
# Skip album changes while pretending.
if pretend:
return
# Modify affected albums to reflect changes in their items.
for album_id in affected_albums:
if album_id is None: # Singletons.
continue
album = lib.get_album(album_id)
if not album: # Empty albums have already been removed.
log.debug("emptied album {}", album_id)
continue
first_item = album.items().get()
# Update album structure to reflect an item in it.
for key in library.Album.item_keys:
album[key] = first_item[key]
album.store(fields=album_fields)
# Move album art (and any inconsistent items).
if move and lib.directory in ancestry(first_item.path):
log.debug("moving album {}", album_id)
# Manually moving and storing the album.
items = list(album.items())
for item in items:
item.move(store=False, with_album=False)
item.store(fields=item_fields)
album.move(store=False)
album.store(fields=album_fields)
def update_func(lib, opts, args):
# Verify that the library folder exists to prevent accidental wipes.
if not os.path.isdir(syspath(lib.directory)):
ui.print_("Library path is unavailable or does not exist.")
ui.print_(lib.directory)
if not ui.input_yn("Are you sure you want to continue (y/n)?", True):
return
update_items(
lib,
args,
opts.album,
ui.should_move(opts.move),
opts.pretend,
opts.fields,
opts.exclude_fields,
)
update_cmd = ui.Subcommand(
"update",
help="update the library",
aliases=(
"upd",
"up",
),
)
update_cmd.parser.add_album_option()
update_cmd.parser.add_format_option()
update_cmd.parser.add_option(
"-m",
"--move",
action="store_true",
dest="move",
help="move files in the library directory",
)
update_cmd.parser.add_option(
"-M",
"--nomove",
action="store_false",
dest="move",
help="don't move files in library",
)
update_cmd.parser.add_option(
"-p",
"--pretend",
action="store_true",
help="show all changes but do nothing",
)
update_cmd.parser.add_option(
"-F",
"--field",
default=None,
action="append",
dest="fields",
help="list of fields to update",
)
update_cmd.parser.add_option(
"-e",
"--exclude-field",
default=None,
action="append",
dest="exclude_fields",
help="list of fields to exclude from updates",
)
update_cmd.func = update_func

View file

@ -0,0 +1,29 @@
"""Utility functions for beets UI commands."""
from beets import ui
def do_query(lib, query, album, also_items=True):
"""For commands that operate on matched items, performs a query
and returns a list of matching items and a list of matching
albums. (The latter is only nonempty when album is True.) Raises
a UserError if no items match. also_items controls whether, when
fetching albums, the associated items should be fetched also.
"""
if album:
albums = list(lib.albums(query))
items = []
if also_items:
for al in albums:
items += al.items()
else:
albums = []
items = list(lib.items(query))
if album and not albums:
raise ui.UserError("No matching albums found.")
elif not album and not items:
raise ui.UserError("No matching items found.")
return items, albums

View file

@ -0,0 +1,23 @@
"""The 'version' command: show version information."""
from platform import python_version
import beets
from beets import plugins, ui
def show_version(*args):
ui.print_(f"beets version {beets.__version__}")
ui.print_(f"Python version {python_version()}")
# Show plugins.
names = sorted(p.name for p in plugins.find_plugins())
if names:
ui.print_("plugins:", ", ".join(names))
else:
ui.print_("no plugins loaded")
version_cmd = ui.Subcommand("version", help="output version information")
version_cmd.func = show_version
__all__ = ["version_cmd"]

View file

@ -0,0 +1,60 @@
"""The `write` command: write tag information to files."""
import os
from beets import library, logging, ui
from beets.util import syspath
from .utils import do_query
# Global logger.
log = logging.getLogger("beets")
def write_items(lib, query, pretend, force):
"""Write tag information from the database to the respective files
in the filesystem.
"""
items, albums = do_query(lib, query, False, False)
for item in items:
# Item deleted?
if not os.path.exists(syspath(item.path)):
log.info("missing file: {.filepath}", item)
continue
# Get an Item object reflecting the "clean" (on-disk) state.
try:
clean_item = library.Item.from_path(item.path)
except library.ReadError as exc:
log.error("error reading {.filepath}: {}", item, exc)
continue
# Check for and display changes.
changed = ui.show_model_changes(
item, clean_item, library.Item._media_tag_fields, force
)
if (changed or force) and not pretend:
# We use `try_sync` here to keep the mtime up to date in the
# database.
item.try_sync(True, False)
def write_func(lib, opts, args):
write_items(lib, args, opts.pretend, opts.force)
write_cmd = ui.Subcommand("write", help="write tag information to files")
write_cmd.parser.add_option(
"-p",
"--pretend",
action="store_true",
help="show all changes but do nothing",
)
write_cmd.parser.add_option(
"-f",
"--force",
action="store_true",
help="write tags even if the existing tags match the database",
)
write_cmd.func = write_func

View file

@ -25,7 +25,8 @@ import yaml
from beets import plugins, ui, util
from beets.dbcore import types
from beets.importer import Action
from beets.ui.commands import PromptChoice, _do_query
from beets.ui.commands.import_.session import PromptChoice
from beets.ui.commands.utils import do_query
# These "safe" types can avoid the format/parse cycle that most fields go
# through: they are safe to edit with native YAML types.
@ -176,7 +177,7 @@ class EditPlugin(plugins.BeetsPlugin):
def _edit_command(self, lib, opts, args):
"""The CLI command function for the `beet edit` command."""
# Get the objects to edit.
items, albums = _do_query(lib, args, opts.album, False)
items, albums = do_query(lib, args, opts.album, False)
objs = albums if opts.album else items
if not objs:
ui.print_("Nothing to edit.")

View file

@ -43,6 +43,9 @@ Other changes:
- The documentation chapter :doc:`dev/paths` has been moved to the "For
Developers" section and revised to reflect current best practices (pathlib
usage).
- Refactored the ``beets/ui/commands.py`` monolithic file (2000+ lines) into
multiple modules within the ``beets/ui/commands`` directory for better
maintainability.
2.5.1 (October 14, 2025)
------------------------

File diff suppressed because it is too large Load diff

0
test/ui/__init__.py Normal file
View file

View file

View file

@ -0,0 +1,64 @@
import os
import subprocess
import sys
import pytest
from beets.test import _common
from beets.test.helper import IOMixin, has_program
from beets.ui.commands.completion import BASH_COMPLETION_PATHS
from beets.util import syspath
from ..test_ui import TestPluginTestCase
@_common.slow_test()
@pytest.mark.xfail(
os.environ.get("GITHUB_ACTIONS") == "true" and sys.platform == "linux",
reason="Completion is for some reason unhappy on Ubuntu 24.04 in CI",
)
class CompletionTest(IOMixin, TestPluginTestCase):
def test_completion(self):
# Do not load any other bash completion scripts on the system.
env = dict(os.environ)
env["BASH_COMPLETION_DIR"] = os.devnull
env["BASH_COMPLETION_COMPAT_DIR"] = os.devnull
# Open a `bash` process to run the tests in. We'll pipe in bash
# commands via stdin.
cmd = os.environ.get("BEETS_TEST_SHELL", "/bin/bash --norc").split()
if not has_program(cmd[0]):
self.skipTest("bash not available")
tester = subprocess.Popen(
cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, env=env
)
# Load bash_completion library.
for path in BASH_COMPLETION_PATHS:
if os.path.exists(syspath(path)):
bash_completion = path
break
else:
self.skipTest("bash-completion script not found")
try:
with open(syspath(bash_completion), "rb") as f:
tester.stdin.writelines(f)
except OSError:
self.skipTest("could not read bash-completion script")
# Load completion script.
self.run_command("completion", lib=None)
completion_script = self.io.getoutput().encode("utf-8")
self.io.restore()
tester.stdin.writelines(completion_script.splitlines(True))
# Load test suite.
test_script_name = os.path.join(_common.RSRC, b"test_completion.sh")
with open(test_script_name, "rb") as test_script_file:
tester.stdin.writelines(test_script_file)
out, err = tester.communicate()
assert tester.returncode == 0
assert out == b"completion tests passed\n", (
"test/test_completion.sh did not execute properly. "
f"Output:{out.decode('utf-8')}"
)

View file

@ -0,0 +1,24 @@
from beets import library
from beets.test.helper import IOMixin, ItemInDBTestCase
from beets.ui.commands.fields import fields_func
class FieldsTest(IOMixin, ItemInDBTestCase):
def remove_keys(self, keys, text):
for i in text:
try:
keys.remove(i)
except ValueError:
pass
def test_fields_func(self):
fields_func(self.lib, [], [])
items = library.Item.all_keys()
albums = library.Album.all_keys()
output = self.io.stdout.get().split()
self.remove_keys(items, output)
self.remove_keys(albums, output)
assert len(items) == 0
assert len(albums) == 0

View file

@ -0,0 +1,256 @@
import os
import re
import unittest
from unittest.mock import Mock, patch
import pytest
from beets import autotag, config, library, ui
from beets.autotag.match import distance
from beets.test import _common
from beets.test.helper import BeetsTestCase, IOMixin
from beets.ui.commands.import_ import import_files, paths_from_logfile
from beets.ui.commands.import_.display import show_change
from beets.ui.commands.import_.session import summarize_items
class ImportTest(BeetsTestCase):
def test_quiet_timid_disallowed(self):
config["import"]["quiet"] = True
config["import"]["timid"] = True
with pytest.raises(ui.UserError):
import_files(None, [], None)
def test_parse_paths_from_logfile(self):
if os.path.__name__ == "ntpath":
logfile_content = (
"import started Wed Jun 15 23:08:26 2022\n"
"asis C:\\music\\Beatles, The\\The Beatles; C:\\music\\Beatles, The\\The Beatles\\CD 01; C:\\music\\Beatles, The\\The Beatles\\CD 02\n" # noqa: E501
"duplicate-replace C:\\music\\Bill Evans\\Trio '65\n"
"skip C:\\music\\Michael Jackson\\Bad\n"
"skip C:\\music\\Soulwax\\Any Minute Now\n"
)
expected_paths = [
"C:\\music\\Beatles, The\\The Beatles",
"C:\\music\\Michael Jackson\\Bad",
"C:\\music\\Soulwax\\Any Minute Now",
]
else:
logfile_content = (
"import started Wed Jun 15 23:08:26 2022\n"
"asis /music/Beatles, The/The Beatles; /music/Beatles, The/The Beatles/CD 01; /music/Beatles, The/The Beatles/CD 02\n" # noqa: E501
"duplicate-replace /music/Bill Evans/Trio '65\n"
"skip /music/Michael Jackson/Bad\n"
"skip /music/Soulwax/Any Minute Now\n"
)
expected_paths = [
"/music/Beatles, The/The Beatles",
"/music/Michael Jackson/Bad",
"/music/Soulwax/Any Minute Now",
]
logfile = os.path.join(self.temp_dir, b"logfile.log")
with open(logfile, mode="w") as fp:
fp.write(logfile_content)
actual_paths = list(paths_from_logfile(logfile))
assert actual_paths == expected_paths
class ShowChangeTest(IOMixin, unittest.TestCase):
def setUp(self):
super().setUp()
self.items = [_common.item()]
self.items[0].track = 1
self.items[0].path = b"/path/to/file.mp3"
self.info = autotag.AlbumInfo(
album="the album",
album_id="album id",
artist="the artist",
artist_id="artist id",
tracks=[
autotag.TrackInfo(
title="the title", track_id="track id", index=1
)
],
)
def _show_change(
self,
items=None,
info=None,
color=False,
cur_artist="the artist",
cur_album="the album",
dist=0.1,
):
"""Return an unicode string representing the changes"""
items = items or self.items
info = info or self.info
mapping = dict(zip(items, info.tracks))
config["ui"]["color"] = color
config["import"]["detail"] = True
change_dist = distance(items, info, mapping)
change_dist._penalties = {"album": [dist], "artist": [dist]}
show_change(
cur_artist,
cur_album,
autotag.AlbumMatch(change_dist, info, mapping, set(), set()),
)
return self.io.getoutput().lower()
def test_null_change(self):
msg = self._show_change()
assert "match (90.0%)" in msg
assert "album, artist" in msg
def test_album_data_change(self):
msg = self._show_change(
cur_artist="another artist", cur_album="another album"
)
assert "another artist -> the artist" in msg
assert "another album -> the album" in msg
def test_item_data_change(self):
self.items[0].title = "different"
msg = self._show_change()
assert "different" in msg
assert "the title" in msg
def test_item_data_change_with_unicode(self):
self.items[0].title = "caf\xe9"
msg = self._show_change()
assert "caf\xe9" in msg
assert "the title" in msg
def test_album_data_change_with_unicode(self):
msg = self._show_change(cur_artist="caf\xe9", cur_album="another album")
assert "caf\xe9" in msg
assert "the artist" in msg
def test_item_data_change_title_missing(self):
self.items[0].title = ""
msg = re.sub(r" +", " ", self._show_change())
assert "file.mp3" in msg
assert "the title" in msg
def test_item_data_change_title_missing_with_unicode_filename(self):
self.items[0].title = ""
self.items[0].path = "/path/to/caf\xe9.mp3".encode()
msg = re.sub(r" +", " ", self._show_change())
assert "caf\xe9.mp3" in msg or "caf.mp3" in msg
def test_colorize(self):
assert "test" == ui.uncolorize("test")
txt = ui.uncolorize("\x1b[31mtest\x1b[39;49;00m")
assert "test" == txt
txt = ui.uncolorize("\x1b[31mtest\x1b[39;49;00m test")
assert "test test" == txt
txt = ui.uncolorize("\x1b[31mtest\x1b[39;49;00mtest")
assert "testtest" == txt
txt = ui.uncolorize("test \x1b[31mtest\x1b[39;49;00m test")
assert "test test test" == txt
def test_color_split(self):
exp = ("test", "")
res = ui.color_split("test", 5)
assert exp == res
exp = ("\x1b[31mtes\x1b[39;49;00m", "\x1b[31mt\x1b[39;49;00m")
res = ui.color_split("\x1b[31mtest\x1b[39;49;00m", 3)
assert exp == res
def test_split_into_lines(self):
# Test uncolored text
txt = ui.split_into_lines("test test test", [5, 5, 5])
assert txt == ["test", "test", "test"]
# Test multiple colored texts
colored_text = "\x1b[31mtest \x1b[39;49;00m" * 3
split_txt = [
"\x1b[31mtest\x1b[39;49;00m",
"\x1b[31mtest\x1b[39;49;00m",
"\x1b[31mtest\x1b[39;49;00m",
]
txt = ui.split_into_lines(colored_text, [5, 5, 5])
assert txt == split_txt
# Test single color, multi space text
colored_text = "\x1b[31m test test test \x1b[39;49;00m"
txt = ui.split_into_lines(colored_text, [5, 5, 5])
assert txt == split_txt
# Test single color, different spacing
colored_text = "\x1b[31mtest\x1b[39;49;00mtest test test"
# ToDo: fix color_len to handle mid-text color escapes, and thus
# split colored texts over newlines (potentially with dashes?)
split_txt = ["\x1b[31mtest\x1b[39;49;00mt", "est", "test", "test"]
txt = ui.split_into_lines(colored_text, [5, 5, 5])
assert txt == split_txt
def test_album_data_change_wrap_newline(self):
# Patch ui.term_width to force wrapping
with patch("beets.ui.term_width", return_value=30):
# Test newline layout
config["ui"]["import"]["layout"] = "newline"
long_name = f"another artist with a{' very' * 10} long name"
msg = self._show_change(
cur_artist=long_name, cur_album="another album"
)
assert "artist: another artist" in msg
assert " -> the artist" in msg
assert "another album -> the album" not in msg
def test_item_data_change_wrap_column(self):
# Patch ui.term_width to force wrapping
with patch("beets.ui.term_width", return_value=54):
# Test Column layout
config["ui"]["import"]["layout"] = "column"
long_title = f"a track with a{' very' * 10} long name"
self.items[0].title = long_title
msg = self._show_change()
assert "(#1) a track (1:00) -> (#1) the title (0:00)" in msg
def test_item_data_change_wrap_newline(self):
# Patch ui.term_width to force wrapping
with patch("beets.ui.term_width", return_value=30):
config["ui"]["import"]["layout"] = "newline"
long_title = f"a track with a{' very' * 10} long name"
self.items[0].title = long_title
msg = self._show_change()
assert "(#1) a track with" in msg
assert " -> (#1) the title (0:00)" in msg
@patch("beets.library.Item.try_filesize", Mock(return_value=987))
class SummarizeItemsTest(unittest.TestCase):
def setUp(self):
super().setUp()
item = library.Item()
item.bitrate = 4321
item.length = 10 * 60 + 54
item.format = "F"
self.item = item
def test_summarize_item(self):
summary = summarize_items([], True)
assert summary == ""
summary = summarize_items([self.item], True)
assert summary == "F, 4kbps, 10:54, 987.0 B"
def test_summarize_items(self):
summary = summarize_items([], False)
assert summary == "0 items"
summary = summarize_items([self.item], False)
assert summary == "1 items, F, 4kbps, 10:54, 987.0 B"
# make a copy of self.item
i2 = self.item.copy()
summary = summarize_items([self.item, i2], False)
assert summary == "2 items, F, 4kbps, 21:48, 1.9 KiB"
i2.format = "G"
summary = summarize_items([self.item, i2], False)
assert summary == "2 items, F 1, G 1, 4kbps, 21:48, 1.9 KiB"
summary = summarize_items([self.item, i2, i2], False)
assert summary == "3 items, G 2, F 1, 4kbps, 32:42, 2.9 KiB"

View file

@ -0,0 +1,69 @@
from beets.test import _common
from beets.test.helper import BeetsTestCase, capture_stdout
from beets.ui.commands.list import list_items
class ListTest(BeetsTestCase):
def setUp(self):
super().setUp()
self.item = _common.item()
self.item.path = "xxx/yyy"
self.lib.add(self.item)
self.lib.add_album([self.item])
def _run_list(self, query="", album=False, path=False, fmt=""):
with capture_stdout() as stdout:
list_items(self.lib, query, album, fmt)
return stdout
def test_list_outputs_item(self):
stdout = self._run_list()
assert "the title" in stdout.getvalue()
def test_list_unicode_query(self):
self.item.title = "na\xefve"
self.item.store()
self.lib._connection().commit()
stdout = self._run_list(["na\xefve"])
out = stdout.getvalue()
assert "na\xefve" in out
def test_list_item_path(self):
stdout = self._run_list(fmt="$path")
assert stdout.getvalue().strip() == "xxx/yyy"
def test_list_album_outputs_something(self):
stdout = self._run_list(album=True)
assert len(stdout.getvalue()) > 0
def test_list_album_path(self):
stdout = self._run_list(album=True, fmt="$path")
assert stdout.getvalue().strip() == "xxx"
def test_list_album_omits_title(self):
stdout = self._run_list(album=True)
assert "the title" not in stdout.getvalue()
def test_list_uses_track_artist(self):
stdout = self._run_list()
assert "the artist" in stdout.getvalue()
assert "the album artist" not in stdout.getvalue()
def test_list_album_uses_album_artist(self):
stdout = self._run_list(album=True)
assert "the artist" not in stdout.getvalue()
assert "the album artist" in stdout.getvalue()
def test_list_item_format_artist(self):
stdout = self._run_list(fmt="$artist")
assert "the artist" in stdout.getvalue()
def test_list_item_format_multiple(self):
stdout = self._run_list(fmt="$artist - $album - $year")
assert "the artist - the album - 0001" == stdout.getvalue().strip()
def test_list_album_format(self):
stdout = self._run_list(album=True, fmt="$genre")
assert "the genre" in stdout.getvalue()
assert "the album" not in stdout.getvalue()

View file

@ -0,0 +1,216 @@
import unittest
from mediafile import MediaFile
from beets.test.helper import BeetsTestCase, control_stdin
from beets.ui.commands.modify import modify_parse_args
from beets.util import syspath
class ModifyTest(BeetsTestCase):
def setUp(self):
super().setUp()
self.album = self.add_album_fixture()
[self.item] = self.album.items()
def modify_inp(self, inp, *args):
with control_stdin(inp):
self.run_command("modify", *args)
def modify(self, *args):
self.modify_inp("y", *args)
# Item tests
def test_modify_item(self):
self.modify("title=newTitle")
item = self.lib.items().get()
assert item.title == "newTitle"
def test_modify_item_abort(self):
item = self.lib.items().get()
title = item.title
self.modify_inp("n", "title=newTitle")
item = self.lib.items().get()
assert item.title == title
def test_modify_item_no_change(self):
title = "Tracktitle"
item = self.add_item_fixture(title=title)
self.modify_inp("y", "title", f"title={title}")
item = self.lib.items(title).get()
assert item.title == title
def test_modify_write_tags(self):
self.modify("title=newTitle")
item = self.lib.items().get()
item.read()
assert item.title == "newTitle"
def test_modify_dont_write_tags(self):
self.modify("--nowrite", "title=newTitle")
item = self.lib.items().get()
item.read()
assert item.title != "newTitle"
def test_move(self):
self.modify("title=newTitle")
item = self.lib.items().get()
assert b"newTitle" in item.path
def test_not_move(self):
self.modify("--nomove", "title=newTitle")
item = self.lib.items().get()
assert b"newTitle" not in item.path
def test_no_write_no_move(self):
self.modify("--nomove", "--nowrite", "title=newTitle")
item = self.lib.items().get()
item.read()
assert b"newTitle" not in item.path
assert item.title != "newTitle"
def test_update_mtime(self):
item = self.item
old_mtime = item.mtime
self.modify("title=newTitle")
item.load()
assert old_mtime != item.mtime
assert item.current_mtime() == item.mtime
def test_reset_mtime_with_no_write(self):
item = self.item
self.modify("--nowrite", "title=newTitle")
item.load()
assert 0 == item.mtime
def test_selective_modify(self):
title = "Tracktitle"
album = "album"
original_artist = "composer"
new_artist = "coverArtist"
for i in range(0, 10):
self.add_item_fixture(
title=f"{title}{i}", artist=original_artist, album=album
)
self.modify_inp(
"s\ny\ny\ny\nn\nn\ny\ny\ny\ny\nn", title, f"artist={new_artist}"
)
original_items = self.lib.items(f"artist:{original_artist}")
new_items = self.lib.items(f"artist:{new_artist}")
assert len(list(original_items)) == 3
assert len(list(new_items)) == 7
def test_modify_formatted(self):
for i in range(0, 3):
self.add_item_fixture(
title=f"title{i}", artist="artist", album="album"
)
items = list(self.lib.items())
self.modify("title=${title} - append")
for item in items:
orig_title = item.title
item.load()
assert item.title == f"{orig_title} - append"
# Album Tests
def test_modify_album(self):
self.modify("--album", "album=newAlbum")
album = self.lib.albums().get()
assert album.album == "newAlbum"
def test_modify_album_write_tags(self):
self.modify("--album", "album=newAlbum")
item = self.lib.items().get()
item.read()
assert item.album == "newAlbum"
def test_modify_album_dont_write_tags(self):
self.modify("--album", "--nowrite", "album=newAlbum")
item = self.lib.items().get()
item.read()
assert item.album == "the album"
def test_album_move(self):
self.modify("--album", "album=newAlbum")
item = self.lib.items().get()
item.read()
assert b"newAlbum" in item.path
def test_album_not_move(self):
self.modify("--nomove", "--album", "album=newAlbum")
item = self.lib.items().get()
item.read()
assert b"newAlbum" not in item.path
def test_modify_album_formatted(self):
item = self.lib.items().get()
orig_album = item.album
self.modify("--album", "album=${album} - append")
item.load()
assert item.album == f"{orig_album} - append"
# Misc
def test_write_initial_key_tag(self):
self.modify("initial_key=C#m")
item = self.lib.items().get()
mediafile = MediaFile(syspath(item.path))
assert mediafile.initial_key == "C#m"
def test_set_flexattr(self):
self.modify("flexattr=testAttr")
item = self.lib.items().get()
assert item.flexattr == "testAttr"
def test_remove_flexattr(self):
item = self.lib.items().get()
item.flexattr = "testAttr"
item.store()
self.modify("flexattr!")
item = self.lib.items().get()
assert "flexattr" not in item
@unittest.skip("not yet implemented")
def test_delete_initial_key_tag(self):
item = self.lib.items().get()
item.initial_key = "C#m"
item.write()
item.store()
mediafile = MediaFile(syspath(item.path))
assert mediafile.initial_key == "C#m"
self.modify("initial_key!")
mediafile = MediaFile(syspath(item.path))
assert mediafile.initial_key is None
def test_arg_parsing_colon_query(self):
(query, mods, dels) = modify_parse_args(
["title:oldTitle", "title=newTitle"]
)
assert query == ["title:oldTitle"]
assert mods == {"title": "newTitle"}
def test_arg_parsing_delete(self):
(query, mods, dels) = modify_parse_args(["title:oldTitle", "title!"])
assert query == ["title:oldTitle"]
assert dels == ["title"]
def test_arg_parsing_query_with_exclaimation(self):
(query, mods, dels) = modify_parse_args(
["title:oldTitle!", "title=newTitle!"]
)
assert query == ["title:oldTitle!"]
assert mods == {"title": "newTitle!"}
def test_arg_parsing_equals_in_value(self):
(query, mods, dels) = modify_parse_args(
["title:foo=bar", "title=newTitle"]
)
assert query == ["title:foo=bar"]
assert mods == {"title": "newTitle"}

View file

@ -0,0 +1,102 @@
import shutil
from beets import library
from beets.test.helper import BeetsTestCase
from beets.ui.commands.move import move_items
class MoveTest(BeetsTestCase):
def setUp(self):
super().setUp()
self.initial_item_path = self.lib_path / "srcfile"
shutil.copy(self.resource_path, self.initial_item_path)
# Add a file to the library but don't copy it in yet.
self.i = library.Item.from_path(self.initial_item_path)
self.lib.add(self.i)
self.album = self.lib.add_album([self.i])
# Alternate destination directory.
self.otherdir = self.temp_dir_path / "testotherdir"
def _move(
self,
query=(),
dest=None,
copy=False,
album=False,
pretend=False,
export=False,
):
move_items(self.lib, dest, query, copy, album, pretend, export=export)
def test_move_item(self):
self._move()
self.i.load()
assert b"libdir" in self.i.path
assert self.i.filepath.exists()
assert not self.initial_item_path.exists()
def test_copy_item(self):
self._move(copy=True)
self.i.load()
assert b"libdir" in self.i.path
assert self.i.filepath.exists()
assert self.initial_item_path.exists()
def test_move_album(self):
self._move(album=True)
self.i.load()
assert b"libdir" in self.i.path
assert self.i.filepath.exists()
assert not self.initial_item_path.exists()
def test_copy_album(self):
self._move(copy=True, album=True)
self.i.load()
assert b"libdir" in self.i.path
assert self.i.filepath.exists()
assert self.initial_item_path.exists()
def test_move_item_custom_dir(self):
self._move(dest=self.otherdir)
self.i.load()
assert b"testotherdir" in self.i.path
assert self.i.filepath.exists()
assert not self.initial_item_path.exists()
def test_move_album_custom_dir(self):
self._move(dest=self.otherdir, album=True)
self.i.load()
assert b"testotherdir" in self.i.path
assert self.i.filepath.exists()
assert not self.initial_item_path.exists()
def test_pretend_move_item(self):
self._move(dest=self.otherdir, pretend=True)
self.i.load()
assert self.i.filepath == self.initial_item_path
def test_pretend_move_album(self):
self._move(album=True, pretend=True)
self.i.load()
assert self.i.filepath == self.initial_item_path
def test_export_item_custom_dir(self):
self._move(dest=self.otherdir, export=True)
self.i.load()
assert self.i.filepath == self.initial_item_path
assert self.otherdir.exists()
def test_export_album_custom_dir(self):
self._move(dest=self.otherdir, album=True, export=True)
self.i.load()
assert self.i.filepath == self.initial_item_path
assert self.otherdir.exists()
def test_pretend_export_item(self):
self._move(dest=self.otherdir, pretend=True, export=True)
self.i.load()
assert self.i.filepath == self.initial_item_path
assert not self.otherdir.exists()

View file

@ -0,0 +1,80 @@
import os
from beets import library
from beets.test.helper import BeetsTestCase, IOMixin
from beets.ui.commands.remove import remove_items
from beets.util import MoveOperation, syspath
class RemoveTest(IOMixin, BeetsTestCase):
def setUp(self):
super().setUp()
# Copy a file into the library.
self.i = library.Item.from_path(self.resource_path)
self.lib.add(self.i)
self.i.move(operation=MoveOperation.COPY)
def test_remove_items_no_delete(self):
self.io.addinput("y")
remove_items(self.lib, "", False, False, False)
items = self.lib.items()
assert len(list(items)) == 0
assert self.i.filepath.exists()
def test_remove_items_with_delete(self):
self.io.addinput("y")
remove_items(self.lib, "", False, True, False)
items = self.lib.items()
assert len(list(items)) == 0
assert not self.i.filepath.exists()
def test_remove_items_with_force_no_delete(self):
remove_items(self.lib, "", False, False, True)
items = self.lib.items()
assert len(list(items)) == 0
assert self.i.filepath.exists()
def test_remove_items_with_force_delete(self):
remove_items(self.lib, "", False, True, True)
items = self.lib.items()
assert len(list(items)) == 0
assert not self.i.filepath.exists()
def test_remove_items_select_with_delete(self):
i2 = library.Item.from_path(self.resource_path)
self.lib.add(i2)
i2.move(operation=MoveOperation.COPY)
for s in ("s", "y", "n"):
self.io.addinput(s)
remove_items(self.lib, "", False, True, False)
items = self.lib.items()
assert len(list(items)) == 1
# There is probably no guarantee that the items are queried in any
# spcecific order, thus just ensure that exactly one was removed.
# To improve upon this, self.io would need to have the capability to
# generate input that depends on previous output.
num_existing = 0
num_existing += 1 if os.path.exists(syspath(self.i.path)) else 0
num_existing += 1 if os.path.exists(syspath(i2.path)) else 0
assert num_existing == 1
def test_remove_albums_select_with_delete(self):
a1 = self.add_album_fixture()
a2 = self.add_album_fixture()
path1 = a1.items()[0].path
path2 = a2.items()[0].path
items = self.lib.items()
assert len(list(items)) == 3
for s in ("s", "y", "n"):
self.io.addinput(s)
remove_items(self.lib, "", True, True, False)
items = self.lib.items()
assert len(list(items)) == 2 # incl. the item from setUp()
# See test_remove_items_select_with_delete()
num_existing = 0
num_existing += 1 if os.path.exists(syspath(path1)) else 0
num_existing += 1 if os.path.exists(syspath(path2)) else 0
assert num_existing == 1

View file

@ -0,0 +1,205 @@
import os
from mediafile import MediaFile
from beets import library
from beets.test import _common
from beets.test.helper import BeetsTestCase, IOMixin
from beets.ui.commands.update import update_items
from beets.util import MoveOperation, remove, syspath
class UpdateTest(IOMixin, BeetsTestCase):
def setUp(self):
super().setUp()
# Copy a file into the library.
item_path = os.path.join(_common.RSRC, b"full.mp3")
item_path_two = os.path.join(_common.RSRC, b"full.flac")
self.i = library.Item.from_path(item_path)
self.i2 = library.Item.from_path(item_path_two)
self.lib.add(self.i)
self.lib.add(self.i2)
self.i.move(operation=MoveOperation.COPY)
self.i2.move(operation=MoveOperation.COPY)
self.album = self.lib.add_album([self.i, self.i2])
# Album art.
artfile = os.path.join(self.temp_dir, b"testart.jpg")
_common.touch(artfile)
self.album.set_art(artfile)
self.album.store()
remove(artfile)
def _update(
self,
query=(),
album=False,
move=False,
reset_mtime=True,
fields=None,
exclude_fields=None,
):
self.io.addinput("y")
if reset_mtime:
self.i.mtime = 0
self.i.store()
update_items(
self.lib,
query,
album,
move,
False,
fields=fields,
exclude_fields=exclude_fields,
)
def test_delete_removes_item(self):
assert list(self.lib.items())
remove(self.i.path)
remove(self.i2.path)
self._update()
assert not list(self.lib.items())
def test_delete_removes_album(self):
assert self.lib.albums()
remove(self.i.path)
remove(self.i2.path)
self._update()
assert not self.lib.albums()
def test_delete_removes_album_art(self):
art_filepath = self.album.art_filepath
assert art_filepath.exists()
remove(self.i.path)
remove(self.i2.path)
self._update()
assert not art_filepath.exists()
def test_modified_metadata_detected(self):
mf = MediaFile(syspath(self.i.path))
mf.title = "differentTitle"
mf.save()
self._update()
item = self.lib.items().get()
assert item.title == "differentTitle"
def test_modified_metadata_moved(self):
mf = MediaFile(syspath(self.i.path))
mf.title = "differentTitle"
mf.save()
self._update(move=True)
item = self.lib.items().get()
assert b"differentTitle" in item.path
def test_modified_metadata_not_moved(self):
mf = MediaFile(syspath(self.i.path))
mf.title = "differentTitle"
mf.save()
self._update(move=False)
item = self.lib.items().get()
assert b"differentTitle" not in item.path
def test_selective_modified_metadata_moved(self):
mf = MediaFile(syspath(self.i.path))
mf.title = "differentTitle"
mf.genre = "differentGenre"
mf.save()
self._update(move=True, fields=["title"])
item = self.lib.items().get()
assert b"differentTitle" in item.path
assert item.genre != "differentGenre"
def test_selective_modified_metadata_not_moved(self):
mf = MediaFile(syspath(self.i.path))
mf.title = "differentTitle"
mf.genre = "differentGenre"
mf.save()
self._update(move=False, fields=["title"])
item = self.lib.items().get()
assert b"differentTitle" not in item.path
assert item.genre != "differentGenre"
def test_modified_album_metadata_moved(self):
mf = MediaFile(syspath(self.i.path))
mf.album = "differentAlbum"
mf.save()
self._update(move=True)
item = self.lib.items().get()
assert b"differentAlbum" in item.path
def test_modified_album_metadata_art_moved(self):
artpath = self.album.artpath
mf = MediaFile(syspath(self.i.path))
mf.album = "differentAlbum"
mf.save()
self._update(move=True)
album = self.lib.albums()[0]
assert artpath != album.artpath
assert album.artpath is not None
def test_selective_modified_album_metadata_moved(self):
mf = MediaFile(syspath(self.i.path))
mf.album = "differentAlbum"
mf.genre = "differentGenre"
mf.save()
self._update(move=True, fields=["album"])
item = self.lib.items().get()
assert b"differentAlbum" in item.path
assert item.genre != "differentGenre"
def test_selective_modified_album_metadata_not_moved(self):
mf = MediaFile(syspath(self.i.path))
mf.album = "differentAlbum"
mf.genre = "differentGenre"
mf.save()
self._update(move=True, fields=["genre"])
item = self.lib.items().get()
assert b"differentAlbum" not in item.path
assert item.genre == "differentGenre"
def test_mtime_match_skips_update(self):
mf = MediaFile(syspath(self.i.path))
mf.title = "differentTitle"
mf.save()
# Make in-memory mtime match on-disk mtime.
self.i.mtime = os.path.getmtime(syspath(self.i.path))
self.i.store()
self._update(reset_mtime=False)
item = self.lib.items().get()
assert item.title == "full"
def test_multivalued_albumtype_roundtrip(self):
# https://github.com/beetbox/beets/issues/4528
# albumtypes is empty for our test fixtures, so populate it first
album = self.album
correct_albumtypes = ["album", "live"]
# Setting albumtypes does not set albumtype, currently.
# Using x[0] mirrors https://github.com/beetbox/mediafile/blob/057432ad53b3b84385e5582f69f44dc00d0a725d/mediafile.py#L1928 # noqa: E501
correct_albumtype = correct_albumtypes[0]
album.albumtype = correct_albumtype
album.albumtypes = correct_albumtypes
album.try_sync(write=True, move=False)
album.load()
assert album.albumtype == correct_albumtype
assert album.albumtypes == correct_albumtypes
self._update()
album.load()
assert album.albumtype == correct_albumtype
assert album.albumtypes == correct_albumtypes
def test_modified_metadata_excluded(self):
mf = MediaFile(syspath(self.i.path))
mf.lyrics = "new lyrics"
mf.save()
self._update(exclude_fields=["lyrics"])
item = self.lib.items().get()
assert item.lyrics != "new lyrics"

View file

@ -1,19 +1,3 @@
# This file is part of beets.
# Copyright 2016, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Test module for file ui/commands.py"""
import os
import shutil
@ -21,8 +5,8 @@ import pytest
from beets import library, ui
from beets.test import _common
from beets.test.helper import BeetsTestCase, IOMixin, ItemInDBTestCase
from beets.ui import commands
from beets.test.helper import BeetsTestCase
from beets.ui.commands.utils import do_query
from beets.util import syspath
@ -44,17 +28,17 @@ class QueryTest(BeetsTestCase):
def check_do_query(
self, num_items, num_albums, q=(), album=False, also_items=True
):
items, albums = commands._do_query(self.lib, q, album, also_items)
items, albums = do_query(self.lib, q, album, also_items)
assert len(items) == num_items
assert len(albums) == num_albums
def test_query_empty(self):
with pytest.raises(ui.UserError):
commands._do_query(self.lib, (), False)
do_query(self.lib, (), False)
def test_query_empty_album(self):
with pytest.raises(ui.UserError):
commands._do_query(self.lib, (), True)
do_query(self.lib, (), True)
def test_query_item(self):
self.add_item()
@ -73,24 +57,3 @@ class QueryTest(BeetsTestCase):
self.add_album([item, item2])
self.check_do_query(3, 2, album=True)
self.check_do_query(0, 2, album=True, also_items=False)
class FieldsTest(IOMixin, ItemInDBTestCase):
def remove_keys(self, keys, text):
for i in text:
try:
keys.remove(i)
except ValueError:
pass
def test_fields_func(self):
commands.fields_func(self.lib, [], [])
items = library.Item.all_keys()
albums = library.Album.all_keys()
output = self.io.stdout.get().split()
self.remove_keys(items, output)
self.remove_keys(albums, output)
assert len(items) == 0
assert len(albums) == 0

View file

@ -0,0 +1,46 @@
from beets.test.helper import BeetsTestCase
class WriteTest(BeetsTestCase):
def write_cmd(self, *args):
return self.run_with_output("write", *args)
def test_update_mtime(self):
item = self.add_item_fixture()
item["title"] = "a new title"
item.store()
item = self.lib.items().get()
assert item.mtime == 0
self.write_cmd()
item = self.lib.items().get()
assert item.mtime == item.current_mtime()
def test_non_metadata_field_unchanged(self):
"""Changing a non-"tag" field like `bitrate` and writing should
have no effect.
"""
# An item that starts out "clean".
item = self.add_item_fixture()
item.read()
# ... but with a mismatched bitrate.
item.bitrate = 123
item.store()
output = self.write_cmd()
assert output == ""
def test_write_metadata_field(self):
item = self.add_item_fixture()
item.read()
old_title = item.title
item.title = "new title"
item.store()
output = self.write_cmd()
assert f"{old_title} -> new title" in output

590
test/ui/test_ui.py Normal file
View file

@ -0,0 +1,590 @@
# This file is part of beets.
# Copyright 2016, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Tests for the command-line interface."""
import os
import platform
import sys
import unittest
from pathlib import Path
from unittest.mock import patch
import pytest
from confuse import ConfigError
from beets import config, plugins, ui
from beets.test import _common
from beets.test.helper import BeetsTestCase, IOMixin, PluginTestCase
from beets.ui import commands
from beets.util import syspath
class PrintTest(IOMixin, unittest.TestCase):
def test_print_without_locale(self):
lang = os.environ.get("LANG")
if lang:
del os.environ["LANG"]
try:
ui.print_("something")
except TypeError:
self.fail("TypeError during print")
finally:
if lang:
os.environ["LANG"] = lang
def test_print_with_invalid_locale(self):
old_lang = os.environ.get("LANG")
os.environ["LANG"] = ""
old_ctype = os.environ.get("LC_CTYPE")
os.environ["LC_CTYPE"] = "UTF-8"
try:
ui.print_("something")
except ValueError:
self.fail("ValueError during print")
finally:
if old_lang:
os.environ["LANG"] = old_lang
else:
del os.environ["LANG"]
if old_ctype:
os.environ["LC_CTYPE"] = old_ctype
else:
del os.environ["LC_CTYPE"]
@_common.slow_test()
class TestPluginTestCase(PluginTestCase):
plugin = "test"
def setUp(self):
super().setUp()
config["pluginpath"] = [_common.PLUGINPATH]
class ConfigTest(TestPluginTestCase):
def setUp(self):
super().setUp()
# Don't use the BEETSDIR from `helper`. Instead, we point the home
# directory there. Some tests will set `BEETSDIR` themselves.
del os.environ["BEETSDIR"]
# Also set APPDATA, the Windows equivalent of setting $HOME.
appdata_dir = self.temp_dir_path / "AppData" / "Roaming"
self._orig_cwd = os.getcwd()
self.test_cmd = self._make_test_cmd()
commands.default_commands.append(self.test_cmd)
# Default user configuration
if platform.system() == "Windows":
self.user_config_dir = appdata_dir / "beets"
else:
self.user_config_dir = self.temp_dir_path / ".config" / "beets"
self.user_config_dir.mkdir(parents=True, exist_ok=True)
self.user_config_path = self.user_config_dir / "config.yaml"
# Custom BEETSDIR
self.beetsdir = self.temp_dir_path / "beetsdir"
self.beetsdir.mkdir(parents=True, exist_ok=True)
self.env_config_path = str(self.beetsdir / "config.yaml")
self.cli_config_path = str(self.temp_dir_path / "config.yaml")
self.env_patcher = patch(
"os.environ",
{"HOME": str(self.temp_dir_path), "APPDATA": str(appdata_dir)},
)
self.env_patcher.start()
self._reset_config()
def tearDown(self):
self.env_patcher.stop()
commands.default_commands.pop()
os.chdir(syspath(self._orig_cwd))
super().tearDown()
def _make_test_cmd(self):
test_cmd = ui.Subcommand("test", help="test")
def run(lib, options, args):
test_cmd.lib = lib
test_cmd.options = options
test_cmd.args = args
test_cmd.func = run
return test_cmd
def _reset_config(self):
# Config should read files again on demand
config.clear()
config._materialized = False
def write_config_file(self):
return open(self.user_config_path, "w")
def test_paths_section_respected(self):
with self.write_config_file() as config:
config.write("paths: {x: y}")
self.run_command("test", lib=None)
key, template = self.test_cmd.lib.path_formats[0]
assert key == "x"
assert template.original == "y"
def test_default_paths_preserved(self):
default_formats = ui.get_path_formats()
self._reset_config()
with self.write_config_file() as config:
config.write("paths: {x: y}")
self.run_command("test", lib=None)
key, template = self.test_cmd.lib.path_formats[0]
assert key == "x"
assert template.original == "y"
assert self.test_cmd.lib.path_formats[1:] == default_formats
def test_nonexistant_db(self):
with self.write_config_file() as config:
config.write("library: /xxx/yyy/not/a/real/path")
with pytest.raises(ui.UserError):
self.run_command("test", lib=None)
def test_user_config_file(self):
with self.write_config_file() as file:
file.write("anoption: value")
self.run_command("test", lib=None)
assert config["anoption"].get() == "value"
def test_replacements_parsed(self):
with self.write_config_file() as config:
config.write("replace: {'[xy]': z}")
self.run_command("test", lib=None)
replacements = self.test_cmd.lib.replacements
repls = [(p.pattern, s) for p, s in replacements] # Compare patterns.
assert repls == [("[xy]", "z")]
def test_multiple_replacements_parsed(self):
with self.write_config_file() as config:
config.write("replace: {'[xy]': z, foo: bar}")
self.run_command("test", lib=None)
replacements = self.test_cmd.lib.replacements
repls = [(p.pattern, s) for p, s in replacements]
assert repls == [("[xy]", "z"), ("foo", "bar")]
def test_cli_config_option(self):
with open(self.cli_config_path, "w") as file:
file.write("anoption: value")
self.run_command("--config", self.cli_config_path, "test", lib=None)
assert config["anoption"].get() == "value"
def test_cli_config_file_overwrites_user_defaults(self):
with open(self.user_config_path, "w") as file:
file.write("anoption: value")
with open(self.cli_config_path, "w") as file:
file.write("anoption: cli overwrite")
self.run_command("--config", self.cli_config_path, "test", lib=None)
assert config["anoption"].get() == "cli overwrite"
def test_cli_config_file_overwrites_beetsdir_defaults(self):
os.environ["BEETSDIR"] = str(self.beetsdir)
with open(self.env_config_path, "w") as file:
file.write("anoption: value")
with open(self.cli_config_path, "w") as file:
file.write("anoption: cli overwrite")
self.run_command("--config", self.cli_config_path, "test", lib=None)
assert config["anoption"].get() == "cli overwrite"
# @unittest.skip('Difficult to implement with optparse')
# def test_multiple_cli_config_files(self):
# cli_config_path_1 = os.path.join(self.temp_dir, b'config.yaml')
# cli_config_path_2 = os.path.join(self.temp_dir, b'config_2.yaml')
#
# with open(cli_config_path_1, 'w') as file:
# file.write('first: value')
#
# with open(cli_config_path_2, 'w') as file:
# file.write('second: value')
#
# self.run_command('--config', cli_config_path_1,
# '--config', cli_config_path_2, 'test', lib=None)
# assert config['first'].get() == 'value'
# assert config['second'].get() == 'value'
#
# @unittest.skip('Difficult to implement with optparse')
# def test_multiple_cli_config_overwrite(self):
# cli_overwrite_config_path = os.path.join(self.temp_dir,
# b'overwrite_config.yaml')
#
# with open(self.cli_config_path, 'w') as file:
# file.write('anoption: value')
#
# with open(cli_overwrite_config_path, 'w') as file:
# file.write('anoption: overwrite')
#
# self.run_command('--config', self.cli_config_path,
# '--config', cli_overwrite_config_path, 'test')
# assert config['anoption'].get() == 'cli overwrite'
# FIXME: fails on windows
@unittest.skipIf(sys.platform == "win32", "win32")
def test_cli_config_paths_resolve_relative_to_user_dir(self):
with open(self.cli_config_path, "w") as file:
file.write("library: beets.db\n")
file.write("statefile: state")
self.run_command("--config", self.cli_config_path, "test", lib=None)
assert config["library"].as_path() == self.user_config_dir / "beets.db"
assert config["statefile"].as_path() == self.user_config_dir / "state"
def test_cli_config_paths_resolve_relative_to_beetsdir(self):
os.environ["BEETSDIR"] = str(self.beetsdir)
with open(self.cli_config_path, "w") as file:
file.write("library: beets.db\n")
file.write("statefile: state")
self.run_command("--config", self.cli_config_path, "test", lib=None)
assert config["library"].as_path() == self.beetsdir / "beets.db"
assert config["statefile"].as_path() == self.beetsdir / "state"
def test_command_line_option_relative_to_working_dir(self):
config.read()
os.chdir(syspath(self.temp_dir))
self.run_command("--library", "foo.db", "test", lib=None)
assert config["library"].as_path() == Path.cwd() / "foo.db"
def test_cli_config_file_loads_plugin_commands(self):
with open(self.cli_config_path, "w") as file:
file.write(f"pluginpath: {_common.PLUGINPATH}\n")
file.write("plugins: test")
self.run_command("--config", self.cli_config_path, "plugin", lib=None)
plugs = plugins.find_plugins()
assert len(plugs) == 1
assert plugs[0].is_test_plugin
self.unload_plugins()
def test_beetsdir_config(self):
os.environ["BEETSDIR"] = str(self.beetsdir)
with open(self.env_config_path, "w") as file:
file.write("anoption: overwrite")
config.read()
assert config["anoption"].get() == "overwrite"
def test_beetsdir_points_to_file_error(self):
beetsdir = str(self.temp_dir_path / "beetsfile")
open(beetsdir, "a").close()
os.environ["BEETSDIR"] = beetsdir
with pytest.raises(ConfigError):
self.run_command("test")
def test_beetsdir_config_does_not_load_default_user_config(self):
os.environ["BEETSDIR"] = str(self.beetsdir)
with open(self.user_config_path, "w") as file:
file.write("anoption: value")
config.read()
assert not config["anoption"].exists()
def test_default_config_paths_resolve_relative_to_beetsdir(self):
os.environ["BEETSDIR"] = str(self.beetsdir)
config.read()
assert config["library"].as_path() == self.beetsdir / "library.db"
assert config["statefile"].as_path() == self.beetsdir / "state.pickle"
def test_beetsdir_config_paths_resolve_relative_to_beetsdir(self):
os.environ["BEETSDIR"] = str(self.beetsdir)
with open(self.env_config_path, "w") as file:
file.write("library: beets.db\n")
file.write("statefile: state")
config.read()
assert config["library"].as_path() == self.beetsdir / "beets.db"
assert config["statefile"].as_path() == self.beetsdir / "state"
class ShowModelChangeTest(IOMixin, unittest.TestCase):
def setUp(self):
super().setUp()
self.a = _common.item()
self.b = _common.item()
self.a.path = self.b.path
def _show(self, **kwargs):
change = ui.show_model_changes(self.a, self.b, **kwargs)
out = self.io.getoutput()
return change, out
def test_identical(self):
change, out = self._show()
assert not change
assert out == ""
def test_string_fixed_field_change(self):
self.b.title = "x"
change, out = self._show()
assert change
assert "title" in out
def test_int_fixed_field_change(self):
self.b.track = 9
change, out = self._show()
assert change
assert "track" in out
def test_floats_close_to_identical(self):
self.a.length = 1.00001
self.b.length = 1.00005
change, out = self._show()
assert not change
assert out == ""
def test_floats_different(self):
self.a.length = 1.00001
self.b.length = 2.00001
change, out = self._show()
assert change
assert "length" in out
def test_both_values_shown(self):
self.a.title = "foo"
self.b.title = "bar"
change, out = self._show()
assert "foo" in out
assert "bar" in out
class PathFormatTest(unittest.TestCase):
def test_custom_paths_prepend(self):
default_formats = ui.get_path_formats()
config["paths"] = {"foo": "bar"}
pf = ui.get_path_formats()
key, tmpl = pf[0]
assert key == "foo"
assert tmpl.original == "bar"
assert pf[1:] == default_formats
@_common.slow_test()
class PluginTest(TestPluginTestCase):
def test_plugin_command_from_pluginpath(self):
self.run_command("test", lib=None)
class CommonOptionsParserCliTest(BeetsTestCase):
"""Test CommonOptionsParser and formatting LibModel formatting on 'list'
command.
"""
def setUp(self):
super().setUp()
self.item = _common.item()
self.item.path = b"xxx/yyy"
self.lib.add(self.item)
self.lib.add_album([self.item])
def test_base(self):
output = self.run_with_output("ls")
assert output == "the artist - the album - the title\n"
output = self.run_with_output("ls", "-a")
assert output == "the album artist - the album\n"
def test_path_option(self):
output = self.run_with_output("ls", "-p")
assert output == "xxx/yyy\n"
output = self.run_with_output("ls", "-a", "-p")
assert output == "xxx\n"
def test_format_option(self):
output = self.run_with_output("ls", "-f", "$artist")
assert output == "the artist\n"
output = self.run_with_output("ls", "-a", "-f", "$albumartist")
assert output == "the album artist\n"
def test_format_option_unicode(self):
output = self.run_with_output("ls", "-f", "caf\xe9")
assert output == "caf\xe9\n"
def test_root_format_option(self):
output = self.run_with_output(
"--format-item", "$artist", "--format-album", "foo", "ls"
)
assert output == "the artist\n"
output = self.run_with_output(
"--format-item", "foo", "--format-album", "$albumartist", "ls", "-a"
)
assert output == "the album artist\n"
def test_help(self):
output = self.run_with_output("help")
assert "Usage:" in output
output = self.run_with_output("help", "list")
assert "Usage:" in output
with pytest.raises(ui.UserError):
self.run_command("help", "this.is.not.a.real.command")
def test_stats(self):
output = self.run_with_output("stats")
assert "Approximate total size:" in output
# # Need to have more realistic library setup for this to work
# output = self.run_with_output('stats', '-e')
# assert 'Total size:' in output
def test_version(self):
output = self.run_with_output("version")
assert "Python version" in output
assert "no plugins loaded" in output
# # Need to have plugin loaded
# output = self.run_with_output('version')
# assert 'plugins: ' in output
class CommonOptionsParserTest(unittest.TestCase):
def test_album_option(self):
parser = ui.CommonOptionsParser()
assert not parser._album_flags
parser.add_album_option()
assert bool(parser._album_flags)
assert parser.parse_args([]) == ({"album": None}, [])
assert parser.parse_args(["-a"]) == ({"album": True}, [])
assert parser.parse_args(["--album"]) == ({"album": True}, [])
def test_path_option(self):
parser = ui.CommonOptionsParser()
parser.add_path_option()
assert not parser._album_flags
config["format_item"].set("$foo")
assert parser.parse_args([]) == ({"path": None}, [])
assert config["format_item"].as_str() == "$foo"
assert parser.parse_args(["-p"]) == (
{"path": True, "format": "$path"},
[],
)
assert parser.parse_args(["--path"]) == (
{"path": True, "format": "$path"},
[],
)
assert config["format_item"].as_str() == "$path"
assert config["format_album"].as_str() == "$path"
def test_format_option(self):
parser = ui.CommonOptionsParser()
parser.add_format_option()
assert not parser._album_flags
config["format_item"].set("$foo")
assert parser.parse_args([]) == ({"format": None}, [])
assert config["format_item"].as_str() == "$foo"
assert parser.parse_args(["-f", "$bar"]) == ({"format": "$bar"}, [])
assert parser.parse_args(["--format", "$baz"]) == (
{"format": "$baz"},
[],
)
assert config["format_item"].as_str() == "$baz"
assert config["format_album"].as_str() == "$baz"
def test_format_option_with_target(self):
with pytest.raises(KeyError):
ui.CommonOptionsParser().add_format_option(target="thingy")
parser = ui.CommonOptionsParser()
parser.add_format_option(target="item")
config["format_item"].set("$item")
config["format_album"].set("$album")
assert parser.parse_args(["-f", "$bar"]) == ({"format": "$bar"}, [])
assert config["format_item"].as_str() == "$bar"
assert config["format_album"].as_str() == "$album"
def test_format_option_with_album(self):
parser = ui.CommonOptionsParser()
parser.add_album_option()
parser.add_format_option()
config["format_item"].set("$item")
config["format_album"].set("$album")
parser.parse_args(["-f", "$bar"])
assert config["format_item"].as_str() == "$bar"
assert config["format_album"].as_str() == "$album"
parser.parse_args(["-a", "-f", "$foo"])
assert config["format_item"].as_str() == "$bar"
assert config["format_album"].as_str() == "$foo"
parser.parse_args(["-f", "$foo2", "-a"])
assert config["format_album"].as_str() == "$foo2"
def test_add_all_common_options(self):
parser = ui.CommonOptionsParser()
parser.add_all_common_options()
assert parser.parse_args([]) == (
{"album": None, "path": None, "format": None},
[],
)
class EncodingTest(unittest.TestCase):
"""Tests for the `terminal_encoding` config option and our
`_in_encoding` and `_out_encoding` utility functions.
"""
def out_encoding_overridden(self):
config["terminal_encoding"] = "fake_encoding"
assert ui._out_encoding() == "fake_encoding"
def in_encoding_overridden(self):
config["terminal_encoding"] = "fake_encoding"
assert ui._in_encoding() == "fake_encoding"
def out_encoding_default_utf8(self):
with patch("sys.stdout") as stdout:
stdout.encoding = None
assert ui._out_encoding() == "utf-8"
def in_encoding_default_utf8(self):
with patch("sys.stdin") as stdin:
stdin.encoding = None
assert ui._in_encoding() == "utf-8"