Remove decargs and other references to Python 2 (#5859)

This PR modernizes the codebase by removing Python 2 compatibility code
and simplifying several areas:

- Deleted `BytesQuery` class (replaced with `PathQuery`)
- Removed `decargs()` function that was a no-op in Python 3
- Simplified `print_()` function signature and implementation
- Removed coding-related workarounds in various modules
This commit is contained in:
Šarūnas Nejus 2025-07-09 02:56:31 +01:00 committed by GitHub
commit a815305fcd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
43 changed files with 126 additions and 227 deletions

View file

@ -411,39 +411,6 @@ class BooleanQuery(MatchQuery[int]):
super().__init__(field_name, pattern_int, fast)
class BytesQuery(FieldQuery[bytes]):
"""Match a raw bytes field (i.e., a path). This is a necessary hack
to work around the `sqlite3` module's desire to treat `bytes` and
`unicode` equivalently in Python 2. Always use this query instead of
`MatchQuery` when matching on BLOB values.
"""
def __init__(self, field_name: str, pattern: bytes | str | memoryview):
# Use a buffer/memoryview representation of the pattern for SQLite
# matching. This instructs SQLite to treat the blob as binary
# rather than encoded Unicode.
if isinstance(pattern, (str, bytes)):
if isinstance(pattern, str):
bytes_pattern = pattern.encode("utf-8")
else:
bytes_pattern = pattern
self.buf_pattern = memoryview(bytes_pattern)
elif isinstance(pattern, memoryview):
self.buf_pattern = pattern
bytes_pattern = bytes(pattern)
else:
raise ValueError("pattern must be bytes, str, or memoryview")
super().__init__(field_name, bytes_pattern)
def col_clause(self) -> tuple[str, Sequence[SQLiteType]]:
return self.field + " = ?", [self.buf_pattern]
@classmethod
def value_match(cls, pattern: bytes, value: Any) -> bool:
return pattern == value
class NumericQuery(FieldQuery[str]):
"""Matches numeric fields. A syntax using Ruby-style range ellipses
(``..``) lets users specify one- or two-sided ranges. For example,

View file

@ -26,7 +26,8 @@ from typing import TYPE_CHECKING, Callable, Iterable, Sequence
import mediafile
from beets import autotag, config, dbcore, library, plugins, util
from beets import autotag, config, library, plugins, util
from beets.dbcore.query import PathQuery
from .state import ImportState
@ -520,9 +521,7 @@ class ImportTask(BaseImportTask):
)
replaced_album_ids = set()
for item in self.imported_items():
dup_items = list(
lib.items(query=dbcore.query.BytesQuery("path", item.path))
)
dup_items = list(lib.items(query=PathQuery("path", item.path)))
self.replaced_items[item] = dup_items
for dup_item in dup_items:
if (

View file

@ -104,30 +104,15 @@ def _stream_encoding(stream, default="utf-8"):
return stream.encoding or default
def decargs(arglist):
"""Given a list of command-line argument bytestrings, attempts to
decode them to Unicode strings when running under Python 2.
"""
return arglist
def print_(*strings, **kwargs):
def print_(*strings: str, end: str = "\n") -> None:
"""Like print, but rather than raising an error when a character
is not in the terminal's encoding's character set, just silently
replaces it.
The arguments must be Unicode strings: `unicode` on Python 2; `str` on
Python 3.
The `end` keyword argument behaves similarly to the built-in `print`
(it defaults to a newline).
"""
if not strings:
strings = [""]
assert isinstance(strings[0], str)
txt = " ".join(strings)
txt += kwargs.get("end", "\n")
txt = " ".join(strings or ("",)) + end
# Encode the string and write it to stdout.
# On Python 3, sys.stdout expects text strings and uses the
@ -1308,14 +1293,9 @@ class CommonOptionsParser(optparse.OptionParser):
setattr(parser.values, option.dest, True)
# Use the explicitly specified format, or the string from the option.
if fmt:
value = fmt
elif value:
(value,) = decargs([value])
else:
value = ""
value = fmt or value or ""
parser.values.format = value
if target:
config[target._format_config_key].set(value)
else:

View file

@ -28,7 +28,6 @@ import beets
from beets import autotag, config, importer, library, logging, plugins, ui, util
from beets.autotag import Recommendation, hooks
from beets.ui import (
decargs,
input_,
print_,
print_column_layout,
@ -1303,7 +1302,7 @@ class TerminalImportSession(importer.ImportSession):
# The import command.
def import_files(lib, paths, query):
def import_files(lib, paths: list[bytes], query):
"""Import the files in the given list of paths or matching the
query.
"""
@ -1334,7 +1333,7 @@ def import_files(lib, paths, query):
plugins.send("import", lib=lib, paths=paths)
def import_func(lib, opts, args):
def import_func(lib, opts, args: list[str]):
config["import"].set_args(opts)
# Special case: --copy flag suppresses import_move (which would
@ -1343,7 +1342,7 @@ def import_func(lib, opts, args):
config["import"]["move"] = False
if opts.library:
query = decargs(args)
query = args
paths = []
else:
query = None
@ -1356,15 +1355,11 @@ def import_func(lib, opts, args):
if not paths and not paths_from_logfiles:
raise ui.UserError("no path specified")
# On Python 2, we used to get filenames as raw bytes, which is
# what we need. On Python 3, we need to undo the "helpful"
# conversion to Unicode strings to get the real bytestring
# filename.
paths = [os.fsencode(p) for p in paths]
byte_paths = [os.fsencode(p) for p in paths]
paths_from_logfiles = [os.fsencode(p) for p in paths_from_logfiles]
# Check the user-specified directories.
for path in paths:
for path in byte_paths:
if not os.path.exists(syspath(normpath(path))):
raise ui.UserError(
"no such file or directory: {}".format(
@ -1385,14 +1380,14 @@ def import_func(lib, opts, args):
)
continue
paths.append(path)
byte_paths.append(path)
# If all paths were read from a logfile, and none of them exist, throw
# an error
if not paths:
raise ui.UserError("none of the paths are importable")
import_files(lib, paths, query)
import_files(lib, byte_paths, query)
import_cmd = ui.Subcommand(
@ -1596,7 +1591,7 @@ def list_items(lib, query, album, fmt=""):
def list_func(lib, opts, args):
list_items(lib, decargs(args), opts.album)
list_items(lib, args, opts.album)
list_cmd = ui.Subcommand("list", help="query the library", aliases=("ls",))
@ -1739,7 +1734,7 @@ def update_func(lib, opts, args):
return
update_items(
lib,
decargs(args),
args,
opts.album,
ui.should_move(opts.move),
opts.pretend,
@ -1861,7 +1856,7 @@ def remove_items(lib, query, album, delete, force):
def remove_func(lib, opts, args):
remove_items(lib, decargs(args), opts.album, opts.delete, opts.force)
remove_items(lib, args, opts.album, opts.delete, opts.force)
remove_cmd = ui.Subcommand(
@ -1931,7 +1926,7 @@ Album artists: {}""".format(
def stats_func(lib, opts, args):
show_stats(lib, decargs(args), opts.exact)
show_stats(lib, args, opts.exact)
stats_cmd = ui.Subcommand(
@ -2059,7 +2054,7 @@ def modify_parse_args(args):
def modify_func(lib, opts, args):
query, mods, dels = modify_parse_args(decargs(args))
query, mods, dels = modify_parse_args(args)
if not mods and not dels:
raise ui.UserError("no modifications specified")
modify_items(
@ -2217,7 +2212,7 @@ def move_func(lib, opts, args):
move_items(
lib,
dest,
decargs(args),
args,
opts.copy,
opts.album,
opts.pretend,
@ -2298,7 +2293,7 @@ def write_items(lib, query, pretend, force):
def write_func(lib, opts, args):
write_items(lib, decargs(args), opts.pretend, opts.force)
write_items(lib, args, opts.pretend, opts.force)
write_cmd = ui.Subcommand("write", help="write tag information to files")

View file

@ -28,6 +28,7 @@ import sys
import tempfile
import traceback
from collections import Counter
from collections.abc import Sequence
from contextlib import suppress
from enum import Enum
from functools import cache
@ -41,7 +42,6 @@ from typing import (
AnyStr,
Callable,
Generic,
Iterable,
NamedTuple,
TypeVar,
Union,
@ -53,23 +53,17 @@ import beets
from beets.util import hidden
if TYPE_CHECKING:
from collections.abc import Iterator, Sequence
from collections.abc import Iterable, Iterator
from logging import Logger
from beets.library import Item
if sys.version_info >= (3, 10):
from typing import TypeAlias
else:
from typing_extensions import TypeAlias
MAX_FILENAME_LENGTH = 200
WINDOWS_MAGIC_PREFIX = "\\\\?\\"
T = TypeVar("T")
BytesOrStr = Union[str, bytes]
PathLike = Union[BytesOrStr, Path]
Replacements: TypeAlias = "Sequence[tuple[Pattern[str], str]]"
PathLike = Union[str, bytes, Path]
Replacements = Sequence[tuple[Pattern[str], str]]
# Here for now to allow for a easy replace later on
# once we can move to a PathLike (mainly used in importer)
@ -860,7 +854,9 @@ class CommandOutput(NamedTuple):
stderr: bytes
def command_output(cmd: list[BytesOrStr], shell: bool = False) -> CommandOutput:
def command_output(
cmd: list[str] | list[bytes], shell: bool = False
) -> CommandOutput:
"""Runs the command and returns its output after it has exited.
Returns a CommandOutput. The attributes ``stdout`` and ``stderr`` contain
@ -878,8 +874,6 @@ def command_output(cmd: list[BytesOrStr], shell: bool = False) -> CommandOutput:
This replaces `subprocess.check_output` which can have problems if lots of
output is sent to stderr.
"""
converted_cmd = [os.fsdecode(a) for a in cmd]
devnull = subprocess.DEVNULL
proc = subprocess.Popen(
@ -894,7 +888,7 @@ def command_output(cmd: list[BytesOrStr], shell: bool = False) -> CommandOutput:
if proc.returncode:
raise subprocess.CalledProcessError(
returncode=proc.returncode,
cmd=" ".join(converted_cmd),
cmd=" ".join(map(os.fsdecode, cmd)),
output=stdout + stderr,
)
return CommandOutput(stdout, stderr)

View file

@ -214,9 +214,9 @@ class IMBackend(LocalBackend):
else:
return cls._version
convert_cmd: list[str | bytes]
identify_cmd: list[str | bytes]
compare_cmd: list[str | bytes]
convert_cmd: list[str]
identify_cmd: list[str]
compare_cmd: list[str]
def __init__(self) -> None:
"""Initialize a wrapper around ImageMagick for local image operations.
@ -265,7 +265,7 @@ class IMBackend(LocalBackend):
# with regards to the height.
# ImageMagick already seems to default to no interlace, but we include
# it here for the sake of explicitness.
cmd: list[str | bytes] = self.convert_cmd + [
cmd: list[str] = self.convert_cmd + [
syspath(path_in, prefix=False),
"-resize",
f"{maxwidth}x>",
@ -295,7 +295,7 @@ class IMBackend(LocalBackend):
return path_out
def get_size(self, path_in: bytes) -> tuple[int, int] | None:
cmd: list[str | bytes] = self.identify_cmd + [
cmd: list[str] = self.identify_cmd + [
"-format",
"%w %h",
syspath(path_in, prefix=False),
@ -480,10 +480,11 @@ class IMBackend(LocalBackend):
return True
def write_metadata(self, file: bytes, metadata: Mapping[str, str]) -> None:
assignments = list(
chain.from_iterable(("-set", k, v) for k, v in metadata.items())
assignments = chain.from_iterable(
("-set", k, v) for k, v in metadata.items()
)
command = self.convert_cmd + [file, *assignments, file]
str_file = os.fsdecode(file)
command = self.convert_cmd + [str_file, *assignments, str_file]
util.command_output(command)

View file

@ -137,7 +137,7 @@ only files which would be processed",
)
else:
# Get items from arguments
items = lib.items(ui.decargs(args))
items = lib.items(args)
self.opts = opts
util.par_map(self.analyze_submit, items)

View file

@ -116,7 +116,7 @@ class AcousticPlugin(plugins.BeetsPlugin):
)
def func(lib, opts, args):
items = lib.items(ui.decargs(args))
items = lib.items(args)
self._fetch_info(
items,
ui.should_write(),

View file

@ -204,7 +204,7 @@ class BadFiles(BeetsPlugin):
def command(self, lib, opts, args):
# Get items from arguments
items = lib.items(ui.decargs(args))
items = lib.items(args)
self.verbose = opts.verbose
def check_and_print(item):

View file

@ -23,7 +23,7 @@ from unidecode import unidecode
from beets import ui
from beets.dbcore.query import StringFieldQuery
from beets.plugins import BeetsPlugin
from beets.ui import decargs, print_
from beets.ui import print_
class BareascQuery(StringFieldQuery[str]):
@ -83,14 +83,13 @@ class BareascPlugin(BeetsPlugin):
def unidecode_list(self, lib, opts, args):
"""Emulate normal 'list' command but with unidecode output."""
query = decargs(args)
album = opts.album
# Copied from commands.py - list_items
if album:
for album in lib.albums(query):
for album in lib.albums(args):
bare = unidecode(str(album))
print_(bare)
else:
for item in lib.items(query):
for item in lib.items(args):
bare = unidecode(str(item))
print_(bare)

View file

@ -125,7 +125,7 @@ class BenchmarkPlugin(BeetsPlugin):
"-i", "--id", default=None, help="album ID to match against"
)
match_bench_cmd.func = lambda lib, opts, args: match_benchmark(
lib, opts.profile, ui.decargs(args), opts.id
lib, opts.profile, args, opts.id
)
return [aunique_bench_cmd, match_bench_cmd]

View file

@ -63,9 +63,8 @@ class BPMPlugin(BeetsPlugin):
return [cmd]
def command(self, lib, opts, args):
items = lib.items(ui.decargs(args))
write = ui.should_write()
self.get_bpm(items, write)
self.get_bpm(lib.items(args), write)
def get_bpm(self, items, write=False):
overwrite = self.config["overwrite"].get(bool)

View file

@ -65,10 +65,9 @@ class BPSyncPlugin(BeetsPlugin):
move = ui.should_move(opts.move)
pretend = opts.pretend
write = ui.should_write(opts.write)
query = ui.decargs(args)
self.singletons(lib, query, move, pretend, write)
self.albums(lib, query, move, pretend, write)
self.singletons(lib, args, move, pretend, write)
self.albums(lib, args, move, pretend, write)
def singletons(self, lib, query, move, pretend, write):
"""Retrieve and apply info from the autotagger for items matched by

View file

@ -233,7 +233,7 @@ class AcoustidPlugin(plugins.BeetsPlugin):
apikey = config["acoustid"]["apikey"].as_str()
except confuse.NotFoundError:
raise ui.UserError("no Acoustid user API key provided")
submit_items(self._log, apikey, lib.items(ui.decargs(args)))
submit_items(self._log, apikey, lib.items(args))
submit_cmd.func = submit_cmd_func
@ -242,7 +242,7 @@ class AcoustidPlugin(plugins.BeetsPlugin):
)
def fingerprint_cmd_func(lib, opts, args):
for item in lib.items(ui.decargs(args)):
for item in lib.items(args):
fingerprint_item(self._log, item, write=ui.should_write())
fingerprint_cmd.func = fingerprint_cmd_func

View file

@ -301,7 +301,7 @@ class ConvertPlugin(BeetsPlugin):
encode_cmd.append(os.fsdecode(args[i]))
if pretend:
self._log.info("{0}", " ".join(ui.decargs(args)))
self._log.info("{0}", " ".join(args))
return
try:
@ -323,9 +323,7 @@ class ConvertPlugin(BeetsPlugin):
raise
except OSError as exc:
raise ui.UserError(
"convert: couldn't invoke '{}': {}".format(
" ".join(ui.decargs(args)), exc
)
"convert: couldn't invoke '{}': {}".format(" ".join(args), exc)
)
if not quiet and not pretend:
@ -579,13 +577,13 @@ class ConvertPlugin(BeetsPlugin):
) = self._get_opts_and_config(opts)
if opts.album:
albums = lib.albums(ui.decargs(args))
albums = lib.albums(args)
items = [i for a in albums for i in a.items()]
if not pretend:
for a in albums:
ui.print_(format(a, ""))
else:
items = list(lib.items(ui.decargs(args)))
items = list(lib.items(args))
if not pretend:
for i in items:
ui.print_(format(i, ""))

View file

@ -54,7 +54,7 @@ class DeezerPlugin(MetadataSourcePlugin[Response], BeetsPlugin):
)
def func(lib: Library, opts, args):
items = lib.items(ui.decargs(args))
items = lib.items(args)
self.deezerupdate(list(items), ui.should_write())
deezer_update_cmd.func = func

View file

@ -19,7 +19,7 @@ import shlex
from beets.library import Album, Item
from beets.plugins import BeetsPlugin
from beets.ui import Subcommand, UserError, decargs, print_
from beets.ui import Subcommand, UserError, print_
from beets.util import (
MoveOperation,
bytestring_path,
@ -163,11 +163,11 @@ class DuplicatesPlugin(BeetsPlugin):
if album:
if not keys:
keys = ["mb_albumid"]
items = lib.albums(decargs(args))
items = lib.albums(args)
else:
if not keys:
keys = ["mb_trackid", "mb_albumid"]
items = lib.items(decargs(args))
items = lib.items(args)
# If there's nothing to do, return early. The code below assumes
# `items` to be non-empty.

View file

@ -180,8 +180,7 @@ class EditPlugin(plugins.BeetsPlugin):
def _edit_command(self, lib, opts, args):
"""The CLI command function for the `beet edit` command."""
# Get the objects to edit.
query = ui.decargs(args)
items, albums = _do_query(lib, query, opts.album, False)
items, albums = _do_query(lib, args, opts.album, False)
objs = albums if opts.album else items
if not objs:
ui.print_("Nothing to edit.")

View file

@ -22,7 +22,7 @@ import requests
from beets import art, config, ui
from beets.plugins import BeetsPlugin
from beets.ui import decargs, print_
from beets.ui import print_
from beets.util import bytestring_path, displayable_path, normpath, syspath
from beets.util.artresizer import ArtResizer
@ -115,7 +115,7 @@ class EmbedCoverArtPlugin(BeetsPlugin):
)
)
items = lib.items(decargs(args))
items = lib.items(args)
# Confirm with user.
if not opts.yes and not _confirm(items, not opts.file):
@ -151,7 +151,7 @@ class EmbedCoverArtPlugin(BeetsPlugin):
except Exception as e:
self._log.error("Unable to save image: {}".format(e))
return
items = lib.items(decargs(args))
items = lib.items(args)
# Confirm with user.
if not opts.yes and not _confirm(items, not opts.url):
os.remove(tempimg)
@ -169,7 +169,7 @@ class EmbedCoverArtPlugin(BeetsPlugin):
)
os.remove(tempimg)
else:
albums = lib.albums(decargs(args))
albums = lib.albums(args)
# Confirm with user.
if not opts.yes and not _confirm(albums, not opts.file):
return
@ -212,7 +212,7 @@ class EmbedCoverArtPlugin(BeetsPlugin):
def extract_func(lib, opts, args):
if opts.outpath:
art.extract_first(
self._log, normpath(opts.outpath), lib.items(decargs(args))
self._log, normpath(opts.outpath), lib.items(args)
)
else:
filename = bytestring_path(
@ -223,7 +223,7 @@ class EmbedCoverArtPlugin(BeetsPlugin):
"Only specify a name rather than a path for -n"
)
return
for album in lib.albums(decargs(args)):
for album in lib.albums(args):
artpath = normpath(os.path.join(album.path, filename))
artpath = art.extract_first(
self._log, artpath, album.items()
@ -244,11 +244,11 @@ class EmbedCoverArtPlugin(BeetsPlugin):
)
def clear_func(lib, opts, args):
items = lib.items(decargs(args))
items = lib.items(args)
# Confirm with user.
if not opts.yes and not _confirm(items, False):
return
art.clear(self._log, lib, decargs(args))
art.clear(self._log, lib, args)
clear_cmd.func = clear_func

View file

@ -144,7 +144,7 @@ class ExportPlugin(BeetsPlugin):
items = []
for data_emitter in data_collector(
lib,
ui.decargs(args),
args,
album=opts.album,
):
try:

View file

@ -1503,9 +1503,7 @@ class FetchArtPlugin(plugins.BeetsPlugin, RequestMixin):
)
def func(lib: Library, opts, args) -> None:
self.batch_fetch_art(
lib, lib.albums(ui.decargs(args)), opts.force, opts.quiet
)
self.batch_fetch_art(lib, lib.albums(args), opts.force, opts.quiet)
cmd.func = func
return [cmd]

View file

@ -118,7 +118,7 @@ class FtInTitlePlugin(plugins.BeetsPlugin):
keep_in_artist_field = self.config["keep_in_artist"].get(bool)
write = ui.should_write()
for item in lib.items(ui.decargs(args)):
for item in lib.items(args):
if self.ft_in_title(item, drop_feat, keep_in_artist_field):
item.store()
if write:

View file

@ -14,27 +14,21 @@
"""Allows custom commands to be run when an event is emitted by beets"""
from __future__ import annotations
import os
import shlex
import string
import subprocess
import sys
from typing import Any
from beets.plugins import BeetsPlugin
class CodingFormatter(string.Formatter):
"""A variant of `string.Formatter` that converts everything to `unicode`
strings.
class BytesToStrFormatter(string.Formatter):
"""A variant of `string.Formatter` that converts `bytes` to `str`."""
This was necessary on Python 2, in needs to be kept for backwards
compatibility.
"""
def __init__(self, coding):
"""Creates a new coding formatter with the provided coding."""
self._coding = coding
def convert_field(self, value, conversion):
def convert_field(self, value: Any, conversion: str | None) -> Any:
"""Converts the provided value given a conversion type.
This method decodes the converted value using the formatter's coding.
@ -42,7 +36,7 @@ class CodingFormatter(string.Formatter):
converted = super().convert_field(value, conversion)
if isinstance(converted, bytes):
return converted.decode(self._coding)
return os.fsdecode(converted)
return converted
@ -72,8 +66,8 @@ class HookPlugin(BeetsPlugin):
return
# For backwards compatibility, use a string formatter that decodes
# bytes (in particular, paths) to unicode strings.
formatter = CodingFormatter(sys.getfilesystemencoding())
# bytes (in particular, paths) to strings.
formatter = BytesToStrFormatter()
command_pieces = [
formatter.format(piece, event=event, **kwargs)
for piece in shlex.split(command)

View file

@ -215,7 +215,7 @@ class InfoPlugin(BeetsPlugin):
summary = {}
for data_emitter in data_collector(
lib,
ui.decargs(args),
args,
album=opts.album,
):
try:
@ -232,7 +232,7 @@ class InfoPlugin(BeetsPlugin):
if opts.keys_only:
print_data_keys(data, item)
else:
fmt = ui.decargs([opts.format])[0] if opts.format else None
fmt = [opts.format][0] if opts.format else None
print_data(data, item, fmt)
first = False

View file

@ -74,7 +74,7 @@ class IPFSPlugin(BeetsPlugin):
def func(lib, opts, args):
if opts.add:
for album in lib.albums(ui.decargs(args)):
for album in lib.albums(args):
if len(album.items()) == 0:
self._log.info(
"{0} does not contain items, aborting", album
@ -84,19 +84,19 @@ class IPFSPlugin(BeetsPlugin):
album.store()
if opts.get:
self.ipfs_get(lib, ui.decargs(args))
self.ipfs_get(lib, args)
if opts.publish:
self.ipfs_publish(lib)
if opts._import:
self.ipfs_import(lib, ui.decargs(args))
self.ipfs_import(lib, args)
if opts._list:
self.ipfs_list(lib, ui.decargs(args))
self.ipfs_list(lib, args)
if opts.play:
self.ipfs_play(lib, opts, ui.decargs(args))
self.ipfs_play(lib, opts, args)
cmd.func = func
return [cmd]

View file

@ -43,7 +43,7 @@ class KeyFinderPlugin(BeetsPlugin):
return [cmd]
def command(self, lib, opts, args):
self.find_key(lib.items(ui.decargs(args)), write=ui.should_write())
self.find_key(lib.items(args), write=ui.should_write())
def imported(self, session, task):
self.find_key(task.imported_items())

View file

@ -521,7 +521,7 @@ class LastGenrePlugin(plugins.BeetsPlugin):
if opts.album:
# Fetch genres for whole albums
for album in lib.albums(ui.decargs(args)):
for album in lib.albums(args):
album.genre, src = self._get_genre(album)
self._log.info(
'genre for album "{0.album}" ({1}): {0.genre}',
@ -550,7 +550,7 @@ class LastGenrePlugin(plugins.BeetsPlugin):
else:
# Just query singletons, i.e. items that are not part of
# an album
for item in lib.items(ui.decargs(args)):
for item in lib.items(args):
item.genre, src = self._get_genre(item)
item.store()
self._log.info(

View file

@ -25,7 +25,7 @@ from itertools import islice
from beets.dbcore import FieldQuery
from beets.plugins import BeetsPlugin
from beets.ui import Subcommand, decargs, print_
from beets.ui import Subcommand, print_
def lslimit(lib, opts, args):
@ -36,11 +36,10 @@ def lslimit(lib, opts, args):
if (opts.head or opts.tail or 0) < 0:
raise ValueError("Limit value must be non-negative")
query = decargs(args)
if opts.album:
objs = lib.albums(query)
objs = lib.albums(args)
else:
objs = lib.items(query)
objs = lib.items(args)
if opts.head is not None:
objs = islice(objs, opts.head)

View file

@ -86,7 +86,7 @@ class MBSubmitPlugin(BeetsPlugin):
)
def func(lib, opts, args):
items = lib.items(ui.decargs(args))
items = lib.items(args)
self._mbsubmit(items)
mbsubmit_cmd.func = func

View file

@ -63,10 +63,9 @@ class MBSyncPlugin(BeetsPlugin):
move = ui.should_move(opts.move)
pretend = opts.pretend
write = ui.should_write(opts.write)
query = ui.decargs(args)
self.singletons(lib, query, move, pretend, write)
self.albums(lib, query, move, pretend, write)
self.singletons(lib, args, move, pretend, write)
self.albums(lib, args, move, pretend, write)
def singletons(self, lib, query, move, pretend, write):
"""Retrieve and apply info from the autotagger for items matched by

View file

@ -97,7 +97,6 @@ class MetaSyncPlugin(BeetsPlugin):
def func(self, lib, opts, args):
"""Command handler for the metasync function."""
pretend = opts.pretend
query = ui.decargs(args)
sources = []
for source in opts.sources:
@ -106,7 +105,7 @@ class MetaSyncPlugin(BeetsPlugin):
sources = sources or self.config["source"].as_str_seq()
meta_source_instances = {}
items = lib.items(query)
items = lib.items(args)
# Avoid needlessly instantiating meta sources (can be expensive)
if not items:

View file

@ -25,7 +25,7 @@ from beets import config, plugins
from beets.dbcore import types
from beets.library import Album, Item, Library
from beets.plugins import BeetsPlugin
from beets.ui import Subcommand, decargs, print_
from beets.ui import Subcommand, print_
MB_ARTIST_QUERY = r"mb_albumartistid::^\w{8}-\w{4}-\w{4}-\w{4}-\w{12}$"
@ -135,7 +135,7 @@ class MissingPlugin(BeetsPlugin):
albms = self.config["album"].get()
helper = self._missing_albums if albms else self._missing_tracks
helper(lib, decargs(args))
helper(lib, args)
self._command.func = _miss
return [self._command]

View file

@ -88,7 +88,7 @@ class ParentWorkPlugin(BeetsPlugin):
force_parent = self.config["force"].get(bool)
write = ui.should_write()
for item in lib.items(ui.decargs(args)):
for item in lib.items(args):
changed = self.find_work(item, force_parent, verbose=True)
if changed:
item.store()

View file

@ -107,7 +107,7 @@ class PlayPlugin(BeetsPlugin):
# Perform search by album and add folders rather than tracks to
# playlist.
if opts.album:
selection = lib.albums(ui.decargs(args))
selection = lib.albums(args)
paths = []
sort = lib.get_default_album_sort()
@ -120,7 +120,7 @@ class PlayPlugin(BeetsPlugin):
# Perform item query and add tracks to playlist.
else:
selection = lib.items(ui.decargs(args))
selection = lib.items(args)
paths = [item.path for item in selection]
item_type = "track"

View file

@ -16,17 +16,16 @@
from beets.plugins import BeetsPlugin
from beets.random import random_objs
from beets.ui import Subcommand, decargs, print_
from beets.ui import Subcommand, print_
def random_func(lib, opts, args):
"""Select some random items or albums and print the results."""
# Fetch all the objects matching the query into a list.
query = decargs(args)
if opts.album:
objs = list(lib.albums(query))
objs = list(lib.albums(args))
else:
objs = list(lib.items(query))
objs = list(lib.items(args))
# Print a random subset.
objs = random_objs(

View file

@ -62,7 +62,7 @@ class FatalGstreamerPluginReplayGainError(FatalReplayGainError):
loading the required plugins."""
def call(args: list[Any], log: Logger, **kwargs: Any):
def call(args: list[str], log: Logger, **kwargs: Any):
"""Execute the command and return its output or raise a
ReplayGainError on failure.
"""
@ -73,11 +73,6 @@ def call(args: list[Any], log: Logger, **kwargs: Any):
raise ReplayGainError(
"{} exited with status {}".format(args[0], e.returncode)
)
except UnicodeEncodeError:
# Due to a bug in Python 2's subprocess on Windows, Unicode
# filenames can fail to encode on that platform. See:
# https://github.com/google-code-export/beets/issues/499
raise ReplayGainError("argument encoding failed")
def db_to_lufs(db: float) -> float:
@ -403,20 +398,18 @@ class FfmpegBackend(Backend):
def _construct_cmd(
self, item: Item, peak_method: PeakMethod | None
) -> list[str | bytes]:
) -> list[str]:
"""Construct the shell command to analyse items."""
return [
self._ffmpeg_path,
"-nostats",
"-hide_banner",
"-i",
item.path,
str(item.filepath),
"-map",
"a:0",
"-filter",
"ebur128=peak={}".format(
"none" if peak_method is None else peak_method.name
),
f"ebur128=peak={'none' if peak_method is None else peak_method.name}",
"-f",
"null",
"-",
@ -660,7 +653,7 @@ class CommandBackend(Backend):
# tag-writing; this turns the mp3gain/aacgain tool into a gain
# calculator rather than a tag manipulator because we take care
# of changing tags ourselves.
cmd: list[bytes | str] = [self.command, "-o", "-s", "s"]
cmd: list[str] = [self.command, "-o", "-s", "s"]
if self.noclip:
# Adjust to avoid clipping.
cmd = cmd + ["-k"]
@ -1039,7 +1032,7 @@ class AudioToolsBackend(Backend):
os.fsdecode(syspath(item.path))
)
except OSError:
raise ReplayGainError(f"File {item.path} was not found")
raise ReplayGainError(f"File {item.filepath} was not found")
except self._mod_audiotools.UnsupportedFile:
raise ReplayGainError(f"Unsupported file type {item.format}")
@ -1530,7 +1523,7 @@ class ReplayGainPlugin(BeetsPlugin):
self.open_pool(threads)
if opts.album:
albums = lib.albums(ui.decargs(args))
albums = lib.albums(args)
self._log.info(
"Analyzing {} albums ~ {} backend...".format(
len(albums), self.backend_name
@ -1539,7 +1532,7 @@ class ReplayGainPlugin(BeetsPlugin):
for album in albums:
self.handle_album(album, write, force)
else:
items = lib.items(ui.decargs(args))
items = lib.items(args)
self._log.info(
"Analyzing {} tracks ~ {} backend...".format(
len(items), self.backend_name

View file

@ -58,7 +58,7 @@ class ScrubPlugin(BeetsPlugin):
def commands(self):
def scrub_func(lib, opts, args):
# Walk through matching files and remove tags.
for item in lib.items(ui.decargs(args)):
for item in lib.items(args):
self._log.info(
"scrubbing: {0}", util.displayable_path(item.path)
)

View file

@ -127,7 +127,7 @@ class SmartPlaylistPlugin(BeetsPlugin):
def update_cmd(self, lib, opts, args):
self.build_queries()
if args:
args = set(ui.decargs(args))
args = set(args)
for a in list(args):
if not a.endswith(".m3u"):
args.add(f"{a}.m3u")

View file

@ -453,7 +453,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin):
def queries(lib, opts, args):
success = self._parse_opts(opts)
if success:
results = self._match_library_tracks(lib, ui.decargs(args))
results = self._match_library_tracks(lib, args)
self._output_match_results(results)
spotify_cmd = ui.Subcommand(
@ -491,7 +491,7 @@ class SpotifyPlugin(MetadataSourcePlugin, BeetsPlugin):
)
def func(lib, opts, args):
items = lib.items(ui.decargs(args))
items = lib.items(args)
self._fetch_info(items, ui.should_write(), opts.force_refetch)
sync_cmd.func = func

View file

@ -28,7 +28,7 @@ from pathlib import PurePosixPath
from xdg import BaseDirectory
from beets.plugins import BeetsPlugin
from beets.ui import Subcommand, decargs
from beets.ui import Subcommand
from beets.util import bytestring_path, displayable_path, syspath
from beets.util.artresizer import ArtResizer
@ -78,7 +78,7 @@ class ThumbnailsPlugin(BeetsPlugin):
def process_query(self, lib, opts, args):
self.config.set_args(opts)
if self._check_local_ok():
for album in lib.albums(decargs(args)):
for album in lib.albums(args):
self.process_album(album)
def _check_local_ok(self):

View file

@ -308,18 +308,8 @@ def all_items():
def item_file(item_id):
item = g.lib.get_item(item_id)
# On Windows under Python 2, Flask wants a Unicode path. On Python 3, it
# *always* wants a Unicode path.
if os.name == "nt":
item_path = util.syspath(item.path)
else:
item_path = os.fsdecode(item.path)
item_path = util.syspath(item.path)
base_filename = os.path.basename(item_path)
if isinstance(base_filename, bytes):
unicode_base_filename = util.displayable_path(base_filename)
else:
unicode_base_filename = base_filename
try:
# Imitate http.server behaviour
@ -327,7 +317,7 @@ def item_file(item_id):
except UnicodeError:
safe_filename = unidecode(base_filename)
else:
safe_filename = unicode_base_filename
safe_filename = base_filename
response = flask.send_file(
item_path, as_attachment=True, download_name=safe_filename
@ -470,7 +460,7 @@ class WebPlugin(BeetsPlugin):
)
def func(lib, opts, args):
args = ui.decargs(args)
args = args
if args:
self.config["host"] = args.pop(0)
if args:

View file

@ -21,7 +21,7 @@ from mediafile import MediaFile
from beets.importer import Action
from beets.plugins import BeetsPlugin
from beets.ui import Subcommand, decargs, input_yn
from beets.ui import Subcommand, input_yn
__author__ = "baobab@heresiarch.info"
@ -75,11 +75,11 @@ class ZeroPlugin(BeetsPlugin):
zero_command = Subcommand("zero", help="set fields to null")
def zero_fields(lib, opts, args):
if not decargs(args) and not input_yn(
if not args and not input_yn(
"Remove fields for all items? (Y/n)", True
):
return
for item in lib.items(decargs(args)):
for item in lib.items(args):
self.process_item(item)
zero_command.func = zero_fields

View file

@ -232,8 +232,7 @@ class ThumbnailsTest(BeetsTestCase):
)
@patch("beetsplug.thumbnails.ThumbnailsPlugin._check_local_ok", Mock())
@patch("beetsplug.thumbnails.decargs")
def test_invokations(self, mock_decargs):
def test_invokations(self):
plugin = ThumbnailsPlugin()
plugin.process_album = Mock()
album = Mock()
@ -243,7 +242,6 @@ class ThumbnailsTest(BeetsTestCase):
album2 = Mock()
lib.albums.return_value = [album, album2]
plugin.process_query(lib, Mock(), None)
lib.albums.assert_called_once_with(mock_decargs.return_value)
plugin.process_album.assert_has_calls(
[call(album), call(album2)], any_order=True
)