Remove all Python 2 references

This commit is contained in:
Šarūnas Nejus 2025-07-07 11:21:18 +01:00
parent 7cada1c9f8
commit 4260162d44
No known key found for this signature in database
GPG key ID: DD28F6704DBE3435
9 changed files with 46 additions and 120 deletions

View file

@ -411,39 +411,6 @@ class BooleanQuery(MatchQuery[int]):
super().__init__(field_name, pattern_int, fast)
class BytesQuery(FieldQuery[bytes]):
"""Match a raw bytes field (i.e., a path). This is a necessary hack
to work around the `sqlite3` module's desire to treat `bytes` and
`unicode` equivalently in Python 2. Always use this query instead of
`MatchQuery` when matching on BLOB values.
"""
def __init__(self, field_name: str, pattern: bytes | str | memoryview):
# Use a buffer/memoryview representation of the pattern for SQLite
# matching. This instructs SQLite to treat the blob as binary
# rather than encoded Unicode.
if isinstance(pattern, (str, bytes)):
if isinstance(pattern, str):
bytes_pattern = pattern.encode("utf-8")
else:
bytes_pattern = pattern
self.buf_pattern = memoryview(bytes_pattern)
elif isinstance(pattern, memoryview):
self.buf_pattern = pattern
bytes_pattern = bytes(pattern)
else:
raise ValueError("pattern must be bytes, str, or memoryview")
super().__init__(field_name, bytes_pattern)
def col_clause(self) -> tuple[str, Sequence[SQLiteType]]:
return self.field + " = ?", [self.buf_pattern]
@classmethod
def value_match(cls, pattern: bytes, value: Any) -> bool:
return pattern == value
class NumericQuery(FieldQuery[str]):
"""Matches numeric fields. A syntax using Ruby-style range ellipses
(``..``) lets users specify one- or two-sided ranges. For example,

View file

@ -26,7 +26,8 @@ from typing import TYPE_CHECKING, Callable, Iterable, Sequence
import mediafile
from beets import autotag, config, dbcore, library, plugins, util
from beets import autotag, config, library, plugins, util
from beets.dbcore.query import PathQuery
from .state import ImportState
@ -520,9 +521,7 @@ class ImportTask(BaseImportTask):
)
replaced_album_ids = set()
for item in self.imported_items():
dup_items = list(
lib.items(query=dbcore.query.BytesQuery("path", item.path))
)
dup_items = list(lib.items(query=PathQuery("path", item.path)))
self.replaced_items[item] = dup_items
for dup_item in dup_items:
if (

View file

@ -104,23 +104,15 @@ def _stream_encoding(stream, default="utf-8"):
return stream.encoding or default
def print_(*strings, **kwargs):
def print_(*strings: str, end: str = "\n") -> None:
"""Like print, but rather than raising an error when a character
is not in the terminal's encoding's character set, just silently
replaces it.
The arguments must be Unicode strings: `unicode` on Python 2; `str` on
Python 3.
The `end` keyword argument behaves similarly to the built-in `print`
(it defaults to a newline).
"""
if not strings:
strings = [""]
assert isinstance(strings[0], str)
txt = " ".join(strings)
txt += kwargs.get("end", "\n")
txt = " ".join(strings or ("",)) + end
# Encode the string and write it to stdout.
# On Python 3, sys.stdout expects text strings and uses the

View file

@ -1302,7 +1302,7 @@ class TerminalImportSession(importer.ImportSession):
# The import command.
def import_files(lib, paths, query):
def import_files(lib, paths: list[bytes], query):
"""Import the files in the given list of paths or matching the
query.
"""
@ -1333,7 +1333,7 @@ def import_files(lib, paths, query):
plugins.send("import", lib=lib, paths=paths)
def import_func(lib, opts, args):
def import_func(lib, opts, args: list[str]):
config["import"].set_args(opts)
# Special case: --copy flag suppresses import_move (which would
@ -1355,15 +1355,11 @@ def import_func(lib, opts, args):
if not paths and not paths_from_logfiles:
raise ui.UserError("no path specified")
# On Python 2, we used to get filenames as raw bytes, which is
# what we need. On Python 3, we need to undo the "helpful"
# conversion to Unicode strings to get the real bytestring
# filename.
paths = [os.fsencode(p) for p in paths]
byte_paths = [os.fsencode(p) for p in paths]
paths_from_logfiles = [os.fsencode(p) for p in paths_from_logfiles]
# Check the user-specified directories.
for path in paths:
for path in byte_paths:
if not os.path.exists(syspath(normpath(path))):
raise ui.UserError(
"no such file or directory: {}".format(
@ -1384,14 +1380,14 @@ def import_func(lib, opts, args):
)
continue
paths.append(path)
byte_paths.append(path)
# If all paths were read from a logfile, and none of them exist, throw
# an error
if not paths:
raise ui.UserError("none of the paths are importable")
import_files(lib, paths, query)
import_files(lib, byte_paths, query)
import_cmd = ui.Subcommand(

View file

@ -28,6 +28,7 @@ import sys
import tempfile
import traceback
from collections import Counter
from collections.abc import Sequence
from contextlib import suppress
from enum import Enum
from functools import cache
@ -41,7 +42,6 @@ from typing import (
AnyStr,
Callable,
Generic,
Iterable,
NamedTuple,
TypeVar,
Union,
@ -53,23 +53,17 @@ import beets
from beets.util import hidden
if TYPE_CHECKING:
from collections.abc import Iterator, Sequence
from collections.abc import Iterable, Iterator
from logging import Logger
from beets.library import Item
if sys.version_info >= (3, 10):
from typing import TypeAlias
else:
from typing_extensions import TypeAlias
MAX_FILENAME_LENGTH = 200
WINDOWS_MAGIC_PREFIX = "\\\\?\\"
T = TypeVar("T")
BytesOrStr = Union[str, bytes]
PathLike = Union[BytesOrStr, Path]
Replacements: TypeAlias = "Sequence[tuple[Pattern[str], str]]"
PathLike = Union[str, bytes, Path]
Replacements = Sequence[tuple[Pattern[str], str]]
# Here for now to allow for a easy replace later on
# once we can move to a PathLike (mainly used in importer)
@ -860,7 +854,9 @@ class CommandOutput(NamedTuple):
stderr: bytes
def command_output(cmd: list[BytesOrStr], shell: bool = False) -> CommandOutput:
def command_output(
cmd: list[str] | list[bytes], shell: bool = False
) -> CommandOutput:
"""Runs the command and returns its output after it has exited.
Returns a CommandOutput. The attributes ``stdout`` and ``stderr`` contain
@ -878,8 +874,6 @@ def command_output(cmd: list[BytesOrStr], shell: bool = False) -> CommandOutput:
This replaces `subprocess.check_output` which can have problems if lots of
output is sent to stderr.
"""
converted_cmd = [os.fsdecode(a) for a in cmd]
devnull = subprocess.DEVNULL
proc = subprocess.Popen(
@ -894,7 +888,7 @@ def command_output(cmd: list[BytesOrStr], shell: bool = False) -> CommandOutput:
if proc.returncode:
raise subprocess.CalledProcessError(
returncode=proc.returncode,
cmd=" ".join(converted_cmd),
cmd=" ".join(map(os.fsdecode, cmd)),
output=stdout + stderr,
)
return CommandOutput(stdout, stderr)

View file

@ -214,9 +214,9 @@ class IMBackend(LocalBackend):
else:
return cls._version
convert_cmd: list[str | bytes]
identify_cmd: list[str | bytes]
compare_cmd: list[str | bytes]
convert_cmd: list[str]
identify_cmd: list[str]
compare_cmd: list[str]
def __init__(self) -> None:
"""Initialize a wrapper around ImageMagick for local image operations.
@ -265,7 +265,7 @@ class IMBackend(LocalBackend):
# with regards to the height.
# ImageMagick already seems to default to no interlace, but we include
# it here for the sake of explicitness.
cmd: list[str | bytes] = self.convert_cmd + [
cmd: list[str] = self.convert_cmd + [
syspath(path_in, prefix=False),
"-resize",
f"{maxwidth}x>",
@ -295,7 +295,7 @@ class IMBackend(LocalBackend):
return path_out
def get_size(self, path_in: bytes) -> tuple[int, int] | None:
cmd: list[str | bytes] = self.identify_cmd + [
cmd: list[str] = self.identify_cmd + [
"-format",
"%w %h",
syspath(path_in, prefix=False),
@ -480,10 +480,11 @@ class IMBackend(LocalBackend):
return True
def write_metadata(self, file: bytes, metadata: Mapping[str, str]) -> None:
assignments = list(
chain.from_iterable(("-set", k, v) for k, v in metadata.items())
assignments = chain.from_iterable(
("-set", k, v) for k, v in metadata.items()
)
command = self.convert_cmd + [file, *assignments, file]
str_file = os.fsdecode(file)
command = self.convert_cmd + [str_file, *assignments, str_file]
util.command_output(command)

View file

@ -14,27 +14,21 @@
"""Allows custom commands to be run when an event is emitted by beets"""
from __future__ import annotations
import os
import shlex
import string
import subprocess
import sys
from typing import Any
from beets.plugins import BeetsPlugin
class CodingFormatter(string.Formatter):
"""A variant of `string.Formatter` that converts everything to `unicode`
strings.
class BytesToStrFormatter(string.Formatter):
"""A variant of `string.Formatter` that converts `bytes` to `str`."""
This was necessary on Python 2, in needs to be kept for backwards
compatibility.
"""
def __init__(self, coding):
"""Creates a new coding formatter with the provided coding."""
self._coding = coding
def convert_field(self, value, conversion):
def convert_field(self, value: Any, conversion: str | None) -> Any:
"""Converts the provided value given a conversion type.
This method decodes the converted value using the formatter's coding.
@ -42,7 +36,7 @@ class CodingFormatter(string.Formatter):
converted = super().convert_field(value, conversion)
if isinstance(converted, bytes):
return converted.decode(self._coding)
return os.fsdecode(converted)
return converted
@ -72,8 +66,8 @@ class HookPlugin(BeetsPlugin):
return
# For backwards compatibility, use a string formatter that decodes
# bytes (in particular, paths) to unicode strings.
formatter = CodingFormatter(sys.getfilesystemencoding())
# bytes (in particular, paths) to strings.
formatter = BytesToStrFormatter()
command_pieces = [
formatter.format(piece, event=event, **kwargs)
for piece in shlex.split(command)

View file

@ -62,7 +62,7 @@ class FatalGstreamerPluginReplayGainError(FatalReplayGainError):
loading the required plugins."""
def call(args: list[Any], log: Logger, **kwargs: Any):
def call(args: list[str], log: Logger, **kwargs: Any):
"""Execute the command and return its output or raise a
ReplayGainError on failure.
"""
@ -73,11 +73,6 @@ def call(args: list[Any], log: Logger, **kwargs: Any):
raise ReplayGainError(
"{} exited with status {}".format(args[0], e.returncode)
)
except UnicodeEncodeError:
# Due to a bug in Python 2's subprocess on Windows, Unicode
# filenames can fail to encode on that platform. See:
# https://github.com/google-code-export/beets/issues/499
raise ReplayGainError("argument encoding failed")
def db_to_lufs(db: float) -> float:
@ -403,20 +398,18 @@ class FfmpegBackend(Backend):
def _construct_cmd(
self, item: Item, peak_method: PeakMethod | None
) -> list[str | bytes]:
) -> list[str]:
"""Construct the shell command to analyse items."""
return [
self._ffmpeg_path,
"-nostats",
"-hide_banner",
"-i",
item.path,
str(item.filepath),
"-map",
"a:0",
"-filter",
"ebur128=peak={}".format(
"none" if peak_method is None else peak_method.name
),
f"ebur128=peak={'none' if peak_method is None else peak_method.name}",
"-f",
"null",
"-",
@ -660,7 +653,7 @@ class CommandBackend(Backend):
# tag-writing; this turns the mp3gain/aacgain tool into a gain
# calculator rather than a tag manipulator because we take care
# of changing tags ourselves.
cmd: list[bytes | str] = [self.command, "-o", "-s", "s"]
cmd: list[str] = [self.command, "-o", "-s", "s"]
if self.noclip:
# Adjust to avoid clipping.
cmd = cmd + ["-k"]
@ -1039,7 +1032,7 @@ class AudioToolsBackend(Backend):
os.fsdecode(syspath(item.path))
)
except OSError:
raise ReplayGainError(f"File {item.path} was not found")
raise ReplayGainError(f"File {item.filepath} was not found")
except self._mod_audiotools.UnsupportedFile:
raise ReplayGainError(f"Unsupported file type {item.format}")

View file

@ -308,18 +308,8 @@ def all_items():
def item_file(item_id):
item = g.lib.get_item(item_id)
# On Windows under Python 2, Flask wants a Unicode path. On Python 3, it
# *always* wants a Unicode path.
if os.name == "nt":
item_path = util.syspath(item.path)
else:
item_path = os.fsdecode(item.path)
base_filename = os.path.basename(item_path)
if isinstance(base_filename, bytes):
unicode_base_filename = util.displayable_path(base_filename)
else:
unicode_base_filename = base_filename
try:
# Imitate http.server behaviour
@ -327,7 +317,7 @@ def item_file(item_id):
except UnicodeError:
safe_filename = unidecode(base_filename)
else:
safe_filename = unicode_base_filename
safe_filename = base_filename
response = flask.send_file(
item_path, as_attachment=True, download_name=safe_filename