[CalendarServer-changes] [9640] CalendarServer/branches/users/glyph/q
source_changes at macosforge.org
source_changes at macosforge.org
Sat Aug 11 01:55:13 PDT 2012
Revision: 9640
http://trac.macosforge.org/projects/calendarserver/changeset/9640
Author: glyph at apple.com
Date: 2012-08-11 01:55:13 -0700 (Sat, 11 Aug 2012)
Log Message:
-----------
don't call transactions connections, and fix up the real work-executor to be more realistic
Modified Paths:
--------------
CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
Property Changed:
----------------
CalendarServer/branches/users/glyph/q/
Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py 2012-08-11 08:55:12 UTC (rev 9639)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py 2012-08-11 08:55:13 UTC (rev 9640)
@@ -22,6 +22,7 @@
from twext.enterprise.dal.model import ProcedureCall
from twext.enterprise.dal.syntax import NamedValue
from twext.enterprise.dal.record import fromTable
+from twisted.python.failure import Failure
from twext.enterprise.dal.model import Table, Schema, SQLType, Constraint
@@ -396,9 +397,11 @@
It is the opposite end of the connection from L{ConnectionFromWorker}.
"""
- def __init__(self, schema, boxReceiver=None, locator=None):
+ def __init__(self, transactionFactory, schema,
+ boxReceiver=None, locator=None):
super(ConnectionFromController, self).__init__(schema,
boxReceiver, locator)
+ self.transactionFactory = transactionFactory
@PerformWork.responder
@@ -410,15 +413,22 @@
and do it.
"""
workItemClass = WorkItem.forTable(table)
- # TODO: get a transaction in here.
- workItem = yield workItemClass.load(workID)
- # TODO: what if we fail? error-handling should be recorded someplace,
- # the row should probably be marked, re-tries should be triggerable
- # administratively.
- yield workItem.delete()
- # TODO: verify that workID is the primary key someplace.
- yield workItem.doWork()
- returnValue({})
+ txn = self.transactionFactory()
+ try:
+ workItem = yield workItemClass.load(txn, workID)
+ # TODO: what if we fail? error-handling should be recorded
+ # someplace, the row should probably be marked, re-tries should be
+ # triggerable administratively.
+ yield workItem.delete()
+ # TODO: verify that workID is the primary key someplace.
+ yield workItem.doWork()
+ except:
+ f = Failure()
+ yield txn.abort()
+ returnValue(f)
+ else:
+ yield txn.commit()
+ returnValue({})
@@ -595,7 +605,7 @@
queueProcessTimeout = (10.0 * 60.0)
queueDelayedProcessInterval = (60.0)
- def __init__(self, reactor, connectionFactory, ampPort, schema):
+ def __init__(self, reactor, transactionFactory, ampPort, schema):
"""
Initialize a L{PeerConnectionPool}.
@@ -606,7 +616,7 @@
them as clients.
@type ampPort: L{int}
- @param connectionFactory: a 0- or 1-argument callable that produces an
+ @param transactionFactory: a 0- or 1-argument callable that produces an
L{IAsyncTransaction}
@param schema: The schema which contains all the tables associated with
@@ -614,7 +624,7 @@
@type schema: L{Schema}
"""
self.reactor = reactor
- self.connectionFactory = connectionFactory
+ self.transactionFactory = transactionFactory
self.hostName = self.getfqdn()
self.pid = self.getpid()
self.ampPort = ampPort
@@ -687,7 +697,7 @@
@return: the maximum number of other L{PeerConnectionPool} instances
that may be connected to the database described by
- C{self.connectionFactory}. Note that this is not the current count
+ C{self.transactionFactory}. Note that this is not the current count
by connectivity, but the count according to the database.
@rtype: L{int}
"""
@@ -699,7 +709,7 @@
"""
What ordinal does this node, i.e. this instance of
L{PeerConnectionPool}, occupy within the ordered set of all nodes
- connected to the database described by C{self.connectionFactory}?
+ connected to the database described by C{self.transactionFactory}?
@return: the index of this node within the total collection. For
example, if this L{PeerConnectionPool} is 6 out of 30, this method
@@ -717,7 +727,7 @@
hasn't been dropped on the floor. In order to do that it queries each
work-item table.
"""
- txn = self.connectionFactory()
+ txn = self.transactionFactory()
try:
for itemType in self.allWorkItemTypes():
for overdueItem in (
@@ -770,7 +780,7 @@
nodes know about us. This should also give us a
unique-to-the-whole-database identifier for this process instance.
"""
- txn = self.connectionFactory()
+ txn = self.transactionFactory()
thisProcess = yield NodeInfo.create(
txn, hostname=self.hostName, pid=self.pid, port=self.ampPort,
time=datetime.datetime.now()
@@ -781,7 +791,7 @@
an indication that the process isn't dead. On the other hand maybe
there's no concrete feature which actually requires this information.
"""
- lc = LoopingCall(thisProcess.updateCurrent, self.connectionFactory)
+ lc = LoopingCall(thisProcess.updateCurrent, self.transactionFactory)
lc.start(30.0)
"""
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120811/e9d99b19/attachment-0001.html>
More information about the calendarserver-changes mailing list