library has per-thread transaction stacks (GC-399)

The Library object is now (almost) safe to share across threads. Per-thread
resources are now automatically managed internally. There is no longer any need
for the _reopen_lib hack to get multiple copies of the Library object.
This commit is contained in:
Adrian Sampson 2012-06-08 11:09:45 -07:00
parent d88afbad11
commit 00c47b6811
3 changed files with 42 additions and 60 deletions

View file

@ -81,23 +81,6 @@ def log_choice(config, task, duplicate=False):
elif task.choice_flag is action.SKIP:
tag_log(config.logfile, 'skip', path)
def _reopen_lib(lib):
"""Because of limitations in SQLite, a given Library is bound to
the thread in which it was created. This function reopens Library
objects so that they can be used from separate threads.
"""
if isinstance(lib, library.Library):
return library.Library(
lib.path,
lib.directory,
lib.path_formats,
lib.art_filename,
lib.timeout,
lib.replacements,
)
else:
return lib
def _duplicate_check(lib, task):
"""Check whether an album already exists in the library. Returns a
list of Album objects (empty if no duplicates are found).
@ -552,16 +535,14 @@ def query_tasks(config):
Instead of finding files from the filesystem, a query is used to
match items from the library.
"""
lib = _reopen_lib(config.lib)
if config.singletons:
# Search for items.
for item in lib.items(config.query):
for item in config.lib.items(config.query):
yield ImportTask.item_task(item)
else:
# Search for albums.
for album in lib.albums(config.query):
for album in config.lib.albums(config.query):
log.debug('yielding album %i: %s - %s' %
(album.id, album.albumartist, album.album))
items = list(album.items())
@ -593,7 +574,6 @@ def user_query(config):
a file-like object for logging the import process. The coroutine
accepts and yields ImportTask objects.
"""
lib = _reopen_lib(config.lib)
recent = set()
task = None
while True:
@ -631,7 +611,7 @@ def user_query(config):
# The "recent" set keeps track of identifiers for recently
# imported albums -- those that haven't reached the database
# yet.
if ident in recent or _duplicate_check(lib, task):
if ident in recent or _duplicate_check(config.lib, task):
config.resolve_duplicate_func(task, config)
log_choice(config, task, True)
recent.add(ident)
@ -657,7 +637,6 @@ def apply_choices(config):
"""A coroutine for applying changes to albums during the autotag
process.
"""
lib = _reopen_lib(config.lib)
task = None
while True:
task = yield task
@ -690,7 +669,7 @@ def apply_choices(config):
# when the last item is removed.
task.replaced_items = defaultdict(list)
for item in items:
dup_items = lib.items(library.MatchQuery('path', item.path))
dup_items = config.lib.items(library.MatchQuery('path', item.path))
for dup_item in dup_items:
task.replaced_items[item].append(dup_item)
log.debug('replacing item %i: %s' %
@ -703,49 +682,48 @@ def apply_choices(config):
duplicate_items = []
if task.remove_duplicates:
if task.is_album:
for album in _duplicate_check(lib, task):
for album in _duplicate_check(config.lib, task):
duplicate_items += album.items()
else:
duplicate_items = _item_duplicate_check(lib, task)
duplicate_items = _item_duplicate_check(config.lib, task)
log.debug('removing %i old duplicated items' %
len(duplicate_items))
# Delete duplicate files that are located inside the library
# directory.
for duplicate_path in [i.path for i in duplicate_items]:
if lib.directory in util.ancestry(duplicate_path):
if config.lib.directory in util.ancestry(duplicate_path):
log.debug(u'deleting replaced duplicate %s' %
util.displayable_path(duplicate_path))
util.remove(duplicate_path)
util.prune_dirs(os.path.dirname(duplicate_path),
lib.directory)
config.lib.directory)
# Add items -- before path changes -- to the library. We add the
# items now (rather than at the end) so that album structures
# are in place before calls to destination().
with lib.transaction():
with config.lib.transaction():
# Remove old items.
for replaced in task.replaced_items.itervalues():
for item in replaced:
lib.remove(item)
config.lib.remove(item)
for item in duplicate_items:
lib.remove(item)
config.lib.remove(item)
# Add new ones.
if task.is_album:
# Add an album.
album = lib.add_album(items)
album = config.lib.add_album(items)
task.album_id = album.id
else:
# Add tracks.
for item in items:
lib.add(item)
config.lib.add(item)
def manipulate_files(config):
"""A coroutine (pipeline stage) that performs necessary file
manipulations *after* items have been added to the library.
"""
lib = _reopen_lib(config.lib)
task = None
while True:
task = yield task
@ -759,7 +737,7 @@ def manipulate_files(config):
if config.move:
# Just move the file.
old_path = item.path
lib.move(item, False)
config.lib.move(item, False)
task.prune(old_path)
elif config.copy:
# If it's a reimport, move in-library files and copy
@ -769,31 +747,30 @@ def manipulate_files(config):
if task.replaced_items[item]:
# This is a reimport. Move in-library files and copy
# out-of-library files.
if lib.directory in util.ancestry(old_path):
lib.move(item, False)
if config.lib.directory in util.ancestry(old_path):
config.lib.move(item, False)
# We moved the item, so remove the
# now-nonexistent file from old_paths.
task.old_paths.remove(old_path)
else:
lib.move(item, True)
config.lib.move(item, True)
else:
# A normal import. Just copy files and keep track of
# old paths.
lib.move(item, True)
config.lib.move(item, True)
if config.write and task.should_write_tags():
item.write()
# Save new paths.
with lib.transaction():
with config.lib.transaction():
for item in items:
lib.store(item)
config.lib.store(item)
def fetch_art(config):
"""A coroutine that fetches and applies album art for albums where
appropriate.
"""
lib = _reopen_lib(config.lib)
task = None
while True:
task = yield task
@ -805,7 +782,7 @@ def fetch_art(config):
# Save the art if any was found.
if artpath:
album = lib.get_album(task.album_id)
album = config.lib.get_album(task.album_id)
album.set_art(artpath, not (config.delete or config.move))
if config.delete or config.move:
@ -816,7 +793,6 @@ def finalize(config):
coroutine sends plugin events, deletes old files, and saves
progress. This is a "terminal" coroutine (it yields None).
"""
lib = _reopen_lib(config.lib)
while True:
task = yield
if task.should_skip():
@ -830,11 +806,13 @@ def finalize(config):
# Announce that we've added an album.
if task.is_album:
album = lib.get_album(task.album_id)
plugins.send('album_imported', lib=lib, album=album, config=config)
album = config.lib.get_album(task.album_id)
plugins.send('album_imported',
lib=config.lib, album=album, config=config)
else:
for item in items:
plugins.send('item_imported', lib=lib, item=item, config=config)
plugins.send('item_imported',
lib=config.lib, item=item, config=config)
# Finally, delete old files.
if config.copy and config.delete:
@ -872,7 +850,6 @@ def item_query(config):
"""A coroutine that queries the user for input on single-item
lookups.
"""
lib = _reopen_lib(config.lib)
task = None
recent = set()
while True:
@ -888,7 +865,7 @@ def item_query(config):
# Duplicate check.
if task.choice_flag in (action.ASIS, action.APPLY):
ident = task.chosen_ident()
if ident in recent or _item_duplicate_check(lib, task):
if ident in recent or _item_duplicate_check(config.lib, task):
config.resolve_duplicate_func(task, config)
log_choice(config, task, True)
recent.add(ident)

