[CalendarServer-changes] [11066] CalendarServer/trunk

source_changes at macosforge.org source_changes at macosforge.org
Thu Apr 18 18:04:09 PDT 2013


Revision: 11066
          http://trac.calendarserver.org//changeset/11066
Author:   glyph at apple.com
Date:     2013-04-18 18:04:09 -0700 (Thu, 18 Apr 2013)
Log Message:
-----------
Merge start-service-start-loop branch.

Several test refactorings, plus make sure to actually kick off the orphaned queue item recovery loop.

Modified Paths:
--------------
    CalendarServer/trunk/twext/enterprise/dal/syntax.py
    CalendarServer/trunk/twext/enterprise/fixtures.py
    CalendarServer/trunk/twext/enterprise/queue.py
    CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py
    CalendarServer/trunk/twext/enterprise/test/test_queue.py

Property Changed:
----------------
    CalendarServer/trunk/


Property changes on: CalendarServer/trunk
___________________________________________________________________
Modified: svn:mergeinfo
   - /CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/release/CalendarServer-4.3-dev:10180-10190,10192
/CalendarServer/branches/users/cdaboo/batchupload-6699:6700-7198
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/component-set-fixes:8130-8346
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/implicituidrace:8137-8141
/CalendarServer/branches/users/cdaboo/ischedule-dkim:9747-9979
/CalendarServer/branches/users/cdaboo/managed-attachments:9985-10145
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/pods:7297-7377
/CalendarServer/branches/users/cdaboo/pycalendar:7085-7206
/CalendarServer/branches/users/cdaboo/pycard:7227-7237
/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes:7740-8287
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/cdaboo/timezones:7443-7699
/CalendarServer/branches/users/cdaboo/txn-debugging:8730-8743
/CalendarServer/branches/users/glyph/always-abort-txn-on-error:9958-9969
/CalendarServer/branches/users/glyph/case-insensitive-uid:8772-8805
/CalendarServer/branches/users/glyph/conn-limit:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/dalify:6932-7023
/CalendarServer/branches/users/glyph/db-reconnect:6824-6876
/CalendarServer/branches/users/glyph/deploybuild:7563-7572
/CalendarServer/branches/users/glyph/digest-auth-redux:10624-10635
/CalendarServer/branches/users/glyph/disable-quota:7718-7727
/CalendarServer/branches/users/glyph/dont-start-postgres:6592-6614
/CalendarServer/branches/users/glyph/imip-and-admin-html:7866-7984
/CalendarServer/branches/users/glyph/ipv6-client:9054-9105
/CalendarServer/branches/users/glyph/linux-tests:6893-6900
/CalendarServer/branches/users/glyph/migrate-merge:8690-8713
/CalendarServer/branches/users/glyph/misc-portability-fixes:7365-7374
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/multiget-delete:8321-8330
/CalendarServer/branches/users/glyph/new-export:7444-7485
/CalendarServer/branches/users/glyph/one-home-list-api:10048-10073
/CalendarServer/branches/users/glyph/oracle:7106-7155
/CalendarServer/branches/users/glyph/oracle-nulls:7340-7351
/CalendarServer/branches/users/glyph/other-html:8062-8091
/CalendarServer/branches/users/glyph/parallel-sim:8240-8251
/CalendarServer/branches/users/glyph/parallel-upgrade:8376-8400
/CalendarServer/branches/users/glyph/parallel-upgrade_to_1:8571-8583
/CalendarServer/branches/users/glyph/q:9560-9688
/CalendarServer/branches/users/glyph/queue-locking-and-timing:10204-10289
/CalendarServer/branches/users/glyph/quota:7604-7637
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/shared-pool-fixes:8436-8443
/CalendarServer/branches/users/glyph/shared-pool-take2:8155-8174
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/sharing-api:9192-9205
/CalendarServer/branches/users/glyph/skip-lonely-vtimezones:8524-8535
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/subtransactions:7248-7258
/CalendarServer/branches/users/glyph/table-alias:8651-8664
/CalendarServer/branches/users/glyph/uidexport:7673-7676
/CalendarServer/branches/users/glyph/unshare-when-access-revoked:10562-10595
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/glyph/uuid-normalize:9268-9296
/CalendarServer/branches/users/glyph/xattrs-from-files:7757-7769
/CalendarServer/branches/users/sagen/applepush:8126-8184
/CalendarServer/branches/users/sagen/inboxitems:7380-7381
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/sagen/testing:10827-10851,10853-10855
/CalendarServer/branches/users/wsanchez/transations:5515-5593
   + /CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/release/CalendarServer-4.3-dev:10180-10190,10192
