[CalendarServer-changes] [9669] CalendarServer/branches/users/glyph/q

source_changes at macosforge.org source_changes at macosforge.org
Sat Aug 11 01:55:39 PDT 2012


Revision: 9669
          http://trac.macosforge.org/projects/calendarserver/changeset/9669
Author:   glyph at apple.com
Date:     2012-08-11 01:55:39 -0700 (Sat, 11 Aug 2012)
Log Message:
-----------
some clean-ups

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py

Property Changed:
----------------
    CalendarServer/branches/users/glyph/q/

Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py	2012-08-11 08:55:38 UTC (rev 9668)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py	2012-08-11 08:55:39 UTC (rev 9669)
@@ -82,15 +82,15 @@
 from os import getpid
 from datetime import datetime
 
-from twisted.internet.task import LoopingCall
 from twisted.application.service import Service
 from twisted.internet.protocol import Factory
-from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
-from twisted.internet.defer import succeed
-from twisted.internet.task import deferLater
+from twisted.internet.defer import (
+    inlineCallbacks, returnValue, Deferred, succeed
+)
 from twisted.internet.endpoints import TCP4ClientEndpoint
 from twisted.protocols.amp import AMP, Command, Integer, Argument
 from twisted.python.reflect import qual
+from twisted.python import log
 
 from twext.enterprise.dal.syntax import TableSyntax, SchemaSyntax
 from twext.enterprise.dal.model import ProcedureCall
@@ -99,8 +99,8 @@
 from twisted.python.failure import Failure
 from twisted.internet.defer import passthru
 from twext.enterprise.dal.model import Table, Schema, SQLType, Constraint
+from twisted.internet.endpoints import TCP4ServerEndpoint
 
-
 def makeNodeSchema(inSchema):
     """
     Create a self-contained schema for L{NodeInfo} to use.
@@ -160,7 +160,7 @@
 
 
 
-class NodeInfo(fromTable(NodeInfoSchema.NODE_INFO)):
+class NodeInfo(Record, fromTable(NodeInfoSchema.NODE_INFO)):
     """
     A L{NodeInfo} is information about a currently-active Node process.
     """
@@ -176,11 +176,7 @@
         return TCP4ClientEndpoint(self.hostname, self.port)
 
 
-    def updateCurrent(self):
-        return self.update(timestamp=datetime.datetime.now())
 
-
-
 def abstract(thunk):
     """
     The decorated function is abstract.
@@ -285,7 +281,6 @@
 
 
 
-
 class ConnectionFromPeerNode(SchemaAMP):
     """
     A connection to a peer node.  Symmetric; since the 'client' and the
@@ -543,8 +538,8 @@
 class ConnectionFromController(SchemaAMP):
     """
     A L{ConnectionFromController} is the connection to a node-controller
-    process, in a worker process.  It processes requests from its own master to
-    do work.  It is the opposite end of the connection from
+    process, in a worker process.  It processes requests from its own
+    controller to do work.  It is the opposite end of the connection from
     L{ConnectionFromWorker}.
     """
 
@@ -559,9 +554,9 @@
     @inlineCallbacks
     def actuallyReallyExecuteWorkHere(self, table, workID):
         """
-        This is where it's time to actually do the work.  The master process
-        has instructed this worker to do it; so, look up the data in the row,
-        and do it.
+        This is where it's time to actually do the work.  The controller
+        process has instructed this worker to do it; so, look up the data in
+        the row, and do it.
         """
         workItemClass = WorkItem.forTable(table)
         txn = self.transactionFactory()
@@ -635,18 +630,19 @@
 
     def _start(self):
         """
-        Execute this L{WorkProposal}.
+        Execute this L{WorkProposal} by creating the work item in the database,
+        waiting for the transaction where that addition was completed to
+        commit, and asking the local node controller process to do the work.
         """
         @passthru(self.workItemType.create(self.txn, **self.kw).addCallback)
-        def created(workItem):
+        def created(item):
             self._whenProposed.callback(None)
             @self.txn.postCommit
             def whenDone():
                 self._whenCommitted.callback(None)
                 @passthru(self.pool.choosePeer().addCallback)
                 def peerChosen(peer):
-                    @passthru(peer.performWork(workItem.table,
-                                               workItem.workID))
+                    @passthru(peer.performWork(item.table, item.workID))
                     def performed(result):
                         self._whenExecuted.callback(None)
                     @performed.addErrback
@@ -733,11 +729,11 @@
 
     @ivar queueDelayedProcessInterval: The amount of time between database
         pings, i.e. checks for over-due queue items that might have been
-        orphaned by a master process that died mid-transaction.  This is how
-        often the shared database should be pinged by I{all} nodes (i.e., all
-        master processes, or each instance of L{PeerConnectionPool}); each
-        individual node will ping commensurately less often as more nodes join
-        the database.
+        orphaned by a controller process that died mid-transaction.  This is
+        how often the shared database should be pinged by I{all} nodes (i.e.,
+        all controller processes, or each instance of L{PeerConnectionPool});
+        each individual node will ping commensurately less often as more nodes
+        join the database.
     @type queueDelayedProcessInterval: L{float} (in seconds)
 
     @ivar reactor: The reactor used for scheduling timed events.
@@ -780,6 +776,8 @@
         self.workerPool = WorkerConnectionPool()
         self.peers = []
         self.schema = schema
+        self._startingUp = None
+        self._listeningPortObject = None
         self._lastSeenTotalNodes = 1
         self._lastSeenNodeIndex = 1
 
@@ -801,11 +799,10 @@
     def choosePeer(self):
         """
         Choose a peer to distribute work to based on the current known slot
