Fix a few remaining downgrade locking errors

Fix a few remaining call sites that could generate downgrade lock
errors. Also speed up locking a little by aliasing the __enter__ and
__exit__ methods on the wrapper lock object instead of wrapping them.
This commit is contained in:
Kovid Goyal 2013-10-17 15:25:26 +05:30
parent 9296118d7a
commit dba9397a5e
5 changed files with 57 additions and 26 deletions

View file

@ -18,7 +18,7 @@
from calibre.customize.ui import run_plugins_on_import, run_plugins_on_postimport
from calibre.db import SPOOL_SIZE, _get_next_series_num_for_list
from calibre.db.categories import get_categories
from calibre.db.locking import create_locks, DowngradeLockError
from calibre.db.locking import create_locks, DowngradeLockError, SafeReadLock
from calibre.db.errors import NoSuchFormat
from calibre.db.fields import create_field, IDENTITY, InvalidLinkTable
from calibre.db.search import Search
@ -57,11 +57,8 @@ def call_func_with_lock(*args, **kwargs):
return func(*args, **kwargs)
except DowngradeLockError:
# We already have an exclusive lock, no need to acquire a shared
# lock. This can happen when updating the search cache in the
# presence of composite columns. Updating the search cache holds an
# exclusive lock, but searching a composite column involves
# reading field values via ProxyMetadata which tries to get a
# shared lock.
# lock. See the safe_read_lock property's documentation for why
# this is necessary.
return func(*args, **kwargs)
return call_func_with_lock
@ -118,6 +115,17 @@ def __init__(self, backend):
self._search_api = Search(self, 'saved_searches', self.field_metadata.get_search_terms())
self.initialize_dynamic()
@property
def safe_read_lock(self):
    ''' A safe read lock is a lock that does nothing if the thread already
    has a write lock, otherwise it acquires a read lock. This is necessary
    to prevent DowngradeLockErrors, which can happen when updating the
    search cache in the presence of composite columns. Updating the search
    cache holds an exclusive lock, but searching a composite column
    involves reading field values via ProxyMetadata which tries to get a
    shared lock. There may be other scenarios that trigger this as well.

    Note that a fresh SafeReadLock wrapper is returned on every access, so
    each ``with`` statement gets its own independent acquired-state. '''
    return SafeReadLock(self.read_lock)
