[CalendarServer-changes] [9681] CalendarServer/branches/users/glyph/q
source_changes at macosforge.org
source_changes at macosforge.org
Sat Aug 11 01:55:49 PDT 2012
Revision: 9681
http://trac.macosforge.org/projects/calendarserver/changeset/9681
Author: glyph at apple.com
Date: 2012-08-11 01:55:49 -0700 (Sat, 11 Aug 2012)
Log Message:
-----------
manage transactions more consistently and carefully
Modified Paths:
--------------
CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
Property Changed:
----------------
CalendarServer/branches/users/glyph/q/
Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py 2012-08-11 08:55:48 UTC (rev 9680)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py 2012-08-11 08:55:49 UTC (rev 9681)
@@ -121,12 +121,43 @@
)
for column in NodeTable.columns:
NodeTable.tableConstraint(Constraint.NOT_NULL, [column.name])
+ NodeTable.primaryKey = [NodeTable.columnNamed("HOSTNAME"),
+ NodeTable.columnNamed("PORT")]
return inSchema
NodeInfoSchema = SchemaSyntax(makeNodeSchema(Schema(__file__)))
+ at inlineCallbacks
+def inTransaction(transactionCreator, operation):
+ """
+ Perform the given operation in a transaction, committing or aborting as
+ required.
+ @param transactionCreator: a 0-arg callable that returns an
+ L{IAsyncTransaction}
+
+ @param operation: a 1-arg callable that takes an L{IAsyncTransaction} and
+ returns a value.
+
+ @return: a L{Deferred} that fires with C{operation}'s result or fails with
+ its error, unless there is an error creating, aborting or committing
+ the transaction.
+ """
+ txn = transactionCreator()
+ try:
+ result = yield operation(txn)
+ except:
+ f = Failure()
+ yield txn.abort()
+ returnValue(f)
+ else:
+ yield txn.commit()
+ returnValue(result)
+
+
+
+
class TableSyntaxByName(Argument):
"""
Serialize and deserialize L{TableSyntax} objects for an AMP protocol with
@@ -165,7 +196,7 @@
A L{NodeInfo} is information about a currently-active Node process.
"""
- def endpoint(self):
+ def endpoint(self, reactor):
"""
Create an L{IStreamServerEndpoint} that will talk to the node process
that is described by this L{NodeInfo}.
@@ -173,7 +204,7 @@
@return: an endpoint that will connect to this host.
@rtype: L{IStreamServerEndpoint}
"""
- return TCP4ClientEndpoint(self.hostname, self.port)
+ return TCP4ClientEndpoint(reactor, self.hostname, self.port)
@@ -617,16 +648,15 @@
@PerformWork.responder
- @inlineCallbacks
def actuallyReallyExecuteWorkHere(self, table, workID):
"""
This is where it's time to actually do the work. The controller
process has instructed this worker to do it; so, look up the data in
the row, and do it.
"""
- workItemClass = WorkItem.forTable(table)
- txn = self.transactionFactory()
- try:
+ @inlineCallbacks
+ def work(txn):
+ workItemClass = WorkItem.forTable(table)
workItem = yield workItemClass.load(txn, workID)
# TODO: what if we fail? error-handling should be recorded
# someplace, the row should probably be marked, re-tries should be
@@ -634,13 +664,8 @@
yield workItem.delete()
# TODO: verify that workID is the primary key someplace.
yield workItem.doWork()
- except:
- f = Failure()
- yield txn.abort()
- returnValue(f)
- else:
- yield txn.commit()
returnValue({})
+ return inTransaction(self.transactionFactory, work)
@@ -983,15 +1008,14 @@
return self._lastSeenNodeIndex
- @inlineCallbacks
def _periodicLostWorkCheck(self):
"""
Periodically, every node controller has to check to make sure that work
hasn't been dropped on the floor by someone. In order to do that it
queries each work-item table.
"""
- txn = self.transactionFactory()
- try:
+ @inlineCallbacks
+ def workCheck(txn):
for itemType in self.allWorkItemTypes():
for overdueItem in (
yield itemType.query(
@@ -1000,8 +1024,7 @@
peer = yield self.choosePerformer()
yield peer.performWork(overdueItem.table,
overdueItem.workID)
- finally:
- yield txn.commit()
+ return inTransaction(self.transactionFactory, workCheck)
_currentWorkDeferred = None
@@ -1037,7 +1060,22 @@
Register ourselves with the database and establish all outgoing
connections to other servers in the cluster.
"""
- self._startingUp = self._doStart()
+ @inlineCallbacks
+ def startup(txn):
+ endpoint = TCP4ServerEndpoint(self.reactor, self.ampPort)
+ f = Factory()
+ f.buildProtocol = self.createPeerConnection
+ # If this fails, the failure mode is going to be ugly, just like all
+ # conflicted-port failures. But, at least it won't proceed.
+ yield endpoint.listen(f)
+ # with (yield NodeInfo.lock()):
+ self.thisProcess = yield NodeInfo.create(
+ txn, hostname=self.hostName, pid=self.pid, port=self.ampPort,
+ time=datetime.now()
+ )
+ for node in (yield self.activeNodes(txn)):
+ self._startConnectingTo(node)
+ self._startingUp = inTransaction(self.transactionFactory, startup)
@self._startingUp.addBoth
def done(result):
self._startingUp = None
@@ -1069,29 +1107,6 @@
return NodeInfo.all(txn)
- @inlineCallbacks
- def _doStart(self):
- """
- First, we tell the database that we're an active node so that other
- nodes know about us. This should also give us a
- unique-to-the-whole-database identifier for this process instance.
- """
- txn = self.transactionFactory()
- endpoint = TCP4ServerEndpoint(self.reactor, self.ampPort)
- f = Factory()
- f.buildProtocol = self.createPeerConnection
- # If this fails, the failure mode is going to be ugly, just like all
- # conflicted-port failures. But, at least it won't proceed.
- yield endpoint.listen(f)
- # with (yield NodeInfo.lock()):
- self.thisProcess = yield NodeInfo.create(
- txn, hostname=self.hostName, pid=self.pid, port=self.ampPort,
- time=datetime.now()
- )
- for node in (yield self.activeNodes(txn)):
- self._startConnectingTo(node)
-
-
def mapPeer(self, host, port, peer):
"""
A peer has been identified as belonging to the given host/port
@@ -1113,7 +1128,7 @@
"""
f = Factory()
f.buildProtocol = self.createPeerConnection
- @passthru(node.endpoint().connect(f).addCallback)
+ @passthru(node.endpoint(self.reactor).connect(f).addCallback)
def connected(proto):
self.mapPeer(node, proto)
proto.callRemote(IdentifyNode, self.thisProcess)
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120811/4f0d8cda/attachment-0001.html>
More information about the calendarserver-changes
mailing list