/CalendarServer/branches/users/cdaboo/batchupload-6699:6700-7198
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/component-set-fixes:8130-8346
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/implicituidrace:8137-8141
/CalendarServer/branches/users/cdaboo/ischedule-dkim:9747-9979
/CalendarServer/branches/users/cdaboo/managed-attachments:9985-10145
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/pods:7297-7377
/CalendarServer/branches/users/cdaboo/pycalendar:7085-7206
/CalendarServer/branches/users/cdaboo/pycard:7227-7237
/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes:7740-8287
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/cdaboo/timezones:7443-7699
/CalendarServer/branches/users/cdaboo/txn-debugging:8730-8743
/CalendarServer/branches/users/glyph/always-abort-txn-on-error:9958-9969
/CalendarServer/branches/users/glyph/case-insensitive-uid:8772-8805
/CalendarServer/branches/users/glyph/conn-limit:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/dalify:6932-7023
/CalendarServer/branches/users/glyph/db-reconnect:6824-6876
/CalendarServer/branches/users/glyph/deploybuild:7563-7572
/CalendarServer/branches/users/glyph/digest-auth-redux:10624-10635
/CalendarServer/branches/users/glyph/disable-quota:7718-7727
/CalendarServer/branches/users/glyph/dont-start-postgres:6592-6614
/CalendarServer/branches/users/glyph/imip-and-admin-html:7866-7984
/CalendarServer/branches/users/glyph/ipv6-client:9054-9105
/CalendarServer/branches/users/glyph/linux-tests:6893-6900
/CalendarServer/branches/users/glyph/migrate-merge:8690-8713
/CalendarServer/branches/users/glyph/misc-portability-fixes:7365-7374
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/multiget-delete:8321-8330
/CalendarServer/branches/users/glyph/new-export:7444-7485
/CalendarServer/branches/users/glyph/one-home-list-api:10048-10073
/CalendarServer/branches/users/glyph/oracle:7106-7155
/CalendarServer/branches/users/glyph/oracle-nulls:7340-7351
/CalendarServer/branches/users/glyph/other-html:8062-8091
/CalendarServer/branches/users/glyph/parallel-sim:8240-8251
/CalendarServer/branches/users/glyph/parallel-upgrade:8376-8400
/CalendarServer/branches/users/glyph/parallel-upgrade_to_1:8571-8583
/CalendarServer/branches/users/glyph/q:9560-9688
/CalendarServer/branches/users/glyph/queue-locking-and-timing:10204-10289
/CalendarServer/branches/users/glyph/quota:7604-7637
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/shared-pool-fixes:8436-8443
/CalendarServer/branches/users/glyph/shared-pool-take2:8155-8174
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/sharing-api:9192-9205
/CalendarServer/branches/users/glyph/skip-lonely-vtimezones:8524-8535
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/start-service-start-loop:11060-11065
/CalendarServer/branches/users/glyph/subtransactions:7248-7258
/CalendarServer/branches/users/glyph/table-alias:8651-8664
/CalendarServer/branches/users/glyph/uidexport:7673-7676
/CalendarServer/branches/users/glyph/unshare-when-access-revoked:10562-10595
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/glyph/uuid-normalize:9268-9296
/CalendarServer/branches/users/glyph/xattrs-from-files:7757-7769
/CalendarServer/branches/users/sagen/applepush:8126-8184
/CalendarServer/branches/users/sagen/inboxitems:7380-7381
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/sagen/testing:10827-10851,10853-10855
/CalendarServer/branches/users/wsanchez/transations:5515-5593

Modified: CalendarServer/trunk/twext/enterprise/dal/syntax.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/syntax.py	2013-04-18 23:33:01 UTC (rev 11065)
+++ CalendarServer/trunk/twext/enterprise/dal/syntax.py	2013-04-19 01:04:09 UTC (rev 11066)
@@ -147,7 +147,8 @@
 
     _paramstyles = {
         'pyformat': partial(FixedPlaceholder, "%s"),
-        'numeric': NumericPlaceholder
+        'numeric': NumericPlaceholder,
+        'qmark': defaultPlaceholder,
     }
 
 
@@ -1673,6 +1674,12 @@
 
 
     def _toSQL(self, queryGenerator):
+        if queryGenerator.dialect == SQLITE_DIALECT:
+            # FIXME - this is only stubbed out for testing right now, actual
+            # concurrency would require some kind of locking statement here.
+            # BEGIN IMMEDIATE maybe, if that's okay in the middle of a
+            # transaction or repeatedly?
+            return SQLFragment('select null')
         return SQLFragment('lock table ').append(
             self.table.subSQL(queryGenerator, [self.table])).append(
             SQLFragment(' in %s mode' % (self.mode,)))

Modified: CalendarServer/trunk/twext/enterprise/fixtures.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/fixtures.py	2013-04-18 23:33:01 UTC (rev 11065)
+++ CalendarServer/trunk/twext/enterprise/fixtures.py	2013-04-19 01:04:09 UTC (rev 11066)
@@ -20,10 +20,24 @@
 """
 
 import sqlite3
+from Queue import Empty
+from itertools import count
 
+from zope.interface import implementer
+from zope.interface.verify import verifyClass
+
+from twisted.internet.interfaces import IReactorThreads
+from twisted.python.threadpool import ThreadPool
+
+from twisted.internet.task import Clock
+
 from twext.enterprise.adbapi2 import ConnectionPool
 from twext.enterprise.ienterprise import SQLITE_DIALECT
+from twext.enterprise.ienterprise import POSTGRES_DIALECT
+from twext.enterprise.adbapi2 import DEFAULT_PARAM_STYLE
+from twext.internet.threadutils import ThreadHolder
 
+
 def buildConnectionPool(testCase, schemaText="", dialect=SQLITE_DIALECT):
     """
     Build a L{ConnectionPool} for testing purposes, with the given C{testCase}.
