[CalendarServer-changes] [9633] CalendarServer/branches/users/glyph/q

source_changes at macosforge.org source_changes at macosforge.org
Sat Aug 11 01:55:08 PDT 2012


Revision: 9633
          http://trac.macosforge.org/projects/calendarserver/changeset/9633
Author:   glyph at apple.com
Date:     2012-08-11 01:55:08 -0700 (Sat, 11 Aug 2012)
Log Message:
-----------
terminology clarification (Master/Node->consistently Node), add idle scheduling logic, add sketch

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
    CalendarServer/branches/users/glyph/q/txdav/common/datastore/sql_schema/current.sql

Property Changed:
----------------
    CalendarServer/branches/users/glyph/q/

Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py	2012-08-11 08:55:07 UTC (rev 9632)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py	2012-08-11 08:55:08 UTC (rev 9633)
@@ -22,32 +22,33 @@
 from twext.enterprise.dal.record import fromTable
 from twisted.internet.defer import Deferred
 from twisted.internet.defer import passthru
+from twisted.internet.task import deferLater
 from twext.enterprise.dal.model import Table, Schema, SQLType, Constraint
 
 
-def makeMasterSchema(inSchema):
+def makeNodeSchema(inSchema):
     """
-    Create a self-contained schema for L{MasterInfo} to use.
+    Create a self-contained schema for L{NodeInfo} to use.
 
     @return: a schema with just the one table.
     """
     # Initializing this duplicate schema avoids a circular dependency, but this
     # should really be accomplished with independent schema objects that the
     # transaction is made aware of somehow.
-    masterTable = Table(inSchema, 'MASTER_INFO')
-    masterTable.addColumn("HOSTNAME", SQLType("varchar", 255))
-    masterTable.addColumn("PID", SQLType("integer", None))
-    masterTable.addColumn("PORT", SQLType("integer", None))
-    masterTable.addColumn("TIME", SQLType("timestamp", None)).setDefaultValue(
+    NodeTable = Table(inSchema, 'NODE_INFO')
+    NodeTable.addColumn("HOSTNAME", SQLType("varchar", 255))
+    NodeTable.addColumn("PID", SQLType("integer", None))
+    NodeTable.addColumn("PORT", SQLType("integer", None))
+    NodeTable.addColumn("TIME", SQLType("timestamp", None)).setDefaultValue(
         # Note: in the real data structure, this is actually a not-cleaned-up
         # sqlparse internal data structure, but it *should* look closer to this.
         ProcedureCall("timezone", ["UTC", NamedValue('CURRENT_TIMESTAMP')])
     )
-    for column in masterTable.columns:
-        masterTable.tableConstraint(Constraint.NOT_NULL, [column.name])
+    for column in NodeTable.columns:
+        NodeTable.tableConstraint(Constraint.NOT_NULL, [column.name])
     return inSchema
 
-masterInfoSchema = SchemaSyntax(makeMasterSchema(Schema(__file__)))
+NodeInfoSchema = SchemaSyntax(makeNodeSchema(Schema(__file__)))
 
 
 
@@ -84,15 +85,15 @@
 
 
 
-class MasterInfo(fromTable(masterInfoSchema.MASTER_INFO)):
+class NodeInfo(fromTable(NodeInfoSchema.NODE_INFO)):
     """
-    A L{MasterInfo} is information about a currently-active master process.
+    A L{NodeInfo} is information about a currently-active Node process.
     """
 
     def endpoint(self):
         """
-        Create an L{IStreamServerEndpoint} that will talk to the master process
-        that is described by this L{MasterInfo}.
+        Create an L{IStreamClientEndpoint} that will talk to the node process
+        that is described by this L{NodeInfo}.
 
         @return: an endpoint that will connect to this host.
         @rtype: L{IStreamClientEndpoint}
@@ -127,6 +128,10 @@
     @ivar workID: the unique identifier (primary key) for items of this type.
         There must be a corresponding column in the database.
     @type workID: L{int}
+
+    @cvar created: the timestamp at which a given item was created (on an
+        instance), or the column describing its creation time (on the class).
+    @type created: L{datetime.datetime}
     """
 
     @abstract
@@ -173,31 +178,29 @@
 
 
 
-class ConnectionFromPeerMaster(AMP):
+class ConnectionFromPeerNode(AMP):
     """
-    A connection to a peer master.  Symmetric; since the 'client' and the
-    'server' both serve the same role, the logic is the same in every master.
-
-    This only exists in the master.
+    A connection to a peer node.  Symmetric; since the 'client' and the
+    'server' both serve the same role, the logic is the same in every node.
     """
 
     def __init__(self, localWorkerPool, boxReceiver=None, locator=None):
         """
-        Initialize this L{ConnectionFromPeerMaster} with a reference to a pool
+        Initialize this L{ConnectionFromPeerNode} with a reference to a pool
         of local workers.
 
        @param localWorkerPool: the pool of local worker processes that can
             process queue work.
         """
         self.localWorkerPool = localWorkerPool
-        super(ConnectionFromPeerMaster, self).__init__(boxReceiver, locator)
+        super(ConnectionFromPeerNode, self).__init__(boxReceiver, locator)
 
 
     def performWork(self, table, workID):
         """
         A L{local worker connection <ConnectionFromWorker>} is asking this
-        specific peer master process to perform some work, having already
-        determined that it's appropriate.
+        specific peer node-controller process to perform some work, having
+        already determined that it's appropriate.
 
         @param table: The table where work is waiting.
         @type table: L{TableSyntax}
@@ -216,8 +219,8 @@
     @PerformWork.responder
     def dispatchToWorker(self, table, workID):
         """
-        A remote peer master has asked this master to do some work; dispatch it
-        to a local worker.
+        A remote peer node has asked this node to do some work; dispatch it to
+        a local worker on this node.
         """
         return self.localWorkerPool.performWork(table, workID)
 
@@ -228,7 +231,7 @@
     A pool of L{ConnectionFromWorker}s.
 
     L{WorkerConnectionPool} also implements the same implicit protocol as a
-    L{ConnectionFromPeerMaster}, but one that dispenses work to the local
+    L{ConnectionFromPeerNode}, but one that dispenses work to the local
     worker processes rather than to a remote connection pool.
     """
 
@@ -322,18 +325,18 @@
         Dispatch work to this worker.
 
         @see: The responder for this should always be
-            L{ConnectionFromMaster.actuallyReallyExecuteWorkHere}.
+            L{ConnectionFromController.actuallyReallyExecuteWorkHere}.
         """
         return self.callRemote(PerformWork,
                                table=table.model.name, workID=workID)
 
 
 
-class ConnectionFromMaster(AMP):
+class ConnectionFromController(AMP):
     """
-    A L{ConnectionFromMaster} is the connection to a master process, in a
-    worker process.  It processes requests from its own master to do work.  It
-    is the opposite end of the connection from L{ConnectionFromWorker}.
+    A L{ConnectionFromController} is the connection to a node-controller
+    process, as seen from a worker process.  It processes requests from its
+    own controller to do work.  It is the opposite end of the connection from
+    L{ConnectionFromWorker}.
     """
 
     def __init__(self, schemaSyntax):
@@ -342,7 +345,7 @@
             contains (at least) all the tables that we may receive requests for
             work in.
         """
-        super(ConnectionFromMaster, self).__init__()
+        super(ConnectionFromController, self).__init__()
         self.schemaSyntax = schemaSyntax
 
 
@@ -376,25 +379,44 @@
 
 class PeerConnectionPool(Service, object):
     """
-    Each master has a L{PeerConnectionPool} connecting it to all the other
-    masters currently active on the same database.
+    Each node has a L{PeerConnectionPool} connecting it to all the other nodes
+    currently active on the same database.
 
-    @ivar hostName: The hostname of this master process, as reported by the
-        local host's configuration.  Possibly this should be obtained via
-        C{config.ServerHostName} instead of C{socket.getfqdn()}; although hosts
-        within a cluster may be configured with the same C{ServerHostName};
-        TODO need to confirm.
+    @ivar hostName: The hostname where this node process is running, as
+        reported by the local host's configuration.  Possibly this should be
+        obtained via C{config.ServerHostName} instead of C{socket.getfqdn()},
+        although hosts within a cluster may be configured with the same
+        C{ServerHostName}; TODO: need to confirm.
 
-    @ivar thisProcess: a L{MasterInfo} representing this process, which is
+    @ivar thisProcess: a L{NodeInfo} representing this process, which is
         initialized when this L{PeerConnectionPool} service is started via
         C{startService}.  May be C{None} if this service is not fully started
         up or if it is shutting down.
+
+    @ivar queueProcessTimeout: The maximum amount of time allowed for a queue
+        item to be processed.  By default, 10 minutes.
+    @type queueProcessTimeout: L{float} (in seconds)
+
+    @ivar queueDelayedProcessInterval: The amount of time between database
+        pings, i.e. checks for overdue queue items that might have been
+        orphaned by a node process that died mid-transaction.  This is how
+        often the shared database should be pinged by I{all} nodes (i.e., all
+        master processes, or each instance of L{PeerConnectionPool}); each
+        individual node will ping commensurately less often as more nodes join
+        the database.
+    @type queueDelayedProcessInterval: L{float} (in seconds)
+
+    @ivar reactor: The reactor used for scheduling timed events.
+    @type reactor: L{IReactorTime} provider.
     """
 
     getfqdn = staticmethod(getfqdn)
     getpid = staticmethod(getpid)
 
-    def __init__(self, connectionFactory, ampPort):
+    queueProcessTimeout = (10.0 * 60.0)
+    queueDelayedProcessInterval = (60.0)
+
+    def __init__(self, reactor, connectionFactory, ampPort):
         """
         Initialize a L{PeerConnectionPool}.
 
@@ -406,6 +428,7 @@
         @param connectionFactory: a 0- or 1-argument callable that produces an
             L{IAsyncTransaction}
         """
+        self.reactor = reactor
         self.connectionFactory = connectionFactory
         self.hostName = self.getfqdn()
         self.pid = self.getpid()
@@ -419,12 +442,12 @@
         occupancy of the other masters.
 
         @return: a L{Deferred <twisted.internet.defer.Deferred>} which fires
-            with the chosen L{ConnectionFromPeerMaster} as soon as one is
+            with the chosen L{ConnectionFromPeerNode} as soon as one is
             available.  Normally this will be synchronous, but we need to
             account for the possibility that we may need to connect to other
             hosts.
         @rtype: L{Deferred <twisted.internet.defer.Deferred>} firing
-            L{ConnectionFromPeerMaster}
+            L{ConnectionFromPeerNode}
         """
 
 
@@ -465,7 +488,7 @@
             @inlineCallbacks
             def whenDone():
                 peer = yield self.choosePeer()
-                peer.performWork(workItemType.__tbl__)
+                peer.performWork(result.__tbl__, result.workID)
                 d.callback(None)
             @txn.postAbort
             def whenFailed():
@@ -474,6 +497,80 @@
         return created
 
 
+    def allWorkItemTypes(self):
+        """
+        Load all the L{WorkItem} types that this node can process and return
+        them.
+
+        @return: L{list} of L{type}
+        """
+        # TODO: For completeness, this may need to involve a plugin query to
+        # make sure that all WorkItem subclasses are imported first.
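+        # Note: __subclasses__() only reports *direct* subclasses, so any
+        # WorkItem types derived through an intermediate base class would
+        # also need to be discovered recursively.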
+        return WorkItem.__subclasses__()
+
+
+    def totalNumberOfNodes(self):
+        """
+        How many nodes / master processes are there?
+
+        @return: the number of other L{PeerConnectionPool} instances that are
+            connected to the database described by C{self.connectionFactory}.
+        @rtype: L{int}
+        """
+        # TODO
+
+
+    def nodeIndex(self):
+        """
+        What ordinal does this node, i.e. this instance of
+        L{PeerConnectionPool}, occupy within the ordered set of all nodes
+        connected to the database described by C{self.connectionFactory}?
+
+        @return: the index of this node within the total collection.  For
+            example, if this L{PeerConnectionPool} is 6 out of 30, this method
+            will return C{6}.
+        @rtype: L{int}
+        """
+        # TODO
+
+
+    @inlineCallbacks
+    def _periodicLostWorkCheck(self):
+        """
+        Periodically, every node-controller process has to check to make sure
+        that work hasn't been dropped on the floor.  In order to do that, it
+        queries each work-item table.
+        """
+        txn = self.connectionFactory()
+        try:
+            for itemType in self.allWorkItemTypes():
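+                # XXX 'itemType.created > self.queueProcessTimeout' compares
+                # a creation timestamp against a duration in seconds;
+                # presumably the intended condition is "created more than
+                # queueProcessTimeout seconds ago".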
+                for overdueItem in (
+                        yield itemType.query(
+                            txn, itemType.created > self.queueProcessTimeout
+                    )):
+                    peer = yield self.choosePeer()
+                    yield peer.performWork(overdueItem.__tbl__, overdueItem.workID)
+        finally:
+            yield txn.commit()
+
+
+    @inlineCallbacks
+    def _lostWorkCheckLoop(self):
+        """
+        While the service is running, keep checking for any overdue / lost work
+        items and re-submit them to the cluster for processing.
+        """
+        while self.running:
+            yield self._periodicLostWorkCheck()
+            index = self.nodeIndex()
+            now = self.reactor.seconds()
+            interval = self.queueDelayedProcessInterval
+            count = self.totalNumberOfNodes()
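+            # Wake up again at an interval boundary, skewed by this node's
+            # position so that nodes don't all ping the database at once.
+            # E.g. with a 60-second interval and 3 nodes, node 0 next wakes
+            # ~180s past the current boundary, node 1 ~240s, node 2 ~300s.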
+            when = (now - (now % interval)) + (interval * (count + index))
+            delay = when - now
+            yield deferLater(self.reactor, delay, lambda: None)
+
+
     def startService(self):
         """
         Register ourselves with the database and establish all outgoing
@@ -482,23 +579,30 @@
         self._doStart()
 
 
+    def activeNodes(self, txn):
+        """
+        Load information about all other nodes.
+        """
+        return NodeInfo.all(txn)
+
+
     @inlineCallbacks
     def _doStart(self):
         """
-        First, we tell the database that we're an active master so that other
-        masters know about us.  This should also give us a
+        First, we tell the database that we're an active node so that other
+        nodes know about us.  This should also give us a
         unique-to-the-whole-database identifier for this process instance.
         """
         txn = self.connectionFactory()
-        thisProcess = yield MasterInfo.create(
+        thisProcess = yield NodeInfo.create(
             txn, hostname=self.hostName, pid=self.pid, port=self.ampPort,
             time=datetime.datetime.now()
         )
 
         """
-        It might be a good idea to update this periodicially in order to give an
-        indication that the process isn't dead.  On the other hand maybe there's no
-        concrete feature which actually requires this information.
+        It might be a good idea to update this periodically in order to give
+        an indication that the process isn't dead.  On the other hand maybe
+        there's no concrete feature which actually requires this information.
         """
         lc = LoopingCall(thisProcess.updateCurrent, self.connectionFactory)
         lc.start(30.0)
@@ -506,14 +610,14 @@
         """
         Now let's find all the other masters.
         """
-        masters = self.activeMasters()
+        masters = self.activeNodes(txn)
 
         """
-        Each other 'master' here is another L{MasterInfo} which tells us where
+        Each other 'master' here is another L{NodeInfo} which tells us where
         to connect.
         """
         f = Factory()
-        f.protocol = ConnectionFromPeerMaster
+        f.protocol = ConnectionFromPeerNode
         for master in masters:
             self._startConnectingTo(master)
 
@@ -523,9 +627,32 @@
         Start an outgoing connection to another master process.
 
         @param master: a description of the master to connect to.
-        @type master: L{MasterInfo}
+        @type master: L{NodeInfo}
         """
         f = Factory()
         master.endpoint().connect(f)
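+        # XXX note: this local Factory never has its 'protocol' attribute
+        # set; presumably it should reuse the ConnectionFromPeerNode factory
+        # prepared in _doStart.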
 
+def sketch():
+    """
+    Example demonstrating how an application would normally talk to the queue.
+    """
+    # XXX in real life, MyWorkItem would also need to inherit from
+    # fromTable(...), would need to be declared at the top level...
+    class MyWorkItem(WorkItem):
+        @inlineCallbacks
+        def doWork(self):
+            txn = self.__txn__
+            yield self.doSomethingDeferred(txn)
+            returnValue(None)
 
+
+    @inlineCallbacks
+    def sampleFunction(txn):
+        for x in range(10):
+            # Note: no yield here.  Yielding for this to be completed would
+            # generate unnecessary lock contention, potentially a deadlock (we
+            # just created the item in this transaction, the next transaction is
+            # going to want to delete it).
+            txn.enqueueWork(MyWorkItem, someInt=x, someUnicode=u'4321')
+        yield txn.commit()
+
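
The new C{reactor} argument to L{PeerConnectionPool.__init__} means a node
process would now construct the pool along these lines (a minimal sketch;
the connection-factory body and the port number are hypothetical
placeholders, not part of this change):

    from twisted.internet import reactor
    from twext.enterprise.queue import PeerConnectionPool

    def connectionFactory(label=None):
        # Hypothetical: should produce an IAsyncTransaction against the
        # shared database, per the __init__ docstring above.
        raise NotImplementedError("wire up the real database txn factory")

    pool = PeerConnectionPool(reactor, connectionFactory, ampPort=7654)
    pool.startService()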

Modified: CalendarServer/branches/users/glyph/q/txdav/common/datastore/sql_schema/current.sql
===================================================================
--- CalendarServer/branches/users/glyph/q/txdav/common/datastore/sql_schema/current.sql	2012-08-11 08:55:07 UTC (rev 9632)
+++ CalendarServer/branches/users/glyph/q/txdav/common/datastore/sql_schema/current.sql	2012-08-11 08:55:08 UTC (rev 9633)
@@ -28,8 +28,8 @@
 
 -- Information about a process connected to this database.
 
--- Note that this must match the master info schema in twext.enterprise.queue.
-create table MASTER_INFO (
+-- Note that this must match the node info schema in twext.enterprise.queue.
+create table NODE_INFO (
   HOSTNAME  varchar(255) not null,
   PID       integer not null,
   PORT      integer not null,
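
For reference, the Python-side registration that writes one row per process
into this table (cf. C{_doStart} in queue.py above) maps these columns to
keyword arguments; a minimal sketch, assuming a transaction obtained from
the pool's C{connectionFactory} and a hypothetical C{registerNode} helper:

    import datetime
    from twisted.internet.defer import inlineCallbacks, returnValue
    from twext.enterprise.queue import NodeInfo

    @inlineCallbacks
    def registerNode(txn, hostName, pid, ampPort):
        # One NODE_INFO row per running node process; the keyword
        # arguments mirror the columns declared above.
        node = yield NodeInfo.create(
            txn, hostname=hostName, pid=pid, port=ampPort,
            time=datetime.datetime.now()
        )
        returnValue(node)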