new Database class in dbcore

This moves most of the abstract Library functionality. Much more to come. Some
stuff is temporarily broken, to be followed up on in the next few commits.
This commit is contained in:
Adrian Sampson 2014-01-13 15:02:16 -08:00
parent a9ef11c311
commit c5b92919c6
2 changed files with 242 additions and 219 deletions

View file

@ -1,5 +1,9 @@
import time
import os
from collections import defaultdict
import threading
import sqlite3
import contextlib
import beets
from beets.util.functemplate import Template
@ -373,3 +377,238 @@ class Model(object):
if isinstance(template, basestring):
template = Template(template)
return template.substitute(mapping, funcs)
# Database controllers and supporting interfaces.
class Results(object):
"""An item query result set. Iterating over the collection lazily
constructs LibModel objects that reflect database rows.
"""
def __init__(self, model_class, rows, lib, query=None):
"""Create a result set that will construct objects of type
`model_class`, which should be a subclass of `LibModel`, out of
the query result mapping in `rows`. The new objects are
associated with the library `lib`. If `query` is provided, it is
used as a predicate to filter the results for a "slow query" that
cannot be evaluated by the database directly.
"""
self.model_class = model_class
self.rows = rows
self.lib = lib
self.query = query
def __iter__(self):
"""Construct Python objects for all rows that pass the query
predicate.
"""
for row in self.rows:
# Get the flexible attributes for the object.
with self.lib.transaction() as tx:
flex_rows = tx.query(
'SELECT * FROM {0} WHERE entity_id=?'.format(
self.model_class._flex_table
),
(row['id'],)
)
values = dict(row)
values.update(
dict((row['key'], row['value']) for row in flex_rows)
)
# Construct the Python object and yield it if it passes the
# predicate.
obj = self.model_class(self.lib, **values)
if not self.query or self.query.match(obj):
yield obj
def __len__(self):
"""Get the number of matching objects.
"""
if self.query:
# A slow query. Fall back to testing every object.
count = 0
for obj in self:
count += 1
return count
else:
# A fast query. Just count the rows.
return len(self.rows)
def __nonzero__(self):
"""Does this result contain any objects?
"""
return bool(len(self))
def __getitem__(self, n):
"""Get the nth item in this result set. This is inefficient: all
items up to n are materialized and thrown away.
"""
it = iter(self)
try:
for i in range(n):
it.next()
return it.next()
except StopIteration:
raise IndexError('result index {0} out of range'.format(n))
def get(self):
"""Return the first matching object, or None if no objects
match.
"""
it = iter(self)
try:
return it.next()
except StopIteration:
return None
class Transaction(object):
"""A context manager for safe, concurrent access to the database.
All SQL commands should be executed through a transaction.
"""
def __init__(self, lib):
self.lib = lib
def __enter__(self):
"""Begin a transaction. This transaction may be created while
another is active in a different thread.
"""
with self.lib._tx_stack() as stack:
first = not stack
stack.append(self)
if first:
# Beginning a "root" transaction, which corresponds to an
# SQLite transaction.
self.lib._db_lock.acquire()
return self
def __exit__(self, exc_type, exc_value, traceback):
"""Complete a transaction. This must be the most recently
entered but not yet exited transaction. If it is the last active
transaction, the database updates are committed.
"""
with self.lib._tx_stack() as stack:
assert stack.pop() is self
empty = not stack
if empty:
# Ending a "root" transaction. End the SQLite transaction.
self.lib._connection().commit()
self.lib._db_lock.release()
def query(self, statement, subvals=()):
"""Execute an SQL statement with substitution values and return
a list of rows from the database.
"""
cursor = self.lib._connection().execute(statement, subvals)
return cursor.fetchall()
def mutate(self, statement, subvals=()):
"""Execute an SQL statement with substitution values and return
the row ID of the last affected row.
"""
cursor = self.lib._connection().execute(statement, subvals)
plugins.send('database_change', lib=self.lib)
return cursor.lastrowid
def script(self, statements):
"""Execute a string containing multiple SQL statements."""
self.lib._connection().executescript(statements)
class Database(object):
"""A container for Model objects that wraps an SQLite database as
the backend.
"""
def __init__(self, path):
self.path = path
self._connections = {}
self._tx_stacks = defaultdict(list)
# A lock to protect the _connections and _tx_stacks maps, which
# both map thread IDs to private resources.
self._shared_map_lock = threading.Lock()
# A lock to protect access to the database itself. SQLite does
# allow multiple threads to access the database at the same
# time, but many users were experiencing crashes related to this
# capability: where SQLite was compiled without HAVE_USLEEP, its
# backoff algorithm in the case of contention was causing
# whole-second sleeps (!) that would trigger its internal
# timeout. Using this lock ensures only one SQLite transaction
# is active at a time.
self._db_lock = threading.Lock()
# Primitive access control: connections and transactions.
def _connection(self):
"""Get a SQLite connection object to the underlying database.
One connection object is created per thread.
"""
thread_id = threading.current_thread().ident
with self._shared_map_lock:
if thread_id in self._connections:
return self._connections[thread_id]
else:
# Make a new connection.
conn = sqlite3.connect(
self.path,
timeout=beets.config['timeout'].as_number(),
)
# Access SELECT results like dictionaries.
conn.row_factory = sqlite3.Row
self._connections[thread_id] = conn
return conn
@contextlib.contextmanager
def _tx_stack(self):
"""A context manager providing access to the current thread's
transaction stack. The context manager synchronizes access to
the stack map. Transactions should never migrate across threads.
"""
thread_id = threading.current_thread().ident
with self._shared_map_lock:
yield self._tx_stacks[thread_id]
def transaction(self):
"""Get a :class:`Transaction` object for interacting directly
with the underlying SQLite database.
"""
return Transaction(self)
# Querying.
def _fetch(self, model_cls, query, order_by=None):
"""Fetch the objects of type `model_cls` matching the given
query. The query may be given as a string, string sequence, a
Query object, or None (to fetch everything). If provided,
`order_by` is a SQLite ORDER BY clause for sorting.
"""
query = get_query(query, model_cls)
where, subvals = query.clause()
sql = "SELECT * FROM {0} WHERE {1}".format(
model_cls._table,
where or '1',
)
if order_by:
sql += " ORDER BY {0}".format(order_by)
with self.transaction() as tx:
rows = tx.query(sql, subvals)
return Results(model_cls, rows, self, None if where else query)
def _get(self, model_cls, id):
"""Get a LibModel object by its id or None if the id does not
exist.
"""
return self._fetch(model_cls, MatchQuery('id', id)).get()