@@ -57,3 +71,513 @@
     pool.startService()
     testCase.addCleanup(pool.stopService)
     return pool
+
+
+
+def resultOf(deferred, propagate=False):
+    """
+    Add a callback and errback which will capture the result of a L{Deferred} in
+    a list, and return that list.  If 'propagate' is True, pass through the
+    results.
+    """
+    results = []
+    if propagate:
+        def cb(r):
+            results.append(r)
+            return r
+    else:
+        cb = results.append
+    deferred.addBoth(cb)
+    return results
+
+
+
+class FakeThreadHolder(ThreadHolder):
+    """
+    Run things to submitted this ThreadHolder on the main thread, so that
+    execution is easier to control.
+    """
+
+    def __init__(self, test):
+        super(FakeThreadHolder, self).__init__(self)
+        self.test = test
+        self.started = False
+        self.stopped = False
+        self._workerIsRunning = False
+
+
+    def start(self):
+        self.started = True
+        return super(FakeThreadHolder, self).start()
+
+
+    def stop(self):
+        result = super(FakeThreadHolder, self).stop()
+        self.stopped = True
+        return result
+
+
+    @property
+    def _get_q(self):
+        return self._q_
+
+
+    @_get_q.setter
+    def _q(self, newq):
+        if newq is not None:
+            oget = newq.get
+            newq.get = lambda: oget(timeout=0)
+            oput = newq.put
+            def putit(x):
+                p = oput(x)
+                if not self.test.paused:
+                    self.flush()
+                return p
+            newq.put = putit
+        self._q_ = newq
+
+
+    def callFromThread(self, f, *a, **k):
+        result = f(*a, **k)
+        return result
+
+
+    def callInThread(self, f, *a, **k):
+        """
+        This should be called only once, to start the worker function that
+        dedicates a thread to this L{ThreadHolder}.
+        """
+        self._workerIsRunning = True
+
+
+    def flush(self):
+        """
+        Fire all deferreds previously returned from submit.
+        """
+        try:
+            while self._workerIsRunning and self._qpull():
+                pass
+            else:
+                self._workerIsRunning = False
+        except Empty:
+            pass
+
+
+
+ at implementer(IReactorThreads)
+class ClockWithThreads(Clock):
+    """
+    A testing reactor that supplies L{IReactorTime} and L{IReactorThreads}.
+    """
+
+    def __init__(self):
+        super(ClockWithThreads, self).__init__()
+        self._pool = ThreadPool()
+
+
+    def getThreadPool(self):
+        """
+        Get the threadpool.
+        """
+        return self._pool
+
+
+    def suggestThreadPoolSize(self, size):
+        """
+        Approximate the behavior of a 'real' reactor.
+        """
+        self._pool.adjustPoolsize(maxthreads=size)
+
+
+    def callInThread(self, thunk, *a, **kw):
+        """
+        No implementation.
+        """
+
+
+    def callFromThread(self, thunk, *a, **kw):
+        """
+        No implementation.
+        """
+
+verifyClass(IReactorThreads, ClockWithThreads)
+
+
+
+class ConnectionPoolHelper(object):
+    """
+    Connection pool setting-up facilities for tests that need a
+    L{ConnectionPool}.
+    """
+
+    dialect = POSTGRES_DIALECT
+    paramstyle = DEFAULT_PARAM_STYLE
+
+    def setUp(self, test=None, connect=None):
+        """
+        Support inheritance by L{TestCase} classes.
+        """
+        if test is None:
+            test = self
+        if connect is None:
+            self.factory = ConnectionFactory()
+            connect = self.factory.connect
+        self.connect = connect
+        self.paused             = False
+        self.holders            = []
+        self.pool               = ConnectionPool(connect,
+                                                 maxConnections=2,
+                                                 dialect=self.dialect,
+                                                 paramstyle=self.paramstyle)
+        self.pool._createHolder = self.makeAHolder
+        self.clock              = self.pool.reactor = ClockWithThreads()
+        self.pool.startService()
+        test.addCleanup(self.flushHolders)
+
+
+    def flushHolders(self):
+        """
+        Flush all pending C{submit}s since C{pauseHolders} was called.  This
+        makes sure the service is stopped and the fake ThreadHolders are all
+        executing their queues so failed tsets can exit cleanly.
+        """
+        self.paused = False
+        for holder in self.holders:
+            holder.flush()
+
+
+    def pauseHolders(self):
+        """
+        Pause all L{FakeThreadHolder}s, causing C{submit} to return an unfired
+        L{Deferred}.
+        """
+        self.paused = True
+
+
+    def makeAHolder(self):
+        """
+        Make a ThreadHolder-alike.
+        """
+        fth = FakeThreadHolder(self)
+        self.holders.append(fth)
+        return fth
+
+
+    def resultOf(self, it):
+        return resultOf(it)
+
+
+    def createTransaction(self):
+        return self.pool.connection()
+
+
+    def translateError(self, err):
+        return err
+
+
+
+class SteppablePoolHelper(ConnectionPoolHelper):
+    """
+    A version of L{ConnectionPoolHelper} that can set up a connection pool
+    capable of firing all its L{Deferred}s on demand, synchronously, by using
+    SQLite.
+    """
+    dialect = SQLITE_DIALECT
+    paramstyle = sqlite3.paramstyle
+
+    def __init__(self, schema):
+        self.schema = schema
+
+
+    def setUp(self, test):
+        connect = synchronousConnectionFactory(test)
+        con = connect()
+        cur = con.cursor()
+        cur.executescript(self.schema)
+        con.commit()
+        super(SteppablePoolHelper, self).setUp(test, connect)
+
+
+    def rows(self, sql):
+        """
+        Get some rows from the database to compare in a test.
+        """
+        con = self.connect()
+        cur = con.cursor()
+        cur.execute(sql)
+        result = cur.fetchall()
+        con.commit()
+        return result
+
+
+
+def synchronousConnectionFactory(test):
+    tmpdb = test.mktemp()
+    def connect():
+        return sqlite3.connect(tmpdb)
+    return connect
+
+
+
+class Child(object):
+    """
+    An object with a L{Parent}, in its list of C{children}.
+    """
+    def __init__(self, parent):
+        self.closed = False
+        self.parent = parent
+        self.parent.children.append(self)
+
+
+    def close(self):
+        if self.parent._closeFailQueue:
+            raise self.parent._closeFailQueue.pop(0)
+        self.closed = True
+
+
+
+class Parent(object):
+    """
+    An object with a list of L{Child}ren.
+    """
+
+    def __init__(self):
+        self.children = []
+        self._closeFailQueue = []
+
+
+    def childCloseWillFail(self, exception):
+        """
+        Closing children of this object will result in the given exception.
+
+        @see: L{ConnectionFactory}
+        """
+        self._closeFailQueue.append(exception)
+
+
+
+class FakeConnection(Parent, Child):
+    """
+    Fake Stand-in for DB-API 2.0 connection.
+
+    @ivar executions: the number of statements which have been executed.
+
+    """
+
+    executions = 0
+
+    def __init__(self, factory):
+        """
+        Initialize list of cursors
+        """
+        Parent.__init__(self)
+        Child.__init__(self, factory)
+        self.id = factory.idcounter.next()
+        self._executeFailQueue = []
+        self._commitCount = 0
+        self._rollbackCount = 0
+
+
+    def executeWillFail(self, thunk):
+        """
+        The next call to L{FakeCursor.execute} will fail with an exception
+        returned from the given callable.
+        """
+        self._executeFailQueue.append(thunk)
+
+
+    @property
+    def cursors(self):
+        "Alias to make tests more readable."
+        return self.children
+
+
+    def cursor(self):
+        return FakeCursor(self)
+
+
+    def commit(self):
+        self._commitCount += 1
+        if self.parent.commitFail:
+            self.parent.commitFail = False
+            raise CommitFail()
+
+
+    def rollback(self):
+        self._rollbackCount += 1
+        if self.parent.rollbackFail:
+            self.parent.rollbackFail = False
+            raise RollbackFail()
+
+
+
+class RollbackFail(Exception):
+    """
+    Sample rollback-failure exception.
+    """
+
+
+
+class CommitFail(Exception):
+    """
+    Sample Commit-failure exception.
+    """
+
+
+
+class FakeCursor(Child):
+    """
+    Fake stand-in for a DB-API 2.0 cursor.
+    """
+    def __init__(self, connection):
+        Child.__init__(self, connection)
+        self.rowcount = 0
+        # not entirely correct, but all we care about is its truth value.
+        self.description = False
+        self.variables = []
+        self.allExecutions = []
+
+
+    @property
+    def connection(self):
+        "Alias to make tests more readable."
+        return self.parent
+
+
+    def execute(self, sql, args=()):
+        self.connection.executions += 1
+        if self.connection._executeFailQueue:
+            raise self.connection._executeFailQueue.pop(0)()
+        self.allExecutions.append((sql, args))
+        self.sql = sql
+        factory = self.connection.parent
+        self.description = factory.hasResults
+        if factory.hasResults and factory.shouldUpdateRowcount:
+            self.rowcount = 1
+        else:
+            self.rowcount = 0
+        return
+
+
+    def var(self, type, *args):
+        """
+        Return a database variable in the style of the cx_Oracle bindings.
+        """
+        v = FakeVariable(self, type, args)
+        self.variables.append(v)
+        return v
+
+
+    def fetchall(self):
+        """
+        Just echo the SQL that was executed in the last query.
+        """
+        if self.connection.parent.hasResults:
+            return [[self.connection.id, self.sql]]
+        if self.description:
+            return []
+        return None
+
+
+
+class FakeVariable(object):
+    def __init__(self, cursor, type, args):
+        self.cursor = cursor
+        self.type = type
+        self.args = args
+
+
+    def getvalue(self):
+        vv = self.cursor.connection.parent.varvals
+        if vv:
+            return vv.pop(0)
+        return self.cursor.variables.index(self) + 300
+
+
+    def __reduce__(self):
+        raise RuntimeError("Not pickleable (since oracle vars aren't)")
+
+
+
+class ConnectionFactory(Parent):
+    """
+    A factory for L{FakeConnection} objects.
+
+    @ivar shouldUpdateRowcount: Should C{execute} on cursors produced by
+        connections produced by this factory update their C{rowcount} or just
+        their C{description} attribute?
+
+    @ivar hasResults: should cursors produced by connections by this factory
+        have any results returned by C{fetchall()}?
+    """
+
+    rollbackFail = False
+    commitFail = False
+
+    def __init__(self, shouldUpdateRowcount=True, hasResults=True):
+        Parent.__init__(self)
+        self.idcounter = count(1)
+        self._connectResultQueue = []
+        self.defaultConnect()
+        self.varvals = []
+        self.shouldUpdateRowcount = shouldUpdateRowcount
+        self.hasResults = hasResults
+
+
+    @property
+    def connections(self):
+        "Alias to make tests more readable."
+        return self.children
+
+
+    def connect(self):
+        """
+        Implement the C{ConnectionFactory} callable expected by
+        L{ConnectionPool}.
+        """
+        if self._connectResultQueue:
+            thunk = self._connectResultQueue.pop(0)
+        else:
+            thunk = self._default
+        return thunk()
+
+
+    def willConnect(self):
+        """
+        Used by tests to queue a successful result for connect().
+        """
+        def thunk():
+            return FakeConnection(self)
+        self._connectResultQueue.append(thunk)
+
+
+    def willFail(self):
+        """
+        Used by tests to queue a successful result for connect().
+        """
+        def thunk():
+            raise FakeConnectionError()
+        self._connectResultQueue.append(thunk)
+
+
+    def defaultConnect(self):
+        """
+        By default, connection attempts will succeed.
+        """
+        self.willConnect()
+        self._default = self._connectResultQueue.pop()
+
+
+    def defaultFail(self):
+        """
+        By default, connection attempts will fail.
+        """
+        self.willFail()
+        self._default = self._connectResultQueue.pop()
+
+
+
+class FakeConnectionError(Exception):
+    """
+    Synthetic error that might occur during connection.
+    """

