[CalendarServer-changes] [6513] CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/asyncsqlpool.py
source_changes at macosforge.org
source_changes at macosforge.org
Mon Nov 1 14:18:37 PDT 2010
Revision: 6513
http://trac.macosforge.org/projects/calendarserver/changeset/6513
Author: glyph at apple.com
Date: 2010-11-01 14:18:34 -0700 (Mon, 01 Nov 2010)
Log Message:
-----------
composition > inheritance, always, forever
Modified Paths:
--------------
CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/asyncsqlpool.py
Modified: CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/asyncsqlpool.py
===================================================================
--- CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/asyncsqlpool.py 2010-11-01 21:18:16 UTC (rev 6512)
+++ CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/asyncsqlpool.py 2010-11-01 21:18:34 UTC (rev 6513)
@@ -37,6 +37,7 @@
from txdav.base.datastore.threadutils import ThreadHolder
from txdav.idav import AlreadyFinishedError
from twisted.python import log
+from twisted.python.components import proxyForInterface
class BaseSqlTxn(object):
@@ -112,9 +113,9 @@
def abort(self):
if not self._completed:
+ self._completed = True
def reallyAbort():
self._connection.rollback()
- self._completed = True
result = self._holder.submit(reallyAbort)
return result
else:
@@ -123,7 +124,7 @@
def __del__(self):
if not self._completed:
- print 'CommonStoreTransaction.__del__: OK'
+ print 'BaseSqlTxn.__del__: OK'
self.abort()
@@ -136,6 +137,7 @@
"""
if not self._completed:
raise RuntimeError("Attempt to re-set active transaction.")
+ self._completed = False
def stop(self):
@@ -143,49 +145,54 @@
Release the thread and database connection associated with this
transaction.
"""
- if not self._completed:
- raise RuntimeError("Un-completed task cannot be stopped.")
self._stopped = True
self._holder.submit(self._connection.close)
return self._holder.stop()
-class PooledSqlTxn(BaseSqlTxn):
+class PooledSqlTxn(proxyForInterface(iface=IAsyncTransaction,
+ originalAttribute='_baseTxn')):
+ """
+ This is a temporary throw-away wrapper for the longer-lived BaseSqlTxn, so
+ that if a badly-behaved API client accidentally hangs on to one of these
+ and, for example C{.abort()}s it multiple times once another client is
+ using that connection, it will get some harmless tracebacks.
+ """
- def __init__(self, pool):
- self.pool = pool
- super(PooledSqlTxn, self).__init__(
- self.pool.connectionFactory,
- self.pool.reactor
- )
+ def __init__(self, pool, baseTxn):
+ self._pool = pool
+ self._baseTxn = baseTxn
+ self._complete = False
+ def execSQL(self, *a, **kw):
+ if self._complete:
+ raise AlreadyFinishedError()
+ return super(PooledSqlTxn, self).execSQL(*a, **kw)
+
+
def commit(self):
- return self.repoolAfter(super(PooledSqlTxn, self).commit())
+ if self._complete:
+ raise AlreadyFinishedError()
+ return self._repoolAfter(super(PooledSqlTxn, self).commit())
def abort(self):
- return self.repoolAfter(super(PooledSqlTxn, self).abort())
+ if self._complete:
+ raise AlreadyFinishedError()
+ return self._repoolAfter(super(PooledSqlTxn, self).abort())
- def repoolAfter(self, d):
+ def _repoolAfter(self, d):
+ self._complete = True
def repool(result):
- self.pool.reclaim(self)
+ self._pool.reclaim(self)
return result
return d.addCallback(repool)
- def stop(self):
- """
- Completely remove this transaction from the pool.
- """
- result = super(PooledSqlTxn, self).stop()
- self.pool.free.remove(self)
- return result
-
-
class ConnectionPool(Service, object):
"""
This is a central service that has a threadpool and executes SQL statements
@@ -217,22 +224,39 @@
yield busy.abort()
except:
log.err()
- for free in self.free[:]:
+ # all transactions should now be in the free list, since 'abort()' will
+ # have put them there.
+ for free in self.free:
yield free.stop()
def connection(self):
+ """
+ Find a transaction; either retrieve a free one from the list or
+ allocate a new one if no free ones are available.
+
+ @return: an L{IAsyncTransaction}
+ """
if self.free:
- txn = self.free.pop(0)
+ basetxn = self.free.pop(0)
else:
- txn = PooledSqlTxn(self)
+ basetxn = BaseSqlTxn(
+ connectionFactory=self.connectionFactory,
+ reactor=self.reactor
+ )
+ txn = PooledSqlTxn(self, basetxn)
self.busy.append(txn)
- return self.txn
+ return txn
def reclaim(self, txn):
- txn.reset()
- self.free.append(txn)
+ """
+ Shuck the L{PooledSqlTxn} wrapper off, and put the BaseSqlTxn into the
+ free list.
+ """
+ baseTxn = txn._baseTxn
+ baseTxn.reset()
+ self.free.append(baseTxn)
self.busy.remove(txn)
@@ -440,7 +464,7 @@
class Transaction(object):
"""
- Async transaction implementation.
+ Async protocol-based transaction implementation.
"""
implements(IAsyncTransaction)
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20101101/e61f181d/attachment-0001.html>
More information about the calendarserver-changes mailing list