[CalendarServer-changes] [9641] CalendarServer/branches/users/glyph/q
source_changes at macosforge.org
source_changes at macosforge.org
Sat Aug 11 01:55:14 PDT 2012
Revision: 9641
http://trac.macosforge.org/projects/calendarserver/changeset/9641
Author: glyph at apple.com
Date: 2012-08-11 01:55:14 -0700 (Sat, 11 Aug 2012)
Log Message:
-----------
Let PeerConnectionPool start living up to its name as a connection pool.
Modified Paths:
--------------
CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
Property Changed:
----------------
CalendarServer/branches/users/glyph/q/
Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py 2012-08-11 08:55:13 UTC (rev 9640)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py 2012-08-11 08:55:14 UTC (rev 9641)
@@ -23,6 +23,7 @@
from twext.enterprise.dal.syntax import NamedValue
from twext.enterprise.dal.record import fromTable
from twisted.python.failure import Failure
+from twisted.internet.defer import passthru
from twext.enterprise.dal.model import Table, Schema, SQLType, Constraint
@@ -199,7 +200,7 @@
'server' both serve the same role, the logic is the same in every node.
"""
- def __init__(self, schema, localWorkerPool, boxReceiver=None, locator=None):
+ def __init__(self, peerPool, boxReceiver=None, locator=None):
"""
Initialize this L{ConnectionFromPeerNode} with a reference to a pool of
local workers.
@@ -210,10 +211,30 @@
@see: L{AMP.__init__}
"""
- self.localWorkerPool = localWorkerPool
- super(ConnectionFromPeerNode, self).__init__(schema, boxReceiver, locator)
+ self.peerPool = peerPool
+ self.localWorkerPool = peerPool.workerPool
+ super(ConnectionFromPeerNode, self).__init__(peerPool.schema,
+ boxReceiver, locator)
+ def startReceivingBoxes(self, sender):
+ """
+ Connection is up and running.
+ """
+ r = super(ConnectionFromPeerNode, self).startReceivingBoxes(sender)
+ self.peerPool.addPeerConnection(self)
+ return r
+
+
+ def stopReceivingBoxes(self, reason):
+ """
+ Stop receiving boxes.
+ """
+ self.peerPool.removePeerConnection(self)
+ r = super(ConnectionFromPeerNode, self).stopReceivingBoxes(reason)
+ return r
+
+
def currentLoadEstimate(self):
"""
What is the current load estimate for this peer?
@@ -297,7 +318,7 @@
def hasAvailableCapacity(self):
"""
Does this worker connection pool have any local workers who have spare
- capacity to process another queue item?
+ hasAvailableCapacity to process another queue item?
"""
for worker in self.workers:
if worker.currentLoad() < self.maximumLoadPerWorker:
@@ -486,19 +507,16 @@
"""
Execute this L{WorkProposal}.
"""
- creation = self.workItemType.create(self.txn, **self.kw)
- @creation.addCallback
+ @passthru(self.workItemType.create(self.txn, **self.kw).addCallback)
def created(workItem):
self._whenProposed.callback(None)
@self.txn.postCommit
def whenDone():
self._whenCommitted.callback(None)
- peerChosen = self.pool.choosePeer()
- @peerChosen.addCallback
- def gotPeer(peer):
- performed = peer.performWork(workItem.__tbl__,
- workItem.workID)
- @performed.addCallback
+ @passthru(self.pool.choosePeer().addCallback)
+ def peerChosen(peer):
+ @passthru(peer.performWork(workItem.__tbl__,
+ workItem.workID))
def performed(result):
self._whenExecuted.callback(None)
@performed.addErrback
@@ -634,6 +652,20 @@
self.schema = schema
+ def addPeerConnection(self, peer):
+ """
+ Add a L{ConnectionFromPeerNode} to the active list of peers.
+ """
+ self.peers.append(peer)
+
+
+ def removePeerConnection(self, peer):
+ """
+ Remove a L{ConnectionFromPeerNode} to the active list of peers.
+ """
+ self.peers.remove(peer)
+
+
def choosePeer(self):
"""
Choose a peer to distribute work to based on the current known slot
@@ -821,7 +853,7 @@
def createPeerConnection(self, addr):
# TODO: add to peer list, remove from peer list
- return ConnectionFromPeerNode(self.schema, self.workerPool)
+ return ConnectionFromPeerNode(self)
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120811/addc07c5/attachment.html>
More information about the calendarserver-changes
mailing list