Modified: CalendarServer/trunk/twext/enterprise/queue.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/queue.py	2013-04-18 23:33:01 UTC (rev 11065)
+++ CalendarServer/trunk/twext/enterprise/queue.py	2013-04-19 01:04:09 UTC (rev 11066)
@@ -1342,6 +1342,7 @@
         def done(result):
             self._startingUp = None
             super(PeerConnectionPool, self).startService()
+            self._lostWorkCheckLoop()
             return result
 
 

Modified: CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py	2013-04-18 23:33:01 UTC (rev 11065)
+++ CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py	2013-04-19 01:04:09 UTC (rev 11066)
@@ -18,22 +18,14 @@
 Tests for L{twext.enterprise.adbapi2}.
 """
 
-from itertools import count
-from Queue import Empty
+from zope.interface.verify import verifyObject
 
-from zope.interface.verify import verifyClass, verifyObject
-from zope.interface.declarations import implements
-
-from twisted.python.threadpool import ThreadPool
 from twisted.python.failure import Failure
 
 from twisted.trial.unittest import TestCase
 
-from twisted.internet.task import Clock
 from twisted.internet.defer import Deferred, fail
 
-from twisted.internet.interfaces import IReactorThreads
-
 from twisted.test.proto_helpers import StringTransport
 
 from twext.enterprise.ienterprise import ConnectionError
@@ -41,33 +33,17 @@
 from twext.enterprise.adbapi2 import ConnectionPoolClient
 from twext.enterprise.adbapi2 import ConnectionPoolConnection
 from twext.enterprise.ienterprise import IAsyncTransaction
-from twext.enterprise.ienterprise import POSTGRES_DIALECT
 from twext.enterprise.ienterprise import ICommandBlock
 from twext.enterprise.adbapi2 import FailsafeException
-from twext.enterprise.adbapi2 import DEFAULT_PARAM_STYLE
 from twext.enterprise.adbapi2 import ConnectionPool
-from twext.internet.threadutils import ThreadHolder
+from twext.enterprise.fixtures import ConnectionPoolHelper
+from twext.enterprise.fixtures import resultOf
+from twext.enterprise.fixtures import ClockWithThreads
+from twext.enterprise.fixtures import FakeConnectionError
+from twext.enterprise.fixtures import RollbackFail
+from twext.enterprise.fixtures import CommitFail
 from twext.enterprise.adbapi2 import Commit
 
-
-def resultOf(deferred, propagate=False):
-    """
-    Add a callback and errback which will capture the result of a L{Deferred} in
-    a list, and return that list.  If 'propagate' is True, pass through the
-    results.
-    """
-    results = []
-    if propagate:
-        def cb(r):
-            results.append(r)
-            return r
-    else:
-        cb = results.append
-    deferred.addBoth(cb)
-    return results
-
-
-
 class AssertResultHelper(object):
     """
     Mixin for asserting about synchronous Deferred results.
