Remove all Python 2 references

This commit is contained in:
Šarūnas Nejus 2025-07-07 11:21:18 +01:00
parent 7cada1c9f8
commit 4260162d44
No known key found for this signature in database
GPG key ID: DD28F6704DBE3435
9 changed files with 46 additions and 120 deletions

View file

@ -411,39 +411,6 @@ class BooleanQuery(MatchQuery[int]):
super().__init__(field_name, pattern_int, fast) super().__init__(field_name, pattern_int, fast)
class BytesQuery(FieldQuery[bytes]):
"""Match a raw bytes field (i.e., a path). This is a necessary hack
to work around the `sqlite3` module's desire to treat `bytes` and
`unicode` equivalently in Python 2. Always use this query instead of
`MatchQuery` when matching on BLOB values.
"""
def __init__(self, field_name: str, pattern: bytes | str | memoryview):
# Use a buffer/memoryview representation of the pattern for SQLite
# matching. This instructs SQLite to treat the blob as binary
# rather than encoded Unicode.
if isinstance(pattern, (str, bytes)):
if isinstance(pattern, str):
bytes_pattern = pattern.encode("utf-8")
else:
bytes_pattern = pattern
self.buf_pattern = memoryview(bytes_pattern)
elif isinstance(pattern, memoryview):
self.buf_pattern = pattern
bytes_pattern = bytes(pattern)
else:
raise ValueError("pattern must be bytes, str, or memoryview")
super().__init__(field_name, bytes_pattern)
def col_clause(self) -> tuple[str, Sequence[SQLiteType]]:
return self.field + " = ?", [self.buf_pattern]
@classmethod
def value_match(cls, pattern: bytes, value: Any) -> bool:
return pattern == value
class NumericQuery(FieldQuery[str]): class NumericQuery(FieldQuery[str]):
"""Matches numeric fields. A syntax using Ruby-style range ellipses """Matches numeric fields. A syntax using Ruby-style range ellipses
(``..``) lets users specify one- or two-sided ranges. For example, (``..``) lets users specify one- or two-sided ranges. For example,

View file

