Modified: CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/asyncsqlpool.py (6500 => 6501)
--- CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/asyncsqlpool.py 2010-11-01 21:14:35 UTC (rev 6500)
+++ CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/asyncsqlpool.py 2010-11-01 21:14:49 UTC (rev 6501)
@@ -57,6 +57,10 @@
def initCursor():
# support threadlevel=1; we can't necessarily cursor() in a
# different thread than we do transactions in.
+
+ # TODO: Re-try connect when it fails. Specify a timeout. That
+ # should happen in this layer because we need to be able to stop
+ # the reconnect attempt if it's hanging.
self._connection = connectionFactory()
self._cursor = self._connection.cursor()
@@ -139,6 +143,8 @@
Release the thread and database connection associated with this
transaction.
"""
+ if not self._completed:
+ raise RuntimeError("Un-completed task cannot be stopped.")
self._stopped = True
self._holder.submit(self._connection.close)
return self._holder.stop()
@@ -170,7 +176,16 @@
return d.addCallback(repool)
+ def stop(self):
+ """
+ Completely remove this transaction from the pool.
+ """
+ result = super(PooledSqlTxn, self).stop()
+ self.pool.free.remove(self)
+ return result
+
+
class ConnectionPool(Service, object):
"""
This is a central service that has a threadpool and executes SQL statements
@@ -197,11 +212,13 @@
"""
Forcibly abort any outstanding transactions.
"""
- for busy in self.busy:
+ for busy in self.busy[:]:
try:
yield busy.abort()
except:
log.err()
+ for free in self.free[:]:
+ yield free.stop()
def connection(self):