@@ -97,384 +73,6 @@
 
 
 
-class Child(object):
-    """
-    An object with a L{Parent}, in its list of C{children}.
-    """
-    def __init__(self, parent):
-        self.closed = False
-        self.parent = parent
-        self.parent.children.append(self)
-
-
-    def close(self):
-        if self.parent._closeFailQueue:
-            raise self.parent._closeFailQueue.pop(0)
-        self.closed = True
-
-
-
-class Parent(object):
-    """
-    An object with a list of L{Child}ren.
-    """
-
-    def __init__(self):
-        self.children = []
-        self._closeFailQueue = []
-
-
-    def childCloseWillFail(self, exception):
-        """
-        Closing children of this object will result in the given exception.
-
-        @see: L{ConnectionFactory}
-        """
-        self._closeFailQueue.append(exception)
-
-
-
-class FakeConnection(Parent, Child):
-    """
-    Fake Stand-in for DB-API 2.0 connection.
-
-    @ivar executions: the number of statements which have been executed.
-
-    """
-
-    executions = 0
-
-    def __init__(self, factory):
-        """
-        Initialize list of cursors
-        """
-        Parent.__init__(self)
-        Child.__init__(self, factory)
-        self.id = factory.idcounter.next()
-        self._executeFailQueue = []
-        self._commitCount = 0
-        self._rollbackCount = 0
-
-
-    def executeWillFail(self, thunk):
-        """
-        The next call to L{FakeCursor.execute} will fail with an exception
-        returned from the given callable.
-        """
-        self._executeFailQueue.append(thunk)
-
-
-    @property
-    def cursors(self):
-        "Alias to make tests more readable."
-        return self.children
-
-
-    def cursor(self):
-        return FakeCursor(self)
-
-
-    def commit(self):
-        self._commitCount += 1
-        if self.parent.commitFail:
-            self.parent.commitFail = False
-            raise CommitFail()
-
-
-    def rollback(self):
-        self._rollbackCount += 1
-        if self.parent.rollbackFail:
-            self.parent.rollbackFail = False
-            raise RollbackFail()
-
-
-
-class RollbackFail(Exception):
-    """
-    Sample rollback-failure exception.
-    """
-
-
-
-class CommitFail(Exception):
-    """
-    Sample Commit-failure exception.
-    """
-
-
-
-class FakeCursor(Child):
-    """
-    Fake stand-in for a DB-API 2.0 cursor.
-    """
-    def __init__(self, connection):
-        Child.__init__(self, connection)
-        self.rowcount = 0
-        # not entirely correct, but all we care about is its truth value.
-        self.description = False
-        self.variables = []
-        self.allExecutions = []
-
-
-    @property
-    def connection(self):
-        "Alias to make tests more readable."
-        return self.parent
-
-
-    def execute(self, sql, args=()):
-        self.connection.executions += 1
-        if self.connection._executeFailQueue:
-            raise self.connection._executeFailQueue.pop(0)()
-        self.allExecutions.append((sql, args))
-        self.sql = sql
-        factory = self.connection.parent
-        self.description = factory.hasResults
-        if factory.hasResults and factory.shouldUpdateRowcount:
-            self.rowcount = 1
-        else:
-            self.rowcount = 0
-        return
-
-
-    def var(self, type, *args):
-        """
-        Return a database variable in the style of the cx_Oracle bindings.
-        """
-        v = FakeVariable(self, type, args)
-        self.variables.append(v)
-        return v
-
-
-    def fetchall(self):
-        """
-        Just echo the SQL that was executed in the last query.
-        """
-        if self.connection.parent.hasResults:
-            return [[self.connection.id, self.sql]]
-        if self.description:
-            return []
-        return None
-
-
-
-class FakeVariable(object):
-    def __init__(self, cursor, type, args):
-        self.cursor = cursor
-        self.type = type
-        self.args = args
-
-
-    def getvalue(self):
-        vv = self.cursor.connection.parent.varvals
-        if vv:
-            return vv.pop(0)
-        return self.cursor.variables.index(self) + 300
-
-
-    def __reduce__(self):
-        raise RuntimeError("Not pickleable (since oracle vars aren't)")
-
-
-
-class ConnectionFactory(Parent):
-    """
-    A factory for L{FakeConnection} objects.
-
-    @ivar shouldUpdateRowcount: Should C{execute} on cursors produced by
-        connections produced by this factory update their C{rowcount} or just
-        their C{description} attribute?
-
-    @ivar hasResults: should cursors produced by connections by this factory
-        have any results returned by C{fetchall()}?
-    """
-
-    rollbackFail = False
-    commitFail = False
-
-    def __init__(self, shouldUpdateRowcount=True, hasResults=True):
-        Parent.__init__(self)
-        self.idcounter = count(1)
-        self._connectResultQueue = []
-        self.defaultConnect()
-        self.varvals = []
-        self.shouldUpdateRowcount = shouldUpdateRowcount
-        self.hasResults = hasResults
-
-
-    @property
-    def connections(self):
-        "Alias to make tests more readable."
-        return self.children
-
-
-    def connect(self):
-        """
-        Implement the C{ConnectionFactory} callable expected by
-        L{ConnectionPool}.
-        """
-        if self._connectResultQueue:
-            thunk = self._connectResultQueue.pop(0)
-        else:
-            thunk = self._default
-        return thunk()
-
-
-    def willConnect(self):
-        """
-        Used by tests to queue a successful result for connect().
-        """
-        def thunk():
-            return FakeConnection(self)
-        self._connectResultQueue.append(thunk)
-
-
-    def willFail(self):
-        """
-        Used by tests to queue a successful result for connect().
-        """
-        def thunk():
-            raise FakeConnectionError()
-        self._connectResultQueue.append(thunk)
-
-
-    def defaultConnect(self):
-        """
-        By default, connection attempts will succeed.
-        """
-        self.willConnect()
-        self._default = self._connectResultQueue.pop()
-
-
-    def defaultFail(self):
-        """
-        By default, connection attempts will fail.
-        """
-        self.willFail()
-        self._default = self._connectResultQueue.pop()
-
-
-
-class FakeConnectionError(Exception):
-    """
-    Synthetic error that might occur during connection.
-    """
-
-
-
-class FakeThreadHolder(ThreadHolder):
-    """
-    Run things to submitted this ThreadHolder on the main thread, so that
-    execution is easier to control.
-    """
-
-    def __init__(self, test):
-        super(FakeThreadHolder, self).__init__(self)
-        self.test = test
-        self.started = False
-        self.stopped = False
-        self._workerIsRunning = False
-
-
-    def start(self):
-        self.started = True
-        return super(FakeThreadHolder, self).start()
-
-
-    def stop(self):
-        result = super(FakeThreadHolder, self).stop()
-        self.stopped = True
-        return result
-
-
-    @property
-    def _get_q(self):
-        return self._q_
-
-
-    @_get_q.setter
-    def _q(self, newq):
-        if newq is not None:
-            oget = newq.get
-            newq.get = lambda: oget(timeout=0)
-            oput = newq.put
-            def putit(x):
-                p = oput(x)
-                if not self.test.paused:
-                    self.flush()
-                return p
-            newq.put = putit
-        self._q_ = newq
-
-
-    def callFromThread(self, f, *a, **k):
-        result = f(*a, **k)
-        return result
-
-
-    def callInThread(self, f, *a, **k):
-        """
-        This should be called only once, to start the worker function that
-        dedicates a thread to this L{ThreadHolder}.
-        """
-        self._workerIsRunning = True
-
-
-    def flush(self):
-        """
-        Fire all deferreds previously returned from submit.
-        """
-        try:
-            while self._workerIsRunning and self._qpull():
-                pass
-            else:
-                self._workerIsRunning = False
-        except Empty:
-            pass
-
-
-
-class ClockWithThreads(Clock):
-    """
-    A testing reactor that supplies L{IReactorTime} and L{IReactorThreads}.
-    """
-    implements(IReactorThreads)
-
-    def __init__(self):
-        super(ClockWithThreads, self).__init__()
-        self._pool = ThreadPool()
-
-
-    def getThreadPool(self):
-        """
-        Get the threadpool.
-        """
-        return self._pool
-
-
-    def suggestThreadPoolSize(self, size):
-        """
-        Approximate the behavior of a 'real' reactor.
-        """
-        self._pool.adjustPoolsize(maxthreads=size)
-
-
-    def callInThread(self, thunk, *a, **kw):
-        """
-        No implementation.
-        """
-
-
-    def callFromThread(self, thunk, *a, **kw):
-        """
-        No implementation.
-        """
-
-
-verifyClass(IReactorThreads, ClockWithThreads)
-
-
-
 class ConnectionPoolBootTests(TestCase):
     """
     Tests for the start-up phase of L{ConnectionPool}.
@@ -521,74 +119,6 @@
 
 
 
-class ConnectionPoolHelper(object):
-    """
-    Connection pool setting-up facilities for tests that need a
-    L{ConnectionPool}.
-    """
-
-    dialect = POSTGRES_DIALECT
-    paramstyle = DEFAULT_PARAM_STYLE
-
-    def setUp(self):
-        """
-        Create a L{ConnectionPool} attached to a C{ConnectionFactory}.  Start
-        the L{ConnectionPool}.
-        """
-        self.paused             = False
-        self.holders            = []
-        self.factory            = ConnectionFactory()
-        self.pool               = ConnectionPool(self.factory.connect,
-                                                 maxConnections=2,
-                                                 dialect=self.dialect,
-                                                 paramstyle=self.paramstyle)
-        self.pool._createHolder = self.makeAHolder
-        self.clock              = self.pool.reactor = ClockWithThreads()
-        self.pool.startService()
-        self.addCleanup(self.flushHolders)
-
-
-    def flushHolders(self):
-        """
-        Flush all pending C{submit}s since C{pauseHolders} was called.  This
-        makes sure the service is stopped and the fake ThreadHolders are all
-        executing their queues so failed tests can exit cleanly.
-        """
-        self.paused = False
-        for holder in self.holders:
-            holder.flush()
-
-
-    def pauseHolders(self):
-        """
-        Pause all L{FakeThreadHolder}s, causing C{submit} to return an unfired
-        L{Deferred}.
-        """
-        self.paused = True
-
-
-    def makeAHolder(self):
-        """
-        Make a ThreadHolder-alike.
-        """
-        fth = FakeThreadHolder(self)
-        self.holders.append(fth)
-        return fth
-
-
-    def resultOf(self, it):
-        return resultOf(it)
-
-
-    def createTransaction(self):
-        return self.pool.connection()
-
-
-    def translateError(self, err):
-        return err
-
-
-
 class ConnectionPoolTests(ConnectionPoolHelper, TestCase, AssertResultHelper):
     """
     Tests for L{ConnectionPool}.