-        occupancy of the other masters.  Note that this will prefer
-        distributing work to local workers until the current node is full,
-        because that should be lower-latency.  Also, if no peers are available,
-        work will be submitted locally even if the worker pool is already
-        over-subscribed.
+        occupancy of the other nodes.  Note that this will prefer distributing
+        work to local workers until the current node is full, because that
+        should be lower-latency.  Also, if no peers are available, work will be
+        submitted locally even if the worker pool is already over-subscribed.
 
         @return: a L{Deferred <twisted.internet.defer.Deferred>} which fires
             with the chosen 'peer', i.e. object with a C{performWork} method,
@@ -857,7 +854,7 @@
 
     def totalNumberOfNodes(self):
         """
-        How many nodes / master processes are there?
+        How many nodes are there, total?
 
         @return: the maximum number of other L{PeerConnectionPool} instances
             that may be connected to the database described by
@@ -887,9 +884,9 @@
     @inlineCallbacks
     def _periodicLostWorkCheck(self):
         """
-        Periodically, every master process has to check to make sure that work
-        hasn't been dropped on the floor.  In order to do that it queries each
-        work-item table.
+        Periodically, every node controller has to check to make sure that work
+        hasn't been dropped on the floor by someone.  In order to do that it
+        queries each work-item table.
         """
         txn = self.transactionFactory()
         try:
@@ -905,21 +902,32 @@
             yield txn.commit()
 
 
-    @inlineCallbacks
+    _currentWorkDeferred = None
+    _lostWorkCheckCall = None
+
     def _lostWorkCheckLoop(self):
         """
         While the service is running, keep checking for any overdue / lost work
-        items and re-submit them to the cluster for processing.
+        items and re-submit them to the cluster for processing.  Space out
+        those checks in time based on the size of the cluster.
         """
-        while self.running:
-            yield self._periodicLostWorkCheck()
+        self._lostWorkCheckCall = None
+        @passthru(self._periodicLostWorkCheck().addErrback(log.err)
+                  .addCallback)
+        def scheduleNext(result):
+            self._currentWorkDeferred = None
+            if not self.running:
+                return
             index = self.nodeIndex()
             now = self.reactor.seconds()
             interval = self.queueDelayedProcessInterval
             count = self.totalNumberOfNodes()
             when = (now - (now % interval)) + (interval * (count + index))
             delay = when - now
-            yield deferLater(self.reactor, delay, lambda : None)
+            self._lostWorkCheckCall = self.reactor.callLater(
+                delay, self._lostWorkCheckLoop
+            )
+        self._currentWorkDeferred = scheduleNext
 
 
     def startService(self):
@@ -927,14 +935,36 @@
         Register ourselves with the database and establish all outgoing
         connections to other servers in the cluster.
         """
-        self._doStart()
+        self._startingUp = self._doStart()
+        @self._startingUp.addBoth
+        def done(result):
+            self._startingUp = None
+            return result
 
 
+    @inlineCallbacks
+    def stopService(self):
+        """
+        Stop this service, terminating any incoming or outgoing connections.
+        """
+        yield super(PeerConnectionPool, self).stopService()
+        if self._startingUp is not None:
+            yield self._startingUp
+        if self._listeningPortObject is not None:
+            return self._listeningPortObject.stopListening()
+        if self._lostWorkCheckCall is not None:
+            self._lostWorkCheckCall.cancel()
+        if self._currentWorkDeferred is not None:
+            yield self._currentWorkDeferred
+        for peer in self.peers:
+            peer.transport.loseConnection()
+
+
     def activeNodes(self, txn):
         """
         Load information about all other nodes.
         """
-        return NodeInfo.query()
+        return NodeInfo.query(txn)
 
 
     @inlineCallbacks
@@ -945,43 +975,35 @@
         unique-to-the-whole-database identifier for this process instance.
         """
         txn = self.transactionFactory()
-        thisProcess = yield NodeInfo.create(
+        endpoint = TCP4ServerEndpoint(self.reactor, self.ampPort)
+        f = Factory()
+        f.buildProtocol = self.createPeerConnection
+        # If this fails, the failure mode is going to be ugly, just like all
+        # conflicted-port failures.  But, at least it won't proceed.
+        yield endpoint.listen(f)
+        # with (yield NodeInfo.lock()):
+        self.thisProcess = yield NodeInfo.create(
             txn, hostname=self.hostName, pid=self.pid, port=self.ampPort,
             time=datetime.datetime.now()
         )
+        for node in (yield self.activeNodes(txn)):
+            self._startConnectingTo(node)
 
-        """
-        It might be a good idea to update this periodicially in order to give
-        an indication that the process isn't dead.  On the other hand maybe
-        there's no concrete feature which actually requires this information.
-        """
-        lc = LoopingCall(thisProcess.updateCurrent, self.transactionFactory)
-        lc.start(30.0)
 
+    def _startConnectingTo(self, node):
         """
-        Now let's find all the other masters.
-        """
-        masters = self.activeNodes(txn)
-
-        """
-        Each other 'master' here is another L{NodeInfo} which tells us where
-        to connect.
-        """
-        for master in masters:
-            self._startConnectingTo(master)
-
-
-    def _startConnectingTo(self, master):
-        """
         Start an outgoing connection to another master process.
 
-        @param master: a description of the master to connect to.
-        @type master: L{NodeInfo}
+        @param node: a description of the master to connect to.
+        @type node: L{NodeInfo}
         """
         f = Factory()
         f.buildProtocol = self.createPeerConnection
-        master.endpoint().connect(f)
+        node.endpoint().connect(f)
 
 
     def createPeerConnection(self, addr):
         return ConnectionFromPeerNode(self)
+
+
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120811/050bab19/attachment-0001.html>


More information about the calendarserver-changes mailing list