[CalendarServer-changes] [9638] CalendarServer/branches/users/glyph/q
source_changes at macosforge.org
source_changes at macosforge.org
Sat Aug 11 01:55:12 PDT 2012
Revision: 9638
http://trac.macosforge.org/projects/calendarserver/changeset/9638
Author: glyph at apple.com
Date: 2012-08-11 01:55:12 -0700 (Sat, 11 Aug 2012)
Log Message:
-----------
more required data
Modified Paths:
--------------
CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
Property Changed:
----------------
CalendarServer/branches/users/glyph/q/
Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py 2012-08-11 08:55:11 UTC (rev 9637)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py 2012-08-11 08:55:12 UTC (rev 9638)
@@ -12,6 +12,7 @@
from twisted.application.service import Service
from twisted.internet.protocol import Factory
from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
+from twisted.internet.defer import succeed
from twisted.internet.task import deferLater
from twisted.internet.endpoints import TCP4ClientEndpoint
from twisted.protocols.amp import AMP, Command, Integer, Argument
@@ -191,13 +192,13 @@
-class ConnectionFromPeerNode(AMP):
+class ConnectionFromPeerNode(SchemaAMP):
"""
A connection to a peer node. Symmetric; since the 'client' and the
'server' both serve the same role, the logic is the same in every node.
"""
- def __init__(self, localWorkerPool, boxReceiver=None, locator=None):
+ def __init__(self, schema, localWorkerPool, boxReceiver=None, locator=None):
"""
Initialize this L{ConnectionFromPeerNode} with a reference to a pool of
local workers.
@@ -209,7 +210,7 @@
@see: L{AMP.__init__}
"""
self.localWorkerPool = localWorkerPool
- super(ConnectionFromPeerNode, self).__init__(boxReceiver, locator)
+ super(ConnectionFromPeerNode, self).__init__(schema, boxReceiver, locator)
def currentLoadEstimate(self):
@@ -409,13 +410,12 @@
and do it.
"""
workItemClass = WorkItem.forTable(table)
+ # TODO: get a transaction in here.
+ workItem = yield workItemClass.load(workID)
# TODO: what if we fail? error-handling should be recorded someplace,
- # the row should probably be removed, re-tries should be triggerable.
- # Note: deletion must happen first.
+ # the row should probably be marked, re-tries should be triggerable
+ # administratively.
yield workItem.delete()
- # TODO: mumble locking something mumble
- # TODO: get a transaction in here.
- workItem = yield workItemClass.load(workID)
# TODO: verify that workID is the primary key someplace.
yield workItem.doWork()
returnValue({})
@@ -595,7 +595,7 @@
queueProcessTimeout = (10.0 * 60.0)
queueDelayedProcessInterval = (60.0)
- def __init__(self, reactor, connectionFactory, ampPort):
+ def __init__(self, reactor, connectionFactory, ampPort, schema):
"""
Initialize a L{PeerConnectionPool}.
@@ -608,6 +608,10 @@
@param connectionFactory: a 0- or 1-argument callable that produces an
L{IAsyncTransaction}
+
+ @param schema: The schema which contains all the tables associated with
+ the L{WorkItem}s that this L{PeerConnectionPool} will process.
+ @type schema: L{Schema}
"""
self.reactor = reactor
self.connectionFactory = connectionFactory
@@ -617,6 +621,7 @@
self.thisProcess = None
self.workerPool = WorkerConnectionPool()
self.peers = []
+ self.schema = schema
def choosePeer(self):
@@ -784,8 +789,6 @@
Each other 'master' here is another L{NodeInfo} which tells us where
to connect.
"""
- f = Factory()
- f.protocol = ConnectionFromPeerNode
for master in masters:
self._startConnectingTo(master)
@@ -798,10 +801,16 @@
@type master: L{NodeInfo}
"""
f = Factory()
+ f.buildProtocol = self.createPeerConnection
master.endpoint().connect(f)
+ def createPeerConnection(self, addr):
+ # TODO: add to peer list, remove from peer list
+ return ConnectionFromPeerNode(self.schema, self.workerPool)
+
+
def sketch():
"""
Example demonstrating how an application would normally talk to the queue.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120811/46b3a5dc/attachment.html>
More information about the calendarserver-changes mailing list