[CalendarServer-changes] [9637] CalendarServer/branches/users/glyph/q
source_changes at macosforge.org
Sat Aug 11 01:55:11 PDT 2012
Revision: 9637
http://trac.macosforge.org/projects/calendarserver/changeset/9637
Author: glyph at apple.com
Date: 2012-08-11 01:55:11 -0700 (Sat, 11 Aug 2012)
Log Message:
-----------
Flesh things out still more.
Modified Paths:
--------------
CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
Property Changed:
----------------
CalendarServer/branches/users/glyph/q/
Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py 2012-08-11 08:55:10 UTC (rev 9636)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py 2012-08-11 08:55:11 UTC (rev 9637)
@@ -176,6 +176,21 @@
+class SchemaAMP(AMP):
+ """
+ An AMP instance which also has a L{Schema} attached to it.
+
+ @ivar schema: The schema to look up L{TableSyntaxByName} arguments in.
+ @type schema: L{Schema}
+ """
+
+ def __init__(self, schema, boxReceiver=None, locator=None):
+ self.schema = schema
+ super(SchemaAMP, self).__init__(boxReceiver, locator)
+
+
+
+
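For illustration only (not part of this changeset; SchemaAMPFactory and
protocolClass are hypothetical names), the pattern this class enables is
stamping every connection a factory builds with the same schema, so that
responders can decode L{TableSyntaxByName} arguments:

    from twisted.internet.protocol import Factory

    class SchemaAMPFactory(Factory):
        # Hypothetical factory: builds SchemaAMP subclasses that all share
        # one schema, mirroring how ConnectionFromWorker and
        # ConnectionFromController now take a schema as their first argument.
        def __init__(self, schema, protocolClass):
            self.schema = schema
            self.protocolClass = protocolClass

        def buildProtocol(self, addr):
            return self.protocolClass(self.schema)
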
class ConnectionFromPeerNode(AMP):
"""
A connection to a peer node. Symmetric; since the 'client' and the
@@ -184,16 +199,31 @@
def __init__(self, localWorkerPool, boxReceiver=None, locator=None):
"""
- Initialize this L{ConnectionFromPeerNode} with a reference to a pool
- of local workers.
+ Initialize this L{ConnectionFromPeerNode} with a reference to a pool of
+ local workers.
@param localWorkerPool: the pool of local worker processes that can
process queue work.
+ @type localWorkerPool: L{WorkerConnectionPool}
+
+ @see: L{AMP.__init__}
"""
self.localWorkerPool = localWorkerPool
super(ConnectionFromPeerNode, self).__init__(boxReceiver, locator)
+ def currentLoadEstimate(self):
+ """
+ What is the current load estimate for this peer?
+
+ @return: The number of full "slots", i.e. currently-being-processed
+ queue items (and other items which may contribute to this process's
+ load, such as currently-being-processed client requests).
+ @rtype: L{int}
+ """
+ return 0
+
+
def performWork(self, table, workID):
"""
A L{local worker connection <ConnectionFromWorker>} is asking this
@@ -219,6 +249,14 @@
"""
A remote peer node has asked this node to do some work; dispatch it to
a local worker on this node.
+
+ @param table: the table to work on.
+ @type table: L{TableSyntax}
+
+ @param workID: the identifier within the table.
+ @type workID: L{int}
+
+ @return: a L{Deferred} that fires when the work has been completed.
"""
return self.localWorkerPool.performWork(table, workID)
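As a usage sketch (hypothetical objects; someWorkTable stands in for a real
L{TableSyntax}), the responder above makes remote and local work
indistinguishable to the pool:

    # Work arriving from a remote peer lands in the local worker pool,
    # exactly as locally-enqueued work would.
    pool = WorkerConnectionPool(maximumLoadPerWorker=2)
    peer = ConnectionFromPeerNode(pool)
    d = peer.performWork(someWorkTable, 4321)
    # ... the same Deferred that pool.performWork(someWorkTable, 4321)
    # would return.
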
@@ -233,25 +271,45 @@
worker processes rather than to a remote connection pool.
"""
- def __init__(self):
+ def __init__(self, maximumLoadPerWorker=0):
self.workers = []
+ self.maximumLoadPerWorker = maximumLoadPerWorker
def addWorker(self, worker):
+ """
+ Add a L{ConnectionFromWorker} to this L{WorkerConnectionPool} so that
+ it can be selected.
+ """
self.workers.append(worker)
def removeWorker(self, worker):
+ """
+ Remove a L{ConnectionFromWorker} from this L{WorkerConnectionPool} that
+ was previously added.
+ """
self.workers.remove(worker)
+ def hasAvailableCapacity(self):
+ """
+ Does this worker connection pool have any local workers who have spare
+ capacity to process another queue item?
+ """
+ for worker in self.workers:
+ if worker.currentLoad() < self.maximumLoadPerWorker:
+ return True
+ return False
+
+
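One behavior worth noting (illustration, not part of the diff): with the
default C{maximumLoadPerWorker=0}, the test C{currentLoad() < 0} can never
hold, so such a pool never reports available capacity; a positive limit must
be passed for local dispatch to be chosen:

    pool = WorkerConnectionPool()              # maximumLoadPerWorker == 0
    assert not pool.hasAvailableCapacity()     # always False, even when idle

    pool = WorkerConnectionPool(maximumLoadPerWorker=2)
    assert not pool.hasAvailableCapacity()     # still False: no workers yet
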
def _selectLowestLoadLocalConnection(self):
"""
Select the local connection with the lowest current load. At least one
worker must be registered; callers check L{hasAvailableCapacity} first.
@return: a worker connection with the lowest current load.
- @rtype: L{ConnectionFromWorker} or L{NoneType}
+ @rtype: L{ConnectionFromWorker}
"""
return sorted(self.workers[:], key=lambda w: w.currentLoad())[0]
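A side note on the selection above (a suggestion, not part of the commit):
since only the single least-loaded worker is needed, C{min} expresses the
same choice without copying and sorting the whole list:

    def _selectLowestLoadLocalConnection(self):
        # Equivalent selection in O(n) rather than O(n log n):
        return min(self.workers, key=lambda w: w.currentLoad())
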
@@ -275,7 +333,7 @@
-class ConnectionFromWorker(AMP):
+class ConnectionFromWorker(SchemaAMP):
"""
An individual connection from a worker, as seen from the master's
perspective. L{ConnectionFromWorker}s go into a L{WorkerConnectionPool}.
@@ -285,9 +343,9 @@
@type workerPool: L{WorkerConnectionPool}
"""
- def __init__(self, workerPool):
+ def __init__(self, schema, workerPool, boxReceiver=None, locator=None):
self.workerPool = workerPool
- super(ConnectionFromWorker, self).__init__()
+ super(ConnectionFromWorker, self).__init__(schema, boxReceiver, locator)
@property
@@ -330,21 +388,16 @@
-class ConnectionFromController(AMP):
+class ConnectionFromController(SchemaAMP):
"""
A L{ConnectionFromController} is a worker process's connection to its
node-controller process. It processes requests from its own master to do work.
It is the opposite end of the connection from L{ConnectionFromWorker}.
"""
- def __init__(self, schemaSyntax):
- """
- @param schemaSyntax: The schema that this connection operates on, which
- contains (at least) all the tables that we may receive requests for
- work in.
- """
- super(ConnectionFromController, self).__init__()
- self.schemaSyntax = schemaSyntax
+ def __init__(self, schema, boxReceiver=None, locator=None):
+ super(ConnectionFromController, self).__init__(schema,
+ boxReceiver, locator)
@PerformWork.responder
@@ -356,14 +409,15 @@
and do it.
"""
workItemClass = WorkItem.forTable(table)
# TODO: mumble locking something mumble
# TODO: get a transaction in here.
workItem = yield workItemClass.load(workID)
# TODO: verify that workID is the primary key someplace.
+ # TODO: what if we fail? error-handling should be recorded someplace,
+ # the row should probably be removed, re-tries should be triggerable.
+ # Note: deletion must happen first.
+ yield workItem.delete()
yield workItem.doWork()
- # TODO: what if we fail? error-handling should be recorded someplace,
- # the row should probably be removed, re-tries should be triggerable.
- yield workItem.delete()
returnValue({})
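A sketch of where the error-handling TODO might head (hypothetical; only
C{log} is newly imported, the other names are this module's own), preserving
the delete-before-doWork ordering so a failure cannot run the same item twice:

    from twisted.python import log

    @PerformWork.responder
    @inlineCallbacks
    def performWork(self, table, workID):
        workItemClass = WorkItem.forTable(table)
        workItem = yield workItemClass.load(workID)
        yield workItem.delete()          # deletion must happen first
        try:
            yield workItem.doWork()
        except Exception:
            # Hypothetical: record the failure durably so the item can be
            # re-enqueued later, rather than silently disappearing.
            log.err(None, "work item %r failed" % (workID,))
        returnValue({})
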
@@ -374,6 +428,7 @@
"""
+
def _cloneDeferred(d):
"""
Make a new Deferred, adding callbacks to C{d}.
@@ -529,6 +584,9 @@
@ivar reactor: The reactor used for scheduling timed events.
@type reactor: L{IReactorTime} provider.
+
+ @ivar peers: The list of currently connected peers.
+ @type peers: L{list} of L{ConnectionFromPeerNode}
"""
getfqdn = staticmethod(getfqdn)
@@ -541,9 +599,11 @@
"""
Initialize a L{PeerConnectionPool}.
- @param ampPort: The AMP port to listen on for inter-host communication.
- This must be an integer because we need to communicate it to the
- other peers in the cluster.
+ @param ampPort: The AMP TCP port number to listen on for inter-host
+ communication. This must be an integer (and not, say, an endpoint,
+ or an endpoint description) because we need to communicate it to
+ the other peers in the cluster in a way that will be meaningful to
+ them as clients.
@type ampPort: L{int}
@param connectionFactory: a 0- or 1-argument callable that produces an
@@ -555,6 +615,8 @@
self.pid = self.getpid()
self.ampPort = ampPort
self.thisProcess = None
+ self.workerPool = WorkerConnectionPool()
+ self.peers = []
def choosePeer(self):
@@ -563,13 +625,17 @@
occupancy of the other masters.
@return: a L{Deferred <twisted.internet.defer.Deferred>} which fires
- with the chosen L{ConnectionFromPeerNode} as soon as one is
- available. Normally this will be synchronous, but we need to
- account for the possibility that we may need to connect to other
- hosts.
+ with the chosen 'peer', i.e. an object with a C{performWork} method,
+ as soon as one is available. Normally this will be synchronous,
+ but we need to account for the possibility that we may need to
+ connect to other hosts.
@rtype: L{Deferred <twisted.internet.defer.Deferred>} firing
- L{ConnectionFromPeerNode}
+ L{ConnectionFromPeerNode} or L{WorkerConnectionPool}
"""
+ if not self.workerPool.hasAvailableCapacity() and self.peers:
+ return succeed(sorted(self.peers, key=lambda p: p.currentLoadEstimate())[0])
+ else:
+ return succeed(self.workerPool)
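Usage sketch (hypothetical helper; _dispatch is not in this changeset):
because both L{ConnectionFromPeerNode} and L{WorkerConnectionPool} expose
C{performWork}, callers of C{choosePeer} need not care which they were given:

    def _dispatch(self, table, workID):
        # choosePeer fires with "something that has performWork": either
        # the local pool or the least-loaded remote peer.
        d = self.choosePeer()
        d.addCallback(lambda peer: peer.performWork(table, workID))
        return d
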
def enqueueWork(self, txn, workItemType, **kw):
@@ -610,11 +676,14 @@
"""
How many nodes / master processes are there?
- @return: the number of other L{PeerConnectionPool} instances that are
- connected to the database described by C{self.connectionFactory}.
+ @return: the maximum number of other L{PeerConnectionPool} instances
+ that may be connected to the database described by
+ C{self.connectionFactory}. Note that this is not the current count
+ by connectivity, but the count according to the database.
@rtype: L{int}
"""
# TODO
+ return 20
def nodeIndex(self):
@@ -629,6 +698,7 @@
@rtype: L{int}
"""
# TODO
+ return 6
@inlineCallbacks