[CalendarServer-changes] [9637] CalendarServer/branches/users/glyph/q

source_changes at macosforge.org source_changes at macosforge.org
Sat Aug 11 01:55:11 PDT 2012


Revision: 9637
          http://trac.macosforge.org/projects/calendarserver/changeset/9637
Author:   glyph at apple.com
Date:     2012-08-11 01:55:11 -0700 (Sat, 11 Aug 2012)
Log Message:
-----------
Flesh things out still more.

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py

Property Changed:
----------------
    CalendarServer/branches/users/glyph/q/

Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py	2012-08-11 08:55:10 UTC (rev 9636)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py	2012-08-11 08:55:11 UTC (rev 9637)
@@ -176,6 +176,21 @@
 
 
 
+class SchemaAMP(AMP):
+    """
+    An AMP instance which also has a L{Schema} attached to it.
+
+    @ivar schema: The schema to look up L{TableSyntaxByName} arguments in.
+    @type schema: L{Schema}
+    """
+
+    def __init__(self, schema, boxReceiver=None, locator=None):
+        self.schema = schema
+        super(SchemaAMP, self).__init__(boxReceiver, locator)
+
+
+
+
 class ConnectionFromPeerNode(AMP):
     """
     A connection to a peer node.  Symmetric; since the 'client' and the
@@ -184,16 +199,31 @@
 
     def __init__(self, localWorkerPool, boxReceiver=None, locator=None):
         """
-        Initialize this L{ConnectionFromPeerNode} with a reference to a pool
-        of local workers.
+        Initialize this L{ConnectionFromPeerNode} with a reference to a pool of
+        local workers.
 
         @param localWorkerPool: the pool of local worker processes that can
             process queue work.
+        @type localWorkerPool: L{WorkerConnectionPool}
+
+        @see: L{AMP.__init__}
         """
         self.localWorkerPool = localWorkerPool
         super(ConnectionFromPeerNode, self).__init__(boxReceiver, locator)
 
 
+    def currentLoadEstimate(self):
+        """
+        What is the current load estimate for this peer?
+
+        @return: The number of full "slots", i.e. currently-being-processed
+            queue items (and other items which may contribute to this process's
+            load, such as currently-being-processed client requests).
+        @rtype: L{int}
+        """
+        return 0
+
+
     def performWork(self, table, workID):
         """
         A L{local worker connection <ConnectionFromWorker>} is asking this
@@ -219,6 +249,14 @@
         """
         A remote peer node has asked this node to do some work; dispatch it to
         a local worker on this node.
+
+        @param table: the table to work on.
+        @type table: L{TableSyntax}
+
+        @param workID: the identifier within the table.
+        @type workID: L{int}
+
+        @return: a L{Deferred} that fires when the work has been completed.
         """
         return self.localWorkerPool.performWork(table, workID)
 
@@ -233,25 +271,45 @@
     worker processes rather than to a remote connection pool.
     """
 
-    def __init__(self):
+    def __init__(self, maximumLoadPerWorker=0):
         self.workers = []
+        self.maximumLoadPerWorker = maximumLoadPerWorker
 
 
     def addWorker(self, worker):
+        """
+        Add a L{ConnectionFromWorker} to this L{WorkerConnectionPool} so that
+        it can be selected.
+        """
         self.workers.append(worker)
 
 
     def removeWorker(self, worker):
+        """
+        Remove a L{ConnectionFromWorker} from this L{WorkerConnectionPool} that
+        was previously added.
+        """
         self.workers.remove(worker)
 
 
+    def hasAvailableCapacity(self):
+        """
+        Does this worker connection pool have any local workers who have spare
+        capacity to process another queue item?
+        """
+        for worker in self.workers:
+            if worker.currentLoad() < self.maximumLoadPerWorker:
+                return True
+        return False
+
+
     def _selectLowestLoadLocalConnection(self):
         """
         Select the local connection with the lowest current load, or C{None} if
         all workers are too busy.
 
         @return: a worker connection with the lowest current load.
-        @rtype: L{ConnectionFromWorker} or L{NoneType}
+        @rtype: L{ConnectionFromWorker}
         """
         return sorted(self.workers[:], key=lambda w: w.currentLoad())[0]
 
@@ -275,7 +333,7 @@
 
 
 
-class ConnectionFromWorker(AMP):
+class ConnectionFromWorker(SchemaAMP):
     """
     An individual connection from a worker, as seen from the master's
     perspective.  L{ConnectionFromWorker}s go into a L{WorkerConnectionPool}.
