[CalendarServer-changes] [9625] CalendarServer/branches/users/glyph/q

source_changes at macosforge.org
Sat Aug 11 01:55:02 PDT 2012


Revision: 9625
          http://trac.macosforge.org/projects/calendarserver/changeset/9625
Author:   glyph at apple.com
Date:     2012-08-11 01:55:01 -0700 (Sat, 11 Aug 2012)
Log Message:
-----------
lots more detail

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py

Property Changed:
----------------
    CalendarServer/branches/users/glyph/q/

Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py	2012-08-11 08:55:01 UTC (rev 9624)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py	2012-08-11 08:55:01 UTC (rev 9625)
@@ -3,23 +3,78 @@
 This is the logic associated with queueing.
 """
 
+from socket import getfqdn
+from functools import wraps
+from os import getpid
+from datetime import datetime
+
 from twisted.internet.task import LoopingCall
 from twisted.application.service import Service
 from twisted.internet.protocol import Factory
-from twisted.internet.defer import inlineCallbacks#, Deferred
+from twisted.internet.defer import inlineCallbacks, returnValue
 from twisted.internet.endpoints import TCP4ClientEndpoint
-from twisted.protocols.amp import AMP, Command, Integer, String
+from twisted.protocols.amp import AMP, Command, Integer, Argument
 from twisted.python.reflect import qual
 
-from socket import getfqdn
-from functools import wraps
+from twext.enterprise.dal.syntax import TableSyntax, SchemaSyntax
+from twext.enterprise.dal.model import Table, Schema, SQLType, Constraint
 
-from datetime import datetime
-from twisted.internet.defer import returnValue
-from os import getpid
 
+def makeMasterSchema(inSchema):
+    """
+    Add the C{MASTER_INFO} table to the given (empty) schema, giving
+    L{MasterInfo} a self-contained schema to use.
+
+    @return: the same schema, with the one table added.
+    """
+    # Initializing this duplicate schema avoids a circular dependency, but this
+    # should really be accomplished with independent schema objects that the
+    # transaction is made aware of somehow.
+    masterTable = Table(inSchema, 'MASTER_INFO')
+    masterTable.addColumn("HOSTNAME", SQLType("varchar", 255))
+    masterTable.addColumn("PID", SQLType("integer", None))
+    masterTable.addColumn("PORT", SQLType("integer", None))
+    masterTable.addColumn("TIME", SQLType("timestamp", None))
+    for column in masterTable.columns:
+        masterTable.tableConstraint(Constraint.NOT_NULL, [column.name])
+    return inSchema
 
+masterInfoSchema = SchemaSyntax(makeMasterSchema(Schema(__file__)))
+
+
+
+class TableSyntaxByName(Argument):
+    """
+    Serialize and deserialize L{TableSyntax} objects for an AMP protocol with
+    an attached schema.
+    """
+
+    def fromStringProto(self, inString, proto):
+        """
+        Convert the name of the table into a table, given a C{proto} with an
+        attached C{schema}.
+
+        @param inString: the name of a table, as utf-8 encoded bytes
+        @type inString: L{bytes}
+
+        @param proto: a L{SchemaAMP}
+        """
+        return TableSyntax(proto.schema.tableNamed(inString.decode("UTF-8")))
+
+
+    def toString(self, inObject):
+        """
+        Convert a L{TableSyntax} object into just its name for wire transport.
+
+        @param inObject: a table.
+        @type inObject: L{TableSyntax}
+
+        @return: the name of that table
+        @rtype: L{bytes}
+        """
+        return inObject.model.name.encode("UTF-8")
+
+
+
 class MasterInfo(object):
     """
     A L{MasterInfo} is information about a currently-active master process.
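
A minimal sketch of the round trip that TableSyntaxByName performs, using
FakeTable, FakeSchema and FakeProto as hypothetical stand-ins for twext's
real model objects: only the table's name crosses the wire, and each peer
re-derives the rich syntax object from its own locally attached schema.

    class FakeTable(object):
        def __init__(self, name):
            self.name = name

    class FakeSchema(object):
        def __init__(self, tables):
            self._byName = dict((t.name, t) for t in tables)
        def tableNamed(self, name):
            return self._byName[name]

    class FakeProto(object):
        def __init__(self, schema):
            self.schema = schema

    # Sending side: serialize nothing but the table's name.
    wire = u"MASTER_INFO".encode("UTF-8")

    # Receiving side: look the name up in the locally attached schema.
    proto = FakeProto(FakeSchema([FakeTable(u"MASTER_INFO")]))
    table = proto.schema.tableNamed(wire.decode("UTF-8"))
    assert table.name == u"MASTER_INFO"
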
@@ -98,42 +153,174 @@
     """
 
     arguments = [
-        ("table", String()),
+        ("table", TableSyntaxByName()),
         ("workID", Integer()),
     ]
     response = []
 
 
 
-class PeerConnection(AMP):
+class ConnectionFromPeerMaster(AMP):
     """
     A connection to a peer master.  Symmetric; since the 'client' and the
     'server' both serve the same role, the logic is the same in every master.
+
+    This only exists in the master.
     """
+
+    def __init__(self, localWorkerPool, boxReceiver=None, locator=None):
+        """
+        Initialize this L{ConnectionFromPeerMaster} with a reference to a pool
+        of local workers.
+
+        @param localWorkerPool: the pool of local worker processes that can
+            process queue work.
+        """
+        self.localWorkerPool = localWorkerPool
+        super(ConnectionFromPeerMaster, self).__init__(boxReceiver, locator)
+
+
     def performWork(self, table, workID):
+        """
+        A L{local worker connection <ConnectionFromWorker>} is asking this
+        specific peer master process to perform some work, having already
+        determined that it's appropriate.
+
+        @param table: The table where work is waiting.
+        @type table: L{TableSyntax}
+
+        @param workID: The primary key identifier of the given work.
+        @type workID: L{int}
+
+        @return: a L{Deferred} firing with an empty dictionary when the work is
+            complete.
+        @rtype: L{Deferred} firing L{dict}
+        """
-        return self.callRemote(PerformWork,
-                               table=table.model.name, workID=workID)
+        return self.callRemote(PerformWork,
+                               table=table, workID=workID)
 
 
+    @PerformWork.responder
+    def dispatchToWorker(self, table, workID):
+        """
+        A remote peer master has asked this master to do some work; dispatch it
+        to a local worker.
+        """
+        return self.localWorkerPool.performWork(table, workID)
 
-class LocalConnection(object):
+
+
+class WorkerConnectionPool(object):
     """
-    Implements the same implicit protocol as a L{PeerConnection}, but one that
-    dispenses work to the local worker processes rather than to a remote
-    connection pool.
+    A pool of L{ConnectionFromWorker}s.
+
+    L{WorkerConnectionPool} also implements the same implicit protocol as a
+    L{ConnectionFromPeerMaster}, but one that dispenses work to the local
+    worker processes rather than to a remote connection pool.
     """
 
+    def __init__(self):
+        self.workers = []
+
+
+    def addWorker(self, worker):
+        self.workers.append(worker)
+
+
+    def removeWorker(self, worker):
+        self.workers.remove(worker)
+
+
+    def _selectLowestLoadLocalConnection(self):
+        """
+        Select the local connection with the lowest current load, or C{None}
+        if no local workers are currently connected.
+
+        @return: a worker connection with the lowest current load.
+        @rtype: L{ConnectionFromWorker} or L{NoneType}
+        """
+        if not self.workers:
+            return None
+        return min(self.workers, key=lambda w: w.currentLoad())
+
+
     def performWork(self, table, workID):
         """
-        Look up a local worker and delegate .performWork to it.
+        Select a local worker that is idle enough to perform the given work,
+        then ask them to perform it.
+
+        @param table: The table where work is waiting.
+        @type table: L{TableSyntax}
+
+        @param workID: The primary key identifier of the given work.
+        @type workID: L{int}
+
+        @return: a L{Deferred} firing with an empty dictionary when the work is
+            complete.
+        @rtype: L{Deferred} firing L{dict}
         """
+        return self._selectLowestLoadLocalConnection().performWork(table, workID)
 
 
 
+class ConnectionFromWorker(AMP):
+    """
+    An individual connection from a worker, as seen from the master's
+    perspective.  L{ConnectionFromWorker}s go into a L{WorkerConnectionPool}.
+
+    @ivar workerPool: The connection pool that this individual connection is
+        participating in.
+    @type workerPool: L{WorkerConnectionPool}
+    """
+
+    def __init__(self, workerPool):
+        self.workerPool = workerPool
+        super(ConnectionFromWorker, self).__init__()
+
+
+    def currentLoad(self):
+        """
+        What is the current load of this worker?
+        """
+        # TODO: this needs to be hooked up to something.
+        return 0
+
+
+    def startReceivingBoxes(self, sender):
+        """
+        Start receiving AMP boxes from the peer.  Initialize all necessary
+        state.
+        """
+        result = super(ConnectionFromWorker, self).startReceivingBoxes(sender)
+        self.workerPool.addWorker(self)
+        return result
+
+
+    def stopReceivingBoxes(self, reason):
+        """
+        AMP boxes will no longer be received.
+        """
+        result = super(ConnectionFromWorker, self).stopReceivingBoxes(reason)
+        self.workerPool.removeWorker(self)
+        return result
+
+
+    def performWork(self, table, workID):
+        """
+        Dispatch work to this worker.
+
+        @see: The responder for this should always be
+            L{ConnectionFromMaster.actuallyReallyExecuteWorkHere}.
+        """
+        return self.callRemote(PerformWork,
+                               table=table, workID=workID)
+
+
+
 class ConnectionFromMaster(AMP):
     """
-    This is in a worker process.  It processes requests from its own master to
-    do work.
+    A L{ConnectionFromMaster} is the connection to a master process, in a
+    worker process.  It processes requests from its own master to do work.  It
+    is the opposite end of the connection from L{ConnectionFromWorker}.
     """
 
     def __init__(self, schemaSyntax):
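
The load-based selection in WorkerConnectionPool above reduces to a guard
plus a min() over the connected workers; a sketch with hypothetical
FakeWorker objects, including the empty-pool case the docstring promises:

    class FakeWorker(object):
        def __init__(self, load):
            self._load = load
        def currentLoad(self):
            return self._load

    def selectLowestLoad(workers):
        if not workers:
            return None
        return min(workers, key=lambda w: w.currentLoad())

    assert selectLowestLoad([]) is None
    workers = [FakeWorker(3), FakeWorker(1), FakeWorker(2)]
    assert selectLowestLoad(workers).currentLoad() == 1
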
@@ -148,23 +335,26 @@
 
     @PerformWork.responder
     @inlineCallbacks
-    def perform(self, table, workID):
+    def actuallyReallyExecuteWorkHere(self, table, workID):
         """
         This is where it's time to actually do the work.  The master process
         has instructed this worker to do it; so, look up the data in the row,
         and do it.
         """
-        tableSyntax = getattr(self.schemaSyntax, table)
-        workItemClass = WorkItem.forTable(tableSyntax)
+        workItemClass = WorkItem.forTable(table)
         # TODO: mumble locking something mumble
+        # TODO: get a transaction in here.
         workItem = yield workItemClass.load(workID)
         # TODO: verify that workID is the primary key someplace.
         yield workItem.doWork()
+        # TODO: what if we fail?  error-handling should be recorded someplace,
+        # the row should probably be removed, re-tries should be triggerable.
+        yield workItem.delete()
         returnValue({})
 
 
 
-class PeerConnectionPool(Service):
+class PeerConnectionPool(Service, object):
     """
     Each master has a L{PeerConnectionPool} connecting it to all the other
     masters currently active on the same database.
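
The load/doWork/delete sequence in actuallyReallyExecuteWorkHere above,
sketched synchronously with a hypothetical work item (the real WorkItem is
asynchronous and, per the TODOs, still needs a transaction and error
handling):

    class EchoWork(object):
        rows = {1: "hello"}            # stand-in for a database table

        def __init__(self, workID):
            self.workID = workID

        @classmethod
        def load(cls, workID):
            return cls(workID)

        def doWork(self):
            print(self.rows[self.workID])

        def delete(self):
            # Remove the row so the work cannot be picked up twice.
            del self.rows[self.workID]

    item = EchoWork.load(1)
    item.doWork()
    item.delete()
    assert 1 not in EchoWork.rows
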
@@ -181,6 +371,9 @@
         up or if it is shutting down.
     """
 
+    getfqdn = staticmethod(getfqdn)
+    getpid = staticmethod(getpid)
+
     def __init__(self, connectionFactory, ampPort):
         """
         Initialize a L{PeerConnectionPool}.
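
Aliasing getfqdn and getpid as static methods is presumably for
testability: a test can substitute deterministic values by subclassing,
with no monkey-patching of the socket or os modules.  A sketch of the
pattern:

    from os import getpid
    from socket import getfqdn

    class Pool(object):
        getfqdn = staticmethod(getfqdn)
        getpid = staticmethod(getpid)

        def __init__(self):
            self.hostName = self.getfqdn()
            self.pid = self.getpid()

    class StubPool(Pool):
        # A test double with deterministic host and pid values.
        getfqdn = staticmethod(lambda: "test.example.com")
        getpid = staticmethod(lambda: 1234)

    assert StubPool().hostName == "test.example.com"
    assert StubPool().pid == 1234
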
@@ -194,7 +387,8 @@
             L{IAsyncTransaction}
         """
         self.connectionFactory = connectionFactory
-        self.hostName = getfqdn()
+        self.hostName = self.getfqdn()
+        self.pid = self.getpid()
         self.ampPort = ampPort
         self.thisProcess = None
 
@@ -204,21 +398,41 @@
         Choose a peer to distribute work to based on the current known slot
         occupancy of the other masters.
 
-        @return: a Deferred which fires with the chosen L{PeerConnection} as
-            soon as one is available.  Normally this will be synchronous, but
-            we need to account for the possibility that we may need to connect
-            to other hosts.
-        @rtype: L{twisted.internet.defer.Deferred} firing L{PeerConnection}
+        @return: a L{Deferred <twisted.internet.defer.Deferred>} which fires
+            with the chosen L{ConnectionFromPeerMaster} as soon as one is
+            available.  Normally this will be synchronous, but we need to
+            account for the possibility that we may need to connect to other
+            hosts.
+        @rtype: L{Deferred <twisted.internet.defer.Deferred>} firing
+            L{ConnectionFromPeerMaster}
         """
 
 
     def enqueueWork(self, workItem):
         """
-        There is some work to do.  Do it, someplace.
+        There is some work to do.  Do it, someplace else, ideally in parallel.
+        Later, let the caller know that the work has been completed by firing a
+        L{Deferred}.
 
+        @note: The L{Deferred} returned by C{enqueueWork} should be used with
+            caution.  If an application decides to do any database-persistent
+            work as a result of this L{Deferred} firing, that work I{may be
+            lost} as a result of a service being normally shut down between the
+            time that the work is scheduled and the time that it is executed.
+            So, the only things that should be added as callbacks to this
+            L{Deferred} are those which are ephemeral, in memory, and reflect
+            only presentation state associated with the user's perception of
+            the completion of work, not logical chains of work which need to be
+            completed in sequence; those should all be completed within the
+            transaction of the L{WorkItem.doWork} that gets executed.
+
+        @param workItem: An item of work to be done in another process.
         @type workItem: A L{WorkItem}
+
+        @return: a L{Deferred} that fires when the work has been completed.
+        @rtype: L{Deferred} firing L{None}
         """
-        @workItem.transaction.postCommit
+        @workItem.__txn__.postCommit
         @inlineCallbacks
         def whenDone():
             peer = yield self.choosePeer()
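
The postCommit decoration above is what keeps peers from seeing work rows
that belong to uncommitted transactions: the dispatch to a peer runs only
after the enqueuing transaction commits.  A sketch of the hook pattern,
with Txn as a hypothetical stand-in for the real transaction object:

    class Txn(object):
        def __init__(self):
            self._hooks = []
        def postCommit(self, hook):
            self._hooks.append(hook)
            return hook
        def commit(self):
            for hook in self._hooks:
                hook()

    txn = Txn()

    @txn.postCommit
    def whenDone():
        print("dispatching work to a peer")

    txn.commit()    # only now does the dispatch actually run
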
@@ -231,13 +445,20 @@
         connections to other servers in the cluster.
         """
 
+        self._doStart()
+        # Is there any need for a callback?
+
+
+    @inlineCallbacks
+    def _doStart(self):
         """
         First, we tell the database that we're an active master so that other
         masters know about us.  This should also give us a
         unique-to-the-whole-database identifier for this process instance.
         """
-        thisProcess = MasterInfo.create(
-            host=self.hostName, pid=getpid(), port=self.ampPort,
+        txn = self.connectionFactory()
+        thisProcess = yield MasterInfo.create(
+            txn, hostname=self.hostName, pid=self.pid, port=self.ampPort,
-            time=datetime.datetime.now()
+            time=datetime.now()
         )
 
@@ -259,7 +480,7 @@
         to connect.
         """
         f = Factory()
-        f.protocol = PeerConnection
+        f.protocol = ConnectionFromPeerMaster
         for master in masters:
             self._startConnectingTo(master)
 