Modified: CalendarServer/trunk/twext/enterprise/test/test_queue.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/test/test_queue.py	2013-04-18 23:33:01 UTC (rev 11065)
+++ CalendarServer/trunk/twext/enterprise/test/test_queue.py	2013-04-19 01:04:09 UTC (rev 11066)
@@ -52,7 +52,8 @@
 from twext.enterprise.queue import ConnectionFromPeerNode
 from twext.enterprise.fixtures import buildConnectionPool
 from zope.interface.verify import verifyObject
-from twisted.test.proto_helpers import StringTransport
+from twisted.test.proto_helpers import StringTransport, MemoryReactor
+from twext.enterprise.fixtures import SteppablePoolHelper
 
 from twext.enterprise.queue import _BaseQueuer, NonPerformingQueuer
 import twext.enterprise.queue
@@ -70,6 +71,16 @@
 
 
 
+class MemoryReactorWithClock(MemoryReactor, Clock):
+    """
+    Simulate a real reactor.
+    """
+    def __init__(self):
+        MemoryReactor.__init__(self)
+        Clock.__init__(self)
+
+
+
 def transactionally(transactionCreator):
     """
     Perform the decorated function immediately in a transaction, replacing its
@@ -213,7 +224,6 @@
 
 
 
-
 class WorkItemTests(TestCase):
     """
     A L{WorkItem} is an item of work that can be executed.
