[CalendarServer-changes] [9681] CalendarServer/branches/users/glyph/q

source_changes at macosforge.org source_changes at macosforge.org
Sat Aug 11 01:55:49 PDT 2012


Revision: 9681
          http://trac.macosforge.org/projects/calendarserver/changeset/9681
Author:   glyph at apple.com
Date:     2012-08-11 01:55:49 -0700 (Sat, 11 Aug 2012)
Log Message:
-----------
manage transactions more consistently and carefully

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py

Property Changed:
----------------
    CalendarServer/branches/users/glyph/q/

Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py	2012-08-11 08:55:48 UTC (rev 9680)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py	2012-08-11 08:55:49 UTC (rev 9681)
@@ -121,12 +121,43 @@
     )
     for column in NodeTable.columns:
         NodeTable.tableConstraint(Constraint.NOT_NULL, [column.name])
+    NodeTable.primaryKey = [NodeTable.columnNamed("HOSTNAME"),
+                            NodeTable.columnNamed("PORT")]
     return inSchema
 
 NodeInfoSchema = SchemaSyntax(makeNodeSchema(Schema(__file__)))
 
 
+ at inlineCallbacks
+def inTransaction(transactionCreator, operation):
+    """
+    Perform the given operation in a transaction, committing or aborting as
+    required.
 
+    @param transactionCreator: a 0-arg callable that returns an
+        L{IAsyncTransaction}
+
+    @param operation: a 1-arg callable that takes an L{IAsyncTransaction} and
+        returns a value.
+
+    @return: a L{Deferred} that fires with C{operation}'s result or fails with
+        its error, unless there is an error creating, aborting or committing
+        the transaction.
+    """
+    txn = transactionCreator()
+    try:
+        result = yield operation(txn)
+    except:
+        f = Failure()
+        yield txn.abort()
+        returnValue(f)
+    else:
+        yield txn.commit()
+        returnValue(result)
+
+
+
+
 class TableSyntaxByName(Argument):
     """
     Serialize and deserialize L{TableSyntax} objects for an AMP protocol with
@@ -165,7 +196,7 @@
     A L{NodeInfo} is information about a currently-active Node process.
     """
 
-    def endpoint(self):
+    def endpoint(self, reactor):
         """
         Create an L{IStreamServerEndpoint} that will talk to the node process
         that is described by this L{NodeInfo}.
@@ -173,7 +204,7 @@
         @return: an endpoint that will connect to this host.
         @rtype: L{IStreamServerEndpoint}
         """
-        return TCP4ClientEndpoint(self.hostname, self.port)
+        return TCP4ClientEndpoint(reactor, self.hostname, self.port)
 
 
 
@@ -617,16 +648,15 @@
 
 
     @PerformWork.responder
-    @inlineCallbacks
     def actuallyReallyExecuteWorkHere(self, table, workID):
         """
         This is where it's time to actually do the work.  The controller
         process has instructed this worker to do it; so, look up the data in
         the row, and do it.
         """
-        workItemClass = WorkItem.forTable(table)
-        txn = self.transactionFactory()
-        try:
+        @inlineCallbacks
+        def work(txn):
+            workItemClass = WorkItem.forTable(table)
             workItem = yield workItemClass.load(txn, workID)
             # TODO: what if we fail?  error-handling should be recorded
             # someplace, the row should probably be marked, re-tries should be
@@ -634,13 +664,8 @@
             yield workItem.delete()
             # TODO: verify that workID is the primary key someplace.
             yield workItem.doWork()
-        except:
-            f = Failure()
-            yield txn.abort()
-            returnValue(f)
-        else:
-            yield txn.commit()
             returnValue({})
+        return inTransaction(self.transactionFactory, work)
 
 
 
@@ -983,15 +1008,14 @@
         return self._lastSeenNodeIndex
 
 
-    @inlineCallbacks
     def _periodicLostWorkCheck(self):
         """
         Periodically, every node controller has to check to make sure that work
         hasn't been dropped on the floor by someone.  In order to do that it
         queries each work-item table.
         """
-        txn = self.transactionFactory()
-        try:
+        @inlineCallbacks
+        def workCheck(txn):
             for itemType in self.allWorkItemTypes():
                 for overdueItem in (
                         yield itemType.query(
@@ -1000,8 +1024,7 @@
                     peer = yield self.choosePerformer()
                     yield peer.performWork(overdueItem.table,
                                            overdueItem.workID)
-        finally:
-            yield txn.commit()
+        return inTransaction(self.transactionFactory, workCheck)
 
 
     _currentWorkDeferred = None
@@ -1037,7 +1060,22 @@
         Register ourselves with the database and establish all outgoing
         connections to other servers in the cluster.
         """
-        self._startingUp = self._doStart()
+        @inlineCallbacks
+        def startup(txn):
+            endpoint = TCP4ServerEndpoint(self.reactor, self.ampPort)
+            f = Factory()
+            f.buildProtocol = self.createPeerConnection
+            # If this fails, the failure mode is going to be ugly, just like all
+            # conflicted-port failures.  But, at least it won't proceed.
+            yield endpoint.listen(f)
+            # with (yield NodeInfo.lock()):
+            self.thisProcess = yield NodeInfo.create(
+                txn, hostname=self.hostName, pid=self.pid, port=self.ampPort,
+                time=datetime.now()
+            )
+            for node in (yield self.activeNodes(txn)):
+                self._startConnectingTo(node)
+        self._startingUp = inTransaction(self.transactionFactory, startup)
         @self._startingUp.addBoth
         def done(result):
             self._startingUp = None
@@ -1069,29 +1107,6 @@
         return NodeInfo.all(txn)
 
 
-    @inlineCallbacks
-    def _doStart(self):
-        """
-        First, we tell the database that we're an active node so that other
-        nodes know about us.  This should also give us a
-        unique-to-the-whole-database identifier for this process instance.
-        """
-        txn = self.transactionFactory()
-        endpoint = TCP4ServerEndpoint(self.reactor, self.ampPort)
-        f = Factory()
-        f.buildProtocol = self.createPeerConnection
-        # If this fails, the failure mode is going to be ugly, just like all
-        # conflicted-port failures.  But, at least it won't proceed.
-        yield endpoint.listen(f)
-        # with (yield NodeInfo.lock()):
-        self.thisProcess = yield NodeInfo.create(
-            txn, hostname=self.hostName, pid=self.pid, port=self.ampPort,
-            time=datetime.now()
-        )
-        for node in (yield self.activeNodes(txn)):
-            self._startConnectingTo(node)
-
-
     def mapPeer(self, host, port, peer):
         """
         A peer has been identified as belonging to the given host/port
@@ -1113,7 +1128,7 @@
         """
         f = Factory()
         f.buildProtocol = self.createPeerConnection
-        @passthru(node.endpoint().connect(f).addCallback)
+        @passthru(node.endpoint(self.reactor).connect(f).addCallback)
         def connected(proto):
             self.mapPeer(node, proto)
             proto.callRemote(IdentifyNode, self.thisProcess)
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120811/4f0d8cda/attachment-0001.html>


More information about the calendarserver-changes mailing list