Add typing for path functions

This commit is contained in:
Serene-Arc 2022-11-24 17:11:53 +10:00
parent 2d56e196a8
commit d0ea71d5c9

View file

@ -23,12 +23,18 @@ import shutil
import fnmatch
import functools
from collections import Counter, namedtuple
from logging import Logger
from multiprocessing.pool import ThreadPool
import traceback
import subprocess
import platform
import shlex
<<<<<<< HEAD
from typing import AnyStr, List
=======
from typing import Callable, List, Optional, Sequence, Pattern, \
Tuple, MutableSequence, AnyStr, TypeVar, Generator
>>>>>>> 4908d7fd (Add typing for path functions)
from beets.util import hidden
from unidecode import unidecode
@ -37,6 +43,7 @@ from enum import Enum
MAX_FILENAME_LENGTH = 200
WINDOWS_MAGIC_PREFIX = '\\\\?\\'
T = TypeVar('T')
class HumanReadableException(Exception):
@ -137,7 +144,7 @@ class MoveOperation(Enum):
REFLINK_AUTO = 5
def normpath(path):
def normpath(path: bytes) -> bytes:
"""Provide the canonical form of the path suitable for storing in
the database.
"""
@ -146,7 +153,7 @@ def normpath(path):
return bytestring_path(path)
def ancestry(path: AnyStr) -> List[AnyStr]:
def ancestry(path: AnyStr) -> List[bytes]:
"""Return a list consisting of path's parent directory, its
grandparent, and so on. For instance:
@ -170,7 +177,12 @@ def ancestry(path: AnyStr) -> List[AnyStr]:
return out
def sorted_walk(path, ignore=(), ignore_hidden=False, logger=None):
def sorted_walk(
path: bytes,
ignore: Sequence = (),
ignore_hidden: bool = False,
logger: Optional[Logger] = None,
) -> Generator[Tuple]:
"""Like `os.walk`, but yields things in case-insensitive sorted,
breadth-first order. Directory and file names matching any glob
pattern in `ignore` are skipped. If `logger` is provided, then
@ -227,14 +239,14 @@ def sorted_walk(path, ignore=(), ignore_hidden=False, logger=None):
yield from sorted_walk(cur, ignore, ignore_hidden, logger)
def path_as_posix(path):
def path_as_posix(path: bytes) -> bytes:
"""Return the string representation of the path with forward (/)
slashes.
"""
return path.replace(b'\\', b'/')
def mkdirall(path):
def mkdirall(path: bytes):
"""Make all the enclosing directories of path (like mkdir -p on the
parent).
"""
@ -247,7 +259,7 @@ def mkdirall(path):
traceback.format_exc())
def fnmatch_all(names, patterns):
def fnmatch_all(names: Sequence[bytes], patterns: Sequence[bytes]) -> bool:
"""Determine whether all strings in `names` match at least one of
the `patterns`, which should be shell glob expressions.
"""
@ -262,7 +274,11 @@ def fnmatch_all(names, patterns):
return True
def prune_dirs(path, root=None, clutter=('.DS_Store', 'Thumbs.db')):
def prune_dirs(
path: bytes,
root: Optional[bytes] = None,
clutter: Sequence[str] = ('.DS_Store', 'Thumbs.db'),
):
"""If path is an empty directory, then remove it. Recursively remove
path's ancestry up to root (which is never removed) where there are
empty directories. If path is not contained in root, then nothing is
@ -305,7 +321,7 @@ def prune_dirs(path, root=None, clutter=('.DS_Store', 'Thumbs.db')):
break
def components(path):
def components(path: AnyStr) -> MutableSequence[bytes]:
"""Return a list of the path components in path. For instance:
>>> components('/a/b/c')
@ -329,14 +345,14 @@ def components(path):
return comps
def arg_encoding():
def arg_encoding() -> str:
"""Get the encoding for command-line arguments (and other OS
locale-sensitive strings).
"""
return sys.getfilesystemencoding()
def _fsencoding():
def _fsencoding() -> str:
"""Get the system's filesystem encoding. On Windows, this is always
UTF-8 (not MBCS).
"""
@ -351,7 +367,7 @@ def _fsencoding():
return encoding
def bytestring_path(path):
def bytestring_path(path: AnyStr) -> bytes:
"""Given a path, which is either a bytes or a unicode, returns a str
path (ensuring that we never deal with Unicode pathnames).
"""
@ -372,10 +388,10 @@ def bytestring_path(path):
return path.encode('utf-8')
PATH_SEP = bytestring_path(os.sep)
PATH_SEP: bytes = bytestring_path(os.sep)
def displayable_path(path, separator='; '):
def displayable_path(path, separator: str = '; ') -> str:
"""Attempts to decode a bytestring path to a unicode object for the
purpose of displaying it to the user. If the `path` argument is a
list or a tuple, the elements are joined with `separator`.
@ -394,7 +410,7 @@ def displayable_path(path, separator='; '):
return path.decode('utf-8', 'ignore')
def syspath(path, prefix=True):
def syspath(path: AnyStr, prefix: bool = True) -> str:
"""Convert a path for use by the operating system. In particular,
paths on Windows must receive a magic prefix and must be converted
to Unicode before they are sent to the OS. To disable the magic
@ -428,14 +444,14 @@ def syspath(path, prefix=True):
return path
def samefile(p1, p2):
def samefile(p1: AnyStr, p2: AnyStr) -> bool:
"""Safer equality for paths."""
if p1 == p2:
return True
return shutil._samefile(syspath(p1), syspath(p2))
def remove(path, soft=True):
def remove(path: AnyStr, soft: bool = True):
"""Remove the file. If `soft`, then no error will be raised if the
file does not exist.
"""
@ -448,7 +464,7 @@ def remove(path, soft=True):
raise FilesystemError(exc, 'delete', (path,), traceback.format_exc())
def copy(path, dest, replace=False):
def copy(path: AnyStr, dest: AnyStr, replace: bool = False):
"""Copy a plain file. Permissions are not copied. If `dest` already
exists, raises a FilesystemError unless `replace` is True. Has no
effect if `path` is the same as `dest`. Paths are translated to
@ -467,7 +483,7 @@ def copy(path, dest, replace=False):
traceback.format_exc())
def move(path, dest, replace=False):
def move(path: AnyStr, dest: AnyStr, replace: bool = False):
"""Rename a file. `dest` may not be a directory. If `dest` already
exists, raises an OSError unless `replace` is True. Has no effect if
`path` is the same as `dest`. If the paths are on different
@ -517,7 +533,7 @@ def move(path, dest, replace=False):
os.remove(tmp)
def link(path, dest, replace=False):
def link(path: AnyStr, dest: AnyStr, replace: bool = False):
"""Create a symbolic link from path to `dest`. Raises an OSError if
`dest` already exists, unless `replace` is True. Does nothing if
`path` == `dest`.
@ -538,7 +554,7 @@ def link(path, dest, replace=False):
traceback.format_exc())
def hardlink(path, dest, replace=False):
def hardlink(path: AnyStr, dest: AnyStr, replace: bool = False):
"""Create a hard link from path to `dest`. Raises an OSError if
`dest` already exists, unless `replace` is True. Does nothing if
`path` == `dest`.
@ -562,7 +578,12 @@ def hardlink(path, dest, replace=False):
traceback.format_exc())
def reflink(path, dest, replace=False, fallback=False):
def reflink(
path: AnyStr,
dest: AnyStr,
replace: bool = False,
fallback: bool = False,
):
"""Create a reflink from `dest` to `path`.
Raise an `OSError` if `dest` already exists, unless `replace` is
@ -591,7 +612,7 @@ def reflink(path, dest, replace=False, fallback=False):
'link', (path, dest), traceback.format_exc())
def unique_path(path):
def unique_path(path: AnyStr) -> AnyStr:
"""Returns a version of ``path`` that does not exist on the
filesystem. Specifically, if ``path` itself already exists, then
something unique is appended to the path.
@ -618,7 +639,7 @@ def unique_path(path):
# Unix. They are forbidden here because they cause problems on Samba
# shares, which are sufficiently common as to cause frequent problems.
# https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247.aspx
CHAR_REPLACE = [
CHAR_REPLACE: List[Tuple[Pattern, str]] = [
(re.compile(r'[\\/]'), '_'), # / and \ -- forbidden everywhere.
(re.compile(r'^\.'), '_'), # Leading dot (hidden files on Unix).
(re.compile(r'[\x00-\x1f]'), ''), # Control characters.
@ -628,7 +649,10 @@ CHAR_REPLACE = [
]
def sanitize_path(path, replacements=None):
def sanitize_path(
path: AnyStr,
replacements: Sequence[Sequence[Pattern, str]] = None,
) -> [str, bytes]:
"""Takes a path (as a Unicode string) and makes sure that it is
legal. Returns a new path. Only works with fragments; won't work
reliably on Windows when a path begins with a drive letter. Path
@ -649,7 +673,7 @@ def sanitize_path(path, replacements=None):
return os.path.join(*comps)
def truncate_path(path, length=MAX_FILENAME_LENGTH):
def truncate_path(path: AnyStr, length: int = MAX_FILENAME_LENGTH) -> bytes:
"""Given a bytestring path or a Unicode path fragment, truncate the
components to a legal length. In the last component, the extension
is preserved.
@ -666,7 +690,13 @@ def truncate_path(path, length=MAX_FILENAME_LENGTH):
return os.path.join(*out)
def _legalize_stage(path, replacements, length, extension, fragment):
def _legalize_stage(
path: AnyStr,
replacements: Optional[Sequence[Sequence[Pattern, str]]],
length: int,
extension: str,
fragment: Optional[str, bytes],
) -> Tuple[bytes, bool]:
"""Perform a single round of path legalization steps
(sanitation/replacement, encoding from Unicode to bytes,
extension-appending, and truncation). Return the path (Unicode if
@ -690,7 +720,13 @@ def _legalize_stage(path, replacements, length, extension, fragment):
return path, path != pre_truncate_path
def legalize_path(path, replacements, length, extension, fragment):
def legalize_path(
path: AnyStr,
replacements: Optional[Sequence[Sequence[Pattern, str]]],
length: int,
extension: bytes,
fragment: Optional[AnyStr],
) -> Tuple[bytes, bool]:
"""Given a path-like Unicode string, produce a legal path. Return
the path and a flag indicating whether some replacements had to be
ignored (see below).
@ -738,7 +774,7 @@ def legalize_path(path, replacements, length, extension, fragment):
return second_stage_path, retruncated
def py3_path(path):
def py3_path(path: AnyStr) -> str:
"""Convert a bytestring path to Unicode.
This helps deal with APIs on Python 3 that *only* accept Unicode
@ -753,12 +789,12 @@ def py3_path(path):
return os.fsdecode(path)
def str2bool(value):
def str2bool(value: str) -> bool:
"""Returns a boolean reflecting a human-entered string."""
return value.lower() in ('yes', '1', 'true', 't', 'y')
def as_string(value):
def as_string(value: Optional[memoryview, bytes]) -> str:
"""Convert a value to a Unicode object for matching with a query.
None becomes the empty string. Bytestrings are silently decoded.
"""
@ -772,7 +808,7 @@ def as_string(value):
return str(value)
def plurality(objs):
def plurality(objs: Sequence[T]) -> T:
"""Given a sequence of hashble objects, returns the object that
is most common in the set and the its number of appearance. The
sequence must contain at least one object.
@ -783,7 +819,7 @@ def plurality(objs):
return c.most_common(1)[0]
def cpu_count():
def cpu_count() -> int:
"""Return the number of hardware thread contexts (cores or SMT
threads) in the system.
"""
@ -814,13 +850,12 @@ def cpu_count():
return 1
def convert_command_args(args):
def convert_command_args(args: List[bytes]) -> List[str]:
"""Convert command arguments, which may either be `bytes` or `str`
objects, to uniformly surrogate-escaped strings.
"""
objects, to uniformly surrogate-escaped strings. """
assert isinstance(args, list)
def convert(arg):
def convert(arg) -> str:
if isinstance(arg, bytes):
return os.fsdecode(arg)
return arg
@ -832,7 +867,7 @@ def convert_command_args(args):
CommandOutput = namedtuple("CommandOutput", ("stdout", "stderr"))
def command_output(cmd, shell=False):
def command_output(cmd: List[AnyStr], shell: bool = False) -> CommandOutput:
"""Runs the command and returns its output after it has exited.
Returns a CommandOutput. The attributes ``stdout`` and ``stderr`` contain
@ -889,7 +924,7 @@ def max_filename_length(path: AnyStr, limit=MAX_FILENAME_LENGTH) -> int:
return limit
def open_anything():
def open_anything() -> str:
"""Return the system command that dispatches execution to the correct
program.
"""
@ -903,7 +938,7 @@ def open_anything():
return base_cmd
def editor_command():
def editor_command() -> str:
"""Get a command for opening a text file.
Use the `EDITOR` environment variable by default. If it is not
@ -916,7 +951,7 @@ def editor_command():
return open_anything()
def interactive_open(targets, command):
def interactive_open(targets: Sequence[str], command: str):
"""Open the files in `targets` by `exec`ing a new `command`, given
as a Unicode string. (The new program takes over, and Python
execution ends: this does not fork a subprocess.)
@ -938,7 +973,7 @@ def interactive_open(targets, command):
return os.execlp(*args)
def case_sensitive(path):
def case_sensitive(path: AnyStr) -> bool:
"""Check whether the filesystem at the given path is case sensitive.
To work best, the path should point to a file or a directory. If the path
@ -986,7 +1021,7 @@ def case_sensitive(path):
return not os.path.samefile(lower_sys, upper_sys)
def raw_seconds_short(string):
def raw_seconds_short(string: str) -> float:
"""Formats a human-readable M:SS string as a float (number of seconds).
Raises ValueError if the conversion cannot take place due to `string` not
@ -999,7 +1034,7 @@ def raw_seconds_short(string):
return float(minutes * 60 + seconds)
def asciify_path(path, sep_replace):
def asciify_path(path: AnyStr, sep_replace: str) -> str:
"""Decodes all unicode characters in a path into ASCII equivalents.
Substitutions are provided by the unidecode module. Path separators in the
@ -1023,7 +1058,7 @@ def asciify_path(path, sep_replace):
return os.sep.join(path_components)
def par_map(transform, items):
def par_map(transform: Callable, items: Sequence):
"""Apply the function `transform` to all the elements in the
iterable `items`, like `map(transform, items)` but with no return
value.
@ -1037,7 +1072,7 @@ def par_map(transform, items):
pool.join()
def lazy_property(func):
def lazy_property(func: Callable):
"""A decorator that creates a lazily evaluated property. On first access,
the property is assigned the return value of `func`. This first value is
stored, so that future accesses do not have to evaluate `func` again.