mirror of
https://github.com/beetbox/beets.git
synced 2026-01-05 15:33:15 +01:00
Flake8 fixes
This commit is contained in:
parent
9d9f1b539f
commit
883e3b3bc7
7 changed files with 105 additions and 44 deletions
|
|
@ -38,9 +38,7 @@ from beets import config
|
|||
from beets.util import confit
|
||||
from beets.autotag import mb
|
||||
|
||||
|
||||
# On Windows platforms, use colorama to support "ANSI" terminal colors.
|
||||
|
||||
if sys.platform == 'win32':
|
||||
try:
|
||||
import colorama
|
||||
|
|
@ -50,28 +48,22 @@ if sys.platform == 'win32':
|
|||
colorama.init()
|
||||
|
||||
|
||||
|
||||
# Constants.
|
||||
|
||||
log = logging.getLogger('beets')
|
||||
|
||||
PF_KEY_QUERIES = {
|
||||
'comp': 'comp:true',
|
||||
'singleton': 'singleton:true',
|
||||
}
|
||||
|
||||
# UI exception. Commands should throw this in order to display
|
||||
# nonrecoverable errors to the user.
|
||||
|
||||
class UserError(Exception):
|
||||
pass
|
||||
|
||||
# Main logger.
|
||||
log = logging.getLogger('beets')
|
||||
|
||||
"""UI exception. Commands should throw this in order to display
|
||||
nonrecoverable errors to the user.
|
||||
"""
|
||||
|
||||
|
||||
# Utilities.
|
||||
|
||||
|
||||
def _encoding():
|
||||
"""Tries to guess the encoding used by the terminal."""
|
||||
# Configured override?
|
||||
|
|
@ -170,7 +162,7 @@ def input_options(options, require=False, prompt=None, fallback_prompt=None,
|
|||
# Infer a letter.
|
||||
for letter in option:
|
||||
if not letter.isalpha():
|
||||
continue # Don't use punctuation.
|
||||
continue # Don't use punctuation.
|
||||
if letter not in letters:
|
||||
found_letter = letter
|
||||
break
|
||||
|
|
@ -181,9 +173,10 @@ def input_options(options, require=False, prompt=None, fallback_prompt=None,
|
|||
index = option.index(found_letter)
|
||||
|
||||
# Mark the option's shortcut letter for display.
|
||||
if not require and ((default is None and not numrange and first) or
|
||||
(isinstance(default, basestring) and
|
||||
found_letter.lower() == default.lower())):
|
||||
if not require and (
|
||||
(default is None and not numrange and first) or
|
||||
(isinstance(default, basestring) and
|
||||
found_letter.lower() == default.lower())):
|
||||
# The first option is the default; mark it.
|
||||
show_letter = '[%s]' % found_letter.upper()
|
||||
is_default = True
|
||||
|
|
@ -352,11 +345,13 @@ def human_seconds_short(interval):
|
|||
# http://dev.pocoo.org/hg/pygments-main/file/b2deea5b5030/pygments/console.py
|
||||
# (pygments is by Tim Hatch, Armin Ronacher, et al.)
|
||||
COLOR_ESCAPE = "\x1b["
|
||||
DARK_COLORS = ["black", "darkred", "darkgreen", "brown", "darkblue",
|
||||
"purple", "teal", "lightgray"]
|
||||
DARK_COLORS = ["black", "darkred", "darkgreen", "brown", "darkblue",
|
||||
"purple", "teal", "lightgray"]
|
||||
LIGHT_COLORS = ["darkgray", "red", "green", "yellow", "blue",
|
||||
"fuchsia", "turquoise", "white"]
|
||||
RESET_COLOR = COLOR_ESCAPE + "39;49;00m"
|
||||
|
||||
|
||||
def _colorize(color, text):
|
||||
"""Returns a string that prints the given text in the given color
|
||||
in a terminal that is ANSI color-aware. The color must be something
|
||||
|
|
@ -461,8 +456,8 @@ def color_diff_suffix(a, b, highlight='red'):
|
|||
first_diff = min(len(a), len(b))
|
||||
|
||||
# Colorize from the first difference on.
|
||||
return a[:first_diff] + colorize(highlight, a[first_diff:]), \
|
||||
b[:first_diff] + colorize(highlight, b[first_diff:])
|
||||
return (a[:first_diff] + colorize(highlight, a[first_diff:]),
|
||||
b[:first_diff] + colorize(highlight, b[first_diff:]))
|
||||
|
||||
|
||||
def get_path_formats(subview=None):
|
||||
|
|
@ -558,6 +553,8 @@ def term_width():
|
|||
|
||||
|
||||
FLOAT_EPSILON = 0.01
|
||||
|
||||
|
||||
def _field_diff(field, old, new):
|
||||
"""Given two Model objects, format their values for `field` and
|
||||
highlight changes among them. Return a human-readable string. If the
|
||||
|
|
@ -627,10 +624,8 @@ def show_model_changes(new, old=None, fields=None, always=False):
|
|||
return bool(changes)
|
||||
|
||||
|
||||
|
||||
# Subcommand parsing infrastructure.
|
||||
|
||||
|
||||
#
|
||||
# This is a fairly generic subcommand parser for optparse. It is
|
||||
# maintained externally here:
|
||||
# http://gist.github.com/462717
|
||||
|
|
@ -654,14 +649,17 @@ class Subcommand(object):
|
|||
self.help = help
|
||||
self.hide = hide
|
||||
|
||||
|
||||
class SubcommandsOptionParser(optparse.OptionParser):
|
||||
"""A variant of OptionParser that parses subcommands and their
|
||||
arguments.
|
||||
"""
|
||||
# A singleton command used to give help on other subcommands.
|
||||
_HelpSubcommand = Subcommand('help', optparse.OptionParser(),
|
||||
_HelpSubcommand = Subcommand(
|
||||
'help', optparse.OptionParser(),
|
||||
help='give detailed help on a specific sub-command',
|
||||
aliases=('?',))
|
||||
aliases=('?',)
|
||||
)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
"""Create a new subcommand-aware option parser. All of the
|
||||
|
|
@ -684,7 +682,7 @@ class SubcommandsOptionParser(optparse.OptionParser):
|
|||
# Adjust the help-visible name of each subcommand.
|
||||
for subcommand in self.subcommands:
|
||||
subcommand.parser.prog = '%s %s' % \
|
||||
(self.get_prog_name(), subcommand.name)
|
||||
(self.get_prog_name(), subcommand.name)
|
||||
|
||||
# Our root parser needs to stop on the first unrecognized argument.
|
||||
self.disable_interspersed_args()
|
||||
|
|
@ -802,6 +800,8 @@ class SubcommandsOptionParser(optparse.OptionParser):
|
|||
|
||||
|
||||
optparse.Option.ALWAYS_TYPED_ACTIONS += ('callback',)
|
||||
|
||||
|
||||
def vararg_callback(option, opt_str, value, parser):
|
||||
"""Callback for an option with variable arguments.
|
||||
Manually collect arguments right of a callback-action
|
||||
|
|
@ -838,10 +838,8 @@ def vararg_callback(option, opt_str, value, parser):
|
|||
setattr(parser.values, option.dest, value)
|
||||
|
||||
|
||||
|
||||
# The main entry point and bootstrapping.
|
||||
|
||||
|
||||
def _load_plugins():
|
||||
"""Load the plugins specified in the configuration.
|
||||
"""
|
||||
|
|
@ -932,12 +930,11 @@ def _raw_main(args):
|
|||
log.debug(u'data directory: {0}\n'
|
||||
u'library database: {1}\n'
|
||||
u'library directory: {2}'
|
||||
.format(
|
||||
util.displayable_path(config.config_dir()),
|
||||
util.displayable_path(lib.path),
|
||||
util.displayable_path(lib.directory),
|
||||
)
|
||||
)
|
||||
.format(
|
||||
util.displayable_path(config.config_dir()),
|
||||
util.displayable_path(lib.path),
|
||||
util.displayable_path(lib.directory),
|
||||
))
|
||||
|
||||
# Configure the MusicBrainz API.
|
||||
mb.configure()
|
||||
|
|
|
|||
|
|
@ -37,7 +37,7 @@ def resize_url(url, maxwidth):
|
|||
maxwidth (preserving aspect ratio).
|
||||
"""
|
||||
return '{0}?{1}'.format(PROXY_URL, urllib.urlencode({
|
||||
'url': url.replace('http://',''),
|
||||
'url': url.replace('http://', ''),
|
||||
'w': str(maxwidth),
|
||||
}))
|
||||
|
||||
|
|
|
|||
|
|
@ -38,6 +38,7 @@ class Event(object):
|
|||
"""
|
||||
pass
|
||||
|
||||
|
||||
class WaitableEvent(Event):
|
||||
"""A waitable event is one encapsulating an action that can be
|
||||
waited for using a select() call. That is, it's an event with an
|
||||
|
|
@ -57,21 +58,25 @@ class WaitableEvent(Event):
|
|||
"""
|
||||
pass
|
||||
|
||||
|
||||
class ValueEvent(Event):
|
||||
"""An event that does nothing but return a fixed value."""
|
||||
def __init__(self, value):
|
||||
self.value = value
|
||||
|
||||
|
||||
class ExceptionEvent(Event):
|
||||
"""Raise an exception at the yield point. Used internally."""
|
||||
def __init__(self, exc_info):
|
||||
self.exc_info = exc_info
|
||||
|
||||
|
||||
class SpawnEvent(Event):
|
||||
"""Add a new coroutine thread to the scheduler."""
|
||||
def __init__(self, coro):
|
||||
self.spawned = coro
|
||||
|
||||
|
||||
class JoinEvent(Event):
|
||||
"""Suspend the thread until the specified child thread has
|
||||
completed.
|
||||
|
|
@ -79,11 +84,13 @@ class JoinEvent(Event):
|
|||
def __init__(self, child):
|
||||
self.child = child
|
||||
|
||||
|
||||
class KillEvent(Event):
|
||||
"""Unschedule a child thread."""
|
||||
def __init__(self, child):
|
||||
self.child = child
|
||||
|
||||
|
||||
class DelegationEvent(Event):
|
||||
"""Suspend execution of the current thread, start a new thread and,
|
||||
once the child thread finished, return control to the parent
|
||||
|
|
@ -92,6 +99,7 @@ class DelegationEvent(Event):
|
|||
def __init__(self, coro):
|
||||
self.spawned = coro
|
||||
|
||||
|
||||
class ReturnEvent(Event):
|
||||
"""Return a value the current thread's delegator at the point of
|
||||
delegation. Ends the current (delegate) thread.
|
||||
|
|
@ -99,6 +107,7 @@ class ReturnEvent(Event):
|
|||
def __init__(self, value):
|
||||
self.value = value
|
||||
|
||||
|
||||
class SleepEvent(WaitableEvent):
|
||||
"""Suspend the thread for a given duration.
|
||||
"""
|
||||
|
|
@ -108,6 +117,7 @@ class SleepEvent(WaitableEvent):
|
|||
def time_left(self):
|
||||
return max(self.wakeup_time - time.time(), 0.0)
|
||||
|
||||
|
||||
class ReadEvent(WaitableEvent):
|
||||
"""Reads from a file-like object."""
|
||||
def __init__(self, fd, bufsize):
|
||||
|
|
@ -120,6 +130,7 @@ class ReadEvent(WaitableEvent):
|
|||
def fire(self):
|
||||
return self.fd.read(self.bufsize)
|
||||
|
||||
|
||||
class WriteEvent(WaitableEvent):
|
||||
"""Writes to a file-like object."""
|
||||
def __init__(self, fd, data):
|
||||
|
|
@ -192,15 +203,19 @@ def _event_select(events):
|
|||
|
||||
return ready_events
|
||||
|
||||
|
||||
class ThreadException(Exception):
|
||||
def __init__(self, coro, exc_info):
|
||||
self.coro = coro
|
||||
self.exc_info = exc_info
|
||||
|
||||
def reraise(self):
|
||||
_reraise(self.exc_info[0], self.exc_info[1], self.exc_info[2])
|
||||
|
||||
|
||||
SUSPENDED = Event() # Special sentinel placeholder for suspended threads.
|
||||
|
||||
|
||||
class Delegated(Event):
|
||||
"""Placeholder indicating that a thread has delegated execution to a
|
||||
different thread.
|
||||
|
|
@ -208,6 +223,7 @@ class Delegated(Event):
|
|||
def __init__(self, child):
|
||||
self.child = child
|
||||
|
||||
|
||||
def run(root_coro):
|
||||
"""Schedules a coroutine, running it to completion. This
|
||||
encapsulates the Bluelet scheduler, which the root coroutine can
|
||||
|
|
@ -329,7 +345,7 @@ def run(root_coro):
|
|||
break
|
||||
|
||||
# Wait and fire.
|
||||
event2coro = dict((v,k) for k,v in threads.items())
|
||||
event2coro = dict((v, k) for k, v in threads.items())
|
||||
for event in _event_select(threads.values()):
|
||||
# Run the IO operation, but catch socket errors.
|
||||
try:
|
||||
|
|
@ -378,6 +394,7 @@ def run(root_coro):
|
|||
class SocketClosedError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class Listener(object):
|
||||
"""A socket wrapper object for listening sockets.
|
||||
"""
|
||||
|
|
@ -407,6 +424,7 @@ class Listener(object):
|
|||
self._closed = True
|
||||
self.sock.close()
|
||||
|
||||
|
||||
class Connection(object):
|
||||
"""A socket wrapper object for connected sockets.
|
||||
"""
|
||||
|
|
@ -468,6 +486,7 @@ class Connection(object):
|
|||
yield ReturnEvent(line)
|
||||
break
|
||||
|
||||
|
||||
class AcceptEvent(WaitableEvent):
|
||||
"""An event for Listener objects (listening sockets) that suspends
|
||||
execution until the socket gets a connection.
|
||||
|
|
@ -482,6 +501,7 @@ class AcceptEvent(WaitableEvent):
|
|||
sock, addr = self.listener.sock.accept()
|
||||
return Connection(sock, addr)
|
||||
|
||||
|
||||
class ReceiveEvent(WaitableEvent):
|
||||
"""An event for Connection objects (connected sockets) for
|
||||
asynchronously reading data.
|
||||
|
|
@ -496,6 +516,7 @@ class ReceiveEvent(WaitableEvent):
|
|||
def fire(self):
|
||||
return self.conn.sock.recv(self.bufsize)
|
||||
|
||||
|
||||
class SendEvent(WaitableEvent):
|
||||
"""An event for Connection objects (connected sockets) for
|
||||
asynchronously writing data.
|
||||
|
|
@ -523,6 +544,7 @@ def null():
|
|||
"""
|
||||
return ValueEvent(None)
|
||||
|
||||
|
||||
def spawn(coro):
|
||||
"""Event: add another coroutine to the scheduler. Both the parent
|
||||
and child coroutines run concurrently.
|
||||
|
|
@ -531,6 +553,7 @@ def spawn(coro):
|
|||
raise ValueError('%s is not a coroutine' % str(coro))
|
||||
return SpawnEvent(coro)
|
||||
|
||||
|
||||
def call(coro):
|
||||
"""Event: delegate to another coroutine. The current coroutine
|
||||
is resumed once the sub-coroutine finishes. If the sub-coroutine
|
||||
|
|
@ -540,12 +563,14 @@ def call(coro):
|
|||
raise ValueError('%s is not a coroutine' % str(coro))
|
||||
return DelegationEvent(coro)
|
||||
|
||||
|
||||
def end(value=None):
|
||||
"""Event: ends the coroutine and returns a value to its
|
||||
delegator.
|
||||
"""
|
||||
return ReturnEvent(value)
|
||||
|
||||
|
||||
def read(fd, bufsize=None):
|
||||
"""Event: read from a file descriptor asynchronously."""
|
||||
if bufsize is None:
|
||||
|
|
@ -563,10 +588,12 @@ def read(fd, bufsize=None):
|
|||
else:
|
||||
return ReadEvent(fd, bufsize)
|
||||
|
||||
|
||||
def write(fd, data):
|
||||
"""Event: write to a file descriptor asynchronously."""
|
||||
return WriteEvent(fd, data)
|
||||
|
||||
|
||||
def connect(host, port):
|
||||
"""Event: connect to a network address and return a Connection
|
||||
object for communicating on the socket.
|
||||
|
|
@ -575,17 +602,20 @@ def connect(host, port):
|
|||
sock = socket.create_connection(addr)
|
||||
return ValueEvent(Connection(sock, addr))
|
||||
|
||||
|
||||
def sleep(duration):
|
||||
"""Event: suspend the thread for ``duration`` seconds.
|
||||
"""
|
||||
return SleepEvent(duration)
|
||||
|
||||
|
||||
def join(coro):
|
||||
"""Suspend the thread until another, previously `spawn`ed thread
|
||||
completes.
|
||||
"""
|
||||
return JoinEvent(coro)
|
||||
|
||||
|
||||
def kill(coro):
|
||||
"""Halt the execution of a different `spawn`ed thread.
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -47,6 +47,7 @@ BASESTRING = str if PY3 else basestring
|
|||
NUMERIC_TYPES = (int, float) if PY3 else (int, float, long)
|
||||
TYPE_TYPES = (type,) if PY3 else (type, types.ClassType)
|
||||
|
||||
|
||||
def iter_first(sequence):
|
||||
"""Get the first element from an iterable or raise a ValueError if
|
||||
the iterator generates no values.
|
||||
|
|
@ -67,17 +68,21 @@ class ConfigError(Exception):
|
|||
"""Base class for exceptions raised when querying a configuration.
|
||||
"""
|
||||
|
||||
|
||||
class NotFoundError(ConfigError):
|
||||
"""A requested value could not be found in the configuration trees.
|
||||
"""
|
||||
|
||||
|
||||
class ConfigTypeError(ConfigError, TypeError):
|
||||
"""The value in the configuration did not match the expected type.
|
||||
"""
|
||||
|
||||
|
||||
class ConfigValueError(ConfigError, ValueError):
|
||||
"""The value in the configuration is illegal."""
|
||||
|
||||
|
||||
class ConfigReadError(ConfigError):
|
||||
"""A configuration file could not be read."""
|
||||
def __init__(self, filename, reason=None):
|
||||
|
|
@ -132,6 +137,7 @@ class ConfigSource(dict):
|
|||
else:
|
||||
raise TypeError('source value must be a dict')
|
||||
|
||||
|
||||
class ConfigView(object):
|
||||
"""A configuration "view" is a query into a program's configuration
|
||||
data. A view represents a hypothetical location in the configuration
|
||||
|
|
@ -518,6 +524,7 @@ def _package_path(name):
|
|||
|
||||
return os.path.dirname(os.path.abspath(filepath))
|
||||
|
||||
|
||||
def config_dirs():
|
||||
"""Return a platform-specific list of candidates for user
|
||||
configuration directories on the system.
|
||||
|
|
@ -606,10 +613,12 @@ class Loader(yaml.SafeLoader):
|
|||
plain = super(Loader, self).check_plain()
|
||||
return plain or self.peek() == '%'
|
||||
|
||||
|
||||
Loader.add_constructor('tag:yaml.org,2002:str', Loader._construct_unicode)
|
||||
Loader.add_constructor('tag:yaml.org,2002:map', Loader.construct_yaml_map)
|
||||
Loader.add_constructor('tag:yaml.org,2002:omap', Loader.construct_yaml_map)
|
||||
|
||||
|
||||
def load_yaml(filename):
|
||||
"""Read a YAML document from a file. If the file cannot be read or
|
||||
parsed, a ConfigReadError is raised.
|
||||
|
|
@ -679,11 +688,13 @@ class Dumper(yaml.SafeDumper):
|
|||
"""
|
||||
return self.represent_scalar('tag:yaml.org,2002:null', '')
|
||||
|
||||
|
||||
Dumper.add_representer(OrderedDict, Dumper.represent_dict)
|
||||
Dumper.add_representer(bool, Dumper.represent_bool)
|
||||
Dumper.add_representer(type(None), Dumper.represent_none)
|
||||
Dumper.add_representer(list, Dumper.represent_list)
|
||||
|
||||
|
||||
def restore_yaml_comments(data, default_data):
|
||||
"""Scan default_data for comments (we include empty lines in our
|
||||
definition of comments) and place them before the same keys in data.
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@
|
|||
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class OrderedEnum(Enum):
|
||||
"""
|
||||
An Enum subclass that allows comparison of members.
|
||||
|
|
@ -22,14 +23,17 @@ class OrderedEnum(Enum):
|
|||
if self.__class__ is other.__class__:
|
||||
return self.value >= other.value
|
||||
return NotImplemented
|
||||
|
||||
def __gt__(self, other):
|
||||
if self.__class__ is other.__class__:
|
||||
return self.value > other.value
|
||||
return NotImplemented
|
||||
|
||||
def __le__(self, other):
|
||||
if self.__class__ is other.__class__:
|
||||
return self.value <= other.value
|
||||
return NotImplemented
|
||||
|
||||
def __lt__(self, other):
|
||||
if self.__class__ is other.__class__:
|
||||
return self.value < other.value
|
||||
|
|
|
|||
|
|
@ -42,6 +42,7 @@ POISON = '__PIPELINE_POISON__'
|
|||
|
||||
DEFAULT_QUEUE_SIZE = 16
|
||||
|
||||
|
||||
def _invalidate_queue(q, val=None, sync=True):
|
||||
"""Breaks a Queue such that it never blocks, always has size 1,
|
||||
and has no maximum size. get()ing from the queue returns `val`,
|
||||
|
|
@ -50,8 +51,10 @@ def _invalidate_queue(q, val=None, sync=True):
|
|||
"""
|
||||
def _qsize(len=len):
|
||||
return 1
|
||||
|
||||
def _put(item):
|
||||
pass
|
||||
|
||||
def _get():
|
||||
return val
|
||||
|
||||
|
|
@ -70,6 +73,7 @@ def _invalidate_queue(q, val=None, sync=True):
|
|||
if sync:
|
||||
q.mutex.release()
|
||||
|
||||
|
||||
class CountedQueue(Queue.Queue):
|
||||
"""A queue that keeps track of the number of threads that are
|
||||
still feeding into it. The queue is poisoned when all threads are
|
||||
|
|
@ -104,6 +108,7 @@ class CountedQueue(Queue.Queue):
|
|||
|
||||
# Replacement _get invalidates when no items remain.
|
||||
_old_get = self._get
|
||||
|
||||
def _get():
|
||||
out = _old_get()
|
||||
if not self.queue:
|
||||
|
|
@ -117,18 +122,22 @@ class CountedQueue(Queue.Queue):
|
|||
# No items. Invalidate immediately.
|
||||
_invalidate_queue(self, POISON, False)
|
||||
|
||||
|
||||
class MultiMessage(object):
|
||||
"""A message yielded by a pipeline stage encapsulating multiple
|
||||
values to be sent to the next stage.
|
||||
"""
|
||||
def __init__(self, messages):
|
||||
self.messages = messages
|
||||
|
||||
|
||||
def multiple(messages):
|
||||
"""Yield multiple([message, ..]) from a pipeline stage to send
|
||||
multiple values to the next pipeline stage.
|
||||
"""
|
||||
return MultiMessage(messages)
|
||||
|
||||
|
||||
def _allmsgs(obj):
|
||||
"""Returns a list of all the messages encapsulated in obj. If obj
|
||||
is a MultiMessage, returns its enclosed messages. If obj is BUBBLE,
|
||||
|
|
@ -141,6 +150,7 @@ def _allmsgs(obj):
|
|||
else:
|
||||
return [obj]
|
||||
|
||||
|
||||
class PipelineThread(Thread):
|
||||
"""Abstract base class for pipeline-stage threads."""
|
||||
def __init__(self, all_threads):
|
||||
|
|
@ -169,6 +179,7 @@ class PipelineThread(Thread):
|
|||
for thread in self.all_threads:
|
||||
thread.abort()
|
||||
|
||||
|
||||
class FirstPipelineThread(PipelineThread):
|
||||
"""The thread running the first stage in a parallel pipeline setup.
|
||||
The coroutine should just be a generator.
|
||||
|
|
@ -209,6 +220,7 @@ class FirstPipelineThread(PipelineThread):
|
|||
# Generator finished; shut down the pipeline.
|
||||
self.out_queue.release()
|
||||
|
||||
|
||||
class MiddlePipelineThread(PipelineThread):
|
||||
"""A thread running any stage in the pipeline except the first or
|
||||
last.
|
||||
|
|
@ -256,6 +268,7 @@ class MiddlePipelineThread(PipelineThread):
|
|||
# Pipeline is shutting down normally.
|
||||
self.out_queue.release()
|
||||
|
||||
|
||||
class LastPipelineThread(PipelineThread):
|
||||
"""A thread running the last stage in a pipeline. The coroutine
|
||||
should yield nothing.
|
||||
|
|
@ -291,6 +304,7 @@ class LastPipelineThread(PipelineThread):
|
|||
self.abort_all(sys.exc_info())
|
||||
return
|
||||
|
||||
|
||||
class Pipeline(object):
|
||||
"""Represents a staged pattern of work. Each stage in the pipeline
|
||||
is a coroutine that receives messages from the previous stage and
|
||||
|
|
@ -322,7 +336,8 @@ class Pipeline(object):
|
|||
messages between the stages are stored in queues of the given
|
||||
size.
|
||||
"""
|
||||
queues = [CountedQueue(queue_size) for i in range(len(self.stages)-1)]
|
||||
queue_count = len(self.stages) - 1
|
||||
queues = [CountedQueue(queue_size) for i in range(queue_count)]
|
||||
threads = []
|
||||
|
||||
# Set up first stage.
|
||||
|
|
@ -330,10 +345,10 @@ class Pipeline(object):
|
|||
threads.append(FirstPipelineThread(coro, queues[0], threads))
|
||||
|
||||
# Middle stages.
|
||||
for i in range(1, len(self.stages)-1):
|
||||
for i in range(1, queue_count):
|
||||
for coro in self.stages[i]:
|
||||
threads.append(MiddlePipelineThread(
|
||||
coro, queues[i-1], queues[i], threads
|
||||
coro, queues[i - 1], queues[i], threads
|
||||
))
|
||||
|
||||
# Last stage.
|
||||
|
|
@ -408,17 +423,20 @@ if __name__ == '__main__':
|
|||
print('generating %i' % i)
|
||||
time.sleep(1)
|
||||
yield i
|
||||
|
||||
def work():
|
||||
num = yield
|
||||
while True:
|
||||
print('processing %i' % num)
|
||||
time.sleep(2)
|
||||
num = yield num*2
|
||||
num = yield num * 2
|
||||
|
||||
def consume():
|
||||
while True:
|
||||
num = yield
|
||||
time.sleep(1)
|
||||
print('received %i' % num)
|
||||
|
||||
ts_start = time.time()
|
||||
Pipeline([produce(), work(), consume()]).run_sequential()
|
||||
ts_seq = time.time()
|
||||
|
|
@ -437,6 +455,7 @@ if __name__ == '__main__':
|
|||
print('generating %i' % i)
|
||||
time.sleep(1)
|
||||
yield i
|
||||
|
||||
def exc_work():
|
||||
num = yield
|
||||
while True:
|
||||
|
|
@ -445,10 +464,10 @@ if __name__ == '__main__':
|
|||
if num == 3:
|
||||
raise Exception()
|
||||
num = yield num * 2
|
||||
|
||||
def exc_consume():
|
||||
while True:
|
||||
num = yield
|
||||
#if num == 4:
|
||||
# raise Exception()
|
||||
print('received %i' % num)
|
||||
|
||||
Pipeline([exc_produce(), exc_work(), exc_consume()]).run_parallel(1)
|
||||
|
|
|
|||
|
|
@ -9,4 +9,4 @@ ignore=F401,E241
|
|||
|
||||
# List of files that have not been cleand up yet. We will try to reduce
|
||||
# this with each commit
|
||||
exclude=test/*,beetsplug/*,beets/autotag/hooks.py,beets/autotag/__init__.py,beets/autotag/match.py,beets/autotag/mb.py,beets/dbcore/db.py,beets/importer.py,beets/library.py,beets/plugins.py,beets/ui/commands.py,beets/ui/__init__.py,beets/ui/migrate.py,beets/util/artresizer.py,beets/util/bluelet.py,beets/util/confit.py,beets/util/enumeration.py,beets/util/functemplate.py,beets/util/pipeline.py
|
||||
exclude=test/*,beetsplug/*,beets/autotag/hooks.py,beets/autotag/__init__.py,beets/autotag/match.py,beets/autotag/mb.py,beets/dbcore/db.py,beets/importer.py,beets/library.py,beets/plugins.py,beets/ui/commands.py,beets/ui/migrate.py,beets/util/functemplate.py
|
||||
|
|
|
|||
Loading…
Reference in a new issue