[CalendarServer-changes] [9568] CalendarServer/branches/users/glyph/q
source_changes at macosforge.org
source_changes at macosforge.org
Sat Aug 11 01:54:16 PDT 2012
Revision: 9568
http://trac.macosforge.org/projects/calendarserver/changeset/9568
Author: glyph at apple.com
Date: 2012-08-11 01:54:15 -0700 (Sat, 11 Aug 2012)
Log Message:
-----------
attempting to fake less and fix a bug
Modified Paths:
--------------
CalendarServer/branches/users/glyph/q/twext/enterprise/adbapi2.py
CalendarServer/branches/users/glyph/q/twext/enterprise/dal/record.py
CalendarServer/branches/users/glyph/q/twext/enterprise/dal/test/test_record.py
CalendarServer/branches/users/glyph/q/twext/enterprise/test/test_adbapi2.py
CalendarServer/branches/users/glyph/q/twext/internet/threadutils.py
Property Changed:
----------------
CalendarServer/branches/users/glyph/q/
Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/adbapi2.py 2012-08-11 08:54:14 UTC (rev 9567)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/adbapi2.py 2012-08-11 08:54:15 UTC (rev 9568)
@@ -754,8 +754,9 @@
_retry = None
def __init__(self, pool, holder):
- self._pool = pool
- self._holder = holder
+ self._pool = pool
+ self._holder = holder
+ self._aborted = False
def abort(self):
@@ -951,6 +952,8 @@
cursor = connection.cursor()
return (connection, cursor)
def finishInit((connection, cursor)):
+ if txn._aborted:
+ return
baseTxn = _ConnectedTxn(
pool=self,
threadHolder=holder,
Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/dal/record.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/dal/record.py 2012-08-11 08:54:14 UTC (rev 9567)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/dal/record.py 2012-08-11 08:54:15 UTC (rev 9568)
@@ -44,7 +44,15 @@
returnValue(self)
+ @classmethod
+ def create(cls, *a, **k):
+ """
+ Create a row.
+ """
+ return None
+
+
def fromTable(table):
"""
Create a L{type} that maps the columns from a particular table.
Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/dal/test/test_record.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/dal/test/test_record.py 2012-08-11 08:54:14 UTC (rev 9567)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/dal/test/test_record.py 2012-08-11 08:54:15 UTC (rev 9568)
@@ -101,8 +101,3 @@
self.assertEqual(rows, [[3, 'epsilon']])
-
-class TestQuery(object):
- """
- Tests for loading row objects from the database.
- """
Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/test/test_adbapi2.py 2012-08-11 08:54:14 UTC (rev 9567)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/test/test_adbapi2.py 2012-08-11 08:54:15 UTC (rev 9568)
@@ -27,11 +27,9 @@
from twisted.trial.unittest import TestCase
-from twisted.internet.defer import execute
from twisted.internet.task import Clock
from twisted.internet.interfaces import IReactorThreads
-from twisted.internet.defer import Deferred
from twisted.test.proto_helpers import StringTransport
@@ -45,6 +43,7 @@
from twext.enterprise.adbapi2 import FailsafeException
from twext.enterprise.adbapi2 import DEFAULT_PARAM_STYLE
from twext.enterprise.adbapi2 import ConnectionPool
+from twext.internet.threadutils import ThreadHolder
def resultOf(deferred, propagate=False):
@@ -326,59 +325,64 @@
-class FakeThreadHolder(object):
+class FakeThreadHolder(ThreadHolder):
"""
- Run things submitted to this ThreadHolder on the main thread, so that
+ Run things to submitted this ThreadHolder on the main thread, so that
execution is easier to control.
"""
def __init__(self, test):
+ super(FakeThreadHolder, self).__init__(self)
+ self.test = test
self.started = False
self.stopped = False
- self.test = test
- self.queue = []
def start(self):
- """
- Mark this L{FakeThreadHolder} as not started.
- """
self.started = True
+ return super(FakeThreadHolder, self).start()
def stop(self):
- """
- Mark this L{FakeThreadHolder} as stopped.
- """
- def stopped(nothing):
- self.stopped = True
- return self.submit(lambda : None).addCallback(stopped)
+ self.stopped = True
+ return super(FakeThreadHolder, self).stop()
- def submit(self, work):
- """
- Call the function (or queue it)
- """
- if self.test.paused:
- d = Deferred()
- self.queue.append((d, work))
- return d
- else:
- return execute(work)
+ @property
+ def _q(self):
+ return self._q_
+ @_q.setter
+ def _q(self, newq):
+ if newq is not None:
+ oget = newq.get
+ newq.get = lambda: oget(timeout=0)
+ oput = newq.put
+ def putit(x):
+ p = oput(x)
+ if not self.test.paused:
+ self.flush()
+ return p
+ newq.put = putit
+ self._q_ = newq
+
+
+ def callFromThread(self, f, *a, **k):
+ result = f(*a, **k)
+ return result
+
+
+ def callInThread(self, f, *a, **k):
+ pass
+
+
def flush(self):
"""
Fire all deferreds previously returned from submit.
"""
- self.queue, queue = [], self.queue
- for (d, work) in queue:
- try:
- result = work()
- except:
- d.errback()
- else:
- d.callback(result)
+ while not self.stopped and self._q.queue and self._qpull():
+ pass
@@ -845,10 +849,10 @@
abortResult = self.resultOf(it.abort())
# steal it from the queue so we can do it out of order
- d, work = self.holders[0].queue.pop()
+ d, work = self.holders[0]._q.get()
# that should be the only work unit so don't continue if something else
# got in there
- self.assertEquals(self.holders[0].queue, [])
+ self.assertEquals(list(self.holders[0]._q.queue), [])
self.assertEquals(len(self.holders), 1)
self.flushHolders()
stopResult = self.resultOf(self.pool.stopService())
Modified: CalendarServer/branches/users/glyph/q/twext/internet/threadutils.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/internet/threadutils.py 2012-08-11 08:54:14 UTC (rev 9567)
+++ CalendarServer/branches/users/glyph/q/twext/internet/threadutils.py 2012-08-11 08:54:15 UTC (rev 9568)
@@ -45,20 +45,30 @@
"""
Worker function which runs in a non-reactor thread.
"""
- while True:
- work = self._q.get()
- if work is _DONE:
- def finishStopping():
- self._state = _STATE_STOPPED
- self._q = None
- s = self._stopper
- self._stopper = None
- s.callback(None)
- self._reactor.callFromThread(finishStopping)
- return
- self._oneWorkUnit(*work)
+ while self._qpull():
+ pass
+ def _qpull(self):
+ """
+ Pull one item off the queue and react appropriately.
+
+ Return whether or not to keep going.
+ """
+ work = self._q.get()
+ if work is _DONE:
+ def finishStopping():
+ self._state = _STATE_STOPPED
+ self._q = None
+ s = self._stopper
+ self._stopper = None
+ s.callback(None)
+ self._reactor.callFromThread(finishStopping)
+ return False
+ self._oneWorkUnit(*work)
+ return True
+
+
def _oneWorkUnit(self, deferred, instruction):
try:
result = instruction()
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120811/3cc07faf/attachment-0001.html>
More information about the calendarserver-changes
mailing list