[CalendarServer-changes] [9633] CalendarServer/branches/users/glyph/q
source_changes at macosforge.org
Sat Aug 11 01:55:08 PDT 2012
Revision: 9633
http://trac.macosforge.org/projects/calendarserver/changeset/9633
Author: glyph at apple.com
Date: 2012-08-11 01:55:08 -0700 (Sat, 11 Aug 2012)
Log Message:
-----------
terminology clarification (Master/Node->consistently Node), add idle scheduling logic, add sketch
Modified Paths:
--------------
CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
CalendarServer/branches/users/glyph/q/txdav/common/datastore/sql_schema/current.sql
Property Changed:
----------------
CalendarServer/branches/users/glyph/q/
Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py 2012-08-11 08:55:07 UTC (rev 9632)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py 2012-08-11 08:55:08 UTC (rev 9633)
@@ -22,32 +22,33 @@
from twext.enterprise.dal.record import fromTable
from twisted.internet.defer import Deferred
from twisted.internet.defer import passthru
+from twisted.internet.task import deferLater
from twext.enterprise.dal.model import Table, Schema, SQLType, Constraint
-def makeMasterSchema(inSchema):
+def makeNodeSchema(inSchema):
"""
- Create a self-contained schema for L{MasterInfo} to use.
+ Create a self-contained schema for L{NodeInfo} to use.
@return: a schema with just the one table.
"""
# Initializing this duplicate schema avoids a circular dependency, but this
# should really be accomplished with independent schema objects that the
# transaction is made aware of somehow.
- masterTable = Table(inSchema, 'MASTER_INFO')
- masterTable.addColumn("HOSTNAME", SQLType("varchar", 255))
- masterTable.addColumn("PID", SQLType("integer", None))
- masterTable.addColumn("PORT", SQLType("integer", None))
- masterTable.addColumn("TIME", SQLType("timestamp", None)).setDefaultValue(
+ NodeTable = Table(inSchema, 'NODE_INFO')
+ NodeTable.addColumn("HOSTNAME", SQLType("varchar", 255))
+ NodeTable.addColumn("PID", SQLType("integer", None))
+ NodeTable.addColumn("PORT", SQLType("integer", None))
+ NodeTable.addColumn("TIME", SQLType("timestamp", None)).setDefaultValue(
# Note: in the real data structure, this is actually a not-cleaned-up
# sqlparse internal data structure, but it *should* look closer to this.
ProcedureCall("timezone", ["UTC", NamedValue('CURRENT_TIMESTAMP')])
)
- for column in masterTable.columns:
- masterTable.tableConstraint(Constraint.NOT_NULL, [column.name])
+ for column in NodeTable.columns:
+ NodeTable.tableConstraint(Constraint.NOT_NULL, [column.name])
return inSchema
-masterInfoSchema = SchemaSyntax(makeMasterSchema(Schema(__file__)))
+NodeInfoSchema = SchemaSyntax(makeNodeSchema(Schema(__file__)))
@@ -84,15 +85,15 @@
-class MasterInfo(fromTable(masterInfoSchema.MASTER_INFO)):
+class NodeInfo(fromTable(NodeInfoSchema.NODE_INFO)):
"""
- A L{MasterInfo} is information about a currently-active master process.
+ A L{NodeInfo} is information about a currently-active node process.
"""
def endpoint(self):
"""
- Create an L{IStreamServerEndpoint} that will talk to the master process
- that is described by this L{MasterInfo}.
+ Create an L{IStreamClientEndpoint} that will talk to the node process
+ that is described by this L{NodeInfo}.
@return: an endpoint that will connect to this host.
@rtype: L{IStreamClientEndpoint}
@@ -127,6 +128,10 @@
@ivar workID: the unique identifier (primary key) for items of this type.
There must be a corresponding column in the database.
@type workID: L{int}
+
+ @cvar created: the timestamp at which a given item was created, or the
+ column describing its creation time, on the class.
+ @type created: L{datetime.datetime}
"""
@abstract
@@ -173,31 +178,29 @@
-class ConnectionFromPeerMaster(AMP):
+class ConnectionFromPeerNode(AMP):
"""
- A connection to a peer master. Symmetric; since the 'client' and the
- 'server' both serve the same role, the logic is the same in every master.
-
- This only exists in the master.
+ A connection to a peer node. Symmetric; since the 'client' and the
+ 'server' both serve the same role, the logic is the same in every node.
"""
def __init__(self, localWorkerPool, boxReceiver=None, locator=None):
"""
- Initialize this L{ConnectionFromPeerMaster} with a reference to a pool
+ Initialize this L{ConnectionFromPeerNode} with a reference to a pool
of local workers.
@param localWorkerPool: the pool of local worker processes that can
process queue work.
"""
self.localWorkerPool = localWorkerPool
- super(ConnectionFromPeerMaster, self).__init__(boxReceiver, locator)
+ super(ConnectionFromPeerNode, self).__init__(boxReceiver, locator)
def performWork(self, table, workID):
"""
A L{local worker connection <ConnectionFromWorker>} is asking this
- specific peer master process to perform some work, having already
- determined that it's appropriate.
+ specific peer node-controller process to perform some work, having
+ already determined that it's appropriate.
@param table: The table where work is waiting.
@type table: L{TableSyntax}
@@ -216,8 +219,8 @@
@PerformWork.responder
def dispatchToWorker(self, table, workID):
"""
- A remote peer master has asked this master to do some work; dispatch it
- to a local worker.
+ A remote peer node has asked this node to do some work; dispatch it to
+ a local worker on this node.
"""
return self.localWorkerPool.performWork(table, workID)
@@ -228,7 +231,7 @@
A pool of L{ConnectionFromWorker}s.
L{WorkerConnectionPool} also implements the same implicit protocol as a
- L{ConnectionFromPeerMaster}, but one that dispenses work to the local
+ L{ConnectionFromPeerNode}, but one that dispenses work to the local
worker processes rather than to a remote connection pool.
"""
@@ -322,18 +325,18 @@
Dispatch work to this worker.
@see: The responder for this should always be
- L{ConnectionFromMaster.actuallyReallyExecuteWorkHere}.
+ L{ConnectionFromController.actuallyReallyExecuteWorkHere}.
"""
return self.callRemote(PerformWork,
table=table.model.name, workID=workID)
-class ConnectionFromMaster(AMP):
+class ConnectionFromController(AMP):
"""
- A L{ConnectionFromMaster} is the connection to a master process, in a
- worker process. It processes requests from its own master to do work. It
- is the opposite end of the connection from L{ConnectionFromWorker}.
+ A L{ConnectionFromController} is the connection to a node-controller
+ process, in a worker process. It processes requests from its own
+ node-controller to do work. It is the opposite end of the connection
+ from L{ConnectionFromWorker}.
"""
def __init__(self, schemaSyntax):
@@ -342,7 +345,7 @@
contains (at least) all the tables that we may receive requests for
work in.
"""
- super(ConnectionFromMaster, self).__init__()
+ super(ConnectionFromController, self).__init__()
self.schemaSyntax = schemaSyntax
@@ -376,25 +379,44 @@
class PeerConnectionPool(Service, object):
"""
- Each master has a L{PeerConnectionPool} connecting it to all the other
- masters currently active on the same database.
+ Each node has a L{PeerConnectionPool} connecting it to all the other nodes
+ currently active on the same database.
- @ivar hostName: The hostname of this master process, as reported by the
- local host's configuration. Possibly this should be obtained via
- C{config.ServerHostName} instead of C{socket.getfqdn()}; although hosts
- within a cluster may be configured with the same C{ServerHostName};
- TODO need to confirm.
+ @ivar hostName: The hostname where this node process is running, as
+ reported by the local host's configuration. Possibly this should be
+ obtained via C{config.ServerHostName} instead of C{socket.getfqdn()};
+ although hosts within a cluster may be configured with the same
+ C{ServerHostName}; TODO need to confirm.
- @ivar thisProcess: a L{MasterInfo} representing this process, which is
+ @ivar thisProcess: a L{NodeInfo} representing this process, which is
initialized when this L{PeerConnectionPool} service is started via
C{startService}. May be C{None} if this service is not fully started
up or if it is shutting down.
+
+ @ivar queueProcessTimeout: The maximum amount of time allowed for a queue
+ item to be processed. By default, 10 minutes.
+ @type queueProcessTimeout: L{float} (in seconds)
+
+ @ivar queueDelayedProcessInterval: The amount of time between database
+ pings, i.e. checks for over-due queue items that might have been
+ pings, i.e. checks for overdue queue items that might have been
+ often the shared database should be pinged by I{all} nodes (i.e., all
+ master processes, or each instance of L{PeerConnectionPool}); each
+ individual node will ping commensurately less often as more nodes join
+ the database.
+ @type queueDelayedProcessInterval: L{float} (in seconds)
+
+ @ivar reactor: The reactor used for scheduling timed events.
+ @type reactor: L{IReactorTime} provider.
"""
getfqdn = staticmethod(getfqdn)
getpid = staticmethod(getpid)
- def __init__(self, connectionFactory, ampPort):
+ queueProcessTimeout = (10.0 * 60.0)
+ queueDelayedProcessInterval = (60.0)
+
+ def __init__(self, reactor, connectionFactory, ampPort):
"""
Initialize a L{PeerConnectionPool}.
@@ -406,6 +428,7 @@
@param connectionFactory: a 0- or 1-argument callable that produces an
L{IAsyncTransaction}
"""
+ self.reactor = reactor
self.connectionFactory = connectionFactory
self.hostName = self.getfqdn()
self.pid = self.getpid()
@@ -419,12 +442,12 @@
occupancy of the other masters.
@return: a L{Deferred <twisted.internet.defer.Deferred>} which fires
- with the chosen L{ConnectionFromPeerMaster} as soon as one is
+ with the chosen L{ConnectionFromPeerNode} as soon as one is
available. Normally this will be synchronous, but we need to
account for the possibility that we may need to connect to other
hosts.
@rtype: L{Deferred <twisted.internet.defer.Deferred>} firing
- L{ConnectionFromPeerMaster}
+ L{ConnectionFromPeerNode}
"""
@@ -465,7 +488,7 @@
@inlineCallbacks
def whenDone():
peer = yield self.choosePeer()
- peer.performWork(workItemType.__tbl__)
+ peer.performWork(result.__tbl__, result.workID)
d.callback(None)
@txn.postAbort
def whenFailed():
@@ -474,6 +497,80 @@
return created
+ def allWorkItemTypes(self):
+ """
+ Load all the L{WorkItem} types that this node can process and return
+ them.
+
+ @return: L{list} of L{type}
+ """
+ # TODO: For completeness, this may need to involve a plugin query to
+ # make sure that all WorkItem subclasses are imported first.
+ return WorkItem.__subclasses__()
+
+
+ def totalNumberOfNodes(self):
+ """
+ How many nodes / master processes are there?
+
+ @return: the number of other L{PeerConnectionPool} instances that are
+ connected to the database described by C{self.connectionFactory}.
+ @rtype: L{int}
+ """
+ # TODO
+
+
+ def nodeIndex(self):
+ """
+ What ordinal does this node, i.e. this instance of
+ L{PeerConnectionPool}, occupy within the ordered set of all nodes
+ connected to the database described by C{self.connectionFactory}?
+
+ @return: the index of this node within the total collection. For
+ example, if this L{PeerConnectionPool} is 6 out of 30, this method
+ will return C{6}.
+ @rtype: L{int}
+ """
+ # TODO
+
+
+ @inlineCallbacks
+ def _periodicLostWorkCheck(self):
+ """
+ Periodically, every node-controller process has to check to make sure
+ that work hasn't been dropped on the floor. In order to do that, it
+ queries each work-item table for overdue items.
+ """
+ txn = self.connectionFactory()
+ try:
+ for itemType in self.allWorkItemTypes():
+ for overdueItem in (
+ # 'Overdue' means created more than queueProcessTimeout seconds
+ # ago; note that the database stores these timestamps in UTC.
+ yield itemType.query(
+ txn, itemType.created < datetime.datetime.utcnow() -
+ datetime.timedelta(seconds=self.queueProcessTimeout)
+ )):
+ peer = yield self.choosePeer()
+ yield peer.performWork(overdueItem.__tbl__, overdueItem.workID)
+ finally:
+ yield txn.commit()
+
+
+ @inlineCallbacks
+ def _lostWorkCheckLoop(self):
+ """
+ While the service is running, keep checking for any overdue / lost work
+ items and re-submit them to the cluster for processing.
+ """
+ while self.running:
+ yield self._periodicLostWorkCheck()
+ index = self.nodeIndex()
+ now = self.reactor.seconds()
+ interval = self.queueDelayedProcessInterval
+ count = self.totalNumberOfNodes()
+ when = (now - (now % interval)) + (interval * (count + index))
+ delay = when - now
+ yield deferLater(self.reactor, delay, lambda: None)
+
+
def startService(self):
"""
Register ourselves with the database and establish all outgoing
@@ -482,23 +579,30 @@
self._doStart()
+ def activeNodes(self, txn):
+ """
+ Load information about all other nodes.
+ """
+ return NodeInfo.all(txn)
+
+
@inlineCallbacks
def _doStart(self):
"""
- First, we tell the database that we're an active master so that other
- masters know about us. This should also give us a
+ First, we tell the database that we're an active node so that other
+ nodes know about us. This should also give us a
unique-to-the-whole-database identifier for this process instance.
"""
txn = self.connectionFactory()
- thisProcess = yield MasterInfo.create(
+ thisProcess = yield NodeInfo.create(
txn, hostname=self.hostName, pid=self.pid, port=self.ampPort,
time=datetime.datetime.now()
)
"""
- It might be a good idea to update this periodicially in order to give an
- indication that the process isn't dead. On the other hand maybe there's no
- concrete feature which actually requires this information.
+ It might be a good idea to update this periodically in order to give
+ an indication that the process isn't dead. On the other hand maybe
+ there's no concrete feature which actually requires this information.
"""
lc = LoopingCall(thisProcess.updateCurrent, self.connectionFactory)
lc.start(30.0)
@@ -506,14 +610,14 @@
"""
Now let's find all the other masters.
"""
- masters = self.activeMasters()
+ masters = yield self.activeNodes(txn)
"""
- Each other 'master' here is another L{MasterInfo} which tells us where
+ Each other 'node' here is another L{NodeInfo} which tells us where
to connect.
"""
f = Factory()
- f.protocol = ConnectionFromPeerMaster
+ f.protocol = ConnectionFromPeerNode
for master in masters:
self._startConnectingTo(master)
@@ -523,9 +627,32 @@
Start an outgoing connection to another master process.
@param master: a description of the master to connect to.
- @type master: L{MasterInfo}
+ @type master: L{NodeInfo}
"""
f = Factory()
master.endpoint().connect(f)
+def sketch():
+ """
+ Example demonstrating how an application would normally talk to the queue.
+ """
+ # XXX in real life, MyWorkItem would also need to inherit from
+ # fromTable(...), would need to be declared at the top level...
+ class MyWorkItem(WorkItem):
+ @inlineCallbacks
+ def doWork(self):
+ txn = self.__txn__
+ yield self.doSomethingDeferred(txn)
+ returnValue(None)
+
+ @inlineCallbacks
+ def sampleFunction(txn):
+ for x in range(10):
+ # Note: no yield here. Yielding for this to be completed would
+ # generate unnecessary lock contention, potentially a deadlock (we
+ # just created the item in this transaction, the next transaction is
+ # going to want to delete it).
+ txn.enqueueWork(MyWorkItem, someInt=x, someUnicode=u'4321')
+ yield txn.commit()
+
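For illustration, the idle-scheduling arithmetic in _lostWorkCheckLoop above
can be reproduced standalone. This is a minimal sketch, not part of the
patch; the function name lostWorkCheckDelay and the concrete numbers are
invented for illustration:

    def lostWorkCheckDelay(now, interval, nodeCount, nodeIndex):
        """
        How long should a node sleep before its next lost-work check?
        Wakeups are aligned to interval boundaries and staggered by node
        index, so nodes avoid hitting the shared database all at once,
        and each node checks less often as more nodes join.
        """
        # Round down to the most recent interval boundary...
        boundary = now - (now % interval)
        # ...then schedule (nodeCount + nodeIndex) intervals after it.
        when = boundary + interval * (nodeCount + nodeIndex)
        return when - now

    # Example: a 60-second interval, 3 nodes, this node at index 1,
    # now=1000.  boundary=960, when=960 + 60*(3+1)=1200 -> sleep 200s.
    assert lostWorkCheckDelay(1000, 60, 3, 1) == 200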
Modified: CalendarServer/branches/users/glyph/q/txdav/common/datastore/sql_schema/current.sql
===================================================================
--- CalendarServer/branches/users/glyph/q/txdav/common/datastore/sql_schema/current.sql 2012-08-11 08:55:07 UTC (rev 9632)
+++ CalendarServer/branches/users/glyph/q/txdav/common/datastore/sql_schema/current.sql 2012-08-11 08:55:08 UTC (rev 9633)
@@ -28,8 +28,8 @@
-- Information about a process connected to this database.
--- Note that this must match the master info schema in twext.enterprise.queue.
-create table MASTER_INFO (
+-- Note that this must match the node info schema in twext.enterprise.queue.
+create table NODE_INFO (
HOSTNAME varchar(255) not null,
PID integer not null,
PORT integer not null,
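To connect the two halves of this change: each row in NODE_INFO corresponds
to one NodeInfo record created in PeerConnectionPool._doStart above. A
minimal sketch of that registration (the helper name registerThisNode and
the host/pid/port values are invented for illustration):

    import datetime
    from twisted.internet.defer import inlineCallbacks, returnValue
    from twext.enterprise.queue import NodeInfo

    @inlineCallbacks
    def registerThisNode(txn):
        # Each NODE_INFO column maps to one keyword argument; the values
        # stand in for getfqdn(), getpid(), and this node's AMP port.
        node = yield NodeInfo.create(
            txn, hostname='node1.example.com', pid=4321, port=7654,
            # The patch uses datetime.datetime.now(); arguably this should
            # be UTC, to match the column's timezone('UTC', ...) default.
            time=datetime.datetime.now(),
        )
        returnValue(node)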