@ -26,7 +26,8 @@ from typing import TYPE_CHECKING, Callable, Iterable, Sequence
import mediafile import mediafile
from beets import autotag, config, dbcore, library, plugins, util from beets import autotag, config, library, plugins, util
from beets.dbcore.query import PathQuery
from .state import ImportState from .state import ImportState
@ -520,9 +521,7 @@ class ImportTask(BaseImportTask):
) )
replaced_album_ids = set() replaced_album_ids = set()
for item in self.imported_items(): for item in self.imported_items():
dup_items = list( dup_items = list(lib.items(query=PathQuery("path", item.path)))
lib.items(query=dbcore.query.BytesQuery("path", item.path))
)
self.replaced_items[item] = dup_items self.replaced_items[item] = dup_items
for dup_item in dup_items: for dup_item in dup_items:
if ( if (

View file

@ -104,23 +104,15 @@ def _stream_encoding(stream, default="utf-8"):
return stream.encoding or default return stream.encoding or default
def print_(*strings, **kwargs): def print_(*strings: str, end: str = "\n") -> None:
"""Like print, but rather than raising an error when a character """Like print, but rather than raising an error when a character
is not in the terminal's encoding's character set, just silently is not in the terminal's encoding's character set, just silently
replaces it. replaces it.
The arguments must be Unicode strings: `unicode` on Python 2; `str` on
Python 3.
The `end` keyword argument behaves similarly to the built-in `print` The `end` keyword argument behaves similarly to the built-in `print`
(it defaults to a newline). (it defaults to a newline).
""" """
if not strings: txt = " ".join(strings or ("",)) + end
strings = [""]
assert isinstance(strings[0], str)
txt = " ".join(strings)
txt += kwargs.get("end", "\n")
# Encode the string and write it to stdout. # Encode the string and write it to stdout.
# On Python 3, sys.stdout expects text strings and uses the # On Python 3, sys.stdout expects text strings and uses the

View file

@ -1302,7 +1302,7 @@ class TerminalImportSession(importer.ImportSession):
# The import command. # The import command.
def import_files(lib, paths, query): def import_files(lib, paths: list[bytes], query):
"""Import the files in the given list of paths or matching the """Import the files in the given list of paths or matching the
query. query.
""" """
@ -1333,7 +1333,7 @@ def import_files(lib, paths, query):
plugins.send("import", lib=lib, paths=paths) plugins.send("import", lib=lib, paths=paths)
def import_func(lib, opts, args): def import_func(lib, opts, args: list[str]):
config["import"].set_args(opts) config["import"].set_args(opts)
# Special case: --copy flag suppresses import_move (which would # Special case: --copy flag suppresses import_move (which would
@ -1355,15 +1355,11 @@ def import_func(lib, opts, args):
if not paths and not paths_from_logfiles: if not paths and not paths_from_logfiles:
raise ui.UserError("no path specified") raise ui.UserError("no path specified")
# On Python 2, we used to get filenames as raw bytes, which is byte_paths = [os.fsencode(p) for p in paths]
# what we need. On Python 3, we need to undo the "helpful"
# conversion to Unicode strings to get the real bytestring
# filename.
paths = [os.fsencode(p) for p in paths]
paths_from_logfiles = [os.fsencode(p) for p in paths_from_logfiles] paths_from_logfiles = [os.fsencode(p) for p in paths_from_logfiles]
# Check the user-specified directories. # Check the user-specified directories.
for path in paths: for path in byte_paths:
if not os.path.exists(syspath(normpath(path))): if not os.path.exists(syspath(normpath(path))):
raise ui.UserError( raise ui.UserError(
"no such file or directory: {}".format( "no such file or directory: {}".format(
@ -1384,14 +1380,14 @@ def import_func(lib, opts, args):
) )
continue continue
paths.append(path) byte_paths.append(path)
# If all paths were read from a logfile, and none of them exist, throw # If all paths were read from a logfile, and none of them exist, throw
# an error # an error
if not paths: if not paths:
raise ui.UserError("none of the paths are importable") raise ui.UserError("none of the paths are importable")
import_files(lib, paths, query) import_files(lib, byte_paths, query)
import_cmd = ui.Subcommand( import_cmd = ui.Subcommand(

View file

@ -28,6 +28,7 @@ import sys
import tempfile import tempfile
import traceback import traceback
from collections import Counter from collections import Counter
from collections.abc import Sequence
from contextlib import suppress from contextlib import suppress
from enum import Enum from enum import Enum
from functools import cache from functools import cache
@ -41,7 +42,6 @@ from typing import (
AnyStr, AnyStr,
Callable, Callable,
Generic, Generic,
Iterable,
NamedTuple, NamedTuple,
TypeVar, TypeVar,
Union, Union,
@ -53,23 +53,17 @@ import beets
from beets.util import hidden from beets.util import hidden
if TYPE_CHECKING: if TYPE_CHECKING:
from collections.abc import Iterator, Sequence from collections.abc import Iterable, Iterator
from logging import Logger from logging import Logger
from beets.library import Item from beets.library import Item
if sys.version_info >= (3, 10):
from typing import TypeAlias
else:
from typing_extensions import TypeAlias
MAX_FILENAME_LENGTH = 200 MAX_FILENAME_LENGTH = 200
WINDOWS_MAGIC_PREFIX = "\\\\?\\" WINDOWS_MAGIC_PREFIX = "\\\\?\\"
T = TypeVar("T") T = TypeVar("T")
BytesOrStr = Union[str, bytes] PathLike = Union[str, bytes, Path]
PathLike = Union[BytesOrStr, Path] Replacements = Sequence[tuple[Pattern[str], str]]
Replacements: TypeAlias = "Sequence[tuple[Pattern[str], str]]"
# Here for now to allow for a easy replace later on # Here for now to allow for a easy replace later on
# once we can move to a PathLike (mainly used in importer) # once we can move to a PathLike (mainly used in importer)
@ -860,7 +854,9 @@ class CommandOutput(NamedTuple):
stderr: bytes stderr: bytes
def command_output(cmd: list[BytesOrStr], shell: bool = False) -> CommandOutput: def command_output(
cmd: list[str] | list[bytes], shell: bool = False
) -> CommandOutput:
"""Runs the command and returns its output after it has exited. """Runs the command and returns its output after it has exited.
Returns a CommandOutput. The attributes ``stdout`` and ``stderr`` contain Returns a CommandOutput. The attributes ``stdout`` and ``stderr`` contain
@ -878,8 +874,6 @@ def command_output(cmd: list[BytesOrStr], shell: bool = False) -> CommandOutput:
This replaces `subprocess.check_output` which can have problems if lots of This replaces `subprocess.check_output` which can have problems if lots of
output is sent to stderr. output is sent to stderr.
""" """
converted_cmd = [os.fsdecode(a) for a in cmd]
devnull = subprocess.DEVNULL devnull = subprocess.DEVNULL
proc = subprocess.Popen( proc = subprocess.Popen(
@ -894,7 +888,7 @@ def command_output(cmd: list[BytesOrStr], shell: bool = False) -> CommandOutput:
if proc.returncode: if proc.returncode:
raise subprocess.CalledProcessError( raise subprocess.CalledProcessError(
returncode=proc.returncode, returncode=proc.returncode,
cmd=" ".join(converted_cmd), cmd=" ".join(map(os.fsdecode, cmd)),
output=stdout + stderr, output=stdout + stderr,
) )
return CommandOutput(stdout, stderr) return CommandOutput(stdout, stderr)

View file

@ -214,9 +214,9 @@ class IMBackend(LocalBackend):
else: else:
return cls._version return cls._version
convert_cmd: list[str | bytes] convert_cmd: list[str]
identify_cmd: list[str | bytes] identify_cmd: list[str]
compare_cmd: list[str | bytes] compare_cmd: list[str]
def __init__(self) -> None: def __init__(self) -> None:
"""Initialize a wrapper around ImageMagick for local image operations. """Initialize a wrapper around ImageMagick for local image operations.
@ -265,7 +265,7 @@ class IMBackend(LocalBackend):
# with regards to the height. # with regards to the height.
# ImageMagick already seems to default to no interlace, but we include # ImageMagick already seems to default to no interlace, but we include
# it here for the sake of explicitness. # it here for the sake of explicitness.
cmd: list[str | bytes] = self.convert_cmd + [ cmd: list[str] = self.convert_cmd + [
syspath(path_in, prefix=False), syspath(path_in, prefix=False),
"-resize", "-resize",
f"{maxwidth}x>", f"{maxwidth}x>",
@ -295,7 +295,7 @@ class IMBackend(LocalBackend):
return path_out return path_out
def get_size(self, path_in: bytes) -> tuple[int, int] | None: def get_size(self, path_in: bytes) -> tuple[int, int] | None:
cmd: list[str | bytes] = self.identify_cmd + [ cmd: list[str] = self.identify_cmd + [
"-format", "-format",
"%w %h", "%w %h",
syspath(path_in, prefix=False), syspath(path_in, prefix=False),
@ -480,10 +480,11 @@ class IMBackend(LocalBackend):
return True return True
def write_metadata(self, file: bytes, metadata: Mapping[str, str]) -> None: def write_metadata(self, file: bytes, metadata: Mapping[str, str]) -> None:
assignments = list( assignments = chain.from_iterable(
chain.from_iterable(("-set", k, v) for k, v in metadata.items()) ("-set", k, v) for k, v in metadata.items()
) )
command = self.convert_cmd + [file, *assignments, file] str_file = os.fsdecode(file)
command = self.convert_cmd + [str_file, *assignments, str_file]
util.command_output(command) util.command_output(command)

View file

@ -14,27 +14,21 @@
"""Allows custom commands to be run when an event is emitted by beets""" """Allows custom commands to be run when an event is emitted by beets"""
from __future__ import annotations
import os
import shlex import shlex
import string import string
import subprocess import subprocess
import sys from typing import Any
from beets.plugins import BeetsPlugin from beets.plugins import BeetsPlugin
class CodingFormatter(string.Formatter): class BytesToStrFormatter(string.Formatter):
"""A variant of `string.Formatter` that converts everything to `unicode` """A variant of `string.Formatter` that converts `bytes` to `str`."""
strings.
This was necessary on Python 2, in needs to be kept for backwards def convert_field(self, value: Any, conversion: str | None) -> Any:
compatibility.
"""
def __init__(self, coding):
"""Creates a new coding formatter with the provided coding."""
self._coding = coding
def convert_field(self, value, conversion):
"""Converts the provided value given a conversion type. """Converts the provided value given a conversion type.
This method decodes the converted value using the formatter's coding. This method decodes the converted value using the formatter's coding.
@ -42,7 +36,7 @@ class CodingFormatter(string.Formatter):
converted = super().convert_field(value, conversion) converted = super().convert_field(value, conversion)
if isinstance(converted, bytes): if isinstance(converted, bytes):
return converted.decode(self._coding) return os.fsdecode(converted)
return converted return converted
@ -72,8 +66,8 @@ class HookPlugin(BeetsPlugin):
return return
# For backwards compatibility, use a string formatter that decodes # For backwards compatibility, use a string formatter that decodes
# bytes (in particular, paths) to unicode strings. # bytes (in particular, paths) to strings.
formatter = CodingFormatter(sys.getfilesystemencoding()) formatter = BytesToStrFormatter()
command_pieces = [ command_pieces = [
formatter.format(piece, event=event, **kwargs) formatter.format(piece, event=event, **kwargs)
for piece in shlex.split(command) for piece in shlex.split(command)

View file

@ -62,7 +62,7 @@ class FatalGstreamerPluginReplayGainError(FatalReplayGainError):
loading the required plugins.""" loading the required plugins."""
def call(args: list[Any], log: Logger, **kwargs: Any): def call(args: list[str], log: Logger, **kwargs: Any):
"""Execute the command and return its output or raise a """Execute the command and return its output or raise a
ReplayGainError on failure. ReplayGainError on failure.
""" """
@ -73,11 +73,6 @@ def call(args: list[Any], log: Logger, **kwargs: Any):
raise ReplayGainError( raise ReplayGainError(
"{} exited with status {}".format(args[0], e.returncode) "{} exited with status {}".format(args[0], e.returncode)
) )
except UnicodeEncodeError:
# Due to a bug in Python 2's subprocess on Windows, Unicode
# filenames can fail to encode on that platform. See:
# https://github.com/google-code-export/beets/issues/499
raise ReplayGainError("argument encoding failed")
def db_to_lufs(db: float) -> float: def db_to_lufs(db: float) -> float:
@ -403,20 +398,18 @@ class FfmpegBackend(Backend):
def _construct_cmd( def _construct_cmd(
self, item: Item, peak_method: PeakMethod | None self, item: Item, peak_method: PeakMethod | None
) -> list[str | bytes]: ) -> list[str]:
"""Construct the shell command to analyse items.""" """Construct the shell command to analyse items."""
return [ return [
self._ffmpeg_path, self._ffmpeg_path,
"-nostats", "-nostats",
"-hide_banner", "-hide_banner",
"-i", "-i",
item.path, str(item.filepath),
"-map", "-map",
"a:0", "a:0",
"-filter", "-filter",
"ebur128=peak={}".format( f"ebur128=peak={'none' if peak_method is None else peak_method.name}",
"none" if peak_method is None else peak_method.name
),
"-f", "-f",
"null", "null",
"-", "-",
@ -660,7 +653,7 @@ class CommandBackend(Backend):
# tag-writing; this turns the mp3gain/aacgain tool into a gain # tag-writing; this turns the mp3gain/aacgain tool into a gain
# calculator rather than a tag manipulator because we take care # calculator rather than a tag manipulator because we take care
# of changing tags ourselves. # of changing tags ourselves.
cmd: list[bytes | str] = [self.command, "-o", "-s", "s"] cmd: list[str] = [self.command, "-o", "-s", "s"]
if self.noclip: if self.noclip:
# Adjust to avoid clipping. # Adjust to avoid clipping.
cmd = cmd + ["-k"] cmd = cmd + ["-k"]
@ -1039,7 +1032,7 @@ class AudioToolsBackend(Backend):
os.fsdecode(syspath(item.path)) os.fsdecode(syspath(item.path))
) )
except OSError: except OSError:
raise ReplayGainError(f"File {item.path} was not found") raise ReplayGainError(f"File {item.filepath} was not found")
except self._mod_audiotools.UnsupportedFile: except self._mod_audiotools.UnsupportedFile:
raise ReplayGainError(f"Unsupported file type {item.format}") raise ReplayGainError(f"Unsupported file type {item.format}")

View file

@ -308,18 +308,8 @@ def all_items():
def item_file(item_id): def item_file(item_id):
item = g.lib.get_item(item_id) item = g.lib.get_item(item_id)
# On Windows under Python 2, Flask wants a Unicode path. On Python 3, it item_path = util.syspath(item.path)
# *always* wants a Unicode path.
if os.name == "nt":
item_path = util.syspath(item.path)
else:
item_path = os.fsdecode(item.path)
base_filename = os.path.basename(item_path) base_filename = os.path.basename(item_path)
if isinstance(base_filename, bytes):
unicode_base_filename = util.displayable_path(base_filename)
else:
unicode_base_filename = base_filename
try: try:
# Imitate http.server behaviour # Imitate http.server behaviour
@ -327,7 +317,7 @@ def item_file(item_id):
except UnicodeError: except UnicodeError:
safe_filename = unidecode(base_filename) safe_filename = unidecode(base_filename)
else: else:
safe_filename = unicode_base_filename safe_filename = base_filename
response = flask.send_file( response = flask.send_file(
item_path, as_attachment=True, download_name=safe_filename item_path, as_attachment=True, download_name=safe_filename