diff --git a/.pylintrc b/.pylintrc index e04b198f..3d549c21 100644 --- a/.pylintrc +++ b/.pylintrc @@ -114,6 +114,9 @@ max-locals=20 # undefined-all-variable crash. unsafe-load-any-extension = yes +property-classes=zope.cachedescriptors.property.Lazy,zope.cachedescriptors.property.Cached,relstorage._util.Lazy + + # Local Variables: # mode: conf-space # End: diff --git a/.travis.yml b/.travis.yml index 5214d46f..6c1a869d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -47,13 +47,13 @@ env: - CCACHE_NOHASHDIR=true - CFLAGS="-g -pipe -std=gnu++11" - CXXFLAGS="-g -pipe -std=gnu++11" - - RS_TEST_CMD="-m zope.testrunner --test-path=src --auto-color --auto-progress" + - RS_TEST_CMD="-m zope.testrunner --test-path=src --auto-color -vvv --slow-test=3" script: # coverage slows PyPy down from 2minutes to 12+. - if [[ $TRAVIS_PYTHON_VERSION == 3.7 ]]; then pylint -j 0 --limit-inference-results=1 --rcfile=.pylintrc relstorage -f parseable -r n; fi - - if [[ $TRAVIS_PYTHON_VERSION != pypy* ]]; then coverage run -p --concurrency=greenlet .travis/zope_testrunner_gevent.py -t checkBTreesLengthStress -t check7 -t check2 -t BlobCache -t Switches --layer gevent; fi - - if [[ $TRAVIS_PYTHON_VERSION == pypy* ]]; then python $RS_TEST_CMD --layer "!gevent"; fi + - if [[ $TRAVIS_PYTHON_VERSION != pypy* ]]; then coverage run -p --concurrency=greenlet .travis/zope_testrunner_gevent.py -t checkBTreesLengthStress -t check7 -t check2 -t BlobCache -t Switches --layer gevent -vvv; fi + - if [[ $TRAVIS_PYTHON_VERSION == pypy* ]]; then PYTHONTRACEMALLOC=15 python $RS_TEST_CMD --layer "!gevent"; fi - if [[ $TRAVIS_PYTHON_VERSION != pypy* ]]; then coverage run -p --concurrency=thread $RS_TEST_CMD --layer "!gevent"; fi # Make sure we can import without zope.schema, which is intended to # be a test dependency, and optional for production @@ -87,6 +87,7 @@ cache: directories: - $HOME/.wheelhouse - $HOME/.ccache + - $HOME/.cache before_cache: - rm -f $HOME/.cache/pip/log/debug.log diff --git a/CHANGES.rst b/CHANGES.rst index daa29243..176c8fc6 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -2,7 +2,7 @@ Changes ========= -3.2.2 (unreleased) +3.3.0 (unreleased) ================== - The "MySQLdb" driver didn't properly use server-side cursors when @@ -14,6 +14,16 @@ history-preserving and history-free storages work with ``zodbupdate``. See :issue:`389`. +- RelStorage instances now pool their storage connection. Depending on + the workload and ZODB configuration, this can result in requiring + fewer storage connections. See :issue:`409` and :pr:`417`. + + There is a potential semantic change: Under some circumstances, the + ``loadBefore`` and ``loadSerial`` methods could be used to load + states from the future (not visible to the storage's load + connection) by using the store connection. This ability has been + removed. 
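  To make the pooling semantics above concrete, here is a minimal sketch using the names introduced in this change (``StoreConnectionPool``, ``new_instance``, ``borrow``, ``replace``); the ``connmanager`` variable stands in for an adapter's existing connection manager:

      from relstorage.adapters.connections import StoreConnectionPool

      pool = StoreConnectionPool(connmanager)   # owned by the root storage
      assert pool.new_instance() is pool        # derived instances share the pool

      conn = pool.borrow()    # reuse a pooled StoreConnection, or open a new one
      try:
          conn.cursor.execute('SELECT 1')
      finally:
          pool.replace(conn)  # rolled back and retained for the next borrower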
+ 3.2.1 (2020-08-28) ================== diff --git a/src/relstorage/_util.py b/src/relstorage/_util.py index 6f15f3b4..bc153eda 100644 --- a/src/relstorage/_util.py +++ b/src/relstorage/_util.py @@ -77,6 +77,25 @@ positive_integer = RangeCheckedConversion(integer, min=1) +def _setting_from_environ(converter, environ_name, default): + result = default + env_val = os.environ.get(environ_name, default) + if env_val is not default: + try: + result = converter(env_val) + except (ValueError, TypeError): + logger.exception("Failed to parse environment value %r for key %r", + env_val, environ_name) + result = default + + logger.debug('Using value %s from environ %r=%r (default=%r)', + result, environ_name, env_val, default) + return result + + +def get_positive_integer_from_environ(environ_name, default): + return _setting_from_environ(positive_integer, environ_name, default) + def parse_boolean(val): if val == '0': return False @@ -126,14 +145,7 @@ def __exit__(self, t, v, tb): self.duration = self.__end - self.__begin def get_time_from_environ(environ_name, default): - env_val = os.environ.get(environ_name, default) - try: - result = float(env_val) - except (ValueError, TypeError): - result = default - logger.debug('Using %s from environ %r=%r', result, environ_name, env_val) - return result - + return _setting_from_environ(float, environ_name, default) def _get_log_time_level(level_int, default): level_name = logging.getLevelName(level_int) @@ -161,14 +173,16 @@ def _get_log_time_level(level_int, default): # The 'log_details_threshold' property of the function can be # assigned to make it different than the default. _LOG_TIMED_DEFAULT_DETAILS_THRESHOLD = logging.getLevelName( - os.environ.get('RS_PERF_LOG_DETAILS_LEVEL', 'WARN') + _setting_from_environ(str, 'RS_PERF_LOG_DETAILS_LEVEL', 'WARN') ) # If this is true when a module is imported, timer decorations # are omitted. -_LOG_TIMED_COMPILETIME_ENABLE = parse_boolean( - os.environ.get('RS_PERF_LOG_ENABLE', 'on') +_LOG_TIMED_COMPILETIME_ENABLE = _setting_from_environ( + parse_boolean, + 'RS_PERF_LOG_ENABLE', + 'on' ) def do_log_duration_info(basic_msg, func, @@ -374,8 +388,14 @@ def __get__(self, inst, class_): func, name = self.data value = func(inst) inst.__dict__[name] = value + self._stored_value_for_name_in_inst(value, name, inst) return value + @staticmethod + def _stored_value_for_name_in_inst(value, name, inst): + """ + Hook for subclasses. + """ class CachedIn(object): """Cached method with given cache attribute.""" diff --git a/src/relstorage/adapters/adapter.py b/src/relstorage/adapters/adapter.py index 45bf53d8..1380ce8b 100644 --- a/src/relstorage/adapters/adapter.py +++ b/src/relstorage/adapters/adapter.py @@ -64,15 +64,19 @@ def __init__(self, options=None): self.driver = driver = self._select_driver() self._binary = driver.Binary + # If it was already set, that means it shared with other + # instances, so no need to register the openings. 
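A sketch of the parse-or-fall-back semantics of the new ``_setting_from_environ`` helpers above; ``RS_EXAMPLE_SETTING`` is a hypothetical key, not one RelStorage actually reads:

    import os
    from relstorage._util import get_positive_integer_from_environ

    os.environ['RS_EXAMPLE_SETTING'] = 'five'   # not parseable as an integer
    # positive_integer raises ValueError, so the default survives and the
    # failure is logged via logger.exception instead of propagating:
    assert get_positive_integer_from_environ('RS_EXAMPLE_SETTING', None) is None

    os.environ['RS_EXAMPLE_SETTING'] = '5'
    assert get_positive_integer_from_environ('RS_EXAMPLE_SETTING', None) == 5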
+ connmanager_was_set = self.connmanager is not None self._create() if not driver.supports_64bit_unsigned_id: self.packundo.MAX_TID = MAX_S_TID self.MAX_TID = MAX_S_TID self.dbiter.MAX_TID = MAX_S_TID - self.connmanager.add_on_store_opened(self.mover.on_store_opened) - self.connmanager.add_on_load_opened(self.mover.on_load_opened) - self.connmanager.add_on_store_opened(self.locker.on_store_opened) + if not connmanager_was_set: + self.connmanager.add_on_store_opened(self.mover.on_store_opened) + self.connmanager.add_on_load_opened(self.mover.on_load_opened) + self.connmanager.add_on_store_opened(self.locker.on_store_opened) def _create(self): raise NotImplementedError @@ -128,7 +132,7 @@ def lock_database_and_choose_next_tid(self, cursor, @metricmethod_sampled def lock_database_and_move(self, store_connection, load_connection, - blobhelper, + transaction_has_blobs, ude, commit=True, committing_tid_int=None, @@ -155,9 +159,8 @@ def lock_database_and_move(self, # TODO: Figure out how to do as much as possible of this before holding # the commit lock. For example, use a dummy TID that we later replace. # (This has FK issues in HP dbs). - txn_has_blobs = blobhelper.txn_has_blobs - self.mover.move_from_temp(cursor, committing_tid_int, txn_has_blobs) + self.mover.move_from_temp(cursor, committing_tid_int, transaction_has_blobs) after_selecting_tid(committing_tid_int) diff --git a/src/relstorage/adapters/connections.py b/src/relstorage/adapters/connections.py index 9d4266fd..a83d57a4 100644 --- a/src/relstorage/adapters/connections.py +++ b/src/relstorage/adapters/connections.py @@ -27,6 +27,7 @@ from .._compat import wraps from .._util import Lazy +from .._util import get_positive_integer_from_environ from . import interfaces logger = __import__('logging').getLogger(__name__) @@ -153,12 +154,21 @@ def _open_connection(self): def __noop(*args): "does nothing" + def _restart_connection(self): + "Restart just the connection when we have no cursor." + def restart(self): """ Restart the connection if there is any chance that it has any associated state. """ if not self: assert not self.active, self.__dict__ + if self.connection is not None: # But we have no cursor We + # do this so that if the connection has closed itself + # (or been closed by a unit test) we can detect that + # and restart automatically. We only actually do + # anything there for store connections. + self._restart_connection() return # If we've never accessed the cursor, we shouldn't have any @@ -213,7 +223,7 @@ def call(self, f, can_reconnect, *args, **kwargs): The function may be called up to twice, if the *fresh_connection_p* is false on the first call and a disconnected exception is raised. :keyword bool can_reconnect: If True, then we will attempt to reconnect - the connection and try again if an exception is raised if *f*. If False, + the connection and try again if a disconnected exception is raised in *f*. If False, we let that exception propagate. For example, if a transaction is in progress, set this to false. 
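For reference, the callback contract of ``call`` as used elsewhere in this diff (see ``__new_oid_callback``): the function receives the connection, the cursor, and a flag indicating whether the connection is freshly opened. A sketch, where ``fetch_one`` and the surrounding ``load_connection`` are illustrative:

    def fetch_one(_conn, cursor, _fresh_connection_p):
        cursor.execute('SELECT 1')
        return cursor.fetchone()

    # If the pooled connection turns out to be dead, ``call`` drops it,
    # reconnects, and invokes fetch_one once more on the fresh connection.
    row = load_connection.call(fetch_one, can_reconnect=True)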
""" @@ -269,6 +279,7 @@ def __repr__(self): self._cursor ) + @implementer(interfaces.IManagedLoadConnection) class LoadConnection(AbstractManagedConnection): @@ -290,10 +301,14 @@ class StoreConnection(AbstractManagedConnection): def begin(self): self.connmanager.begin(*self.open_if_needed()) + def _restart_connection(self): + self.rollback_quietly() + class PrePackConnection(StoreConnection): __slots__ = () _NEW_CONNECTION_NAME = 'open_for_pre_pack' + @implementer(interfaces.IManagedDBConnection) class ClosedConnection(object): """ @@ -319,3 +334,154 @@ def isolated_connection(self, *args, **kwargs): restart_and_call = isolated_connection enter_critical_phase_until_transaction_end = isolated_connection + + +class StoreConnectionPool(object): + """ + A thread-safe pool of `StoreConnection` objects. + + Connections are opened on demand; opening a connection on demand + does not block. + + By default, it will keep around a `StoreConnection` for every instance + of a RelStorage that has ever used one, which ordinarily corresponds to the + ZODB Connection pool size. It can be made to keep a smaller (but not larger) + number around by setting ``MAX_STORE_CONNECTIONS_IN_POOL``. + """ + + MAX_STORE_CONNECTIONS_IN_POOL = get_positive_integer_from_environ( + 'RS_MAX_POOLED_STORE_CONNECTIONS', + None + ) + + def __init__(self, connmanager): + import threading + self._lock = threading.Lock() + self._connmanager = connmanager + self._connections = [] + self._count = 1 + self._factory = StoreConnection + + # MVCC protocol + def new_instance(self): + with self._lock: + self._count += 1 + return self + + def release(self): + with self._lock: + self._count -= 1 + self._shrink() + + def close(self): + with self._lock: + self._count = 0 + self._factory = ClosedConnection + self._shrink() + self._connections = () + + @contextlib.contextmanager + def borrowing(self, commit=False): + rollback = True + try: + conn = self.borrow() + yield conn + if commit: + conn.commit() + rollback = False + finally: + self._replace(conn, rollback) + + def borrow(self): + conn = None + with self._lock: + if self._connections: + conn = self._connections.pop() + + if conn is None: + conn = self._factory(self._connmanager) + else: + conn.restart() + + conn.begin() + return conn + + def replace(self, connection): + self._replace(connection, True) + + def _replace(self, connection, needs_rollback): + if needs_rollback: + clean_rollback = connection.rollback_quietly() + else: + clean_rollback = True + if not clean_rollback: + connection.drop() + else: + connection.exit_critical_phase() + with self._lock: + self._connections.append(connection) + self._shrink() + + def _shrink(self): + # Call while holding the lock, after putting a connection + # back in the pool. + # Limits the number of pooled connections to be no more than + # ``instance_count`` (i.e., one per open RelStorage), or ``MAX_STORE_CONNECTIONS_IN_POOL``, + # if set and if less than instance_count + keep_connections = min(self._count, self.MAX_STORE_CONNECTIONS_IN_POOL or self._count) + + while len(self._connections) > keep_connections and self._connections: + conn = self._connections.pop() + conn.drop() # TODO: This could potentially be slow? Might + # want to do this outside the lock. + conn.connmanager = None # It can't be opened again. + + def drop_all(self): + with self._lock: + while self._connections: + conn = self._connections.pop() + conn.drop() + + def hard_close_all_connections(self): + # Testing only. 
+ for conn in self._connections: + conn.connection.close() + + @property + def pooled_connection_count(self): + return len(self._connections) + + @property + def instance_count(self): + return self._count + +class ClosedConnectionPool(object): + + def borrow(self): + return ClosedConnection() + + def replace(self, connection): + "Does Nothing" + + def new_instance(self): + "Does nothing" + + release = new_instance + close = release + drop_all = release + + pooled_connection_count = instance_count = 0 + + +class SingleConnectionPool(object): + __slots__ = ('connection',) + + def __init__(self, connection): + self.connection = connection + + @contextlib.contextmanager + def borrowing(self, commit=False): # pylint:disable=unused-argument + """ + The *commit* parameter is ignored + """ + yield self.connection diff --git a/src/relstorage/adapters/interfaces.py b/src/relstorage/adapters/interfaces.py index 75a54993..2c1d80cb 100644 --- a/src/relstorage/adapters/interfaces.py +++ b/src/relstorage/adapters/interfaces.py @@ -1278,7 +1278,7 @@ def lock_database_and_choose_next_tid(cursor, def lock_database_and_move( store_connection, load_connection, - blobhelper, + transaction_has_blobs, ude, commit=True, committing_tid_int=None, diff --git a/src/relstorage/adapters/mysql/adapter.py b/src/relstorage/adapters/mysql/adapter.py index e2136105..504ecf58 100644 --- a/src/relstorage/adapters/mysql/adapter.py +++ b/src/relstorage/adapters/mysql/adapter.py @@ -66,6 +66,7 @@ from ..interfaces import IRelStorageAdapter from ..interfaces import UnableToLockRowsToReadCurrentError from ..interfaces import UnableToLockRowsToModifyError +from ..interfaces import UnableToAcquireCommitLockError from ..poller import Poller from ..scriptrunner import ScriptRunner from ..batch import RowBatcher @@ -93,10 +94,18 @@ class MySQLAdapter(AbstractAdapter): driver_options = drivers - def __init__(self, options=None, oidallocator=None, version_detector=None, **params): + def __init__(self, options=None, oidallocator=None, + version_detector=None, + connmanager=None, + locker=None, + mover=None, + **params): self._params = params self.oidallocator = oidallocator self.version_detector = version_detector + self.connmanager = connmanager + self.locker = locker + self.mover = mover super(MySQLAdapter, self).__init__(options) def _create(self): @@ -108,19 +117,20 @@ def _create(self): if self.version_detector is None: self.version_detector = MySQLVersionDetector() - self.connmanager = MySQLdbConnectionManager( - driver, - params=params, - options=options, - ) + if self.connmanager is None: + self.connmanager = MySQLdbConnectionManager( + driver, + params=params, + options=options, + ) + if self.locker is None: + self.locker = MySQLLocker( + options=options, + driver=driver, + batcher_factory=RowBatcher, + version_detector=self.version_detector, + ) self.runner = ScriptRunner() - self.locker = MySQLLocker( - options=options, - driver=driver, - batcher_factory=RowBatcher, - version_detector=self.version_detector, - ) - self.schema = MySQLSchemaInstaller( driver=driver, connmanager=self.connmanager, @@ -128,10 +138,11 @@ def _create(self): keep_history=self.keep_history, version_detector=self.version_detector, ) - self.mover = MySQLObjectMover( - driver, - options=options, - ) + if self.mover is None: + self.mover = MySQLObjectMover( + driver, + options=options, + ) if self.oidallocator is None: self.oidallocator = MySQLOIDAllocator(driver) @@ -187,6 +198,9 @@ def new_instance(self): options=self.options, 
oidallocator=self.oidallocator.new_instance(), version_detector=self.version_detector, + connmanager=self.connmanager, + locker=self.locker, + mover=self.mover, **self._params ) @@ -230,7 +244,7 @@ def lock_database_and_choose_next_tid(self, @metricmethod_sampled def lock_database_and_move(self, store_connection, load_connection, - blobhelper, + transaction_has_blobs, ude, commit=True, committing_tid_int=None, @@ -240,7 +254,7 @@ def lock_database_and_move(self, # MySQL past 5.7.12. return super(MySQLAdapter, self).lock_database_and_move( store_connection, load_connection, - blobhelper, + transaction_has_blobs, ude, commit=commit, committing_tid_int=committing_tid_int, @@ -254,12 +268,19 @@ def lock_database_and_move(self, # (p_committing_tid, p_commit, p_user, p_desc, p_ext) proc = 'lock_and_choose_tid_and_move(%s, %s, %s, %s, %s)' - multi_results = self.driver.callproc_multi_result( - store_connection.cursor, - proc, - params, - exit_critical_phase=commit - ) + try: + multi_results = self.driver.callproc_multi_result( + store_connection.cursor, + proc, + params, + exit_critical_phase=commit + ) + except self.driver.lock_exceptions: + self.locker.reraise_commit_lock_error( + store_connection.cursor, + proc, + UnableToAcquireCommitLockError, + ) tid_int, = multi_results[0][0] after_selecting_tid(tid_int) diff --git a/src/relstorage/adapters/mysql/drivers/_mysqldb_gevent.py b/src/relstorage/adapters/mysql/drivers/_mysqldb_gevent.py index bfb87c8a..d37d3ff4 100644 --- a/src/relstorage/adapters/mysql/drivers/_mysqldb_gevent.py +++ b/src/relstorage/adapters/mysql/drivers/_mysqldb_gevent.py @@ -119,10 +119,10 @@ def query(self, query): # argument that could be used to do this, but it was removed. # So we implement it ourself. self.gevent_wait_write() - self.send_query(query) + self.send_query(query) # pylint:disable=no-member self.gevent_wait_read() - self.read_query_result() + self.read_query_result() # pylint:disable=no-member # The default implementations of 'rollback' and 'commit' use only # C API functions `mysql_rollback` and `mysql_commit`; it doesn't @@ -168,13 +168,13 @@ def _critical_query(self, query): def _critical_rollback(self): try: - self._direct_rollback() + self._direct_rollback() # pylint:disable=no-member finally: self.__exit_critical_phase() def _critical_commit(self): try: - self._direct_commit() + self._direct_commit() # pylint:disable=no-member finally: self.__exit_critical_phase() diff --git a/src/relstorage/adapters/mysql/locker.py b/src/relstorage/adapters/mysql/locker.py index 1c3f139d..c65ce4ff 100644 --- a/src/relstorage/adapters/mysql/locker.py +++ b/src/relstorage/adapters/mysql/locker.py @@ -169,6 +169,9 @@ def on_store_opened(self, cursor, restart=False): if restart: return + # Setting state in an `on_store_opened` is not good. With pooling, it may never + # get executed. We work around this in our adapter by being sure to only use one + # connmanager and one locker instance, shared among all the RelStorage instances. 
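The workaround is visible in ``new_instance`` above: every derived adapter receives the same stateful helpers, so the hooks are registered exactly once. Illustratively (``adapter`` being any existing ``MySQLAdapter``):

    inst = adapter.new_instance()
    # A single shared connmanager/locker/mover means on_store_opened and
    # on_load_opened were registered only by the instance that created them:
    assert inst.connmanager is adapter.connmanager
    assert inst.locker is adapter.locker
    assert inst.mover is adapter.mover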
if self.supports_row_lock_nowait is None: self.supports_row_lock_nowait = self.version_detector.supports_nowait(cursor) diff --git a/src/relstorage/adapters/oracle/adapter.py b/src/relstorage/adapters/oracle/adapter.py index 81b9e974..d1c60d93 100644 --- a/src/relstorage/adapters/oracle/adapter.py +++ b/src/relstorage/adapters/oracle/adapter.py @@ -52,7 +52,11 @@ def __init__(self, password='relstoragetest', dsn='192.168.1.131/orcl', commit_lock_id=0, - twophase=False, options=None): + twophase=False, options=None, + locker=None, + mover=None, + connmanager=None, + ): """Create an Oracle adapter. The user, password, and dsn parameters are provided to @@ -69,6 +73,9 @@ def __init__(self, self._password = password self._dsn = dsn self._twophase = twophase + self.locker = locker + self.mover = mover + self.connmanager = connmanager super(OracleAdapter, self).__init__(options) @@ -83,43 +90,46 @@ def _create(self): batcher_factory = lambda cursor, row_limit=None: OracleRowBatcher( cursor, inputsizes, row_limit ) - self.connmanager = CXOracleConnectionManager( - driver, - user=user, - password=password, - dsn=dsn, - twophase=twophase, - options=options, - ) + if self.connmanager is None: + self.connmanager = CXOracleConnectionManager( + driver, + user=user, + password=password, + dsn=dsn, + twophase=twophase, + options=options, + ) self.runner = CXOracleScriptRunner(driver) - self.locker = OracleLocker( - options=self.options, - driver=driver, - inputsize_NUMBER=driver.NUMBER, - batcher_factory=batcher_factory, - ) + if self.locker is None: + self.locker = OracleLocker( + options=self.options, + driver=driver, + inputsize_NUMBER=driver.NUMBER, + batcher_factory=batcher_factory, + ) self.schema = OracleSchemaInstaller( connmanager=self.connmanager, runner=self.runner, keep_history=self.keep_history, ) - inputsizes = { - 'blobdata': driver.BLOB, - 'rawdata': driver.BINARY, - 'oid': driver.NUMBER, - 'tid': driver.NUMBER, - 'prev_tid': driver.NUMBER, - 'chunk_num': driver.NUMBER, - 'md5sum': driver.STRING, - } - self.mover = OracleObjectMover( - driver, - options=options, - runner=self.runner, - batcher_factory=batcher_factory, - ) - self.mover.inputsizes = inputsizes - self.connmanager.add_on_store_opened(self.mover.on_store_opened) + if self.mover is None: + inputsizes = { + 'blobdata': driver.BLOB, + 'rawdata': driver.BINARY, + 'oid': driver.NUMBER, + 'tid': driver.NUMBER, + 'prev_tid': driver.NUMBER, + 'chunk_num': driver.NUMBER, + 'md5sum': driver.STRING, + } + self.mover = OracleObjectMover( + driver, + options=options, + runner=self.runner, + batcher_factory=batcher_factory, + ) + self.mover.inputsizes = inputsizes + self.oidallocator = OracleOIDAllocator( connmanager=self.connmanager, ) @@ -182,6 +192,9 @@ def new_instance(self): dsn=self._dsn, twophase=self._twophase, options=self.options, + locker=self.locker, + mover=self.mover, + connmanager=self.connmanager, ) def __str__(self): diff --git a/src/relstorage/adapters/postgresql/adapter.py b/src/relstorage/adapters/postgresql/adapter.py index b9ce96ba..7cb95be8 100644 --- a/src/relstorage/adapters/postgresql/adapter.py +++ b/src/relstorage/adapters/postgresql/adapter.py @@ -63,10 +63,17 @@ class PostgreSQLAdapter(AbstractAdapter): driver_options = drivers - def __init__(self, dsn='', options=None, oidallocator=None): + def __init__(self, dsn='', options=None, oidallocator=None, + locker=None, + mover=None, + connmanager=None + ): # options is a relstorage.options.Options or None self._dsn = dsn self.oidallocator = oidallocator + 
self.locker = locker + self.mover = mover + self.connmanager = connmanager super(PostgreSQLAdapter, self).__init__(options) def _create(self): @@ -75,17 +82,19 @@ def _create(self): dsn = self._dsn self.version_detector = PostgreSQLVersionDetector() - self.connmanager = Psycopg2ConnectionManager( - driver, - dsn=dsn, - options=options, - ) + if self.connmanager is None: + self.connmanager = Psycopg2ConnectionManager( + driver, + dsn=dsn, + options=options, + ) self.runner = ScriptRunner() - self.locker = PostgreSQLLocker( - options, - driver, - PostgreSQLRowBatcher, - ) + if self.locker is None: + self.locker = PostgreSQLLocker( + options, + driver, + PostgreSQLRowBatcher, + ) self.schema = PostgreSQLSchemaInstaller( options=options, connmanager=self.connmanager, @@ -93,13 +102,14 @@ def _create(self): locker=self.locker, ) - self.mover = PostgreSQLObjectMover( - driver, - options=options, - runner=self.runner, - version_detector=self.version_detector, - batcher_factory=PostgreSQLRowBatcher, - ) + if self.mover is None: + self.mover = PostgreSQLObjectMover( + driver, + options=options, + runner=self.runner, + version_detector=self.version_detector, + batcher_factory=PostgreSQLRowBatcher, + ) if self.oidallocator is None: self.oidallocator = PostgreSQLOIDAllocator() @@ -155,7 +165,10 @@ def new_instance(self): inst = type(self)( dsn=self._dsn, options=self.options, - oidallocator=self.oidallocator.new_instance() + oidallocator=self.oidallocator.new_instance(), + locker=self.locker, + mover=self.mover, + connmanager=self.connmanager, ) return inst @@ -200,7 +213,7 @@ def lock_database_and_choose_next_tid(self, def lock_database_and_move(self, store_connection, load_connection, - blobhelper, # pylint:disable=unused-argument + transaction_has_blobs, # pylint:disable=unused-argument ude, commit=True, committing_tid_int=None, diff --git a/src/relstorage/adapters/postgresql/mover.py b/src/relstorage/adapters/postgresql/mover.py index fe2b89c6..6293d26d 100644 --- a/src/relstorage/adapters/postgresql/mover.py +++ b/src/relstorage/adapters/postgresql/mover.py @@ -39,6 +39,15 @@ class PostgreSQLRowBatcherStoreTemps(RowBatcherStoreTemps): @implementer(IObjectMover) class PostgreSQLObjectMover(AbstractObjectMover): + def __init__(self, *args, **kwargs): + super(PostgreSQLObjectMover, self).__init__(*args, **kwargs) + if not self.driver.supports_copy: + batcher = PostgreSQLRowBatcherStoreTemps(self.keep_history, + self.driver.Binary, + self.make_batcher) + self.replace_temps = batcher.replace_temps + self.store_temps = batcher.store_temps + @metricmethod_sampled def on_store_opened(self, cursor, restart=False): """Create the temporary tables for storing objects""" @@ -87,13 +96,6 @@ def on_store_opened(self, cursor, restart=False): cursor.execute(stmt) cursor.connection.commit() - if not self.driver.supports_copy: - batcher = PostgreSQLRowBatcherStoreTemps(self.keep_history, - self.driver.Binary, - self.make_batcher) - self.replace_temps = batcher.replace_temps - self.store_temps = batcher.store_temps - AbstractObjectMover.on_store_opened(self, cursor, restart) @metricmethod_sampled diff --git a/src/relstorage/adapters/postgresql/tests/test_mover.py b/src/relstorage/adapters/postgresql/tests/test_mover.py index 5a154ae5..e3e02ec5 100644 --- a/src/relstorage/adapters/postgresql/tests/test_mover.py +++ b/src/relstorage/adapters/postgresql/tests/test_mover.py @@ -24,6 +24,8 @@ from .. 
import mover +class PGMockDriver(MockDriver): + supports_copy = False @unittest.skip("Needs moved to test__sql") class TestFunctions(TestCase): @@ -86,7 +88,7 @@ def _getClass(self): return mover.PostgreSQLObjectMover def _makeOne(self, **options): - return self._getClass()(MockDriver(), + return self._getClass()(PGMockDriver(), MockOptions.from_args(**options)) _expected_move_from_temp_hf_insert_query = 'EXECUTE move_from_temp(%s)' diff --git a/src/relstorage/adapters/sqlite/adapter.py b/src/relstorage/adapters/sqlite/adapter.py index af6457ef..49e5198a 100644 --- a/src/relstorage/adapters/sqlite/adapter.py +++ b/src/relstorage/adapters/sqlite/adapter.py @@ -52,31 +52,38 @@ class Sqlite3Adapter(AbstractAdapter): def __init__(self, data_dir, pragmas, options=None, oidallocator=None, - gevent_yield_interval=None): + locker=None, + mover=None, + connmanager=None): self.data_dir = os.path.abspath(data_dir) self.pragmas = pragmas self.oidallocator = oidallocator - self.gevent_yield_interval = gevent_yield_interval + self.locker = locker + self.mover = mover + self.connmanager = connmanager super(Sqlite3Adapter, self).__init__(options) def _create(self): driver = self.driver options = self.options - self.connmanager = Sqlite3ConnectionManager( - driver, - path=os.path.join(self.data_dir, 'main.sqlite3'), - pragmas=self.pragmas, - options=options - ) + if self.connmanager is None: + self.connmanager = Sqlite3ConnectionManager( + driver, + path=os.path.join(self.data_dir, 'main.sqlite3'), + pragmas=self.pragmas, + options=options + ) - self.mover = Sqlite3ObjectMover( - driver, - options=options, - ) - self.locker = Sqlite3Locker( - options, - driver, - batcher_factory=Sqlite3RowBatcher) + if self.mover is None: + self.mover = Sqlite3ObjectMover( + driver, + options=options, + ) + if self.locker is None: + self.locker = Sqlite3Locker( + options, + driver, + batcher_factory=Sqlite3RowBatcher) if not self.oidallocator: self.oidallocator = Sqlite3OIDAllocator( @@ -146,5 +153,9 @@ def new_instance(self): self.data_dir, self.pragmas, options=self.options, - oidallocator=self.oidallocator.new_instance()) + oidallocator=self.oidallocator.new_instance(), + locker=self.locker, + mover=self.mover, + connmanager=self.connmanager, + ) return inst diff --git a/src/relstorage/blobhelper/cached.py b/src/relstorage/blobhelper/cached.py index c012aa80..35b0c07b 100644 --- a/src/relstorage/blobhelper/cached.py +++ b/src/relstorage/blobhelper/cached.py @@ -381,6 +381,10 @@ def storeBlob(self, cursor, store_func, self.upload_blob(cursor, oid, None, temp_path) def restoreBlob(self, cursor, oid, serial, blobfilename): + # This bypasses ``txn_has_blobs``. That shouldn't cause any problems + # because ``serial`` should not be none here, so we are avoiding the + # temp tables. The use of ``txn_has_blobs`` in tpc_vote() is to check if we need + # to move temporary blobs into place. self.upload_blob(cursor, oid, serial, blobfilename) def copy_undone(self, copied, tid): diff --git a/src/relstorage/cache/mvcc.py b/src/relstorage/cache/mvcc.py index 885793c9..8d672464 100644 --- a/src/relstorage/cache/mvcc.py +++ b/src/relstorage/cache/mvcc.py @@ -658,7 +658,7 @@ def _poll(self, cache, conn, cursor, if polled_tid == 0 or polled_tid < polling_since: assert change_iter is None - # Freshly zapped or empty database (tid==0) or stale and + # Freshly zapped or empty database (tid==0) or stale replica and # asked to revert (as opposed to raising # ReadConflictError). 
Mark not just this one, but all # other extent indexes as needing a full rebuild. @@ -668,7 +668,8 @@ def _poll(self, cache, conn, cursor, return None # Ok cool, we got data to move us forward. - # We must be careful to always consume the iterator, even if we exit early. + # We must be careful to always consume the iterator, even if we exit early + # (because it could be a server-side cursor holding connection state). # So we do that now. change_iter = list(change_iter) self.log( @@ -697,7 +698,8 @@ def _poll(self, cache, conn, cursor, # it's older than our poll, we take control; # but the history of the index must always move forward, # so we build it starting from what's currently installed. - # There could be some overlap. + # There could be some overlap. Since we moved the index forward, + # we can vacuum. change_index = self.object_index = installed_index.with_polled_changes( polled_tid, polling_since, diff --git a/src/relstorage/config.py b/src/relstorage/config.py index 03411cc3..67be4fce 100644 --- a/src/relstorage/config.py +++ b/src/relstorage/config.py @@ -106,6 +106,5 @@ def create(self, options): pragmas = {} return Sqlite3Adapter( self.config.data_dir, - gevent_yield_interval=self.config.gevent_yield_interval, pragmas=pragmas, options=options) diff --git a/src/relstorage/storage/__init__.py b/src/relstorage/storage/__init__.py index 25ef8ff3..e2cf124b 100644 --- a/src/relstorage/storage/__init__.py +++ b/src/relstorage/storage/__init__.py @@ -47,8 +47,10 @@ from ..options import Options from ..interfaces import IRelStorage from ..adapters.connections import LoadConnection -from ..adapters.connections import StoreConnection +from ..adapters.connections import StoreConnectionPool from ..adapters.connections import ClosedConnection +from ..adapters.connections import ClosedConnectionPool +from ..adapters.connections import SingleConnectionPool from .._compat import clear_frames from .._compat import metricmethod from .._util import int64_to_8bytes @@ -98,6 +100,7 @@ def close(self): stats = lambda s: {'closed': True} afterCompletion = lambda s, c: None + @implementer(IRelStorage) class RelStorage(LegacyMethodsMixin, ConflictResolution.ConflictResolvingStorage): @@ -112,8 +115,8 @@ class RelStorage(LegacyMethodsMixin, _options = None _is_read_only = False _read_only_error = ReadOnlyError - # _ltid is the ID of the last transaction committed by this instance. - _ltid = z64 + # ZODB TID of the last transaction committed by this instance. + _last_tid_i_committed_bytes = z64 # _closed is True after self.close() is called. _closed = False @@ -136,19 +139,13 @@ class RelStorage(LegacyMethodsMixin, _instances = () _load_connection = ClosedConnection() - _store_connection = ClosedConnection() - _tpc_begin_factory = None + _store_connection_pool = ClosedConnectionPool() _oids = ReadOnlyOIDs() - # Attributes in our dictionary that shouldn't have stale()/no_longer_stale() - # called on them. At this writing, it's just the type object. 
- _STALE_IGNORED_ATTRS = ( - '_tpc_begin_factory', - ) - def __init__(self, adapter, name=None, create=None, options=None, cache=None, blobhelper=None, + store_connection_pool=None, **kwoptions): # pylint:disable=too-many-branches, too-many-statements if options and kwoptions: @@ -197,8 +194,10 @@ def __init__(self, adapter, name=None, create=None, self._load_connection = LoadConnection(self._adapter.connmanager) self._load_connection.on_first_use = self.__on_load_first_use self.__queued_changes = OID_SET_TYPE() - if not self._is_read_only: - self._store_connection = StoreConnection(self._adapter.connmanager) + if store_connection_pool is not None: + self._store_connection_pool = store_connection_pool + elif not self._is_read_only: + self._store_connection_pool = StoreConnectionPool(self._adapter.connmanager) if cache is not None: self._cache = cache @@ -218,14 +217,14 @@ def __init__(self, adapter, name=None, create=None, else: self.blobhelper = BlobHelper(options=options, adapter=adapter) - self._tpc_begin_factory = HistoryPreserving if self._options.keep_history else HistoryFree + tpc_begin_factory = HistoryPreserving if self._options.keep_history else HistoryFree if hasattr(self._adapter.packundo, 'deleteObject'): interface.alsoProvides(self, ZODB.interfaces.IExternalGC) - self._tpc_phase = NotInTransaction.from_storage(self) + self._tpc_phase = NotInTransaction(tpc_begin_factory, self._is_read_only) if not self._is_read_only: - self._oids = OIDs(self._adapter.oidallocator, self._store_connection) + self._oids = OIDs(self._adapter.oidallocator) # Now copy in a bunch of methods from our component objects. # Many of these are 'stale_aware', meaning that we can ask @@ -238,7 +237,7 @@ def __init__(self, adapter, name=None, create=None, # itself when it is created, from the storage it is wrapping. # Because of this, stale aware methods like history() do not # do the right thing when we're wrapped by zc.zlibstorage. - loader = Loader(self._adapter, self._load_connection, self._store_connection, self._cache) + loader = Loader(self._adapter, self._load_connection, self._cache) copy_storage_methods(self, loader) storer = Storer() copy_storage_methods(self, storer) @@ -257,7 +256,7 @@ def __init__(self, adapter, name=None, create=None, loader = BlobLoader(self._load_connection, self.blobhelper) copy_storage_methods(self, loader) - storer = BlobStorer(self.blobhelper, self._store_connection) + storer = BlobStorer() copy_storage_methods(self, storer) @property @@ -287,7 +286,8 @@ def new_instance(self, before=None): blobhelper = self.blobhelper.new_instance(adapter=adapter) other = type(self)(adapter=adapter, name=self.__name__, create=False, options=options, cache=cache, - blobhelper=blobhelper) + blobhelper=blobhelper, + store_connection_pool=self._store_connection_pool.new_instance()) if before: other._read_only_error = ReadOnlyHistoryError other.tpc_begin = make_cannot_write(other, other.tpc_begin) @@ -333,7 +333,7 @@ def before_instance(self, before): @property def highest_visible_tid(self): cache_tid = self._cache.highest_visible_tid or 0 - committed_tid = bytes8_to_int64(self._ltid) + committed_tid = bytes8_to_int64(self._last_tid_i_committed_bytes) # In case we haven't polled yet. return max(cache_tid, committed_tid) @@ -342,9 +342,9 @@ def zap_all(self, **kwargs): Used by the test suite and the ZODBConvert script. 
""" - self._adapter.schema.zap_all(**kwargs) self._load_connection.drop() - self._store_connection.drop() + self._store_connection_pool.drop_all() + self._adapter.schema.zap_all(**kwargs) self._cache.zap_all() def release(self): @@ -359,14 +359,15 @@ def release(self): on other instances of the same base object). """ self._load_connection.drop() - self._store_connection.drop() + self._store_connection_pool.release() self._cache.release() self._cache = _ClosedCache() - self._tpc_phase = None + self._tpc_phase.close() + self._tpc_phase = _ClosedCache() self._oids = None self._load_connection = ClosedConnection() - self._store_connection = ClosedConnection() + self._store_connection_pool = ClosedConnectionPool() self._adapter.release() if not self._instances: self._closed = True @@ -378,9 +379,9 @@ def close(self): self._closed = True self._load_connection.drop() - self._store_connection.drop() + self._store_connection_pool.close() self._load_connection = ClosedConnection() - self._store_connection = ClosedConnection() + self._store_connection_pool = ClosedConnectionPool() self.blobhelper.close() for wref in self._instances: @@ -391,8 +392,8 @@ def close(self): logger.debug("Closing storage cache with stats %s", self._cache.stats()) self._cache.close() self._cache = _ClosedCache() - - self._tpc_phase = None + self._tpc_phase.close() + self._tpc_phase = _ClosedCache() self._oids = None self._adapter.close() @@ -457,13 +458,12 @@ def checkCurrentSerialInTransaction(self, oid, serial, transaction): @metricmethod def tpc_begin(self, transaction, tid=None, status=' '): try: - self._tpc_phase = self._tpc_phase.tpc_begin(transaction, self._tpc_begin_factory) + self._tpc_phase = self._tpc_phase.tpc_begin(self, transaction) except: # Could be a database (connection) error, could be a programming # bug. Either way, we're fine to roll everything back and hope # for the best on a retry. Perhaps we need to raise a TransientError? self._load_connection.drop() - self._store_connection.drop() raise if tid is not None: @@ -471,10 +471,12 @@ def tpc_begin(self, transaction, tid=None, status=' '): # The allowed actions are carefully prescribed. # This argument is specified by IStorageRestoreable try: - self._tpc_phase = Restore(self._tpc_phase, tid, status) + next_phase = Restore(self._tpc_phase, tid, status) except: self.tpc_abort(transaction, _force=True) raise + else: + self._tpc_phase = next_phase @metricmethod def tpc_vote(self, transaction): @@ -483,18 +485,18 @@ def tpc_vote(self, transaction): # the object has changed during the commit process, due to # conflict resolution or undo. try: - next_phase = self._tpc_phase.tpc_vote(transaction, self) + next_phase = self._tpc_phase.tpc_vote(self, transaction) except: self.tpc_abort(transaction, _force=True) raise else: self._tpc_phase = next_phase - return self._tpc_phase.invalidated_oids + return next_phase.invalidated_oids @metricmethod def tpc_finish(self, transaction, f=None): try: - next_phase, committed_tid = self._tpc_phase.tpc_finish(transaction, f) + next_phase = self._tpc_phase.tpc_finish(self, transaction, f) except: # OH NO! This isn't supposed to happen! # It's unlikely tpc_abort will get called... @@ -503,7 +505,12 @@ def tpc_finish(self, transaction, f=None): # The store connection is either committed or rolledback; # the load connection is now rolledback. 
self._tpc_phase = next_phase - self._ltid = committed_tid + # It might be nice to de-dup this so we're not storing it both + # in the phase and in self, but if it was needed during TPC, + # when our phase is not ``ITPCStateNotInTransaction``, we couldn't + # get it. + committed_tid = int64_to_8bytes(next_phase.last_committed_tid_int) + self._last_tid_i_committed_bytes = committed_tid return committed_tid @metricmethod @@ -516,24 +523,34 @@ def tpc_abort(self, transaction, _force=False): if _force: # We're here under unexpected circumstances. It's possible something # might go wrong rolling back. - self._tpc_phase = NotInTransaction.from_storage(self) + self._tpc_phase = self._tpc_phase.initial_state raise def lastTransaction(self): - if self._ltid == z64 and self._cache.highest_visible_tid is None: + if self._last_tid_i_committed_bytes == z64 and self._cache.highest_visible_tid is None: # We haven't committed *or* polled for transactions, # so our MVCC state is "floating". # Read directly from the database to get the latest value, return int64_to_8bytes(self._adapter.txncontrol.get_tid(self._load_connection.cursor)) - return max(self._ltid, int64_to_8bytes(self._cache.highest_visible_tid or 0)) + return max(self._last_tid_i_committed_bytes, + int64_to_8bytes(self._cache.highest_visible_tid or 0)) def lastTransactionInt(self): return bytes8_to_int64(self.lastTransaction()) def new_oid(self): + # This is called from ``Connection.add`` which can be called at any time + # by the application, so we don't know what state we're in. It is also called from + # ``Connection._commit``, which is called during TPC. # If we're committing, we can't restart the connection. - return self._oids.new_oid(bool(self._tpc_phase)) + pool = self._store_connection_pool + commit_in_progress = False + if self._tpc_phase: + commit_in_progress = True + pool = SingleConnectionPool(self._tpc_phase.shared_state.store_connection) + + return self._oids.new_oid(pool, commit_in_progress) def iterator(self, start=None, stop=None): # XXX: This is broken for purposes of copyTransactionsFrom() because @@ -729,8 +746,6 @@ def __stale(self, stale_error): replacements = {} my_ns = vars(self) for k, v in my_ns.items(): - if k in self._STALE_IGNORED_ATTRS: - continue if callable(getattr(v, 'stale', None)): new_v = v.stale(stale_error) replacements[k] = new_v @@ -759,8 +774,7 @@ def __no_longer_stale(self, _conn, _cursor): replacements = { k: v.no_longer_stale() for k, v in my_ns.items() - if k not in self._STALE_IGNORED_ATTRS - and callable(getattr(v, 'no_longer_stale', None)) + if callable(getattr(v, 'no_longer_stale', None)) } my_ns.update(replacements) @@ -786,8 +800,8 @@ def __on_load_first_use(self, conn, cursor): # by this connection, we don't want to ghost objects that we're sure # are up-to-date unless someone else has changed them. # Note that transactions can happen between us committing and polling. - if self._ltid is not None: - ignore_tid = bytes8_to_int64(self._ltid) + if self._last_tid_i_committed_bytes is not None: + ignore_tid = bytes8_to_int64(self._last_tid_i_committed_bytes) else: ignore_tid = None @@ -853,7 +867,7 @@ def __on_load_first_use(self, conn, cursor): # Hmm, ok, the Connection isn't polling us in a timely fashion. # Maybe we're the root storage? Maybe our APIs are being used # independently? 
At any rate, we're going to stop tracking now;
-            # if a connection eventually gets around to polling us, they'll
+            # if a Connection eventually gets around to polling us, they'll
             # need to clear their whole cache
             self.__queued_changes = None
diff --git a/src/relstorage/storage/interfaces.py b/src/relstorage/storage/interfaces.py
index cfa913a8..3b440e41 100644
--- a/src/relstorage/storage/interfaces.py
+++ b/src/relstorage/storage/interfaces.py
@@ -25,6 +25,7 @@ from __future__ import print_function
 from zope.interface import Interface
+from zope.interface import Attribute
 from transaction.interfaces import TransientError
 from ZODB.POSException import StorageTransactionError
@@ -67,3 +68,130 @@ class VoteReadConflictError(ReadConflictError):
     A read conflict (from Connection.readCurrent()) that wasn't
     detected until the storage voted.
     """
+
+class ITPCState(Interface):
+    """
+    An object representing the current state (phase) of the two-phase commit protocol,
+    and how to transition between it and other phases.
+
+    The initial state is always :class:`ITPCStateNotInTransaction`.
+    """
+
+    transaction = Attribute("The *transaction* object from ZODB.")
+
+    initial_state = Attribute("The ITPCStateNotInTransaction that started the process.")
+
+    def tpc_abort(transaction, force=False):
+        """
+        Clear any used resources, and return the object
+        representing :class:`ITPCStateNotInTransaction`.
+
+        :param transaction: The transaction object from ZODB.
+            If this is not the current transaction, does nothing
+            unless *force* is true.
+        :keyword bool force: Whether to forcibly abort the transaction.
+            If this is true, then it doesn't matter if the *transaction* parameter
+            matches or not. Also, underlying RDBMS connections should be closed
+            and discarded.
+        :return: The previous `ITPCStateNotInTransaction`.
+        """
+
+
+class ITPCStateNotInTransaction(ITPCState):
+    """
+    The "idle" state.
+
+    In this state, no store connection is available,
+    and the *transaction* is always `None`.
+
+    Because ZODB tests this, implementations of this interface have to
+    define a bunch of methods that are also defined by various other
+    states. These methods should raise ``StorageTransactionError``, or
+    ``ReadOnlyError``, as appropriate.
+
+    Implementations of this interface should be false in a boolean context
+    to easily permit testing whether a TPC phase is active or not.
+    """
+
+    last_committed_tid_int = Attribute(
+        """
+        The TID of the last transaction committed to get us to this state.
+
+        Initially, this is 0. In the value returned from :meth:`ITPCStateVoting.tpc_finish`,
+        it is the TID just committed.
+        """
+    )
+
+    def tpc_begin(storage, transaction):
+        """
+        Enter the two-phase commit protocol.
+
+        :return: An implementation of :class:`ITPCStateBegan`.
+        """
+
+    def tpc_finish(*args, **kwargs):
+        """
+        Raises ``StorageTransactionError``.
+        """
+
+    def tpc_vote(*args, **kwargs):
+        """
+        As for `tpc_finish`.
+        """
+
+    def store(*args, **kwargs):
+        """
+        Raises ``ReadOnlyError`` or ``StorageTransactionError``.
+        """
+
+    restore = deleteObject = undo = restoreBlob = store
+
+
+class ITPCStateDatabaseAvailable(ITPCState, IStaleAware):
+    """
+    A state where the writable database connection is available.
+    """
+
+    store_connection = Attribute("The IManagedStoreConnection in use.")
+
+
+class ITPCStateBegan(ITPCStateDatabaseAvailable):
+    """
+    The first phase, where the database is available for storage.
+
+    Storing objects, restoring objects, storing blobs, and deleting
+    objects all happen in this phase.
+    """
+
+    def tpc_vote(storage, transaction):
+        """
+        Enter the voting state.
+
+        :return: An implementation of :class:`ITPCStateVoting`
+        """
+
+class ITPCStateBeganHP(ITPCStateBegan):
+    """
+    The extra methods that are available for history-preserving
+    storages.
+    """
+
+class ITPCStateVoting(ITPCStateDatabaseAvailable):
+    """
+    The phase where voting happens. This follows the beginning phase.
+    """
+
+    invalidated_oids = Attribute(
+        """An iterable of OID bytes, returned from the storage's ``tpc_vote`` method.
+
+        The Connection will ghost all cached objects in this iterable. This includes
+        objects changed during conflict resolution or undo.
+        """
+    )
+
+    def tpc_finish(storage, transaction, f=None):
+        """
+        Finish the transaction.
+
+        :return: The next implementation of :class:`ITPCStateNotInTransaction`
+        """
diff --git a/src/relstorage/storage/load.py b/src/relstorage/storage/load.py
index f38fa68b..4cd4b638 100644
--- a/src/relstorage/storage/load.py
+++ b/src/relstorage/storage/load.py
@@ -69,15 +69,13 @@ class Loader(object):
     __slots__ = (
         'adapter',
         'load_connection',
-        'store_connection',
         'cache',
         '__dict__',
     )

-    def __init__(self, adapter, load_connection, store_connection, cache):
+    def __init__(self, adapter, load_connection, cache):
         self.adapter = adapter
         self.load_connection = load_connection
-        self.store_connection = store_connection
         self.cache = cache

     def __load_using_method(self, load_cursor, meth, argument):
@@ -92,7 +90,6 @@ def __pke(self, oid_bytes, **extra):
         defs = {
             'cache': self.cache,
             'load': self.load_connection,
-            'store': self.store_connection,
             'adapter': self.adapter,
         }
         defs.update(extra)
@@ -190,13 +187,16 @@ def loadSerial(self, oid, serial):
         if state:
             return state

-        state = self.adapter.mover.load_revision(
-            self.store_connection.cursor,
-            oid_int,
-            tid_int)
-
-        if state:
-            return state
+        # Actually using the store_connection to pull into the future was
+        # removed as part of the pooling of store_connection. The above comments
+        # indicate that we really shouldn't need to get here, and no tests break
+        # with this commented out. What's a legitimate need for pulling into the future?
+        # state = self.adapter.mover.load_revision(
+        #     self.store_connection.cursor,
+        #     oid_int,
+        #     tid_int)
+        # if state:
+        #     return state

         raise self.__pke(oid, tid_int=tid_int, state=state)
@@ -225,20 +225,20 @@ def loadBefore(self, oid, tid):
         # TODO: This makes three separate queries, and also bypasses the cache.
         # We should be able to fix at least the multiple queries.
-        if self.store_connection:
-            # Allow loading data from later transactions
-            # for conflict resolution.
-
-            # XXX: This doesn't seem to be used in conflict
-            # resolution. ZODB.ConflictResolution.tryToResolveConflict
-            # calls loadSerial(); About the only call in ZODB to
-            # loadBefore() is from BlobStorage.undo() (which
-            # RelStorage does not extend). Mixing and matching calls
-            # between connections using different isolation levels
-            # isn't great. Can we stop doing this?
-            cursor = self.store_connection.cursor
-        else:
-            cursor = self.load_connection.cursor
+
+        # In the past, we would use the store connection (only if it was already open)
+        # to "allow loading data from later transactions for conflict resolution".
+        # However, this doesn't seem to be used in conflict
+        # resolution. ZODB.ConflictResolution.tryToResolveConflict
+        # calls loadSerial(). About the only call in ZODB to
+        # loadBefore() is from BlobStorage.undo() (which
+        # RelStorage does not extend).
Mixing and matching calls + # between connections using different isolation levels + # isn't great. + # + # We had it as a todo for a long time to stop doing that, and + # pooling store connections was a great time to try it. + cursor = self.load_connection.cursor if not self.adapter.mover.exists(cursor, oid_int): raise self.__pke(oid, exists=False) diff --git a/src/relstorage/storage/oid.py b/src/relstorage/storage/oid.py index 06fb5c12..b879b214 100644 --- a/src/relstorage/storage/oid.py +++ b/src/relstorage/storage/oid.py @@ -41,26 +41,25 @@ def stale(self, ex): def no_longer_stale(self): return self - def new_oid(self, commit_in_progress): + def new_oid(self, store_connection_pool, commit_in_progress): raise NotImplementedError - def set_min_oid(self, max_observed_oid): + def set_min_oid(self, store_connection, max_observed_oid): raise NotImplementedError class OIDs(AbstractOIDs): - def __init__(self, oidallocator, store_connection): + def __init__(self, oidallocator): # From largest to smallest: [16, 15, 14, ..., 1] self.preallocated_oids = [] # type: list # The maximum OID we've handed out (or that has been observed) # A value of 0 is not legal for the oidallocator to produce. self.max_allocated_oid = 0 # type: int self.oidallocator = oidallocator - self.store_connection = store_connection # type: StoreConnection if hasattr(oidallocator, 'new_oids_no_cursor'): self.__preallocate_oids = self.__preallocate_oids_no_cursor - def set_min_oid(self, max_observed_oid): + def set_min_oid(self, store_connection, max_observed_oid): """ Ensure that the next oid we produce is greater than *max_observed_oid*. @@ -74,7 +73,7 @@ def set_min_oid(self, max_observed_oid): # transactions from an external storage. # Set it in the database for everyone. - self.oidallocator.set_min_oid(self.store_connection.cursor, + self.oidallocator.set_min_oid(store_connection.cursor, max_observed_oid) # Then, set it in the storage for this thread # so we don't have to keep doing this if it only ever @@ -90,7 +89,7 @@ def set_min_oid(self, max_observed_oid): while preallocated_oids and preallocated_oids[-1] < max_observed_oid: preallocated_oids.pop() - def new_oid(self, commit_in_progress): + def new_oid(self, store_connection_pool, commit_in_progress): # Prior to ZODB 5.1.2, this method was actually called on the # storage object of the DB, not the instance storage object of # a Connection. This meant that this method (and the oid @@ -99,9 +98,9 @@ def new_oid(self, commit_in_progress): # long-running transaction). # The DB.new_oid() method still exists, but shouldn't be used; - # if it is, we'll open a database connection and transaction that's - # going to sit there idle, possibly holding row locks. That's bad. - # But we don't take any counter measures. + # In the past, because that storage was never in a transaction, that meant that + # we could open a database transaction and never close it. RelStorage 3.3 and + # store connection pooling fixes this. # Connection.new_oid() can be called at just about any time # thanks to the Connection.add() API, which clients can use @@ -111,7 +110,7 @@ def new_oid(self, commit_in_progress): # Thus we may or may not have a store connection already open; # if we do, we can't restart it or drop it. if not self.preallocated_oids: - self.__preallocate_oids(commit_in_progress) + self.__preallocate_oids(store_connection_pool, commit_in_progress) # OIDs are monotonic, always increasing. It should never # go down or return equal to what we've already seen. 
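The ``borrowing`` helper used by ``__preallocate_oids`` just below commits on success and rolls back on failure before repooling; a sketch, with a hypothetical ``allocate`` callback following the usual ``(conn, cursor, fresh_connection_p)`` signature:

    with store_connection_pool.borrowing(commit=True) as store_connection:
        oids = store_connection.call(allocate, can_reconnect=True)
    # Normal exit: the connection was committed, then returned to the pool.
    # On an exception: it is rolled back quietly before being returned.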
self.max_allocated_oid = max(self.preallocated_oids[0], self.max_allocated_oid) @@ -119,13 +118,14 @@ def new_oid(self, commit_in_progress): oid_int = self.preallocated_oids.pop() return int64_to_8bytes(oid_int) - def __preallocate_oids(self, commit_in_progress): # pylint:disable=method-hidden - self.preallocated_oids = self.store_connection.call( - self.__new_oid_callback, - can_reconnect=not commit_in_progress - ) + def __preallocate_oids(self, store_connection_pool, commit_in_progress): # pylint:disable=method-hidden + with store_connection_pool.borrowing(commit=True) as store_connection: + self.preallocated_oids = store_connection.call( + self.__new_oid_callback, + can_reconnect=not commit_in_progress + ) - def __preallocate_oids_no_cursor(self, commit_in_progress):# pylint:disable=unused-argument + def __preallocate_oids_no_cursor(self, _store_connection, commit_in_progress):# pylint:disable=unused-argument self.preallocated_oids = self.oidallocator.new_oids_no_cursor() def __new_oid_callback(self, _store_conn, store_cursor, _fresh_connection): @@ -138,10 +138,10 @@ class ReadOnlyOIDs(AbstractOIDs): __slots__ = ( ) - def new_oid(self, commit_in_progress): + def new_oid(self, store_connection_pool, commit_in_progress): raise ReadOnlyError - def set_min_oid(self, max_observed_oid): + def set_min_oid(self, store_connection, max_observed_oid): raise ReadOnlyError @implementer(IStaleAware) @@ -162,8 +162,8 @@ def no_longer_stale(self): def stale(self, ex): return self - def new_oid(self, commit_in_progress): + def new_oid(self, store_connection_pool, commit_in_progress): raise self.stale_error - def set_min_oid(self, max_observed_oid): + def set_min_oid(self, store_connection, max_observed_oid): raise self.stale_error diff --git a/src/relstorage/storage/store.py b/src/relstorage/storage/store.py index 51ed6f5e..5271dfab 100644 --- a/src/relstorage/storage/store.py +++ b/src/relstorage/storage/store.py @@ -66,14 +66,8 @@ def deleteObject(self, tpc_phase, oid, oldserial, transaction): class BlobStorer(object): __slots__ = ( - 'blobhelper', - 'store_connection', ) - def __init__(self, blobhelper, store_connection): - self.blobhelper = blobhelper - self.store_connection = store_connection - @phase_dependent_aborts_early @writable_storage_method @metricmethod @@ -91,9 +85,10 @@ def storeBlob(self, tpc_phase, oid, serial, data, blobfilename, version, txn): assert not version # We used to flush the batcher here, for some reason. 
store_func = tpc_phase.store - cursor = self.store_connection.cursor - self.blobhelper.storeBlob(cursor, store_func, - oid, serial, data, blobfilename, version, txn) + cursor = tpc_phase.shared_state.store_connection.cursor + blobhelper = tpc_phase.shared_state.blobhelper + blobhelper.storeBlob(cursor, store_func, + oid, serial, data, blobfilename, version, txn) @phase_dependent_aborts_early @writable_storage_method diff --git a/src/relstorage/storage/tpc/__init__.py b/src/relstorage/storage/tpc/__init__.py index 3fb5f51f..0fddeabb 100644 --- a/src/relstorage/storage/tpc/__init__.py +++ b/src/relstorage/storage/tpc/__init__.py @@ -31,11 +31,22 @@ from transaction._transaction import rm_key from transaction import get as get_thread_local_transaction +from zope.interface import implementer + from ZODB.POSException import ReadOnlyError from ZODB.POSException import StorageTransactionError +from ..interfaces import ITPCStateNotInTransaction +from ..interfaces import ITPCStateDatabaseAvailable +from ...adapters.connections import ClosedConnection +from ..._util import Lazy as BaseLazy + +from .temporary_storage import TemporaryStorage + log = logging.getLogger("relstorage") +_CLOSED_CONNECTION = ClosedConnection() + #: Set the ``RELSTORAGE_LOCK_EARLY`` environment variable if you #: experience deadlocks or failures to commit (``tpc_finish``). This #: will cause the commit lock to be taken as part of ``tpc_vote`` @@ -46,32 +57,188 @@ #: it. LOCK_EARLY = os.environ.get('RELSTORAGE_LOCK_EARLY') -class _StorageFacade(object): - # makes a storage look like a previous state for construction - # purposes - transaction = None +class _LazyResource(BaseLazy): + + # If not None, a callable ``(storage, resource, force)`` + # that aborts the *resource*, possibly forcefully (*force*). + # The return value will be the new value in the object + # instance. + abort_function = None + # If not None, a callable ``(storage, resource)`` to clean up + # any use of the *resource* after success. + release_function = None + + def _stored_value_for_name_in_inst(self, value, name, inst): + # type: (Any, str, SharedTPCState) -> None + if name == 'store_connection': + # Try to do this first + inst._used_resources.insert(0, self) + else: + inst._used_resources.append(self) + + def aborter(self, func): + self.abort_function = func + return self + + def releaser(self, func): + self.release_function = func + return self + + +class SharedTPCState(object): + """ + Contains attributes marking resources that *might* be used during the commit + process. If any of them are, then the `abort` method takes care of cleaning them up. + + Accessing a resource implicitly begins it, if needed. + """ + + # pylint:disable=method-hidden + prepared_txn = None + transaction = None + not_in_transaction_state = None + read_only = False # Or we wouldn't allocate this object. + + def __init__(self, initial_state, storage, transaction): + self.initial_state = initial_state + self._storage = storage + self.transaction = transaction + self._used_resources = [] + + @_LazyResource + def store_connection(self): + return self._storage._store_connection_pool.borrow() + + @store_connection.aborter + def store_connection(self, storage, store_connection, force): + try: + adapter = storage._adapter + if store_connection: + # It's possible that this connection/cursor was + # already closed if an error happened (which would + # release the locks). Don't try to re-open it. 
+ adapter.locker.release_commit_lock(store_connection.cursor) + + # Though, this might re-open it. + adapter.txncontrol.abort( + store_connection, + self.prepared_txn) + + if force: + store_connection.drop() + finally: + storage._store_connection_pool.replace(store_connection) + return _CLOSED_CONNECTION + + @store_connection.releaser + def store_connection(self, storage, store_connection): + storage._store_connection_pool.replace(store_connection) + return _CLOSED_CONNECTION + + @_LazyResource + def load_connection(self): + return self._storage._load_connection + + @load_connection.aborter + def load_connection(self, _storage, load_connection, force): + if force: + load_connection.drop() + else: + load_connection.rollback_quietly() + load_connection.exit_critical_phase() + return _CLOSED_CONNECTION + + @load_connection.releaser + def load_connection(self, _storage, load_connection): + load_connection.rollback_quietly() + load_connection.exit_critical_phase() + return _CLOSED_CONNECTION + + @_LazyResource + def blobhelper(self): + blobhelper = self._storage.blobhelper + blobhelper.begin() + return blobhelper + + @blobhelper.aborter + def blobhelper(self, _storage, blobhelper, _force): + blobhelper.abort() + + @blobhelper.releaser + def blobhelper(self, _storage, blobhelper): + blobhelper.clear_temp() + + def has_blobs(self): + # pylint:disable=no-member + return ( + 'blobhelper' in self.__dict__ + and self.blobhelper is not None + and self.blobhelper.txn_has_blobs + ) + + @BaseLazy + def cache(self): + return self._storage._cache + + @BaseLazy + def adapter(self): + return self._storage._adapter + + @_LazyResource + def temp_storage(self): + return TemporaryStorage() + + @temp_storage.aborter + def temp_storage(self, _storage, temp_storage, _force): + temp_storage.close() + + @temp_storage.releaser + def temp_storage(self, _storage, temp_storage): + temp_storage.close() + + def has_temp_data(self): + return 'temp_storage' in self.__dict__ and self.temp_storage + + def __cleanup(self, method_name, method_args): + storage = self._storage + resources = self._used_resources + self._used_resources = () # No more opening resources. 
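
Note the ordering guarantee from `_stored_value_for_name_in_inst`: the store connection is prepended to `_used_resources`, so `abort()` and `release()` always handle it, and with it any database locks, before the other resources. An illustrative sequence, using the names defined above:

    state = SharedTPCState(initial_state, storage, transaction)
    state.temp_storage        # first touch: appended to _used_resources
    state.store_connection    # second touch: inserted at index 0
    # state.abort() now cleans up in this order:
    #   1. store_connection  (commit lock released, txn aborted, returned to pool)
    #   2. temp_storage      (closed)
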
+ + exceptions = [] - def __init__(self, storage): - self.adapter = storage._adapter - self.load_connection = storage._load_connection - self.store_connection = storage._store_connection - self.blobhelper = storage.blobhelper - self.cache = storage._cache - self.read_only = storage._is_read_only + for resource in resources: + assert resource.__name__ in vars(self) -class AbstractTPCState(object): + cleaner = getattr(resource, method_name) + if not cleaner: + setattr(self, resource.__name__, None) + continue + + value = getattr(self, resource.__name__) + new_value = None + try: + new_value = cleaner(self, storage, value, *method_args) + except Exception as ex: # pylint:disable=broad-except + exceptions.append(ex) + setattr(self, resource.__name__, new_value) + + if exceptions: # pragma: no cover + raise Exception("Failed to close one or more resources: %s" % (exceptions,)) + + def abort(self, force=False): + self.__cleanup('abort_function', (force,)) + + def release(self): + self.__cleanup('release_function', ()) + + +@implementer(ITPCStateDatabaseAvailable) +class AbstractTPCStateDatabaseAvailable(object): __slots__ = ( - 'adapter', - 'load_connection', - 'store_connection', - 'transaction', - 'prepared_txn', - 'blobhelper', - 'cache', - 'read_only', + 'shared_state', ) # - store @@ -87,29 +254,25 @@ class AbstractTPCState(object): # read only, this needs to happen in the "not in transaction" # state. - @classmethod - def from_storage(cls, storage): - return cls(_StorageFacade(storage), None) - - def __init__(self, previous_state, transaction=None): - if 0: # pylint:disable=using-constant-test - # This block (elided from the bytecode) - # is for pylint static analysis - self.adapter = self.load_connection = self.store_connection = self.transaction = None - self.prepared_txn = self.blobhelper = None - self.cache = None # type: relstorage.cache.storage_cache.StorageCache - self.read_only = False - for attr in AbstractTPCState.__slots__: - val = getattr(previous_state, attr) - setattr(self, attr, val) + def __init__(self, shared_state): + self.shared_state = shared_state # type: SharedTPCState - self.transaction = transaction + @property + def transaction(self): + return self.shared_state.transaction + + @property + def initial_state(self): + return self.shared_state.initial_state + + @property + def store_connection(self): + return self.shared_state.store_connection def __repr__(self): - result = "<%s at 0x%x blobhelper=%r stored_count=%s %s" % ( + result = "<%s at 0x%x stored_count=%s %s" % ( type(self).__name__, id(self), - self.blobhelper, len(getattr(self, 'temp_storage', ()) or ()), self._tpc_state_transaction_data(), ) @@ -154,8 +317,7 @@ def _tpc_state_transaction_data(self): resources = sorted(global_tx._resources, key=rm_key) return "transaction=%r resources=%r" % (global_tx, resources) - - def tpc_finish(self, transaction, f=None): # pylint:disable=unused-argument + def tpc_finish(self, storage, transaction, f=None): # pylint:disable=unused-argument # For the sake of some ZODB tests, we need to implement this everywhere, # even if it's not actually usable, and the first thing it needs to # do is check the transaction. 
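
`__cleanup` is written so one failing resource cannot keep the rest from being torn down: every cleaner runs, failures are collected, and a summary error is raised afterwards. The pattern in isolation (a sketch with hypothetical names, Python 2/3 compatible like the surrounding code):

    def cleanup_all(owner, storage, resources, method_name, extra_args=()):
        errors = []
        for resource in resources:
            cleaner = getattr(resource, method_name)
            value = getattr(owner, resource.__name__)
            new_value = None
            try:
                if cleaner is not None:
                    new_value = cleaner(owner, storage, value, *extra_args)
            except Exception as ex:  # pylint:disable=broad-except
                errors.append(ex)
            setattr(owner, resource.__name__, new_value)
        if errors:
            raise Exception("Failed to close one or more resources: %s" % (errors,))
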
@@ -163,46 +325,17 @@ def tpc_finish(self, transaction, f=None): # pylint:disable=unused-argument raise StorageTransactionError('tpc_finish called with wrong transaction') raise NotImplementedError("tpc_finish not allowed in this state.") + def tpc_begin(self, _storage, transaction): + # Ditto as for tpc_finish + raise StorageTransactionError('tpc_begin not allowed in this state', type(self)) + def tpc_abort(self, transaction, force=False): if not force: if transaction is not self.transaction: return self - try: - # Drop locks first. - if self.store_connection: - # It's possible that this connection/cursor was - # already closed if an error happened (which would - # release the locks). Don't try to re-open it. - self.adapter.locker.release_commit_lock(self.store_connection.cursor) - self.adapter.txncontrol.abort( - self.store_connection, - self.prepared_txn) - - if force: - self.load_connection.drop() - self.store_connection.drop() - else: - self.load_connection.rollback_quietly() - self.blobhelper.abort() - finally: - self._clear_temp() - return NotInTransaction(self) - - def _clear_temp(self): - """ - Clear all attributes used for transaction commit. - Subclasses should override. Called on tpc_abort; subclasses - should call on other exit states. - """ - - def tpc_begin(self, transaction, begin_factory): - if transaction is self.transaction: - raise StorageTransactionError("Duplicate tpc_begin calls for same transaction.") - # XXX: Shouldn't we tpc_abort() first (well, not that exactly, because - # the transaction won't match, but logically)? The original storage - # code didn't do that, but it seems like it should. - return begin_factory(self, transaction) + self.shared_state.abort(force) + return self.initial_state def no_longer_stale(self): return self @@ -210,17 +343,36 @@ def no_longer_stale(self): def stale(self, e): return Stale(self, e) + def close(self): + if self.shared_state is not None: + self.tpc_abort(None, True) + self.shared_state = None -class NotInTransaction(AbstractTPCState): + +@implementer(ITPCStateNotInTransaction) +class NotInTransaction(object): # The default state, when the storage is not attached to a # transaction. - __slots__ = () + __slots__ = ( + 'last_committed_tid_int', + 'read_only', + 'begin_factory', + ) + + transaction = None + + def __init__(self, begin_factory, read_only, committed_tid_int=0): + self.begin_factory = begin_factory + self.read_only = read_only + self.last_committed_tid_int = committed_tid_int - def __init__(self, previous_state, transaction=None): - super(NotInTransaction, self).__init__(previous_state) - # Reset some things that need to go away. - self.prepared_txn = None + def with_committed_tid_int(self, committed_tid_int): + return NotInTransaction( + self.begin_factory, + self.read_only, + committed_tid_int + ) def tpc_abort(self, *args, **kwargs): # pylint:disable=arguments-differ,unused-argument,signature-differs # Nothing to do @@ -230,6 +382,7 @@ def _no_transaction(self, *args, **kwargs): raise StorageTransactionError("No transaction in progress") tpc_finish = tpc_vote = _no_transaction + checkCurrentSerialInTransaction = _no_transaction def store(self, *_args, **_kwargs): if self.read_only: @@ -238,18 +391,33 @@ def store(self, *_args, **_kwargs): restore = deleteObject = undo = restoreBlob = store - def tpc_begin(self, transaction, begin_factory): + def tpc_begin(self, storage, transaction): # XXX: Signature needs to change. 
if self.read_only: raise ReadOnlyError() - return super(NotInTransaction, self).tpc_begin(transaction, begin_factory) + if transaction is self.transaction: # Also handles None. + raise StorageTransactionError("Duplicate tpc_begin calls for same transaction.") + state = SharedTPCState(self, storage, transaction) + try: + return self.begin_factory(state) + except: + state.abort() + raise + + @property + def initial_state(self): + return self # This object appears to be false. def __bool__(self): return False __nonzero__ = __bool__ + def close(self): + pass -class Stale(AbstractTPCState): + +@implementer(ITPCStateNotInTransaction) +class Stale(object): """ An error that lets us know we are stale was encountered. @@ -258,8 +426,10 @@ class Stale(AbstractTPCState): re-raising that error. """ + transaction = None + last_committed_tid_int = 0 + def __init__(self, previous_state, stale_error): - super(Stale, self).__init__(previous_state, None) self.previous_state = previous_state self.stale_error = stale_error @@ -267,11 +437,22 @@ def _stale(self, *args, **kwargs): raise self.stale_error store = restore = checkCurrentSerialInTransaction = _stale - undo = deleteObject = _stale - tpc_begin = _stale + undo = deleteObject = restoreBlob = _stale + tpc_begin = tpc_finish = tpc_vote = _stale + + def tpc_abort(self, *args, **kwargs): + return self.previous_state.tpc_abort(*args, **kwargs) + + @property + def initial_state(self): + return self.previous_state.initial_state def no_longer_stale(self): return self.previous_state - def stale(self, e): + def stale(self, _e): return self + + def __bool__(self): + return False + __nonzero__ = __bool__ diff --git a/src/relstorage/storage/tpc/begin.py b/src/relstorage/storage/tpc/begin.py index 7b7d1a0e..5095a47c 100644 --- a/src/relstorage/storage/tpc/begin.py +++ b/src/relstorage/storage/tpc/begin.py @@ -31,8 +31,7 @@ from relstorage._compat import OID_TID_MAP_TYPE from relstorage._util import to_utf8 -from . import AbstractTPCState -from .temporary_storage import TemporaryStorage +from . import AbstractTPCStateDatabaseAvailable from .vote import DatabaseLockedForTid from .vote import HistoryFree as HFVoteFactory from .vote import HistoryPreserving as HPVoteFactory @@ -49,7 +48,7 @@ class _BadFactory(object): def enter(self, storage): raise NotImplementedError -class AbstractBegin(AbstractTPCState): +class AbstractBegin(AbstractTPCStateDatabaseAvailable): """ The phase we enter after ``tpc_begin`` has been called. """ @@ -71,21 +70,21 @@ class AbstractBegin(AbstractTPCState): # OIDs of things we have deleted or undone. # Stored in their 8 byte form 'invalidated_oids', + ) _DEFAULT_TPC_VOTE_FACTORY = _BadFactory # type: Callable[..., AbstractTPCState] - def __init__(self, previous_state, transaction): - super(AbstractBegin, self).__init__(previous_state, transaction) + def __init__(self, shared_state): + super(AbstractBegin, self).__init__(shared_state) self.invalidated_oids = () # We'll replace this later with the right type when it's needed. 
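
Both idle states are now plain value objects. `NotInTransaction` is immutable; committing derives a fresh instance via `with_committed_tid_int` rather than mutating the old one. `Stale` wraps a previous state, re-raises the stored error on every mutating call, and unwraps via `no_longer_stale()`. Illustrative use (`begin_factory` is a placeholder; `StorageTransactionError` is the ZODB exception imported above):

    idle = NotInTransaction(begin_factory, read_only=False)
    assert idle.last_committed_tid_int == 0
    assert not idle                            # falsey: no transaction in progress

    after = idle.with_committed_tid_int(42)    # new instance; `idle` is unchanged
    assert after.last_committed_tid_int == 42

    stale = Stale(after, StorageTransactionError("storage is stale"))
    assert not stale
    # stale.store(...), stale.undo(...), stale.tpc_begin(...) all re-raise the error.
    assert stale.no_longer_stale() is after    # unwrap once fresh again
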
self.required_tids = {} # type: Dict[int, int] self.tpc_vote_factory = self._DEFAULT_TPC_VOTE_FACTORY # type: ignore - self.temp_storage = TemporaryStorage() - user = to_utf8(transaction.user) - desc = to_utf8(transaction.description) - ext = transaction.extension + user = to_utf8(self.transaction.user) + desc = to_utf8(self.transaction.description) + ext = self.transaction.extension if ext: ext = dumps(ext, 1) @@ -93,19 +92,7 @@ def __init__(self, previous_state, transaction): ext = b"" self.ude = user, desc, ext - # In many cases we can defer this; we only need it - # if we do deleteObject() or store a blob (which we're not fully in - # control of) - self.store_connection.restart() - - self.store_connection.begin() - self.blobhelper.begin() - - def _clear_temp(self): - # Clear all attributes used for transaction commit. - self.temp_storage.close() - - def tpc_vote(self, transaction, storage): + def tpc_vote(self, storage, transaction): if transaction is not self.transaction: raise StorageTransactionError( "tpc_vote called with wrong transaction") @@ -136,8 +123,7 @@ def store(self, oid, previous_tid, data, transaction): # Save the data locally in a temporary place. Later, closer to commit time, # we'll send it all over at once. This lets us do things like use # COPY in postgres. - self.temp_storage.store_temp(oid_int, data, prev_tid_int) - + self.shared_state.temp_storage.store_temp(oid_int, data, prev_tid_int) @metricmethod_sampled def checkCurrentSerialInTransaction(self, oid, required_tid, transaction): @@ -216,16 +202,16 @@ def deleteObject(self, oid, oldserial, transaction): # delete a specific verison? Etc. oid_int = bytes8_to_int64(oid) tid_int = bytes8_to_int64(oldserial) - self.cache.remove_cached_data(oid_int, tid_int) + self.shared_state.cache.remove_cached_data(oid_int, tid_int) # We delegate the actual operation to the adapter's packundo, # just like native pack - cursor = self.store_connection.cursor + cursor = self.shared_state.store_connection.cursor # When this is done, we get a tpc_vote, # and a tpc_finish. # The interface doesn't specify a return value, so for testing # we return the count of rows deleted (should be 1 if successful) - deleted = self.adapter.packundo.deleteObject(cursor, oid, oldserial) + deleted = self.shared_state.adapter.packundo.deleteObject(cursor, oid, oldserial) self._invalidated_oids(oid) return deleted @@ -247,8 +233,8 @@ class HistoryPreserving(AbstractBegin): _DEFAULT_TPC_VOTE_FACTORY = HPVoteFactory - def __init__(self, storage, transaction): - AbstractBegin.__init__(self, storage, transaction) + def __init__(self, *args): + AbstractBegin.__init__(self, *args) self.committing_tid_lock = None def _obtain_commit_lock(self, cursor): @@ -258,7 +244,7 @@ def _obtain_commit_lock(self, cursor): # because the database adapters also acquire in that # order during packing. tid_lock = DatabaseLockedForTid.lock_database_for_next_tid( - cursor, self.adapter, self.ude) + cursor, self.shared_state.adapter, self.ude) self.committing_tid_lock = tid_lock def deleteObject(self, oid, oldserial, transaction): @@ -266,7 +252,7 @@ def deleteObject(self, oid, oldserial, transaction): # theoretically these are unreachable? Our custom # vote stage just removes this transaction anyway; maybe it # can skip the committing. - self._obtain_commit_lock(self.store_connection.cursor) + self._obtain_commit_lock(self.shared_state.store_connection.cursor) # A transaction that deletes objects can *only* delete objects. 
# That way we don't need to store an entry in the transaction table # (and add extra bloat to the DB; that kind of defeats the point of @@ -296,8 +282,8 @@ def undo(self, transaction_id, transaction): assert len(undo_tid) == 8 undo_tid_int = bytes8_to_int64(undo_tid) - adapter = self.adapter - cursor = self.store_connection.cursor + adapter = self.shared_state.adapter + cursor = self.shared_state.store_connection.cursor assert cursor is not None adapter.locker.hold_pack_lock(cursor) @@ -318,15 +304,15 @@ def undo(self, transaction_id, transaction): # we're probably just undoing the latest state. Still, play it # a bit safer. oid_ints = [oid_int for oid_int, _ in copied] - self.cache.remove_all_cached_data_for_oids(oid_ints) + self.shared_state.cache.remove_all_cached_data_for_oids(oid_ints) # Update the current object pointers immediately, so that # subsequent undo operations within this transaction will see # the new current objects. adapter.mover.update_current(cursor, self_tid_int) - self.blobhelper.copy_undone(copied, - self.committing_tid_lock.tid) + self.shared_state.blobhelper.copy_undone(copied, + self.committing_tid_lock.tid) oids = [int64_to_8bytes(oid_int) for oid_int in oid_ints] self._invalidated_oids(*oids) diff --git a/src/relstorage/storage/tpc/finish.py b/src/relstorage/storage/tpc/finish.py index 759682cc..6ac5a92a 100644 --- a/src/relstorage/storage/tpc/finish.py +++ b/src/relstorage/storage/tpc/finish.py @@ -17,9 +17,7 @@ from __future__ import absolute_import from __future__ import print_function -from . import NotInTransaction - -def Finish(vote_state, needs_store_commit=True): +def Finish(vote_state, committed_tid_int, needs_store_commit=True): """ The state we enter with tpc_finish. @@ -27,23 +25,25 @@ def Finish(vote_state, needs_store_commit=True): to the not-in-transaction state. """ # Bring the load connection to current status. - vote_state.load_connection.rollback_quietly() + vote_state.shared_state.load_connection.rollback_quietly() if needs_store_commit: # We may have already committed the store connection, so there's # no point doing so again. Also no point in rolling it back either. - txn = vote_state.prepared_txn + txn = vote_state.shared_state.prepared_txn assert txn is not None - vote_state.adapter.txncontrol.commit_phase2( - vote_state.store_connection, + vote_state.shared_state.adapter.txncontrol.commit_phase2( + vote_state.shared_state.store_connection, txn, - vote_state.load_connection) + vote_state.shared_state.load_connection) - vote_state.committing_tid_lock.release_commit_lock(vote_state.store_connection.cursor) - vote_state.cache.after_tpc_finish(vote_state.committing_tid_lock.tid, - vote_state.temp_storage) + vote_state.committing_tid_lock.release_commit_lock( + vote_state.shared_state.store_connection.cursor) + vote_state.shared_state.cache.after_tpc_finish(vote_state.committing_tid_lock.tid, + vote_state.shared_state.temp_storage) - # Make sure we're not holding any elevated privileges still; - # that would be a bug in the driver. - vote_state.load_connection.exit_critical_phase() - vote_state.store_connection.exit_critical_phase() - return NotInTransaction(vote_state) + # The vote caller is responsible for releasing the shared + # resources in vote_state.shared_state. + # XXX: Which may not make much sense? 
+ return vote_state.initial_state.with_committed_tid_int( + committed_tid_int + ) diff --git a/src/relstorage/storage/tpc/restore.py b/src/relstorage/storage/tpc/restore.py index 7e57219b..ba82e215 100644 --- a/src/relstorage/storage/tpc/restore.py +++ b/src/relstorage/storage/tpc/restore.py @@ -51,10 +51,12 @@ class Restore(object): 'tpc_vote', 'tpc_abort', 'no_longer_stale', + 'shared_state', ) def __init__(self, begin_state, committing_tid, status): - # type: (AbstractBegin, DatabaseLockedForTid, str) -> None + # type: (relstorage.storage.tpc.begin.AbstractBegin, DatabaseLockedForTid, str) -> None + # This is an extension we use for copyTransactionsFrom; # it is not part of the IStorage API. assert committing_tid is not None @@ -65,8 +67,8 @@ def __init__(self, begin_state, committing_tid, status): # other than this transaction. We currently avoid the temp tables, # though, so if we do multiple things in a restore transaction, # we could still wind up with locking issues (I think?) - adapter = begin_state.adapter - cursor = begin_state.store_connection.cursor + adapter = begin_state.shared_state.adapter + cursor = begin_state.shared_state.store_connection.cursor packed = (status == 'p') try: committing_tid_lock = DatabaseLockedForTid.lock_database_for_given_tid( @@ -74,7 +76,7 @@ def __init__(self, begin_state, committing_tid, status): cursor, adapter, begin_state.ude ) except: - begin_state.store_connection.drop() + begin_state.shared_state.store_connection.drop() raise # This is now only used for restore() @@ -94,13 +96,14 @@ def __init__(self, begin_state, committing_tid, status): # the mover is unaware of it. factory = begin_state.tpc_vote_factory assert factory is HFVoteFactory or factory is HPVoteFactory - def tpc_vote_factory(state, + def tpc_vote_factory(begin_state, _f=_HFVoteFactory if factory is HFVoteFactory else _HPVoteFactory, _c=committing_tid_lock, _b=batcher): - return _f(state, _c, _b) + return _f(begin_state, _c, _b) begin_state.tpc_vote_factory = tpc_vote_factory - begin_state.temp_storage = _TempStorageWrapper(begin_state.temp_storage) + begin_state.shared_state.temp_storage = _TempStorageWrapper( + begin_state.shared_state.temp_storage) def restore(self, oid, this_tid, data, prev_txn, transaction): # Similar to store() (see comments in FileStorage.restore for @@ -113,16 +116,17 @@ def restore(self, oid, this_tid, data, prev_txn, transaction): if transaction is not state.transaction: raise StorageTransactionError(self, transaction) - adapter = state.adapter - cursor = state.store_connection.cursor + adapter = state.shared_state.adapter + cursor = state.shared_state.store_connection.cursor assert cursor is not None oid_int = bytes8_to_int64(oid) tid_int = bytes8_to_int64(this_tid) # Save the `data`. Note that `data` can be None. # Note also that this doesn't go through the cache. - state.temp_storage.max_restored_oid = max(state.temp_storage.max_restored_oid, - oid_int) + state.shared_state.temp_storage.max_restored_oid = max( + state.shared_state.temp_storage.max_restored_oid, + oid_int) # TODO: Make it go through the cache, or at least the same # sort of queing thing, so that we can do a bulk COPY. # The way we do it now complicates restoreBlob() and it complicates voting. @@ -139,8 +143,8 @@ def restoreBlob(self, oid, serial, data, blobfilename, prev_txn, txn): # (we'd prefer having DEFERRABLE INITIALLY DEFERRED FK # constraints, but as-of 8.0 MySQL doesn't support that.) 
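
The replacement `tpc_vote_factory` above relies on default-argument binding: the chosen vote class, the already-held TID lock, and the batcher are evaluated once at definition time, so the factory keeps the one-argument signature that `tpc_vote` expects while carrying its collaborators along. The idiom in general form (sketch):

    def make_vote_factory(real_factory, tid_lock, batcher):
        # Defaults are bound now, not looked up when the factory runs.
        def tpc_vote_factory(begin_state, _f=real_factory, _c=tid_lock, _b=batcher):
            return _f(begin_state, _c, _b)
        return tpc_vote_factory
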
self.batcher.flush() - cursor = state.store_connection.cursor - state.blobhelper.restoreBlob(cursor, oid, serial, blobfilename) + cursor = state.shared_state.store_connection.cursor + state.shared_state.blobhelper.restoreBlob(cursor, oid, serial, blobfilename) class _TempStorageWrapper(object): @@ -169,9 +173,9 @@ def __getattr__(self, name): class _VoteFactoryMixin(object): __slots__ = () - def __init__(self, state, committing_tid_lock, batcher): + def __init__(self, begin_state, committing_tid_lock, batcher): # type: (Restore, Optional[DatabaseLockedForTid], Any) -> None - super(_VoteFactoryMixin, self).__init__(state) + super(_VoteFactoryMixin, self).__init__(begin_state) # pylint:disable=assigning-non-slot self.committing_tid_lock = committing_tid_lock self.batcher = batcher diff --git a/src/relstorage/storage/tpc/tests/__init__.py b/src/relstorage/storage/tpc/tests/__init__.py new file mode 100644 index 00000000..40a96afc --- /dev/null +++ b/src/relstorage/storage/tpc/tests/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/src/relstorage/storage/tpc/tests/test_tpc.py b/src/relstorage/storage/tpc/tests/test_tpc.py new file mode 100644 index 00000000..dc0bc4f9 --- /dev/null +++ b/src/relstorage/storage/tpc/tests/test_tpc.py @@ -0,0 +1,238 @@ +# -*- coding: utf-8 -*- +""" +Tests for the tpc module. + +""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import unittest + +from ZODB.POSException import ReadOnlyError +from ZODB.POSException import StorageTransactionError + +from hamcrest import assert_that + +from nti.testing.matchers import validly_provides + +from ...interfaces import ITPCStateNotInTransaction + + +class TestNotInTransactionCommon(unittest.TestCase): + def _makeOne(self, read_only=False, begin_factory=None): + from .. 
import NotInTransaction + return NotInTransaction(begin_factory, read_only) + + def test_verify_interface(self): + assert_that(self._makeOne(), validly_provides(ITPCStateNotInTransaction)) + + def test_is_false(self): + self.assertFalse(self._makeOne()) + + no_transaction_methods = ( + 'store', 'restore', 'checkCurrentSerialInTransaction', + 'undo', 'deleteObject', 'restoreBlob', + 'tpc_finish', 'tpc_vote' + ) + + def test_no_transaction(self): + meths = self.no_transaction_methods + for meth in meths: + with self.assertRaises(StorageTransactionError): + getattr(self._makeOne(), meth)() + +class TestNotInTransaction(TestNotInTransactionCommon): + + def test_abort_returns_self(self): + inst = self._makeOne() + self.assertIs(inst, inst.tpc_abort()) + + def test_initial_state_is_self(self): + inst = self._makeOne() + self.assertIs(inst.initial_state, inst) + + def test_read_only_or_no_transaction(self): + meths = ( + 'store', + 'restore', + 'deleteObject', + 'undo', + 'restoreBlob', + ) + + read_only_inst = self._makeOne(read_only=True) + for meth in meths: + with self.assertRaises(ReadOnlyError): + getattr(read_only_inst, meth)() + + writable_inst = self._makeOne(read_only=False) + for meth in meths: + with self.assertRaises(StorageTransactionError): + getattr(writable_inst, meth)() + + def test_with_committed_tid_int(self): + begin_factory = object() + read_only = object() + inst = self._makeOne(read_only, begin_factory) + self.assertIs(inst.read_only, read_only) + self.assertIs(inst.begin_factory, begin_factory) + self.assertEqual(inst.last_committed_tid_int, 0) + + new_inst = inst.with_committed_tid_int(42) + self.assertIs(new_inst.read_only, read_only) + self.assertIs(new_inst.begin_factory, begin_factory) + self.assertEqual(new_inst.last_committed_tid_int, 42) + + +class TestStale(TestNotInTransactionCommon): + + def _makeOne(self, read_only=False, begin_factory=None): + from .. import Stale + from .. import NotInTransaction + return Stale(NotInTransaction(begin_factory, read_only), StorageTransactionError) + + def test_no_longer_stale(self): + from .. import NotInTransaction + inst = self._makeOne() + self.assertIs(inst.stale(None), inst) + self.assertIsInstance(inst.no_longer_stale(), NotInTransaction) + + # We piggyback test_no_transaction to mean the stale error + no_transaction_methods = TestNotInTransactionCommon.no_transaction_methods + ( + 'tpc_begin', + ) + +class TestSharedTPCState(unittest.TestCase): + + def _makeOne(self, initial_state=None, storage=None, transaction=None): + from .. import SharedTPCState + return SharedTPCState(initial_state, storage, transaction) + + class Pool(object): + replaced = False + + def __init__(self, conn): + self.conn = conn + + def borrow(self): + return self.conn + + def replace(self, obj): + assert obj is self.conn + self.replaced = True + + class Storage(object): + def __init__(self, store_pool=None, load_conn=None, blobhelper=None): + self._store_connection_pool = store_pool + self._load_connection = load_conn + self.blobhelper = blobhelper + + def test_store_connection_borrow_release(self): + from .. 
import _CLOSED_CONNECTION + conn = object() + + pool = self.Pool(conn) + + inst = self._makeOne(storage=self.Storage(pool)) + + self.assertIs(inst.store_connection, conn) + inst.release() + self.assertTrue(pool.replaced) + self.assertIs(inst.store_connection, _CLOSED_CONNECTION) + + def test_abort_load_connection_store_connection_load_fails(self): + # Even if we access the load_connection first, the store_connection + # is aborted first; aborting the store connection doesn't stop the load + # connection from being aborted. + from .. import _CLOSED_CONNECTION + + class LoadConn(object): + aborted = False + def drop(self): + self.aborted = True + + exited = False + def exit_critical_phase(self): + self.exited = True + + store_conn = object() + pool = self.Pool(store_conn) + load_conn = LoadConn() + + storage = self.Storage(pool, load_conn) + inst = self._makeOne(storage=storage) + + # Order matters + self.assertIs(inst.load_connection, load_conn) + self.assertIs(inst.store_connection, store_conn) + + with self.assertRaises(Exception): + inst.abort(force=True) + + self.assertTrue(load_conn.aborted) + self.assertTrue(load_conn.exited) + self.assertTrue(pool.replaced) + # This got None indicating an exception + self.assertIsNone(inst.store_connection) + self.assertIs(inst.load_connection, _CLOSED_CONNECTION) + + def test_blobhelper(self): + class BlobHelper(object): + began = False + aborted = False + cleared = False + + def begin(self): + self.began = True + def abort(self): + self.aborted = True + def clear_temp(self): + self.cleared = True + + txn_has_blobs = True + + blobhelper = BlobHelper() + storage = self.Storage(blobhelper=blobhelper) + inst = self._makeOne(storage=storage) + self.assertFalse(inst.has_blobs()) + self.assertNotIn('blobhelper', vars(inst)) + + self.assertIs(inst.blobhelper, blobhelper) + self.assertTrue(inst.has_blobs()) + self.assertTrue(blobhelper.began) + inst.abort() + self.assertIsNone(inst.blobhelper) + self.assertFalse(inst.has_blobs()) + self.assertTrue(blobhelper.aborted) + + blobhelper = BlobHelper() + storage = self.Storage(blobhelper=blobhelper) + inst = self._makeOne(storage=storage) + + self.assertIs(inst.blobhelper, blobhelper) + inst.release() + self.assertTrue(blobhelper.cleared) + self.assertIsNone(inst.blobhelper) + + def test_temp_storage(self): + # pylint:disable=no-member + inst = self._makeOne() + self.assertFalse(inst.has_temp_data()) + self.assertNotIn('temp_storage', vars(inst)) + + self.assertIs(inst.temp_storage, inst.temp_storage) + self.assertIn('temp_storage', vars(inst)) + self.assertFalse(inst.has_temp_data()) + ts = inst.temp_storage + inst.release() + self.assertIsNone(ts._queue) + self.assertIsNone(inst.temp_storage) + self.assertFalse(inst.has_temp_data()) + + inst = self._makeOne() + ts = inst.temp_storage + inst.abort() + self.assertIsNone(ts._queue) + self.assertIsNone(inst.temp_storage) + self.assertFalse(inst.has_temp_data()) diff --git a/src/relstorage/storage/tpc/tests/test_vote.py b/src/relstorage/storage/tpc/tests/test_vote.py new file mode 100644 index 00000000..89cb3758 --- /dev/null +++ b/src/relstorage/storage/tpc/tests/test_vote.py @@ -0,0 +1,87 @@ +# -*- coding: utf-8 -*- +""" +Tests for vote.py. + +""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + + +import unittest + +from hamcrest import assert_that +from nti.testing.matchers import verifiably_provides + +from ...interfaces import ITPCStateVoting +from .. 
import SharedTPCState + +class MockStoreConnectionPool(object): + + def __init__(self): + self.conn = object() + + def borrow(self): + return self.conn + +class MockStorage(object): + + def __init__(self): + self._store_connection_pool = MockStoreConnectionPool() + +class MockBeginState(object): + + def __init__(self, shared_state=None): + self.shared_state = shared_state or SharedTPCState(None, MockStorage(), None) + self.required_tids = {} + self.ude = (b'user', b'description', b'extension') + self.invalidated_oids = set() + +class MockLockedBeginState(MockBeginState): + + def __init__(self, shared_state=None): + from relstorage.tests import MockAdapter + from ..vote import DatabaseLockedForTid + MockBeginState.__init__(self, shared_state) + self.committing_tid_lock = DatabaseLockedForTid( + b'12345678', + 12345678, + MockAdapter() + ) + +class _InterfaceMixin(object): + + BeginState = MockBeginState + + def _getClass(self): + raise NotImplementedError + + def _makeOne(self): + return self._getClass()(self.BeginState()) + + def test_provides_interface(self): + assert_that(self._makeOne(), verifiably_provides(ITPCStateVoting)) + + + +class TestHistoryFree(_InterfaceMixin, unittest.TestCase): + + def _getClass(self): + from ..vote import HistoryFree + return HistoryFree + +class TestHistoryPreserving(_InterfaceMixin, unittest.TestCase): + + BeginState = MockLockedBeginState + + def _getClass(self): + from ..vote import HistoryPreserving + return HistoryPreserving + +class TestHistoryPreservingDeletOnly(_InterfaceMixin, unittest.TestCase): + + BeginState = MockLockedBeginState + + def _getClass(self): + from ..vote import HistoryPreservingDeleteOnly + return HistoryPreservingDeleteOnly diff --git a/src/relstorage/storage/tpc/vote.py b/src/relstorage/storage/tpc/vote.py index b5e7e102..7c0eb6a6 100644 --- a/src/relstorage/storage/tpc/vote.py +++ b/src/relstorage/storage/tpc/vote.py @@ -23,6 +23,8 @@ import time +from zope.interface import implementer + from ZODB.ConflictResolution import ConflictResolvingStorage from ZODB.POSException import ConflictError from ZODB.POSException import StorageTransactionError @@ -34,9 +36,10 @@ from relstorage._util import do_log_duration_info from relstorage._util import TRACE from ..interfaces import VoteReadConflictError +from ..interfaces import ITPCStateVoting from . import LOCK_EARLY -from . import AbstractTPCState +from . import AbstractTPCStateDatabaseAvailable from .finish import Finish @@ -94,7 +97,7 @@ def __repr__(self): self.local_allocation_time ) -class AbstractVote(AbstractTPCState): +class AbstractVote(AbstractTPCStateDatabaseAvailable): """ The state we're in following ``tpc_vote``. @@ -106,15 +109,13 @@ class AbstractVote(AbstractTPCState): __slots__ = ( # (user, description, extension) from the transaction. byte objects. 'ude', - # The TemporaryStorage. - 'temp_storage', # required_tids: {oid_int: tid_int}; confirms that certain objects # have not changed at commit. May be a BTree 'required_tids', # The DatabaseLockedForTid object 'committing_tid_lock', # {oid_bytes}: Things that get changed as part of the vote process - # and thus need to be invalidated. + # and thus need to be invalidated. TODO: Move to shared state? 'invalidated_oids', # How many conflicts there were to resolve. None if we're not there yet. 'count_conflicts', @@ -130,10 +131,9 @@ def __init__(self, begin_state, committing_tid_lock=None): # If committing_tid is passed to this method, it means the # database has already been locked and the TID is locked in. 
# This is (only!) done when we're restoring transactions. - super(AbstractVote, self).__init__(begin_state, begin_state.transaction) + super(AbstractVote, self).__init__(begin_state.shared_state) self.required_tids = begin_state.required_tids or {} # type: Dict[int, int] - self.temp_storage = begin_state.temp_storage # type: .temporary_storage.TemporaryStorage self.ude = begin_state.ude self.committing_tid_lock = committing_tid_lock # type: Optional[DatabaseLockedForTid] self.count_conflicts = None @@ -142,10 +142,6 @@ def __init__(self, begin_state, committing_tid_lock=None): # Anything that we've undone or deleted is also invalidated. self.invalidated_oids = begin_state.invalidated_oids or set() # type: Set[bytes] - def _clear_temp(self): - # Clear all attributes used for transaction commit. - self.temp_storage.close() - def _tpc_state_extra_repr_info(self): return { 'share_lock_count': len(self.required_tids), @@ -160,13 +156,13 @@ def enter(self, storage): @log_timed def _flush_temps_to_db(self, cursor): - if self.temp_storage: + if self.shared_state.has_temp_data(): # Don't bother if we're empty. - self.adapter.mover.store_temps(cursor, self.temp_storage) + self.shared_state.adapter.mover.store_temps(cursor, self.shared_state.temp_storage) def __enter_critical_phase_until_transaction_end(self): - self.load_connection.enter_critical_phase_until_transaction_end() - self.store_connection.enter_critical_phase_until_transaction_end() + self.shared_state.load_connection.enter_critical_phase_until_transaction_end() + self.shared_state.store_connection.enter_critical_phase_until_transaction_end() def _vote(self, storage): @@ -185,10 +181,11 @@ def _vote(self, storage): """ # It is assumed that self._lock.acquire was called before this # method was called. - cursor = self.store_connection.cursor - __traceback_info__ = self.store_connection, cursor + store_connection = self.shared_state.store_connection + cursor = store_connection.cursor + __traceback_info__ = store_connection, cursor assert cursor is not None - adapter = self.adapter + adapter = self.shared_state.adapter # execute all remaining batch store operations. # This exists as an extension point. @@ -219,7 +216,9 @@ def _vote(self, storage): # used, or whether we're updating existing objects and avoid a # bit more overhead, but benchmarking suggests that it's not # worth it in common cases. - storage._oids.set_min_oid(self.temp_storage.max_stored_oid) + storage._oids.set_min_oid( + store_connection, + self.shared_state.temp_storage.max_stored_oid) # Lock objects being modified and those registered with # readCurrent(). This could raise ReadConflictError or locking @@ -244,19 +243,20 @@ def _vote(self, storage): invalidated_oid_ints = self.__check_and_resolve_conflicts(storage, conflicts) blobs_must_be_moved_now = False - blobhelper = self.blobhelper committing_tid_bytes = None if self.committing_tid_lock: # We've already picked a TID. Must have called undo(). committing_tid_bytes = self.committing_tid_lock.tid - try: - blobhelper.vote(committing_tid_bytes) - except StorageTransactionError: - # If this raises an STE, it must be a shared (non-db) - # blobhelper, and the TID must not be locked. 
- assert committing_tid_bytes is None - blobs_must_be_moved_now = True + if self.shared_state.has_blobs(): + # Avoid accessing the actual blobhelper unless we need it + try: + self.shared_state.blobhelper.vote(committing_tid_bytes) + except StorageTransactionError: + # If this raises an STE, it must be a shared (non-db) + # blobhelper, and the TID must not be locked. + assert committing_tid_bytes is None + blobs_must_be_moved_now = True if blobs_must_be_moved_now or LOCK_EARLY: logger.log(TRACE, "Locking early (for blobs? %s)", blobs_must_be_moved_now) @@ -359,8 +359,8 @@ def __check_and_resolve_conflicts(self, storage, conflicts): # priority and regain control ASAP. self.__enter_critical_phase_until_transaction_end() - old_states_and_tids = self.cache.prefetch_for_conflicts( - self.load_connection.cursor, + old_states_and_tids = self.shared_state.cache.prefetch_for_conflicts( + self.shared_state.load_connection.cursor, old_states_to_prefetch ) @@ -368,9 +368,9 @@ def __check_and_resolve_conflicts(self, storage, conflicts): storage, old_states_and_tids ).tryToResolveConflict - adapter = self.adapter - read_temp = self.temp_storage.read_temp - store_temp = self.temp_storage.store_temp + adapter = self.shared_state.adapter + read_temp = self.shared_state.temp_storage.read_temp + store_temp = self.shared_state.temp_storage.store_temp # The conflicts can be very large binary strings, no need to include # them in traceback info. (Plus they could be sensitive.) @@ -406,8 +406,8 @@ def __check_and_resolve_conflicts(self, storage, conflicts): # We resolved some conflicts, so we need to send them over to the database. adapter.mover.replace_temps( - self.store_connection.cursor, - self.temp_storage.iter_for_oids(invalidated_oid_ints) + self.shared_state.store_connection.cursor, + self.shared_state.temp_storage.iter_for_oids(invalidated_oid_ints) ) return invalidated_oid_ints @@ -426,9 +426,10 @@ def _lock_and_move(self, vote_only=False): # a shared blob dir. # # Returns True if we also committed to the database. - if self.prepared_txn: + if self.shared_state.prepared_txn: # Already done; *should* have been vote_only. - assert self.committing_tid_lock, (self.prepared_txn, self.committing_tid_lock) + assert self.committing_tid_lock, (self.shared_state.prepared_txn, + self.committing_tid_lock) return False kwargs = { @@ -438,11 +439,14 @@ def _lock_and_move(self, vote_only=False): kwargs['committing_tid_int'] = self.committing_tid_lock.tid_int if vote_only: # Must be voting. - blob_meth = self.blobhelper.vote - kwargs['after_selecting_tid'] = lambda tid_int: blob_meth(int64_to_8bytes(tid_int)) kwargs['commit'] = False + if self.shared_state.has_blobs(): + # Avoid accessing the blobhelper unless we need it + blob_meth = self.shared_state.blobhelper.vote + kwargs['after_selecting_tid'] = lambda tid_int: blob_meth(int64_to_8bytes(tid_int)) - if vote_only or self.adapter.DEFAULT_LOCK_OBJECTS_AND_DETECT_CONFLICTS_INTERLEAVABLE: + if vote_only \ + or self.shared_state.adapter.DEFAULT_LOCK_OBJECTS_AND_DETECT_CONFLICTS_INTERLEAVABLE: # If we're going to have to make two trips to the database, one to lock it and get a # tid and then one to commit and release locks, either because we're # just voting right now, not committing, or because the database doesn't @@ -452,14 +456,14 @@ def _lock_and_move(self, vote_only=False): # Note that this may commit the load_connection and make it not # viable for a historical view anymore. 
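
The new `if self.shared_state.has_blobs():` guard matters because `blobhelper` is one of the lazy resources: merely reading the attribute would `begin()` a blobhelper. `has_blobs()` therefore peeks at the instance dict first; in miniature:

    def has_blobs(state):
        # Consult vars() before the attribute so a transaction that never
        # stored a blob never instantiates (or has to clean up) a blobhelper.
        return (
            'blobhelper' in vars(state)
            and state.blobhelper is not None
            and state.blobhelper.txn_has_blobs
        )
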
- committing_tid_int, prepared_txn = self.adapter.lock_database_and_move( - self.store_connection, self.load_connection, - self.blobhelper, + committing_tid_int, prepared_txn = self.shared_state.adapter.lock_database_and_move( + self.shared_state.store_connection, self.shared_state.load_connection, + self.shared_state.has_blobs(), self.ude, **kwargs ) - self.prepared_txn = prepared_txn + self.shared_state.prepared_txn = prepared_txn committing_tid_lock = self.committing_tid_lock assert committing_tid_lock is None or committing_tid_int == committing_tid_lock.tid_int, ( committing_tid_int, committing_tid_lock) @@ -469,7 +473,7 @@ def _lock_and_move(self, vote_only=False): self.committing_tid_lock = DatabaseLockedForTid( int64_to_8bytes(committing_tid_int), committing_tid_int, - self.adapter + self.shared_state.adapter ) log_msg = "Adapter locked database and allocated tid: %s" @@ -478,7 +482,7 @@ def _lock_and_move(self, vote_only=False): return kwargs['commit'] @log_timed - def tpc_finish(self, transaction, f=None): + def tpc_finish(self, storage, transaction, f=None): if transaction is not self.transaction: raise StorageTransactionError( "tpc_finish called with wrong transaction") @@ -505,20 +509,22 @@ def tpc_finish(self, transaction, f=None): # and commit, releasing any locks it can (some adapters do, # some don't). So we may or may not have a database lock at # this point. - assert not self.blobhelper.NEEDS_DB_LOCK_TO_FINISH - try: - self.blobhelper.finish(self.committing_tid_lock.tid) - except (IOError, OSError): - # If something failed to move, that's not really a problem: - # if we did any moving now, we're just a cache. - logger.exception( - "Failed to update blob-cache; ignoring (will re-download)" - ) + if self.shared_state.has_blobs(): + # Avoid accessing the actual blobhelper unless we need it + assert not self.shared_state.blobhelper.NEEDS_DB_LOCK_TO_FINISH + try: + self.shared_state.blobhelper.finish(self.committing_tid_lock.tid) + except (IOError, OSError): + # If something failed to move, that's not really a problem: + # if we did any moving now, we're just a cache. + logger.exception( + "Failed to update blob-cache; ignoring (will re-download)" + ) if f is not None: f(self.committing_tid_lock.tid) - next_phase = Finish(self, not did_commit) + next_phase = Finish(self, self.committing_tid_lock.tid_int, not did_commit) if not did_commit: locks_released = time.time() @@ -539,15 +545,17 @@ def tpc_finish(self, transaction, f=None): perf_logger ) - return next_phase, self.committing_tid_lock.tid + return next_phase finally: - self._clear_temp() + self.shared_state.release() +@implementer(ITPCStateVoting) class HistoryFree(AbstractVote): __slots__ = () +@implementer(ITPCStateVoting) class HistoryPreserving(AbstractVote): __slots__ = () @@ -563,15 +571,15 @@ class HistoryPreservingDeleteOnly(HistoryPreserving): __slots__ = () def _vote(self, storage): - if self.temp_storage and self.temp_storage.stored_oids: + if self.shared_state.temp_storage and self.shared_state.temp_storage.stored_oids: raise StorageTransactionError("Cannot store and delete at the same time.") # We only get here if we've deleted objects, meaning we hold their row locks. # We only delete objects once we hold the commit lock. assert self.committing_tid_lock # Holding the commit lock put an entry in the transaction table, # but we don't want to bump the TID or store that data. 
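
A small API shift to note: `tpc_finish` now returns just the next phase, and the committed TID rides along on the returned `NotInTransaction` as `last_committed_tid_int` instead of being returned as a second value. A caller that previously unpacked a tuple would now read (sketch, assuming a vote-phase instance; `int64_to_8bytes` as used elsewhere in vote.py):

    next_phase = vote_phase.tpc_finish(storage, transaction)
    committed_tid_int = next_phase.last_committed_tid_int
    committed_tid = int64_to_8bytes(committed_tid_int)  # 8-byte form, if needed
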
- self.adapter.txncontrol.delete_transaction( - self.store_connection.cursor, + self.shared_state.adapter.txncontrol.delete_transaction( + self.shared_state.store_connection.cursor, self.committing_tid_lock.tid_int ) self.lock_and_vote_times[0] = time.time() @@ -580,8 +588,8 @@ def _vote(self, storage): def _lock_and_move(self, vote_only=False): # We don't do the final commit, # we just prepare. - self.prepared_txn = self.adapter.txncontrol.commit_phase1( - self.store_connection, + self.shared_state.prepared_txn = self.shared_state.adapter.txncontrol.commit_phase1( + self.shared_state.store_connection, self.committing_tid_lock.tid_int ) return False diff --git a/src/relstorage/tests/__init__.py b/src/relstorage/tests/__init__.py index 8a4cd62c..68242a50 100644 --- a/src/relstorage/tests/__init__.py +++ b/src/relstorage/tests/__init__.py @@ -375,6 +375,7 @@ class LockException(Exception): pass class MockDriver(object): + Binary = bytes supports_64bit_unsigned_id = True cursor_arraysize = 64 disconnected_exceptions = (DisconnectedException,) @@ -419,6 +420,11 @@ def current_object_tids(self, _cursor, oids, timeout=None): if oid in self.data } +class MockLocker(object): + + def release_commit_lock(self, cursor): + pass + class MockAdapter(object): def __init__(self): @@ -428,3 +434,4 @@ def __init__(self): self.oidallocator = MockOIDAllocator() self.poller = MockPoller(self.driver) self.mover = MockObjectMover() + self.locker = MockLocker() diff --git a/src/relstorage/tests/hftestbase.py b/src/relstorage/tests/hftestbase.py index c380518a..0214218f 100644 --- a/src/relstorage/tests/hftestbase.py +++ b/src/relstorage/tests/hftestbase.py @@ -364,12 +364,16 @@ def checkSetMinOid(self): oid = storage.new_oid() self.assertEqual(offset, I(oid)) - storage._oids.set_min_oid(32768) + with storage._store_connection_pool.borrowing() as store_conn: + storage._oids.set_min_oid(store_conn, 32768) + store_conn.commit() oid = storage.new_oid() self.assertEqual(32769, I(oid)) # Close to the 64-bit boundary. - storage._oids.set_min_oid(2 ** 62) + with storage._store_connection_pool.borrowing() as store_conn: + storage._oids.set_min_oid(store_conn, 2 ** 62) + store_conn.commit() # Iterate through several ranges. This has been a problem in the past. for offset in range(1, 50): self.assertEqual(2 ** 62 + offset, I(storage.new_oid())) diff --git a/src/relstorage/tests/locking.py b/src/relstorage/tests/locking.py index d1d30f37..73bd0231 100644 --- a/src/relstorage/tests/locking.py +++ b/src/relstorage/tests/locking.py @@ -80,8 +80,8 @@ def __tiny_commit_time(self): return 0.1 return 1 - def __store_two_for_read_current_error(self): - db = self._closing(DB(self._storage)) + def __store_two_for_read_current_error(self, release_extra_storage=False): + db = self._closing(DB(self._storage, pool_size=1)) conn = db.open() root = conn.root() root['object1'] = MinPO('object1') @@ -95,12 +95,17 @@ def __store_two_for_read_current_error(self): conn.close() # We can't close the DB, that will close the storage that we - # still need. + # still need. But we can release its storage, since we'll never use + # this again. 
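
`set_min_oid` (like `new_oid`) no longer reaches for a storage-owned connection; callers supply one, typically from the pool, and commit it themselves, as `checkSetMinOid` above now does:

    with storage._store_connection_pool.borrowing() as store_conn:
        storage._oids.set_min_oid(store_conn, 32768)
        store_conn.commit()
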
+ if release_extra_storage: + conn._normal_storage.release() return obj1_oid, obj2_oid, obj1_tid, db - def __read_current_and_lock(self, storage, read_current_oid, lock_oid, tid): - tx = TransactionMetaData() - storage.tpc_begin(tx) + def __read_current_and_lock(self, storage, read_current_oid, lock_oid, tid, + begin=True, tx=None): + tx = tx if tx is not None else TransactionMetaData() + if begin: + storage.tpc_begin(tx) if read_current_oid is not None: storage.checkCurrentSerialInTransaction(read_current_oid, tid, tx) storage.store(lock_oid, tid, b'bad pickle2', '', tx) @@ -135,7 +140,11 @@ def __do_check_error_with_conflicting_concurrent_read_current( storageB._adapter.force_lock_objects_and_detect_conflicts_interleavable = should_ileave # First, store the two objects in an accessible location. - obj1_oid, obj2_oid, tid, _ = self.__store_two_for_read_current_error() + obj1_oid, obj2_oid, tid, _db = self.__store_two_for_read_current_error( + release_extra_storage=True) + # XXX: We'd like to close the DB here, but the best we can do is to release the + # extra storage it made. + # Now transaction A readCurrent 1 and modify 2 # up through the vote phase @@ -247,7 +256,7 @@ def checkTL_ReadCurrentConflict_DoesNotTakeExclusiveLocks(self): # on obj1. We should immediately get a read current error and not conflict with the # exclusive lock. with self.assertRaisesRegex(VoteReadConflictError, "serial this txn started"): - self.__read_current_and_lock(storageB, obj2_oid, obj1_oid, tid) + self.__read_current_and_lock(storageB, obj2_oid, obj1_oid, tid, begin=False, tx=txb) # Which is still held because we cannot lock it. with self.assertRaises(UnableToLockRowsToModifyError): @@ -264,14 +273,21 @@ def checkTL_OverlappedReadCurrent_SharedLocksFirst(self): # error. (We use the same two objects instead of a new object in transaction B to prove # shared locks are taken first.) from relstorage.adapters.interfaces import UnableToLockRowsToReadCurrentError + from relstorage.tests.util import RUNNING_ON_CI commit_lock_timeout = self.__tiny_commit_time duration_blocking = self.__do_check_error_with_conflicting_concurrent_read_current( UnableToLockRowsToReadCurrentError, commit_lock_timeout=commit_lock_timeout, ) # The NOWAIT lock should be very quick to fire. + assert self._storage._adapter.locker.supports_row_lock_nowait is not None if self._storage._adapter.locker.supports_row_lock_nowait: - self.assertLessEqual(duration_blocking, commit_lock_timeout) + multiplier = 1.3 + if RUNNING_ON_CI and 'mysql' in type(self).__name__: + # On Travis CI, we observe MySQL 8.0.21 to be very slow at this + # for some reason. + multiplier = 2.1 + self.assertLessEqual(duration_blocking, commit_lock_timeout * multiplier) else: # Sigh. Old MySQL. Very slow. This takes around 4.5s to run both iterations. self.assertLessEqual(duration_blocking, commit_lock_timeout * 2.5) @@ -331,6 +347,7 @@ def checkTL_InterleavedConflictingReadCurrent(self): @skipIfNoConcurrentWriters def checkTL_InterleavedConflictingReadCurrentDeadlock(self): + # pylint:disable=too-many-statements # Like # ``checkTL_InterleavedConflictingReadCurrent`` # except that we interleave both txA and txB: txA takes modify @@ -340,11 +357,18 @@ def checkTL_InterleavedConflictingReadCurrentDeadlock(self): # # We have to use a thread to do the shared locks because it blocks. 
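
With pooled store connections there is no longer a per-storage `_store_connection`; while a transaction is in flight, its connection hangs off the active phase, which is how the `lock_shared` helper a little further below gets its cursor:

    # The per-transaction connection is reached through the TPC phase now:
    cursor = storage._tpc_phase.shared_state.store_connection.cursor
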
from relstorage.adapters.interfaces import UnableToLockRowsToReadCurrentError + from relstorage.storage.tpc.vote import AbstractVote as VotePhase + from relstorage.storage.tpc import NotInTransaction + + store_conn_pool = self._storage._store_connection_pool + self.assertEqual(store_conn_pool.instance_count, 1) storageA = self._closing(self._storage.new_instance()) storageB = self._closing(self._storage.new_instance()) storageA.last_error = storageB.last_error = None + self.assertEqual(store_conn_pool.instance_count, 3) + storageA._adapter.locker.lock_current_objects = partial( self.__lock_rows_being_modified_only, storageA) @@ -363,11 +387,15 @@ def checkTL_InterleavedConflictingReadCurrentDeadlock(self): storageB=storageB, abort=False ) + self.assertEqual(store_conn_pool.instance_count, 3) + + self.assertIsInstance(storageA._tpc_phase, VotePhase) + self.assertIsInstance(storageB._tpc_phase, VotePhase) cond = threading.Condition() cond.acquire() def lock_shared(storage, notify=True): - cursor = storage._store_connection.cursor + cursor = storage._tpc_phase.shared_state.store_connection.cursor read_current_oids = storage._tpc_phase.required_tids.keys() if notify: cond.acquire(5) @@ -431,6 +459,19 @@ def lock_shared(storage, notify=True): self.__assert_small_blocking_duration(storageA, duration_blocking) self.__assert_small_blocking_duration(storageB, duration_blocking) + storageA.tpc_abort(storageA._tpc_phase.transaction) + storageB.tpc_abort(storageB._tpc_phase.transaction) + + self.assertIsInstance(storageA._tpc_phase, NotInTransaction) + self.assertIsInstance(storageB._tpc_phase, NotInTransaction) + + self.assertEqual(store_conn_pool.instance_count, 3) + storageA.release() + storageB.release() + + self.assertLessEqual(store_conn_pool.instance_count, 1) + self.assertLessEqual(store_conn_pool.pooled_connection_count, 1) + def __is_oracle(self): # This is an anti-pattern. we really should subclass the tests # in our testoracle.py file. diff --git a/src/relstorage/tests/reltestbase.py b/src/relstorage/tests/reltestbase.py index b5ac25a7..9ff28f11 100644 --- a/src/relstorage/tests/reltestbase.py +++ b/src/relstorage/tests/reltestbase.py @@ -605,8 +605,9 @@ def checkNoDuplicateOIDsManyThreads(self): oids_by_thread = [list() for _ in range(thread_count)] def allocate_oids(thread_storage, thread_num): + conn_pool = thread_storage._store_connection_pool + store_conn = conn_pool.borrow() try: - store_conn = thread_storage._store_connection allocator = thread_storage._oids my_oids = oids_by_thread[thread_num] for _ in range(segment_count): @@ -616,11 +617,14 @@ def allocate_oids(thread_storage, thread_num): ) # Periodically call set_min_oid, like the storage does, # to check for interference. - allocator.set_min_oid(my_oids[-1]) - store_conn.commit() + with conn_pool.borrowing() as store_conn: + allocator.set_min_oid(store_conn, my_oids[-1]) + store_conn.commit() finally: + self.assertLessEqual(conn_pool.pooled_connection_count, len(threads)) thread_storage.release() + threads = [threading.Thread(target=allocate_oids, args=(self._storage.new_instance(), i)) for i in range(thread_count)] @@ -630,6 +634,10 @@ def allocate_oids(thread_storage, thread_num): for t in threads: t.join(99) + # All of them are released, so we should be down to only one instance. + self.assertEqual(1, self._storage._store_connection_pool.instance_count) + self.assertLessEqual(self._storage._store_connection_pool.pooled_connection_count, 1) + # They all have the desired length, and each one has no duplicates. 
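
The bookkeeping these assertions pin down: the main storage registers one pool instance, each `new_instance()` adds one, `release()` removes one, and the number of idle pooled connections stays bounded no matter how many threads borrowed. Schematically, with the attribute names used in the tests:

    pool = storage._store_connection_pool
    assert pool.instance_count == 1            # just the main storage

    a = storage.new_instance()
    b = storage.new_instance()
    assert pool.instance_count == 3            # main storage plus two instances

    a.release()
    b.release()
    assert pool.instance_count == 1
    assert pool.pooled_connection_count <= 1   # idle connections stay bounded
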
self.assertEqual( [len(s) for s in oids_by_thread], @@ -747,7 +755,10 @@ def checkAutoReconnect(self): # Going behind its back. c1._storage._load_connection.connection.close() - c1._storage._store_connection.connection.close() + c1._storage._store_connection_pool.hard_close_all_connections() + store_pool = c1._storage._store_connection_pool + self.assertEqual(store_pool.instance_count, 2) + self.assertLessEqual(store_pool.pooled_connection_count, 1) # ZODB5 implicitly calls sync # immediately when a connection is opened; # fake that here for older releases. @@ -757,7 +768,7 @@ def checkAutoReconnect(self): r = c2.root() self.assertEqual(r['alpha'], 1) r['beta'] = PersistentMapping() - c2.add(r['beta']) + c2.add(r['beta']) # Calling new_oid outside of TPC transaction.commit() c2.close() @@ -785,10 +796,17 @@ def checkAutoReconnectOnSync(self): c1.close() c1._storage._load_connection.connection.close() - c1._storage._store_connection.connection.close() + + c1._storage._store_connection_pool.hard_close_all_connections() + store_pool = c1._storage._store_connection_pool + self.assertEqual(store_pool.instance_count, 2) + self.assertLessEqual(store_pool.pooled_connection_count, 1) + c2 = db.open() self.assertIs(c2, c1) + self.assertEqual(store_pool.instance_count, 2) + self.assertLessEqual(store_pool.pooled_connection_count, 1) r = c2.root() self.assertEqual(r['alpha'], 1) diff --git a/src/relstorage/tests/testmysql.py b/src/relstorage/tests/testmysql.py index 908f57c8..34c55bc3 100644 --- a/src/relstorage/tests/testmysql.py +++ b/src/relstorage/tests/testmysql.py @@ -251,30 +251,32 @@ def checkIsolationLevels(self): def assert_storage(storage): load_cur = storage._load_connection.cursor - store_cur = storage._store_connection.cursor + version_detector = storage._adapter.version_detector if not version_detector.supports_transaction_isolation(load_cur): raise unittest.SkipTest("Needs MySQL better than %s" % ( version_detector.get_version(load_cur) )) - for cur, ex_iso, ex_ro, ex_timeout in ( - # Timeout for load is mysql default. - [load_cur, 'REPEATABLE-READ', True, 50], - [store_cur, 'READ-COMMITTED', False, self.DEFAULT_COMMIT_LOCK_TIMEOUT], - ): - cur.execute(""" - SELECT @@transaction_isolation, - @@transaction_read_only, - @@innodb_lock_wait_timeout - """) - row, = cur.fetchall() - iso, ro, timeout = row - __traceback_info__ = row - iso = iso.decode('ascii') if not isinstance(iso, str) else iso - self.assertEqual(iso, ex_iso) - self.assertEqual(ro, ex_ro) - self.assertEqual(timeout, ex_timeout) + with storage._store_connection_pool.borrowing() as store_connection: + store_cur = store_connection.cursor + for cur, ex_iso, ex_ro, ex_timeout in ( + # Timeout for load is mysql default. + [load_cur, 'REPEATABLE-READ', True, 50], + [store_cur, 'READ-COMMITTED', False, self.DEFAULT_COMMIT_LOCK_TIMEOUT], + ): + cur.execute(""" + SELECT @@transaction_isolation, + @@transaction_read_only, + @@innodb_lock_wait_timeout + """) + row, = cur.fetchall() + iso, ro, timeout = row + __traceback_info__ = row + iso = iso.decode('ascii') if not isinstance(iso, str) else iso + self.assertEqual(iso, ex_iso) + self.assertEqual(ro, ex_ro) + self.assertEqual(timeout, ex_timeout) # By default assert_storage(self._storage) diff --git a/src/relstorage/tests/testpostgresql.py b/src/relstorage/tests/testpostgresql.py index a117cbe4..d2c438d4 100644 --- a/src/relstorage/tests/testpostgresql.py +++ b/src/relstorage/tests/testpostgresql.py @@ -92,22 +92,24 @@ def test_merge_blobs_on_open(self): # Insert some extra chunks. 
Get them big to be sure we loop # properly second_chunk = b'second chunk' * 800 - cursor = conn._storage._store_connection.cursor - cursor.execute(""" - INSERT INTO blob_chunk (zoid, chunk_num, tid, chunk) - SELECT zoid, 1, tid, lo_from_bytea(0, %s) - FROM blob_chunk WHERE chunk_num = 0; - """, (second_chunk,)) - third_chunk = b'third chunk' * 900 - cursor.execute(""" - INSERT INTO blob_chunk (zoid, chunk_num, tid, chunk) - SELECT zoid, 2, tid, lo_from_bytea(0, %s) - FROM blob_chunk WHERE chunk_num = 0; - """, (third_chunk,)) + with conn._storage._store_connection_pool.borrowing(commit=True) as store_connection: + cursor = store_connection.cursor + + cursor.execute(""" + INSERT INTO blob_chunk (zoid, chunk_num, tid, chunk) + SELECT zoid, 1, tid, lo_from_bytea(0, %s) + FROM blob_chunk WHERE chunk_num = 0; + """, (second_chunk,)) + third_chunk = b'third chunk' * 900 + cursor.execute(""" + INSERT INTO blob_chunk (zoid, chunk_num, tid, chunk) + SELECT zoid, 2, tid, lo_from_bytea(0, %s) + FROM blob_chunk WHERE chunk_num = 0; + """, (third_chunk,)) + + cursor.execute('SELECT COUNT(*) FROM blob_chunk') + self.assertEqual(3, cursor.fetchone()[0]) - cursor.execute('SELECT COUNT(*) FROM blob_chunk') - self.assertEqual(3, cursor.fetchone()[0]) - cursor.connection.commit() # Now open again and find everything put together. # But we need to use a new blob dir, because # we changed data behind its back. diff --git a/src/relstorage/tests/util.py b/src/relstorage/tests/util.py index f14cd2ec..e18ba8a6 100644 --- a/src/relstorage/tests/util.py +++ b/src/relstorage/tests/util.py @@ -23,14 +23,16 @@ def dec(f): skipOnAppveyor = unittest.skip else: skipOnAppveyor = _do_not_skip - +PYPY = platform.python_implementation() == 'PyPy' CACHE_SERVERS = None CACHE_MODULE_NAME = None -if RUNNING_ON_TRAVIS: +if RUNNING_ON_TRAVIS and not PYPY: # We expect to have access to a local memcache server # on travis. Use it if we can import drivers. + # + # This definitely leaks sockets on PyPy, and I don't think it's our fault. # pylint:disable=unused-import try: import pylibmc