[CalendarServer-changes] [6857] CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/ adbapi2.py
source_changes at macosforge.org
source_changes at macosforge.org
Thu Feb 3 11:00:27 PST 2011
Revision: 6857
http://trac.macosforge.org/projects/calendarserver/changeset/6857
Author: glyph at apple.com
Date: 2011-02-03 11:00:27 -0800 (Thu, 03 Feb 2011)
Log Message:
-----------
fix the outstanding-spooled-connection case.
Modified Paths:
--------------
CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py
Modified: CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py 2011-02-03 19:00:24 UTC (rev 6856)
+++ CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py 2011-02-03 19:00:27 UTC (rev 6857)
@@ -51,6 +51,8 @@
from twext.internet.threadutils import ThreadHolder
from twisted.internet.defer import succeed
+from twext.enterprise.ienterprise import ConnectionError
+from twisted.internet.defer import fail
from twext.enterprise.ienterprise import AlreadyFinishedError, IAsyncTransaction
@@ -168,7 +170,25 @@
return holder.stop()
+class FailedTxn(object):
+ """
+ An L{IAsyncTransaction} that indicates a local failure before we could even
+ communicate any statements (or possibly even any connection attempts) to the
+ server.
+ """
+ def _everything(self, *a, **kw):
+ """
+ Everything fails with a L{ConnectionError}.
+ """
+ return fail(ConnectionError())
+
+ execSQL = _everything
+ commit = _everything
+ abort = _everything
+
+
+
class SpooledTxn(object):
"""
A L{SpooledTxn} is an implementation of L{IAsyncTransaction} which cannot
@@ -270,15 +290,21 @@
def abort(self):
self._markComplete()
- waiting = self._pool.waiting
- if self in waiting:
- waiting.remove(self)
- # FIXME: waiting.remove()
- return succeed(None)
+ if self in self._pool.waiting:
+ return self.stopWaiting()
else:
return self._pool._repoolAfter(self, super(PooledSqlTxn, self).abort())
+ def stopWaiting(self):
+ """
+ Stop waiting for a connection to become free, and fail.
+ """
+ self._pool.waiting.remove(self)
+ self._unspoolOnto(FailedTxn())
+ return succeed(None)
+
+
def _checkComplete(self):
"""
If the transaction is complete, raise L{AlreadyFinishedError}
@@ -306,8 +332,6 @@
def abort(self):
- # not implemented yet, but let's fail rather than break the test
- # raise NotImplementedError()
if self._retry is not None:
self._retry.cancel()
d = self._holder.stop()
@@ -353,6 +377,9 @@
@ivar waiting: The list of L{PooledSqlTxn} objects attached to a
L{SpooledTxn}; i.e. those which are awaiting a connection to become free
so that they can be executed.
+
+ @ivar stopping: Is this L{ConnectionPool} in the process of shutting down?
+ (If so, new connections will not be established.)
"""
reactor = _reactor
@@ -371,6 +398,7 @@
self.waiting = []
self.finishing = []
self.connecting = []
+ self.stopping = False
def startService(self):
@@ -385,6 +413,14 @@
Forcibly abort any outstanding transactions, and release all resources
(notably, threads).
"""
+ # FIXME: actually honor this flag
+ self.stopping = True
+ # First, cancel any transactions that are waiting so they won't try to
+ # eagerly acquire new connections as they flow into the free-list.
+ while self.waiting:
+ waiting = self.waiting[0]
+ yield waiting.stopWaiting()
+ # FIXME: there should be tests for these 'yield's.
# Phase 1: All of the busy transactions must be aborted first. As each
# one is aborted, it will remove itself from the list.
while self.busy:
@@ -477,7 +513,10 @@
L{Deferred} has fired.
"""
if txn in self.busy:
- # should only _not_ in self.busy if its underlying txn is a SpooledTxn
+ # should only _not_ be in self.busy if its underlying txn is a
+ # SpooledTxn. TODO: when the SpooledTxn unspools, does it properly
+ # go into 'finishing' once it's re-submitted its commit or abort? I
+ # don't think so.
self.busy.remove(txn)
finishRecord = (txn, d)
self.finishing.append(finishRecord)
@@ -495,6 +534,14 @@
"""
baseTxn = txn._baseTxn
txn._baseTxn = None
+ if isinstance(baseTxn, FailedTxn):
+ # A bit of a hack, maybe clean this up, but: if a SpooledTxn spools
+ # a commit(), PooledSqlTxn will _repoolAfter(commit()). If the
+ # service stops before that PooledSqlTxn gets a real backend, then we
+ # will receive it here. If that's the case we can give up and just
+ # throw it away. (Maybe the pool-management logic needs to be moved
+ # to BaseSqlTxn to avoid this kind of layering violation?)
+ return
baseTxn.reset()
if self.waiting:
waiting = self.waiting.pop(0)
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20110203/1d1d0f24/attachment-0001.html>
More information about the calendarserver-changes
mailing list