[CalendarServer-changes] [9654] CalendarServer/branches/users/glyph/q

source_changes at macosforge.org source_changes at macosforge.org
Sat Aug 11 01:55:26 PDT 2012


Revision: 9654
          http://trac.macosforge.org/projects/calendarserver/changeset/9654
Author:   glyph at apple.com
Date:     2012-08-11 01:55:26 -0700 (Sat, 11 Aug 2012)
Log Message:
-----------
sketch of currentLoadEstimate

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py

Property Changed:
----------------
    CalendarServer/branches/users/glyph/q/

Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py	2012-08-11 08:55:25 UTC (rev 9653)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py	2012-08-11 08:55:26 UTC (rev 9654)
@@ -179,6 +179,18 @@
 
 
 
+class ReportLoad(Command):
+    """
+    Notify another node of the total, current load for this whole node (all of
+    its workers).
+    """
+    arguments = [
+        ("load", Integer())
+    ]
+    response = []
+
+
+
 class SchemaAMP(AMP):
     """
     An AMP instance which also has a L{Schema} attached to it.
@@ -213,10 +225,29 @@
         """
         self.peerPool = peerPool
         self.localWorkerPool = peerPool.workerPool
+        self._bonusLoad = 0
+        self._reportedLoad = 0
         super(ConnectionFromPeerNode, self).__init__(peerPool.schema,
                                                      boxReceiver, locator)
 
 
+    def reportCurrentLoad(self):
+        """
+        Report the current load for the local worker pool to this peer.
+        """
+        return self.callRemote(ReportLoad,
+                               load=self.localWorkerPool.totalLoad())
+
+
+    @ReportLoad.responder
+    def repotedLoad(self, load):
+        """
+        The peer reports its load.
+        """
+        self._reportedLoad = (load - self._bonusLoad)
+        return {}
+
+
     def startReceivingBoxes(self, sender):
         """
         Connection is up and running; add this to the list of active peers.
@@ -245,7 +276,7 @@
             load, such as currently-being-processed client requests).
         @rtype: L{int}
         """
-        return 0
+        return self._reportedLoad + self._bonusLoad
 
 
     def performWork(self, table, workID):
@@ -264,8 +295,13 @@
             complete.
         @rtype: L{Deferred} firing L{dict}
         """
-        return self.callRemote(PerformWork,
-                               table=table.model.name, workID=workID)
+        d = self.callRemote(PerformWork, table=table.model.name, workID=workID)
+        self._bonusLoad += 1
+        @d.addBoth
+        def performed(result):
+            self._bonusLoad -= 1
+            return result
+        return d
 
 
     @PerformWork.responder
@@ -291,8 +327,8 @@
     A pool of L{ConnectionFromWorker}s.
 
     L{WorkerConnectionPool} also implements the same implicit protocol as a
-    L{ConnectionFromPeerNode}, but one that dispenses work to the local
-    worker processes rather than to a remote connection pool.
+    L{ConnectionFromPeerNode}, but one that dispenses work to the local worker
+    processes rather than to a remote connection pool.
     """
 
     def __init__(self, maximumLoadPerWorker=0):
@@ -327,8 +363,15 @@
         return False
 
 
-    def _selectLowestLoadLocalConnection(self):
+    def totalLoad(self):
         """
+        The total load of all currently connected workers.
+        """
+        return sum(worker.currentLoad() for worker in self.workers)
+
+
+    def _selectLowestLoadWorker(self):
+        """
         Select the local connection with the lowest current load, or C{None} if
         all workers are too busy.
 
@@ -353,7 +396,7 @@
             complete.
         @rtype: L{Deferred} firing L{dict}
         """
-        return self._selectLowestLoadLocalConnection().performWork(table, workID)
+        return self._selectLowestLoadWorker().performWork(table, workID)
 
 
 
@@ -419,9 +462,10 @@
 
 class ConnectionFromController(SchemaAMP):
     """
-    A L{ConnectionFromController} is the connection to a node-controller process,
-    in a worker process.  It processes requests from its own master to do work.
-    It is the opposite end of the connection from L{ConnectionFromWorker}.
+    A L{ConnectionFromController} is the connection to a node-controller
+    process, in a worker process.  It processes requests from its own master to
+    do work.  It is the opposite end of the connection from
+    L{ConnectionFromWorker}.
     """
 
     def __init__(self, transactionFactory, schema,
@@ -858,7 +902,6 @@
 
 
     def createPeerConnection(self, addr):
-        # TODO: add to peer list, remove from peer list
         return ConnectionFromPeerNode(self)
 
 
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120811/ce370b8e/attachment-0001.html>


More information about the calendarserver-changes mailing list