[CalendarServer-changes] [9638] CalendarServer/branches/users/glyph/q

source_changes at macosforge.org
Sat Aug 11 01:55:12 PDT 2012


Revision: 9638
          http://trac.macosforge.org/projects/calendarserver/changeset/9638
Author:   glyph at apple.com
Date:     2012-08-11 01:55:12 -0700 (Sat, 11 Aug 2012)
Log Message:
-----------
more required data

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py

Property Changed:
----------------
    CalendarServer/branches/users/glyph/q/

Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py	2012-08-11 08:55:11 UTC (rev 9637)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py	2012-08-11 08:55:12 UTC (rev 9638)
@@ -12,6 +12,7 @@
 from twisted.application.service import Service
 from twisted.internet.protocol import Factory
 from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
+from twisted.internet.defer import succeed
 from twisted.internet.task import deferLater
 from twisted.internet.endpoints import TCP4ClientEndpoint
 from twisted.protocols.amp import AMP, Command, Integer, Argument
@@ -191,13 +192,13 @@
 
 
 
-class ConnectionFromPeerNode(AMP):
+class ConnectionFromPeerNode(SchemaAMP):
     """
     A connection to a peer node.  Symmetric; since the 'client' and the
     'server' both serve the same role, the logic is the same in every node.
     """
 
-    def __init__(self, localWorkerPool, boxReceiver=None, locator=None):
+    def __init__(self, schema, localWorkerPool, boxReceiver=None, locator=None):
         """
         Initialize this L{ConnectionFromPeerNode} with a reference to a pool of
         local workers.
@@ -209,7 +210,7 @@
         @see: L{AMP.__init__}
         """
         self.localWorkerPool = localWorkerPool
-        super(ConnectionFromPeerNode, self).__init__(boxReceiver, locator)
+        super(ConnectionFromPeerNode, self).__init__(schema, boxReceiver, locator)
 
 
     def currentLoadEstimate(self):
@@ -409,13 +410,12 @@
         and do it.
         """
         workItemClass = WorkItem.forTable(table)
+        # TODO: get a transaction in here.
+        workItem = yield workItemClass.load(workID)
         # TODO: what if we fail?  error-handling should be recorded someplace,
-        # the row should probably be removed, re-tries should be triggerable.
-        # Note: deletion must happen first.
+        # the row should probably be marked, re-tries should be triggerable
+        # administratively.
         yield workItem.delete()
-        # TODO: mumble locking something mumble
-        # TODO: get a transaction in here.
-        workItem = yield workItemClass.load(workID)
         # TODO: verify that workID is the primary key someplace.
         yield workItem.doWork()
         returnValue({})
@@ -595,7 +595,7 @@
     queueProcessTimeout = (10.0 * 60.0)
     queueDelayedProcessInterval = (60.0)
 
-    def __init__(self, reactor, connectionFactory, ampPort):
+    def __init__(self, reactor, connectionFactory, ampPort, schema):
         """
         Initialize a L{PeerConnectionPool}.
 
@@ -608,6 +608,10 @@
 
         @param connectionFactory: a 0- or 1-argument callable that produces an
             L{IAsyncTransaction}
+
+        @param schema: The schema which contains all the tables associated with
+            the L{WorkItem}s that this L{PeerConnectionPool} will process.
+        @type schema: L{Schema}
         """
         self.reactor = reactor
         self.connectionFactory = connectionFactory
@@ -617,6 +621,7 @@
         self.thisProcess = None
         self.workerPool = WorkerConnectionPool()
         self.peers = []
+        self.schema = schema
 
 
     def choosePeer(self):
@@ -784,8 +789,6 @@
         Each other 'master' here is another L{NodeInfo} which tells us where
         to connect.
         """
-        f = Factory()
-        f.protocol = ConnectionFromPeerNode
         for master in masters:
             self._startConnectingTo(master)
 
@@ -798,10 +801,16 @@
         @type master: L{NodeInfo}
         """
         f = Factory()
+        f.buildProtocol = self.createPeerConnection
         master.endpoint().connect(f)
 
 
+    def createPeerConnection(self, addr):
+        # TODO: add to peer list, remove from peer list
+        return ConnectionFromPeerNode(self.schema, self.workerPool)
 
+
+
 def sketch():
     """
     Example demonstrating how an application would normally talk to the queue.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120811/46b3a5dc/attachment.html>


More information about the calendarserver-changes mailing list