[CalendarServer-changes] [9669] CalendarServer/branches/users/glyph/q
source_changes at macosforge.org
Sat Aug 11 01:55:39 PDT 2012
Revision: 9669
http://trac.macosforge.org/projects/calendarserver/changeset/9669
Author: glyph at apple.com
Date: 2012-08-11 01:55:39 -0700 (Sat, 11 Aug 2012)
Log Message:
-----------
some clean-ups
Modified Paths:
--------------
CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
Property Changed:
----------------
CalendarServer/branches/users/glyph/q/
Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py 2012-08-11 08:55:38 UTC (rev 9668)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py 2012-08-11 08:55:39 UTC (rev 9669)
@@ -82,15 +82,15 @@
from os import getpid
from datetime import datetime
-from twisted.internet.task import LoopingCall
from twisted.application.service import Service
from twisted.internet.protocol import Factory
-from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
-from twisted.internet.defer import succeed
-from twisted.internet.task import deferLater
+from twisted.internet.defer import (
+ inlineCallbacks, returnValue, Deferred, succeed
+)
from twisted.internet.endpoints import TCP4ClientEndpoint
from twisted.protocols.amp import AMP, Command, Integer, Argument
from twisted.python.reflect import qual
+from twisted.python import log
from twext.enterprise.dal.syntax import TableSyntax, SchemaSyntax
from twext.enterprise.dal.model import ProcedureCall
@@ -99,8 +99,8 @@
from twisted.python.failure import Failure
from twisted.internet.defer import passthru
from twext.enterprise.dal.model import Table, Schema, SQLType, Constraint
+from twisted.internet.endpoints import TCP4ServerEndpoint
-
def makeNodeSchema(inSchema):
"""
Create a self-contained schema for L{NodeInfo} to use.
@@ -160,7 +160,7 @@
-class NodeInfo(fromTable(NodeInfoSchema.NODE_INFO)):
+class NodeInfo(Record, fromTable(NodeInfoSchema.NODE_INFO)):
"""
A L{NodeInfo} is information about a currently-active Node process.
"""
@@ -176,11 +176,7 @@
return TCP4ClientEndpoint(self.hostname, self.port)
- def updateCurrent(self):
- return self.update(timestamp=datetime.datetime.now())
-
-
def abstract(thunk):
"""
The decorated function is abstract.
@@ -285,7 +281,6 @@
-
class ConnectionFromPeerNode(SchemaAMP):
"""
A connection to a peer node. Symmetric; since the 'client' and the
@@ -543,8 +538,8 @@
class ConnectionFromController(SchemaAMP):
"""
A L{ConnectionFromController} is the connection to a node-controller
- process, in a worker process. It processes requests from its own master to
- do work. It is the opposite end of the connection from
+ process, in a worker process. It processes requests from its own
+ controller to do work. It is the opposite end of the connection from
L{ConnectionFromWorker}.
"""
@@ -559,9 +554,9 @@
@inlineCallbacks
def actuallyReallyExecuteWorkHere(self, table, workID):
"""
- This is where it's time to actually do the work. The master process
- has instructed this worker to do it; so, look up the data in the row,
- and do it.
+ This is where it's time to actually do the work. The controller
+ process has instructed this worker to do it; so, look up the data in
+ the row, and do it.
"""
workItemClass = WorkItem.forTable(table)
txn = self.transactionFactory()
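
The WorkItem.forTable(table) call above implies a registry from table names to WorkItem subclasses, so the worker can recover the right class for an incoming request. A plain sketch of such a dispatch (the actual registry in queue.py may be built differently):

    _workItemsByTable = {}

    class WorkItem(object):
        @classmethod
        def forTable(cls, table):
            # Map a table name back to the class that handles its rows.
            return _workItemsByTable[table]

    def handlesTable(name):
        def register(klass):
            _workItemsByTable[name] = klass
            return klass
        return register

    @handlesTable("COUPON_WORK")
    class CouponWork(WorkItem):
        def doWork(self):
            print("emailing a coupon")

    assert CouponWork is WorkItem.forTable("COUPON_WORK")
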
@@ -635,18 +630,19 @@
def _start(self):
"""
- Execute this L{WorkProposal}.
+ Execute this L{WorkProposal} by creating the work item in the database,
+ waiting for the transaction where that addition was completed to
+ commit, and asking the local node controller process to do the work.
"""
@passthru(self.workItemType.create(self.txn, **self.kw).addCallback)
- def created(workItem):
+ def created(item):
self._whenProposed.callback(None)
@self.txn.postCommit
def whenDone():
self._whenCommitted.callback(None)
@passthru(self.pool.choosePeer().addCallback)
def peerChosen(peer):
- @passthru(peer.performWork(workItem.table,
- workItem.workID))
+ @passthru(peer.performWork(item.table, item.workID))
def performed(result):
self._whenExecuted.callback(None)
@performed.addErrback
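
The new docstring spells out the three stages that the decorator-heavy code beneath it wires together: create the row, wait for the enclosing transaction to commit, then hand the item to a peer. The same staging, untangled into plain Deferred callbacks (createRow, postCommit, choosePeer and performWork stand in for the real objects):

    from twisted.internet.defer import Deferred

    class WorkProposalSketch(object):
        def __init__(self):
            self.whenProposed = Deferred()   # the row has been created
            self.whenCommitted = Deferred()  # the creating txn committed
            self.whenExecuted = Deferred()   # a peer finished the work

        def start(self, createRow, postCommit, choosePeer):
            def created(item):
                self.whenProposed.callback(None)
                # Only hand out the work once the row is durably committed.
                postCommit(lambda: self.whenCommitted.callback(None))
                d = choosePeer()
                d.addCallback(lambda peer: peer.performWork(item))
                d.chainDeferred(self.whenExecuted)
            return createRow().addCallback(created)
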
@@ -733,11 +729,11 @@
@ivar queueDelayedProcessInterval: The amount of time between database
pings, i.e. checks for over-due queue items that might have been
- orphaned by a master process that died mid-transaction. This is how
- often the shared database should be pinged by I{all} nodes (i.e., all
- master processes, or each instance of L{PeerConnectionPool}); each
- individual node will ping commensurately less often as more nodes join
- the database.
+ orphaned by a controller process that died mid-transaction. This is
+ how often the shared database should be pinged by I{all} nodes (i.e.,
+ all controller processes, or each instance of L{PeerConnectionPool});
+ each individual node will ping commensurately less often as more nodes
+ join the database.
@type queueDelayedProcessInterval: L{float} (in seconds)
@ivar reactor: The reactor used for scheduling timed events.
@@ -780,6 +776,8 @@
self.workerPool = WorkerConnectionPool()
self.peers = []
self.schema = schema
+ self._startingUp = None
+ self._listeningPortObject = None
self._lastSeenTotalNodes = 1
self._lastSeenNodeIndex = 1
@@ -801,11 +799,10 @@
def choosePeer(self):
"""
Choose a peer to distribute work to based on the current known slot
- occupancy of the other masters. Note that this will prefer
- distributing work to local workers until the current node is full,
- because that should be lower-latency. Also, if no peers are available,
- work will be submitted locally even if the worker pool is already
- over-subscribed.
+ occupancy of the other nodes. Note that this will prefer distributing
+ work to local workers until the current node is full, because that
+ should be lower-latency. Also, if no peers are available, work will be
+ submitted locally even if the worker pool is already over-subscribed.
@return: a L{Deferred <twisted.internet.defer.Deferred>} which fires
with the chosen 'peer', i.e. object with a C{performWork} method,
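
The policy described above reduces to a small decision function: prefer local workers while there is capacity, otherwise pick the least-busy peer, and fall back to local submission when there are no peers at all. A sketch under those assumptions (hasAvailableCapacity and currentLoad are hypothetical names for the surrounding objects' state):

    def choosePerformer(localPool, peers):
        # Local workers first: no network hop, so lower latency.
        if localPool.hasAvailableCapacity():
            return localPool
        # Otherwise the peer currently reporting the fewest busy slots.
        if peers:
            return min(peers, key=lambda peer: peer.currentLoad)
        # No peers: submit locally even though we are over-subscribed.
        return localPool
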
@@ -857,7 +854,7 @@
def totalNumberOfNodes(self):
"""
- How many nodes / master processes are there?
+ How many nodes are there, total?
@return: the maximum number of other L{PeerConnectionPool} instances
that may be connected to the database described by
@@ -887,9 +884,9 @@
@inlineCallbacks
def _periodicLostWorkCheck(self):
"""
- Periodically, every master process has to check to make sure that work
- hasn't been dropped on the floor. In order to do that it queries each
- work-item table.
+ Periodically, every node controller has to check to make sure that work
+ hasn't been dropped on the floor by someone. In order to do that it
+ queries each work-item table.
"""
txn = self.transactionFactory()
try:
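
Concretely, one pass of the check described above scans every work-item table for rows older than the processing interval, i.e. work presumably orphaned by a controller that died mid-transaction, and re-submits whatever it finds. A schematic version (tables, queryOverdue and enqueue are placeholders for the real DAL calls):

    from datetime import datetime, timedelta

    def lostWorkCheck(tables, queryOverdue, enqueue, interval=60.0):
        # A row proposed more than one interval ago and still present in
        # its table was never completed; hand it out again.
        tooOld = datetime.utcnow() - timedelta(seconds=interval)
        for table in tables:
            for workItem in queryOverdue(table, olderThan=tooOld):
                enqueue(workItem)
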
@@ -905,21 +902,32 @@
yield txn.commit()
- @inlineCallbacks
+ _currentWorkDeferred = None
+ _lostWorkCheckCall = None
+
def _lostWorkCheckLoop(self):
"""
While the service is running, keep checking for any overdue / lost work
- items and re-submit them to the cluster for processing.
+ items and re-submit them to the cluster for processing. Space out
+ those checks in time based on the size of the cluster.
"""
- while self.running:
- yield self._periodicLostWorkCheck()
+ self._lostWorkCheckCall = None
+ @passthru(self._periodicLostWorkCheck().addErrback(log.err)
+ .addCallback)
+ def scheduleNext(result):
+ self._currentWorkDeferred = None
+ if not self.running:
+ return
index = self.nodeIndex()
now = self.reactor.seconds()
interval = self.queueDelayedProcessInterval
count = self.totalNumberOfNodes()
when = (now - (now % interval)) + (interval * (count + index))
delay = when - now
- yield deferLater(self.reactor, delay, lambda : None)
+ self._lostWorkCheckCall = self.reactor.callLater(
+ delay, self._lostWorkCheckLoop
+ )
+ self._currentWorkDeferred = scheduleNext
def startService(self):
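
The scheduling arithmetic in the hunk above staggers the cluster-wide pings: each node snaps to the previous interval boundary, then offsets by its own slot, so each individual node checks less often as the cluster grows and no two nodes check at once. For example, with a 60-second interval, 3 nodes, and node index 1, a node asking at now=130 gets when = 120 + 60 * (3 + 1) = 360, a delay of 230 seconds. The same computation in isolation:

    def lostWorkCheckDelay(now, interval, index, count):
        # Snap to the previous interval boundary, then offset by this
        # node's slot so cluster members never ping at the same moment.
        when = (now - (now % interval)) + (interval * (count + index))
        return when - now

    assert lostWorkCheckDelay(130, 60, 1, 3) == 230
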
@@ -927,14 +935,36 @@
Register ourselves with the database and establish all outgoing
connections to other servers in the cluster.
"""
- self._doStart()
+ self._startingUp = self._doStart()
+ @self._startingUp.addBoth
+ def done(result):
+ self._startingUp = None
+ return result
+ @inlineCallbacks
+ def stopService(self):
+ """
+ Stop this service, terminating any incoming or outgoing connections.
+ """
+ yield super(PeerConnectionPool, self).stopService()
+ if self._startingUp is not None:
+ yield self._startingUp
+ if self._listeningPortObject is not None:
+ yield self._listeningPortObject.stopListening()
+ if self._lostWorkCheckCall is not None:
+ self._lostWorkCheckCall.cancel()
+ if self._currentWorkDeferred is not None:
+ yield self._currentWorkDeferred
+ for peer in self.peers:
+ peer.transport.loseConnection()
+
+
def activeNodes(self, txn):
"""
Load information about all other nodes.
"""
- return NodeInfo.query()
+ return NodeInfo.query(txn)
@inlineCallbacks
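
A note on the decorator idiom in the new startService: Deferred.addBoth returns the Deferred itself, so @self._startingUp.addBoth attaches the done callback and then rebinds the name to the Deferred; since the name is not used afterwards, that is harmless. A tiny standalone demonstration of the idiom:

    from twisted.internet.defer import succeed

    d = succeed("result")

    @d.addBoth
    def done(result):
        # Runs on success or failure; here it just passes through.
        print("finished with %r" % (result,))
        return result

    # addBoth returns the Deferred, so the decorator rebound the name.
    assert done is d
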
@@ -945,43 +975,35 @@
unique-to-the-whole-database identifier for this process instance.
"""
txn = self.transactionFactory()
- thisProcess = yield NodeInfo.create(
+ endpoint = TCP4ServerEndpoint(self.reactor, self.ampPort)
+ f = Factory()
+ f.buildProtocol = self.createPeerConnection
+ # If this fails, the failure mode is going to be ugly, just like all
+ # conflicted-port failures. But, at least it won't proceed.
+ yield endpoint.listen(f)
+ # with (yield NodeInfo.lock()):
+ self.thisProcess = yield NodeInfo.create(
txn, hostname=self.hostName, pid=self.pid, port=self.ampPort,
time=datetime.datetime.now()
)
+ for node in (yield self.activeNodes(txn)):
+ self._startConnectingTo(node)
- """
- It might be a good idea to update this periodicially in order to give
- an indication that the process isn't dead. On the other hand maybe
- there's no concrete feature which actually requires this information.
- """
- lc = LoopingCall(thisProcess.updateCurrent, self.transactionFactory)
- lc.start(30.0)
+ def _startConnectingTo(self, node):
"""
- Now let's find all the other masters.
- """
- masters = self.activeNodes(txn)
-
- """
- Each other 'master' here is another L{NodeInfo} which tells us where
- to connect.
- """
- for master in masters:
- self._startConnectingTo(master)
-
-
- def _startConnectingTo(self, master):
- """
Start an outgoing connection to another master process.
- @param master: a description of the master to connect to.
- @type master: L{NodeInfo}
+ @param node: a description of the node to connect to.
+ @type node: L{NodeInfo}
"""
f = Factory()
f.buildProtocol = self.createPeerConnection
- master.endpoint().connect(f)
+ node.endpoint().connect(f)
def createPeerConnection(self, addr):
return ConnectionFromPeerNode(self)
+
+
+
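
Taken together, the reshaped _doStart follows a standard Twisted endpoints recipe: listen first so a conflicted port fails before this node advertises itself, record the process in the shared NODE_INFO table, then dial every already-registered node. The listen/connect halves, reduced to stock Twisted calls (the bare AMP protocol and the port number are placeholders):

    from twisted.internet import reactor
    from twisted.internet.endpoints import (
        TCP4ClientEndpoint, TCP4ServerEndpoint
    )
    from twisted.internet.protocol import Factory
    from twisted.protocols.amp import AMP

    def listenForPeers(port=7654):
        # Fail fast on a conflicted port: listen() errbacks before any
        # database row claims this node is reachable.
        f = Factory()
        f.buildProtocol = lambda addr: AMP()
        return TCP4ServerEndpoint(reactor, port).listen(f)

    def connectToPeer(hostname, port=7654):
        f = Factory()
        f.buildProtocol = lambda addr: AMP()
        return TCP4ClientEndpoint(reactor, hostname, port).connect(f)
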