diff --git a/beets/dbcore.py b/beets/dbcore.py index 4ff63560f..2189c0c88 100644 --- a/beets/dbcore.py +++ b/beets/dbcore.py @@ -1,5 +1,9 @@ import time import os +from collections import defaultdict +import threading +import sqlite3 +import contextlib import beets from beets.util.functemplate import Template @@ -373,3 +377,238 @@ class Model(object): if isinstance(template, basestring): template = Template(template) return template.substitute(mapping, funcs) + + + +# Database controllers and supporting interfaces. + + +class Results(object): + """An item query result set. Iterating over the collection lazily + constructs LibModel objects that reflect database rows. + """ + def __init__(self, model_class, rows, lib, query=None): + """Create a result set that will construct objects of type + `model_class`, which should be a subclass of `LibModel`, out of + the query result mapping in `rows`. The new objects are + associated with the library `lib`. If `query` is provided, it is + used as a predicate to filter the results for a "slow query" that + cannot be evaluated by the database directly. + """ + self.model_class = model_class + self.rows = rows + self.lib = lib + self.query = query + + def __iter__(self): + """Construct Python objects for all rows that pass the query + predicate. + """ + for row in self.rows: + # Get the flexible attributes for the object. + with self.lib.transaction() as tx: + flex_rows = tx.query( + 'SELECT * FROM {0} WHERE entity_id=?'.format( + self.model_class._flex_table + ), + (row['id'],) + ) + values = dict(row) + values.update( + dict((row['key'], row['value']) for row in flex_rows) + ) + + # Construct the Python object and yield it if it passes the + # predicate. + obj = self.model_class(self.lib, **values) + if not self.query or self.query.match(obj): + yield obj + + def __len__(self): + """Get the number of matching objects. + """ + if self.query: + # A slow query. Fall back to testing every object. + count = 0 + for obj in self: + count += 1 + return count + + else: + # A fast query. Just count the rows. + return len(self.rows) + + def __nonzero__(self): + """Does this result contain any objects? + """ + return bool(len(self)) + + def __getitem__(self, n): + """Get the nth item in this result set. This is inefficient: all + items up to n are materialized and thrown away. + """ + it = iter(self) + try: + for i in range(n): + it.next() + return it.next() + except StopIteration: + raise IndexError('result index {0} out of range'.format(n)) + + def get(self): + """Return the first matching object, or None if no objects + match. + """ + it = iter(self) + try: + return it.next() + except StopIteration: + return None + + +class Transaction(object): + """A context manager for safe, concurrent access to the database. + All SQL commands should be executed through a transaction. + """ + def __init__(self, lib): + self.lib = lib + + def __enter__(self): + """Begin a transaction. This transaction may be created while + another is active in a different thread. + """ + with self.lib._tx_stack() as stack: + first = not stack + stack.append(self) + if first: + # Beginning a "root" transaction, which corresponds to an + # SQLite transaction. + self.lib._db_lock.acquire() + return self + + def __exit__(self, exc_type, exc_value, traceback): + """Complete a transaction. This must be the most recently + entered but not yet exited transaction. If it is the last active + transaction, the database updates are committed. + """ + with self.lib._tx_stack() as stack: + assert stack.pop() is self + empty = not stack + if empty: + # Ending a "root" transaction. End the SQLite transaction. + self.lib._connection().commit() + self.lib._db_lock.release() + + def query(self, statement, subvals=()): + """Execute an SQL statement with substitution values and return + a list of rows from the database. + """ + cursor = self.lib._connection().execute(statement, subvals) + return cursor.fetchall() + + def mutate(self, statement, subvals=()): + """Execute an SQL statement with substitution values and return + the row ID of the last affected row. + """ + cursor = self.lib._connection().execute(statement, subvals) + plugins.send('database_change', lib=self.lib) + return cursor.lastrowid + + def script(self, statements): + """Execute a string containing multiple SQL statements.""" + self.lib._connection().executescript(statements) + + +class Database(object): + """A container for Model objects that wraps an SQLite database as + the backend. + """ + def __init__(self, path): + self.path = path + + self._connections = {} + self._tx_stacks = defaultdict(list) + + # A lock to protect the _connections and _tx_stacks maps, which + # both map thread IDs to private resources. + self._shared_map_lock = threading.Lock() + + # A lock to protect access to the database itself. SQLite does + # allow multiple threads to access the database at the same + # time, but many users were experiencing crashes related to this + # capability: where SQLite was compiled without HAVE_USLEEP, its + # backoff algorithm in the case of contention was causing + # whole-second sleeps (!) that would trigger its internal + # timeout. Using this lock ensures only one SQLite transaction + # is active at a time. + self._db_lock = threading.Lock() + + + # Primitive access control: connections and transactions. + + def _connection(self): + """Get a SQLite connection object to the underlying database. + One connection object is created per thread. + """ + thread_id = threading.current_thread().ident + with self._shared_map_lock: + if thread_id in self._connections: + return self._connections[thread_id] + else: + # Make a new connection. + conn = sqlite3.connect( + self.path, + timeout=beets.config['timeout'].as_number(), + ) + + # Access SELECT results like dictionaries. + conn.row_factory = sqlite3.Row + + self._connections[thread_id] = conn + return conn + + @contextlib.contextmanager + def _tx_stack(self): + """A context manager providing access to the current thread's + transaction stack. The context manager synchronizes access to + the stack map. Transactions should never migrate across threads. + """ + thread_id = threading.current_thread().ident + with self._shared_map_lock: + yield self._tx_stacks[thread_id] + + def transaction(self): + """Get a :class:`Transaction` object for interacting directly + with the underlying SQLite database. + """ + return Transaction(self) + + + # Querying. + + def _fetch(self, model_cls, query, order_by=None): + """Fetch the objects of type `model_cls` matching the given + query. The query may be given as a string, string sequence, a + Query object, or None (to fetch everything). If provided, + `order_by` is a SQLite ORDER BY clause for sorting. + """ + query = get_query(query, model_cls) + where, subvals = query.clause() + + sql = "SELECT * FROM {0} WHERE {1}".format( + model_cls._table, + where or '1', + ) + if order_by: + sql += " ORDER BY {0}".format(order_by) + with self.transaction() as tx: + rows = tx.query(sql, subvals) + + return Results(model_cls, rows, self, None if where else query) + + def _get(self, model_cls, id): + """Get a LibModel object by its id or None if the id does not + exist. + """ + return self._fetch(model_cls, MatchQuery('id', id)).get() + diff --git a/beets/library.py b/beets/library.py index cf462d9ca..ab2c8d383 100644 --- a/beets/library.py +++ b/beets/library.py @@ -14,18 +14,14 @@ """The core data store and collection logic for beets. """ -import sqlite3 import os import re import sys import logging import shlex import unicodedata -import threading -import contextlib import traceback import time -from collections import defaultdict from unidecode import unidecode from beets.mediafile import MediaFile from beets import plugins @@ -1154,143 +1150,7 @@ def get_query(val, model_cls): # The Library: interface to the database. -class Results(object): - """An item query result set. Iterating over the collection lazily - constructs LibModel objects that reflect database rows. - """ - def __init__(self, model_class, rows, lib, query=None): - """Create a result set that will construct objects of type - `model_class`, which should be a subclass of `LibModel`, out of - the query result mapping in `rows`. The new objects are - associated with the library `lib`. If `query` is provided, it is - used as a predicate to filter the results for a "slow query" that - cannot be evaluated by the database directly. - """ - self.model_class = model_class - self.rows = rows - self.lib = lib - self.query = query - - def __iter__(self): - """Construct Python objects for all rows that pass the query - predicate. - """ - for row in self.rows: - # Get the flexible attributes for the object. - with self.lib.transaction() as tx: - flex_rows = tx.query( - 'SELECT * FROM {0} WHERE entity_id=?'.format( - self.model_class._flex_table - ), - (row['id'],) - ) - values = dict(row) - values.update( - dict((row['key'], row['value']) for row in flex_rows) - ) - - # Construct the Python object and yield it if it passes the - # predicate. - obj = self.model_class(self.lib, **values) - if not self.query or self.query.match(obj): - yield obj - - def __len__(self): - """Get the number of matching objects. - """ - if self.query: - # A slow query. Fall back to testing every object. - count = 0 - for obj in self: - count += 1 - return count - - else: - # A fast query. Just count the rows. - return len(self.rows) - - def __nonzero__(self): - """Does this result contain any objects? - """ - return bool(len(self)) - - def __getitem__(self, n): - """Get the nth item in this result set. This is inefficient: all - items up to n are materialized and thrown away. - """ - it = iter(self) - try: - for i in range(n): - it.next() - return it.next() - except StopIteration: - raise IndexError('result index {0} out of range'.format(n)) - - def get(self): - """Return the first matching object, or None if no objects - match. - """ - it = iter(self) - try: - return it.next() - except StopIteration: - return None - - -class Transaction(object): - """A context manager for safe, concurrent access to the database. - All SQL commands should be executed through a transaction. - """ - def __init__(self, lib): - self.lib = lib - - def __enter__(self): - """Begin a transaction. This transaction may be created while - another is active in a different thread. - """ - with self.lib._tx_stack() as stack: - first = not stack - stack.append(self) - if first: - # Beginning a "root" transaction, which corresponds to an - # SQLite transaction. - self.lib._db_lock.acquire() - return self - - def __exit__(self, exc_type, exc_value, traceback): - """Complete a transaction. This must be the most recently - entered but not yet exited transaction. If it is the last active - transaction, the database updates are committed. - """ - with self.lib._tx_stack() as stack: - assert stack.pop() is self - empty = not stack - if empty: - # Ending a "root" transaction. End the SQLite transaction. - self.lib._connection().commit() - self.lib._db_lock.release() - - def query(self, statement, subvals=()): - """Execute an SQL statement with substitution values and return - a list of rows from the database. - """ - cursor = self.lib._connection().execute(statement, subvals) - return cursor.fetchall() - - def mutate(self, statement, subvals=()): - """Execute an SQL statement with substitution values and return - the row ID of the last affected row. - """ - cursor = self.lib._connection().execute(statement, subvals) - plugins.send('database_change', lib=self.lib) - return cursor.lastrowid - - def script(self, statements): - """Execute a string containing multiple SQL statements.""" - self.lib._connection().executescript(statements) - - -class Library(object): +class Library(dbcore.Database): """A database of music containing songs and albums. """ def __init__(self, path='library.blb', @@ -1304,27 +1164,14 @@ class Library(object): self.path = path else: self.path = bytestring_path(normpath(path)) + super(Library, self).__init__(self.path) + self.directory = bytestring_path(normpath(directory)) self.path_formats = path_formats self.replacements = replacements self._memotable = {} # Used for template substitution performance. - self._connections = {} - self._tx_stacks = defaultdict(list) - # A lock to protect the _connections and _tx_stacks maps, which - # both map thread IDs to private resources. - self._shared_map_lock = threading.Lock() - # A lock to protect access to the database itself. SQLite does - # allow multiple threads to access the database at the same - # time, but many users were experiencing crashes related to this - # capability: where SQLite was compiled without HAVE_USLEEP, its - # backoff algorithm in the case of contention was causing - # whole-second sleeps (!) that would trigger its internal - # timeout. Using this lock ensures only one SQLite transaction - # is active at a time. - self._db_lock = threading.Lock() - # Set up database schema. self._make_table(Item._table, item_fields) self._make_table(Album._table, album_fields) @@ -1398,43 +1245,6 @@ class Library(object): ON {0} (entity_id); """.format(flex_table)) - def _connection(self): - """Get a SQLite connection object to the underlying database. - One connection object is created per thread. - """ - thread_id = threading.current_thread().ident - with self._shared_map_lock: - if thread_id in self._connections: - return self._connections[thread_id] - else: - # Make a new connection. - conn = sqlite3.connect( - self.path, - timeout=beets.config['timeout'].as_number(), - ) - - # Access SELECT results like dictionaries. - conn.row_factory = sqlite3.Row - - self._connections[thread_id] = conn - return conn - - @contextlib.contextmanager - def _tx_stack(self): - """A context manager providing access to the current thread's - transaction stack. The context manager synchronizes access to - the stack map. Transactions should never migrate across threads. - """ - thread_id = threading.current_thread().ident - with self._shared_map_lock: - yield self._tx_stacks[thread_id] - - def transaction(self): - """Get a :class:`Transaction` object for interacting directly - with the underlying SQLite database. - """ - return Transaction(self) - # Adding objects to the database. @@ -1471,26 +1281,6 @@ class Library(object): # Querying. - def _fetch(self, model_cls, query, order_by=None): - """Fetch the objects of type `model_cls` matching the given - query. The query may be given as a string, string sequence, a - Query object, or None (to fetch everything). If provided, - `order_by` is a SQLite ORDER BY clause for sorting. - """ - query = get_query(query, model_cls) - where, subvals = query.clause() - - sql = "SELECT * FROM {0} WHERE {1}".format( - model_cls._table, - where or '1', - ) - if order_by: - sql += " ORDER BY {0}".format(order_by) - with self.transaction() as tx: - rows = tx.query(sql, subvals) - - return Results(model_cls, rows, self, None if where else query) - def albums(self, query=None): """Get a sorted list of :class:`Album` objects matching the given query. @@ -1512,12 +1302,6 @@ class Library(object): # Convenience accessors. - def _get(self, model_cls, id): - """Get a LibModel object by its id or None if the id does not - exist. - """ - return self._fetch(model_cls, MatchQuery('id', id)).get() - def get_item(self, id): """Fetch an :class:`Item` by its ID. Returns `None` if no match is found.