@write_api
def initialize_dynamic(self):
# Reconstruct the user categories, putting them into field_metadata
@ -501,7 +509,7 @@ def format_metadata(self, book_id, fmt, allow_cache=True, update_db=False):
x = self.format_metadata_cache[book_id].get(fmt, None)
if x is not None:
return x
with self.read_lock:
with self.safe_read_lock:
try:
name = self.fields['formats'].format_fname(book_id, fmt)
path = self._field_for('path', book_id).replace('/', os.sep)
@ -545,7 +553,7 @@ def get_metadata(self, book_id,
cover_as_data is True then as mi.cover_data.
'''
with self.read_lock:
with self.safe_read_lock:
mi = self._get_metadata(book_id, get_user_categories=get_user_categories)
if get_cover:
@ -751,7 +759,7 @@ def format(self, book_id, fmt, as_file=False, as_path=False, preserve_filename=F
ext = ('.'+fmt.lower()) if fmt else ''
if as_path:
if preserve_filename:
with self.read_lock:
with self.safe_read_lock:
try:
fname = self.fields['formats'].format_fname(book_id, fmt)
except:
@ -777,7 +785,7 @@ def format(self, book_id, fmt, as_file=False, as_path=False, preserve_filename=F
return None
ret = pt.name
elif as_file:
with self.read_lock:
with self.safe_read_lock:
try:
fname = self.fields['formats'].format_fname(book_id, fmt)
except:
@ -878,7 +886,7 @@ def search(self, query, restriction='', virtual_fields=None, book_ids=None):
@api
def get_categories(self, sort='name', book_ids=None, icon_map=None, already_fixed=None):
try:
with self.read_lock:
with self.safe_read_lock:
return get_categories(self, sort=sort, book_ids=book_ids, icon_map=icon_map)
except InvalidLinkTable as err:
bad_field = err.field_name

View file

@ -132,7 +132,7 @@ def func(dbref, book_id, cache):
author_ids, adata = cache['adata']
except KeyError:
db = dbref()
with db.read_lock:
with db.safe_read_lock:
author_ids = db._field_ids_for('authors', book_id)
adata = db._author_data(author_ids)
cache['adata'] = (author_ids, adata)

View file

@ -154,7 +154,7 @@ def all_ids(self):
return tuple(self.new_api.all_book_ids())
def is_empty(self):
with self.new_api.read_lock:
with self.new_api.safe_read_lock:
return not bool(self.new_api.fields['title'].table.book_col_map)
def get_usage_count_by_id(self, field):
@ -363,7 +363,7 @@ def author_sort_from_book(self, index, index_is_id=False):
def authors_with_sort_strings(self, index, index_is_id=False):
book_id = index if index_is_id else self.id(index)
with self.new_api.read_lock:
with self.new_api.safe_read_lock:
authors = self.new_api._field_ids_for('authors', book_id)
adata = self.new_api._author_data(authors)
return [(aid, adata[aid]['name'], adata[aid]['sort'], adata[aid]['link']) for aid in authors]
@ -379,7 +379,7 @@ def set_link_field_for_author(self, aid, link, commit=True, notify=False):
self.notify('metadata', list(changed_books))
def book_on_device(self, book_id):
with self.new_api.read_lock:
with self.new_api.safe_read_lock:
return self.new_api.fields['ondevice'].book_on_device(book_id)
def book_on_device_string(self, book_id):
@ -393,7 +393,7 @@ def book_on_device_func(self):
return self.new_api.fields['ondevice'].book_on_device_func
def books_in_series(self, series_id):
with self.new_api.read_lock:
with self.new_api.safe_read_lock:
book_ids = self.new_api._books_for_field('series', series_id)
ff = self.new_api._field_for
return sorted(book_ids, key=lambda x:ff('series_index', x))

View file

@ -213,16 +213,15 @@ def __init__(self, shlock, is_shared=True):
self._shlock = shlock
self._is_shared = is_shared
def __enter__(self):
def acquire(self):
self._shlock.acquire(shared=self._is_shared)
return self
def __exit__(self, *args):
self.release()
def release(self):
def release(self, *args):
self._shlock.release()
__enter__ = acquire
__exit__ = release
def owns_lock(self):
return self._shlock.owns_lock()
@ -231,11 +230,11 @@ class DebugRWLockWrapper(RWLockWrapper):
def __init__(self, *args, **kwargs):
RWLockWrapper.__init__(self, *args, **kwargs)
def __enter__(self):
def acquire(self):
print ('#' * 120, file=sys.stderr)
print ('acquire called: thread id:', current_thread(), 'shared:', self._is_shared, file=sys.stderr)
traceback.print_stack()
RWLockWrapper.__enter__(self)
RWLockWrapper.acquire(self)
print ('acquire done: thread id:', current_thread(), file=sys.stderr)
print ('_' * 120, file=sys.stderr)
@ -247,4 +246,28 @@ def release(self):
print ('release done: thread id:', current_thread(), 'is_shared:', self._shlock.is_shared, 'is_exclusive:', self._shlock.is_exclusive, file=sys.stderr)
print ('_' * 120, file=sys.stderr)
__enter__ = acquire
__exit__ = release
class SafeReadLock(object):

    ''' A context manager that wraps a read lock, but silently does nothing
    when the acquisition fails with a DowngradeLockError (i.e. the current
    thread already holds the exclusive lock). release() only releases the
    underlying lock when this wrapper actually acquired it. '''

    def __init__(self, read_lock):
        self.read_lock = read_lock  # the underlying shared (read) lock
        self.acquired = False       # did *we* acquire it, so we must release?

    def acquire(self):
        got_it = False
        try:
            self.read_lock.acquire()
            got_it = True
        except DowngradeLockError:
            # Already holding the write lock: proceed without the read lock.
            pass
        self.acquired = got_it
        return self

    def release(self, *args):
        # *args absorbs the (exc_type, exc_val, tb) triple from __exit__
        if self.acquired:
            self.read_lock.release()
        self.acquired = False

    __enter__ = acquire
    __exit__ = release

View file

@ -205,7 +205,7 @@ def _get(self, field, idx, index_is_id=True, default_value=None, fmt=lambda x:x)
def get_series_sort(self, idx, index_is_id=True, default_value=''):
book_id = idx if index_is_id else self.index_to_id(idx)
with self.cache.read_lock:
with self.cache.safe_read_lock:
lang_map = self.cache.fields['languages'].book_value_map
lang = lang_map.get(book_id, None) or None
if lang:
@ -223,7 +223,7 @@ def get_marked(self, idx, index_is_id=True, default_value=None):
def get_author_data(self, idx, index_is_id=True, default_value=None):
id_ = idx if index_is_id else self.index_to_id(idx)
with self.cache.read_lock:
with self.cache.safe_read_lock:
ids = self.cache._field_ids_for('authors', id_)
adata = self.cache._author_data(ids)
ans = [':::'.join((adata[aid]['name'], adata[aid]['sort'], adata[aid]['link'])) for aid in ids if aid in adata]