@@ -493,6 +503,38 @@
         self.assertEquals(worker2.currentLoad, 1)
 
 
+    def test_poolStartServiceChecksForWork(self):
+        """
+        L{PeerConnectionPool.startService} kicks off the idle work-check loop.
+        """
+        reactor = MemoryReactorWithClock()
+        cph = SteppablePoolHelper(nodeSchema + schemaText)
+        then = datetime.datetime(2012, 12, 12, 12, 12, 0)
+        reactor.advance(astimestamp(then))
+        cph.setUp(self)
+        pcp = PeerConnectionPool(reactor, cph.pool.connection, 4321, schema)
+        now = then + datetime.timedelta(seconds=pcp.queueProcessTimeout * 2)
+        @transactionally(cph.pool.connection)
+        def createOldWork(txn):
+            one = DummyWorkItem.create(txn, workID=1, a=3, b=4, notBefore=then)
+            two = DummyWorkItem.create(txn, workID=2, a=7, b=9, notBefore=now)
+            return gatherResults([one, two])
+        pcp.startService()
+        cph.flushHolders()
+        reactor.advance(pcp.queueProcessTimeout * 2)
+        self.assertEquals(
+            cph.rows("select * from DUMMY_WORK_DONE"),
+            [(1, 7)]
+        )
+        cph.rows("delete from DUMMY_WORK_DONE")
+        reactor.advance(pcp.queueProcessTimeout * 2)
+        self.assertEquals(
+            cph.rows("select * from DUMMY_WORK_DONE"),
+            [(2, 16)]
+        )
+
+
+
 class HalfConnection(object):
     def __init__(self, protocol):
         self.protocol = protocol
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130418/10b13b97/attachment-0001.html>


More information about the calendarserver-changes mailing list