[CalendarServer-changes] [7189] CalendarServer/trunk/twext/enterprise
source_changes at macosforge.org
source_changes at macosforge.org
Mon Mar 14 05:28:45 PDT 2011
Revision: 7189
http://trac.macosforge.org/projects/calendarserver/changeset/7189
Author: glyph at apple.com
Date: 2011-03-14 05:28:45 -0700 (Mon, 14 Mar 2011)
Log Message:
-----------
Re-connect to the database when connections are lost, and re-start the transaction where it's possible to do that.
Modified Paths:
--------------
CalendarServer/trunk/twext/enterprise/adbapi2.py
CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py
Modified: CalendarServer/trunk/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/adbapi2.py 2011-03-13 04:33:13 UTC (rev 7188)
+++ CalendarServer/trunk/twext/enterprise/adbapi2.py 2011-03-14 12:28:45 UTC (rev 7189)
@@ -92,6 +92,7 @@
self._cursor = cursor
self._connection = connection
self._holder = threadHolder
+ self._first = True
@_forward
@@ -109,6 +110,8 @@
def _reallyExecSQL(self, sql, args=None, raiseOnZeroRowCount=None):
+ wasFirst = self._first
+ self._first = False
if args is None:
args = []
derived = None
@@ -120,7 +123,17 @@
derived = []
derived.append(arg)
args[n] = arg.preQuery(self._cursor)
- self._cursor.execute(sql, args)
+ try:
+ self._cursor.execute(sql, args)
+ except:
+ if wasFirst:
+ self._connection.close()
+ self._connection = self._pool.connectionFactory()
+ self._cursor = self._connection.cursor()
+ result = self._reallyExecSQL(sql, args, raiseOnZeroRowCount)
+ return result
+ else:
+ raise
if derived is not None:
for arg in derived:
arg.postQuery(self._cursor)
@@ -153,14 +166,16 @@
def _end(self, really):
"""
- Common logic for commit or abort.
+ Common logic for commit or abort. Executed in the main (reactor) thread, not the cursor thread.
"""
if not self._completed:
self._completed = True
def reallySomething():
+ # Executed in the cursor thread.
if self._cursor is None:
return
really()
+ self._first = True
result = self._holder.submit(reallySomething)
self._pool._repoolAfter(self, result)
return result
@@ -173,7 +188,7 @@
def abort(self):
- return self._end(self._connection.rollback)
+ return self._end(self._connection.rollback).addErrback(log.err)
def __del__(self):
@@ -510,11 +525,7 @@
# Phase 3: All of the busy transactions must be aborted first. As each
# one is aborted, it will remove itself from the list.
while self._busy:
- d = self._busy[0].abort()
- try:
- yield d
- except:
- log.err()
+ yield self._busy[0].abort()
# Phase 4: All transactions should now be in the free list, since
# 'abort()' will have put them there. Shut down all the associated
@@ -611,7 +622,12 @@
self._finishing.remove(finishRecord)
self._repoolNow(txn)
return result
- return d.addBoth(repool)
+ def discard(result):
+ self._finishing.remove(finishRecord)
+ txn._releaseConnection()
+ self._startOneMore()
+ return result
+ return d.addCallbacks(repool, discard)
def _repoolNow(self, txn):
Modified: CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py 2011-03-13 04:33:13 UTC (rev 7188)
+++ CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py 2011-03-14 12:28:45 UTC (rev 7189)
@@ -75,8 +75,12 @@
class FakeConnection(Parent, Child):
"""
Fake Stand-in for DB-API 2.0 connection.
+
+ @ivar executions: the number of statements which have been executed.
"""
+ executions = 0
+
def __init__(self, factory):
"""
Initialize list of cursors
@@ -84,8 +88,17 @@
Parent.__init__(self)
Child.__init__(self, factory)
self.id = factory.idcounter.next()
+ self._executeFailQueue = []
+ def executeWillFail(self, thunk):
+ """
+ The next call to L{FakeCursor.execute} will fail with an exception
+ returned from the given callable.
+ """
+ self._executeFailQueue.append(thunk)
+
+
@property
def cursors(self):
"Alias to make tests more readable."
@@ -108,6 +121,7 @@
raise RollbackFail()
+
class RollbackFail(Exception):
"""
Sample rollback-failure exception.
@@ -141,6 +155,9 @@
def execute(self, sql, args=()):
+ self.connection.executions += 1
+ if self.connection._executeFailQueue:
+ raise self.connection._executeFailQueue.pop(0)()
self.sql = sql
self.description = True
self.rowcount = 1
@@ -184,7 +201,7 @@
def __init__(self):
Parent.__init__(self)
self.idcounter = count(1)
- self._resultQueue = []
+ self._connectResultQueue = []
self.defaultConnect()
@@ -199,8 +216,8 @@
Implement the C{ConnectionFactory} callable expected by
L{ConnectionPool}.
"""
- if self._resultQueue:
- thunk = self._resultQueue.pop(0)
+ if self._connectResultQueue:
+ thunk = self._connectResultQueue.pop(0)
else:
thunk = self._default
return thunk()
@@ -212,7 +229,7 @@
"""
def thunk():
return FakeConnection(self)
- self._resultQueue.append(thunk)
+ self._connectResultQueue.append(thunk)
def willFail(self):
@@ -221,7 +238,7 @@
"""
def thunk():
raise FakeConnectionError()
- self._resultQueue.append(thunk)
+ self._connectResultQueue.append(thunk)
def defaultConnect(self):
@@ -229,7 +246,7 @@
By default, connection attempts will succeed.
"""
self.willConnect()
- self._default = self._resultQueue.pop()
+ self._default = self._connectResultQueue.pop()
def defaultFail(self):
@@ -237,7 +254,7 @@
By default, connection attempts will fail.
"""
self.willFail()
- self._default = self._resultQueue.pop()
+ self._default = self._connectResultQueue.pop()
@@ -718,3 +735,115 @@
notxn = self.pool.connection()
self.assertEquals(notxn.dialect, TEST_DIALECT)
+
+ def test_reConnectWhenFirstExecFails(self):
+ """
+ Generally speaking, DB-API 2.0 adapters do not provide information about
+ the cause of a failed 'execute' method; they definitely don't provide it
+ in a way which can be identified as related to the syntax of the query,
+ the state of the database itself, the state of the connection, etc.
+
+ Therefore the best general heuristic for whether the connection to the
+ database has been lost and needs to be re-established is to catch
+ exceptions which are raised by the I{first} statement executed in a
+ transaction.
+ """
+ # Allow 'connect' to succeed. This should behave basically the same
+ # whether connect() happened to succeed in some previous transaction and
+ # it's recycling the underlying transaction, or connect() just
+ # succeeded. Either way you just have a _SingleTxn wrapping a
+ # _ConnectedTxn.
+ txn = self.pool.connection()
+ self.assertEquals(len(self.factory.connections), 1,
+ "Sanity check failed.")
+ self.factory.connections[0].executeWillFail(RuntimeError)
+ results = resultOf(txn.execSQL("hello, world!"))
+ [[[counter, echo]]] = results
+ self.assertEquals("hello, world!", echo)
+ # Two execution attempts should have been made, one on each connection.
+ # The first failed with a RuntimeError, but that is deliberately
+ # obscured, because then we tried again and it succeeded.
+ self.assertEquals(len(self.factory.connections), 2,
+ "No new connection opened.")
+ self.assertEquals(self.factory.connections[0].executions, 1)
+ self.assertEquals(self.factory.connections[1].executions, 1)
+ self.assertEquals(self.factory.connections[0].closed, True)
+ self.assertEquals(self.factory.connections[1].closed, False)
+
+
+ def test_reConnectWhenSecondExecFailsThenFirstExecFails(self):
+ """
+ Other connection-oriented errors might raise exceptions if they occur in
+ the middle of a transaction, but that should cause the error to be
+ caught, the transaction to be aborted, and the (closed) connection to be
+ recycled, where the next transaction that attempts to do anything with
+ it will encounter the error immediately and discover it needs to be
+ recycled.
+
+ It would be better if this behavior were invisible, but that could only
+ be accomplished with more precise database exceptions. We may come up
+ with support in the future for more precisely identifying exceptions,
+ but I{unknown} exceptions should continue to be treated in this manner,
+ relaying the exception back to application code but attempting a
+ re-connection on the next try.
+ """
+ txn = self.pool.connection()
+ [[[counter, echo]]] = resultOf(txn.execSQL("hello, world!", []))
+ self.factory.connections[0].executeWillFail(ZeroDivisionError)
+ [f] = resultOf(txn.execSQL("divide by zero", []))
+ f.trap(ZeroDivisionError)
+ self.assertEquals(self.factory.connections[0].executions, 2)
+ # Reconnection should work exactly as before.
+ self.assertEquals(self.factory.connections[0].closed, False)
+ # Application code has to roll back its transaction at this point, since
+ # it failed (and we don't necessarily know why it failed: not enough
+ # information).
+ txn.abort()
+ self.factory.connections[0].executions = 0 # re-set for next test
+ self.assertEquals(len(self.factory.connections), 1)
+ self.test_reConnectWhenFirstExecFails()
+
+
+ def test_disconnectOnFailedRollback(self):
+ """
+ When C{rollback} fails for any reason on a connection object, then we
+ don't know what state it's in. Most likely, it's already been
+ disconnected, so the connection should be closed and the transaction
+ de-pooled instead of recycled.
+
+ Also, a new connection will immediately be established to keep the pool
+ size the same.
+ """
+ txn = self.pool.connection()
+ self.factory.rollbackFail = True
+ [x] = resultOf(txn.abort())
+ # Abort does not propagate the error on, the transaction merely gets
+ # disposed of.
+ self.assertIdentical(x, None)
+ self.assertEquals(len(self.pool._free), 1)
+ self.assertNotIn(txn._baseTxn, self.pool._free)
+ self.assertEquals(self.pool._finishing, [])
+ self.assertEquals(len(self.factory.connections), 2)
+ self.assertEquals(self.factory.connections[0].closed, True)
+ self.assertEquals(self.factory.connections[1].closed, False)
+ self.assertEquals(len(self.flushLoggedErrors(RollbackFail)), 1)
+
+
+ def test_exceptionPropagatesFailedCommit(self):
+ """
+ A failed C{rollback} is fine (the premature death of the connection
+ without C{commit} means that the changes are surely gone), but a failed
+ C{commit} has to be relayed to client code, since that actually means
+ some changes didn't hit the database.
+ """
+ txn = self.pool.connection()
+ self.factory.commitFail = True
+ [x] = resultOf(txn.commit())
+ x.trap(CommitFail)
+ self.assertEquals(len(self.pool._free), 1)
+ self.assertNotIn(txn._baseTxn, self.pool._free)
+ self.assertEquals(self.pool._finishing, [])
+ self.assertEquals(len(self.factory.connections), 2)
+ self.assertEquals(self.factory.connections[0].closed, True)
+ self.assertEquals(self.factory.connections[1].closed, False)
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20110314/1a43591f/attachment-0001.html>
More information about the calendarserver-changes
mailing list