[CalendarServer-changes] [11066] CalendarServer/trunk
source_changes at macosforge.org
source_changes at macosforge.org
Thu Apr 18 18:04:09 PDT 2013
Revision: 11066
http://trac.calendarserver.org//changeset/11066
Author: glyph at apple.com
Date: 2013-04-18 18:04:09 -0700 (Thu, 18 Apr 2013)
Log Message:
-----------
Merge start-service-start-loop branch.
Several test refactorings, plus make sure to actually kick off the orphaned queue item recovery loop.
Modified Paths:
--------------
CalendarServer/trunk/twext/enterprise/dal/syntax.py
CalendarServer/trunk/twext/enterprise/fixtures.py
CalendarServer/trunk/twext/enterprise/queue.py
CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py
CalendarServer/trunk/twext/enterprise/test/test_queue.py
Property Changed:
----------------
CalendarServer/trunk/
Property changes on: CalendarServer/trunk
___________________________________________________________________
Modified: svn:mergeinfo
- /CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/release/CalendarServer-4.3-dev:10180-10190,10192
/CalendarServer/branches/users/cdaboo/batchupload-6699:6700-7198
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/component-set-fixes:8130-8346
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/implicituidrace:8137-8141
/CalendarServer/branches/users/cdaboo/ischedule-dkim:9747-9979
/CalendarServer/branches/users/cdaboo/managed-attachments:9985-10145
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/pods:7297-7377
/CalendarServer/branches/users/cdaboo/pycalendar:7085-7206
/CalendarServer/branches/users/cdaboo/pycard:7227-7237
/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes:7740-8287
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/cdaboo/timezones:7443-7699
/CalendarServer/branches/users/cdaboo/txn-debugging:8730-8743
/CalendarServer/branches/users/glyph/always-abort-txn-on-error:9958-9969
/CalendarServer/branches/users/glyph/case-insensitive-uid:8772-8805
/CalendarServer/branches/users/glyph/conn-limit:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/dalify:6932-7023
/CalendarServer/branches/users/glyph/db-reconnect:6824-6876
/CalendarServer/branches/users/glyph/deploybuild:7563-7572
/CalendarServer/branches/users/glyph/digest-auth-redux:10624-10635
/CalendarServer/branches/users/glyph/disable-quota:7718-7727
/CalendarServer/branches/users/glyph/dont-start-postgres:6592-6614
/CalendarServer/branches/users/glyph/imip-and-admin-html:7866-7984
/CalendarServer/branches/users/glyph/ipv6-client:9054-9105
/CalendarServer/branches/users/glyph/linux-tests:6893-6900
/CalendarServer/branches/users/glyph/migrate-merge:8690-8713
/CalendarServer/branches/users/glyph/misc-portability-fixes:7365-7374
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/multiget-delete:8321-8330
/CalendarServer/branches/users/glyph/new-export:7444-7485
/CalendarServer/branches/users/glyph/one-home-list-api:10048-10073
/CalendarServer/branches/users/glyph/oracle:7106-7155
/CalendarServer/branches/users/glyph/oracle-nulls:7340-7351
/CalendarServer/branches/users/glyph/other-html:8062-8091
/CalendarServer/branches/users/glyph/parallel-sim:8240-8251
/CalendarServer/branches/users/glyph/parallel-upgrade:8376-8400
/CalendarServer/branches/users/glyph/parallel-upgrade_to_1:8571-8583
/CalendarServer/branches/users/glyph/q:9560-9688
/CalendarServer/branches/users/glyph/queue-locking-and-timing:10204-10289
/CalendarServer/branches/users/glyph/quota:7604-7637
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/shared-pool-fixes:8436-8443
/CalendarServer/branches/users/glyph/shared-pool-take2:8155-8174
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/sharing-api:9192-9205
/CalendarServer/branches/users/glyph/skip-lonely-vtimezones:8524-8535
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/subtransactions:7248-7258
/CalendarServer/branches/users/glyph/table-alias:8651-8664
/CalendarServer/branches/users/glyph/uidexport:7673-7676
/CalendarServer/branches/users/glyph/unshare-when-access-revoked:10562-10595
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/glyph/uuid-normalize:9268-9296
/CalendarServer/branches/users/glyph/xattrs-from-files:7757-7769
/CalendarServer/branches/users/sagen/applepush:8126-8184
/CalendarServer/branches/users/sagen/inboxitems:7380-7381
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/sagen/testing:10827-10851,10853-10855
/CalendarServer/branches/users/wsanchez/transations:5515-5593
+ /CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/release/CalendarServer-4.3-dev:10180-10190,10192
/CalendarServer/branches/users/cdaboo/batchupload-6699:6700-7198
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/component-set-fixes:8130-8346
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/implicituidrace:8137-8141
/CalendarServer/branches/users/cdaboo/ischedule-dkim:9747-9979
/CalendarServer/branches/users/cdaboo/managed-attachments:9985-10145
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/pods:7297-7377
/CalendarServer/branches/users/cdaboo/pycalendar:7085-7206
/CalendarServer/branches/users/cdaboo/pycard:7227-7237
/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes:7740-8287
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/cdaboo/timezones:7443-7699
/CalendarServer/branches/users/cdaboo/txn-debugging:8730-8743
/CalendarServer/branches/users/glyph/always-abort-txn-on-error:9958-9969
/CalendarServer/branches/users/glyph/case-insensitive-uid:8772-8805
/CalendarServer/branches/users/glyph/conn-limit:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/dalify:6932-7023
/CalendarServer/branches/users/glyph/db-reconnect:6824-6876
/CalendarServer/branches/users/glyph/deploybuild:7563-7572
/CalendarServer/branches/users/glyph/digest-auth-redux:10624-10635
/CalendarServer/branches/users/glyph/disable-quota:7718-7727
/CalendarServer/branches/users/glyph/dont-start-postgres:6592-6614
/CalendarServer/branches/users/glyph/imip-and-admin-html:7866-7984
/CalendarServer/branches/users/glyph/ipv6-client:9054-9105
/CalendarServer/branches/users/glyph/linux-tests:6893-6900
/CalendarServer/branches/users/glyph/migrate-merge:8690-8713
/CalendarServer/branches/users/glyph/misc-portability-fixes:7365-7374
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/multiget-delete:8321-8330
/CalendarServer/branches/users/glyph/new-export:7444-7485
/CalendarServer/branches/users/glyph/one-home-list-api:10048-10073
/CalendarServer/branches/users/glyph/oracle:7106-7155
/CalendarServer/branches/users/glyph/oracle-nulls:7340-7351
/CalendarServer/branches/users/glyph/other-html:8062-8091
/CalendarServer/branches/users/glyph/parallel-sim:8240-8251
/CalendarServer/branches/users/glyph/parallel-upgrade:8376-8400
/CalendarServer/branches/users/glyph/parallel-upgrade_to_1:8571-8583
/CalendarServer/branches/users/glyph/q:9560-9688
/CalendarServer/branches/users/glyph/queue-locking-and-timing:10204-10289
/CalendarServer/branches/users/glyph/quota:7604-7637
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/shared-pool-fixes:8436-8443
/CalendarServer/branches/users/glyph/shared-pool-take2:8155-8174
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/sharing-api:9192-9205
/CalendarServer/branches/users/glyph/skip-lonely-vtimezones:8524-8535
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/start-service-start-loop:11060-11065
/CalendarServer/branches/users/glyph/subtransactions:7248-7258
/CalendarServer/branches/users/glyph/table-alias:8651-8664
/CalendarServer/branches/users/glyph/uidexport:7673-7676
/CalendarServer/branches/users/glyph/unshare-when-access-revoked:10562-10595
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/glyph/uuid-normalize:9268-9296
/CalendarServer/branches/users/glyph/xattrs-from-files:7757-7769
/CalendarServer/branches/users/sagen/applepush:8126-8184
/CalendarServer/branches/users/sagen/inboxitems:7380-7381
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/sagen/testing:10827-10851,10853-10855
/CalendarServer/branches/users/wsanchez/transations:5515-5593
Modified: CalendarServer/trunk/twext/enterprise/dal/syntax.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/syntax.py 2013-04-18 23:33:01 UTC (rev 11065)
+++ CalendarServer/trunk/twext/enterprise/dal/syntax.py 2013-04-19 01:04:09 UTC (rev 11066)
@@ -147,7 +147,8 @@
_paramstyles = {
'pyformat': partial(FixedPlaceholder, "%s"),
- 'numeric': NumericPlaceholder
+ 'numeric': NumericPlaceholder,
+ 'qmark': defaultPlaceholder,
}
@@ -1673,6 +1674,12 @@
def _toSQL(self, queryGenerator):
+ if queryGenerator.dialect == SQLITE_DIALECT:
+ # FIXME - this is only stubbed out for testing right now, actual
+ # concurrency would require some kind of locking statement here.
+ # BEGIN IMMEDIATE maybe, if that's okay in the middle of a
+ # transaction or repeatedly?
+ return SQLFragment('select null')
return SQLFragment('lock table ').append(
self.table.subSQL(queryGenerator, [self.table])).append(
SQLFragment(' in %s mode' % (self.mode,)))
Modified: CalendarServer/trunk/twext/enterprise/fixtures.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/fixtures.py 2013-04-18 23:33:01 UTC (rev 11065)
+++ CalendarServer/trunk/twext/enterprise/fixtures.py 2013-04-19 01:04:09 UTC (rev 11066)
@@ -20,10 +20,24 @@
"""
import sqlite3
+from Queue import Empty
+from itertools import count
+from zope.interface import implementer
+from zope.interface.verify import verifyClass
+
+from twisted.internet.interfaces import IReactorThreads
+from twisted.python.threadpool import ThreadPool
+
+from twisted.internet.task import Clock
+
from twext.enterprise.adbapi2 import ConnectionPool
from twext.enterprise.ienterprise import SQLITE_DIALECT
+from twext.enterprise.ienterprise import POSTGRES_DIALECT
+from twext.enterprise.adbapi2 import DEFAULT_PARAM_STYLE
+from twext.internet.threadutils import ThreadHolder
+
def buildConnectionPool(testCase, schemaText="", dialect=SQLITE_DIALECT):
"""
Build a L{ConnectionPool} for testing purposes, with the given C{testCase}.
@@ -57,3 +71,513 @@
pool.startService()
testCase.addCleanup(pool.stopService)
return pool
+
+
+
+def resultOf(deferred, propagate=False):
+ """
+ Add a callback and errback which will capture the result of a L{Deferred} in
+ a list, and return that list. If 'propagate' is True, pass through the
+ results.
+ """
+ results = []
+ if propagate:
+ def cb(r):
+ results.append(r)
+ return r
+ else:
+ cb = results.append
+ deferred.addBoth(cb)
+ return results
+
+
+
+class FakeThreadHolder(ThreadHolder):
+ """
+ Run things to submitted this ThreadHolder on the main thread, so that
+ execution is easier to control.
+ """
+
+ def __init__(self, test):
+ super(FakeThreadHolder, self).__init__(self)
+ self.test = test
+ self.started = False
+ self.stopped = False
+ self._workerIsRunning = False
+
+
+ def start(self):
+ self.started = True
+ return super(FakeThreadHolder, self).start()
+
+
+ def stop(self):
+ result = super(FakeThreadHolder, self).stop()
+ self.stopped = True
+ return result
+
+
+ @property
+ def _get_q(self):
+ return self._q_
+
+
+ @_get_q.setter
+ def _q(self, newq):
+ if newq is not None:
+ oget = newq.get
+ newq.get = lambda: oget(timeout=0)
+ oput = newq.put
+ def putit(x):
+ p = oput(x)
+ if not self.test.paused:
+ self.flush()
+ return p
+ newq.put = putit
+ self._q_ = newq
+
+
+ def callFromThread(self, f, *a, **k):
+ result = f(*a, **k)
+ return result
+
+
+ def callInThread(self, f, *a, **k):
+ """
+ This should be called only once, to start the worker function that
+ dedicates a thread to this L{ThreadHolder}.
+ """
+ self._workerIsRunning = True
+
+
+ def flush(self):
+ """
+ Fire all deferreds previously returned from submit.
+ """
+ try:
+ while self._workerIsRunning and self._qpull():
+ pass
+ else:
+ self._workerIsRunning = False
+ except Empty:
+ pass
+
+
+
+@implementer(IReactorThreads)
+class ClockWithThreads(Clock):
+ """
+ A testing reactor that supplies L{IReactorTime} and L{IReactorThreads}.
+ """
+
+ def __init__(self):
+ super(ClockWithThreads, self).__init__()
+ self._pool = ThreadPool()
+
+
+ def getThreadPool(self):
+ """
+ Get the threadpool.
+ """
+ return self._pool
+
+
+ def suggestThreadPoolSize(self, size):
+ """
+ Approximate the behavior of a 'real' reactor.
+ """
+ self._pool.adjustPoolsize(maxthreads=size)
+
+
+ def callInThread(self, thunk, *a, **kw):
+ """
+ No implementation.
+ """
+
+
+ def callFromThread(self, thunk, *a, **kw):
+ """
+ No implementation.
+ """
+
+verifyClass(IReactorThreads, ClockWithThreads)
+
+
+
+class ConnectionPoolHelper(object):
+ """
+ Connection pool setting-up facilities for tests that need a
+ L{ConnectionPool}.
+ """
+
+ dialect = POSTGRES_DIALECT
+ paramstyle = DEFAULT_PARAM_STYLE
+
+ def setUp(self, test=None, connect=None):
+ """
+ Support inheritance by L{TestCase} classes.
+ """
+ if test is None:
+ test = self
+ if connect is None:
+ self.factory = ConnectionFactory()
+ connect = self.factory.connect
+ self.connect = connect
+ self.paused = False
+ self.holders = []
+ self.pool = ConnectionPool(connect,
+ maxConnections=2,
+ dialect=self.dialect,
+ paramstyle=self.paramstyle)
+ self.pool._createHolder = self.makeAHolder
+ self.clock = self.pool.reactor = ClockWithThreads()
+ self.pool.startService()
+ test.addCleanup(self.flushHolders)
+
+
+ def flushHolders(self):
+ """
+ Flush all pending C{submit}s since C{pauseHolders} was called. This
+ makes sure the service is stopped and the fake ThreadHolders are all
+ executing their queues so failed tsets can exit cleanly.
+ """
+ self.paused = False
+ for holder in self.holders:
+ holder.flush()
+
+
+ def pauseHolders(self):
+ """
+ Pause all L{FakeThreadHolder}s, causing C{submit} to return an unfired
+ L{Deferred}.
+ """
+ self.paused = True
+
+
+ def makeAHolder(self):
+ """
+ Make a ThreadHolder-alike.
+ """
+ fth = FakeThreadHolder(self)
+ self.holders.append(fth)
+ return fth
+
+
+ def resultOf(self, it):
+ return resultOf(it)
+
+
+ def createTransaction(self):
+ return self.pool.connection()
+
+
+ def translateError(self, err):
+ return err
+
+
+
+class SteppablePoolHelper(ConnectionPoolHelper):
+ """
+ A version of L{ConnectionPoolHelper} that can set up a connection pool
+ capable of firing all its L{Deferred}s on demand, synchronously, by using
+ SQLite.
+ """
+ dialect = SQLITE_DIALECT
+ paramstyle = sqlite3.paramstyle
+
+ def __init__(self, schema):
+ self.schema = schema
+
+
+ def setUp(self, test):
+ connect = synchronousConnectionFactory(test)
+ con = connect()
+ cur = con.cursor()
+ cur.executescript(self.schema)
+ con.commit()
+ super(SteppablePoolHelper, self).setUp(test, connect)
+
+
+ def rows(self, sql):
+ """
+ Get some rows from the database to compare in a test.
+ """
+ con = self.connect()
+ cur = con.cursor()
+ cur.execute(sql)
+ result = cur.fetchall()
+ con.commit()
+ return result
+
+
+
+def synchronousConnectionFactory(test):
+ tmpdb = test.mktemp()
+ def connect():
+ return sqlite3.connect(tmpdb)
+ return connect
+
+
+
+class Child(object):
+ """
+ An object with a L{Parent}, in its list of C{children}.
+ """
+ def __init__(self, parent):
+ self.closed = False
+ self.parent = parent
+ self.parent.children.append(self)
+
+
+ def close(self):
+ if self.parent._closeFailQueue:
+ raise self.parent._closeFailQueue.pop(0)
+ self.closed = True
+
+
+
+class Parent(object):
+ """
+ An object with a list of L{Child}ren.
+ """
+
+ def __init__(self):
+ self.children = []
+ self._closeFailQueue = []
+
+
+ def childCloseWillFail(self, exception):
+ """
+ Closing children of this object will result in the given exception.
+
+ @see: L{ConnectionFactory}
+ """
+ self._closeFailQueue.append(exception)
+
+
+
+class FakeConnection(Parent, Child):
+ """
+ Fake Stand-in for DB-API 2.0 connection.
+
+ @ivar executions: the number of statements which have been executed.
+
+ """
+
+ executions = 0
+
+ def __init__(self, factory):
+ """
+ Initialize list of cursors
+ """
+ Parent.__init__(self)
+ Child.__init__(self, factory)
+ self.id = factory.idcounter.next()
+ self._executeFailQueue = []
+ self._commitCount = 0
+ self._rollbackCount = 0
+
+
+ def executeWillFail(self, thunk):
+ """
+ The next call to L{FakeCursor.execute} will fail with an exception
+ returned from the given callable.
+ """
+ self._executeFailQueue.append(thunk)
+
+
+ @property
+ def cursors(self):
+ "Alias to make tests more readable."
+ return self.children
+
+
+ def cursor(self):
+ return FakeCursor(self)
+
+
+ def commit(self):
+ self._commitCount += 1
+ if self.parent.commitFail:
+ self.parent.commitFail = False
+ raise CommitFail()
+
+
+ def rollback(self):
+ self._rollbackCount += 1
+ if self.parent.rollbackFail:
+ self.parent.rollbackFail = False
+ raise RollbackFail()
+
+
+
+class RollbackFail(Exception):
+ """
+ Sample rollback-failure exception.
+ """
+
+
+
+class CommitFail(Exception):
+ """
+ Sample Commit-failure exception.
+ """
+
+
+
+class FakeCursor(Child):
+ """
+ Fake stand-in for a DB-API 2.0 cursor.
+ """
+ def __init__(self, connection):
+ Child.__init__(self, connection)
+ self.rowcount = 0
+ # not entirely correct, but all we care about is its truth value.
+ self.description = False
+ self.variables = []
+ self.allExecutions = []
+
+
+ @property
+ def connection(self):
+ "Alias to make tests more readable."
+ return self.parent
+
+
+ def execute(self, sql, args=()):
+ self.connection.executions += 1
+ if self.connection._executeFailQueue:
+ raise self.connection._executeFailQueue.pop(0)()
+ self.allExecutions.append((sql, args))
+ self.sql = sql
+ factory = self.connection.parent
+ self.description = factory.hasResults
+ if factory.hasResults and factory.shouldUpdateRowcount:
+ self.rowcount = 1
+ else:
+ self.rowcount = 0
+ return
+
+
+ def var(self, type, *args):
+ """
+ Return a database variable in the style of the cx_Oracle bindings.
+ """
+ v = FakeVariable(self, type, args)
+ self.variables.append(v)
+ return v
+
+
+ def fetchall(self):
+ """
+ Just echo the SQL that was executed in the last query.
+ """
+ if self.connection.parent.hasResults:
+ return [[self.connection.id, self.sql]]
+ if self.description:
+ return []
+ return None
+
+
+
+class FakeVariable(object):
+ def __init__(self, cursor, type, args):
+ self.cursor = cursor
+ self.type = type
+ self.args = args
+
+
+ def getvalue(self):
+ vv = self.cursor.connection.parent.varvals
+ if vv:
+ return vv.pop(0)
+ return self.cursor.variables.index(self) + 300
+
+
+ def __reduce__(self):
+ raise RuntimeError("Not pickleable (since oracle vars aren't)")
+
+
+
+class ConnectionFactory(Parent):
+ """
+ A factory for L{FakeConnection} objects.
+
+ @ivar shouldUpdateRowcount: Should C{execute} on cursors produced by
+ connections produced by this factory update their C{rowcount} or just
+ their C{description} attribute?
+
+ @ivar hasResults: should cursors produced by connections by this factory
+ have any results returned by C{fetchall()}?
+ """
+
+ rollbackFail = False
+ commitFail = False
+
+ def __init__(self, shouldUpdateRowcount=True, hasResults=True):
+ Parent.__init__(self)
+ self.idcounter = count(1)
+ self._connectResultQueue = []
+ self.defaultConnect()
+ self.varvals = []
+ self.shouldUpdateRowcount = shouldUpdateRowcount
+ self.hasResults = hasResults
+
+
+ @property
+ def connections(self):
+ "Alias to make tests more readable."
+ return self.children
+
+
+ def connect(self):
+ """
+ Implement the C{ConnectionFactory} callable expected by
+ L{ConnectionPool}.
+ """
+ if self._connectResultQueue:
+ thunk = self._connectResultQueue.pop(0)
+ else:
+ thunk = self._default
+ return thunk()
+
+
+ def willConnect(self):
+ """
+ Used by tests to queue a successful result for connect().
+ """
+ def thunk():
+ return FakeConnection(self)
+ self._connectResultQueue.append(thunk)
+
+
+ def willFail(self):
+ """
+ Used by tests to queue a successful result for connect().
+ """
+ def thunk():
+ raise FakeConnectionError()
+ self._connectResultQueue.append(thunk)
+
+
+ def defaultConnect(self):
+ """
+ By default, connection attempts will succeed.
+ """
+ self.willConnect()
+ self._default = self._connectResultQueue.pop()
+
+
+ def defaultFail(self):
+ """
+ By default, connection attempts will fail.
+ """
+ self.willFail()
+ self._default = self._connectResultQueue.pop()
+
+
+
+class FakeConnectionError(Exception):
+ """
+ Synthetic error that might occur during connection.
+ """
Modified: CalendarServer/trunk/twext/enterprise/queue.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/queue.py 2013-04-18 23:33:01 UTC (rev 11065)
+++ CalendarServer/trunk/twext/enterprise/queue.py 2013-04-19 01:04:09 UTC (rev 11066)
@@ -1342,6 +1342,7 @@
def done(result):
self._startingUp = None
super(PeerConnectionPool, self).startService()
+ self._lostWorkCheckLoop()
return result
Modified: CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py 2013-04-18 23:33:01 UTC (rev 11065)
+++ CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py 2013-04-19 01:04:09 UTC (rev 11066)
@@ -18,22 +18,14 @@
Tests for L{twext.enterprise.adbapi2}.
"""
-from itertools import count
-from Queue import Empty
+from zope.interface.verify import verifyObject
-from zope.interface.verify import verifyClass, verifyObject
-from zope.interface.declarations import implements
-
-from twisted.python.threadpool import ThreadPool
from twisted.python.failure import Failure
from twisted.trial.unittest import TestCase
-from twisted.internet.task import Clock
from twisted.internet.defer import Deferred, fail
-from twisted.internet.interfaces import IReactorThreads
-
from twisted.test.proto_helpers import StringTransport
from twext.enterprise.ienterprise import ConnectionError
@@ -41,33 +33,17 @@
from twext.enterprise.adbapi2 import ConnectionPoolClient
from twext.enterprise.adbapi2 import ConnectionPoolConnection
from twext.enterprise.ienterprise import IAsyncTransaction
-from twext.enterprise.ienterprise import POSTGRES_DIALECT
from twext.enterprise.ienterprise import ICommandBlock
from twext.enterprise.adbapi2 import FailsafeException
-from twext.enterprise.adbapi2 import DEFAULT_PARAM_STYLE
from twext.enterprise.adbapi2 import ConnectionPool
-from twext.internet.threadutils import ThreadHolder
+from twext.enterprise.fixtures import ConnectionPoolHelper
+from twext.enterprise.fixtures import resultOf
+from twext.enterprise.fixtures import ClockWithThreads
+from twext.enterprise.fixtures import FakeConnectionError
+from twext.enterprise.fixtures import RollbackFail
+from twext.enterprise.fixtures import CommitFail
from twext.enterprise.adbapi2 import Commit
-
-def resultOf(deferred, propagate=False):
- """
- Add a callback and errback which will capture the result of a L{Deferred} in
- a list, and return that list. If 'propagate' is True, pass through the
- results.
- """
- results = []
- if propagate:
- def cb(r):
- results.append(r)
- return r
- else:
- cb = results.append
- deferred.addBoth(cb)
- return results
-
-
-
class AssertResultHelper(object):
"""
Mixin for asserting about synchronous Deferred results.
@@ -97,384 +73,6 @@
-class Child(object):
- """
- An object with a L{Parent}, in its list of C{children}.
- """
- def __init__(self, parent):
- self.closed = False
- self.parent = parent
- self.parent.children.append(self)
-
-
- def close(self):
- if self.parent._closeFailQueue:
- raise self.parent._closeFailQueue.pop(0)
- self.closed = True
-
-
-
-class Parent(object):
- """
- An object with a list of L{Child}ren.
- """
-
- def __init__(self):
- self.children = []
- self._closeFailQueue = []
-
-
- def childCloseWillFail(self, exception):
- """
- Closing children of this object will result in the given exception.
-
- @see: L{ConnectionFactory}
- """
- self._closeFailQueue.append(exception)
-
-
-
-class FakeConnection(Parent, Child):
- """
- Fake Stand-in for DB-API 2.0 connection.
-
- @ivar executions: the number of statements which have been executed.
-
- """
-
- executions = 0
-
- def __init__(self, factory):
- """
- Initialize list of cursors
- """
- Parent.__init__(self)
- Child.__init__(self, factory)
- self.id = factory.idcounter.next()
- self._executeFailQueue = []
- self._commitCount = 0
- self._rollbackCount = 0
-
-
- def executeWillFail(self, thunk):
- """
- The next call to L{FakeCursor.execute} will fail with an exception
- returned from the given callable.
- """
- self._executeFailQueue.append(thunk)
-
-
- @property
- def cursors(self):
- "Alias to make tests more readable."
- return self.children
-
-
- def cursor(self):
- return FakeCursor(self)
-
-
- def commit(self):
- self._commitCount += 1
- if self.parent.commitFail:
- self.parent.commitFail = False
- raise CommitFail()
-
-
- def rollback(self):
- self._rollbackCount += 1
- if self.parent.rollbackFail:
- self.parent.rollbackFail = False
- raise RollbackFail()
-
-
-
-class RollbackFail(Exception):
- """
- Sample rollback-failure exception.
- """
-
-
-
-class CommitFail(Exception):
- """
- Sample Commit-failure exception.
- """
-
-
-
-class FakeCursor(Child):
- """
- Fake stand-in for a DB-API 2.0 cursor.
- """
- def __init__(self, connection):
- Child.__init__(self, connection)
- self.rowcount = 0
- # not entirely correct, but all we care about is its truth value.
- self.description = False
- self.variables = []
- self.allExecutions = []
-
-
- @property
- def connection(self):
- "Alias to make tests more readable."
- return self.parent
-
-
- def execute(self, sql, args=()):
- self.connection.executions += 1
- if self.connection._executeFailQueue:
- raise self.connection._executeFailQueue.pop(0)()
- self.allExecutions.append((sql, args))
- self.sql = sql
- factory = self.connection.parent
- self.description = factory.hasResults
- if factory.hasResults and factory.shouldUpdateRowcount:
- self.rowcount = 1
- else:
- self.rowcount = 0
- return
-
-
- def var(self, type, *args):
- """
- Return a database variable in the style of the cx_Oracle bindings.
- """
- v = FakeVariable(self, type, args)
- self.variables.append(v)
- return v
-
-
- def fetchall(self):
- """
- Just echo the SQL that was executed in the last query.
- """
- if self.connection.parent.hasResults:
- return [[self.connection.id, self.sql]]
- if self.description:
- return []
- return None
-
-
-
-class FakeVariable(object):
- def __init__(self, cursor, type, args):
- self.cursor = cursor
- self.type = type
- self.args = args
-
-
- def getvalue(self):
- vv = self.cursor.connection.parent.varvals
- if vv:
- return vv.pop(0)
- return self.cursor.variables.index(self) + 300
-
-
- def __reduce__(self):
- raise RuntimeError("Not pickleable (since oracle vars aren't)")
-
-
-
-class ConnectionFactory(Parent):
- """
- A factory for L{FakeConnection} objects.
-
- @ivar shouldUpdateRowcount: Should C{execute} on cursors produced by
- connections produced by this factory update their C{rowcount} or just
- their C{description} attribute?
-
- @ivar hasResults: should cursors produced by connections by this factory
- have any results returned by C{fetchall()}?
- """
-
- rollbackFail = False
- commitFail = False
-
- def __init__(self, shouldUpdateRowcount=True, hasResults=True):
- Parent.__init__(self)
- self.idcounter = count(1)
- self._connectResultQueue = []
- self.defaultConnect()
- self.varvals = []
- self.shouldUpdateRowcount = shouldUpdateRowcount
- self.hasResults = hasResults
-
-
- @property
- def connections(self):
- "Alias to make tests more readable."
- return self.children
-
-
- def connect(self):
- """
- Implement the C{ConnectionFactory} callable expected by
- L{ConnectionPool}.
- """
- if self._connectResultQueue:
- thunk = self._connectResultQueue.pop(0)
- else:
- thunk = self._default
- return thunk()
-
-
- def willConnect(self):
- """
- Used by tests to queue a successful result for connect().
- """
- def thunk():
- return FakeConnection(self)
- self._connectResultQueue.append(thunk)
-
-
- def willFail(self):
- """
- Used by tests to queue a successful result for connect().
- """
- def thunk():
- raise FakeConnectionError()
- self._connectResultQueue.append(thunk)
-
-
- def defaultConnect(self):
- """
- By default, connection attempts will succeed.
- """
- self.willConnect()
- self._default = self._connectResultQueue.pop()
-
-
- def defaultFail(self):
- """
- By default, connection attempts will fail.
- """
- self.willFail()
- self._default = self._connectResultQueue.pop()
-
-
-
-class FakeConnectionError(Exception):
- """
- Synthetic error that might occur during connection.
- """
-
-
-
-class FakeThreadHolder(ThreadHolder):
- """
- Run things to submitted this ThreadHolder on the main thread, so that
- execution is easier to control.
- """
-
- def __init__(self, test):
- super(FakeThreadHolder, self).__init__(self)
- self.test = test
- self.started = False
- self.stopped = False
- self._workerIsRunning = False
-
-
- def start(self):
- self.started = True
- return super(FakeThreadHolder, self).start()
-
-
- def stop(self):
- result = super(FakeThreadHolder, self).stop()
- self.stopped = True
- return result
-
-
- @property
- def _get_q(self):
- return self._q_
-
-
- @_get_q.setter
- def _q(self, newq):
- if newq is not None:
- oget = newq.get
- newq.get = lambda: oget(timeout=0)
- oput = newq.put
- def putit(x):
- p = oput(x)
- if not self.test.paused:
- self.flush()
- return p
- newq.put = putit
- self._q_ = newq
-
-
- def callFromThread(self, f, *a, **k):
- result = f(*a, **k)
- return result
-
-
- def callInThread(self, f, *a, **k):
- """
- This should be called only once, to start the worker function that
- dedicates a thread to this L{ThreadHolder}.
- """
- self._workerIsRunning = True
-
-
- def flush(self):
- """
- Fire all deferreds previously returned from submit.
- """
- try:
- while self._workerIsRunning and self._qpull():
- pass
- else:
- self._workerIsRunning = False
- except Empty:
- pass
-
-
-
-class ClockWithThreads(Clock):
- """
- A testing reactor that supplies L{IReactorTime} and L{IReactorThreads}.
- """
- implements(IReactorThreads)
-
- def __init__(self):
- super(ClockWithThreads, self).__init__()
- self._pool = ThreadPool()
-
-
- def getThreadPool(self):
- """
- Get the threadpool.
- """
- return self._pool
-
-
- def suggestThreadPoolSize(self, size):
- """
- Approximate the behavior of a 'real' reactor.
- """
- self._pool.adjustPoolsize(maxthreads=size)
-
-
- def callInThread(self, thunk, *a, **kw):
- """
- No implementation.
- """
-
-
- def callFromThread(self, thunk, *a, **kw):
- """
- No implementation.
- """
-
-
-verifyClass(IReactorThreads, ClockWithThreads)
-
-
-
class ConnectionPoolBootTests(TestCase):
"""
Tests for the start-up phase of L{ConnectionPool}.
@@ -521,74 +119,6 @@
-class ConnectionPoolHelper(object):
- """
- Connection pool setting-up facilities for tests that need a
- L{ConnectionPool}.
- """
-
- dialect = POSTGRES_DIALECT
- paramstyle = DEFAULT_PARAM_STYLE
-
- def setUp(self):
- """
- Create a L{ConnectionPool} attached to a C{ConnectionFactory}. Start
- the L{ConnectionPool}.
- """
- self.paused = False
- self.holders = []
- self.factory = ConnectionFactory()
- self.pool = ConnectionPool(self.factory.connect,
- maxConnections=2,
- dialect=self.dialect,
- paramstyle=self.paramstyle)
- self.pool._createHolder = self.makeAHolder
- self.clock = self.pool.reactor = ClockWithThreads()
- self.pool.startService()
- self.addCleanup(self.flushHolders)
-
-
- def flushHolders(self):
- """
- Flush all pending C{submit}s since C{pauseHolders} was called. This
- makes sure the service is stopped and the fake ThreadHolders are all
- executing their queues so failed tests can exit cleanly.
- """
- self.paused = False
- for holder in self.holders:
- holder.flush()
-
-
- def pauseHolders(self):
- """
- Pause all L{FakeThreadHolder}s, causing C{submit} to return an unfired
- L{Deferred}.
- """
- self.paused = True
-
-
- def makeAHolder(self):
- """
- Make a ThreadHolder-alike.
- """
- fth = FakeThreadHolder(self)
- self.holders.append(fth)
- return fth
-
-
- def resultOf(self, it):
- return resultOf(it)
-
-
- def createTransaction(self):
- return self.pool.connection()
-
-
- def translateError(self, err):
- return err
-
-
-
class ConnectionPoolTests(ConnectionPoolHelper, TestCase, AssertResultHelper):
"""
Tests for L{ConnectionPool}.
Modified: CalendarServer/trunk/twext/enterprise/test/test_queue.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/test/test_queue.py 2013-04-18 23:33:01 UTC (rev 11065)
+++ CalendarServer/trunk/twext/enterprise/test/test_queue.py 2013-04-19 01:04:09 UTC (rev 11066)
@@ -52,7 +52,8 @@
from twext.enterprise.queue import ConnectionFromPeerNode
from twext.enterprise.fixtures import buildConnectionPool
from zope.interface.verify import verifyObject
-from twisted.test.proto_helpers import StringTransport
+from twisted.test.proto_helpers import StringTransport, MemoryReactor
+from twext.enterprise.fixtures import SteppablePoolHelper
from twext.enterprise.queue import _BaseQueuer, NonPerformingQueuer
import twext.enterprise.queue
@@ -70,6 +71,16 @@
+class MemoryReactorWithClock(MemoryReactor, Clock):
+ """
+ Simulate a real reactor.
+ """
+ def __init__(self):
+ MemoryReactor.__init__(self)
+ Clock.__init__(self)
+
+
+
def transactionally(transactionCreator):
"""
Perform the decorated function immediately in a transaction, replacing its
@@ -213,7 +224,6 @@
-
class WorkItemTests(TestCase):
"""
A L{WorkItem} is an item of work that can be executed.
@@ -493,6 +503,38 @@
self.assertEquals(worker2.currentLoad, 1)
+ def test_poolStartServiceChecksForWork(self):
+ """
+ L{PeerConnectionPool.startService} kicks off the idle work-check loop.
+ """
+ reactor = MemoryReactorWithClock()
+ cph = SteppablePoolHelper(nodeSchema + schemaText)
+ then = datetime.datetime(2012, 12, 12, 12, 12, 0)
+ reactor.advance(astimestamp(then))
+ cph.setUp(self)
+ pcp = PeerConnectionPool(reactor, cph.pool.connection, 4321, schema)
+ now = then + datetime.timedelta(seconds=pcp.queueProcessTimeout * 2)
+ @transactionally(cph.pool.connection)
+ def createOldWork(txn):
+ one = DummyWorkItem.create(txn, workID=1, a=3, b=4, notBefore=then)
+ two = DummyWorkItem.create(txn, workID=2, a=7, b=9, notBefore=now)
+ return gatherResults([one, two])
+ pcp.startService()
+ cph.flushHolders()
+ reactor.advance(pcp.queueProcessTimeout * 2)
+ self.assertEquals(
+ cph.rows("select * from DUMMY_WORK_DONE"),
+ [(1, 7)]
+ )
+ cph.rows("delete from DUMMY_WORK_DONE")
+ reactor.advance(pcp.queueProcessTimeout * 2)
+ self.assertEquals(
+ cph.rows("select * from DUMMY_WORK_DONE"),
+ [(2, 16)]
+ )
+
+
+
class HalfConnection(object):
def __init__(self, protocol):
self.protocol = protocol
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130418/10b13b97/attachment-0001.html>
More information about the calendarserver-changes mailing list