[CalendarServer-changes] [10272] CalendarServer/branches/users/glyph/queue-locking-and-timing

source_changes at macosforge.org source_changes at macosforge.org
Fri Jan 4 16:39:39 PST 2013


Revision: 10272
          http://trac.calendarserver.org/changeset/10272
Author:   glyph at apple.com
Date:     2013-01-04 16:39:39 -0800 (Fri, 04 Jan 2013)
Log Message:
-----------
Refactor and vastly simplify local queuer, while making it somewhat more realistic.

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py
    CalendarServer/branches/users/glyph/queue-locking-and-timing/txdav/common/datastore/sql.py

Property Changed:
----------------
    CalendarServer/branches/users/glyph/queue-locking-and-timing/

Modified: CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py	2013-01-05 00:39:38 UTC (rev 10271)
+++ CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py	2013-01-05 00:39:39 UTC (rev 10272)
@@ -272,7 +272,7 @@
     both created and thereby implicitly scheduled to be executed by calling
     L{enqueueWork <twext.enterprise.ienterprise.IQueuer.enqueueWork>} with the
     appropriate L{WorkItem} concrete subclass.  There are different queue
-    implementations (L{PeerConnectionPool} and L{NullQueuer}, for example), so
+    implementations (L{PeerConnectionPool} and L{LocalQueuer}, for example), so
     the exact timing and location of the work execution may differ.
 
     L{WorkItem}s may be constrained in the ordering and timing of their
@@ -899,9 +899,9 @@
     A L{WorkProposal} is a proposal for work that will be executed, perhaps on
     another node, perhaps in the future.
 
-    @ivar pool: the connection pool which this L{WorkProposal} will use to
-        submit its work.
-    @type pool: L{PeerConnectionPool}
+    @ivar _chooser: The object which will choose where the work in this
+        proposal gets performed.
+    @type _chooser: L{PeerConnectionPool} or L{LocalQueuer}
 
     @ivar txn: The transaction where the work will be enqueued.
     @type txn: L{IAsyncTransaction}
@@ -914,8 +914,8 @@
     @type kw: L{dict}
     """
 
-    def __init__(self, pool, txn, workItemType, kw):
-        self.pool = pool
+    def __init__(self, chooser, txn, workItemType, kw):
+        self._chooser = chooser
         self.txn = txn
         self.workItemType = workItemType
         self.kw = kw
@@ -936,7 +936,7 @@
             @self.txn.postCommit
             def whenDone():
                 self._whenCommitted.callback(None)
-                performer = self.pool.choosePerformer()
+                performer = self._chooser.choosePerformer()
                 @passthru(performer.performWork(item.table, item.workID)
                           .addCallback)
                 def performed(result):
@@ -1380,39 +1380,25 @@
 
 
 
-class ImmediateWorkProposal(object):
+class LocalQueuer(object):
     """
-    Like L{WorkProposal}, but for items that must be executed immediately
-    because no real queue is set up yet.
-
-    @see: L{WorkProposal}, L{NullQueuer.enqueueWork}
-    """
-    def __init__(self, proposed, done):
-        self.proposed = proposed
-        self.done = done
-
-
-    def whenExecuted(self):
-        return _cloneDeferred(self.done)
-
-
-    def whenProposed(self):
-        return _cloneDeferred(self.proposed)
-
-
-    def whenCommitted(self):
-        return _cloneDeferred(self.done)
-
-
-
-class NullQueuer(object):
-    """
     When work is enqueued with this queuer, it is just executed immediately,
     within the same transaction.  While this is technically correct, it is not
     very efficient.
     """
     implements(IQueuer)
 
+    def __init__(self, txnFactory):
+        self.txnFactory = txnFactory
+
+
+    def choosePerformer(self):
+        """
+        Choose to perform the work locally.
+        """
+        return LocalPerformer(self.txnFactory)
+
+
     def enqueueWork(self, txn, workItemType, **kw):
         """
         Do this work immediately.
@@ -1421,26 +1407,12 @@
 
         @return: a pseudo work proposal, since everything completes at the same
             time.
-        @rtype: L{ImmediateWorkProposal}
+        @rtype: L{WorkProposal}
         """
-        proposed = Deferred()
-        done = Deferred()
-        @inlineCallbacks
-        def doit():
-            item = yield self.workItemType.create(self.txn, **self.kw)
-            proposed.callback(True)
-            yield item.delete()
-            yield item.doWork()
-        @txn.postCommit
-        def committed():
-            done.callback(True)
-        @txn.postAbort
-        def aborted():
-            tf = TransactionFailed()
-            done.errback(tf)
-            if not proposed.called:
-                proposed.errback(tf)
-        return ImmediateWorkProposal(proposed, done)
+        wp = WorkProposal(self, txn, workItemType, kw)
+        wp._start()
+        return wp
 
 
 
+

Modified: CalendarServer/branches/users/glyph/queue-locking-and-timing/txdav/common/datastore/sql.py
===================================================================
--- CalendarServer/branches/users/glyph/queue-locking-and-timing/txdav/common/datastore/sql.py	2013-01-05 00:39:38 UTC (rev 10271)
+++ CalendarServer/branches/users/glyph/queue-locking-and-timing/txdav/common/datastore/sql.py	2013-01-05 00:39:39 UTC (rev 10272)
@@ -87,7 +87,7 @@
 from twistedcaldav.dateops import datetimeMktime, pyCalendarTodatetime
 
 from txdav.base.datastore.util import normalizeUUIDOrNot
-from twext.enterprise.queue import NullQueuer
+from twext.enterprise.queue import LocalQueuer
 from twext.enterprise.util import parseSQLTimestamp
 
 from pycalendar.datetime import PyCalendarDateTime
@@ -139,7 +139,7 @@
     @type quota: C{int} or C{NoneType}
 
     @ivar queuer: An object with an C{enqueueWork} method, from
-        L{twext.enterprise.queue}.  Initially, this is a L{NullQueuer}, so it
+        L{twext.enterprise.queue}.  Initially, this is a L{LocalQueuer}, so it
         is always usable, but in a properly configured environment it will be
         upgraded to a more capable object that can distribute work throughout a
         cluster.
@@ -171,7 +171,7 @@
         self.logSQL = logSQL
         self.logTransactionWaits = logTransactionWaits
         self.timeoutTransactions = timeoutTransactions
-        self.queuer = NullQueuer()
+        self.queuer = LocalQueuer(self.newTransaction)
         self._migrating = False
         self._enableNotifications = True
 
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130104/4164f203/attachment.html>


More information about the calendarserver-changes mailing list