[CalendarServer-changes] [9625] CalendarServer/branches/users/glyph/q
source_changes at macosforge.org
Sat Aug 11 01:55:02 PDT 2012
Revision: 9625
http://trac.macosforge.org/projects/calendarserver/changeset/9625
Author: glyph at apple.com
Date: 2012-08-11 01:55:01 -0700 (Sat, 11 Aug 2012)
Log Message:
-----------
lots more detail
Modified Paths:
--------------
CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
Property Changed:
----------------
CalendarServer/branches/users/glyph/q/
Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py 2012-08-11 08:55:01 UTC (rev 9624)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py 2012-08-11 08:55:01 UTC (rev 9625)
@@ -3,23 +3,78 @@
This is the logic associated with queueing.
"""
+from socket import getfqdn
+from functools import wraps
+from os import getpid
+from datetime import datetime
+
from twisted.internet.task import LoopingCall
from twisted.application.service import Service
from twisted.internet.protocol import Factory
-from twisted.internet.defer import inlineCallbacks#, Deferred
+from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.endpoints import TCP4ClientEndpoint
-from twisted.protocols.amp import AMP, Command, Integer, String
+from twisted.protocols.amp import AMP, Command, Integer, Argument
from twisted.python.reflect import qual
-from socket import getfqdn
-from functools import wraps
+from twext.enterprise.dal.syntax import TableSyntax, SchemaSyntax
+from twext.enterprise.dal.model import Table, Schema, SQLType, Constraint
-from datetime import datetime
-from twisted.internet.defer import returnValue
-from os import getpid
+def makeMasterSchema(inSchema):
+ """
+ Create a self-contained schema for L{MasterInfo} to use.
+ @return: a schema with just the one table.
+ """
+ # Initializing this duplicate schema avoids a circular dependency, but this
+ # should really be accomplished with independent schema objects that the
+ # transaction is made aware of somehow.
+ masterTable = Table(inSchema, 'MASTER_INFO')
+ masterTable.addColumn("HOSTNAME", SQLType("varchar", 255))
+ masterTable.addColumn("PID", SQLType("integer", None))
+ masterTable.addColumn("PORT", SQLType("integer", None))
+ masterTable.addColumn("TIME", SQLType("timestamp", None))
+ for column in masterTable.columns:
+ masterTable.tableConstraint(Constraint.NOT_NULL, [column.name])
+ return inSchema
+masterInfoSchema = SchemaSyntax(makeMasterSchema(Schema(__file__)))
+
+
+
+class TableSyntaxByName(Argument):
+ """
+ Serialize and deserialize L{TableSyntax} objects for an AMP protocol with
+ an attached schema.
+ """
+
+ def fromStringProto(self, inString, proto):
+ """
+ Convert the name of the table into a table, given a C{proto} with an
+ attached C{schema}.
+
+ @param inString: the name of a table, as utf-8 encoded bytes
+ @type inString: L{bytes}
+
+ @param proto: a L{SchemaAMP}
+ """
+ return TableSyntax(proto.schema.tableNamed(inString.decode("UTF-8")))
+
+
+ def toString(self, inObject):
+ """
+ Convert a L{TableSyntax} object into just its name for wire transport.
+
+ @param inObject: a table.
+ @type inObject: L{TableSyntax}
+
+ @return: the name of that table
+ @rtype: L{bytes}
+ """
+ return inObject.model.name.encode("UTF-8")
+
+
+
class MasterInfo(object):
"""
A L{MasterInfo} is information about a currently-active master process.
@@ -98,42 +153,174 @@
"""
arguments = [
- ("table", String()),
+ ("table", TableSyntaxByName()),
("workID", Integer()),
]
response = []
-class PeerConnection(AMP):
+class ConnectionFromPeerMaster(AMP):
"""
A connection to a peer master. Symmetric; since the 'client' and the
'server' both serve the same role, the logic is the same in every master.
+
+ This only exists in the master.
"""
+
+ def __init__(self, localWorkerPool, boxReceiver=None, locator=None):
+ """
+ Initialize this L{ConnectionFromPeerMaster} with a reference to a pool
+ of local workers.
+
+ @param localWorkerPool: the pool of local worker processes that can
+ process queue work.
+ """
+ self.localWorkerPool = localWorkerPool
+ super(ConnectionFromPeerMaster, self).__init__(boxReceiver, locator)
+
+
def performWork(self, table, workID):
+ """
+ A L{local worker connection <ConnectionFromWorker>} is asking this
+ specific peer master process to perform some work, having already
+ determined that it's appropriate.
+
+ @param table: The table where work is waiting.
+ @type table: L{TableSyntax}
+
+ @param workID: The primary key identifier of the given work.
+ @type workID: L{int}
+
+ @return: a L{Deferred} firing with an empty dictionary when the work is
+ complete.
+ @rtype: L{Deferred} firing L{dict}
+ """
- return self.callRemote(PerformWork,
- table=table.model.name, workID=workID)
+ return self.callRemote(PerformWork,
+ table=table, workID=workID)
+ @PerformWork.responder
+ def dispatchToWorker(self, table, workID):
+ """
+ A remote peer master has asked this master to do some work; dispatch it
+ to a local worker.
+ """
+ return self.localWorkerPool.performWork(table, workID)
-class LocalConnection(object):
+
+
+class WorkerConnectionPool(object):
"""
- Implements the same implicit protocol as a L{PeerConnection}, but one that
- dispenses work to the local worker processes rather than to a remote
- connection pool.
+ A pool of L{ConnectionFromWorker}s.
+
+ L{WorkerConnectionPool} also implements the same implicit protocol as a
+ L{ConnectionFromPeerMaster}, but one that dispenses work to the local
+ worker processes rather than to a remote connection pool.
"""
+ def __init__(self):
+ self.workers = []
+
+
+ def addWorker(self, worker):
+ self.workers.append(worker)
+
+
+ def removeWorker(self, worker):
+ self.workers.remove(worker)
+
+
+ def _selectLowestLoadLocalConnection(self):
+ """
+ Select the local connection with the lowest current load, or C{None} if
+ all workers are too busy.
+
+ @return: a worker connection with the lowest current load.
+ @rtype: L{ConnectionFromWorker} or L{NoneType}
+ """
+ workers = sorted(self.workers, key=lambda w: w.currentLoad)
+ return workers[0] if workers else None
+
+
def performWork(self, table, workID):
"""
- Look up a local worker and delegate .performWork to it.
+ Select a local worker that is idle enough to perform the given work,
+ then ask it to perform the work.
+
+ @param table: The table where work is waiting.
+ @type table: L{TableSyntax}
+
+ @param workID: The primary key identifier of the given work.
+ @type workID: L{int}
+
+ @return: a L{Deferred} firing with an empty dictionary when the work is
+ complete.
+ @rtype: L{Deferred} firing L{dict}
"""
+ return self._selectLowestLoadLocalConnection().performWork(table, workID)
+class ConnectionFromWorker(AMP):
+ """
+ An individual connection from a worker, as seen from the master's
+ perspective. L{ConnectionFromWorker}s go into a L{WorkerConnectionPool}.
+
+ @ivar workerPool: The connection pool that this individual connection is
+ participating in.
+ @type workerPool: L{WorkerConnectionPool}
+ """
+
+ def __init__(self, workerPool):
+ self.workerPool = workerPool
+ super(ConnectionFromWorker, self).__init__()
+
+
+ @property
+ def currentLoad(self):
+ """
+ What is the current load of this worker?
+ """
+ # TODO: this needs to be hooked up to something.
+ return 0
+
+
+ def startReceivingBoxes(self, sender):
+ """
+ Start receiving AMP boxes from the peer. Initialize all necessary
+ state.
+ """
+ result = super(ConnectionFromWorker, self).startReceivingBoxes(sender)
+ self.workerPool.addWorker(self)
+ return result
+
+
+ def stopReceivingBoxes(self, reason):
+ """
+ AMP boxes will no longer be received.
+ """
+ result = super(ConnectionFromWorker, self).stopReceivingBoxes(reason)
+ self.workerPool.removeWorker(self)
+ return result
+
+
+ def performWork(self, table, workID):
+ """
+ Dispatch work to this worker.
+
+ @see: The responder for this should always be
+ L{ConnectionFromMaster.actuallyReallyExecuteWorkHere}.
+ """
+ return self.callRemote(PerformWork,
+ table=table, workID=workID)
+
+
+
class ConnectionFromMaster(AMP):
"""
- This is in a worker process. It processes requests from its own master to
- do work.
+ A L{ConnectionFromMaster} is the connection to a master process, in a
+ worker process. It processes requests from its own master to do work. It
+ is the opposite end of the connection from L{ConnectionFromWorker}.
"""
def __init__(self, schemaSyntax):
@@ -148,23 +335,26 @@
@PerformWork.responder
@inlineCallbacks
- def perform(self, table, workID):
+ def actuallyReallyExecuteWorkHere(self, table, workID):
"""
This is where it's time to actually do the work. The master process
has instructed this worker to do it; so, look up the data in the row,
and do it.
"""
- tableSyntax = getattr(self.schemaSyntax, table)
- workItemClass = WorkItem.forTable(tableSyntax)
+ workItemClass = WorkItem.forTable(table)
# TODO: mumble locking something mumble
+ # TODO: get a transaction in here.
workItem = yield workItemClass.load(workID)
# TODO: verify that workID is the primary key someplace.
yield workItem.doWork()
+ # TODO: what if we fail? error-handling should be recorded someplace,
+ # the row should probably be removed, re-tries should be triggerable.
+ yield workItem.delete()
returnValue({})
-class PeerConnectionPool(Service):
+class PeerConnectionPool(Service, object):
"""
Each master has a L{PeerConnectionPool} connecting it to all the other
masters currently active on the same database.
@@ -181,6 +371,9 @@
up or if it is shutting down.
"""
+ getfqdn = staticmethod(getfqdn)
+ getpid = staticmethod(getpid)
+
def __init__(self, connectionFactory, ampPort):
"""
Initialize a L{PeerConnectionPool}.
@@ -194,7 +387,8 @@
L{IAsyncTransaction}
"""
self.connectionFactory = connectionFactory
- self.hostName = getfqdn()
+ self.hostName = self.getfqdn()
+ self.pid = self.getpid()
self.ampPort = ampPort
self.thisProcess = None
@@ -204,21 +398,41 @@
Choose a peer to distribute work to based on the current known slot
occupancy of the other masters.
- @return: a Deferred which fires with the chosen L{PeerConnection} as
- soon as one is available. Normally this will be synchronous, but
- we need to account for the possibility that we may need to connect
- to other hosts.
- @rtype: L{twisted.internet.defer.Deferred} firing L{PeerConnection}
+ @return: a L{Deferred <twisted.internet.defer.Deferred>} which fires
+ with the chosen L{ConnectionFromPeerMaster} as soon as one is
+ available. Normally this will be synchronous, but we need to
+ account for the possibility that we may need to connect to other
+ hosts.
+ @rtype: L{Deferred <twisted.internet.defer.Deferred>} firing
+ L{ConnectionFromPeerMaster}
"""
def enqueueWork(self, workItem):
"""
- There is some work to do. Do it, someplace.
+ There is some work to do. Do it, someplace else, ideally in parallel.
+ Later, let the caller know that the work has been completed by firing a
+ L{Deferred}.
+ @note: The L{Deferred} returned by C{enqueueWork} should be used with
+ caution. If an application decides to do any database-persistent
+ work as a result of this L{Deferred} firing, that work I{may be
+ lost} as a result of a service being normally shut down between the
+ time that the work is scheduled and the time that it is executed.
+ So, the only things that should be added as callbacks to this
+ L{Deferred} are those which are ephemeral, in memory, and reflect
+ only presentation state associated with the user's perception of
+ the completion of work, not logical chains of work which need to be
+ completed in sequence; those should all be completed within the
+ transaction of the L{WorkItem.doWork} that gets executed.
+
+ @param workItem: An item of work to be done in another process.
@type workItem: A L{WorkItem}
+
+ @return: a L{Deferred} that fires when the work has been completed.
+ @rtype: L{Deferred} firing L{None}
"""
- @workItem.transaction.postCommit
+ @workItem.__txn__.postCommit
@inlineCallbacks
def whenDone():
peer = yield self.choosePeer()
@@ -231,13 +445,20 @@
connections to other servers in the cluster.
"""
+ self._doStart()
+ # Is there any need for a callback?
+
+
+ @inlineCallbacks
+ def _doStart(self):
"""
First, we tell the database that we're an active master so that other
masters know about us. This should also give us a
unique-to-the-whole-database identifier for this process instance.
"""
- thisProcess = MasterInfo.create(
- host=self.hostName, pid=getpid(), port=self.ampPort,
+ txn = self.connectionFactory()
+ thisProcess = yield MasterInfo.create(
+ txn, hostname=self.hostName, pid=self.pid, port=self.ampPort,
- time=datetime.datetime.now()
+ time=datetime.now()
)
@@ -259,7 +480,7 @@
to connect.
"""
f = Factory()
- f.protocol = PeerConnection
+ f.protocol = ConnectionFromPeerMaster
for master in masters:
self._startConnectingTo(master)
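
makeMasterSchema() wires the MASTER_INFO table into a standalone Schema, and
masterInfoSchema wraps it in a SchemaSyntax so DAL queries can name it. A
minimal sketch of using that wrapper (the Select call here is an assumption
about the twext DAL query API, not something this commit exercises):

    from twext.enterprise.dal.syntax import Select

    # attribute access on a SchemaSyntax yields a TableSyntax
    masterInfo = masterInfoSchema.MASTER_INFO
    query = Select(From=masterInfo)   # roughly: SELECT * FROM MASTER_INFO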
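
TableSyntaxByName only ships a table's name over the wire; each peer re-binds
that name against its own attached schema, so the schema itself is never
serialized. A minimal sketch of the protocol side (ExampleSchemaAMP is a
hypothetical stand-in for the L{SchemaAMP} the docstrings mention; any AMP
exposing a 'schema' attribute satisfies fromStringProto()):

    from twisted.protocols.amp import AMP

    class ExampleSchemaAMP(AMP):
        # the 'schema' attribute is all TableSyntaxByName looks for
        def __init__(self, schema, boxReceiver=None, locator=None):
            AMP.__init__(self, boxReceiver, locator)
            self.schema = schema

    # the sender passes the TableSyntax itself:
    #     proto.callRemote(PerformWork, table=someTable, workID=3)
    # toString() emits the UTF-8 name (e.g. b"MASTER_INFO"); the receiver's
    # fromStringProto() rebuilds TableSyntax(schema.tableNamed(u"MASTER_INFO")).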
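
WorkerConnectionPool.performWork() is just a sort on reported load followed by
a dispatch. A self-contained illustration (FakeWorker is invented; a real
entry is a ConnectionFromWorker, whose currentLoad is currently hard-coded to
zero):

    class FakeWorker(object):
        def __init__(self, load):
            self.currentLoad = load   # plain attribute standing in for the property
        def performWork(self, table, workID):
            print("load-%d worker takes %s #%d" % (self.currentLoad, table, workID))

    pool = WorkerConnectionPool()
    pool.addWorker(FakeWorker(3))
    pool.addWorker(FakeWorker(1))
    pool.performWork("MASTER_INFO", 42)   # dispatched to the load-1 worker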
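
Hoisting getfqdn and getpid onto PeerConnectionPool as staticmethods is a
testability hook: a test can pin deterministic values per subclass or per
instance without monkeypatching the socket or os modules. For instance
(hypothetical test stub):

    class StubbedPool(PeerConnectionPool):
        getfqdn = staticmethod(lambda: "master1.example.com")
        getpid = staticmethod(lambda: 4321)

    # StubbedPool(connectionFactory, 7654).hostName == "master1.example.com"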
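
Per the caution in enqueueWork()'s docstring, only ephemeral presentation
callbacks belong on the returned Deferred. A sketch of a conforming caller
(NotificationWork, its create() signature, and updateSpinner() are all
invented; the commit defines no concrete WorkItem subclass):

    from twisted.internet.defer import inlineCallbacks

    @inlineCallbacks
    def notifyLater(txn, pool, userID):
        work = yield NotificationWork.create(txn, userID=userID)
        d = pool.enqueueWork(work)
        # in-memory UI state only; durable follow-ups belong in doWork()
        d.addCallback(lambda ignored: updateSpinner(userID))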