[CalendarServer-changes] [6827] CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/ adbapi2.py

source_changes at macosforge.org source_changes at macosforge.org
Fri Jan 28 17:39:46 PST 2011


Revision: 6827
          http://trac.macosforge.org/projects/calendarserver/changeset/6827
Author:   glyph at apple.com
Date:     2011-01-28 17:39:45 -0800 (Fri, 28 Jan 2011)
Log Message:
-----------
starting to move logic around

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py

Modified: CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py	2011-01-29 01:39:35 UTC (rev 6826)
+++ CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py	2011-01-29 01:39:45 UTC (rev 6827)
@@ -68,28 +68,13 @@
     # FIXME: this should *really* be 
     paramstyle = DEFAULT_PARAM_STYLE
 
-    def __init__(self, connectionFactory, threadHolder):
-        self._completed = False
-        self._cursor = None
+    def __init__(self, connectionFactory, threadHolder, connection, cursor):
+        self._completed = True
+        self._cursor = cursor
+        self._connection = connection
         self._holder = threadHolder
-        self._holder.start()
 
-        def initCursor():
-            # support threadlevel=1; we can't necessarily cursor() in a
-            # different thread than we do transactions in.
 
-            # TODO: Re-try connect when it fails.  Specify a timeout.  That
-            # should happen in this layer because we need to be able to stop
-            # the reconnect attempt if it's hanging.
-            self._connection = connectionFactory()
-            self._cursor = self._connection.cursor()
-
-        # Note: no locking necessary here; since this gets submitted first, all
-        # subsequent submitted work-units will be in line behind it and the
-        # cursor will already have been initialized.
-        self._holder.submit(initCursor).addErrback(log.err)
-
-
     def _reallyExecSQL(self, sql, args=None, raiseOnZeroRowCount=None):
         if args is None:
             args = []
@@ -304,6 +289,19 @@
 
 
 
+class _ConnectingPsuedoTxn(object):
+
+    def __init__(self):
+        pass
+
+
+    def abort(self):
+        # not implemented yet, but let's fail rather than break the test
+        # raise NotImplementedError()
+        pass
+
+
+
 class ConnectionPool(Service, object):
     """
     This is a central service that has a threadpool and executes SQL statements
@@ -379,27 +377,58 @@
         tracking = self.busy
         if self.free:
             basetxn = self.free.pop(0)
-        elif len(self.busy) < self.maxConnections:
-            basetxn = BaseSqlTxn(
-                connectionFactory=self.connectionFactory,
-                threadHolder=self._createHolder()
-            )
         else:
             basetxn = SpooledTxn()
             tracking = self.waiting
         txn = PooledSqlTxn(self, basetxn)
         tracking.append(txn)
+        if tracking is self.waiting and len(self.busy) < self.maxConnections:
+            self._startOneMore()
         return txn
 
 
+    def _startOneMore(self):
+        """
+        Start one more BaseSqlTxn.
+        """
+        holder = self._createHolder()
+        holder.start()
+        # FIXME: attach the holder to the txn so it can be aborted.
+        txn = _ConnectingPsuedoTxn()
+        # take up a slot in the 'busy' list, sit there so we can be aborted.
+        self.busy.append(txn)
+        def initCursor():
+            # support threadlevel=1; we can't necessarily cursor() in a
+            # different thread than we do transactions in.
+
+            # TODO: Re-try connect when it fails.  Specify a timeout.  That
+            # should happen in this layer because we need to be able to stop
+            # the reconnect attempt if it's hanging.
+            connection = self.connectionFactory()
+            cursor = connection.cursor()
+            return (connection, cursor)
+        d = holder.submit(initCursor)
+        def finishInit((connection, cursor)):
+            baseTxn = BaseSqlTxn(
+                connectionFactory=self.connectionFactory,
+                threadHolder=holder,
+                connection=connection,
+                cursor=cursor
+            )
+            txn._baseTxn = baseTxn
+            self.reclaim(txn)
+        d.addCallbacks(finishInit, log.err)
+        return txn
+
+
     def reclaim(self, txn):
         """
         Shuck the L{PooledSqlTxn} wrapper off, and recycle the underlying
         BaseSqlTxn into the free list.
         """
+        self.busy.remove(txn)
         baseTxn = txn._baseTxn
         baseTxn.reset()
-        self.busy.remove(txn)
         if self.waiting:
             waiting = self.waiting.pop(0)
             waiting._baseTxn._unspool(baseTxn)
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20110128/384924be/attachment-0001.html>


More information about the calendarserver-changes mailing list