
Merge pull request #417 from zodb/issue409
Pool store connections
jamadden authored Sep 9, 2020
2 parents 542bb72 + 284bd66 commit bbb64d8
Showing 38 changed files with 1,496 additions and 505 deletions.
3 changes: 3 additions & 0 deletions .pylintrc
@@ -114,6 +114,9 @@ max-locals=20
 # undefined-all-variable crash.
 unsafe-load-any-extension = yes
 
+property-classes=zope.cachedescriptors.property.Lazy,zope.cachedescriptors.property.Cached,relstorage._util.Lazy
+
+
 # Local Variables:
 # mode: conf-space
 # End:
7 changes: 4 additions & 3 deletions .travis.yml
@@ -47,13 +47,13 @@ env:
     - CCACHE_NOHASHDIR=true
     - CFLAGS="-g -pipe -std=gnu++11"
     - CXXFLAGS="-g -pipe -std=gnu++11"
-    - RS_TEST_CMD="-m zope.testrunner --test-path=src --auto-color --auto-progress"
+    - RS_TEST_CMD="-m zope.testrunner --test-path=src --auto-color -vvv --slow-test=3"
 
 script:
   # coverage slows PyPy down from 2 minutes to 12+.
   - if [[ $TRAVIS_PYTHON_VERSION == 3.7 ]]; then pylint -j 0 --limit-inference-results=1 --rcfile=.pylintrc relstorage -f parseable -r n; fi
-  - if [[ $TRAVIS_PYTHON_VERSION != pypy* ]]; then coverage run -p --concurrency=greenlet .travis/zope_testrunner_gevent.py -t checkBTreesLengthStress -t check7 -t check2 -t BlobCache -t Switches --layer gevent; fi
-  - if [[ $TRAVIS_PYTHON_VERSION == pypy* ]]; then python $RS_TEST_CMD --layer "!gevent"; fi
+  - if [[ $TRAVIS_PYTHON_VERSION != pypy* ]]; then coverage run -p --concurrency=greenlet .travis/zope_testrunner_gevent.py -t checkBTreesLengthStress -t check7 -t check2 -t BlobCache -t Switches --layer gevent -vvv; fi
+  - if [[ $TRAVIS_PYTHON_VERSION == pypy* ]]; then PYTHONTRACEMALLOC=15 python $RS_TEST_CMD --layer "!gevent"; fi
   - if [[ $TRAVIS_PYTHON_VERSION != pypy* ]]; then coverage run -p --concurrency=thread $RS_TEST_CMD --layer "!gevent"; fi
   # Make sure we can import without zope.schema, which is intended to
   # be a test dependency, and optional for production.
@@ -87,6 +87,7 @@ cache:
   directories:
     - $HOME/.wheelhouse
     - $HOME/.ccache
+    - $HOME/.cache
 
 before_cache:
   - rm -f $HOME/.cache/pip/log/debug.log
12 changes: 11 additions & 1 deletion CHANGES.rst
@@ -2,7 +2,7 @@
 Changes
 =========
 
-3.2.2 (unreleased)
+3.3.0 (unreleased)
 ==================
 
 - The "MySQLdb" driver didn't properly use server-side cursors when
@@ -14,6 +14,16 @@
   history-preserving and history-free storages work with
   ``zodbupdate``. See :issue:`389`.
 
+- RelStorage instances now pool their storage connection. Depending on
+  the workload and ZODB configuration, this can result in requiring
+  fewer storage connections. See :issue:`409` and :pr:`417`.
+
+  There is a potential semantic change: Under some circumstances, the
+  ``loadBefore`` and ``loadSerial`` methods could be used to load
+  states from the future (not visible to the storage's load
+  connection) by using the store connection. This ability has been
+  removed.
+
 
 3.2.1 (2020-08-28)
 ==================
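
For illustration: the pooling described above can be capped with the new ``RS_MAX_POOLED_STORE_CONNECTIONS`` environment variable read by ``StoreConnectionPool`` (see ``connections.py`` below). A minimal sketch, assuming the variable is set before the module is first imported, since the class attribute is computed once at class-definition time:

    import os
    os.environ['RS_MAX_POOLED_STORE_CONNECTIONS'] = '4'  # keep at most 4 pooled store connections

    from relstorage.adapters.connections import StoreConnectionPool
    print(StoreConnectionPool.MAX_STORE_CONNECTIONS_IN_POOL)  # -> 4
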
42 changes: 31 additions & 11 deletions src/relstorage/_util.py
@@ -77,6 +77,25 @@

 positive_integer = RangeCheckedConversion(integer, min=1)
 
+def _setting_from_environ(converter, environ_name, default):
+    result = default
+    env_val = os.environ.get(environ_name, default)
+    if env_val is not default:
+        try:
+            result = converter(env_val)
+        except (ValueError, TypeError):
+            logger.exception("Failed to parse environment value %r for key %r",
+                             env_val, environ_name)
+            result = default
+
+    logger.debug('Using value %s from environ %r=%r (default=%r)',
+                 result, environ_name, env_val, default)
+    return result
+
+
+def get_positive_integer_from_environ(environ_name, default):
+    return _setting_from_environ(positive_integer, environ_name, default)
+
 def parse_boolean(val):
     if val == '0':
         return False
@@ -126,14 +145,7 @@ def __exit__(self, t, v, tb):
         self.duration = self.__end - self.__begin
 
 def get_time_from_environ(environ_name, default):
-    env_val = os.environ.get(environ_name, default)
-    try:
-        result = float(env_val)
-    except (ValueError, TypeError):
-        result = default
-    logger.debug('Using %s from environ %r=%r', result, environ_name, env_val)
-    return result
-
+    return _setting_from_environ(float, environ_name, default)
 
 def _get_log_time_level(level_int, default):
     level_name = logging.getLevelName(level_int)
@@ -161,14 +173,16 @@ def _get_log_time_level(level_int, default):
 # The 'log_details_threshold' property of the function can be
 # assigned to make it different than the default.
 _LOG_TIMED_DEFAULT_DETAILS_THRESHOLD = logging.getLevelName(
-    os.environ.get('RS_PERF_LOG_DETAILS_LEVEL', 'WARN')
+    _setting_from_environ(str, 'RS_PERF_LOG_DETAILS_LEVEL', 'WARN')
 )
 
 
 # If this is true when a module is imported, timer decorations
 # are omitted.
-_LOG_TIMED_COMPILETIME_ENABLE = parse_boolean(
-    os.environ.get('RS_PERF_LOG_ENABLE', 'on')
+_LOG_TIMED_COMPILETIME_ENABLE = _setting_from_environ(
+    parse_boolean,
+    'RS_PERF_LOG_ENABLE',
+    'on'
 )
 
 def do_log_duration_info(basic_msg, func,
@@ -374,8 +388,14 @@ def __get__(self, inst, class_):
         func, name = self.data
         value = func(inst)
         inst.__dict__[name] = value
+        self._stored_value_for_name_in_inst(value, name, inst)
         return value
 
+    @staticmethod
+    def _stored_value_for_name_in_inst(value, name, inst):
+        """
+        Hook for subclasses.
+        """
 
 class CachedIn(object):
     """Cached method with given cache attribute."""
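
For illustration, the consolidated ``_setting_from_environ`` helper above parses a setting with the given converter, logs a parse failure, and falls back to the default. A minimal sketch of the expected behavior, using a made-up variable name (``RS_DEMO_TIMEOUT`` is not a real setting):

    import os
    from relstorage._util import get_time_from_environ

    os.environ['RS_DEMO_TIMEOUT'] = '2.5'
    get_time_from_environ('RS_DEMO_TIMEOUT', 10.0)   # -> 2.5, via float()

    os.environ['RS_DEMO_TIMEOUT'] = 'not-a-number'
    # float() raises ValueError; _setting_from_environ logs the failure
    # with logger.exception() and returns the default instead.
    get_time_from_environ('RS_DEMO_TIMEOUT', 10.0)   # -> 10.0
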
15 changes: 9 additions & 6 deletions src/relstorage/adapters/adapter.py
@@ -64,15 +64,19 @@ def __init__(self, options=None):
         self.driver = driver = self._select_driver()
         self._binary = driver.Binary
 
+        # If it was already set, that means it is shared with other
+        # instances, so no need to register the openings.
+        connmanager_was_set = self.connmanager is not None
         self._create()
         if not driver.supports_64bit_unsigned_id:
             self.packundo.MAX_TID = MAX_S_TID
             self.MAX_TID = MAX_S_TID
             self.dbiter.MAX_TID = MAX_S_TID
 
-        self.connmanager.add_on_store_opened(self.mover.on_store_opened)
-        self.connmanager.add_on_load_opened(self.mover.on_load_opened)
-        self.connmanager.add_on_store_opened(self.locker.on_store_opened)
+        if not connmanager_was_set:
+            self.connmanager.add_on_store_opened(self.mover.on_store_opened)
+            self.connmanager.add_on_load_opened(self.mover.on_load_opened)
+            self.connmanager.add_on_store_opened(self.locker.on_store_opened)
 
     def _create(self):
         raise NotImplementedError
@@ -128,7 +132,7 @@ def lock_database_and_choose_next_tid(self, cursor,
     @metricmethod_sampled
     def lock_database_and_move(self,
                                store_connection, load_connection,
-                               blobhelper,
+                               transaction_has_blobs,
                                ude,
                                commit=True,
                                committing_tid_int=None,
@@ -155,9 +159,8 @@ def lock_database_and_move(self,
         # TODO: Figure out how to do as much as possible of this before holding
         # the commit lock. For example, use a dummy TID that we later replace.
         # (This has FK issues in HP dbs).
-        txn_has_blobs = blobhelper.txn_has_blobs
 
-        self.mover.move_from_temp(cursor, committing_tid_int, txn_has_blobs)
+        self.mover.move_from_temp(cursor, committing_tid_int, transaction_has_blobs)
 
         after_selecting_tid(committing_tid_int)
 
168 changes: 167 additions & 1 deletion src/relstorage/adapters/connections.py
@@ -27,6 +27,7 @@

 from .._compat import wraps
 from .._util import Lazy
+from .._util import get_positive_integer_from_environ
 from . import interfaces
 
 logger = __import__('logging').getLogger(__name__)
@@ -153,12 +154,21 @@ def _open_connection(self):
     def __noop(*args):
         "does nothing"
 
+    def _restart_connection(self):
+        "Restart just the connection when we have no cursor."
+
     def restart(self):
         """
         Restart the connection if there is any chance that it has any associated state.
         """
         if not self:
             assert not self.active, self.__dict__
+            if self.connection is not None: # But we have no cursor. We
+                # do this so that if the connection has closed itself
+                # (or been closed by a unit test) we can detect that
+                # and restart automatically. We only actually do
+                # anything there for store connections.
+                self._restart_connection()
             return
 
         # If we've never accessed the cursor, we shouldn't have any
@@ -213,7 +223,7 @@ def call(self, f, can_reconnect, *args, **kwargs):
         The function may be called up to twice, if the *fresh_connection_p* is false
         on the first call and a disconnected exception is raised.
         :keyword bool can_reconnect: If True, then we will attempt to reconnect
-          the connection and try again if an exception is raised if *f*. If False,
+          the connection and try again if a disconnected exception is raised in *f*. If False,
           we let that exception propagate. For example, if a transaction is in progress,
           set this to false.
         """
@@ -269,6 +279,7 @@ def __repr__(self):
             self._cursor
         )
 
+
 @implementer(interfaces.IManagedLoadConnection)
 class LoadConnection(AbstractManagedConnection):
 
@@ -290,10 +301,14 @@ class StoreConnection(AbstractManagedConnection):
     def begin(self):
         self.connmanager.begin(*self.open_if_needed())
 
+    def _restart_connection(self):
+        self.rollback_quietly()
+
 class PrePackConnection(StoreConnection):
     __slots__ = ()
     _NEW_CONNECTION_NAME = 'open_for_pre_pack'
 
+
 @implementer(interfaces.IManagedDBConnection)
 class ClosedConnection(object):
     """
@@ -319,3 +334,154 @@ def isolated_connection(self, *args, **kwargs):

     restart_and_call = isolated_connection
     enter_critical_phase_until_transaction_end = isolated_connection
+
+
+class StoreConnectionPool(object):
+    """
+    A thread-safe pool of `StoreConnection` objects.
+    Connections are opened on demand; opening a connection on demand
+    does not block.
+    By default, it will keep around a `StoreConnection` for every instance
+    of a RelStorage that has ever used one, which ordinarily corresponds to the
+    ZODB Connection pool size. It can be made to keep a smaller (but not larger)
+    number around by setting ``MAX_STORE_CONNECTIONS_IN_POOL``.
+    """
+
+    MAX_STORE_CONNECTIONS_IN_POOL = get_positive_integer_from_environ(
+        'RS_MAX_POOLED_STORE_CONNECTIONS',
+        None
+    )
+
+    def __init__(self, connmanager):
+        import threading
+        self._lock = threading.Lock()
+        self._connmanager = connmanager
+        self._connections = []
+        self._count = 1
+        self._factory = StoreConnection
+
+    # MVCC protocol
+    def new_instance(self):
+        with self._lock:
+            self._count += 1
+        return self
+
+    def release(self):
+        with self._lock:
+            self._count -= 1
+            self._shrink()
+
+    def close(self):
+        with self._lock:
+            self._count = 0
+            self._factory = ClosedConnection
+            self._shrink()
+            self._connections = ()
+
+    @contextlib.contextmanager
+    def borrowing(self, commit=False):
+        rollback = True
+        try:
+            conn = self.borrow()
+            yield conn
+            if commit:
+                conn.commit()
+                rollback = False
+        finally:
+            self._replace(conn, rollback)
+
+    def borrow(self):
+        conn = None
+        with self._lock:
+            if self._connections:
+                conn = self._connections.pop()
+
+        if conn is None:
+            conn = self._factory(self._connmanager)
+        else:
+            conn.restart()
+
+        conn.begin()
+        return conn
+
+    def replace(self, connection):
+        self._replace(connection, True)
+
+    def _replace(self, connection, needs_rollback):
+        if needs_rollback:
+            clean_rollback = connection.rollback_quietly()
+        else:
+            clean_rollback = True
+        if not clean_rollback:
+            connection.drop()
+        else:
+            connection.exit_critical_phase()
+            with self._lock:
+                self._connections.append(connection)
+                self._shrink()
+
+    def _shrink(self):
+        # Call while holding the lock, after putting a connection
+        # back in the pool.
+        # Limits the number of pooled connections to be no more than
+        # ``instance_count`` (i.e., one per open RelStorage), or
+        # ``MAX_STORE_CONNECTIONS_IN_POOL``, if set and if less than
+        # instance_count.
+        keep_connections = min(self._count, self.MAX_STORE_CONNECTIONS_IN_POOL or self._count)
+
+        while len(self._connections) > keep_connections and self._connections:
+            conn = self._connections.pop()
+            conn.drop() # TODO: This could potentially be slow? Might
+            # want to do this outside the lock.
+            conn.connmanager = None # It can't be opened again.
+
+    def drop_all(self):
+        with self._lock:
+            while self._connections:
+                conn = self._connections.pop()
+                conn.drop()
+
+    def hard_close_all_connections(self):
+        # Testing only.
+        for conn in self._connections:
+            conn.connection.close()
+
+    @property
+    def pooled_connection_count(self):
+        return len(self._connections)
+
+    @property
+    def instance_count(self):
+        return self._count
+
+
+class ClosedConnectionPool(object):
+
+    def borrow(self):
+        return ClosedConnection()
+
+    def replace(self, connection):
+        "Does nothing"
+
+    def new_instance(self):
+        "Does nothing"
+
+    release = new_instance
+    close = release
+    drop_all = release
+
+    pooled_connection_count = instance_count = 0
+
+
+class SingleConnectionPool(object):
+    __slots__ = ('connection',)
+
+    def __init__(self, connection):
+        self.connection = connection
+
+    @contextlib.contextmanager
+    def borrowing(self, commit=False): # pylint:disable=unused-argument
+        """
+        The *commit* parameter is ignored.
+        """
+        yield self.connection
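
For illustration, the expected calling pattern for the new ``StoreConnectionPool``: a minimal sketch, assuming an already-configured connection manager named ``connmanager``. ``open_if_needed()`` returns the ``(connection, cursor)`` pair, as ``StoreConnection.begin()`` above relies on:

    pool = StoreConnectionPool(connmanager)

    with pool.borrowing(commit=True) as store_conn:
        _conn, cursor = store_conn.open_if_needed()
        cursor.execute('SELECT 1')
    # On a clean exit the transaction is committed (commit=True) and the
    # connection returns to the pool; on an exception it is rolled back
    # (or dropped, if the rollback fails) before being pooled or discarded.

    pool.release()   # MVCC protocol: this instance no longer needs the pool
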
