[CalendarServer-changes] [6858] CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise
source_changes at macosforge.org
source_changes at macosforge.org
Thu Feb 3 11:00:31 PST 2011
Revision: 6858
http://trac.macosforge.org/projects/calendarserver/changeset/6858
Author: glyph at apple.com
Date: 2011-02-03 11:00:31 -0800 (Thu, 03 Feb 2011)
Log Message:
-----------
Do re-pooling internally in BaseSqlTxn now, fixing an issue where bogus transactions (ones with no backend) were being kept around in the busy list.
Modified Paths:
--------------
CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py
CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py
Modified: CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py 2011-02-03 19:00:27 UTC (rev 6857)
+++ CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py 2011-02-03 19:00:31 UTC (rev 6858)
@@ -72,11 +72,12 @@
# See DEFAULT_PARAM_STYLE FIXME above.
paramstyle = DEFAULT_PARAM_STYLE
- def __init__(self, threadHolder, connection, cursor):
- self._completed = True
- self._cursor = cursor
+ def __init__(self, pool, threadHolder, connection, cursor):
+ self._pool = pool
+ self._completed = True
+ self._cursor = cursor
self._connection = connection
- self._holder = threadHolder
+ self._holder = threadHolder
def _reallyExecSQL(self, sql, args=None, raiseOnZeroRowCount=None):
@@ -110,30 +111,29 @@
return result
- def commit(self):
+ def _end(self, really):
+ """
+ Common logic for commit or abort.
+ """
if not self._completed:
self._completed = True
- def reallyCommit():
+ def reallySomething():
if self._cursor is None:
return
- self._connection.commit()
- result = self._holder.submit(reallyCommit)
+ really()
+ result = self._holder.submit(reallySomething)
+ self._pool._repoolAfter(self, result)
return result
else:
raise AlreadyFinishedError()
+ def commit(self):
+ return self._end(self._connection.commit)
+
+
def abort(self):
- if not self._completed:
- self._completed = True
- def reallyAbort():
- if self._cursor is None:
- return
- self._connection.rollback()
- result = self._holder.submit(reallyAbort)
- return result
- else:
- raise AlreadyFinishedError()
+ return self._end(self._connection.rollback)
def __del__(self):
@@ -269,11 +269,18 @@
self._complete = False
+ def __repr__(self):
+ """
+ Reveal the backend in the string representation.
+ """
+ return 'PooledSqlTxn(%r)' % (self._baseTxn,)
+
+
def _unspoolOnto(self, baseTxn):
"""
Replace my C{_baseTxn}, currently a L{SpooledTxn}, with a L{BaseSqlTxn}.
"""
- spooledBase = self._baseTxn
+ spooledBase = self._baseTxn
self._baseTxn = baseTxn
spooledBase._unspool(baseTxn)
@@ -285,20 +292,19 @@
def commit(self):
self._markComplete()
- return self._pool._repoolAfter(self, super(PooledSqlTxn, self).commit())
+ return super(PooledSqlTxn, self).commit()
def abort(self):
self._markComplete()
if self in self._pool.waiting:
- return self.stopWaiting()
- else:
- return self._pool._repoolAfter(self, super(PooledSqlTxn, self).abort())
+ return self._stopWaiting()
+ return super(PooledSqlTxn, self).abort()
- def stopWaiting(self):
+ def _stopWaiting(self):
"""
- Stop waiting for a transaction and fail.
+ Stop waiting for a free transaction and fail.
"""
self._pool.waiting.remove(self)
self._unspoolOnto(FailedTxn())
@@ -327,7 +333,7 @@
_retry = None
def __init__(self, pool, holder):
- self._pool = pool
+ self._pool = pool
self._holder = holder
@@ -366,8 +372,8 @@
attached to a L{PooledSqlTxn} object, and have active connections ready
for processing a new transaction.
- @ivar busy: The list of busy (currently bound to a L{BaseSqlTxn})
- L{PooledSqlTxn} objects.
+ @ivar busy: The list of busy L{BaseSqlTxn} objects; those currently
+ servicing an unfinished L{PooledSqlTxn} object.
@ivar finishing: The list of 2-tuples of L{PooledSqlTxn} objects which have
had C{abort} or C{commit} called on them, but are not done executing
@@ -393,12 +399,12 @@
self.connectionFactory = connectionFactory
self.maxConnections = maxConnections
- self.free = []
- self.busy = []
- self.waiting = []
- self.finishing = []
+ self.free = []
+ self.busy = []
+ self.waiting = []
+ self.finishing = []
self.connecting = []
- self.stopping = False
+ self.stopping = False
def startService(self):
@@ -419,7 +425,7 @@
# eagerly acquire new connections as they flow into the free-list.
while self.waiting:
waiting = self.waiting[0]
- yield waiting.stopWaiting()
+ yield waiting._stopWaiting()
# FIXME: there should be tests for these 'yield's.
# Phase 1: All of the busy transactions must be aborted first. As each
# one is aborted, it will remove itself from the list.
@@ -487,17 +493,17 @@
# support threadlevel=1; we can't necessarily cursor() in a
# different thread than we do transactions in.
connection = self.connectionFactory()
- cursor = connection.cursor()
+ cursor = connection.cursor()
return (connection, cursor)
def finishInit((connection, cursor)):
baseTxn = BaseSqlTxn(
+ pool=self,
threadHolder=holder,
connection=connection,
cursor=cursor
)
self.busy.remove(txn)
- txn._baseTxn = baseTxn
- self._repoolNow(txn)
+ self._repoolNow(baseTxn)
def maybeTryAgain(f):
log.err(f, "Re-trying connection due to connection failure")
txn._retry = self.reactor.callLater(self.RETRY_TIMEOUT, resubmit)
@@ -509,15 +515,9 @@
def _repoolAfter(self, txn, d):
"""
- Re-pool the back-end for the given L{PooledSqlTxn} after the given
- L{Deferred} has fired.
+ Re-pool the given L{BaseSqlTxn} after the given L{Deferred} has fired.
"""
- if txn in self.busy:
- # should only _not_ in self.busy if its underlying txn is a
- # SpooledTxn. TODO: when the SpooledTxn unspools, does it properly
- # go into 'finishing' once it's re-submitted its commit or abort? I
- # don't think so.
- self.busy.remove(txn)
+ self.busy.remove(txn)
finishRecord = (txn, d)
self.finishing.append(finishRecord)
def repool(result):
@@ -529,26 +529,15 @@
def _repoolNow(self, txn):
"""
- Shuck the L{PooledSqlTxn} wrapper off, and recycle the underlying
- L{BaseSqlTxn} into the free list.
+ Recycle a L{BaseSqlTxn} into the free list.
"""
- baseTxn = txn._baseTxn
- txn._baseTxn = None
- if isinstance(baseTxn, FailedTxn):
- # A bit of a hack, maybe clean this up, but: if a SpooledTxn spools
- # a commit(), PooledTxn() will repoolAfter(commit()). If the
- # service stops before that PooledTxn gets a real backend, then we
- # will receive it here. If that's the case we can give up and just
- # throw it away. (Maybe the pool-management logic needs to be moved
- # to BaseSqlTxn to avoid this kind of layering violation?)
- return
- baseTxn.reset()
+ txn.reset()
if self.waiting:
waiting = self.waiting.pop(0)
- self.busy.append(waiting)
- waiting._unspoolOnto(baseTxn)
+ self.busy.append(txn)
+ waiting._unspoolOnto(txn)
else:
- self.free.append(baseTxn)
+ self.free.append(txn)
@@ -672,7 +661,7 @@
Initialize a mapping of transaction IDs to transaction objects.
"""
super(ConnectionPoolConnection, self).__init__()
- self.pool = pool
+ self.pool = pool
self._txns = {}
@@ -728,16 +717,16 @@
"""
def __init__(self):
super(ConnectionPoolClient, self).__init__()
- self._nextID = count().next
- self._txns = {}
+ self._nextID = count().next
+ self._txns = {}
self._queries = {}
def newTransaction(self):
- txnid = str(self._nextID())
- self.callRemote(StartTxn, transactionID=txnid)
- txn = Transaction(client=self, transactionID=txnid)
+ txnid = str(self._nextID())
+ txn = Transaction(client=self, transactionID=txnid)
self._txns[txnid] = txn
+ self.callRemote(StartTxn, transactionID=txnid)
return txn
@@ -756,8 +745,8 @@
class _Query(object):
def __init__(self, raiseOnZeroRowCount):
- self.results = []
- self.deferred = Deferred()
+ self.results = []
+ self.deferred = Deferred()
self.raiseOnZeroRowCount = raiseOnZeroRowCount
@@ -797,9 +786,9 @@
Initialize a transaction with a L{ConnectionPoolClient} and a unique
transaction identifier.
"""
- self._client = client
+ self._client = client
self._transactionID = transactionID
- self._completed = False
+ self._completed = False
def execSQL(self, sql, args=None, raiseOnZeroRowCount=None):
Modified: CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py 2011-02-03 19:00:27 UTC (rev 6857)
+++ CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py 2011-02-03 19:00:31 UTC (rev 6858)
@@ -271,12 +271,13 @@
Create a L{ConnectionPool} attached to a C{ConnectionFactory}. Start
the L{ConnectionPool}.
"""
- self.paused = False
- self.holders = []
- self.factory = ConnectionFactory()
- self.pool = ConnectionPool(self.factory.connect, maxConnections=2)
+ self.paused = False
+ self.holders = []
+ self.factory = ConnectionFactory()
+ self.pool = ConnectionPool(self.factory.connect,
+ maxConnections=2)
self.pool._createHolder = self.makeAHolder
- self.clock = self.pool.reactor = Clock()
+ self.clock = self.pool.reactor = Clock()
self.pool.startService()
@@ -364,6 +365,7 @@
"""
a = self.pool.connection()
[[[counter, echo]]] = resultOf(a.execSQL("alpha"))
+ self.assertEquals(len(self.factory.connections), 1)
self.assertEquals(len(self.holders), 1)
[holder] = self.holders
self.assertEquals(holder.started, True)
@@ -374,6 +376,8 @@
self.assertEquals(len(self.holders), 1)
self.assertEquals(holder.started, True)
self.assertEquals(holder.stopped, True)
+ # Closing fake connections removes them from the list.
+ self.assertEquals(len(self.factory.connections), 0)
def test_retryAfterConnectError(self):
@@ -503,3 +507,25 @@
self.assertEquals(ce[0].type, ConnectionError)
+ def test_repoolSpooled(self):
+ """
+ Regression test for a somewhat tricky-to-explain bug: when a spooled
+ transaction which has already had commit() called on it before it's
+ received a real connection to start executing on, it will not leave
+ behind any detritus that prevents stopService from working.
+ """
+ self.pauseHolders()
+ c = self.pool.connection()
+ c2 = self.pool.connection()
+ c3 = self.pool.connection()
+ c.commit()
+ c2.commit()
+ c3.commit()
+ self.flushHolders()
+ self.assertEquals(len(self.factory.connections), 2)
+ stopResult = resultOf(self.pool.stopService())
+ self.assertEquals(stopResult, [None])
+ self.assertEquals(len(self.factory.connections), 0)
+
+
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20110203/5e0714f6/attachment-0001.html>
More information about the calendarserver-changes
mailing list