[CalendarServer-changes] [9568] CalendarServer/branches/users/glyph/q

source_changes at macosforge.org source_changes at macosforge.org
Sat Aug 11 01:54:16 PDT 2012


Revision: 9568
          http://trac.macosforge.org/projects/calendarserver/changeset/9568
Author:   glyph at apple.com
Date:     2012-08-11 01:54:15 -0700 (Sat, 11 Aug 2012)
Log Message:
-----------
attempting to fake less and fix a bug

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/q/twext/enterprise/adbapi2.py
    CalendarServer/branches/users/glyph/q/twext/enterprise/dal/record.py
    CalendarServer/branches/users/glyph/q/twext/enterprise/dal/test/test_record.py
    CalendarServer/branches/users/glyph/q/twext/enterprise/test/test_adbapi2.py
    CalendarServer/branches/users/glyph/q/twext/internet/threadutils.py

Property Changed:
----------------
    CalendarServer/branches/users/glyph/q/

Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/adbapi2.py	2012-08-11 08:54:14 UTC (rev 9567)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/adbapi2.py	2012-08-11 08:54:15 UTC (rev 9568)
@@ -754,8 +754,9 @@
     _retry = None
 
     def __init__(self, pool, holder):
-        self._pool   = pool
-        self._holder = holder
+        self._pool    = pool
+        self._holder  = holder
+        self._aborted = False
 
 
     def abort(self):
@@ -951,6 +952,8 @@
             cursor     = connection.cursor()
             return (connection, cursor)
         def finishInit((connection, cursor)):
+            if txn._aborted:
+                return
             baseTxn = _ConnectedTxn(
                 pool=self,
                 threadHolder=holder,

Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/dal/record.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/dal/record.py	2012-08-11 08:54:14 UTC (rev 9567)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/dal/record.py	2012-08-11 08:54:15 UTC (rev 9568)
@@ -44,7 +44,15 @@
         returnValue(self)
 
 
+    @classmethod
+    def create(cls, *a, **k):
+        """
+        Create a row.
+        """
+        return None
 
+
+
 def fromTable(table):
     """
     Create a L{type} that maps the columns from a particular table.

Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/dal/test/test_record.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/dal/test/test_record.py	2012-08-11 08:54:14 UTC (rev 9567)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/dal/test/test_record.py	2012-08-11 08:54:15 UTC (rev 9568)
@@ -101,8 +101,3 @@
         self.assertEqual(rows, [[3, 'epsilon']])
 
 
-
-class TestQuery(object):
-    """
-    Tests for loading row objects from the database.
-    """

Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/test/test_adbapi2.py	2012-08-11 08:54:14 UTC (rev 9567)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/test/test_adbapi2.py	2012-08-11 08:54:15 UTC (rev 9568)
@@ -27,11 +27,9 @@
 
 from twisted.trial.unittest import TestCase
 
-from twisted.internet.defer import execute
 from twisted.internet.task import Clock
 
 from twisted.internet.interfaces import IReactorThreads
-from twisted.internet.defer import Deferred
 
 from twisted.test.proto_helpers import StringTransport
 
@@ -45,6 +43,7 @@
 from twext.enterprise.adbapi2 import FailsafeException
 from twext.enterprise.adbapi2 import DEFAULT_PARAM_STYLE
 from twext.enterprise.adbapi2 import ConnectionPool
+from twext.internet.threadutils import ThreadHolder
 
 
 def resultOf(deferred, propagate=False):
@@ -326,59 +325,64 @@
 
 
 
-class FakeThreadHolder(object):
+class FakeThreadHolder(ThreadHolder):
     """
-    Run things submitted to this ThreadHolder on the main thread, so that
+    Run things to submitted this ThreadHolder on the main thread, so that
     execution is easier to control.
     """
 
     def __init__(self, test):
+        super(FakeThreadHolder, self).__init__(self)
+        self.test = test
         self.started = False
         self.stopped = False
-        self.test = test
-        self.queue = []
 
 
     def start(self):
-        """
-        Mark this L{FakeThreadHolder} as not started.
-        """
         self.started = True
+        return super(FakeThreadHolder, self).start()
 
 
     def stop(self):
-        """
-        Mark this L{FakeThreadHolder} as stopped.
-        """
-        def stopped(nothing):
-            self.stopped = True
-        return self.submit(lambda : None).addCallback(stopped)
+        self.stopped = True
+        return super(FakeThreadHolder, self).stop()
 
 
-    def submit(self, work):
-        """
-        Call the function (or queue it)
-        """
-        if self.test.paused:
-            d = Deferred()
-            self.queue.append((d, work))
-            return d
-        else:
-            return execute(work)
+    @property
+    def _q(self):
+        return self._q_
 
 
+    @_q.setter
+    def _q(self, newq):
+        if newq is not None:
+            oget = newq.get
+            newq.get = lambda: oget(timeout=0)
+            oput = newq.put
+            def putit(x):
+                p = oput(x)
+                if not self.test.paused:
+                    self.flush()
+                return p
+            newq.put = putit
+        self._q_ = newq
+
+
+    def callFromThread(self, f, *a, **k):
+        result = f(*a, **k)
+        return result
+
+
+    def callInThread(self, f, *a, **k):
+        pass
+
+
     def flush(self):
         """
         Fire all deferreds previously returned from submit.
         """
-        self.queue, queue = [], self.queue
-        for (d, work) in queue:
-            try:
-                result = work()
-            except:
-                d.errback()
-            else:
-                d.callback(result)
+        while not self.stopped and self._q.queue and self._qpull():
+            pass
 
 
 
@@ -845,10 +849,10 @@
         abortResult = self.resultOf(it.abort())
 
         # steal it from the queue so we can do it out of order
-        d, work = self.holders[0].queue.pop()
+        d, work = self.holders[0]._q.get()
         # that should be the only work unit so don't continue if something else
         # got in there
-        self.assertEquals(self.holders[0].queue, [])
+        self.assertEquals(list(self.holders[0]._q.queue), [])
         self.assertEquals(len(self.holders), 1)
         self.flushHolders()
         stopResult = self.resultOf(self.pool.stopService())

Modified: CalendarServer/branches/users/glyph/q/twext/internet/threadutils.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/internet/threadutils.py	2012-08-11 08:54:14 UTC (rev 9567)
+++ CalendarServer/branches/users/glyph/q/twext/internet/threadutils.py	2012-08-11 08:54:15 UTC (rev 9568)
@@ -45,20 +45,30 @@
         """
         Worker function which runs in a non-reactor thread.
         """
-        while True:
-            work = self._q.get()
-            if work is _DONE:
-                def finishStopping():
-                    self._state = _STATE_STOPPED
-                    self._q = None
-                    s = self._stopper
-                    self._stopper = None
-                    s.callback(None)
-                self._reactor.callFromThread(finishStopping)
-                return
-            self._oneWorkUnit(*work)
+        while self._qpull():
+            pass
 
 
+    def _qpull(self):
+        """
+        Pull one item off the queue and react appropriately.
+
+        Return whether or not to keep going.
+        """
+        work = self._q.get()
+        if work is _DONE:
+            def finishStopping():
+                self._state = _STATE_STOPPED
+                self._q = None
+                s = self._stopper
+                self._stopper = None
+                s.callback(None)
+            self._reactor.callFromThread(finishStopping)
+            return False
+        self._oneWorkUnit(*work)
+        return True
+
+
     def _oneWorkUnit(self, deferred, instruction):
         try:
             result = instruction()
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120811/3cc07faf/attachment-0001.html>


More information about the calendarserver-changes mailing list