Revision: 11810 http://trac.calendarserver.org//changeset/11810 Author: cdaboo@apple.com Date: 2013-10-13 07:57:55 -0700 (Sun, 13 Oct 2013) Log Message: ----------- Keep track of created work items for logging purposes. Modified Paths: -------------- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/twext/enterprise/queue.py Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/twext/enterprise/queue.py =================================================================== --- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/twext/enterprise/queue.py 2013-10-12 01:05:34 UTC (rev 11809) +++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/twext/enterprise/queue.py 2013-10-13 14:57:55 UTC (rev 11810) @@ -17,7 +17,7 @@ """ L{twext.enterprise.queue} is an U{eventually consistent -<https://en.wikipedia.org/wiki/Eventual_consistency>} task-queueing system for +<https://en.wikipedia.org/wiki/Eventual_consistency>} task-queuing system for use by applications with multiple front-end servers talking to a single database instance, that want to defer and parallelize work that involves storing the results of computation. @@ -35,7 +35,7 @@ would also be acceptable to have that happen at a later time, outside the critical path. -Such an application might be implemented with this queueing system like so:: +Such an application might be implemented with this queuing system like so:: from twext.enterprise.queue import WorkItem, queueFromTransaction from twext.enterprise.dal.parseschema import addSQLToSchema @@ -115,7 +115,7 @@ (in the worst case) pass from worker->controller->controller->worker. """ - def performWork(table, workID): + def performWork(table, workID): #@NoSelf """ @param table: The table where work is waiting. @type table: L{TableSyntax} @@ -284,7 +284,7 @@ the exact timing and location of the work execution may differ. L{WorkItem}s may be constrained in the ordering and timing of their - execution, to control concurrency and for performance reasons repsectively. + execution, to control concurrency and for performance reasons respectively. Although all the usual database mutual-exclusion rules apply to work executed in L{WorkItem.doWork}, implicit database row locking is not always @@ -305,7 +305,7 @@ cheap operations all have to wait for the expensive ones to complete, but continue to consume whatever database resources they were using. - In order to ameliorate these problems with potentiallly concurrent work + In order to ameliorate these problems with potentially concurrent work that uses the same resources, L{WorkItem} provides a database-wide mutex that is automatically acquired at the beginning of the transaction and released at the end. To use it, simply L{align @@ -367,10 +367,9 @@ This method will be invoked in a worker process. This method does I{not} need to delete the row referencing it; that - will be taken care of by the job queueing machinery. + will be taken care of by the job queuing machinery. """ - @classmethod def forTable(cls, table): """ @@ -431,7 +430,7 @@ necessary because we don't want to rely on DNS; if reverse DNS weren't set up perfectly, the listener would not be able to identify its peer, and it is easier to modify local configuration so that L{socket.getfqdn} returns - the right value than to ensure that DNS doesself. + the right value than to ensure that DNS does self. """ arguments = [ @@ -460,7 +459,7 @@ A connection to a peer node. Symmetric; since the 'client' and the 'server' both serve the same role, the logic is the same in every node. - @ivar localWorkerPool: the pool of local worker procesess that can process + @ivar localWorkerPool: the pool of local worker processes that can process queue work. @type localWorkerPool: L{WorkerConnectionPool} @@ -865,9 +864,6 @@ - - - class WorkerFactory(Factory, object): """ Factory, to be used as the client to connect from the worker to the @@ -942,6 +938,7 @@ self._whenProposed = Deferred() self._whenExecuted = Deferred() self._whenCommitted = Deferred() + self.workItem = None def _start(self): @@ -952,6 +949,7 @@ """ @passthru(self.workItemType.create(self.txn, **self.kw).addCallback) def whenCreated(item): + self.workItem = item self._whenProposed.callback(self) @self.txn.postCommit def whenDone(): @@ -1023,6 +1021,8 @@ """ return _cloneDeferred(self._whenCommitted) + + class _BaseQueuer(object): implements(IQueuer) @@ -1030,13 +1030,16 @@ super(_BaseQueuer, self).__init__() self.proposalCallbacks = set() + def callWithNewProposals(self, callback): - self.proposalCallbacks.add(callback); + self.proposalCallbacks.add(callback) + def transferProposalCallbacks(self, newQueuer): newQueuer.proposalCallbacks = self.proposalCallbacks return newQueuer + def enqueueWork(self, txn, workItemType, **kw): """ There is some work to do. Do it, someplace else, ideally in parallel. @@ -1061,6 +1064,7 @@ return wp + class PeerConnectionPool(_BaseQueuer, MultiService, object): """ Each node has a L{PeerConnectionPool} connecting it to all the other nodes @@ -1277,7 +1281,6 @@ overdueItem.workID) return inTransaction(self.transactionFactory, workCheck) - _currentWorkDeferred = None _lostWorkCheckCall = None @@ -1430,8 +1433,6 @@ - - class LocalQueuer(_BaseQueuer): """ When work is enqueued with this queuer, it is just executed locally. @@ -1469,6 +1470,7 @@ return succeed(None) + class NonPerformingQueuer(_BaseQueuer): """ When work is enqueued with this queuer, it is never executed locally. @@ -1487,4 +1489,4 @@ """ Choose to perform the work locally. """ - return NonPerformer() \ No newline at end of file + return NonPerformer()