@@ -285,9 +343,9 @@
     @type workerPool: L{WorkerConnectionPool}
     """
 
-    def __init__(self, workerPool):
+    def __init__(self, schema, workerPool, boxReceiver=None, locator=None):
         self.workerPool = workerPool
-        super(ConnectionFromWorker, self).__init__()
+        super(ConnectionFromWorker, self).__init__(schema, boxReceiver, locator)
 
 
     @property
@@ -330,21 +388,16 @@
 
 
 
-class ConnectionFromController(AMP):
+class ConnectionFromController(SchemaAMP):
     """
     A L{ConnectionFromController} is the connection to a node-controller process,
     in a worker process.  It processes requests from its own master to do work.
     It is the opposite end of the connection from L{ConnectionFromWorker}.
     """
 
-    def __init__(self, schemaSyntax):
-        """
-        @param schemaSyntax: The schema that this connection operates on, which
-            contains (at least) all the tables that we may receive requests for
-            work in.
-        """
-        super(ConnectionFromController, self).__init__()
-        self.schemaSyntax = schemaSyntax
+    def __init__(self, schema, boxReceiver=None, locator=None):
+        super(ConnectionFromController, self).__init__(schema,
+                                                       boxReceiver, locator)
 
 
     @PerformWork.responder
@@ -356,14 +409,15 @@
         and do it.
         """
         workItemClass = WorkItem.forTable(table)
+        # TODO: what if we fail?  error-handling should be recorded someplace,
+        # the row should probably be removed, re-tries should be triggerable.
+        # Note: deletion must happen first.
+        yield workItem.delete()
         # TODO: mumble locking something mumble
         # TODO: get a transaction in here.
         workItem = yield workItemClass.load(workID)
         # TODO: verify that workID is the primary key someplace.
         yield workItem.doWork()
-        # TODO: what if we fail?  error-handling should be recorded someplace,
-        # the row should probably be removed, re-tries should be triggerable.
-        yield workItem.delete()
         returnValue({})
 
 
@@ -374,6 +428,7 @@
     """
 
 
+
 def _cloneDeferred(d):
     """
     Make a new Deferred, adding callbacks to C{d}.
@@ -529,6 +584,9 @@
 
     @ivar reactor: The reactor used for scheduling timed events.
     @type reactor: L{IReactorTime} provider.
+
+    @ivar peers: The list of currently connected peers.
+    @type peers: L{list} of L{ConnectionFromPeerNode}
     """
 
     getfqdn = staticmethod(getfqdn)
@@ -541,9 +599,11 @@
         """
         Initialize a L{PeerConnectionPool}.
 
-        @param ampPort: The AMP port to listen on for inter-host communication.
-            This must be an integer because we need to communicate it to the
-            other peers in the cluster.
+        @param ampPort: The AMP TCP port number to listen on for inter-host
+            communication.  This must be an integer (and not, say, an endpoint,
+            or an endpoint description) because we need to communicate it to
+            the other peers in the cluster in a way that will be meaningful to
+            them as clients.
         @type ampPort: L{int}
 
         @param connectionFactory: a 0- or 1-argument callable that produces an
@@ -555,6 +615,8 @@
         self.pid = self.getpid()
         self.ampPort = ampPort
         self.thisProcess = None
+        self.workerPool = WorkerConnectionPool()
+        self.peers = []
 
 
     def choosePeer(self):
@@ -563,13 +625,17 @@
         occupancy of the other masters.
 
         @return: a L{Deferred <twisted.internet.defer.Deferred>} which fires
-            with the chosen L{ConnectionFromPeerNode} as soon as one is
-            available.  Normally this will be synchronous, but we need to
-            account for the possibility that we may need to connect to other
-            hosts.
+            with the chosen 'peer', i.e. object with a C{performWork} method,
+            as soon as one is available.  Normally this will be synchronous,
+            but we need to account for the possibility that we may need to
+            connect to other hosts.
         @rtype: L{Deferred <twisted.internet.defer.Deferred>} firing
-            L{ConnectionFromPeerNode}
+            L{ConnectionFromPeerNode} or L{WorkerConnectionPool}
         """
+        if not self.workerPool.hasAvailableCapacity() and self.peers:
+            return sorted(self.peers, lambda p: p.currentLoadEstimate())[0]
+        else:
+            return succeed(self.workerPool)
 
 
     def enqueueWork(self, txn, workItemType, **kw):
@@ -610,11 +676,14 @@
         """
         How many nodes / master processes are there?
 
-        @return: the number of other L{PeerConnectionPool} instances that are
-            connected to the database described by C{self.connectionFactory}.
+        @return: the maximum number of other L{PeerConnectionPool} instances
+            that may be connected to the database described by
+            C{self.connectionFactory}.  Note that this is not the current count
+            by connectivity, but the count according to the database.
         @rtype: L{int}
         """
         # TODO
+        return 20
 
 
     def nodeIndex(self):
@@ -629,6 +698,7 @@
         @rtype: L{int}
         """
         # TODO
+        return 6
 
 
     @inlineCallbacks
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120811/77b57066/attachment-0001.html>


More information about the calendarserver-changes mailing list