View file

@ -14,18 +14,14 @@
"""The core data store and collection logic for beets.
"""
import sqlite3
import os
import re
import sys
import logging
import shlex
import unicodedata
import threading
import contextlib
import traceback
import time
from collections import defaultdict
from unidecode import unidecode
from beets.mediafile import MediaFile
from beets import plugins
@ -1154,143 +1150,7 @@ def get_query(val, model_cls):
# The Library: interface to the database.
class Results(object):
"""An item query result set. Iterating over the collection lazily
constructs LibModel objects that reflect database rows.
"""
def __init__(self, model_class, rows, lib, query=None):
"""Create a result set that will construct objects of type
`model_class`, which should be a subclass of `LibModel`, out of
the query result mapping in `rows`. The new objects are
associated with the library `lib`. If `query` is provided, it is
used as a predicate to filter the results for a "slow query" that
cannot be evaluated by the database directly.
"""
self.model_class = model_class
self.rows = rows
self.lib = lib
self.query = query
def __iter__(self):
"""Construct Python objects for all rows that pass the query
predicate.
"""
for row in self.rows:
# Get the flexible attributes for the object.
with self.lib.transaction() as tx:
flex_rows = tx.query(
'SELECT * FROM {0} WHERE entity_id=?'.format(
self.model_class._flex_table
),
(row['id'],)
)
values = dict(row)
values.update(
dict((row['key'], row['value']) for row in flex_rows)
)
# Construct the Python object and yield it if it passes the
# predicate.
obj = self.model_class(self.lib, **values)
if not self.query or self.query.match(obj):
yield obj
def __len__(self):
"""Get the number of matching objects.
"""
if self.query:
# A slow query. Fall back to testing every object.
count = 0
for obj in self:
count += 1
return count
else:
# A fast query. Just count the rows.
return len(self.rows)
def __nonzero__(self):
"""Does this result contain any objects?
"""
return bool(len(self))
def __getitem__(self, n):
"""Get the nth item in this result set. This is inefficient: all
items up to n are materialized and thrown away.
"""
it = iter(self)
try:
for i in range(n):
it.next()
return it.next()
except StopIteration:
raise IndexError('result index {0} out of range'.format(n))
def get(self):
"""Return the first matching object, or None if no objects
match.
"""
it = iter(self)
try:
return it.next()
except StopIteration:
return None
class Transaction(object):
"""A context manager for safe, concurrent access to the database.
All SQL commands should be executed through a transaction.
"""
def __init__(self, lib):
self.lib = lib
def __enter__(self):
"""Begin a transaction. This transaction may be created while
another is active in a different thread.
"""
with self.lib._tx_stack() as stack:
first = not stack
stack.append(self)
if first:
# Beginning a "root" transaction, which corresponds to an
# SQLite transaction.
self.lib._db_lock.acquire()
return self
def __exit__(self, exc_type, exc_value, traceback):
"""Complete a transaction. This must be the most recently
entered but not yet exited transaction. If it is the last active
transaction, the database updates are committed.
"""
with self.lib._tx_stack() as stack:
assert stack.pop() is self
empty = not stack
if empty:
# Ending a "root" transaction. End the SQLite transaction.
self.lib._connection().commit()
self.lib._db_lock.release()
def query(self, statement, subvals=()):
"""Execute an SQL statement with substitution values and return
a list of rows from the database.
"""
cursor = self.lib._connection().execute(statement, subvals)
return cursor.fetchall()
def mutate(self, statement, subvals=()):
"""Execute an SQL statement with substitution values and return
the row ID of the last affected row.
"""
cursor = self.lib._connection().execute(statement, subvals)
plugins.send('database_change', lib=self.lib)
return cursor.lastrowid
def script(self, statements):
"""Execute a string containing multiple SQL statements."""
self.lib._connection().executescript(statements)
class Library(object):
class Library(dbcore.Database):
"""A database of music containing songs and albums.
"""
def __init__(self, path='library.blb',
@ -1304,27 +1164,14 @@ class Library(object):
self.path = path
else:
self.path = bytestring_path(normpath(path))
super(Library, self).__init__(self.path)
self.directory = bytestring_path(normpath(directory))
self.path_formats = path_formats
self.replacements = replacements
self._memotable = {} # Used for template substitution performance.
self._connections = {}
self._tx_stacks = defaultdict(list)
# A lock to protect the _connections and _tx_stacks maps, which
# both map thread IDs to private resources.
self._shared_map_lock = threading.Lock()
# A lock to protect access to the database itself. SQLite does
# allow multiple threads to access the database at the same
# time, but many users were experiencing crashes related to this
# capability: where SQLite was compiled without HAVE_USLEEP, its
# backoff algorithm in the case of contention was causing
# whole-second sleeps (!) that would trigger its internal
# timeout. Using this lock ensures only one SQLite transaction
# is active at a time.
self._db_lock = threading.Lock()
# Set up database schema.
self._make_table(Item._table, item_fields)
self._make_table(Album._table, album_fields)
@ -1398,43 +1245,6 @@ class Library(object):
ON {0} (entity_id);
""".format(flex_table))
def _connection(self):
"""Get a SQLite connection object to the underlying database.
One connection object is created per thread.
"""
thread_id = threading.current_thread().ident
with self._shared_map_lock:
if thread_id in self._connections:
return self._connections[thread_id]
else:
# Make a new connection.
conn = sqlite3.connect(
self.path,
timeout=beets.config['timeout'].as_number(),
)
# Access SELECT results like dictionaries.
conn.row_factory = sqlite3.Row
self._connections[thread_id] = conn
return conn
@contextlib.contextmanager
def _tx_stack(self):
"""A context manager providing access to the current thread's
transaction stack. The context manager synchronizes access to
the stack map. Transactions should never migrate across threads.
"""
thread_id = threading.current_thread().ident
with self._shared_map_lock:
yield self._tx_stacks[thread_id]
def transaction(self):
"""Get a :class:`Transaction` object for interacting directly
with the underlying SQLite database.
"""
return Transaction(self)
# Adding objects to the database.
@ -1471,26 +1281,6 @@ class Library(object):
# Querying.
def _fetch(self, model_cls, query, order_by=None):
"""Fetch the objects of type `model_cls` matching the given
query. The query may be given as a string, string sequence, a
Query object, or None (to fetch everything). If provided,
`order_by` is a SQLite ORDER BY clause for sorting.
"""
query = get_query(query, model_cls)
where, subvals = query.clause()
sql = "SELECT * FROM {0} WHERE {1}".format(
model_cls._table,
where or '1',
)
if order_by:
sql += " ORDER BY {0}".format(order_by)
with self.transaction() as tx:
rows = tx.query(sql, subvals)
return Results(model_cls, rows, self, None if where else query)
def albums(self, query=None):
"""Get a sorted list of :class:`Album` objects matching the
given query.
@ -1512,12 +1302,6 @@ class Library(object):
# Convenience accessors.
def _get(self, model_cls, id):
"""Get a LibModel object by its id or None if the id does not
exist.
"""
return self._fetch(model_cls, MatchQuery('id', id)).get()
def get_item(self, id):
"""Fetch an :class:`Item` by its ID. Returns `None` if no match is
found.