View file

@ -22,6 +22,7 @@ import logging
import shlex
import unicodedata
import threading
from collections import defaultdict
from unidecode import unidecode
from beets.mediafile import MediaFile
from beets import plugins
@ -905,11 +906,19 @@ class Transaction(object):
def __init__(self, lib):
self.lib = lib
@property
def _stack(self):
"""Return the transaction stack that this transaction belongs
to. This is the associated library's stack for the current
thread ID. Transactions should never migrate across threads.
"""
return self.lib._tx_stacks[threading.current_thread().ident]
def __enter__(self):
"""Begin a transaction. This transaction may be created while
another is active.
another is active in a different thread.
"""
self.lib._tx_stack.append(self)
self._stack.append(self)
return self
def __exit__(self, exc_type, exc_value, traceback):
@ -917,8 +926,8 @@ class Transaction(object):
entered but not yet exited transaction. If it is the last active
transaction, the database updates are committed.
"""
assert self.lib._tx_stack.pop() is self
if not self.lib._tx_stack:
assert self._stack.pop() is self
if not self._stack:
self.lib._connection().commit()
def query(self, statement, subvals=()):
@ -960,10 +969,10 @@ class Library(BaseLibrary):
self.replacements = replacements
self._memotable = {} # Used for template substitution performance.
self._tx_stack = []
self.timeout = timeout
self._connections = {}
self._tx_stacks = defaultdict(list)
self._make_table('items', item_fields)
self._make_table('albums', album_fields)

View file

@ -15,7 +15,6 @@
"""A Web interface to beets."""
from beets.plugins import BeetsPlugin
from beets import ui
from beets.importer import _reopen_lib
import beets.library
import flask
from flask import g
@ -44,10 +43,7 @@ app = flask.Flask(__name__)
@app.before_request
def before_request():
g.lib = _reopen_lib(app.config['lib'])
@app.teardown_request
def teardown_request(req):
g.lib._connection().close()
g.lib = app.config['lib']
# Items.