[CalendarServer-changes] [10290] CalendarServer/trunk

source_changes at macosforge.org
Fri Jan 4 16:51:01 PST 2013


Revision: 10290
          http://trac.calendarserver.org//changeset/10290
Author:   glyph at apple.com
Date:     2013-01-04 16:51:01 -0800 (Fri, 04 Jan 2013)
Log Message:
-----------
Allow users of the work queue to control work-item concurrency and timing via the 'group' and 'notBefore' attributes of their WorkItem classes.

Also, substantially improve test coverage of the queue itself, fix a few related issues, and add various utilities in support of this feature.
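
For illustration, a minimal sketch of the new knobs.  The COUPON_WORK table,
its columns, and the names below are hypothetical; the pattern follows the
WorkItem docstring added to twext/enterprise/queue.py in this change:

    from datetime import datetime, timedelta
    from twext.enterprise.dal.model import Schema, SQLType, Table
    from twext.enterprise.dal.syntax import SchemaSyntax
    from twext.enterprise.dal.record import fromTable
    from twext.enterprise.queue import WorkItem

    # Hypothetical one-table schema; a real application would define this in
    # SQL alongside its other tables, with WORK_ID filled by a sequence.
    s = Schema(__file__)
    t = Table(s, "COUPON_WORK")
    t.addColumn("WORK_ID", SQLType("integer", None))
    t.addColumn("NOT_BEFORE", SQLType("timestamp", None))
    t.addColumn("CUSTOMER_ID", SQLType("integer", None))
    t.primaryKey = [t.columnNamed("WORK_ID")]
    schema = SchemaSyntax(s)

    class CouponWork(WorkItem, fromTable(schema.COUPON_WORK)):
        # All CouponWork items share one lock name, so no two doWork()
        # calls run concurrently anywhere in the cluster.  (The group
        # attribute may instead be aligned with a varchar column to get a
        # per-row value.)
        group = "coupon"

        def doWork(self):
            # Runs later, in its own transaction.
            pass

    def scheduleCoupon(queuer, txn, customerID):
        # Run no earlier than an hour from now; 'group' above keeps
        # executions serialized.
        return queuer.enqueueWork(
            txn, CouponWork, customerID=customerID,
            notBefore=datetime.utcnow() + timedelta(hours=1))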

Modified Paths:
--------------
    CalendarServer/trunk/twext/enterprise/adbapi2.py
    CalendarServer/trunk/twext/enterprise/dal/record.py
    CalendarServer/trunk/twext/enterprise/dal/test/test_record.py
    CalendarServer/trunk/twext/enterprise/dal/test/test_sqlsyntax.py
    CalendarServer/trunk/twext/enterprise/fixtures.py
    CalendarServer/trunk/twext/enterprise/ienterprise.py
    CalendarServer/trunk/twext/enterprise/queue.py
    CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py
    CalendarServer/trunk/twext/enterprise/util.py
    CalendarServer/trunk/twistedcaldav/dateops.py
    CalendarServer/trunk/twistedcaldav/test/test_dateops.py
    CalendarServer/trunk/twistedcaldav/test/test_wrapping.py
    CalendarServer/trunk/txdav/base/datastore/file.py
    CalendarServer/trunk/txdav/caldav/datastore/sql.py
    CalendarServer/trunk/txdav/common/datastore/sql.py
    CalendarServer/trunk/txdav/common/datastore/sql_schema/current-oracle-dialect.sql
    CalendarServer/trunk/txdav/common/datastore/sql_schema/current.sql
    CalendarServer/trunk/txdav/common/datastore/test/util.py

Added Paths:
-----------
    CalendarServer/trunk/twext/enterprise/locking.py
    CalendarServer/trunk/twext/enterprise/test/test_locking.py
    CalendarServer/trunk/twext/enterprise/test/test_queue.py
    CalendarServer/trunk/twext/enterprise/test/test_util.py
    CalendarServer/trunk/txdav/common/datastore/sql_schema/old/oracle-dialect/v14.sql
    CalendarServer/trunk/txdav/common/datastore/sql_schema/old/postgres-dialect/v14.sql
    CalendarServer/trunk/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_14_to_15.sql
    CalendarServer/trunk/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_14_to_15.sql

Property Changed:
----------------
    CalendarServer/trunk/


Property changes on: CalendarServer/trunk
___________________________________________________________________
Modified: svn:mergeinfo
   - /CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/release/CalendarServer-4.3-dev:10180-10190,10192
/CalendarServer/branches/users/cdaboo/batchupload-6699:6700-7198
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/component-set-fixes:8130-8346
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/implicituidrace:8137-8141
/CalendarServer/branches/users/cdaboo/ischedule-dkim:9747-9979
/CalendarServer/branches/users/cdaboo/managed-attachments:9985-10145
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/pods:7297-7377
/CalendarServer/branches/users/cdaboo/pycalendar:7085-7206
/CalendarServer/branches/users/cdaboo/pycard:7227-7237
/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes:7740-8287
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/cdaboo/timezones:7443-7699
/CalendarServer/branches/users/cdaboo/txn-debugging:8730-8743
/CalendarServer/branches/users/glyph/always-abort-txn-on-error:9958-9969
/CalendarServer/branches/users/glyph/case-insensitive-uid:8772-8805
/CalendarServer/branches/users/glyph/conn-limit:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/dalify:6932-7023
/CalendarServer/branches/users/glyph/db-reconnect:6824-6876
/CalendarServer/branches/users/glyph/deploybuild:7563-7572
/CalendarServer/branches/users/glyph/disable-quota:7718-7727
/CalendarServer/branches/users/glyph/dont-start-postgres:6592-6614
/CalendarServer/branches/users/glyph/imip-and-admin-html:7866-7984
/CalendarServer/branches/users/glyph/ipv6-client:9054-9105
/CalendarServer/branches/users/glyph/linux-tests:6893-6900
/CalendarServer/branches/users/glyph/migrate-merge:8690-8713
/CalendarServer/branches/users/glyph/misc-portability-fixes:7365-7374
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/multiget-delete:8321-8330
/CalendarServer/branches/users/glyph/new-export:7444-7485
/CalendarServer/branches/users/glyph/one-home-list-api:10048-10073
/CalendarServer/branches/users/glyph/oracle:7106-7155
/CalendarServer/branches/users/glyph/oracle-nulls:7340-7351
/CalendarServer/branches/users/glyph/other-html:8062-8091
/CalendarServer/branches/users/glyph/parallel-sim:8240-8251
/CalendarServer/branches/users/glyph/parallel-upgrade:8376-8400
/CalendarServer/branches/users/glyph/parallel-upgrade_to_1:8571-8583
/CalendarServer/branches/users/glyph/q:9560-9688
/CalendarServer/branches/users/glyph/quota:7604-7637
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/shared-pool-fixes:8436-8443
/CalendarServer/branches/users/glyph/shared-pool-take2:8155-8174
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/sharing-api:9192-9205
/CalendarServer/branches/users/glyph/skip-lonely-vtimezones:8524-8535
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/subtransactions:7248-7258
/CalendarServer/branches/users/glyph/table-alias:8651-8664
/CalendarServer/branches/users/glyph/uidexport:7673-7676
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/glyph/uuid-normalize:9268-9296
/CalendarServer/branches/users/glyph/xattrs-from-files:7757-7769
/CalendarServer/branches/users/sagen/applepush:8126-8184
/CalendarServer/branches/users/sagen/inboxitems:7380-7381
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/wsanchez/transations:5515-5593
   + /CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/release/CalendarServer-4.3-dev:10180-10190,10192
/CalendarServer/branches/users/cdaboo/batchupload-6699:6700-7198
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/component-set-fixes:8130-8346
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/implicituidrace:8137-8141
/CalendarServer/branches/users/cdaboo/ischedule-dkim:9747-9979
/CalendarServer/branches/users/cdaboo/managed-attachments:9985-10145
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/pods:7297-7377
/CalendarServer/branches/users/cdaboo/pycalendar:7085-7206
/CalendarServer/branches/users/cdaboo/pycard:7227-7237
/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes:7740-8287
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/cdaboo/timezones:7443-7699
/CalendarServer/branches/users/cdaboo/txn-debugging:8730-8743
/CalendarServer/branches/users/glyph/always-abort-txn-on-error:9958-9969
/CalendarServer/branches/users/glyph/case-insensitive-uid:8772-8805
/CalendarServer/branches/users/glyph/conn-limit:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/dalify:6932-7023
/CalendarServer/branches/users/glyph/db-reconnect:6824-6876
/CalendarServer/branches/users/glyph/deploybuild:7563-7572
/CalendarServer/branches/users/glyph/disable-quota:7718-7727
/CalendarServer/branches/users/glyph/dont-start-postgres:6592-6614
/CalendarServer/branches/users/glyph/imip-and-admin-html:7866-7984
/CalendarServer/branches/users/glyph/ipv6-client:9054-9105
/CalendarServer/branches/users/glyph/linux-tests:6893-6900
/CalendarServer/branches/users/glyph/migrate-merge:8690-8713
/CalendarServer/branches/users/glyph/misc-portability-fixes:7365-7374
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/multiget-delete:8321-8330
/CalendarServer/branches/users/glyph/new-export:7444-7485
/CalendarServer/branches/users/glyph/one-home-list-api:10048-10073
/CalendarServer/branches/users/glyph/oracle:7106-7155
/CalendarServer/branches/users/glyph/oracle-nulls:7340-7351
/CalendarServer/branches/users/glyph/other-html:8062-8091
/CalendarServer/branches/users/glyph/parallel-sim:8240-8251
/CalendarServer/branches/users/glyph/parallel-upgrade:8376-8400
/CalendarServer/branches/users/glyph/parallel-upgrade_to_1:8571-8583
/CalendarServer/branches/users/glyph/q:9560-9688
/CalendarServer/branches/users/glyph/queue-locking-and-timing:10204-10289
/CalendarServer/branches/users/glyph/quota:7604-7637
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/shared-pool-fixes:8436-8443
/CalendarServer/branches/users/glyph/shared-pool-take2:8155-8174
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/sharing-api:9192-9205
/CalendarServer/branches/users/glyph/skip-lonely-vtimezones:8524-8535
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/subtransactions:7248-7258
/CalendarServer/branches/users/glyph/table-alias:8651-8664
/CalendarServer/branches/users/glyph/uidexport:7673-7676
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/glyph/uuid-normalize:9268-9296
/CalendarServer/branches/users/glyph/xattrs-from-files:7757-7769
/CalendarServer/branches/users/sagen/applepush:8126-8184
/CalendarServer/branches/users/sagen/inboxitems:7380-7381
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/wsanchez/transations:5515-5593

Modified: CalendarServer/trunk/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/adbapi2.py	2013-01-05 00:40:03 UTC (rev 10289)
+++ CalendarServer/trunk/twext/enterprise/adbapi2.py	2013-01-05 00:51:01 UTC (rev 10290)
@@ -491,7 +491,7 @@
 
 
     @inlineCallbacks
-    def runHooks(self, ignored):
+    def runHooks(self, ignored=None):
         """
         Callback for C{commit} and C{abort} Deferreds.
         """
@@ -516,10 +516,27 @@
     # covered by txdav's test suite.
 
     def __init__(self):
+        self._preCommit = _HookableOperation()
         self._commit = _HookableOperation()
         self._abort = _HookableOperation()
 
 
+    def _commitWithHooks(self, doCommit):
+        """
+        Run pre-hooks, then the real DB commit, and then post-hooks.
+        """
+        pre = self._preCommit.runHooks()
+        def ok(ignored):
+            return doCommit().addCallback(self._commit.runHooks)
+        def failed(why):
+            return self.abort().addCallback(lambda ignored: why)
+        return pre.addCallbacks(ok, failed)
+
+
+    def preCommit(self, operation):
+        return self._preCommit.addHook(operation)
+
+
     def postCommit(self, operation):
         return self._commit.addHook(operation)
 
@@ -648,9 +665,10 @@
             # We're in the process of executing a block of commands.  Wait until
             # they're done.  (Commit will be repeated in _checkNextBlock.)
             return self._blockedQueue.commit()
-        self._markComplete()
-        return (super(_SingleTxn, self).commit()
-                .addCallback(self._commit.runHooks))
+        def reallyCommit():
+            self._markComplete()
+            return super(_SingleTxn, self).commit()
+        return self._commitWithHooks(reallyCommit)
 
 
     def abort(self):
@@ -1550,12 +1568,13 @@
 
 
     def commit(self):
-        self._committing = True
-        def done(whatever):
-            self._committed = True
-            return whatever
-        return (self._complete(Commit).addBoth(done)
-                .addCallback(self._commit.runHooks))
+        def reallyCommit():
+            self._committing = True
+            def done(whatever):
+                self._committed = True
+                return whatever
+            return self._complete(Commit).addBoth(done)
+        return self._commitWithHooks(reallyCommit)
 
 
     def abort(self):

Modified: CalendarServer/trunk/twext/enterprise/dal/record.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/record.py	2013-01-05 00:40:03 UTC (rev 10289)
+++ CalendarServer/trunk/twext/enterprise/dal/record.py	2013-01-05 00:51:01 UTC (rev 10290)
@@ -26,6 +26,7 @@
 from twext.enterprise.dal.syntax import (
     Select, Tuple, Constant, ColumnSyntax, Insert, Update, Delete
 )
+from twext.enterprise.util import parseSQLTimestamp
 # from twext.enterprise.dal.syntax import ExpressionSyntax
 
 class ReadOnly(AttributeError):
@@ -143,6 +144,15 @@
         return super(Record, self).__setattr__(name, value)
 
 
+    def __repr__(self):
+        r = "<{0} record from table {1}".format(self.__class__.__name__,
+                                                self.table.model.name)
+        for k in sorted(self.__attrmap__.keys()):
+            r += " {0}={1}".format(k, repr(getattr(self, k)))
+        r += ">"
+        return r
+
+
     @staticmethod
     def namingConvention(columnName):
         """
@@ -220,12 +230,26 @@
         result = yield (Insert(colmap, Return=needsCols if needsCols else None)
                         .on(transaction))
         if needsCols:
-            for neededAttr, neededValue in zip(needsAttrs, result[0]):
-                setattr(self, neededAttr, neededValue)
+            self._attributesFromRow(zip(needsAttrs, result[0]))
         self.transaction = transaction
         returnValue(self)
 
 
+    def _attributesFromRow(self, attributeList):
+        """
+        Take some data loaded from a row and apply it to this instance,
+        converting types as necessary.
+
+        @param attributeList: a C{list} of 2-C{tuples} of C{(attributeName,
+            attributeValue)}.
+        """
+        for setAttribute, setValue in attributeList:
+            setColumn = self.__attrmap__[setAttribute]
+            if setColumn.model.type.name == "timestamp":
+                setValue = parseSQLTimestamp(setValue)
+            setattr(self, setAttribute, setValue)
+
+
     def delete(self):
         """
         Delete this row from the database.
@@ -273,7 +297,7 @@
 
 
     @classmethod
-    def query(cls, transaction, expr, order=None, ascending=True):
+    def query(cls, transaction, expr, order=None, ascending=True, group=None):
         """
         Query the table that corresponds to C{cls}, and return instances of
         C{cls} corresponding to the rows that are returned from that table.
@@ -288,10 +312,15 @@
 
         @param ascending: A boolean; if C{order} is not C{None}, whether to
             sort in ascending or descending order.
+
+        @param group: a L{ColumnSyntax} to group the resulting record objects
+            by.
         """
         kw = {}
         if order is not None:
             kw.update(OrderBy=order, Ascending=ascending)
+        if group is not None:
+            kw.update(GroupBy=group)
         return cls._rowsFromQuery(transaction, Select(list(cls.table),
                                                       From=cls.table,
                                                       Where=expr, **kw), None)
@@ -314,7 +343,8 @@
     @inlineCallbacks
     def _rowsFromQuery(cls, transaction, qry, rozrc):
         """
-        Execute the given query, and transform its results into rows.
+        Execute the given query, and transform its results into instances of
+        C{cls}.
 
         @param transaction: an L{IAsyncTransaction} to execute the query on.
 
@@ -324,16 +354,15 @@
 
         @param rozrc: The C{raiseOnZeroRowCount} argument.
 
-        @return: a L{Deferred} that succeeds with a C{list} or fails with an
-            exception produced by C{rozrc}.
+        @return: a L{Deferred} that succeeds with a C{list} of instances of
+            C{cls} or fails with an exception produced by C{rozrc}.
         """
         rows = yield qry.on(transaction, raiseOnZeroRowCount=rozrc)
         selves = []
+        names = [cls.__colmap__[column] for column in list(cls.table)]
         for row in rows:
             self = cls()
-            for (column, value) in zip(list(cls.table), row):
-                name = cls.__colmap__[column]
-                setattr(self, name, value)
+            self._attributesFromRow(zip(names, row))
             self.transaction = transaction
             selves.append(self)
         returnValue(selves)

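A small usage sketch of the extended query signature; TestRecord here is the
fixture class from the tests below, and the transaction comes from any
ConnectionPool:

    from twisted.internet.defer import inlineCallbacks, returnValue
    from twext.enterprise.dal.test.test_record import TestRecord

    @inlineCallbacks
    def betaGroups(txn):
        # 'group' simply adds a GroupBy clause to the generated Select;
        # 'order' and 'ascending' behave as before.
        records = yield TestRecord.query(
            txn, TestRecord.beta > 0,
            order=TestRecord.beta, ascending=False,
            group=TestRecord.beta,
        )
        # Timestamp columns on loaded records are now converted to
        # datetime.datetime by _attributesFromRow, rather than being left
        # as driver-specific values.
        returnValue(records)
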
Modified: CalendarServer/trunk/twext/enterprise/dal/test/test_record.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/test/test_record.py	2013-01-05 00:40:03 UTC (rev 10289)
+++ CalendarServer/trunk/twext/enterprise/dal/test/test_record.py	2013-01-05 00:51:01 UTC (rev 10290)
@@ -18,6 +18,8 @@
 Test cases for L{twext.enterprise.dal.record}.
 """
 
+import datetime
+
 from twisted.internet.defer import inlineCallbacks
 
 from twisted.trial.unittest import TestCase
@@ -38,7 +40,8 @@
 schemaString = """
 create table ALPHA (BETA integer primary key, GAMMA text);
 create table DELTA (PHI integer primary key default (nextval('myseq')),
-                    EPSILON text not null);
+                    EPSILON text not null,
+                    ZETA timestamp not null default '2012-12-12 12:12:12' );
 """
 
 # sqlite can be made to support nextval() as a function, but 'create sequence'
@@ -139,6 +142,24 @@
 
 
     @inlineCallbacks
+    def test_datetimeType(self):
+        """
+        When a L{Record} references a timestamp column, it retrieves the date
+        as UTC.
+        """
+        txn = self.pool.connection()
+        # Create ...
+        rec = yield TestAutoRecord.create(txn, epsilon=1)
+        self.assertEquals(rec.zeta, datetime.datetime(2012, 12, 12, 12, 12, 12))
+        yield txn.commit()
+        # ... should have the same effect as loading.
+        txn = self.pool.connection()
+        rec = (yield TestAutoRecord.all(txn))[0]
+        self.assertEquals(rec.zeta, datetime.datetime(2012, 12, 12, 12, 12, 12))
+
+
+
+    @inlineCallbacks
     def test_tooManyAttributes(self):
         """
         When a L{Record} object is created with unknown attributes (those which
@@ -238,7 +259,18 @@
             sorted(data)
         )
 
+    @inlineCallbacks
+    def test_repr(self):
+        """
+        The C{repr} of a L{Record} presents all its values.
+        """
+        txn = self.pool.connection()
+        yield txn.execSQL("insert into ALPHA values (:1, :2)", [789, u'nine'])
+        rec = list((yield TestRecord.all(txn)))[0]
+        self.assertIn(" beta=789", repr(rec))
+        self.assertIn(" gamma=u'nine'", repr(rec))
 
+
     @inlineCallbacks
     def test_orderedQuery(self):
         """

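The conversion the new test relies on is performed by parseSQLTimestamp,
imported into record.py above; a quick sketch of the expected behavior,
assuming the plain 'YYYY-MM-DD HH:MM:SS' text form that SQLite hands back:

    from datetime import datetime
    from twext.enterprise.util import parseSQLTimestamp

    ts = parseSQLTimestamp('2012-12-12 12:12:12')
    assert ts == datetime(2012, 12, 12, 12, 12, 12)
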
Modified: CalendarServer/trunk/twext/enterprise/dal/test/test_sqlsyntax.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/test/test_sqlsyntax.py	2013-01-05 00:40:03 UTC (rev 10289)
+++ CalendarServer/trunk/twext/enterprise/dal/test/test_sqlsyntax.py	2013-01-05 00:51:01 UTC (rev 10290)
@@ -146,6 +146,16 @@
                           SQLFragment("select * from FOO", []))
 
 
+    def test_tableSyntaxFromSchemaSyntaxCompare(self):
+        """
+        One L{TableSyntax} is equivalent to another wrapping the same table;
+        one wrapping a different table is different.
+        """
+        self.assertEquals(self.schema.FOO, self.schema.FOO)
+        self.assertNotEquals(self.schema.FOO, self.schema.BOZ)
+
+
+
     def test_simpleWhereClause(self):
         """
         L{Select} generates a 'select' statement with a 'where' clause

Modified: CalendarServer/trunk/twext/enterprise/fixtures.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/fixtures.py	2013-01-05 00:40:03 UTC (rev 10289)
+++ CalendarServer/trunk/twext/enterprise/fixtures.py	2013-01-05 00:51:01 UTC (rev 10290)
@@ -35,6 +35,10 @@
     @param schemaText: The text of the schema with which to initialize the
         database.
     @type schemaText: L{str}
+
+    @return: a L{ConnectionPool} service whose C{startService} method has
+        already been invoked.
+    @rtype: L{ConnectionPool}
     """
     sqlitename = testCase.mktemp()
     seqs = {}

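Assuming the function whose docstring is amended here is buildConnectionPool,
a sketch of how a test case might use it:

    from twisted.internet.defer import inlineCallbacks
    from twisted.trial.unittest import TestCase
    from twext.enterprise.fixtures import buildConnectionPool

    class ExampleTests(TestCase):
        def setUp(self):
            # An already-started ConnectionPool backed by a per-test
            # SQLite database initialized with the given schema.
            self.pool = buildConnectionPool(
                self, "create table THING (ID integer primary key);")

        @inlineCallbacks
        def test_insert(self):
            txn = self.pool.connection()
            yield txn.execSQL("insert into THING values (:1)", [1])
            yield txn.commit()
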
Modified: CalendarServer/trunk/twext/enterprise/ienterprise.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/ienterprise.py	2013-01-05 00:40:03 UTC (rev 10289)
+++ CalendarServer/trunk/twext/enterprise/ienterprise.py	2013-01-05 00:51:01 UTC (rev 10290)
@@ -109,10 +109,25 @@
         Commit changes caused by this transaction.
 
         @return: L{Deferred} which fires with C{None} upon successful
-            completion of this transaction.
+            completion of this transaction, or fails if this transaction could
+            not be committed.  It fails with L{AlreadyFinishedError} if the
+            transaction has already been committed or rolled back.
         """
 
 
+    def preCommit(operation):
+        """
+        Perform the given operation when this L{IAsyncTransaction}'s C{commit}
+        method is called, but before the underlying transaction commits.  If
+        any exception is raised by this operation, underlying database commit
+        will be blocked and rollback run instead.
+
+        @param operation: a 0-argument callable that may return a L{Deferred}.
+            If it does, then the subsequent operations added by L{postCommit}
+            will not fire until that L{Deferred} fires.
+        """
+
+
     def postCommit(operation):
         """
         Perform the given operation only after this L{IAsyncTransaction}

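A brief sketch of the new hook from a caller's perspective; the hook bodies
here are placeholders:

    def addHooks(txn):
        # 'txn' is any IAsyncTransaction from a ConnectionPool.
        def flushCaches():
            # Runs when txn.commit() is called, before the underlying
            # database commit; it may return a Deferred, which commit
            # waits on, and a failure here aborts instead of committing.
            pass
        txn.preCommit(flushCaches)

        def announce():
            # Runs only once the database commit has actually succeeded.
            pass
        txn.postCommit(announce)

        return txn.commit()  # pre-hooks -> real commit -> post-hooks
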
Copied: CalendarServer/trunk/twext/enterprise/locking.py (from rev 10289, CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/locking.py)
===================================================================
--- CalendarServer/trunk/twext/enterprise/locking.py	                        (rev 0)
+++ CalendarServer/trunk/twext/enterprise/locking.py	2013-01-05 00:51:01 UTC (rev 10290)
@@ -0,0 +1,103 @@
+# -*- test-case-name: twext.enterprise.test.test_locking -*-
+##
+# Copyright (c) 2012 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Utilities to restrict concurrency based on mutual exclusion.
+"""
+
+from twext.enterprise.dal.model import Table
+from twext.enterprise.dal.model import SQLType
+from twext.enterprise.dal.model import Constraint
+from twext.enterprise.dal.syntax import SchemaSyntax
+from twext.enterprise.dal.model import Schema
+from twext.enterprise.dal.record import Record
+from twext.enterprise.dal.record import fromTable
+
+
+class AlreadyUnlocked(Exception):
+    """
+    The lock you were trying to unlock was already unlocked.
+    """
+
+
+
+def makeLockSchema(inSchema):
+    """
+    Create a self-contained schema just for L{NamedLock} use, in C{inSchema}.
+
+    @param inSchema: a L{Schema} to add the locks table to.
+    @type inSchema: L{Schema}
+
+    @return: inSchema
+    """
+    LockTable = Table(inSchema, 'NAMED_LOCK')
+
+    LockTable.addColumn("LOCK_NAME", SQLType("varchar", 255))
+    LockTable.tableConstraint(Constraint.NOT_NULL, ["LOCK_NAME"])
+    LockTable.tableConstraint(Constraint.UNIQUE, ["LOCK_NAME"])
+    LockTable.primaryKey = [LockTable.columnNamed("LOCK_NAME")]
+
+    return inSchema
+
+LockSchema = SchemaSyntax(makeLockSchema(Schema(__file__)))
+
+
+
+
+class NamedLock(Record, fromTable(LockSchema.NAMED_LOCK)):
+    """
+    A named lock against a shared data store that the current process holds
+    via the referenced transaction.
+    """
+
+    @classmethod
+    def acquire(cls, txn, name):
+        """
+        Acquire a lock with the given name.
+
+        @param name: The name of the lock to acquire.  Against the same store,
+            no two locks with the same name may be held at once.
+        @type name: L{unicode}
+
+        @return: a L{Deferred} that fires with a L{NamedLock} when the lock
+            has been acquired, or fails when the lock cannot be acquired.
+        """
+        def autoRelease(self):
+            txn.preCommit(lambda: self.release(True))
+            return self
+        return cls.create(txn, lockName=name).addCallback(autoRelease)
+
+
+    def release(self, ignoreAlreadyUnlocked=False):
+        """
+        Release this lock.
+
+        @param ignoreAlreadyUnlocked: If you don't care about the current
+            status of this lock, and just want to release it if it is still
+            acquired, pass this parameter as L{True}.  Otherwise this method
+            will raise an exception if it is invoked when the lock has already
+            been released.
+
+        @raise: L{AlreadyUnlocked}
+
+        @return: A L{Deferred} that fires with L{None} when the lock has been
+            unlocked.
+        """
+        return self.delete()
+
+
+

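A short usage sketch: acquire() registers a preCommit hook (see ienterprise.py
above), so the lock is released automatically when the transaction commits;
the lock name here is arbitrary:

    from twisted.internet.defer import inlineCallbacks
    from twext.enterprise.locking import NamedLock

    @inlineCallbacks
    def exclusively(txn):
        # Only one transaction across the whole database may hold the
        # lock named u"coupon" at a time; a concurrent acquirer conflicts
        # on the unique LOCK_NAME row.
        yield NamedLock.acquire(txn, u"coupon")
        # ... mutually-excluded work in this transaction ...
        # (release() may also be called explicitly before commit.)
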
Modified: CalendarServer/trunk/twext/enterprise/queue.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/queue.py	2013-01-05 00:40:03 UTC (rev 10289)
+++ CalendarServer/trunk/twext/enterprise/queue.py	2013-01-05 00:51:01 UTC (rev 10290)
@@ -16,10 +16,11 @@
 ##
 
 """
-L{twext.enterprise.queue} is a task-queueing system for use by applications
-with multiple front-end servers talking to a single database instance, that
-want to defer and parallelize work that involves storing the results of
-computation.
+L{twext.enterprise.queue} is an U{eventually consistent
+<https://en.wikipedia.org/wiki/Eventual_consistency>} task-queueing system for
+use by applications with multiple front-end servers talking to a single
+database instance, that want to defer and parallelize work that involves
+storing the results of computation.
 
By enqueuing with L{twext.enterprise.queue}, you may guarantee that the work
 will I{eventually} be done, and reliably commit to doing it in the future, but
@@ -78,44 +79,71 @@
             queuer.enqueueWork(txn, CouponWork, customerID=customerID)
 """
 
-from socket import getfqdn
 from functools import wraps
-from os import getpid
 from datetime import datetime
 
 from zope.interface import implements
 
-from twisted.application.service import Service
+from twisted.application.service import MultiService
 from twisted.internet.protocol import Factory
 from twisted.internet.defer import (
-    inlineCallbacks, returnValue, Deferred, succeed
+    inlineCallbacks, returnValue, Deferred, passthru
 )
 from twisted.internet.endpoints import TCP4ClientEndpoint
 from twisted.protocols.amp import AMP, Command, Integer, Argument, String
 from twisted.python.reflect import qual
 from twisted.python import log
 
-from twext.enterprise.dal.syntax import TableSyntax, SchemaSyntax
+from twext.enterprise.dal.syntax import SchemaSyntax, Lock, NamedValue
+
 from twext.enterprise.dal.model import ProcedureCall
-from twext.enterprise.dal.syntax import NamedValue
 from twext.enterprise.dal.record import Record, fromTable
 from twisted.python.failure import Failure
-from twisted.internet.defer import passthru
+
 from twext.enterprise.dal.model import Table, Schema, SQLType, Constraint
 from twisted.internet.endpoints import TCP4ServerEndpoint
-from twext.enterprise.dal.syntax import Lock
 from twext.enterprise.ienterprise import IQueuer
+from zope.interface.interface import Interface
+from twext.enterprise.locking import NamedLock
 
+
+class _IWorkPerformer(Interface):
+    """
+    An object that can perform work.
+
+    Internal interface; implemented by several classes here since work has to
+    (in the worst case) pass from worker->controller->controller->worker.
+    """
+
+    def performWork(table, workID):
+        """
+        @param table: The table where work is waiting.
+        @type table: L{TableSyntax}
+
+        @param workID: The primary key identifier of the given work.
+        @type workID: L{int}
+
+        @return: a L{Deferred} firing with an empty dictionary when the work is
+            complete.
+        @rtype: L{Deferred} firing L{dict}
+        """
+
+
+
 def makeNodeSchema(inSchema):
     """
-    Create a self-contained schema for L{NodeInfo} to use.
+    Create a self-contained schema for L{NodeInfo} to use, in C{inSchema}.
 
+    @param inSchema: a L{Schema} to add the node-info table to.
+    @type inSchema: L{Schema}
+
     @return: a schema with just the one table.
     """
     # Initializing this duplicate schema avoids a circular dependency, but this
     # should really be accomplished with independent schema objects that the
     # transaction is made aware of somehow.
     NodeTable = Table(inSchema, 'NODE_INFO')
+
     NodeTable.addColumn("HOSTNAME", SQLType("varchar", 255))
     NodeTable.addColumn("PID", SQLType("integer", None))
     NodeTable.addColumn("PORT", SQLType("integer", None))
@@ -128,11 +156,13 @@
         NodeTable.tableConstraint(Constraint.NOT_NULL, [column.name])
     NodeTable.primaryKey = [NodeTable.columnNamed("HOSTNAME"),
                             NodeTable.columnNamed("PORT")]
+
     return inSchema
 
 NodeInfoSchema = SchemaSyntax(makeNodeSchema(Schema(__file__)))
 
 
+
 @inlineCallbacks
 def inTransaction(transactionCreator, operation):
     """
@@ -162,6 +192,14 @@
 
 
 
+def astimestamp(v):
+    """
+    Convert the given datetime to a POSIX timestamp.
+    """
+    return (v - datetime.utcfromtimestamp(0)).total_seconds()
+
+
+
 class TableSyntaxByName(Argument):
     """
     Serialize and deserialize L{TableSyntax} objects for an AMP protocol with
@@ -178,7 +216,7 @@
 
         @param proto: an L{SchemaAMP}
         """
-        return TableSyntax(proto.schema.tableNamed(inString.decode("UTF-8")))
+        return getattr(proto.schema, inString.decode("UTF-8"))
 
 
     def toString(self, inObject):
@@ -229,17 +267,98 @@
 
 class WorkItem(Record):
     """
-    An item of work.
+    A L{WorkItem} is an item of work which may be stored in a database, then
+    executed later.
 
-    @ivar workID: the unique identifier (primary key) for items of this type.
-        There must be a corresponding column in the database.
-    @type workID: L{int}
+    L{WorkItem} is an abstract class, since it is a L{Record} with no table
+    associated via L{fromTable}.  Concrete subclasses must associate a specific
+    table by inheriting like so::
 
-    @cvar created: the timestamp that a given item was created, or the column
-        describing its creation time, on the class.
-    @type created: L{datetime.datetime}
+        class MyWorkItem(WorkItem, fromTable(schema.MY_TABLE)):
+
+    Concrete L{WorkItem}s should generally not be created directly; they are
+    both created and thereby implicitly scheduled to be executed by calling
+    L{enqueueWork <twext.enterprise.ienterprise.IQueuer.enqueueWork>} with the
+    appropriate L{WorkItem} concrete subclass.  There are different queue
+    implementations (L{PeerConnectionPool} and L{LocalQueuer}, for example), so
+    the exact timing and location of the work execution may differ.
+
+    L{WorkItem}s may be constrained in the ordering and timing of their
+    execution, to control concurrency and for performance reasons respectively.
+
+    Although all the usual database mutual-exclusion rules apply to work
+    executed in L{WorkItem.doWork}, implicit database row locking is not
+    always the best way to manage concurrency; it has some problems,
+    including:
+
+        - implicit locks are easy to accidentally acquire out of order, which
+          can lead to deadlocks
+
+        - implicit locks are easy to forget to acquire correctly - for example,
+          any read operation which subsequently turns into a write operation
+          must have been acquired with C{Select(..., ForUpdate=True)}, but it
+          is difficult to consistently indicate that methods which abstract out
+          read operations must pass this flag in certain cases and not others.
+
+        - implicit locks are held until the transaction ends, which means that
+          if expensive (long-running) queue operations share the same lock with
+          cheap (short-running) queue operations or user interactions, the
+          cheap operations all have to wait for the expensive ones to complete,
+          but continue to consume whatever database resources they were using.
+
+    In order to ameliorate these problems with potentially concurrent work
+    that uses the same resources, L{WorkItem} provides a database-wide mutex
+    that is automatically acquired at the beginning of the transaction and
+    released at the end.  To use it, simply L{align
+    <twext.enterprise.dal.record.Record.namingConvention>} the C{group}
+    attribute on your L{WorkItem} subclass with a column holding a string
+    (varchar).  L{WorkItem} subclasses with the same value for C{group} will
+    not execute their C{doWork} methods concurrently.  Furthermore, if the lock
+    cannot be quickly acquired, database resources associated with the
+    transaction attempting it will be released, and the transaction rolled back
+    until a future transaction I{can} acquire it quickly.  If you do not
+    want any limits to concurrency, simply leave it set to C{None}.
+
+    In some applications it's possible to coalesce work together; to grab
+    multiple L{WorkItem}s in one C{doWork} transaction.  All you need to do is
+    to delete the rows which back other L{WorkItem}s from the database, and
+    they won't be processed.  Using the C{group} attribute, you can prevent
+    concurrency and thereby safely group these items together and
+    remove them as a set (otherwise, other workers might be attempting to
+    concurrently work on them and you'll get deletion errors).
+
+    However, if doing more work at once is less expensive, and you want to
+    avoid processing lots of individual rows in tiny transactions, you may also
+    delay the execution of a L{WorkItem} by setting its C{notBefore} attribute.
+    This must be backed by a database timestamp, so that processes which happen
+    to be restarting and examining the work to be done in the database don't
+    jump the gun and do it too early.
+
+    @cvar workID: the unique identifier (primary key) for items of this type.
+        On an instance of a concrete L{WorkItem} subclass, this attribute must
+        be an integer; on the concrete L{WorkItem} subclass itself, this
+        attribute must be a L{twext.enterprise.dal.syntax.ColumnSyntax}.  Note
+        that this is automatically taken care of if you simply have a
+        corresponding C{work_id} column in the associated L{fromTable} on your
+        L{WorkItem} subclass.  This column must be unique, and it must be an
+        integer.  In almost all cases, this column really ought to be filled
+        out by a database-defined sequence; if not, you need some other
+        mechanism for establishing a cluster-wide sequence.
+    @type workID: L{int} on instance,
+        L{twext.enterprise.dal.syntax.ColumnSyntax} on class.
+
+    @cvar notBefore: the timestamp before which this item should I{not} be
+        processed.  If unspecified, this should be the date and time of the
+        creation of the L{WorkItem}.
+    @type notBefore: L{datetime.datetime} on instance,
+        L{twext.enterprise.dal.syntax.ColumnSyntax} on class.
+
+    @ivar group: If not C{None}, a unique-to-the-database identifier for which
+        only one L{WorkItem} will execute at a time.
+    @type group: L{unicode} or L{NoneType}
     """
 
+    group = None
+
     @abstract
     def doWork(self):
         """
@@ -266,11 +385,13 @@
         @return: the relevant subclass
         @rtype: L{type}
         """
+        tableName = table.model.name
         for subcls in cls.__subclasses__():
-            if table == getattr(subcls, "table", None):
+            clstable = getattr(subcls, "table", None)
+            if table == clstable:
                 return subcls
         raise KeyError("No mapped {0} class for {1}.".format(
-            cls, table
+            cls, tableName
         ))
 
 
@@ -301,13 +422,16 @@
     response = []
 
 
+
 class IdentifyNode(Command):
     """
     Identify this node to its peer.  The connector knows which hostname it's
     looking for, and which hostname it considers itself to be, only the
     initiator (not the listener) issues this command.  This command is
-    necessary because if reverse DNS isn't set up perfectly, the listener may
-    not be able to identify its peer.
+    necessary because we don't want to rely on DNS; if reverse DNS weren't set
+    up perfectly, the listener would not be able to identify its peer, and it
+    is easier to modify local configuration so that L{socket.getfqdn} returns
+    the right value than to ensure that DNS does.
     """
 
     arguments = [
@@ -335,21 +459,37 @@
     """
     A connection to a peer node.  Symmetric; since the 'client' and the
     'server' both serve the same role, the logic is the same in every node.
+
+    @ivar localWorkerPool: the pool of local worker processes that can process
+        queue work.
+    @type localWorkerPool: L{WorkerConnectionPool}
+
+    @ivar _reportedLoad: The number of outstanding requests being processed by
+        the peer of this connection, from all requestors (both the host of this
+        connection and others), as last reported by the most recent
+        L{ReportLoad} message received from the peer.
+    @type _reportedLoad: L{int}
+
+    @ivar _bonusLoad: The number of additional outstanding requests being
+        processed by the peer of this connection; the number of requests made
+        by the host of this connection since the last L{ReportLoad} message.
+    @type _bonusLoad: L{int}
     """
+    implements(_IWorkPerformer)
 
     def __init__(self, peerPool, boxReceiver=None, locator=None):
         """
-        Initialize this L{ConnectionFromPeerNode} with a reference to a pool of
-        local workers.
+        Initialize this L{ConnectionFromPeerNode} with a reference to a
+        L{PeerConnectionPool}, as well as required initialization arguments for
+        L{AMP}.
 
-        @param localWorkerPool: the pool of local worker procesess that can
-            process queue work.
-        @type localWorkerPool: L{WorkerConnectionPool}
+        @param peerPool: the connection pool within which this
+            L{ConnectionFromPeerNode} is a participant.
+        @type peerPool: L{PeerConnectionPool}
 
         @see: L{AMP.__init__}
         """
         self.peerPool = peerPool
-        self.localWorkerPool = peerPool.workerPool
         self._bonusLoad = 0
         self._reportedLoad = 0
         super(ConnectionFromPeerNode, self).__init__(peerPool.schema,
@@ -360,12 +500,11 @@
         """
         Report the current load for the local worker pool to this peer.
         """
-        return self.callRemote(ReportLoad,
-                               load=self.localWorkerPool.totalLoad())
+        return self.callRemote(ReportLoad, load=self.totalLoad())
 
 
     @ReportLoad.responder
-    def repotedLoad(self, load):
+    def reportedLoad(self, load):
         """
         The peer reports its load.
         """
@@ -410,15 +549,7 @@
         specific peer node-controller process to perform some work, having
         already determined that it's appropriate.
 
-        @param table: The table where work is waiting.
-        @type table: L{TableSyntax}
-
-        @param workID: The primary key identifier of the given work.
-        @type workID: L{int}
-
-        @return: a L{Deferred} firing with an empty dictionary when the work is
-            complete.
-        @rtype: L{Deferred} firing L{dict}
+        @see: L{_IWorkPerformer.performWork}
         """
         d = self.callRemote(PerformWork, table=table, workID=workID)
         self._bonusLoad += 1
@@ -426,6 +557,9 @@
         def performed(result):
             self._bonusLoad -= 1
             return result
+        @d.addCallback
+        def success(result):
+            return None
         return d
 
 
@@ -443,12 +577,15 @@
 
         @return: a L{Deferred} that fires when the work has been completed.
         """
-        return self.localWorkerPool.performWork(table, workID)
+        return self.peerPool.performWorkForPeer(table, workID).addCallback(
+            lambda ignored: {}
+        )
 
 
     @IdentifyNode.responder
     def identifyPeer(self, host, port):
         self.peerPool.mapPeer(host, port, self)
+        return {}
 
 
 
@@ -460,8 +597,9 @@
     L{ConnectionFromPeerNode}, but one that dispenses work to the local worker
     processes rather than to a remote connection pool.
     """
+    implements(_IWorkPerformer)
 
-    def __init__(self, maximumLoadPerWorker=0):
+    def __init__(self, maximumLoadPerWorker=5):
         self.workers = []
         self.maximumLoadPerWorker = maximumLoadPerWorker
 
@@ -488,12 +626,12 @@
         hasAvailableCapacity to process another queue item?
         """
         for worker in self.workers:
-            if worker.currentLoad() < self.maximumLoadPerWorker:
+            if worker.currentLoad < self.maximumLoadPerWorker:
                 return True
         return False
 
 
-    def totalLoad(self):
+    def allWorkerLoad(self):
         """
         The total load of all currently connected workers.
         """
@@ -526,7 +664,9 @@
             complete.
         @rtype: L{Deferred} firing L{dict}
         """
-        return self._selectLowestLoadWorker().performWork(table, workID)
+        preferredWorker = self._selectLowestLoadWorker()
+        result = preferredWorker.performWork(table, workID)
+        return result
 
 
 
@@ -534,15 +674,12 @@
     """
    An individual connection from a worker, as seen from the master's
     perspective.  L{ConnectionFromWorker}s go into a L{WorkerConnectionPool}.
-
-    @ivar workerPool: The connection pool that this individual connection is
-        participating in.
-    @type workerPool: L{WorkerConnectionPool}
     """
 
-    def __init__(self, schema, workerPool, boxReceiver=None, locator=None):
-        self.workerPool = workerPool
-        super(ConnectionFromWorker, self).__init__(schema, boxReceiver, locator)
+    def __init__(self, peerPool, boxReceiver=None, locator=None):
+        super(ConnectionFromWorker, self).__init__(peerPool.schema, boxReceiver,
+                                                   locator)
+        self.peerPool = peerPool
         self._load = 0
 
 
@@ -560,7 +697,7 @@
         state.
         """
         result = super(ConnectionFromWorker, self).startReceivingBoxes(sender)
-        self.workerPool.addWorker(self)
+        self.peerPool.workerPool.addWorker(self)
         return result
 
 
@@ -569,7 +706,7 @@
         AMP boxes will no longer be received.
         """
         result = super(ConnectionFromWorker, self).stopReceivingBoxes(reason)
-        self.workerPool.removeWorker(self)
+        self.peerPool.workerPool.removeWorker(self)
         return result
 
 
@@ -620,7 +757,7 @@
         C{self}, since C{self} is also an object that has a C{performWork}
         method.
         """
-        return succeed(self)
+        return self
 
 
     def performWork(self, table, workID):
@@ -632,9 +769,9 @@
 
     def enqueueWork(self, txn, workItemType, **kw):
         """
-        There is some work to do.  Do it, someplace else, ideally in parallel.
-        Later, let the caller know that the work has been completed by firing a
-        L{Deferred}.
+        There is some work to do.  Do it, ideally someplace else, ideally in
+        parallel.  Later, let the caller know that the work has been completed
+        by firing a L{Deferred}.
 
         @param workItemType: The type of work item to be enqueued.
         @type workItemType: A subtype of L{WorkItem}
@@ -659,21 +796,67 @@
         process has instructed this worker to do it; so, look up the data in
         the row, and do it.
         """
-        @inlineCallbacks
-        def work(txn):
-            workItemClass = WorkItem.forTable(table)
-            workItem = yield workItemClass.load(txn, workID)
-            # TODO: what if we fail?  error-handling should be recorded
-            # someplace, the row should probably be marked, re-tries should be
-            # triggerable administratively.
-            yield workItem.delete()
-            # TODO: verify that workID is the primary key someplace.
-            yield workItem.doWork()
-            returnValue({})
-        return inTransaction(self.transactionFactory, work)
+        return (ultimatelyPerform(self.transactionFactory, table, workID)
+                .addCallback(lambda ignored: {}))
 
 
 
+def ultimatelyPerform(txnFactory, table, workID):
+    """
+    Eventually, after routing the work to the appropriate place, somebody
+    actually has to I{do} it.
+
+    @param txnFactory: a 0- or 1-argument callable that creates an
+        L{IAsyncTransaction}
+    @type txnFactory: L{callable}
+
+    @param table: the table object that corresponds to the necessary work item
+    @type table: L{twext.enterprise.dal.syntax.TableSyntax}
+
+    @param workID: the ID of the work to be performed
+    @type workID: L{int}
+
+    @return: a L{Deferred} which fires with C{None} when the work has been
+        performed, or fails if the work can't be performed.
+    """
+    @inlineCallbacks
+    def work(txn):
+        workItemClass = WorkItem.forTable(table)
+        workItem = yield workItemClass.load(txn, workID)
+        if workItem.group is not None:
+            yield NamedLock.acquire(txn, workItem.group)
+        # TODO: what if we fail?  error-handling should be recorded someplace,
+        # the row should probably be marked, re-tries should be triggerable
+        # administratively.
+        yield workItem.delete()
+        # TODO: verify that workID is the primary key someplace.
+        yield workItem.doWork()
+    return inTransaction(txnFactory, work)
+
+
+
+class LocalPerformer(object):
+    """
+    Implementor of C{performWork} that does its work in the local process,
+    regardless of other conditions.
+    """
+    implements(_IWorkPerformer)
+
+    def __init__(self, txnFactory):
+        """
+        Create this L{LocalPerformer} with a transaction factory.
+        """
+        self.txnFactory = txnFactory
+
+
+    def performWork(self, table, workID):
+        """
+        Perform the given work right now.
+        """
+        return ultimatelyPerform(self.txnFactory, table, workID)
+
+
+
 class WorkerFactory(Factory, object):
     """
     Factory, to be used as the client to connect from the worker to the
@@ -724,9 +907,10 @@
     A L{WorkProposal} is a proposal for work that will be executed, perhaps on
     another node, perhaps in the future.
 
-    @ivar pool: the connection pool which this L{WorkProposal} will use to
-        submit its work.
-    @type pool: L{PeerConnectionPool}
+    @ivar _chooser: The object which will choose where the work in this
+        proposal gets performed.  This must have both a C{choosePerformer}
+        method and a C{reactor} attribute, providing an L{IReactorTime}.
+    @type _chooser: L{PeerConnectionPool} or L{LocalQueuer}
 
     @ivar txn: The transaction where the work will be enqueued.
     @type txn: L{IAsyncTransaction}
@@ -739,8 +923,8 @@
     @type kw: L{dict}
     """
 
-    def __init__(self, pool, txn, workItemType, kw):
-        self.pool = pool
+    def __init__(self, chooser, txn, workItemType, kw):
+        self._chooser = chooser
         self.txn = txn
         self.workItemType = workItemType
         self.kw = kw
@@ -756,22 +940,25 @@
         commit, and asking the local node controller process to do the work.
         """
         @passthru(self.workItemType.create(self.txn, **self.kw).addCallback)
-        def created(item):
-            self._whenProposed.callback(None)
+        def whenCreated(item):
+            self._whenProposed.callback(self)
             @self.txn.postCommit
             def whenDone():
-                self._whenCommitted.callback(None)
-                @passthru(self.pool.choosePerformer().addCallback)
-                def performerChosen(performer):
-                    @passthru(performer.performWork(item.table, item.workID))
+                self._whenCommitted.callback(self)
+                def maybeLater():
+                    performer = self._chooser.choosePerformer()
+                    @passthru(performer.performWork(item.table, item.workID)
+                              .addCallback)
                     def performed(result):
-                        self._whenExecuted.callback(None)
+                        self._whenExecuted.callback(self)
                     @performed.addErrback
                     def notPerformed(why):
                         self._whenExecuted.errback(why)
-                @performerChosen.addErrback
-                def notChosen(whyNot):
-                    self._whenExecuted.errback(whyNot)
+                reactor = self._chooser.reactor
+                when = max(0, astimestamp(item.notBefore) - reactor.seconds())
+                # TODO: Track the returned DelayedCall so it can be stopped when
+                # the service stops.
+                self._chooser.reactor.callLater(when, maybeLater)
             @self.txn.postAbort
             def whenFailed():
                 self._whenCommitted.errback(TransactionFailed)
@@ -794,8 +981,8 @@
             completed within the transaction of the L{WorkItem.doWork} that
             gets executed.
 
-        @return: a L{Deferred} that fires with C{None} when the work has been
-            completed remotely.
+        @return: a L{Deferred} that fires with this L{WorkProposal} when the
+            work has been completed remotely.
         """
         return _cloneDeferred(self._whenExecuted)
 
@@ -805,9 +992,10 @@
         Let the caller know when the work has been proposed; i.e. when the work
         is first transmitted to the database.
 
-        @return: a L{Deferred} that fires with C{None} when the relevant
-            commands have been sent to the database to create the L{WorkItem},
-            and fails if those commands do not succeed for some reason.
+        @return: a L{Deferred} that fires with this L{WorkProposal} when the
+            relevant commands have been sent to the database to create the
+            L{WorkItem}, and fails if those commands do not succeed for some
+            reason.
         """
         return _cloneDeferred(self._whenProposed)
 
@@ -818,15 +1006,15 @@
         transaction where the work was proposed has been committed to the
         database.
 
-        @return: a L{Deferred} that fires with C{None} when the relevant
-            transaction has been committed, or fails if the transaction is not
-            committed for any reason.
+        @return: a L{Deferred} that fires with this L{WorkProposal} when the
+            relevant transaction has been committed, or fails if the
+            transaction is not committed for any reason.
         """
         return _cloneDeferred(self._whenCommitted)
 
 
 
-class PeerConnectionPool(Service, object):
+class PeerConnectionPool(MultiService, object):
     """
     Each node has a L{PeerConnectionPool} connecting it to all the other nodes
     currently active on the same database.
@@ -844,8 +1032,10 @@
         up or if it is shutting down.
     @type thisProcess: L{NodeInfo}
 
-    @ivar queueProcessTimeout: The maximum amount of time allowed for a queue
-        item to be processed.  By default, 10 minutes.
+    @ivar queueProcessTimeout: The amount of time after a L{WorkItem} is
+        scheduled to be processed (its C{notBefore} attribute) that it is
+        considered to be "orphaned" and will be run by a lost-work check rather
+        than waiting for it to be requested.  By default, 10 minutes.
     @type queueProcessTimeout: L{float} (in seconds)
 
     @ivar queueDelayedProcessInterval: The amount of time between database
@@ -865,6 +1055,8 @@
     """
     implements(IQueuer)
 
+    from socket import getfqdn
+    from os import getpid
     getfqdn = staticmethod(getfqdn)
     getpid = staticmethod(getpid)
 
@@ -889,6 +1081,7 @@
             the L{WorkItem}s that this L{PeerConnectionPool} will process.
         @type schema: L{Schema}
         """
+        super(PeerConnectionPool, self).__init__()
         self.reactor = reactor
         self.transactionFactory = transactionFactory
         self.hostname = self.getfqdn()
@@ -912,13 +1105,16 @@
         self.peers.append(peer)
 
 
+    def totalLoad(self):
+        return self.workerPool.allWorkerLoad()
+
+
     def workerListenerFactory(self):
         """
         Factory that listens for connections from workers.
         """
         f = Factory()
-        f.buildProtocol = lambda addr: ConnectionFromWorker(self.schema,
-                                                            self.workerPool)
+        f.buildProtocol = lambda addr: ConnectionFromWorker(self)
         return f
 
 
@@ -929,7 +1125,7 @@
         self.peers.remove(peer)
 
 
-    def choosePerformer(self):
+    def choosePerformer(self, onlyLocally=False):
         """
         Choose a peer to distribute work to based on the current known slot
         occupancy of the other nodes.  Note that this will prefer distributing
@@ -937,20 +1133,26 @@
         should be lower-latency.  Also, if no peers are available, work will be
         submitted locally even if the worker pool is already over-subscribed.
 
-        @return: a L{Deferred <twisted.internet.defer.Deferred>} which fires
-            with the chosen 'peer', i.e. object with a C{performWork} method,
-            as soon as one is available.  Normally this will be synchronous,
-            but we need to account for the possibility that we may need to
-            connect to other hosts.
-        @rtype: L{Deferred <twisted.internet.defer.Deferred>} firing
-            L{ConnectionFromPeerNode} or L{WorkerConnectionPool}
+        @return: the chosen performer.
+        @rtype: an L{_IWorkPerformer} provider: L{ConnectionFromPeerNode},
+            L{WorkerConnectionPool}, or L{LocalPerformer}
         """
-        if not self.workerPool.hasAvailableCapacity() and self.peers:
+        if self.workerPool.hasAvailableCapacity():
+            return self.workerPool
+        if self.peers and not onlyLocally:
             return sorted(self.peers, key=lambda p: p.currentLoadEstimate())[0]
         else:
-            return succeed(self.workerPool)
+            return LocalPerformer(self.transactionFactory)
 
 
+    def performWorkForPeer(self, table, workID):
+        """
+        A peer has requested us to perform some work; choose a work performer
+        local to this node, and then execute it.
+        """
+        return self.choosePerformer(onlyLocally=True).performWork(table, workID)
+
+
     def enqueueWork(self, txn, workItemType, **kw):
         """
         There is some work to do.  Do it, someplace else, ideally in parallel.
@@ -1022,19 +1224,23 @@
         """
         @inlineCallbacks
         def workCheck(txn):
-
-            nodes = [(node.hostname, node.port) for node in
-                     (yield self.activeNodes(txn))]
-            nodes.sort()
-            self._lastSeenTotalNodes = len(nodes)
-            self._lastSeenNodeIndex = nodes.index((self.thisProcess.hostname,
-                                                   self.thisProcess.port))
+            if self.thisProcess:
+                nodes = [(node.hostname, node.port) for node in
+                         (yield self.activeNodes(txn))]
+                nodes.sort()
+                self._lastSeenTotalNodes = len(nodes)
+                self._lastSeenNodeIndex = nodes.index(
+                    (self.thisProcess.hostname, self.thisProcess.port)
+                )
             for itemType in self.allWorkItemTypes():
-                for overdueItem in (
-                        yield itemType.query(
-                            txn, itemType.created > self.queueProcessTimeout
-                    )):
-                    peer = yield self.choosePerformer()
+                tooLate = datetime.utcfromtimestamp(
+                    self.reactor.seconds() - self.queueProcessTimeout
+                )
+                overdueItems = (yield itemType.query(
+                    txn, (itemType.notBefore < tooLate))
+                )
+                for overdueItem in overdueItems:
+                    peer = self.choosePerformer()
                     yield peer.performWork(overdueItem.table,
                                            overdueItem.workID)
         return inTransaction(self.transactionFactory, workCheck)
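
The cutoff above maps the reactor's POSIX clock into the same naive-UTC
domain as the notBefore column: any item scheduled more than
queueProcessTimeout seconds ago counts as lost.  The same arithmetic as a
standalone sketch, assuming only the names shown in the hunk:

    from datetime import datetime

    def overdueCutoff(reactor, queueProcessTimeout):
        # Items whose notBefore is earlier than this naive UTC datetime
        # are considered orphaned and eligible for the lost-work check.
        return datetime.utcfromtimestamp(
            reactor.seconds() - queueProcessTimeout)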
@@ -1077,11 +1283,10 @@
         @inlineCallbacks
         def startup(txn):
             endpoint = TCP4ServerEndpoint(self.reactor, self.ampPort)
-            f = Factory()
-            f.buildProtocol = self.createPeerConnection
             # If this fails, the failure mode is going to be ugly, just like all
             # conflicted-port failures.  But, at least it won't proceed.
-            yield endpoint.listen(f)
+            self._listeningPortObject = yield endpoint.listen(self.peerFactory())
+            self.ampPort = self._listeningPortObject.getHost().port
             yield Lock.exclusive(NodeInfo.table).on(txn)
             nodes = yield self.activeNodes(txn)
             selves = [node for node in nodes
@@ -1104,6 +1309,7 @@
         @self._startingUp.addBoth
         def done(result):
             self._startingUp = None
+            super(PeerConnectionPool, self).startService()
             return result
 
 
@@ -1122,7 +1328,7 @@
         if self._currentWorkDeferred is not None:
             yield self._currentWorkDeferred
         for peer in self.peers:
-            peer.transport.loseConnection()
+            peer.transport.abortConnection()
 
 
     def activeNodes(self, txn):
@@ -1151,80 +1357,80 @@
         @param node: a description of the master to connect to.
         @type node: L{NodeInfo}
         """
-        f = Factory()
-        f.buildProtocol = self.createPeerConnection
-        @passthru(node.endpoint(self.reactor).connect(f).addCallback)
-        def connected(proto):
-            self.mapPeer(node, proto)
-            proto.callRemote(IdentifyNode, self.thisProcess)
+        connected = node.endpoint(self.reactor).connect(self.peerFactory())
+        def whenConnected(proto):
+            self.mapPeer(node.hostname, node.port, proto)
+            proto.callRemote(IdentifyNode,
+                             host=self.thisProcess.hostname,
+                             port=self.thisProcess.port).addErrback(
+                                 noted, "identify"
+                             )
+        def noted(err, x="connect"):
+            log.msg("Could not {0} to cluster peer {1} because {2}"
+                    .format(x, node, str(err.value)))
+        connected.addCallbacks(whenConnected, noted)
 
 
-    def createPeerConnection(self, addr):
-        return ConnectionFromPeerNode(self)
+    def peerFactory(self):
+        """
+        Factory for peer connections.
 
+        @return: a L{Factory} that will produce L{ConnectionFromPeerNode}
+            protocols attached to this L{PeerConnectionPool}.
+        """
+        return _PeerPoolFactory(self)
 
 
-class ImmediateWorkProposal(object):
-    """
-    Like L{WorkProposal}, but for items that must be executed immediately
-    because no real queue is set up yet.
 
-    @see: L{WorkProposal}, L{NullQueuer.enqueueWork}
+class _PeerPoolFactory(Factory, object):
     """
-    def __init__(self, proposed, done):
-        self.proposed = proposed
-        self.done = done
+    Protocol factory responsible for creating L{ConnectionFromPeerNode}
+    connections, both client and server.
+    """
 
+    def __init__(self, peerConnectionPool):
+        self.peerConnectionPool = peerConnectionPool
 
-    def whenExecuted(self):
-        return _cloneDeferred(self.done)
 
+    def buildProtocol(self, addr):
+        return ConnectionFromPeerNode(self.peerConnectionPool)
 
-    def whenProposed(self):
-        return _cloneDeferred(self.proposed)
 
 
-    def whenCommitted(self):
-        return _cloneDeferred(self.done)
-
-
-
-class NullQueuer(object):
+class LocalQueuer(object):
     """
-    When work is enqueued with this queuer, it is just executed immediately,
-    within the same transaction.  While this is technically correct, it is not
-    very efficient.
+    When work is enqueued with this queuer, it is just executed locally.
     """
     implements(IQueuer)
 
+    def __init__(self, txnFactory, reactor=None):
+        self.txnFactory = txnFactory
+        if reactor is None:
+            from twisted.internet import reactor
+        self.reactor = reactor
+
+
+    def choosePerformer(self):
+        """
+        Choose to perform the work locally.
+        """
+        return LocalPerformer(self.txnFactory)
+
+
     def enqueueWork(self, txn, workItemType, **kw):
         """
-        Do this work immediately.
+        Do this work in the local process.
 
         @see: L{PeerConnectionPool.enqueueWork}
 
-        @return: a pseudo work proposal, since everything completes at the same
-            time.
-        @rtype: L{ImmediateWorkProposal}
+        @return: a work proposal for work that will be performed in the local
+            process.
+        @rtype: L{WorkProposal}
         """
-        proposed = Deferred()
-        done = Deferred()
-        @inlineCallbacks
-        def doit():
-            item = yield self.workItemType.create(self.txn, **self.kw)
-            proposed.callback(True)
-            yield item.delete()
-            yield item.doWork()
-        @txn.postCommit
-        def committed():
-            done.callback(True)
-        @txn.postAbort
-        def aborted():
-            tf = TransactionFailed()
-            done.errback(tf)
-            if not proposed.called:
-                proposed.errback(tf)
-        return ImmediateWorkProposal(proposed, done)
+        wp = WorkProposal(self, txn, workItemType, kw)
+        wp._start()
+        return wp
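
A minimal sketch of L{LocalQueuer} in use, assuming a store-style
transaction factory and any WorkItem subclass (here the DummyWorkItem from
the tests below):

    from twext.enterprise.queue import LocalQueuer, inTransaction

    def enqueueLocally(txnFactory):
        queuer = LocalQueuer(txnFactory)
        def propose(txn):
            return queuer.enqueueWork(
                txn, DummyWorkItem, a=3, b=4).whenProposed()
        return inTransaction(txnFactory, propose)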
 
 
 
+

Modified: CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py	2013-01-05 00:40:03 UTC (rev 10289)
+++ CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py	2013-01-05 00:51:01 UTC (rev 10290)
@@ -30,6 +30,7 @@
 from twisted.trial.unittest import TestCase
 
 from twisted.internet.task import Clock
+from twisted.internet.defer import Deferred, fail
 
 from twisted.internet.interfaces import IReactorThreads
 
@@ -72,11 +73,24 @@
     """
 
     def assertResultList(self, resultList, expected):
+        """
+        Assert that a list created with L{resultOf} contains the expected
+        result.
+
+        @param resultList: The return value of L{resultOf}.
+        @type resultList: L{list}
+
+        @param expected: The expected value that should be present in the list;
+            a L{Failure} if an exception is expected to be raised.
+        """
         if not resultList:
             self.fail("No result; Deferred didn't fire yet.")
         else:
             if isinstance(resultList[0], Failure):
-                resultList[0].raiseException()
+                if isinstance(expected, Failure):
+                    resultList[0].trap(expected.type)
+                else:
+                    resultList[0].raiseException()
             else:
                 self.assertEqual(resultList, [expected])
 
@@ -369,11 +383,11 @@
 
 
     @property
-    def _q(self):
+    def _get_q(self):
         return self._q_
 
 
-    @_q.setter
+    @_get_q.setter
     def _q(self, newq):
         if newq is not None:
             oget = newq.get
@@ -1080,6 +1094,87 @@
         return t
 
 
+    def test_preCommitSuccess(self):
+        """
+        Callables passed to L{IAsyncTransaction.preCommit} will be invoked upon
+        commit.
+        """
+        txn = self.createTransaction()
+        def simple():
+            simple.done = True
+        simple.done = False
+        txn.preCommit(simple)
+        self.assertEquals(simple.done, False)
+        result = self.resultOf(txn.commit())
+        self.assertEquals(len(result), 1)
+        self.assertEquals(simple.done, True)
+
+
+    def test_deferPreCommit(self):
+        """
+        If callables passed to L{IAsyncTransaction.preCommit} return
+        L{Deferred}s, they will defer the actual commit operation until it has
+        fired.
+        """
+        txn = self.createTransaction()
+        d = Deferred()
+        def wait():
+            wait.started = True
+            def executed(it):
+                wait.sqlResult = it
+            # To make sure the _underlying_ commit operation was Deferred, we
+            # have to execute some SQL to make sure it happens.
+            return (d.addCallback(lambda ignored: txn.execSQL("some test sql"))
+                     .addCallback(executed))
+        wait.started = False
+        wait.sqlResult = None
+        txn.preCommit(wait)
+        result = self.resultOf(txn.commit())
+        self.flushHolders()
+        self.assertEquals(wait.started, True)
+        self.assertEquals(wait.sqlResult, None)
+        self.assertEquals(result, [])
+        d.callback(None)
+        # allow network I/O for pooled / networked implementation; there should
+        # be the commit message now.
+        self.flushHolders()
+        self.assertEquals(len(result), 1)
+        self.assertEquals(wait.sqlResult, [[1, "some test sql"]])
+
+
+    def test_failPreCommit(self):
+        """
+        If callables passed to L{IAsyncTransaction.preCommit} raise an
+        exception or return a Failure, subsequent callables will not be run,
+        and the transaction will be aborted.
+        """
+        def test(flawedCallable, exc):
+            # Set up.
+            test.committed = False
+            test.aborted = False
+            # Create transaction and add monitoring hooks.
+            txn = self.createTransaction()
+            def didCommit():
+                test.committed = True
+            def didAbort():
+                test.aborted = True
+            txn.postCommit(didCommit)
+            txn.postAbort(didAbort)
+            txn.preCommit(flawedCallable)
+            result = self.resultOf(txn.commit())
+            self.flushHolders()
+            self.assertResultList(result, Failure(exc()))
+            self.assertEquals(test.committed, False)
+            self.assertEquals(test.aborted, True)
+
+        def failer():
+            return fail(ZeroDivisionError())
+        def raiser():
+            raise EOFError()
+        test(failer, ZeroDivisionError)
+        test(raiser, EOFError)
+
+
     def test_noOpCommitDoesntHinderReconnection(self):
         """
         Until you've executed a query or performed a statement on an ADBAPI

Copied: CalendarServer/trunk/twext/enterprise/test/test_locking.py (from rev 10289, CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_locking.py)
===================================================================
--- CalendarServer/trunk/twext/enterprise/test/test_locking.py	                        (rev 0)
+++ CalendarServer/trunk/twext/enterprise/test/test_locking.py	2013-01-05 00:51:01 UTC (rev 10290)
@@ -0,0 +1,78 @@
+##
+# Copyright (c) 2012 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Tests for mutual exclusion locks.
+"""
+
+from twisted.internet.defer import inlineCallbacks
+from twisted.trial.unittest import TestCase
+
+from twext.enterprise.fixtures import buildConnectionPool
+from twext.enterprise.locking import NamedLock, LockSchema
+from twext.enterprise.dal.syntax import Select
+
+schemaText = """
+create table NAMED_LOCK (LOCK_NAME varchar(255) unique primary key);
+"""
+
+class TestLocking(TestCase):
+    """
+    Test locking and unlocking a database row.
+    """
+
+    def setUp(self):
+        """
+        Build a connection pool for the tests to use.
+        """
+        self.pool = buildConnectionPool(self, schemaText)
+
+
+    @inlineCallbacks
+    def test_acquire(self):
+        """
+        Acquiring a lock adds a row in that transaction.
+        """
+        txn = self.pool.connection()
+        yield NamedLock.acquire(txn, u"a test lock")
+        rows = yield Select(From=LockSchema.NAMED_LOCK).on(txn)
+        self.assertEquals(rows, [tuple([u"a test lock"])])
+
+
+    @inlineCallbacks
+    def test_release(self):
+        """
+        Releasing an acquired lock removes the row.
+        """
+        txn = self.pool.connection()
+        lck = yield NamedLock.acquire(txn, u"a test lock")
+        yield lck.release()
+        rows = yield Select(From=LockSchema.NAMED_LOCK).on(txn)
+        self.assertEquals(rows, [])
+
+
+    @inlineCallbacks
+    def test_autoRelease(self):
+        """
+        Committing a transaction automatically releases all of its locks.
+        """
+        txn = self.pool.connection()
+        yield NamedLock.acquire(txn, u"something")
+        yield txn.commit()
+        txn2 = self.pool.connection()
+        rows = yield Select(From=LockSchema.NAMED_LOCK).on(txn2)
+        self.assertEquals(rows, [])
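
Taken together, these tests imply the intended pattern: acquire the named
lock inside the transaction that performs the conflicting operation, and let
commit or abort release it implicitly.  A sketch under that assumption:

    from twisted.internet.defer import inlineCallbacks
    from twext.enterprise.locking import NamedLock

    @inlineCallbacks
    def guarded(txn):
        # Concurrent transactions contending for the same name serialize
        # here; the lock row vanishes when this transaction completes.
        yield NamedLock.acquire(txn, u"uid:some-resource")
        # ... perform the operation that must not run concurrently ...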

Copied: CalendarServer/trunk/twext/enterprise/test/test_queue.py (from rev 10289, CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py)
===================================================================
--- CalendarServer/trunk/twext/enterprise/test/test_queue.py	                        (rev 0)
+++ CalendarServer/trunk/twext/enterprise/test/test_queue.py	2013-01-05 00:51:01 UTC (rev 10290)
@@ -0,0 +1,629 @@
+##
+# Copyright (c) 2012 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Tests for L{twext.enterprise.queue}.
+"""
+
+import datetime
+
+# TODO: There should be a store-building utility within twext.enterprise.
+
+from twisted.protocols.amp import Command
+from twisted.internet.task import Clock as _Clock
+
+from txdav.common.datastore.test.util import buildStore
+
+from twext.enterprise.dal.syntax import SchemaSyntax, Select
+from twext.enterprise.dal.record import fromTable
+from twext.enterprise.dal.test.test_parseschema import SchemaTestHelper
+
+from twext.enterprise.queue import (
+    inTransaction, PeerConnectionPool, WorkItem, astimestamp
+)
+
+from twisted.trial.unittest import TestCase
+from twisted.internet.defer import (
+    Deferred, inlineCallbacks, gatherResults, passthru  # , returnValue
+)
+
+from twisted.application.service import Service, MultiService
+
+from twext.enterprise.queue import (
+    LocalPerformer, _IWorkPerformer, WorkerConnectionPool, SchemaAMP,
+    TableSyntaxByName
+)
+
+from twext.enterprise.dal.record import Record
+
+from twext.enterprise.queue import ConnectionFromPeerNode
+from twext.enterprise.fixtures import buildConnectionPool
+from zope.interface.verify import verifyObject
+from twisted.test.proto_helpers import StringTransport
+
+class Clock(_Clock):
+    """
+    More careful L{IReactorTime} fake which mimics the exception behavior of
+    the real reactor.
+    """
+
+    def callLater(self, _seconds, _f, *args, **kw):
+        if _seconds < 0:
+            raise ValueError("%s < 0" % (_seconds,))
+        return super(Clock, self).callLater(_seconds, _f, *args, **kw)
+
+
+
+def transactionally(transactionCreator):
+    """
+    Perform the decorated function immediately in a transaction, replacing its
+    name with a L{Deferred}.
+
+    Use like so::
+
+        @transactionally(connectionPool.connection)
+        @inlineCallbacks
+        def it(txn):
+            yield txn.doSomething()
+        it.addCallback(firedWhenDone)
+
+    @param transactionCreator: A 0-arg callable that returns an
+        L{IAsyncTransaction}.
+    """
+    def thunk(operation):
+        return inTransaction(transactionCreator, operation)
+    return thunk
+
+
+
+class UtilityTests(TestCase):
+    """
+    Tests for supporting utilities.
+    """
+
+    def test_inTransactionSuccess(self):
+        """
+        L{inTransaction} invokes its C{transactionCreator} argument, and then
+        returns a L{Deferred} which fires with the result of its C{operation}
+        argument when it succeeds.
+        """
+        class faketxn(object):
+            def __init__(self):
+                self.commits = []
+                self.aborts = []
+            def commit(self):
+                self.commits.append(Deferred())
+                return self.commits[-1]
+            def abort(self):
+                self.aborts.append(Deferred())
+                return self.aborts[-1]
+
+        createdTxns = []
+        def createTxn():
+            createdTxns.append(faketxn())
+            return createdTxns[-1]
+        dfrs = []
+        def operation(t):
+            self.assertIdentical(t, createdTxns[-1])
+            dfrs.append(Deferred())
+            return dfrs[-1]
+        d = inTransaction(createTxn, operation)
+        x = []
+        d.addCallback(x.append)
+        self.assertEquals(x, [])
+        self.assertEquals(len(dfrs), 1)
+        dfrs[0].callback(35)
+        # Commit in progress, so still no result...
+        self.assertEquals(x, [])
+        createdTxns[0].commits[0].callback(42)
+        # Committed, everything's done.
+        self.assertEquals(x, [35])
+
+
+
+class SimpleSchemaHelper(SchemaTestHelper):
+    def id(self):
+        return 'worker'
+
+SQL = passthru
+
+schemaText = SQL("""
+    create table DUMMY_WORK_ITEM (WORK_ID integer primary key,
+                                  NOT_BEFORE timestamp,
+                                  A integer, B integer);
+    create table DUMMY_WORK_DONE (WORK_ID integer primary key,
+                                  A_PLUS_B integer);
+""")
+
+nodeSchema = SQL("""
+    create table NODE_INFO (HOSTNAME varchar(255) not null,
+                            PID integer not null,
+                            PORT integer not null,
+                            TIME timestamp default current_timestamp not null,
+                            primary key (HOSTNAME, PORT));
+""")
+
+schema = SchemaSyntax(SimpleSchemaHelper().schemaFromString(schemaText))
+
+dropSQL = ["drop table {name}".format(name=table.model.name)
+           for table in schema]
+
+
+class DummyWorkDone(Record, fromTable(schema.DUMMY_WORK_DONE)):
+    """
+    Work result.
+    """
+
+
+
+class DummyWorkItem(WorkItem, fromTable(schema.DUMMY_WORK_ITEM)):
+    """
+    Sample L{WorkItem} subclass that adds two integers together and stores them
+    in another table.
+    """
+
+    def doWork(self):
+        return DummyWorkDone.create(self.transaction, workID=self.workID,
+                                    aPlusB=self.a + self.b)
+
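+
Since notBefore is just a column on the work table, scheduling is expressed
at creation time; the 'group' attribute mentioned in the log message is not
exercised by this module.  A sketch of deferring a DummyWorkItem, mirroring
the tests that follow:

    from datetime import datetime, timedelta

    def enqueueLater(txn, queuer):
        # No performer will be chosen for this item until the notBefore
        # time has passed (see test_notBeforeWhenEnqueueing below).
        return queuer.enqueueWork(
            txn, DummyWorkItem, a=3, b=4,
            notBefore=datetime.utcnow() + timedelta(seconds=30))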
+
+
+class SchemaAMPTests(TestCase):
+    """
+    Tests for L{SchemaAMP} faithfully relaying tables across the wire.
+    """
+
+    def test_sendTableWithName(self):
+        """
+        You can send a reference to a table through a L{SchemaAMP} via
+        L{TableSyntaxByName}.
+        """
+        client = SchemaAMP(schema)
+        class SampleCommand(Command):
+            arguments = [('table', TableSyntaxByName())]
+        class Receiver(SchemaAMP):
+            @SampleCommand.responder
+            def gotIt(self, table):
+                self.it = table
+                return {}
+        server = Receiver(schema)
+        clientT = StringTransport()
+        serverT = StringTransport()
+        client.makeConnection(clientT)
+        server.makeConnection(serverT)
+        client.callRemote(SampleCommand, table=schema.DUMMY_WORK_ITEM)
+        server.dataReceived(clientT.io.getvalue())
+        self.assertEqual(server.it, schema.DUMMY_WORK_ITEM)
+
+
+
+
+class WorkItemTests(TestCase):
+    """
+    A L{WorkItem} is an item of work that can be executed.
+    """
+
+    def test_forTable(self):
+        """
+        L{WorkItem.forTable} returns L{WorkItem} subclasses mapped to the given
+        table.
+        """
+        self.assertIdentical(WorkItem.forTable(schema.DUMMY_WORK_ITEM),
+                             DummyWorkItem)
+
+
+
+class WorkerConnectionPoolTests(TestCase):
+    """
+    A L{WorkerConnectionPool} is responsible for managing, in a node's
+    controller (master) process, the collection of worker (slave) processes
+    that are capable of executing queue work.
+    """
+
+
+
+class PeerConnectionPoolUnitTests(TestCase):
+    """
+    L{PeerConnectionPool} has many internal components.
+    """
+    def setUp(self):
+        """
+        Create a L{PeerConnectionPool} that is just initialized enough.
+        """
+        self.pcp = PeerConnectionPool(None, None, 4321, schema)
+
+
+    def checkPerformer(self, cls):
+        """
+        Verify that the performer returned by
+        L{PeerConnectionPool.choosePerformer} is an instance of C{cls} and
+        provides L{_IWorkPerformer}.
+        """
+        performer = self.pcp.choosePerformer()
+        self.failUnlessIsInstance(performer, cls)
+        verifyObject(_IWorkPerformer, performer)
+
+
+    def test_choosingPerformerWhenNoPeersAndNoWorkers(self):
+        """
+        If L{PeerConnectionPool.choosePerformer} is invoked when no workers
+        have spawned and no peers have established connections (either incoming
+        or outgoing), then it chooses an implementation of C{performWork} that
+        simply executes the work locally.
+        """
+        self.checkPerformer(LocalPerformer)
+
+
+    def test_choosingPerformerWithLocalCapacity(self):
+        """
+        If L{PeerConnectionPool.choosePerformer} is invoked when some workers
+        have spawned, then it should choose the worker pool as the local
+        performer.
+        """
+        # Give it some local capacity.
+        wlf = self.pcp.workerListenerFactory()
+        proto = wlf.buildProtocol(None)
+        proto.makeConnection(StringTransport())
+        # Sanity check.
+        self.assertEqual(len(self.pcp.workerPool.workers), 1)
+        self.assertEqual(self.pcp.workerPool.hasAvailableCapacity(), True)
+        # Now it has some capacity.
+        self.checkPerformer(WorkerConnectionPool)
+
+
+    def test_choosingPerformerFromNetwork(self):
+        """
+        If L{PeerConnectionPool.choosePerformer} is invoked when no workers
+        have spawned but some peers have connected, then it should choose a
+        connection from the network to perform it.
+        """
+        peer = PeerConnectionPool(None, None, 4322, schema)
+        local = self.pcp.peerFactory().buildProtocol(None)
+        remote = peer.peerFactory().buildProtocol(None)
+        connection = Connection(local, remote)
+        connection.start()
+        self.checkPerformer(ConnectionFromPeerNode)
+
+
+    def test_performingWorkOnNetwork(self):
+        """
+        The L{PerformWork} command will get relayed to the remote peer
+        controller.
+        """
+        peer = PeerConnectionPool(None, None, 4322, schema)
+        local = self.pcp.peerFactory().buildProtocol(None)
+        remote = peer.peerFactory().buildProtocol(None)
+        connection = Connection(local, remote)
+        connection.start()
+        d = Deferred()
+        class DummyPerformer(object):
+            def performWork(self, table, workID):
+                self.table = table
+                self.workID = workID
+                return d
+        # Doing real database I/O in this test would be tedious, so fake the
+        # first method in the call stack that actually talks to the DB.
+        dummy = DummyPerformer()
+        def chooseDummy(onlyLocally=False):
+            return dummy
+        peer.choosePerformer = chooseDummy
+        performed = local.performWork(schema.DUMMY_WORK_ITEM, 7384)
+        performResult = []
+        performed.addCallback(performResult.append)
+        # Sanity check.
+        self.assertEquals(performResult, [])
+        connection.flush()
+        self.assertEquals(dummy.table, schema.DUMMY_WORK_ITEM)
+        self.assertEquals(dummy.workID, 7384)
+        self.assertEquals(performResult, [])
+        d.callback(128374)
+        connection.flush()
+        self.assertEquals(performResult, [None])
+
+
+    @inlineCallbacks
+    def test_notBeforeWhenCheckingForLostWork(self):
+        """
+        L{PeerConnectionPool._periodicLostWorkCheck} should execute only those
+        outstanding work items whose C{notBefore} has expired.
+        """
+        dbpool = buildConnectionPool(self, schemaText + nodeSchema)
+        # An arbitrary point in time.
+        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
+        # *why* does datetime still not have .astimestamp()
+        sinceEpoch = astimestamp(fakeNow)
+        clock = Clock()
+        clock.advance(sinceEpoch)
+        qpool = PeerConnectionPool(clock, dbpool.connection, 0, schema)
+
+        # Let's create a couple of work items directly, not via the enqueue
+        # method, so that they exist but nobody will try to immediately execute
+        # them.
+
+        @transactionally(dbpool.connection)
+        @inlineCallbacks
+        def setup(txn):
+            # First, one that's right now.
+            yield DummyWorkItem.create(txn, a=1, b=2, notBefore=fakeNow)
+
+            # Next, create one that's actually far enough into the past to run.
+            yield DummyWorkItem.create(
+                txn, a=3, b=4, notBefore=(
+                    # Schedule it in the past so that it should have already run.
+                    fakeNow - datetime.timedelta(
+                        seconds=qpool.queueProcessTimeout + 20
+                    )
+                )
+            )
+
+            # Finally, one that's actually scheduled for the future.
+            yield DummyWorkItem.create(
+                txn, a=10, b=20, notBefore=fakeNow + datetime.timedelta(1000)
+            )
+        yield setup
+        yield qpool._periodicLostWorkCheck()
+        @transactionally(dbpool.connection)
+        def check(txn):
+            return DummyWorkDone.all(txn)
+        every = yield check
+        self.assertEquals([x.aPlusB for x in every], [7])
+
+
+    @inlineCallbacks
+    def test_notBeforeWhenEnqueueing(self):
+        """
+        L{PeerConnectionPool.enqueueWork} enqueues some work immediately, but
+        only executes it when enough time has elapsed to allow the C{notBefore}
+        attribute of the given work item to have passed.
+        """
+        dbpool = buildConnectionPool(self, schemaText + nodeSchema)
+        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
+        sinceEpoch = astimestamp(fakeNow)
+        clock = Clock()
+        clock.advance(sinceEpoch)
+        qpool = PeerConnectionPool(clock, dbpool.connection, 0, schema)
+        realChoosePerformer = qpool.choosePerformer
+        performerChosen = []
+        def catchPerformerChoice():
+            result = realChoosePerformer()
+            performerChosen.append(True)
+            return result
+        qpool.choosePerformer = catchPerformerChoice
+        @transactionally(dbpool.connection)
+        def check(txn):
+            return qpool.enqueueWork(
+                txn, DummyWorkItem, a=3, b=9,
+                notBefore=datetime.datetime(2012, 12, 12, 12, 12, 20)
+            ).whenProposed()
+
+        proposal = yield check
+
+        # This is going to schedule the work to happen with some asynchronous
+        # I/O in the middle.  That's a problem: how do we know when it's time
+        # to check whether the work has started?  We need to intercept the
+        # thing that kicks off the work; we can then wait for the work itself.
+
+        self.assertEquals(performerChosen, [])
+
+        # Advance to exactly the appointed second.
+        clock.advance(20 - 12)
+        self.assertEquals(performerChosen, [True])
+
+        # FIXME: if this fails, it will hang, but that's better than no
+        # notification that it is broken at all.
+
+        result = yield proposal.whenExecuted()
+        self.assertIdentical(result, proposal)
+
+
+    @inlineCallbacks
+    def test_notBeforeBefore(self):
+        """
+        L{PeerConnectionPool.enqueueWork} will execute its work immediately if
+        the C{notBefore} attribute of the work item in question is in the past.
+        """
+        dbpool = buildConnectionPool(self, schemaText + nodeSchema)
+        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
+        sinceEpoch = astimestamp(fakeNow)
+        clock = Clock()
+        clock.advance(sinceEpoch)
+        qpool = PeerConnectionPool(clock, dbpool.connection, 0, schema)
+        realChoosePerformer = qpool.choosePerformer
+        performerChosen = []
+        def catchPerformerChoice():
+            result = realChoosePerformer()
+            performerChosen.append(True)
+            return result
+        qpool.choosePerformer = catchPerformerChoice
+        @transactionally(dbpool.connection)
+        def check(txn):
+            return qpool.enqueueWork(
+                txn, DummyWorkItem, a=3, b=9,
+                notBefore=datetime.datetime(2012, 12, 12, 12, 12, 0)
+            ).whenProposed()
+        proposal = yield check
+
+        # Advance far beyond the given timestamp.
+        clock.advance(1000)
+        self.assertEquals(performerChosen, [True])
+
+        result = yield proposal.whenExecuted()
+        self.assertIdentical(result, proposal)
+
+
+
+class HalfConnection(object):
+    """
+    One direction of a protocol-to-protocol connection: a protocol hooked up
+    to a L{StringTransport}.
+    """
+    def __init__(self, protocol):
+        self.protocol = protocol
+        self.transport = StringTransport()
+
+
+    def start(self):
+        """
+        Hook up the protocol and the transport.
+        """
+        self.protocol.makeConnection(self.transport)
+
+
+    def extract(self):
+        """
+        Extract the data currently present in this protocol's output buffer.
+        """
+        io = self.transport.io
+        value = io.getvalue()
+        io.seek(0)
+        io.truncate()
+        return value
+
+
+    def deliver(self, data):
+        """
+        Deliver the given data to this L{HalfConnection}'s protocol's
+        C{dataReceived} method.
+
+        @return: a boolean indicating whether any data was delivered.
+        @rtype: L{bool}
+        """
+        if data:
+            self.protocol.dataReceived(data)
+            return True
+        return False
+
+
+
+class Connection(object):
+
+    def __init__(self, local, remote):
+        """
+        Connect two protocol instances to each other via string transports.
+        """
+        self.receiver = HalfConnection(local)
+        self.sender = HalfConnection(remote)
+
+
+    def start(self):
+        """
+        Start up the connection.
+        """
+        self.sender.start()
+        self.receiver.start()
+
+
+    def pump(self):
+        """
+        Relay data in one direction between the two connections.
+        """
+        result = self.receiver.deliver(self.sender.extract())
+        self.receiver, self.sender = self.sender, self.receiver
+        return result
+
+    def flush(self, turns=10):
+        """
+        Keep relaying data until there's no more.
+        """
+        for x in range(turns):
+            if not (self.pump() or self.pump()):
+                return
+
+
+
+class PeerConnectionPoolIntegrationTests(TestCase):
+    """
+    L{PeerConnectionPool} is the service responsible for coordinating
+    eventually-consistent task queuing within a cluster.
+    """
+
+    @inlineCallbacks
+    def setUp(self):
+        """
+        L{PeerConnectionPool} requires access to a database and the reactor.
+        """
+        self.store = yield buildStore(self, None)
+        def doit(txn):
+            return txn.execSQL(schemaText)
+        yield inTransaction(lambda: self.store.newTransaction("bonus schema"),
+                            doit)
+        def deschema():
+            @inlineCallbacks
+            def deletestuff(txn):
+                for stmt in dropSQL:
+                    yield txn.execSQL(stmt)
+            return inTransaction(self.store.newTransaction, deletestuff)
+        self.addCleanup(deschema)
+
+        from twisted.internet import reactor
+        self.node1 = PeerConnectionPool(
+            reactor, self.store.newTransaction, 0, schema)
+        self.node2 = PeerConnectionPool(
+            reactor, self.store.newTransaction, 0, schema)
+
+        class FireMeService(Service, object):
+            def __init__(self, d):
+                super(FireMeService, self).__init__()
+                self.d = d
+            def startService(self):
+                self.d.callback(None)
+        d1 = Deferred()
+        d2 = Deferred()
+        FireMeService(d1).setServiceParent(self.node1)
+        FireMeService(d2).setServiceParent(self.node2)
+        ms = MultiService()
+        self.node1.setServiceParent(ms)
+        self.node2.setServiceParent(ms)
+        ms.startService()
+        self.addCleanup(ms.stopService)
+        yield gatherResults([d1, d2])
+        self.store.queuer = self.node1
+
+
+    def test_currentNodeInfo(self):
+        """
+        There will be two C{NODE_INFO} rows in the database, retrievable as two
+        L{NodeInfo} objects, once both nodes have started up.
+        """
+        @inlineCallbacks
+        def check(txn):
+            self.assertEquals(len((yield self.node1.activeNodes(txn))), 2)
+            self.assertEquals(len((yield self.node2.activeNodes(txn))), 2)
+        return inTransaction(self.store.newTransaction, check)
+
+
+    @inlineCallbacks
+    def test_enqueueHappyPath(self):
+        """
+        When a L{WorkItem} is scheduled for execution via
+        L{PeerConnectionPool.enqueueWork} its C{doWork} method will be invoked
+        by the time the L{Deferred} returned from the resulting
+        L{WorkProposal}'s C{whenExecuted} method has fired.
+        """
+        # TODO: this exact test should run against LocalQueuer as well.
+        def operation(txn):
+            # TODO: how does 'enqueue' get associated with the transaction?
+            # This is not the case with a raw t.w.enterprise transaction.
+            # Should probably do something with components.
+            return txn.enqueue(DummyWorkItem, a=3, b=4, workID=4321,
+                               notBefore=datetime.datetime.utcnow())
+        result = yield inTransaction(self.store.newTransaction, operation)
+        # Wait for it to be executed.  Hopefully this does not time out :-\.
+        yield result.whenExecuted()
+        def op2(txn):
+            return Select([schema.DUMMY_WORK_DONE.WORK_ID,
+                           schema.DUMMY_WORK_DONE.A_PLUS_B],
+                           From=schema.DUMMY_WORK_DONE).on(txn)
+        rows = yield inTransaction(self.store.newTransaction, op2)
+        self.assertEquals(rows, [[4321, 7]])
+
+

Copied: CalendarServer/trunk/twext/enterprise/test/test_util.py (from rev 10289, CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_util.py)
===================================================================
--- CalendarServer/trunk/twext/enterprise/test/test_util.py	                        (rev 0)
+++ CalendarServer/trunk/twext/enterprise/test/test_util.py	2013-01-05 00:51:01 UTC (rev 10290)
@@ -0,0 +1,38 @@
+##
+# Copyright (c) 2012 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+import datetime
+
+from twisted.trial.unittest import TestCase
+
+from twext.enterprise.util import parseSQLTimestamp
+
+class TimestampTests(TestCase):
+    """
+    Tests for date-related functions.
+    """
+
+    def test_parseSQLTimestamp(self):
+        """
+        L{parseSQLTimestamp} parses the traditional SQL timestamp.
+        """
+        tests = (
+            ("2012-04-04 12:34:56", datetime.datetime(2012, 4, 4, 12, 34, 56)),
+            ("2012-12-31 01:01:01", datetime.datetime(2012, 12, 31, 1, 1, 1)),
+        )
+
+        for sqlStr, result in tests:
+            self.assertEqual(parseSQLTimestamp(sqlStr), result)

Modified: CalendarServer/trunk/twext/enterprise/util.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/util.py	2013-01-05 00:40:03 UTC (rev 10289)
+++ CalendarServer/trunk/twext/enterprise/util.py	2013-01-05 00:51:01 UTC (rev 10290)
@@ -1,4 +1,4 @@
-
+# -*- test-case-name: twext.enterprise.test.test_util -*-
 ##
 # Copyright (c) 2010-2012 Apple Inc. All rights reserved.
 #
@@ -20,8 +20,22 @@
 """
 
 from datetime import datetime
-from twistedcaldav.dateops import SQL_TIMESTAMP_FORMAT
 
+SQL_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S.%f"
+
+
+
+def parseSQLTimestamp(ts):
+    """
+    Parse an SQL timestamp string.
+    """
+    # Handle the case where fractional seconds may not be present: the format
+    # string is longer than a fraction-less timestamp, so a short input gets
+    # ".0" appended before parsing.
+    if len(ts) < len(SQL_TIMESTAMP_FORMAT):
+        ts += ".0"
+    return datetime.strptime(ts, SQL_TIMESTAMP_FORMAT)
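
For example, the padding rule makes the fractional part optional (the first
value is taken from the new test module below; the second is illustrative):

    from twext.enterprise.util import parseSQLTimestamp

    parseSQLTimestamp("2012-04-04 12:34:56")
    # -> datetime.datetime(2012, 4, 4, 12, 34, 56)
    parseSQLTimestamp("2012-04-04 12:34:56.5")
    # -> datetime.datetime(2012, 4, 4, 12, 34, 56, 500000)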
+
+
+
 def mapOracleOutputType(column):
     """
     Map a single output value from cx_Oracle based on some rules and

Modified: CalendarServer/trunk/twistedcaldav/dateops.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/dateops.py	2013-01-05 00:40:03 UTC (rev 10289)
+++ CalendarServer/trunk/twistedcaldav/dateops.py	2013-01-05 00:51:01 UTC (rev 10290)
@@ -13,11 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ##
-from pycalendar.datetime import PyCalendarDateTime
-from pycalendar.timezone import PyCalendarTimezone
-from pycalendar.period import PyCalendarPeriod
-import datetime
-import dateutil.tz
 
 """
 Date/time Utilities
@@ -33,6 +28,13 @@
     "clipPeriod"
 ]
 
+from pycalendar.datetime import PyCalendarDateTime
+from pycalendar.timezone import PyCalendarTimezone
+from pycalendar.period import PyCalendarPeriod
+
+import datetime
+import dateutil.tz
+
 import calendar
 
 def normalizeForIndex(dt):
@@ -259,14 +261,6 @@
             tzinfo=dateutil.tz.tzutc()
         )
 
-SQL_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S.%f"
-
-def parseSQLTimestamp(ts):
-    # Handle case where fraction seconds may not be present
-    if len(ts) < 20:
-        ts += ".0"
-    return datetime.datetime.strptime(ts, SQL_TIMESTAMP_FORMAT)
-
 def parseSQLTimestampToPyCalendar(ts):
     """
     Parse an SQL formated timestamp into a PyCalendarDateTime

Modified: CalendarServer/trunk/twistedcaldav/test/test_dateops.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/test/test_dateops.py	2013-01-05 00:40:03 UTC (rev 10289)
+++ CalendarServer/trunk/twistedcaldav/test/test_dateops.py	2013-01-05 00:51:01 UTC (rev 10290)
@@ -17,9 +17,11 @@
 import twistedcaldav.test.util
 from twisted.trial.unittest import SkipTest
 from pycalendar.datetime import PyCalendarDateTime
+
 from twistedcaldav.dateops import parseSQLTimestampToPyCalendar,\
-    parseSQLDateToPyCalendar, parseSQLTimestamp, pyCalendarTodatetime,\
+    parseSQLDateToPyCalendar, pyCalendarTodatetime,\
     normalizeForExpand, normalizeForIndex, normalizeToUTC, timeRangesOverlap
+
 import datetime
 import dateutil
 from pycalendar.timezone import PyCalendarTimezone
@@ -241,11 +243,11 @@
     def test_clipPeriod(self):
         raise SkipTest("test unimplemented")
 
+
     def test_pyCalendarTodatetime(self):
         """
         dateops.pyCalendarTodatetime
         """
-        
         tests = (
             (PyCalendarDateTime(2012, 4, 4, 12, 34, 56), datetime.datetime(2012, 4, 4, 12, 34, 56, tzinfo=dateutil.tz.tzutc())),
             (PyCalendarDateTime(2012, 12, 31), datetime.date(2012, 12, 31)),
@@ -254,24 +256,11 @@
         for pycal, result in tests:
             self.assertEqual(pyCalendarTodatetime(pycal), result)
 
-    def test_parseSQLTimestamp(self):
-        """
-        dateops.parseSQLTimestamp
-        """
-        
-        tests = (
-            ("2012-04-04 12:34:56", datetime.datetime(2012, 4, 4, 12, 34, 56)),
-            ("2012-12-31 01:01:01", datetime.datetime(2012, 12, 31, 1, 1, 1)),
-        )
 
-        for sqlStr, result in tests:
-            self.assertEqual(parseSQLTimestamp(sqlStr), result)
-
     def test_parseSQLTimestampToPyCalendar(self):
         """
         dateops.parseSQLTimestampToPyCalendar
         """
-        
         tests = (
             ("2012-04-04 12:34:56", PyCalendarDateTime(2012, 4, 4, 12, 34, 56)),
             ("2012-12-31 01:01:01", PyCalendarDateTime(2012, 12, 31, 1, 1, 1)),

Modified: CalendarServer/trunk/twistedcaldav/test/test_wrapping.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/test/test_wrapping.py	2013-01-05 00:40:03 UTC (rev 10289)
+++ CalendarServer/trunk/twistedcaldav/test/test_wrapping.py	2013-01-05 00:51:01 UTC (rev 10290)
@@ -54,6 +54,7 @@
 from txdav.caldav.icalendarstore import ICalendarHome
 from txdav.carddav.iaddressbookstore import IAddressBookHome
 
+from twisted.internet.defer import maybeDeferred
 from txdav.caldav.datastore.file import Calendar
 
 
@@ -248,8 +249,10 @@
                                       % (pathType, pathType))
             yield req.process()
             self.assertEquals(req.chanRequest.code, 404)
-            self.assertRaises(AlreadyFinishedError,
-                              req._newStoreTransaction.commit)
+            yield self.failUnlessFailure(
+                maybeDeferred(req._newStoreTransaction.commit),
+                AlreadyFinishedError
+            )
 
 
     @inlineCallbacks

Modified: CalendarServer/trunk/txdav/base/datastore/file.py
===================================================================
--- CalendarServer/trunk/txdav/base/datastore/file.py	2013-01-05 00:40:03 UTC (rev 10289)
+++ CalendarServer/trunk/txdav/base/datastore/file.py	2013-01-05 00:51:01 UTC (rev 10290)
@@ -136,6 +136,7 @@
     def store(self):
         return self._dataStore
 
+
     def addOperation(self, operation, name):
         self._operations.append(operation)
         self._tracker.info.append(name)
@@ -148,7 +149,7 @@
         terminated.
 
         @param mode: The manner of the termination of this transaction.
-        
+
         @type mode: C{str}
 
         @raise AlreadyFinishedError: This transaction has already been

Modified: CalendarServer/trunk/txdav/caldav/datastore/sql.py
===================================================================
--- CalendarServer/trunk/txdav/caldav/datastore/sql.py	2013-01-05 00:40:03 UTC (rev 10289)
+++ CalendarServer/trunk/txdav/caldav/datastore/sql.py	2013-01-05 00:51:01 UTC (rev 10290)
@@ -32,6 +32,9 @@
 from twext.enterprise.dal.syntax import Select, Count, ColumnSyntax
 from twext.enterprise.dal.syntax import Update
 from twext.enterprise.dal.syntax import utcNowSQL
+
+from twext.enterprise.util import parseSQLTimestamp
+
 from twext.python.clsprop import classproperty
 from twext.python.filepath import CachingFilePath
 from twext.python.vcomponent import VComponent
@@ -45,7 +48,7 @@
 from twistedcaldav.caldavxml import ScheduleCalendarTransp, Opaque
 from twistedcaldav.config import config
 from twistedcaldav.dateops import normalizeForIndex, datetimeMktime, \
-    parseSQLTimestamp, pyCalendarTodatetime, parseSQLDateToPyCalendar
+    pyCalendarTodatetime, parseSQLDateToPyCalendar
 from twistedcaldav.ical import Component, InvalidICalendarDataError, Property
 from twistedcaldav.instance import InvalidOverriddenInstanceError
 from twistedcaldav.memcacher import Memcacher

Modified: CalendarServer/trunk/txdav/common/datastore/sql.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/sql.py	2013-01-05 00:40:03 UTC (rev 10289)
+++ CalendarServer/trunk/txdav/common/datastore/sql.py	2013-01-05 00:51:01 UTC (rev 10290)
@@ -84,11 +84,11 @@
 
 from txdav.common.icommondatastore import ConcurrentModification
 from twistedcaldav.customxml import NotificationType
-from twistedcaldav.dateops import datetimeMktime, parseSQLTimestamp, \
-    pyCalendarTodatetime
+from twistedcaldav.dateops import datetimeMktime, pyCalendarTodatetime
 
 from txdav.base.datastore.util import normalizeUUIDOrNot
-from twext.enterprise.queue import NullQueuer
+from twext.enterprise.queue import LocalQueuer
+from twext.enterprise.util import parseSQLTimestamp
 
 from pycalendar.datetime import PyCalendarDateTime
 
@@ -139,7 +139,7 @@
     @type quota: C{int} or C{NoneType}
 
     @ivar queuer: An object with an C{enqueueWork} method, from
-        L{twext.enterprise.queue}.  Initially, this is a L{NullQueuer}, so it
+        L{twext.enterprise.queue}.  Initially, this is a L{LocalQueuer}, so it
         is always usable, but in a properly configured environment it will be
         upgraded to a more capable object that can distribute work throughout a
         cluster.
@@ -171,7 +171,7 @@
         self.logSQL = logSQL
         self.logTransactionWaits = logTransactionWaits
         self.timeoutTransactions = timeoutTransactions
-        self.queuer = NullQueuer()
+        self.queuer = LocalQueuer(self.newTransaction)
         self._migrating = False
         self._enableNotifications = True
 
@@ -692,6 +692,14 @@
         return self._apnSubscriptionsBySubscriberQuery.on(self, subscriberGUID=guid)
 
 
+    def preCommit(self, operation):
+        """
+        Run things before C{commit}.  (Note: only provided by SQL
+        implementation, used only for cleaning up database state.)
+        """
+        return self._sqlTxn.preCommit(operation)
+
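+
As the adbapi2 tests earlier in this change establish, a preCommit hook may
return a Deferred that defers the commit, and a failure aborts the
transaction instead.  A sketch of the cleanup usage, with a hypothetical
validation helper:

    def commitWithValidation(txn):
        def validate():
            # ensureStateIsConsistent is a hypothetical helper; raising
            # (or returning a failed Deferred) aborts instead of committing.
            return ensureStateIsConsistent(txn)
        txn.preCommit(validate)
        return txn.commit()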
+
     def postCommit(self, operation):
         """
         Run things after C{commit}.

Modified: CalendarServer/trunk/txdav/common/datastore/sql_schema/current-oracle-dialect.sql
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/sql_schema/current-oracle-dialect.sql	2013-01-05 00:40:03 UTC (rev 10289)
+++ CalendarServer/trunk/txdav/common/datastore/sql_schema/current-oracle-dialect.sql	2013-01-05 00:51:01 UTC (rev 10290)
@@ -10,6 +10,10 @@
     primary key("HOSTNAME", "PORT")
 );
 
+create table NAMED_LOCK (
+    "LOCK_NAME" nvarchar2(255) primary key
+);
+
 create table CALENDAR_HOME (
     "RESOURCE_ID" integer primary key,
     "OWNER_UID" nvarchar2(255) unique,
@@ -265,7 +269,7 @@
     "VALUE" nvarchar2(255)
 );
 
-insert into CALENDARSERVER (NAME, VALUE) values ('VERSION', '14');
+insert into CALENDARSERVER (NAME, VALUE) values ('VERSION', '15');
 insert into CALENDARSERVER (NAME, VALUE) values ('CALENDAR-DATAVERSION', '3');
 insert into CALENDARSERVER (NAME, VALUE) values ('ADDRESSBOOK-DATAVERSION', '1');
 

Modified: CalendarServer/trunk/txdav/common/datastore/sql_schema/current.sql
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/sql_schema/current.sql	2013-01-05 00:40:03 UTC (rev 10289)
+++ CalendarServer/trunk/txdav/common/datastore/sql_schema/current.sql	2013-01-05 00:51:01 UTC (rev 10290)
@@ -38,7 +38,13 @@
   primary key (HOSTNAME, PORT)
 );
 
+-- Unique named locks.  This table should always be empty, but rows are
+-- temporarily created in order to prevent undesirable concurrency.
+create table NAMED_LOCK (
+    LOCK_NAME varchar(255) primary key
+);
 
+
 -------------------
 -- Calendar Home --
 -------------------
@@ -496,6 +502,6 @@
   VALUE                         varchar(255)
 );
 
-insert into CALENDARSERVER values ('VERSION', '14');
+insert into CALENDARSERVER values ('VERSION', '15');
 insert into CALENDARSERVER values ('CALENDAR-DATAVERSION', '3');
 insert into CALENDARSERVER values ('ADDRESSBOOK-DATAVERSION', '1');

Copied: CalendarServer/trunk/txdav/common/datastore/sql_schema/old/oracle-dialect/v14.sql (from rev 10289, CalendarServer/branches/users/glyph/queue-locking-and-timing/txdav/common/datastore/sql_schema/old/oracle-dialect/v14.sql)
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/sql_schema/old/oracle-dialect/v14.sql	                        (rev 0)
+++ CalendarServer/trunk/txdav/common/datastore/sql_schema/old/oracle-dialect/v14.sql	2013-01-05 00:51:01 UTC (rev 10290)
@@ -0,0 +1,356 @@
+create sequence RESOURCE_ID_SEQ;
+create sequence INSTANCE_ID_SEQ;
+create sequence ATTACHMENT_ID_SEQ;
+create sequence REVISION_SEQ;
+create table NODE_INFO (
+    "HOSTNAME" nvarchar2(255),
+    "PID" integer not null,
+    "PORT" integer not null,
+    "TIME" timestamp default CURRENT_TIMESTAMP at time zone 'UTC' not null, 
+    primary key("HOSTNAME", "PORT")
+);
+
+create table CALENDAR_HOME (
+    "RESOURCE_ID" integer primary key,
+    "OWNER_UID" nvarchar2(255) unique,
+    "DATAVERSION" integer default 0 not null
+);
+
+create table CALENDAR_HOME_METADATA (
+    "RESOURCE_ID" integer primary key references CALENDAR_HOME on delete cascade,
+    "QUOTA_USED_BYTES" integer default 0 not null,
+    "CREATED" timestamp default CURRENT_TIMESTAMP at time zone 'UTC',
+    "MODIFIED" timestamp default CURRENT_TIMESTAMP at time zone 'UTC'
+);
+
+create table CALENDAR (
+    "RESOURCE_ID" integer primary key
+);
+
+create table CALENDAR_METADATA (
+    "RESOURCE_ID" integer primary key references CALENDAR on delete cascade,
+    "SUPPORTED_COMPONENTS" nvarchar2(255) default null,
+    "CREATED" timestamp default CURRENT_TIMESTAMP at time zone 'UTC',
+    "MODIFIED" timestamp default CURRENT_TIMESTAMP at time zone 'UTC'
+);
+
+create table NOTIFICATION_HOME (
+    "RESOURCE_ID" integer primary key,
+    "OWNER_UID" nvarchar2(255) unique
+);
+
+create table NOTIFICATION (
+    "RESOURCE_ID" integer primary key,
+    "NOTIFICATION_HOME_RESOURCE_ID" integer not null references NOTIFICATION_HOME,
+    "NOTIFICATION_UID" nvarchar2(255),
+    "XML_TYPE" nvarchar2(255),
+    "XML_DATA" nclob,
+    "MD5" nchar(32),
+    "CREATED" timestamp default CURRENT_TIMESTAMP at time zone 'UTC',
+    "MODIFIED" timestamp default CURRENT_TIMESTAMP at time zone 'UTC', 
+    unique("NOTIFICATION_UID", "NOTIFICATION_HOME_RESOURCE_ID")
+);
+
+create table CALENDAR_BIND (
+    "CALENDAR_HOME_RESOURCE_ID" integer not null references CALENDAR_HOME,
+    "CALENDAR_RESOURCE_ID" integer not null references CALENDAR on delete cascade,
+    "CALENDAR_RESOURCE_NAME" nvarchar2(255) not null,
+    "BIND_MODE" integer not null,
+    "BIND_STATUS" integer not null,
+    "MESSAGE" nclob, 
+    primary key("CALENDAR_HOME_RESOURCE_ID", "CALENDAR_RESOURCE_ID"), 
+    unique("CALENDAR_HOME_RESOURCE_ID", "CALENDAR_RESOURCE_NAME")
+);
+
+create table CALENDAR_BIND_MODE (
+    "ID" integer primary key,
+    "DESCRIPTION" nvarchar2(16) unique
+);
+
+insert into CALENDAR_BIND_MODE (DESCRIPTION, ID) values ('own', 0);
+insert into CALENDAR_BIND_MODE (DESCRIPTION, ID) values ('read', 1);
+insert into CALENDAR_BIND_MODE (DESCRIPTION, ID) values ('write', 2);
+insert into CALENDAR_BIND_MODE (DESCRIPTION, ID) values ('direct', 3);
+create table CALENDAR_BIND_STATUS (
+    "ID" integer primary key,
+    "DESCRIPTION" nvarchar2(16) unique
+);
+
+insert into CALENDAR_BIND_STATUS (DESCRIPTION, ID) values ('invited', 0);
+insert into CALENDAR_BIND_STATUS (DESCRIPTION, ID) values ('accepted', 1);
+insert into CALENDAR_BIND_STATUS (DESCRIPTION, ID) values ('declined', 2);
+insert into CALENDAR_BIND_STATUS (DESCRIPTION, ID) values ('invalid', 3);
+create table CALENDAR_OBJECT (
+    "RESOURCE_ID" integer primary key,
+    "CALENDAR_RESOURCE_ID" integer not null references CALENDAR on delete cascade,
+    "RESOURCE_NAME" nvarchar2(255),
+    "ICALENDAR_TEXT" nclob,
+    "ICALENDAR_UID" nvarchar2(255),
+    "ICALENDAR_TYPE" nvarchar2(255),
+    "ATTACHMENTS_MODE" integer default 0 not null,
+    "DROPBOX_ID" nvarchar2(255),
+    "ORGANIZER" nvarchar2(255),
+    "ORGANIZER_OBJECT" integer references CALENDAR_OBJECT,
+    "RECURRANCE_MIN" date,
+    "RECURRANCE_MAX" date,
+    "ACCESS" integer default 0 not null,
+    "SCHEDULE_OBJECT" integer default 0,
+    "SCHEDULE_TAG" nvarchar2(36) default null,
+    "SCHEDULE_ETAGS" nclob default null,
+    "PRIVATE_COMMENTS" integer default 0 not null,
+    "MD5" nchar(32),
+    "CREATED" timestamp default CURRENT_TIMESTAMP at time zone 'UTC',
+    "MODIFIED" timestamp default CURRENT_TIMESTAMP at time zone 'UTC', 
+    unique("CALENDAR_RESOURCE_ID", "RESOURCE_NAME")
+);
+
+create table CALENDAR_OBJECT_ATTACHMENTS_MO (
+    "ID" integer primary key,
+    "DESCRIPTION" nvarchar2(16) unique
+);
+
+insert into CALENDAR_OBJECT_ATTACHMENTS_MO (DESCRIPTION, ID) values ('none', 0);
+insert into CALENDAR_OBJECT_ATTACHMENTS_MO (DESCRIPTION, ID) values ('read', 1);
+insert into CALENDAR_OBJECT_ATTACHMENTS_MO (DESCRIPTION, ID) values ('write', 2);
+create table CALENDAR_ACCESS_TYPE (
+    "ID" integer primary key,
+    "DESCRIPTION" nvarchar2(32) unique
+);
+
+insert into CALENDAR_ACCESS_TYPE (DESCRIPTION, ID) values ('', 0);
+insert into CALENDAR_ACCESS_TYPE (DESCRIPTION, ID) values ('public', 1);
+insert into CALENDAR_ACCESS_TYPE (DESCRIPTION, ID) values ('private', 2);
+insert into CALENDAR_ACCESS_TYPE (DESCRIPTION, ID) values ('confidential', 3);
+insert into CALENDAR_ACCESS_TYPE (DESCRIPTION, ID) values ('restricted', 4);
+create table TIME_RANGE (
+    "INSTANCE_ID" integer primary key,
+    "CALENDAR_RESOURCE_ID" integer not null references CALENDAR on delete cascade,
+    "CALENDAR_OBJECT_RESOURCE_ID" integer not null references CALENDAR_OBJECT on delete cascade,
+    "FLOATING" integer not null,
+    "START_DATE" timestamp not null,
+    "END_DATE" timestamp not null,
+    "FBTYPE" integer not null,
+    "TRANSPARENT" integer not null
+);
+
+create table FREE_BUSY_TYPE (
+    "ID" integer primary key,
+    "DESCRIPTION" nvarchar2(16) unique
+);
+
+insert into FREE_BUSY_TYPE (DESCRIPTION, ID) values ('unknown', 0);
+insert into FREE_BUSY_TYPE (DESCRIPTION, ID) values ('free', 1);
+insert into FREE_BUSY_TYPE (DESCRIPTION, ID) values ('busy', 2);
+insert into FREE_BUSY_TYPE (DESCRIPTION, ID) values ('busy-unavailable', 3);
+insert into FREE_BUSY_TYPE (DESCRIPTION, ID) values ('busy-tentative', 4);
+create table TRANSPARENCY (
+    "TIME_RANGE_INSTANCE_ID" integer not null references TIME_RANGE on delete cascade,
+    "USER_ID" nvarchar2(255),
+    "TRANSPARENT" integer not null
+);
+
+create table ATTACHMENT (
+    "ATTACHMENT_ID" integer primary key,
+    "CALENDAR_HOME_RESOURCE_ID" integer not null references CALENDAR_HOME,
+    "DROPBOX_ID" nvarchar2(255),
+    "CONTENT_TYPE" nvarchar2(255),
+    "SIZE" integer not null,
+    "MD5" nchar(32),
+    "CREATED" timestamp default CURRENT_TIMESTAMP at time zone 'UTC',
+    "MODIFIED" timestamp default CURRENT_TIMESTAMP at time zone 'UTC',
+    "PATH" nvarchar2(1024)
+);
+
+create table ATTACHMENT_CALENDAR_OBJECT (
+    "ATTACHMENT_ID" integer not null references ATTACHMENT on delete cascade,
+    "MANAGED_ID" nvarchar2(255),
+    "CALENDAR_OBJECT_RESOURCE_ID" integer not null references CALENDAR_OBJECT on delete cascade, 
+    primary key("ATTACHMENT_ID", "CALENDAR_OBJECT_RESOURCE_ID"), 
+    unique("MANAGED_ID", "CALENDAR_OBJECT_RESOURCE_ID")
+);
+
+create table RESOURCE_PROPERTY (
+    "RESOURCE_ID" integer not null,
+    "NAME" nvarchar2(255),
+    "VALUE" nclob,
+    "VIEWER_UID" nvarchar2(255), 
+    primary key("RESOURCE_ID", "NAME", "VIEWER_UID")
+);
+
+create table ADDRESSBOOK_HOME (
+    "RESOURCE_ID" integer primary key,
+    "OWNER_UID" nvarchar2(255) unique,
+    "DATAVERSION" integer default 0 not null
+);
+
+create table ADDRESSBOOK_HOME_METADATA (
+    "RESOURCE_ID" integer primary key references ADDRESSBOOK_HOME on delete cascade,
+    "QUOTA_USED_BYTES" integer default 0 not null,
+    "CREATED" timestamp default CURRENT_TIMESTAMP at time zone 'UTC',
+    "MODIFIED" timestamp default CURRENT_TIMESTAMP at time zone 'UTC'
+);
+
+create table ADDRESSBOOK (
+    "RESOURCE_ID" integer primary key
+);
+
+create table ADDRESSBOOK_METADATA (
+    "RESOURCE_ID" integer primary key references ADDRESSBOOK on delete cascade,
+    "CREATED" timestamp default CURRENT_TIMESTAMP at time zone 'UTC',
+    "MODIFIED" timestamp default CURRENT_TIMESTAMP at time zone 'UTC'
+);
+
+create table ADDRESSBOOK_BIND (
+    "ADDRESSBOOK_HOME_RESOURCE_ID" integer not null references ADDRESSBOOK_HOME,
+    "ADDRESSBOOK_RESOURCE_ID" integer not null references ADDRESSBOOK on delete cascade,
+    "ADDRESSBOOK_RESOURCE_NAME" nvarchar2(255) not null,
+    "BIND_MODE" integer not null,
+    "BIND_STATUS" integer not null,
+    "MESSAGE" nclob, 
+    primary key("ADDRESSBOOK_HOME_RESOURCE_ID", "ADDRESSBOOK_RESOURCE_ID"), 
+    unique("ADDRESSBOOK_HOME_RESOURCE_ID", "ADDRESSBOOK_RESOURCE_NAME")
+);
+
+create table ADDRESSBOOK_OBJECT (
+    "RESOURCE_ID" integer primary key,
+    "ADDRESSBOOK_RESOURCE_ID" integer not null references ADDRESSBOOK on delete cascade,
+    "RESOURCE_NAME" nvarchar2(255),
+    "VCARD_TEXT" nclob,
+    "VCARD_UID" nvarchar2(255),
+    "MD5" nchar(32),
+    "CREATED" timestamp default CURRENT_TIMESTAMP at time zone 'UTC',
+    "MODIFIED" timestamp default CURRENT_TIMESTAMP at time zone 'UTC', 
+    unique("ADDRESSBOOK_RESOURCE_ID", "RESOURCE_NAME"), 
+    unique("ADDRESSBOOK_RESOURCE_ID", "VCARD_UID")
+);
+
+create table CALENDAR_OBJECT_REVISIONS (
+    "CALENDAR_HOME_RESOURCE_ID" integer not null references CALENDAR_HOME,
+    "CALENDAR_RESOURCE_ID" integer references CALENDAR,
+    "CALENDAR_NAME" nvarchar2(255) default null,
+    "RESOURCE_NAME" nvarchar2(255),
+    "REVISION" integer not null,
+    "DELETED" integer not null
+);
+
+create table ADDRESSBOOK_OBJECT_REVISIONS (
+    "ADDRESSBOOK_HOME_RESOURCE_ID" integer not null references ADDRESSBOOK_HOME,
+    "ADDRESSBOOK_RESOURCE_ID" integer references ADDRESSBOOK,
+    "ADDRESSBOOK_NAME" nvarchar2(255) default null,
+    "RESOURCE_NAME" nvarchar2(255),
+    "REVISION" integer not null,
+    "DELETED" integer not null
+);
+
+create table NOTIFICATION_OBJECT_REVISIONS (
+    "NOTIFICATION_HOME_RESOURCE_ID" integer not null references NOTIFICATION_HOME on delete cascade,
+    "RESOURCE_NAME" nvarchar2(255),
+    "REVISION" integer not null,
+    "DELETED" integer not null, 
+    unique("NOTIFICATION_HOME_RESOURCE_ID", "RESOURCE_NAME")
+);
+
+create table APN_SUBSCRIPTIONS (
+    "TOKEN" nvarchar2(255),
+    "RESOURCE_KEY" nvarchar2(255),
+    "MODIFIED" integer not null,
+    "SUBSCRIBER_GUID" nvarchar2(255),
+    "USER_AGENT" nvarchar2(255) default null,
+    "IP_ADDR" nvarchar2(255) default null, 
+    primary key("TOKEN", "RESOURCE_KEY")
+);
+
+create table CALENDARSERVER (
+    "NAME" nvarchar2(255) primary key,
+    "VALUE" nvarchar2(255)
+);
+
+insert into CALENDARSERVER (NAME, VALUE) values ('VERSION', '14');
+insert into CALENDARSERVER (NAME, VALUE) values ('CALENDAR-DATAVERSION', '3');
+insert into CALENDARSERVER (NAME, VALUE) values ('ADDRESSBOOK-DATAVERSION', '1');
+
+create index NOTIFICATION_NOTIFICA_f891f5f9 on NOTIFICATION (
+    NOTIFICATION_HOME_RESOURCE_ID
+);
+
+create index CALENDAR_BIND_RESOURC_e57964d4 on CALENDAR_BIND (
+    CALENDAR_RESOURCE_ID
+);
+
+create index CALENDAR_OBJECT_CALEN_a9a453a9 on CALENDAR_OBJECT (
+    CALENDAR_RESOURCE_ID,
+    ICALENDAR_UID
+);
+
+create index CALENDAR_OBJECT_CALEN_96e83b73 on CALENDAR_OBJECT (
+    CALENDAR_RESOURCE_ID,
+    RECURRANCE_MAX
+);
+
+create index CALENDAR_OBJECT_ORGAN_7ce24750 on CALENDAR_OBJECT (
+    ORGANIZER_OBJECT
+);
+
+create index CALENDAR_OBJECT_DROPB_de041d80 on CALENDAR_OBJECT (
+    DROPBOX_ID
+);
+
+create index TIME_RANGE_CALENDAR_R_beb6e7eb on TIME_RANGE (
+    CALENDAR_RESOURCE_ID
+);
+
+create index TIME_RANGE_CALENDAR_O_acf37bd1 on TIME_RANGE (
+    CALENDAR_OBJECT_RESOURCE_ID
+);
+
+create index TRANSPARENCY_TIME_RAN_5f34467f on TRANSPARENCY (
+    TIME_RANGE_INSTANCE_ID
+);
+
+create index ATTACHMENT_CALENDAR_H_0078845c on ATTACHMENT (
+    CALENDAR_HOME_RESOURCE_ID
+);
+
+create index ADDRESSBOOK_BIND_RESO_205aa75c on ADDRESSBOOK_BIND (
+    ADDRESSBOOK_RESOURCE_ID
+);
+
+create index CALENDAR_OBJECT_REVIS_3a3956c4 on CALENDAR_OBJECT_REVISIONS (
+    CALENDAR_HOME_RESOURCE_ID,
+    CALENDAR_RESOURCE_ID
+);
+
+create index CALENDAR_OBJECT_REVIS_2643d556 on CALENDAR_OBJECT_REVISIONS (
+    CALENDAR_RESOURCE_ID,
+    RESOURCE_NAME
+);
+
+create index CALENDAR_OBJECT_REVIS_265c8acf on CALENDAR_OBJECT_REVISIONS (
+    CALENDAR_RESOURCE_ID,
+    REVISION
+);
+
+create index ADDRESSBOOK_OBJECT_RE_f460d62d on ADDRESSBOOK_OBJECT_REVISIONS (
+    ADDRESSBOOK_HOME_RESOURCE_ID,
+    ADDRESSBOOK_RESOURCE_ID
+);
+
+create index ADDRESSBOOK_OBJECT_RE_9a848f39 on ADDRESSBOOK_OBJECT_REVISIONS (
+    ADDRESSBOOK_RESOURCE_ID,
+    RESOURCE_NAME
+);
+
+create index ADDRESSBOOK_OBJECT_RE_cb101e6b on ADDRESSBOOK_OBJECT_REVISIONS (
+    ADDRESSBOOK_RESOURCE_ID,
+    REVISION
+);
+
+create index NOTIFICATION_OBJECT_R_036a9cee on NOTIFICATION_OBJECT_REVISIONS (
+    NOTIFICATION_HOME_RESOURCE_ID,
+    REVISION
+);
+
+create index APN_SUBSCRIPTIONS_RES_9610d78e on APN_SUBSCRIPTIONS (
+    RESOURCE_KEY
+);
+
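
The *_BIND_MODE, *_BIND_STATUS, CALENDAR_ACCESS_TYPE and FREE_BUSY_TYPE
tables above are integer enumerations: the seed inserts give each code a
readable label. A minimal sketch of resolving a home's calendar bindings
to those labels (the :home_id bind variable is illustrative):

    select CB.CALENDAR_RESOURCE_NAME,
           BM.DESCRIPTION as BIND_MODE,
           BS.DESCRIPTION as BIND_STATUS
      from CALENDAR_BIND CB
      join CALENDAR_BIND_MODE BM on BM.ID = CB.BIND_MODE
      join CALENDAR_BIND_STATUS BS on BS.ID = CB.BIND_STATUS
     where CB.CALENDAR_HOME_RESOURCE_ID = :home_id;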

Copied: CalendarServer/trunk/txdav/common/datastore/sql_schema/old/postgres-dialect/v14.sql (from rev 10289, CalendarServer/branches/users/glyph/queue-locking-and-timing/txdav/common/datastore/sql_schema/old/postgres-dialect/v14.sql)
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/sql_schema/old/postgres-dialect/v14.sql	                        (rev 0)
+++ CalendarServer/trunk/txdav/common/datastore/sql_schema/old/postgres-dialect/v14.sql	2013-01-05 00:51:01 UTC (rev 10290)
@@ -0,0 +1,501 @@
+-- -*- test-case-name: txdav.caldav.datastore.test.test_sql,txdav.carddav.datastore.test.test_sql -*-
+
+----
+-- Copyright (c) 2010-2012 Apple Inc. All rights reserved.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+----
+
+-----------------
+-- Resource ID --
+-----------------
+
+create sequence RESOURCE_ID_SEQ;
+
+-------------------------
+-- Cluster Bookkeeping --
+-------------------------
+
+-- Information about a process connected to this database.
+
+-- Note that this must match the node info schema in twext.enterprise.queue.
+create table NODE_INFO (
+  HOSTNAME  varchar(255) not null,
+  PID       integer not null,
+  PORT      integer not null,
+  TIME      timestamp not null default timezone('UTC', CURRENT_TIMESTAMP),
+
+  primary key (HOSTNAME, PORT)
+);
+
+
+-------------------
+-- Calendar Home --
+-------------------
+
+create table CALENDAR_HOME (
+  RESOURCE_ID      integer      primary key default nextval('RESOURCE_ID_SEQ'), -- implicit index
+  OWNER_UID        varchar(255) not null unique,                                 -- implicit index
+  DATAVERSION      integer      default 0 not null
+);
+
+----------------------------
+-- Calendar Home Metadata --
+----------------------------
+
+create table CALENDAR_HOME_METADATA (
+  RESOURCE_ID      integer      primary key references CALENDAR_HOME on delete cascade, -- implicit index
+  QUOTA_USED_BYTES integer      default 0 not null,
+  CREATED          timestamp    default timezone('UTC', CURRENT_TIMESTAMP),
+  MODIFIED         timestamp    default timezone('UTC', CURRENT_TIMESTAMP)
+);
+
+--------------
+-- Calendar --
+--------------
+
+create table CALENDAR (
+  RESOURCE_ID integer   primary key default nextval('RESOURCE_ID_SEQ') -- implicit index
+);
+
+
+-----------------------
+-- Calendar Metadata --
+-----------------------
+
+create table CALENDAR_METADATA (
+  RESOURCE_ID           integer   primary key references CALENDAR on delete cascade, -- implicit index
+  SUPPORTED_COMPONENTS  varchar(255) default null,
+  CREATED               timestamp default timezone('UTC', CURRENT_TIMESTAMP),
+  MODIFIED              timestamp default timezone('UTC', CURRENT_TIMESTAMP)
+);
+
+
+---------------------------
+-- Sharing Notifications --
+---------------------------
+
+create table NOTIFICATION_HOME (
+  RESOURCE_ID integer      primary key default nextval('RESOURCE_ID_SEQ'), -- implicit index
+  OWNER_UID   varchar(255) not null unique                                 -- implicit index
+);
+
+create table NOTIFICATION (
+  RESOURCE_ID                   integer      primary key default nextval('RESOURCE_ID_SEQ'), -- implicit index
+  NOTIFICATION_HOME_RESOURCE_ID integer      not null references NOTIFICATION_HOME,
+  NOTIFICATION_UID              varchar(255) not null,
+  XML_TYPE                      varchar(255) not null,
+  XML_DATA                      text         not null,
+  MD5                           char(32)     not null,
+  CREATED                       timestamp default timezone('UTC', CURRENT_TIMESTAMP),
+  MODIFIED                      timestamp default timezone('UTC', CURRENT_TIMESTAMP),
+
+  unique(NOTIFICATION_UID, NOTIFICATION_HOME_RESOURCE_ID) -- implicit index
+);
+
+create index NOTIFICATION_NOTIFICATION_HOME_RESOURCE_ID on
+  NOTIFICATION(NOTIFICATION_HOME_RESOURCE_ID);
+
+-------------------
+-- Calendar Bind --
+-------------------
+
+-- Joins CALENDAR_HOME and CALENDAR
+
+create table CALENDAR_BIND (
+  CALENDAR_HOME_RESOURCE_ID integer      not null references CALENDAR_HOME,
+  CALENDAR_RESOURCE_ID      integer      not null references CALENDAR on delete cascade,
+  CALENDAR_RESOURCE_NAME    varchar(255) not null,
+  BIND_MODE                 integer      not null, -- enum CALENDAR_BIND_MODE
+  BIND_STATUS               integer      not null, -- enum CALENDAR_BIND_STATUS
+  MESSAGE                   text,
+
+  primary key(CALENDAR_HOME_RESOURCE_ID, CALENDAR_RESOURCE_ID), -- implicit index
+  unique(CALENDAR_HOME_RESOURCE_ID, CALENDAR_RESOURCE_NAME)     -- implicit index
+);
+
+create index CALENDAR_BIND_RESOURCE_ID on CALENDAR_BIND(CALENDAR_RESOURCE_ID);
+
+-- Enumeration of calendar bind modes
+
+create table CALENDAR_BIND_MODE (
+  ID          integer     primary key,
+  DESCRIPTION varchar(16) not null unique
+);
+
+insert into CALENDAR_BIND_MODE values (0, 'own'  );
+insert into CALENDAR_BIND_MODE values (1, 'read' );
+insert into CALENDAR_BIND_MODE values (2, 'write');
+insert into CALENDAR_BIND_MODE values (3, 'direct');
+
+-- Enumeration of statuses
+
+create table CALENDAR_BIND_STATUS (
+  ID          integer     primary key,
+  DESCRIPTION varchar(16) not null unique
+);
+
+insert into CALENDAR_BIND_STATUS values (0, 'invited' );
+insert into CALENDAR_BIND_STATUS values (1, 'accepted');
+insert into CALENDAR_BIND_STATUS values (2, 'declined');
+insert into CALENDAR_BIND_STATUS values (3, 'invalid');
+
+
+---------------------
+-- Calendar Object --
+---------------------
+
+create table CALENDAR_OBJECT (
+  RESOURCE_ID          integer      primary key default nextval('RESOURCE_ID_SEQ'), -- implicit index
+  CALENDAR_RESOURCE_ID integer      not null references CALENDAR on delete cascade,
+  RESOURCE_NAME        varchar(255) not null,
+  ICALENDAR_TEXT       text         not null,
+  ICALENDAR_UID        varchar(255) not null,
+  ICALENDAR_TYPE       varchar(255) not null,
+  ATTACHMENTS_MODE     integer      default 0 not null, -- enum CALENDAR_OBJECT_ATTACHMENTS_MODE
+  DROPBOX_ID           varchar(255),
+  ORGANIZER            varchar(255),
+  ORGANIZER_OBJECT     integer      references CALENDAR_OBJECT,
+  RECURRANCE_MIN       date,        -- minimum date that recurrences have been expanded to.
+  RECURRANCE_MAX       date,        -- maximum date that recurrences have been expanded to.
+  ACCESS               integer      default 0 not null,
+  SCHEDULE_OBJECT      boolean      default false,
+  SCHEDULE_TAG         varchar(36)  default null,
+  SCHEDULE_ETAGS       text         default null,
+  PRIVATE_COMMENTS     boolean      default false not null,
+  MD5                  char(32)     not null,
+  CREATED              timestamp    default timezone('UTC', CURRENT_TIMESTAMP),
+  MODIFIED             timestamp    default timezone('UTC', CURRENT_TIMESTAMP),
+
+  unique (CALENDAR_RESOURCE_ID, RESOURCE_NAME) -- implicit index
+
+  -- since the 'inbox' is a 'calendar resource' for the purpose of storing
+  -- calendar objects, this constraint has to be selectively enforced by the
+  -- application layer.
+
+  -- unique(CALENDAR_RESOURCE_ID, ICALENDAR_UID)
+);
+
+create index CALENDAR_OBJECT_CALENDAR_RESOURCE_ID_AND_ICALENDAR_UID on
+  CALENDAR_OBJECT(CALENDAR_RESOURCE_ID, ICALENDAR_UID);
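
Since the unique(CALENDAR_RESOURCE_ID, ICALENDAR_UID) constraint is
commented out above and left to the application layer, an index on the
same two columns is what keeps that check cheap. A sketch of the
existence query such enforcement would need (%s placeholders are
illustrative):

    select RESOURCE_ID
      from CALENDAR_OBJECT
     where CALENDAR_RESOURCE_ID = %s
       and ICALENDAR_UID = %s;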
+
+create index CALENDAR_OBJECT_CALENDAR_RESOURCE_ID_RECURRANCE_MAX on
+  CALENDAR_OBJECT(CALENDAR_RESOURCE_ID, RECURRANCE_MAX);
+
+create index CALENDAR_OBJECT_ORGANIZER_OBJECT on
+  CALENDAR_OBJECT(ORGANIZER_OBJECT);
+
+create index CALENDAR_OBJECT_DROPBOX_ID on
+  CALENDAR_OBJECT(DROPBOX_ID);
+
+-- Enumeration of attachment modes
+
+create table CALENDAR_OBJECT_ATTACHMENTS_MODE (
+  ID          integer     primary key,
+  DESCRIPTION varchar(16) not null unique
+);
+
+insert into CALENDAR_OBJECT_ATTACHMENTS_MODE values (0, 'none' );
+insert into CALENDAR_OBJECT_ATTACHMENTS_MODE values (1, 'read' );
+insert into CALENDAR_OBJECT_ATTACHMENTS_MODE values (2, 'write');
+
+
+-- Enumeration of calendar access types
+
+create table CALENDAR_ACCESS_TYPE (
+  ID          integer     primary key,
+  DESCRIPTION varchar(32) not null unique
+);
+
+insert into CALENDAR_ACCESS_TYPE values (0, ''             );
+insert into CALENDAR_ACCESS_TYPE values (1, 'public'       );
+insert into CALENDAR_ACCESS_TYPE values (2, 'private'      );
+insert into CALENDAR_ACCESS_TYPE values (3, 'confidential' );
+insert into CALENDAR_ACCESS_TYPE values (4, 'restricted'   );
+
+-----------------
+-- Instance ID --
+-----------------
+
+create sequence INSTANCE_ID_SEQ;
+
+
+----------------
+-- Time Range --
+----------------
+
+create table TIME_RANGE (
+  INSTANCE_ID                 integer        primary key default nextval('INSTANCE_ID_SEQ'), -- implicit index
+  CALENDAR_RESOURCE_ID        integer        not null references CALENDAR on delete cascade,
+  CALENDAR_OBJECT_RESOURCE_ID integer        not null references CALENDAR_OBJECT on delete cascade,
+  FLOATING                    boolean        not null,
+  START_DATE                  timestamp      not null,
+  END_DATE                    timestamp      not null,
+  FBTYPE                      integer        not null,
+  TRANSPARENT                 boolean        not null
+);
+
+create index TIME_RANGE_CALENDAR_RESOURCE_ID on
+  TIME_RANGE(CALENDAR_RESOURCE_ID);
+create index TIME_RANGE_CALENDAR_OBJECT_RESOURCE_ID on
+  TIME_RANGE(CALENDAR_OBJECT_RESOURCE_ID);
+
+
+-- Enumeration of free/busy types
+
+create table FREE_BUSY_TYPE (
+  ID          integer     primary key,
+  DESCRIPTION varchar(16) not null unique
+);
+
+insert into FREE_BUSY_TYPE values (0, 'unknown'         );
+insert into FREE_BUSY_TYPE values (1, 'free'            );
+insert into FREE_BUSY_TYPE values (2, 'busy'            );
+insert into FREE_BUSY_TYPE values (3, 'busy-unavailable');
+insert into FREE_BUSY_TYPE values (4, 'busy-tentative'  );
+
+
+------------------
+-- Transparency --
+------------------
+
+create table TRANSPARENCY (
+  TIME_RANGE_INSTANCE_ID      integer      not null references TIME_RANGE on delete cascade,
+  USER_ID                     varchar(255) not null,
+  TRANSPARENT                 boolean      not null
+);
+
+create index TRANSPARENCY_TIME_RANGE_INSTANCE_ID on
+  TRANSPARENCY(TIME_RANGE_INSTANCE_ID);
+
+
+----------------
+-- Attachment --
+----------------
+
+create sequence ATTACHMENT_ID_SEQ;
+
+create table ATTACHMENT (
+  ATTACHMENT_ID               integer           primary key default nextval('ATTACHMENT_ID_SEQ'), -- implicit index
+  CALENDAR_HOME_RESOURCE_ID   integer           not null references CALENDAR_HOME,
+  DROPBOX_ID                  varchar(255),
+  CONTENT_TYPE                varchar(255)      not null,
+  SIZE                        integer           not null,
+  MD5                         char(32)          not null,
+  CREATED                     timestamp default timezone('UTC', CURRENT_TIMESTAMP),
+  MODIFIED                    timestamp default timezone('UTC', CURRENT_TIMESTAMP),
+  PATH                        varchar(1024)     not null
+);
+
+create index ATTACHMENT_CALENDAR_HOME_RESOURCE_ID on
+  ATTACHMENT(CALENDAR_HOME_RESOURCE_ID);
+
+-- Many-to-many relationship between attachments and calendar objects
+create table ATTACHMENT_CALENDAR_OBJECT (
+  ATTACHMENT_ID                  integer      not null references ATTACHMENT on delete cascade,
+  MANAGED_ID                     varchar(255) not null,
+  CALENDAR_OBJECT_RESOURCE_ID    integer      not null references CALENDAR_OBJECT on delete cascade,
+
+  primary key (ATTACHMENT_ID, CALENDAR_OBJECT_RESOURCE_ID), -- implicit index
+  unique (MANAGED_ID, CALENDAR_OBJECT_RESOURCE_ID) -- implicit index
+);
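
The join table above relates attachments to calendar objects
many-to-many, with a MANAGED_ID naming each link. A sketch of listing
the attachments reachable from one calendar object (the %s placeholder
is illustrative):

    select A.ATTACHMENT_ID, ACO.MANAGED_ID, A.CONTENT_TYPE, A.SIZE
      from ATTACHMENT A
      join ATTACHMENT_CALENDAR_OBJECT ACO
        on ACO.ATTACHMENT_ID = A.ATTACHMENT_ID
     where ACO.CALENDAR_OBJECT_RESOURCE_ID = %s;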
+
+
+-----------------------
+-- Resource Property --
+-----------------------
+
+create table RESOURCE_PROPERTY (
+  RESOURCE_ID integer      not null, -- foreign key: *.RESOURCE_ID
+  NAME        varchar(255) not null,
+  VALUE       text         not null, -- FIXME: xml?
+  VIEWER_UID  varchar(255),
+
+  primary key (RESOURCE_ID, NAME, VIEWER_UID) -- implicit index
+);
+
+
+----------------------
+-- AddressBook Home --
+----------------------
+
+create table ADDRESSBOOK_HOME (
+  RESOURCE_ID      integer      primary key default nextval('RESOURCE_ID_SEQ'), -- implicit index
+  OWNER_UID        varchar(255) not null unique,                                -- implicit index
+  DATAVERSION      integer      default 0 not null
+);
+
+-------------------------------
+-- AddressBook Home Metadata --
+-------------------------------
+
+create table ADDRESSBOOK_HOME_METADATA (
+  RESOURCE_ID      integer      primary key references ADDRESSBOOK_HOME on delete cascade, -- implicit index
+  QUOTA_USED_BYTES integer      default 0 not null,
+  CREATED          timestamp    default timezone('UTC', CURRENT_TIMESTAMP),
+  MODIFIED         timestamp    default timezone('UTC', CURRENT_TIMESTAMP)
+);
+
+-----------------
+-- AddressBook --
+-----------------
+
+create table ADDRESSBOOK (
+  RESOURCE_ID integer   primary key default nextval('RESOURCE_ID_SEQ') -- implicit index
+);
+
+
+--------------------------
+-- AddressBook Metadata --
+--------------------------
+
+create table ADDRESSBOOK_METADATA (
+  RESOURCE_ID integer   primary key references ADDRESSBOOK on delete cascade, -- implicit index
+  CREATED     timestamp default timezone('UTC', CURRENT_TIMESTAMP),
+  MODIFIED    timestamp default timezone('UTC', CURRENT_TIMESTAMP)
+);
+
+
+----------------------
+-- AddressBook Bind --
+----------------------
+
+-- Joins ADDRESSBOOK_HOME and ADDRESSBOOK
+
+create table ADDRESSBOOK_BIND (
+  ADDRESSBOOK_HOME_RESOURCE_ID integer      not null references ADDRESSBOOK_HOME,
+  ADDRESSBOOK_RESOURCE_ID      integer      not null references ADDRESSBOOK on delete cascade,
+  ADDRESSBOOK_RESOURCE_NAME    varchar(255) not null,
+  BIND_MODE                    integer      not null, -- enum CALENDAR_BIND_MODE
+  BIND_STATUS                  integer      not null, -- enum CALENDAR_BIND_STATUS
+  MESSAGE                      text,                  -- FIXME: xml?
+
+  primary key (ADDRESSBOOK_HOME_RESOURCE_ID, ADDRESSBOOK_RESOURCE_ID), -- implicit index
+  unique (ADDRESSBOOK_HOME_RESOURCE_ID, ADDRESSBOOK_RESOURCE_NAME)     -- implicit index
+);
+
+create index ADDRESSBOOK_BIND_RESOURCE_ID on
+  ADDRESSBOOK_BIND(ADDRESSBOOK_RESOURCE_ID);
+
+create table ADDRESSBOOK_OBJECT (
+  RESOURCE_ID             integer      primary key default nextval('RESOURCE_ID_SEQ'),    -- implicit index
+  ADDRESSBOOK_RESOURCE_ID integer      not null references ADDRESSBOOK on delete cascade,
+  RESOURCE_NAME           varchar(255) not null,
+  VCARD_TEXT              text         not null,
+  VCARD_UID               varchar(255) not null,
+  MD5                     char(32)     not null,
+  CREATED                 timestamp    default timezone('UTC', CURRENT_TIMESTAMP),
+  MODIFIED                timestamp    default timezone('UTC', CURRENT_TIMESTAMP),
+
+  unique (ADDRESSBOOK_RESOURCE_ID, RESOURCE_NAME), -- implicit index
+  unique (ADDRESSBOOK_RESOURCE_ID, VCARD_UID)      -- implicit index
+);
+
+-----------------------
+-- Revision Sequence --
+-----------------------
+
+create sequence REVISION_SEQ;
+
+
+---------------
+-- Revisions --
+---------------
+
+create table CALENDAR_OBJECT_REVISIONS (
+  CALENDAR_HOME_RESOURCE_ID integer      not null references CALENDAR_HOME,
+  CALENDAR_RESOURCE_ID      integer      references CALENDAR,
+  CALENDAR_NAME             varchar(255) default null,
+  RESOURCE_NAME             varchar(255),
+  REVISION                  integer      default nextval('REVISION_SEQ') not null,
+  DELETED                   boolean      not null
+);
+
+create index CALENDAR_OBJECT_REVISIONS_HOME_RESOURCE_ID_CALENDAR_RESOURCE_ID
+  on CALENDAR_OBJECT_REVISIONS(CALENDAR_HOME_RESOURCE_ID, CALENDAR_RESOURCE_ID);
+
+create index CALENDAR_OBJECT_REVISIONS_RESOURCE_ID_RESOURCE_NAME
+  on CALENDAR_OBJECT_REVISIONS(CALENDAR_RESOURCE_ID, RESOURCE_NAME);
+
+create index CALENDAR_OBJECT_REVISIONS_RESOURCE_ID_REVISION
+  on CALENDAR_OBJECT_REVISIONS(CALENDAR_RESOURCE_ID, REVISION);
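
REVISION defaults to nextval('REVISION_SEQ'), so each change to a
calendar is stamped with a number higher than any earlier change, and
the (CALENDAR_RESOURCE_ID, REVISION) index above supports scanning
forward from a client's last-seen point. A sketch of a "changes since
revision N" lookup (placeholders illustrative):

    select RESOURCE_NAME, DELETED
      from CALENDAR_OBJECT_REVISIONS
     where CALENDAR_RESOURCE_ID = %s
       and REVISION > %s;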
+
+----------------------------------
+-- AddressBook Object Revisions --
+----------------------------------
+
+create table ADDRESSBOOK_OBJECT_REVISIONS (
+  ADDRESSBOOK_HOME_RESOURCE_ID integer      not null references ADDRESSBOOK_HOME,
+  ADDRESSBOOK_RESOURCE_ID      integer      references ADDRESSBOOK,
+  ADDRESSBOOK_NAME             varchar(255) default null,
+  RESOURCE_NAME                varchar(255),
+  REVISION                     integer      default nextval('REVISION_SEQ') not null,
+  DELETED                      boolean      not null
+);
+
+create index ADDRESSBOOK_OBJECT_REVISIONS_HOME_RESOURCE_ID_ADDRESSBOOK_RESOURCE_ID
+  on ADDRESSBOOK_OBJECT_REVISIONS(ADDRESSBOOK_HOME_RESOURCE_ID, ADDRESSBOOK_RESOURCE_ID);
+
+create index ADDRESSBOOK_OBJECT_REVISIONS_RESOURCE_ID_RESOURCE_NAME
+  on ADDRESSBOOK_OBJECT_REVISIONS(ADDRESSBOOK_RESOURCE_ID, RESOURCE_NAME);
+
+create index ADDRESSBOOK_OBJECT_REVISIONS_RESOURCE_ID_REVISION
+  on ADDRESSBOOK_OBJECT_REVISIONS(ADDRESSBOOK_RESOURCE_ID, REVISION);
+
+-----------------------------------
+-- Notification Object Revisions --
+-----------------------------------
+
+create table NOTIFICATION_OBJECT_REVISIONS (
+  NOTIFICATION_HOME_RESOURCE_ID integer      not null references NOTIFICATION_HOME on delete cascade,
+  RESOURCE_NAME                 varchar(255),
+  REVISION                      integer      default nextval('REVISION_SEQ') not null,
+  DELETED                       boolean      not null,
+
+  unique(NOTIFICATION_HOME_RESOURCE_ID, RESOURCE_NAME) -- implicit index
+);
+
+create index NOTIFICATION_OBJECT_REVISIONS_RESOURCE_ID_REVISION
+  on NOTIFICATION_OBJECT_REVISIONS(NOTIFICATION_HOME_RESOURCE_ID, REVISION);
+
+-------------------------------------------
+-- Apple Push Notification Subscriptions --
+-------------------------------------------
+
+create table APN_SUBSCRIPTIONS (
+  TOKEN                         varchar(255) not null,
+  RESOURCE_KEY                  varchar(255) not null,
+  MODIFIED                      integer not null,
+  SUBSCRIBER_GUID               varchar(255) not null,
+  USER_AGENT                    varchar(255) default null,
+  IP_ADDR                       varchar(255) default null,
+
+  primary key (TOKEN, RESOURCE_KEY) -- implicit index
+);
+
+create index APN_SUBSCRIPTIONS_RESOURCE_KEY
+   on APN_SUBSCRIPTIONS(RESOURCE_KEY);
+
+
+--------------------
+-- Schema Version --
+--------------------
+
+create table CALENDARSERVER (
+  NAME                          varchar(255) primary key, -- implicit index
+  VALUE                         varchar(255)
+);
+
+insert into CALENDARSERVER values ('VERSION', '14');
+insert into CALENDARSERVER values ('CALENDAR-DATAVERSION', '3');
+insert into CALENDARSERVER values ('ADDRESSBOOK-DATAVERSION', '1');

Copied: CalendarServer/trunk/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_14_to_15.sql (from rev 10289, CalendarServer/branches/users/glyph/queue-locking-and-timing/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_14_to_15.sql)
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_14_to_15.sql	                        (rev 0)
+++ CalendarServer/trunk/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_14_to_15.sql	2013-01-05 00:51:01 UTC (rev 10290)
@@ -0,0 +1,27 @@
+----
+-- Copyright (c) 2011 Apple Inc. All rights reserved.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+----
+
+---------------------------------------------------
+-- Upgrade database schema from VERSION 14 to 15 --
+---------------------------------------------------
+
+
+create table NAMED_LOCK (
+    "LOCK_NAME" nvarchar2(255) primary key
+);
+-- Now update the version
+update CALENDARSERVER set VALUE = '15' where NAME = 'VERSION';
+
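
Each upgrade script ends by bumping the stored schema VERSION, which is
presumably how the upgrade machinery decides whether a given
upgrade_from_X_to_Y script still needs to run. A sketch of the
corresponding check:

    select VALUE from CALENDARSERVER where NAME = 'VERSION';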

Copied: CalendarServer/trunk/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_14_to_15.sql (from rev 10289, CalendarServer/branches/users/glyph/queue-locking-and-timing/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_14_to_15.sql)
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_14_to_15.sql	                        (rev 0)
+++ CalendarServer/trunk/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_14_to_15.sql	2013-01-05 00:51:01 UTC (rev 10290)
@@ -0,0 +1,27 @@
+----
+-- Copyright (c) 2011 Apple Inc. All rights reserved.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+----
+
+---------------------------------------------------
+-- Upgrade database schema from VERSION 14 to 15 --
+---------------------------------------------------
+
+
+create table NAMED_LOCK (
+    LOCK_NAME varchar(255) primary key
+);
+-- Now update the version
+update CALENDARSERVER set VALUE = '15' where NAME = 'VERSION';
+
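
In both dialects the new NAMED_LOCK table carries no data of its own:
the primary key on LOCK_NAME is the lock. A sketch of the behavior this
relies on (shown in the postgres dialect, with a made-up lock name): a
transaction that inserts a given name causes any concurrent transaction
inserting the same name to block on the uniqueness check until the
first one commits (the second then fails) or rolls back (the second
then proceeds).

    begin;
    insert into NAMED_LOCK (LOCK_NAME) values ('example-lock');
    -- ... serialized work happens here ...
    rollback;  -- release the lock without leaving a row behind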

Modified: CalendarServer/trunk/txdav/common/datastore/test/util.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/test/util.py	2013-01-05 00:40:03 UTC (rev 10289)
+++ CalendarServer/trunk/txdav/common/datastore/test/util.py	2013-01-05 00:51:01 UTC (rev 10290)
@@ -50,6 +50,7 @@
 from twistedcaldav.vcard import Component as ABComponent
 
 from pycalendar.datetime import PyCalendarDateTime
+from txdav.common.datastore.sql_tables import schema
 
 md5key = PropertyName.fromElement(TwistedGETContentMD5)
 
@@ -219,23 +220,17 @@
         cleanupTxn = storeToClean.sqlTxnFactory(
             "%s schema-cleanup" % (testCase.id(),)
         )
-        # TODO: should be getting these tables from a declaration of the schema
-        # somewhere.
-        tables = ['RESOURCE_PROPERTY',
-                  'ATTACHMENT',
-                  'NOTIFICATION_OBJECT_REVISIONS',
-                  'ADDRESSBOOK_OBJECT_REVISIONS',
-                  'CALENDAR_OBJECT_REVISIONS',
-                  'ADDRESSBOOK_OBJECT',
-                  'CALENDAR_OBJECT',
-                  'CALENDAR_BIND',
-                  'ADDRESSBOOK_BIND',
-                  'CALENDAR',
-                  'ADDRESSBOOK',
-                  'CALENDAR_HOME',
-                  'ADDRESSBOOK_HOME',
-                  'NOTIFICATION',
-                  'NOTIFICATION_HOME']
+
+        # Tables appear in the schema model in the order in which their
+        # 'create table' statements are issued, so no table can reference
+        # a later one.  Deleting their contents in the reverse of that
+        # order is therefore always safe.
+        tables = [t.name for t in schema.model.tables
+                  # All tables with rows _in_ the schema are populated
+                  # exclusively _by_ the schema and shouldn't be manipulated
+                  # while the server is running, so we leave those populated.
+                  if not t.schemaRows][::-1]
+
         for table in tables:
             try:
                 yield cleanupTxn.execSQL("delete from " + table, [])
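
The reversal is safe because a 'create table' statement can only
reference tables created before it, so emptying tables in reverse
creation order always deletes referencing rows before the rows they
reference. A minimal SQL sketch of that argument, with made-up table
names:

    create table PARENT (ID integer primary key);
    create table CHILD (PARENT_ID integer references PARENT);
    -- created in this order, so emptying in reverse order works:
    delete from CHILD;
    delete from PARENT;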