[CalendarServer-changes] [9641] CalendarServer/branches/users/glyph/q

source_changes at macosforge.org source_changes at macosforge.org
Sat Aug 11 01:55:14 PDT 2012


Revision: 9641
          http://trac.macosforge.org/projects/calendarserver/changeset/9641
Author:   glyph at apple.com
Date:     2012-08-11 01:55:14 -0700 (Sat, 11 Aug 2012)
Log Message:
-----------
Let PeerConnectionPool start living up to its name as a connection pool.

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py

Property Changed:
----------------
    CalendarServer/branches/users/glyph/q/

Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py	2012-08-11 08:55:13 UTC (rev 9640)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py	2012-08-11 08:55:14 UTC (rev 9641)
@@ -23,6 +23,7 @@
 from twext.enterprise.dal.syntax import NamedValue
 from twext.enterprise.dal.record import fromTable
 from twisted.python.failure import Failure
+from twisted.internet.defer import passthru
 from twext.enterprise.dal.model import Table, Schema, SQLType, Constraint
 
 
@@ -199,7 +200,7 @@
     'server' both serve the same role, the logic is the same in every node.
     """
 
-    def __init__(self, schema, localWorkerPool, boxReceiver=None, locator=None):
+    def __init__(self, peerPool, boxReceiver=None, locator=None):
         """
         Initialize this L{ConnectionFromPeerNode} with a reference to a pool of
         local workers.
@@ -210,10 +211,30 @@
 
         @see: L{AMP.__init__}
         """
-        self.localWorkerPool = localWorkerPool
-        super(ConnectionFromPeerNode, self).__init__(schema, boxReceiver, locator)
+        self.peerPool = peerPool
+        self.localWorkerPool = peerPool.workerPool
+        super(ConnectionFromPeerNode, self).__init__(peerPool.schema,
+                                                     boxReceiver, locator)
 
 
+    def startReceivingBoxes(self, sender):
+        """
+        Connection is up and running.
+        """
+        r = super(ConnectionFromPeerNode, self).startReceivingBoxes(sender)
+        self.peerPool.addPeerConnection(self)
+        return r
+
+
+    def stopReceivingBoxes(self, reason):
+        """
+        Stop receiving boxes.
+        """
+        self.peerPool.removePeerConnection(self)
+        r = super(ConnectionFromPeerNode, self).stopReceivingBoxes(reason)
+        return r
+
+
     def currentLoadEstimate(self):
         """
         What is the current load estimate for this peer?
@@ -297,7 +318,7 @@
     def hasAvailableCapacity(self):
         """
         Does this worker connection pool have any local workers who have spare
-        capacity to process another queue item?
+        hasAvailableCapacity to process another queue item?
         """
         for worker in self.workers:
             if worker.currentLoad() < self.maximumLoadPerWorker:
@@ -486,19 +507,16 @@
         """
         Execute this L{WorkProposal}.
         """
-        creation = self.workItemType.create(self.txn, **self.kw)
-        @creation.addCallback
+        @passthru(self.workItemType.create(self.txn, **self.kw).addCallback)
         def created(workItem):
             self._whenProposed.callback(None)
             @self.txn.postCommit
             def whenDone():
                 self._whenCommitted.callback(None)
-                peerChosen = self.pool.choosePeer()
-                @peerChosen.addCallback
-                def gotPeer(peer):
-                    performed = peer.performWork(workItem.__tbl__,
-                                                 workItem.workID)
-                    @performed.addCallback
+                @passthru(self.pool.choosePeer().addCallback)
+                def peerChosen(peer):
+                    @passthru(peer.performWork(workItem.__tbl__,
+                                               workItem.workID))
                     def performed(result):
                         self._whenExecuted.callback(None)
                     @performed.addErrback
@@ -634,6 +652,20 @@
         self.schema = schema
 
 
+    def addPeerConnection(self, peer):
+        """
+        Add a L{ConnectionFromPeerNode} to the active list of peers.
+        """
+        self.peers.append(peer)
+
+
+    def removePeerConnection(self, peer):
+        """
+        Remove a L{ConnectionFromPeerNode} from the active list of peers.
+        """
+        self.peers.remove(peer)
+
+
     def choosePeer(self):
         """
         Choose a peer to distribute work to based on the current known slot
@@ -821,7 +853,7 @@
 
     def createPeerConnection(self, addr):
         # TODO: add to peer list, remove from peer list
-        return ConnectionFromPeerNode(self.schema, self.workerPool)
+        return ConnectionFromPeerNode(self)
 
 
 
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120811/addc07c5/attachment.html>


More information about the calendarserver-changes mailing list