[CalendarServer-changes] [9689] CalendarServer/trunk

source_changes at macosforge.org
Sat Aug 11 02:01:06 PDT 2012


Revision: 9689
          http://trac.macosforge.org/projects/calendarserver/changeset/9689
Author:   glyph at apple.com
Date:     2012-08-11 02:01:06 -0700 (Sat, 11 Aug 2012)
Log Message:
-----------
Persistent, deferred, queued task execution.
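
In short: this merges the glyph/q branch.  The master process now listens on a
single multiplexed AMP control socket (calendarserver/controlsocket.py) that
carries both the existing access-log traffic and a new "queue" route, behind
which twext/enterprise/queue.py executes work items persisted in SQL.  A
condensed sketch of the wiring, assembled from the diffs below; store, schema,
and socketPath stand in for the real data store, sql_tables schema, and
config.ControlSocket value:

    from twisted.internet import reactor
    from twisted.internet.endpoints import UNIXClientEndpoint
    from twext.enterprise.queue import PeerConnectionPool, WorkerFactory
    from calendarserver.controlsocket import (
        ControlSocket, ControlSocketConnectingService
    )

    # Master: one listening control socket; queue traffic is routed to the
    # PeerConnectionPool's worker listener (port 7654, as in caldav.py).
    controlSocket = ControlSocket()
    pool = PeerConnectionPool(reactor, store.newTransaction, 7654, schema)
    controlSocket.addFactory("queue", pool.workerListenerFactory())

    # Worker: connect back over the same socket; once the master connection
    # is up, it becomes the store's queuer.
    def queueMasterAvailable(connectionFromMaster):
        store.queuer = connectionFromMaster
    workerSocket = ControlSocket()
    workerSocket.addFactory(
        "queue",
        WorkerFactory(store.newTransaction, schema, queueMasterAvailable))
    # Parented to the worker's main service in caldav.py; its
    # privilegedStartService() makes the actual connection.
    ControlSocketConnectingService(
        lambda r: UNIXClientEndpoint(r, socketPath), workerSocket)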

Modified Paths:
--------------
    CalendarServer/trunk/calendarserver/accesslog.py
    CalendarServer/trunk/calendarserver/tap/caldav.py
    CalendarServer/trunk/calendarserver/tap/test/test_caldav.py
    CalendarServer/trunk/twext/enterprise/adbapi2.py
    CalendarServer/trunk/twext/enterprise/dal/syntax.py
    CalendarServer/trunk/twext/enterprise/dal/test/test_sqlsyntax.py
    CalendarServer/trunk/twext/enterprise/ienterprise.py
    CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py
    CalendarServer/trunk/twext/internet/threadutils.py
    CalendarServer/trunk/txdav/base/datastore/file.py
    CalendarServer/trunk/txdav/base/datastore/util.py
    CalendarServer/trunk/txdav/common/datastore/sql.py
    CalendarServer/trunk/txdav/common/datastore/sql_schema/current.sql
    CalendarServer/trunk/txdav/idav.py

Added Paths:
-----------
    CalendarServer/trunk/calendarserver/controlsocket.py
    CalendarServer/trunk/twext/enterprise/dal/record.py
    CalendarServer/trunk/twext/enterprise/dal/test/test_record.py
    CalendarServer/trunk/twext/enterprise/queue.py
    CalendarServer/trunk/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_11_to_12.sql
    CalendarServer/trunk/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_11_to_12.sql

Property Changed:
----------------
    CalendarServer/trunk/


Property changes on: CalendarServer/trunk
___________________________________________________________________
Modified: svn:mergeinfo
   - /CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/users/cdaboo/batchupload-6699:6700-7198
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/component-set-fixes:8130-8346
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/implicituidrace:8137-8141
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/pods:7297-7377
/CalendarServer/branches/users/cdaboo/pycalendar:7085-7206
/CalendarServer/branches/users/cdaboo/pycard:7227-7237
/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes:7740-8287
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/cdaboo/timezones:7443-7699
/CalendarServer/branches/users/cdaboo/txn-debugging:8730-8743
/CalendarServer/branches/users/glyph/case-insensitive-uid:8772-8805
/CalendarServer/branches/users/glyph/conn-limit:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/dalify:6932-7023
/CalendarServer/branches/users/glyph/db-reconnect:6824-6876
/CalendarServer/branches/users/glyph/deploybuild:7563-7572
/CalendarServer/branches/users/glyph/disable-quota:7718-7727
/CalendarServer/branches/users/glyph/dont-start-postgres:6592-6614
/CalendarServer/branches/users/glyph/imip-and-admin-html:7866-7984
/CalendarServer/branches/users/glyph/ipv6-client:9054-9105
/CalendarServer/branches/users/glyph/linux-tests:6893-6900
/CalendarServer/branches/users/glyph/migrate-merge:8690-8713
/CalendarServer/branches/users/glyph/misc-portability-fixes:7365-7374
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/multiget-delete:8321-8330
/CalendarServer/branches/users/glyph/new-export:7444-7485
/CalendarServer/branches/users/glyph/oracle:7106-7155
/CalendarServer/branches/users/glyph/oracle-nulls:7340-7351
/CalendarServer/branches/users/glyph/other-html:8062-8091
/CalendarServer/branches/users/glyph/parallel-sim:8240-8251
/CalendarServer/branches/users/glyph/parallel-upgrade:8376-8400
/CalendarServer/branches/users/glyph/parallel-upgrade_to_1:8571-8583
/CalendarServer/branches/users/glyph/quota:7604-7637
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/shared-pool-fixes:8436-8443
/CalendarServer/branches/users/glyph/shared-pool-take2:8155-8174
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/sharing-api:9192-9205
/CalendarServer/branches/users/glyph/skip-lonely-vtimezones:8524-8535
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/subtransactions:7248-7258
/CalendarServer/branches/users/glyph/table-alias:8651-8664
/CalendarServer/branches/users/glyph/uidexport:7673-7676
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/glyph/uuid-normalize:9268-9296
/CalendarServer/branches/users/glyph/xattrs-from-files:7757-7769
/CalendarServer/branches/users/sagen/applepush:8126-8184
/CalendarServer/branches/users/sagen/inboxitems:7380-7381
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/wsanchez/transations:5515-5593
   + /CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/users/cdaboo/batchupload-6699:6700-7198
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/component-set-fixes:8130-8346
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/implicituidrace:8137-8141
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/pods:7297-7377
/CalendarServer/branches/users/cdaboo/pycalendar:7085-7206
/CalendarServer/branches/users/cdaboo/pycard:7227-7237
/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes:7740-8287
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/cdaboo/timezones:7443-7699
/CalendarServer/branches/users/cdaboo/txn-debugging:8730-8743
/CalendarServer/branches/users/glyph/case-insensitive-uid:8772-8805
/CalendarServer/branches/users/glyph/conn-limit:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/dalify:6932-7023
/CalendarServer/branches/users/glyph/db-reconnect:6824-6876
/CalendarServer/branches/users/glyph/deploybuild:7563-7572
/CalendarServer/branches/users/glyph/disable-quota:7718-7727
/CalendarServer/branches/users/glyph/dont-start-postgres:6592-6614
/CalendarServer/branches/users/glyph/imip-and-admin-html:7866-7984
/CalendarServer/branches/users/glyph/ipv6-client:9054-9105
/CalendarServer/branches/users/glyph/linux-tests:6893-6900
/CalendarServer/branches/users/glyph/migrate-merge:8690-8713
/CalendarServer/branches/users/glyph/misc-portability-fixes:7365-7374
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/multiget-delete:8321-8330
/CalendarServer/branches/users/glyph/new-export:7444-7485
/CalendarServer/branches/users/glyph/oracle:7106-7155
/CalendarServer/branches/users/glyph/oracle-nulls:7340-7351
/CalendarServer/branches/users/glyph/other-html:8062-8091
/CalendarServer/branches/users/glyph/parallel-sim:8240-8251
/CalendarServer/branches/users/glyph/parallel-upgrade:8376-8400
/CalendarServer/branches/users/glyph/parallel-upgrade_to_1:8571-8583
/CalendarServer/branches/users/glyph/q:9560-9688
/CalendarServer/branches/users/glyph/quota:7604-7637
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/shared-pool-fixes:8436-8443
/CalendarServer/branches/users/glyph/shared-pool-take2:8155-8174
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/sharing-api:9192-9205
/CalendarServer/branches/users/glyph/skip-lonely-vtimezones:8524-8535
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/subtransactions:7248-7258
/CalendarServer/branches/users/glyph/table-alias:8651-8664
/CalendarServer/branches/users/glyph/uidexport:7673-7676
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/glyph/uuid-normalize:9268-9296
/CalendarServer/branches/users/glyph/xattrs-from-files:7757-7769
/CalendarServer/branches/users/sagen/applepush:8126-8184
/CalendarServer/branches/users/sagen/inboxitems:7380-7381
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/wsanchez/transations:5515-5593

Modified: CalendarServer/trunk/calendarserver/accesslog.py
===================================================================
--- CalendarServer/trunk/calendarserver/accesslog.py	2012-08-11 08:55:55 UTC (rev 9688)
+++ CalendarServer/trunk/calendarserver/accesslog.py	2012-08-11 09:01:06 UTC (rev 9689)
@@ -378,34 +378,25 @@
     arguments = [] 
 
 class AMPCommonAccessLoggingObserver(CommonAccessLoggingObserverExtensions):
-    def __init__(self, mode, id):
-        self.mode = mode
-        self.id = id
+    def __init__(self):
         self.protocol = None
         self._buffer = []
 
+
     def flushBuffer(self):
         if self._buffer:
             for msg in self._buffer:
                 self.logMessage(msg)
 
-    def start(self):
-        super(AMPCommonAccessLoggingObserver, self).start()
 
-        from twisted.internet import reactor
+    def addClient(self, connectedClient):
+        """
+        An AMP client connected; hook it up to this observer.
+        """
+        self.protocol = connectedClient
+        self.flushBuffer()
 
-        def _gotProtocol(proto):
-            self.protocol = proto
-            self.flushBuffer()
 
-        self.client = protocol.ClientCreator(reactor, amp.AMP)
-        if self.mode == "AF_UNIX":
-            d = self.client.connectUNIX(self.id)
-        else:
-            d = self.client.connectTCP("localhost", self.id)
-        d.addCallback(_gotProtocol)
-
-
     def logMessage(self, message):
         """
         Log a message to the remote AMP Protocol
@@ -420,6 +411,7 @@
         else:
             self._buffer.append(message)
 
+
     def logGlobalHit(self): 
         """ 
         Log a server hit via the remote AMP Protocol 
@@ -429,8 +421,10 @@
             d = self.protocol.callRemote(LogGlobalHit) 
             d.addErrback(log.err) 
         else: 
-            log.msg("logGlobalHit() only works with an AMP Protocol") 
+            log.msg("logGlobalHit() only works with an AMP Protocol")
 
+
+
 class AMPLoggingProtocol(amp.AMP):
     """
     A server side protocol for logging to the given observer.
@@ -453,15 +447,23 @@
 
     LogGlobalHit.responder(logGlobalHit)
 
+
+
 class AMPLoggingFactory(protocol.ServerFactory):
     def __init__(self, observer):
         self.observer = observer
 
+
     def doStart(self):
         self.observer.start()
 
+
     def doStop(self):
         self.observer.stop()
 
+
     def buildProtocol(self, addr):
         return AMPLoggingProtocol(self.observer)
+
+
+
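
Note on the accesslog change: the observer no longer dials out on its own; the
tap code below hands it an already-connected AMP protocol over the shared
control socket.  In miniature, with connectedAMP standing in for the protocol
that LogClient in caldav.py delivers:

    from calendarserver.accesslog import AMPCommonAccessLoggingObserver

    observer = AMPCommonAccessLoggingObserver()
    observer.logMessage("logged before connecting")  # no protocol: buffered
    # ... later, from LogClient.startReceivingBoxes in caldav.py:
    observer.addClient(connectedAMP)  # sets .protocol, flushes the buffer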

Copied: CalendarServer/trunk/calendarserver/controlsocket.py (from rev 9688, CalendarServer/branches/users/glyph/q/calendarserver/controlsocket.py)
===================================================================
--- CalendarServer/trunk/calendarserver/controlsocket.py	                        (rev 0)
+++ CalendarServer/trunk/calendarserver/controlsocket.py	2012-08-11 09:01:06 UTC (rev 9689)
@@ -0,0 +1,128 @@
+##
+# Copyright (c) 2012 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Multiplexing control socket.  Currently used for messages related to queueing
+and logging, but extensible to other message types.
+"""
+
+from zope.interface import implements
+
+from twisted.internet.protocol import Factory
+from twisted.protocols.amp import BinaryBoxProtocol, IBoxReceiver, IBoxSender
+from twisted.application.service import Service
+
+class DispatchingSender(object):
+    implements(IBoxSender)
+
+    def __init__(self, sender, route):
+        self.sender = sender
+        self.route = route
+
+
+    def sendBox(self, box):
+        box['_route'] = self.route
+        self.sender.sendBox(box)
+
+
+    def unhandledError(self, failure):
+        self.sender.unhandledError(failure)
+
+
+
+class DispatchingBoxReceiver(object):
+    implements(IBoxReceiver)
+
+    def __init__(self, receiverMap):
+        self.receiverMap = receiverMap
+
+
+    def startReceivingBoxes(self, boxSender):
+        for key, receiver in self.receiverMap.items():
+            receiver.startReceivingBoxes(DispatchingSender(boxSender, key))
+
+
+    def ampBoxReceived(self, box):
+        self.receiverMap[box['_route']].ampBoxReceived(box)
+
+
+    def stopReceivingBoxes(self, reason):
+        for receiver in self.receiverMap.values():
+            receiver.stopReceivingBoxes(reason)
+
+
+
+class ControlSocket(Factory, object):
+    """
+    An AMP control socket that aggregates other AMP factories.  This is the
+    service that listens in the master process.
+    """
+
+    def __init__(self):
+        """
+        Initialize this L{ControlSocket}.
+        """
+        self._factoryMap = {}
+
+
+    def addFactory(self, key, otherFactory):
+        """
+        Add another L{Factory} - one that returns L{AMP} instances - to this
+        socket.
+        """
+        self._factoryMap[key] = otherFactory
+
+
+    def buildProtocol(self, addr):
+        """
+        Build a protocol that dispatches AMP boxes to the protocols built by
+        all added factories, based on each box's '_route' key.
+        """
+        receiverMap = {}
+        for k, f in self._factoryMap.items():
+            receiverMap[k] = f.buildProtocol(addr)
+        return BinaryBoxProtocol(DispatchingBoxReceiver(receiverMap))
+
+
+    def doStart(self):
+        """
+        Relay start notification to all added factories.
+        """
+        for f in self._factoryMap.values():
+            f.doStart()
+
+
+    def doStop(self):
+        """
+        Relay stop notification to all added factories.
+        """
+        for f in self._factoryMap.values():
+            f.doStop()
+
+
+
+class ControlSocketConnectingService(Service, object):
+    """
+    A service which, when started (with privileges), connects the given
+    client-side L{ControlSocket} over the endpoint constructed with
+    C{endpointFactory}.
+    """
+
+    def __init__(self, endpointFactory, controlSocket):
+        super(ControlSocketConnectingService, self).__init__()
+        self.endpointFactory = endpointFactory
+        self.controlSocket = controlSocket
+
+
+    def privilegedStartService(self):
+        from twisted.internet import reactor
+        endpoint = self.endpointFactory(reactor)
+        endpoint.connect(self.controlSocket)
+
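
A sketch of how this multiplexing is used, with plain AMP factories standing
in for the logging and queue factories wired up in caldav.py below:

    from twisted.internet.protocol import Factory
    from twisted.protocols.amp import AMP
    from calendarserver.controlsocket import ControlSocket

    logFactory = Factory()
    logFactory.protocol = AMP
    queueFactory = Factory()
    queueFactory.protocol = AMP

    control = ControlSocket()
    control.addFactory("log", logFactory)
    control.addFactory("queue", queueFactory)
    # Hand 'control' to e.g. reactor.listenUNIX(path, control): each
    # connection speaks ordinary AMP, but every box carries a '_route' key,
    # so one socket serves both services.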

Modified: CalendarServer/trunk/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/caldav.py	2012-08-11 08:55:55 UTC (rev 9688)
+++ CalendarServer/trunk/calendarserver/tap/caldav.py	2012-08-11 09:01:06 UTC (rev 9689)
@@ -61,6 +61,7 @@
 from twext.web2.channel.http import LimitingHTTPFactory, SSLRedirectRequest
 from twext.web2.metafd import ConnectionLimiter, ReportingHTTPService
 
+from txdav.common.datastore.sql_tables import schema
 from txdav.common.datastore.upgrade.sql.upgrade import (
     UpgradeDatabaseSchemaService, UpgradeDatabaseDataService,
 )
@@ -88,6 +89,13 @@
 
 from calendarserver.tap.util import ConnectionDispenser
 
+from calendarserver.controlsocket import ControlSocket
+from twisted.internet.endpoints import UNIXClientEndpoint, TCP4ClientEndpoint
+
+from calendarserver.controlsocket import ControlSocketConnectingService
+from twisted.protocols.amp import AMP
+from twext.enterprise.queue import WorkerFactory as QueueWorkerFactory
+from twext.enterprise.queue import PeerConnectionPool
 from calendarserver.accesslog import AMPCommonAccessLoggingObserver
 from calendarserver.accesslog import AMPLoggingFactory
 from calendarserver.accesslog import RotatingFileAccessLoggingObserver
@@ -117,6 +125,14 @@
 
 from twisted.python.util import uidFromString, gidFromString
 
+
+# Control socket message-routing constants.
+_LOG_ROUTE = "log"
+_QUEUE_ROUTE = "queue"
+
+_CONTROL_SERVICE_NAME = "control"
+
+
 def getid(uid, gid):
     if uid is not None:
         uid = uidFromString(uid)
@@ -762,22 +778,43 @@
         #
         self.log_info("Setting up service")
 
+        bonusServices = []
+
         if config.ProcessType == "Slave":
+            logObserver = AMPCommonAccessLoggingObserver()
+
             if config.ControlSocket:
-                mode = "AF_UNIX"
                 id = config.ControlSocket
-                self.log_info("Logging via AF_UNIX: %s" % (id,))
+                self.log_info("Control via AF_UNIX: %s" % (id,))
+                endpointFactory = lambda reactor: UNIXClientEndpoint(
+                    reactor, id)
             else:
-                mode = "AF_INET"
                 id = int(config.ControlPort)
-                self.log_info("Logging via AF_INET: %d" % (id,))
-
-            logObserver = AMPCommonAccessLoggingObserver(mode, id)
-
+                self.log_info("Control via AF_INET: %d" % (id,))
+                endpointFactory = lambda reactor: TCP4ClientEndpoint(
+                    reactor, "127.0.0.1", id)
+            controlSocketClient = ControlSocket()
+            class LogClient(AMP):
+                def startReceivingBoxes(self, sender):
+                    super(LogClient, self).startReceivingBoxes(sender)
+                    logObserver.addClient(self)
+            f = Factory()
+            f.protocol = LogClient
+            controlSocketClient.addFactory(_LOG_ROUTE, f)
+            from txdav.common.datastore.sql import CommonDataStore as SQLStore
+            if isinstance(store, SQLStore):
+                def queueMasterAvailable(connectionFromMaster):
+                    store.queuer = connectionFromMaster
+                queueFactory = QueueWorkerFactory(store.newTransaction, schema,
+                                                  queueMasterAvailable)
+                controlSocketClient.addFactory(_QUEUE_ROUTE, queueFactory)
+            controlClient = ControlSocketConnectingService(
+                endpointFactory, controlSocketClient
+            )
+            bonusServices.append(controlClient)
         elif config.ProcessType == "Single":
             # Make sure no old socket files are lying around.
             self.deleteStaleSocketFiles()
-
             logObserver = RotatingFileAccessLoggingObserver(
                 config.AccessLogFile,
             )
@@ -785,18 +822,20 @@
         self.log_info("Configuring access log observer: %s" % (logObserver,))
 
         service = CalDAVService(logObserver)
+        for bonus in bonusServices:
+            bonus.setServiceParent(service)
 
         rootResource = getRootResource(config, store, additional)
         service.rootResource = rootResource
 
         underlyingSite = Site(rootResource)
-        
-        # Need to cache SSL port info here so we can access it in a Request to deal with the
-        # possibility of being behind an SSL decoder
+
+        # Need to cache SSL port info here so we can access it in a Request to
+        # deal with the possibility of being behind an SSL decoder
         underlyingSite.EnableSSL = config.EnableSSL
         underlyingSite.SSLPort = config.SSLPort
         underlyingSite.BindSSLPorts = config.BindSSLPorts
-        
+
         requestFactory = underlyingSite
 
         if config.RedirectHTTPToHTTPS:
@@ -1102,7 +1141,8 @@
             try:
                 gid = getgrnam(config.GroupName).gr_gid
             except KeyError:
-                raise ConfigurationError("Invalid group name: %s" % (config.GroupName,))
+                raise ConfigurationError("Invalid group name: %s" %
+                                         (config.GroupName,))
         else:
             gid = os.getgid()
 
@@ -1110,20 +1150,24 @@
             try:
                 uid = getpwnam(config.UserName).pw_uid
             except KeyError:
-                raise ConfigurationError("Invalid user name: %s" % (config.UserName,))
+                raise ConfigurationError("Invalid user name: %s" %
+                                         (config.UserName,))
         else:
             uid = os.getuid()
 
+
+        controlSocket = ControlSocket()
+        controlSocket.addFactory(_LOG_ROUTE, logger)
         if config.ControlSocket:
-            loggingService = GroupOwnedUNIXServer(
-                gid, config.ControlSocket, logger, mode=0660
+            controlSocketService = GroupOwnedUNIXServer(
+                gid, config.ControlSocket, controlSocket, mode=0660
             )
         else:
-            loggingService = ControlPortTCPServer(
-                config.ControlPort, logger, interface="127.0.0.1"
+            controlSocketService = ControlPortTCPServer(
+                config.ControlPort, controlSocket, interface="127.0.0.1"
             )
-        loggingService.setName("logging")
-        loggingService.setServiceParent(s)
+        controlSocketService.setName(_CONTROL_SERVICE_NAME)
+        controlSocketService.setServiceParent(s)
 
         monitor = DelayedStartupProcessMonitor()
         s.processMonitor = monitor
@@ -1244,12 +1288,22 @@
         # filesystem to the database (if that's necessary, and there is
         # filesystem data in need of upgrading).
         def spawnerSvcCreator(pool, store):
+            from twisted.internet import reactor
+            pool = PeerConnectionPool(reactor, store.newTransaction,
+                                      7654, schema)
+            controlSocket.addFactory(_QUEUE_ROUTE,
+                                     pool.workerListenerFactory())
+            # TODO: now that we have the shared control socket, we should get
+            # rid of the connection dispenser and make a shared / async
+            # connection pool implementation that can dispense transactions
+            # synchronously as the interface requires.
             if pool is not None and config.SharedConnectionPool:
                 self.log_warn("Using Shared Connection Pool")
                 dispenser = ConnectionDispenser(pool)
             else:
                 dispenser = None
             multi = MultiService()
+            pool.setServiceParent(multi)
             spawner = SlaveSpawnerService(
                 self, monitor, dispenser, dispatcher, options["config"],
                 inheritFDs=inheritFDs, inheritSSLFDs=inheritSSLFDs

Modified: CalendarServer/trunk/calendarserver/tap/test/test_caldav.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/test/test_caldav.py	2012-08-11 08:55:55 UTC (rev 9688)
+++ CalendarServer/trunk/calendarserver/tap/test/test_caldav.py	2012-08-11 09:01:06 UTC (rev 9689)
@@ -57,7 +57,8 @@
 
 from calendarserver.tap.caldav import (
     CalDAVOptions, CalDAVServiceMaker, CalDAVService, GroupOwnedUNIXServer,
-    DelayedStartupProcessMonitor, DelayedStartupLineLogger, TwistdSlaveProcess
+    DelayedStartupProcessMonitor, DelayedStartupLineLogger, TwistdSlaveProcess,
+    _CONTROL_SERVICE_NAME
 )
 from calendarserver.provision.root import RootResource
 from StringIO import StringIO
@@ -460,7 +461,7 @@
         self.config["ProcessType"] = "Combined"
         self.writeConfig()
         svc = self.makeService()
-        for serviceName in ["logging"]:
+        for serviceName in [_CONTROL_SERVICE_NAME]:
             socketService = svc.getServiceNamed(serviceName)
             self.assertIsInstance(socketService, GroupOwnedUNIXServer)
             m = socketService.kwargs.get("mode", 0666)

Modified: CalendarServer/trunk/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/adbapi2.py	2012-08-11 08:55:55 UTC (rev 9688)
+++ CalendarServer/trunk/twext/enterprise/adbapi2.py	2012-08-11 09:01:06 UTC (rev 9689)
@@ -484,7 +484,53 @@
 
 
 
-class _SingleTxn(proxyForInterface(iface=IAsyncTransaction,
+class _HookableOperation(object):
+
+    def __init__(self):
+        self._hooks = []
+
+
+    @inlineCallbacks
+    def runHooks(self, ignored):
+        """
+        Callback for C{commit} and C{abort} Deferreds.
+        """
+        for operation in self._hooks:
+            yield operation()
+        returnValue(ignored)
+
+
+    def addHook(self, operation):
+        """
+        Add a 0-argument callable (which may return a L{Deferred}) to be run
+        as part of this operation; used to implement
+        L{IAsyncTransaction.postCommit} and C{postAbort}.
+        """
+        self._hooks.append(operation)
+
+
+
+class _CommitAndAbortHooks(object):
+    """
+    Shared implementation of post-commit and post-abort hooks.
+    """
+    # FIXME: this functionality needs direct tests, although it's pretty well-
+    # covered by txdav's test suite.
+
+    def __init__(self):
+        self._commit = _HookableOperation()
+        self._abort = _HookableOperation()
+
+
+    def postCommit(self, operation):
+        return self._commit.addHook(operation)
+
+
+    def postAbort(self, operation):
+        return self._abort.addHook(operation)
+
+
+
+class _SingleTxn(_CommitAndAbortHooks,
+                 proxyForInterface(iface=IAsyncTransaction,
                                    originalAttribute='_baseTxn')):
     """
     A L{_SingleTxn} is a single-use wrapper for the longer-lived
@@ -505,6 +551,7 @@
     """
 
     def __init__(self, pool, baseTxn):
+        super(_SingleTxn, self).__init__()
         self._pool           = pool
         self._baseTxn        = baseTxn
         self._completed      = False
@@ -601,9 +648,9 @@
             # We're in the process of executing a block of commands.  Wait until
             # they're done.  (Commit will be repeated in _checkNextBlock.)
             return self._blockedQueue.commit()
-
         self._markComplete()
-        return super(_SingleTxn, self).commit()
+        return (super(_SingleTxn, self).commit()
+                .addCallback(self._commit.runHooks))
 
 
     def abort(self):
@@ -611,6 +658,7 @@
         result = super(_SingleTxn, self).abort()
         if self in self._pool._waiting:
             self._stopWaiting()
+        result.addCallback(self._abort.runHooks)
         return result
 
 
@@ -750,15 +798,40 @@
 
 
 class _ConnectingPseudoTxn(object):
+    """
+    This is a pseudo-Transaction for bookkeeping purposes.
 
+    When a connection attempt has started but has not yet completed, the
+    L{ConnectionPool} still needs a way to shut it down.  This object provides
+    that tracking handle, and will be present in the pool's C{busy} list while
+    the connection is being established.
+    """
+
     _retry = None
 
     def __init__(self, pool, holder):
-        self._pool   = pool
-        self._holder = holder
+        """
+        Initialize the L{_ConnectingPseudoTxn}; get ready to connect.
 
+        @param pool: The pool that this connection attempt is participating in.
+        @type pool: L{ConnectionPool}
 
+        @param holder: the L{ThreadHolder} allocated to this connection attempt
+            and subsequent SQL executions for this connection.
+        @type holder: L{ThreadHolder}
+        """
+        self._pool    = pool
+        self._holder  = holder
+        self._aborted = False
+
+
     def abort(self):
+        """
+        Ignore the result of attempting to connect to this database, and
+        instead simply close the connection and free the L{ThreadHolder}
+        allocated for it.
+        """
+        self._aborted = True
         if self._retry is not None:
             self._retry.cancel()
         d = self._holder.stop()
@@ -951,6 +1024,8 @@
             cursor     = connection.cursor()
             return (connection, cursor)
         def finishInit((connection, cursor)):
+            if txn._aborted:
+                return
             baseTxn = _ConnectedTxn(
                 pool=self,
                 threadHolder=holder,
@@ -1187,8 +1262,8 @@
         Initialize a mapping of transaction IDs to transaction objects.
         """
         super(ConnectionPoolConnection, self).__init__()
-        self.pool  = pool
-        self._txns = {}
+        self.pool    = pool
+        self._txns   = {}
         self._blocks = {}
 
 
@@ -1405,7 +1480,7 @@
 
 
 
-class _NetTransaction(object):
+class _NetTransaction(_CommitAndAbortHooks):
     """
     A L{_NetTransaction} is an L{AMP}-protocol-based provider of the
     L{IAsyncTransaction} interface.  It sends SQL statements, query results, and
@@ -1419,6 +1494,7 @@
         Initialize a transaction with a L{ConnectionPoolClient} and a unique
         transaction identifier.
         """
+        super(_NetTransaction, self).__init__()
         self._client        = client
         self._transactionID = transactionID
         self._completed     = False
@@ -1476,11 +1552,12 @@
         def done(whatever):
             self._committed = True
             return whatever
-        return self._complete(Commit).addBoth(done)
+        return (self._complete(Commit).addBoth(done)
+                .addCallback(self._commit.runHooks))
 
 
     def abort(self):
-        return self._complete(Abort)
+        return self._complete(Abort).addCallback(self._abort.runHooks)
 
 
     def commandBlock(self):
@@ -1523,7 +1600,6 @@
         return self._transaction.dialect
 
 
-
     def execSQL(self, sql, args=None, raiseOnZeroRowCount=None):
         """
         Execute some SQL on this command block.
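
Because _CommitAndAbortHooks is mixed into both _SingleTxn and
_NetTransaction, every pool transaction (local or AMP-proxied) now accepts
post-commit and post-abort hooks.  A small sketch, with the sqlite setup
borrowed from the new record tests below; hooks are 0-argument callables and
may return Deferreds, since runHooks yields each one:

    import sqlite3
    from twext.enterprise.adbapi2 import ConnectionPool
    from twext.enterprise.ienterprise import SQLITE_DIALECT

    def connectionFactory(label="hook-demo"):
        return sqlite3.connect("hook-demo.sqlite")

    pool = ConnectionPool(connectionFactory, paramstyle='numeric',
                          dialect=SQLITE_DIALECT)
    pool.startService()  # needs a running reactor to complete commits

    events = []
    txn = pool.connection()
    txn.postCommit(lambda: events.append("committed"))  # after COMMIT only
    txn.postAbort(lambda: events.append("aborted"))     # after ABORT only
    d = txn.commit()  # fires after the commit itself, then the commit hooks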

Copied: CalendarServer/trunk/twext/enterprise/dal/record.py (from rev 9688, CalendarServer/branches/users/glyph/q/twext/enterprise/dal/record.py)
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/record.py	                        (rev 0)
+++ CalendarServer/trunk/twext/enterprise/dal/record.py	2012-08-11 09:01:06 UTC (rev 9689)
@@ -0,0 +1,328 @@
+# -*- test-case-name: twext.enterprise.dal.test.test_record -*-
+##
+# Copyright (c) 2012 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+RECORD: Relational Entity Creation from Objects Representing Data.
+
+This is an asynchronous object-relational mapper based on
+L{twext.enterprise.dal.syntax}.
+"""
+
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twext.enterprise.dal.syntax import (
+    Select, Tuple, Constant, ColumnSyntax, Insert, Update, Delete
+)
+
+class ReadOnly(AttributeError):
+    """
+    A caller attempted to set an attribute on a database-backed record, rather
+    than updating it through L{Record.update}.
+    """
+
+    def __init__(self, className, attributeName):
+        self.className = className
+        self.attributeName = attributeName
+        super(ReadOnly, self).__init__("SQL-backed attribute '{0}.{1}' is "
+                                       "read-only. Use '.update(...)' to "
+                                       "modify attributes."
+                                       .format(className, attributeName))
+
+
+
+class NoSuchRecord(Exception):
+    """
+    No matching record could be found.
+    """
+
+
+class _RecordMeta(type):
+    """
+    Metaclass for associating a L{fromTable} with a L{Record} at inheritance
+    time.
+    """
+
+    def __new__(cls, name, bases, ns):
+        """
+        Create a new instance of this meta-type.
+        """
+        newbases = []
+        table = None
+        namer = None
+        for base in bases:
+            if isinstance(base, fromTable):
+                if table is not None:
+                    raise RuntimeError(
+                        "Can't define a class from two or more tables at once."
+                    )
+                table = base.table
+            elif getattr(base, "table", None) is not None:
+                raise RuntimeError(
+                    "Can't define a record class by inheriting one already "
+                    "mapped to a table."
+                    # TODO: more info
+                )
+            else:
+                if namer is None:
+                    if isinstance(base, _RecordMeta):
+                        namer = base
+                newbases.append(base)
+        if table is not None:
+            attrmap = {}
+            colmap = {}
+            allColumns = list(table)
+            for column in allColumns:
+                attrname = namer.namingConvention(column.model.name)
+                attrmap[attrname] = column
+                colmap[column] = attrname
+            ns.update(table=table, __attrmap__=attrmap, __colmap__=colmap)
+            ns.update(attrmap)
+        return super(_RecordMeta, cls).__new__(cls, name, tuple(newbases), ns)
+
+
+
+class fromTable(object):
+    """
+    Inherit from this after L{Record} to specify which table your L{Record}
+    subclass is mapped to.
+    """
+
+    def __init__(self, aTable):
+        """
+        @param aTable: The table to map to.
+        @type aTable: L{twext.enterprise.dal.syntax.TableSyntax}
+        """
+        self.table = aTable
+
+
+
+class Record(object):
+    """
+    Superclass for all database-backed record classes (i.e. objects mapped
+    from rows in a database table).
+
+    @cvar table: the table that represents this L{Record} in the database.
+    @type table: L{TableSyntax}
+
+    @ivar transaction: The L{IAsyncTransaction} where this record is being
+        loaded.  This may be C{None} if this L{Record} is not participating in
+        a transaction, which may be true if it was instantiated but never
+        saved.
+
+    @cvar __colmap__: map of L{ColumnSyntax} objects to attribute names.
+    @type __colmap__: L{dict}
+
+    @cvar __attrmap__: map of attribute names to L{ColumnSyntax} objects.
+    @type __attrmap__: L{dict}
+    """
+
+    __metaclass__ = _RecordMeta
+
+    transaction = None
+
+    def __setattr__(self, name, value):
+        """
+        Once the transaction is initialized, this object is immutable.  If you
+        want to change it, use L{Record.update}.
+        """
+        if self.transaction is not None:
+            raise ReadOnly(self.__class__.__name__, name)
+        return super(Record, self).__setattr__(name, value)
+
+
+    @staticmethod
+    def namingConvention(columnName):
+        """
+        Implement the convention for naming-conversion between column names
+        (typically, upper-case database names map to lower-case attribute
+        names).
+        """
+        words = columnName.lower().split("_")
+        def cap(word):
+            if word.lower() == 'id':
+                return word.upper()
+            else:
+                return word.capitalize()
+        return words[0] + "".join(map(cap, words[1:]))
+
+
+    @classmethod
+    def _primaryKeyExpression(cls):
+        return Tuple([ColumnSyntax(c) for c in cls.table.model.primaryKey])
+
+
+    def _primaryKeyValue(self):
+        val = []
+        for col in self._primaryKeyExpression().columns:
+            val.append(getattr(self, self.__class__.__colmap__[col]))
+        return val
+
+
+    @classmethod
+    def _primaryKeyComparison(cls, primaryKey):
+        return (cls._primaryKeyExpression() ==
+                Tuple(map(Constant, primaryKey)))
+
+
+    @classmethod
+    @inlineCallbacks
+    def load(cls, transaction, *primaryKey):
+        self = (yield cls.query(transaction,
+                                cls._primaryKeyComparison(primaryKey)))[0]
+        returnValue(self)
+
+
+    @classmethod
+    @inlineCallbacks
+    def create(cls, transaction, **k):
+        """
+        Create a row.
+
+        Used like this::
+
+            MyRecord.create(transaction, column1=1, column2=u'two')
+        """
+        self = cls()
+        colmap = {}
+        attrtocol = cls.__attrmap__
+        for attr in k:
+            setattr(self, attr, k[attr])
+            # FIXME: better error reporting
+            colmap[attrtocol[attr]] = k[attr]
+        yield Insert(colmap).on(transaction)
+        self.transaction = transaction
+        returnValue(self)
+
+
+    def delete(self):
+        """
+        Delete this row from the database.
+
+        @return: a L{Deferred} which fires when the underlying row has been
+            deleted.
+        """
+        return Delete(From=self.table,
+                      Where=self._primaryKeyComparison(self._primaryKeyValue())
+                      ).on(self.transaction)
+
+
+    @inlineCallbacks
+    def update(self, **kw):
+        """
+        Modify the given attributes in the database.
+
+        @return: a L{Deferred} that fires when the updates have been sent to
+            the database.
+        """
+        colmap = {}
+        for k, v in kw.iteritems():
+            colmap[self.__attrmap__[k]] = v
+        yield (Update(colmap,
+                      Where=self._primaryKeyComparison(self._primaryKeyValue()))
+                .on(self.transaction))
+        self.__dict__.update(kw)
+
+
+    @classmethod
+    def pop(cls, transaction, *primaryKey):
+        """
+        Atomically retrieve and remove a row from this L{Record}'s table
+        with a primary key value of C{primaryKey}.
+
+        @return: a L{Deferred} that fires with an instance of C{cls}, or fails
+            with L{NoSuchRecord} if no matching record exists.
+        @rtype: L{Deferred}
+        """
+        return cls._rowsFromQuery(
+            transaction, Delete(Where=cls._primaryKeyComparison(primaryKey),
+                        From=cls.table, Return=list(cls.table)),
+            lambda : NoSuchRecord()
+        ).addCallback(lambda x: x[0])
+
+
+    @classmethod
+    def query(cls, transaction, expr, order=None, ascending=True):
+        """
+        Query the table that corresponds to C{cls}, and return instances of
+        C{cls} corresponding to the rows that are returned from that table.
+
+        @param expr: An L{ExpressionSyntax} that constrains the results of the
+            query.  This is most easily produced by accessing attributes on the
+            class; for example, C{MyRecordType.query((MyRecordType.col1 >
+            MyRecordType.col2).And(MyRecordType.col3 == 7))}
+
+        @param order: A L{ColumnSyntax} to order the resulting record objects
+            by.
+
+        @param ascending: A boolean; if C{order} is not C{None}, whether to
+            sort in ascending or descending order.
+        """
+        kw = {}
+        if order is not None:
+            kw.update(OrderBy=order, Ascending=ascending)
+        return cls._rowsFromQuery(transaction, Select(list(cls.table),
+                                                      From=cls.table,
+                                                      Where=expr, **kw), None)
+
+
+    @classmethod
+    def all(cls, transaction):
+        """
+        Load all rows from the table that corresponds to C{cls} and return
+        instances of C{cls} corresponding to each of them.
+        """
+        return cls._rowsFromQuery(transaction,
+                                  Select(list(cls.table),
+                                         From=cls.table,
+                                         OrderBy=cls._primaryKeyExpression()),
+                                  None)
+
+
+    @classmethod
+    @inlineCallbacks
+    def _rowsFromQuery(cls, transaction, qry, rozrc):
+        """
+        Execute the given query, and transform its results into rows.
+
+        @param transaction: an L{IAsyncTransaction} to execute the query on.
+
+        @param qry: a L{_DMLStatement} (XXX: maybe _DMLStatement or some
+            interface that defines 'on' should be public?) whose results are
+            the list of columns in C{self.table}.
+
+        @param rozrc: The C{raiseOnZeroRowCount} argument.
+
+        @return: a L{Deferred} that succeeds with a C{list} or fails with an
+            exception produced by C{rozrc}.
+        """
+        rows = yield qry.on(transaction, raiseOnZeroRowCount=rozrc)
+        selves = []
+        for row in rows:
+            self = cls()
+            for (column, value) in zip(list(cls.table), row):
+                name = cls.__colmap__[column]
+                setattr(self, name, value)
+            self.transaction = transaction
+            selves.append(self)
+        returnValue(selves)
+
+
+
+__all__ = [
+    "ReadOnly",
+    "fromTable",
+    "Record",
+    "NoSuchRecord",
+]
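
A condensed, runnable sketch of the new ORM; the pool argument is a
ConnectionPool set up exactly as in TestCRUD.setUp in the tests below
(including executing schemaString against the sqlite file first):

    from twisted.internet.defer import inlineCallbacks
    from twext.enterprise.dal.record import Record, fromTable
    from twext.enterprise.dal.syntax import SchemaSyntax
    from twext.enterprise.dal.test.test_parseschema import SchemaTestHelper

    sth = SchemaTestHelper()
    sth.id = lambda : "record-demo"
    schemaString = (
        "create table ALPHA (BETA integer primary key, GAMMA text);")
    schema = SchemaSyntax(sth.schemaFromString(schemaString))

    class Alpha(Record, fromTable(schema.ALPHA)):
        """
        BETA and GAMMA surface as the 'beta' and 'gamma' attributes, per
        Record.namingConvention.
        """

    @inlineCallbacks
    def demo(pool):
        txn = pool.connection()
        rec = yield Alpha.create(txn, beta=3, gamma=u'epsilon')   # INSERT
        rec = yield Alpha.load(txn, 3)              # SELECT by primary key
        yield rec.update(gamma=u'otherwise')        # UPDATE, then refresh
        recs = yield Alpha.query(txn, Alpha.gamma == u'otherwise',
                                 order=Alpha.beta)  # constrained SELECT
        yield rec.delete()                          # DELETE by primary key
        yield txn.commit()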

Modified: CalendarServer/trunk/twext/enterprise/dal/syntax.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/syntax.py	2012-08-11 08:55:55 UTC (rev 9688)
+++ CalendarServer/trunk/twext/enterprise/dal/syntax.py	2012-08-11 09:01:06 UTC (rev 9689)
@@ -27,11 +27,14 @@
 
 from twisted.internet.defer import succeed
 
-from twext.enterprise.dal.model import Schema, Table, Column, Sequence
-from twext.enterprise.ienterprise import POSTGRES_DIALECT, ORACLE_DIALECT
-from twext.enterprise.ienterprise import IDerivedParameter
+from twext.enterprise.dal.model import Schema, Table, Column, Sequence, SQLType
+from twext.enterprise.ienterprise import (
+    POSTGRES_DIALECT, ORACLE_DIALECT, SQLITE_DIALECT, IDerivedParameter
+)
 from twext.enterprise.util import mapOracleOutputType
 
+from twisted.internet.defer import inlineCallbacks, returnValue
+
 try:
     import cx_Oracle
     cx_Oracle
@@ -192,14 +195,13 @@
 
         @param txn: the L{IAsyncTransaction} to execute this on.
 
-        @param raiseOnZeroRowCount: the exception to raise if no data was
-            affected or returned by this query.
+        @param raiseOnZeroRowCount: a 0-argument callable which returns an
+            exception to raise if the executed SQL does not affect any rows.
 
         @param kw: keyword arguments, mapping names of L{Parameter} objects
             located somewhere in C{self}
 
         @return: results from the database.
-
         @rtype: a L{Deferred} firing a C{list} of records (C{tuple}s or
             C{list}s)
         """
@@ -445,6 +447,7 @@
 Len = Function("character_length", "length")
 Upper = Function("upper")
 Lower = Function("lower")
+_sqliteLastInsertRowID = Function("last_insert_rowid")
 
 # Use a specific value here for "the convention for case-insensitive values in
 # the database" so we don't need to keep remembering whether it's upper or
@@ -602,7 +605,7 @@
     def __contains__(self, columnSyntax):
         if isinstance(columnSyntax, FunctionInvocation):
             columnSyntax = columnSyntax.arg
-        return (columnSyntax.model in self.model.columns)
+        return (columnSyntax.model.table is self.model)
 
 
 
@@ -945,19 +948,39 @@
 
 
 
-def _columnsMatchTables(columns, tables):
+def _checkColumnsMatchTables(columns, tables):
+    """
+    Verify that the given C{columns} match the given C{tables}; that is, that
+    every L{TableSyntax} referenced by every L{ColumnSyntax} referenced by
+    every L{ExpressionSyntax} in the given C{columns} list is present in the
+    given C{tables} list.
+
+    @param columns: a L{list} of L{ExpressionSyntax}, each of which references
+        some set of L{ColumnSyntax}es via its C{allColumns} method.
+
+    @param tables: a L{list} of L{TableSyntax}
+
+    @return: L{None}
+    @rtype: L{NoneType}
+
+    @raise TableMismatch: if any table referenced by a column is I{not} found
+        in C{tables}
+    """
     for expression in columns:
         for column in expression.allColumns():
             for table in tables:
                 if column in table:
                     break
             else:
-                return False
-    return True
+                raise TableMismatch("{} not found in {}".format(
+                    column, tables
+                ))
+    return None
 
 
-class Tuple(object):
 
+class Tuple(ExpressionSyntax):
+
     def __init__(self, columns):
         self.columns = columns
 
@@ -1064,8 +1087,7 @@
         if columns is None:
             columns = ALL_COLUMNS
         else:
-            if not _columnsMatchTables(columns, From.tables()):
-                raise TableMismatch()
+            _checkColumnsMatchTables(columns, From.tables())
             columns = _SomeColumns(columns)
         self.columns = columns
         
@@ -1263,12 +1285,11 @@
 
     def _returningClause(self, queryGenerator, stmt, allTables):
         """
-        Add a dialect-appropriate 'returning' clause to the end of the given SQL
-        statement.
+        Add a dialect-appropriate 'returning' clause to the end of the given
+        SQL statement.
 
-        @param queryGenerator: describes the database we are generating the statement
-            for.
-
+        @param queryGenerator: describes the database we are generating the
+            statement for.
         @type queryGenerator: L{QueryGenerator}
 
         @param stmt: the SQL fragment generated without the 'returning' clause
@@ -1280,9 +1301,14 @@
         @return: the C{stmt} parameter.
         """
         retclause = self.Return
+        if retclause is None:
+            return stmt
         if isinstance(retclause, (tuple, list)):
             retclause = _CommaList(retclause)
-        if retclause is not None:
+        if queryGenerator.dialect == SQLITE_DIALECT:
+            # sqlite does this another way.
+            return stmt
+        elif retclause is not None:
             stmt.text += ' returning '
             stmt.append(retclause.subSQL(queryGenerator, allTables))
             if queryGenerator.dialect == ORACLE_DIALECT:
@@ -1410,7 +1436,27 @@
         return self._returningClause(queryGenerator, stmt, allTables)
 
 
+    def on(self, txn, *a, **kw):
+        """
+        Override to provide extra logic for L{Insert}s that return values on
+        databases that don't provide return values as part of their C{INSERT}
+        behavior.
+        """
+        result = super(_DMLStatement, self).on(txn, *a, **kw)
+        if self.Return is not None and txn.dialect == SQLITE_DIALECT:
+            table = self._returnAsList()[0].model.table
+            return Select(self._returnAsList(),
+                   # TODO: error reporting when 'return' includes columns
+                   # foreign to the primary table.
+                   From=TableSyntax(table),
+                   Where=ColumnSyntax(Column(table, "rowid",
+                                             SQLType("integer", None))) ==
+                         _sqliteLastInsertRowID()
+                   ).on(txn, *a, **kw)
+        return result
 
+
+
 def _convert(x):
     """
     Convert a value to an appropriate SQL AST node.  (Currently a simple
@@ -1426,6 +1472,12 @@
 class Update(_DMLStatement):
     """
     'update' statement
+
+    @ivar columnMap: A L{dict} mapping L{ColumnSyntax} objects to values to
+        change; values may be simple database values (such as L{str},
+        L{unicode}, L{datetime.datetime}, L{float}, L{int} etc) or L{Parameter}
+        instances.
+    @type columnMap: L{dict}
     """
 
     def __init__(self, columnMap, Where, Return=None):
@@ -1436,6 +1488,37 @@
         self.Return = Return
 
 
+    @inlineCallbacks
+    def on(self, txn, *a, **kw):
+        """
+        Override to provide extra logic for L{Update}s that return values on
+        databases that don't provide return values as part of their C{UPDATE}
+        behavior.
+        """
+        doExtra = self.Return is not None and txn.dialect == SQLITE_DIALECT
+        upcall = lambda: super(_DMLStatement, self).on(txn, *a, **kw)
+
+        if doExtra:
+            table = self._returnAsList()[0].model.table
+            rowidcol = ColumnSyntax(Column(table, "rowid",
+                                           SQLType("integer", None)))
+            prequery = Select([rowidcol], From=TableSyntax(table),
+                              Where=self.Where)
+            preresult = prequery.on(txn, *a, **kw)
+            before = yield preresult
+            yield upcall()
+            result = (yield Select(self._returnAsList(),
+                            # TODO: error reporting when 'return' includes
+                            # columns foreign to the primary table.
+                            From=TableSyntax(table),
+                            Where=reduce(lambda left, right: left.Or(right),
+                                         ((rowidcol == x) for [x] in before))
+                            ).on(txn, *a, **kw))
+            returnValue(result)
+        else:
+            returnValue((yield upcall()))
+
+
     def _toSQL(self, queryGenerator):
         """
         @return: a 'insert' statement with placeholders and arguments
@@ -1459,7 +1542,7 @@
                     for (c, v) in sortedColumns]
             )
         )
-        result.append(SQLFragment( ' where '))
+        result.append(SQLFragment(' where '))
         result.append(self.Where.subSQL(queryGenerator, allTables))
         return self._returningClause(queryGenerator, result, allTables)
 
@@ -1490,7 +1573,19 @@
         return self._returningClause(queryGenerator, result, allTables)
 
 
+    @inlineCallbacks
+    def on(self, txn, *a, **kw):
+        upcall = lambda: super(Delete, self).on(txn, *a, **kw)
+        if txn.dialect == SQLITE_DIALECT and self.Return is not None:
+            result = yield Select(self._returnAsList(), From=self.From,
+                                  Where=self.Where).on(txn, *a, **kw)
+            yield upcall()
+        else:
+            result = yield upcall()
+        returnValue(result)
 
+
+
 class _LockingStatement(_Statement):
     """
     A statement related to lock management, which implicitly has no results.
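
The net effect of the three on() overrides above is that Return now behaves
uniformly across dialects: postgres and oracle emit a real 'returning'
clause, while sqlite is emulated with follow-up selects (last_insert_rowid()
for Insert, a rowid pre-query for Update, a pre-select for Delete).  For
example, with the ALPHA table from the record tests below and txn being any
pool transaction:

    from twext.enterprise.dal.syntax import Insert

    d = Insert({schema.ALPHA.BETA: 3, schema.ALPHA.GAMMA: u'three'},
               Return=schema.ALPHA.BETA).on(txn)
    # postgres/oracle: one statement ending in '... returning BETA';
    # sqlite: the plain INSERT followed by
    #     select BETA from ALPHA where rowid = last_insert_rowid()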

Copied: CalendarServer/trunk/twext/enterprise/dal/test/test_record.py (from rev 9688, CalendarServer/branches/users/glyph/q/twext/enterprise/dal/test/test_record.py)
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/test/test_record.py	                        (rev 0)
+++ CalendarServer/trunk/twext/enterprise/dal/test/test_record.py	2012-08-11 09:01:06 UTC (rev 9689)
@@ -0,0 +1,247 @@
+##
+# Copyright (c) 2012 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Test cases for L{twext.enterprise.dal.record}.
+"""
+
+import sqlite3
+
+from twisted.internet.defer import inlineCallbacks
+
+from twisted.trial.unittest import TestCase
+
+from twext.enterprise.dal.record import (
+    Record, fromTable, ReadOnly, NoSuchRecord
+)
+from twext.enterprise.dal.syntax import SQLITE_DIALECT
+
+from twext.enterprise.dal.test.test_parseschema import SchemaTestHelper
+from twext.enterprise.adbapi2 import ConnectionPool
+from twext.enterprise.dal.syntax import SchemaSyntax
+from twisted.internet.defer import gatherResults
+
+
+sth = SchemaTestHelper()
+sth.id = lambda : __name__
+schemaString = """
+create table ALPHA (BETA integer primary key, GAMMA text);
+"""
+testSchema = SchemaSyntax(sth.schemaFromString(schemaString))
+
+
+
+class TestRecord(Record, fromTable(testSchema.ALPHA)):
+    """
+    A sample test record.
+    """
+
+
+
+class TestCRUD(TestCase):
+    """
+    Tests for creation, mutation, and deletion operations.
+    """
+
+    def setUp(self):
+        sqlitename = self.mktemp()
+        def connectionFactory(label=self.id()):
+            return sqlite3.connect(sqlitename)
+        con = connectionFactory()
+        con.execute(schemaString)
+        con.commit()
+        self.pool = ConnectionPool(connectionFactory, paramstyle='numeric',
+                                   dialect=SQLITE_DIALECT)
+        self.pool.startService()
+        self.addCleanup(self.pool.stopService)
+
+
+    @inlineCallbacks
+    def test_simpleLoad(self):
+        """
+        Loading an existing row from the database by its primary key will
+        populate its attributes from columns of the corresponding row in the
+        database.
+        """
+        txn = self.pool.connection()
+        yield txn.execSQL("insert into ALPHA values (:1, :2)", [234, "one"])
+        yield txn.execSQL("insert into ALPHA values (:1, :2)", [456, "two"])
+        rec = yield TestRecord.load(txn, 456)
+        self.assertIsInstance(rec, TestRecord)
+        self.assertEquals(rec.beta, 456)
+        self.assertEquals(rec.gamma, "two")
+        rec2 = yield TestRecord.load(txn, 234)
+        self.assertIsInstance(rec2, TestRecord)
+        self.assertEqual(rec2.beta, 234)
+        self.assertEqual(rec2.gamma, "one")
+
+
+    @inlineCallbacks
+    def test_simpleCreate(self):
+        """
+        When a record object is created, a row with matching column values will
+        be created in the database.
+        """
+        txn = self.pool.connection()
+        rec = yield TestRecord.create(txn, beta=3, gamma=u'epsilon')
+        self.assertEquals(rec.beta, 3)
+        self.assertEqual(rec.gamma, u'epsilon')
+        rows = yield txn.execSQL("select BETA, GAMMA from ALPHA")
+        self.assertEqual(rows, [tuple([3, u'epsilon'])])
+
+
+    @inlineCallbacks
+    def test_simpleDelete(self):
+        """
+        When a record object is deleted, the row with a matching primary key
+        will be removed from the database.
+        """
+        txn = self.pool.connection()
+        def mkrow(beta, gamma):
+            return txn.execSQL("insert into ALPHA values (:1, :2)",
+                               [beta, gamma])
+        yield gatherResults([mkrow(123, u"one"), mkrow(234, u"two"),
+                             mkrow(345, u"three")])
+        tr = yield TestRecord.load(txn, 234)
+        yield tr.delete()
+        rows = yield txn.execSQL("select BETA, GAMMA from ALPHA order by BETA")
+        self.assertEqual(rows, [(123, u"one"), (345, u"three")])
+
+
+    @inlineCallbacks
+    def test_attributesArentMutableYet(self):
+        """
+        Changing attributes on a database object is not supported yet, because
+        it's not entirely clear when to flush the SQL to the database.
+        Instead, for the time being, use C{.update}.  When you attempt to set
+        an attribute, an error will be raised informing you of this fact, so
+        that the error is clear.
+        """
+        txn = self.pool.connection()
+        rec = yield TestRecord.create(txn, beta=7, gamma=u'what')
+        def setit():
+            rec.beta = 12
+        ro = self.assertRaises(ReadOnly, setit)
+        self.assertEqual(rec.beta, 7)
+        self.assertIn("SQL-backed attribute 'TestRecord.beta' is read-only. "
+                      "Use '.update(...)' to modify attributes.", str(ro))
+
+
+    @inlineCallbacks
+    def test_simpleUpdate(self):
+        """
+        L{Record.update} will change the values on the record and in the
+        database.
+        """
+        txn = self.pool.connection()
+        rec = yield TestRecord.create(txn, beta=3, gamma=u'epsilon')
+        yield rec.update(gamma=u'otherwise')
+        self.assertEqual(rec.gamma, u'otherwise')
+        yield txn.commit()
+        # Make sure that it persists.
+        txn = self.pool.connection()
+        rec = yield TestRecord.load(txn, 3)
+        self.assertEqual(rec.gamma, u'otherwise')
+
+
+    @inlineCallbacks
+    def test_simpleQuery(self):
+        """
+        L{Record.query} will allow you to query for a record by its class
+        attributes as columns.
+        """
+        txn = self.pool.connection()
+        for beta, gamma in [(123, u"one"), (234, u"two"), (345, u"three"),
+                            (356, u"three"), (456, u"four")]:
+            yield txn.execSQL("insert into ALPHA values (:1, :2)",
+                              [beta, gamma])
+        records = yield TestRecord.query(txn, TestRecord.gamma == u"three")
+        self.assertEqual(len(records), 2)
+        records.sort(key=lambda x: x.beta)
+        self.assertEqual(records[0].beta, 345)
+        self.assertEqual(records[1].beta, 356)
+
+
+    @inlineCallbacks
+    def test_all(self):
+        """
+        L{Record.all} will return all instances of the record, sorted by
+        primary key.
+        """
+        txn = self.pool.connection()
+        data = [(123, u"one"), (456, u"four"), (345, u"three"),
+                (234, u"two"), (356, u"three")]
+        for beta, gamma in data:
+            yield txn.execSQL("insert into ALPHA values (:1, :2)",
+                              [beta, gamma])
+        self.assertEqual(
+            [(x.beta, x.gamma) for x in (yield TestRecord.all(txn))],
+            sorted(data)
+        )
+
+
+    @inlineCallbacks
+    def test_orderedQuery(self):
+        """
+        L{Record.query} takes an 'order' argument which will allow the objects
+        returned to be ordered.
+        """
+        txn = self.pool.connection()
+        for beta, gamma in [(123, u"one"), (234, u"two"), (345, u"three"),
+                            (356, u"three"), (456, u"four")]:
+            yield txn.execSQL("insert into ALPHA values (:1, :2)",
+                              [beta, gamma])
+
+        records = yield TestRecord.query(txn, TestRecord.gamma == u"three",
+                                         TestRecord.beta)
+        self.assertEqual([record.beta for record in records], [345, 356])
+        records = yield TestRecord.query(txn, TestRecord.gamma == u"three",
+                                         TestRecord.beta, ascending=False)
+        self.assertEqual([record.beta for record in records], [356, 345])
+
+
+    @inlineCallbacks
+    def test_pop(self):
+        """
+        A L{Record} may be loaded and deleted atomically, with L{Record.pop}.
+        """
+        txn = self.pool.connection()
+        for beta, gamma in [(123, u"one"), (234, u"two"), (345, u"three"),
+                            (356, u"three"), (456, u"four")]:
+            yield txn.execSQL("insert into ALPHA values (:1, :2)",
+                              [beta, gamma])
+        rec = yield TestRecord.pop(txn, 234)
+        self.assertEqual(rec.gamma, u'two')
+        self.assertEqual((yield txn.execSQL("select count(*) from ALPHA "
+                                            "where BETA = :1", [234])),
+                         [(0,)])
+        yield self.failUnlessFailure(TestRecord.pop(txn, 234), NoSuchRecord)
+
+
+    def test_columnNamingConvention(self):
+        """
+        The naming convention maps columns C{LIKE_THIS} to be attributes
+        C{likeThis}.
+        """
+        self.assertEqual(Record.namingConvention(u"like_this"), "likeThis")
+        self.assertEqual(Record.namingConvention(u"LIKE_THIS"), "likeThis")
+        self.assertEqual(Record.namingConvention(u"LIKE_THIS_ID"), "likeThisID")
+
+
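+# Editor's sketch (illustrative only, not the actual implementation): a
+# conversion equivalent to the convention tested above could be written as:
+#
+#     def namingConvention(columnName):
+#         words = columnName.lower().split("_")
+#         def cap(word):
+#             return word.upper() if word == "id" else word.capitalize()
+#         return words[0] + "".join(cap(word) for word in words[1:])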
+
+

Modified: CalendarServer/trunk/twext/enterprise/dal/test/test_sqlsyntax.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/test/test_sqlsyntax.py	2012-08-11 08:55:55 UTC (rev 9688)
+++ CalendarServer/trunk/twext/enterprise/dal/test/test_sqlsyntax.py	2012-08-11 09:01:06 UTC (rev 9689)
@@ -30,14 +30,16 @@
 from twext.enterprise.dal.syntax import Function
 from twext.enterprise.dal.syntax import SchemaSyntax
 from twext.enterprise.dal.test.test_parseschema import SchemaTestHelper
-from twext.enterprise.ienterprise import POSTGRES_DIALECT, ORACLE_DIALECT
+from twext.enterprise.ienterprise import (POSTGRES_DIALECT, ORACLE_DIALECT,
+                                          SQLITE_DIALECT)
 from twext.enterprise.test.test_adbapi2 import ConnectionPoolHelper
 from twext.enterprise.test.test_adbapi2 import NetworkedPoolHelper
-from twext.enterprise.test.test_adbapi2 import resultOf
+from twext.enterprise.test.test_adbapi2 import resultOf, AssertResultHelper
 from twisted.internet.defer import succeed
 from twisted.trial.unittest import TestCase
 
 
+
 class _FakeTransaction(object):
     """
     An L{IAsyncTransaction} that provides the relevant metadata for SQL
@@ -56,7 +58,42 @@
     TIMESTAMP = 'for timestamps!'
 
 
+class CatchSQL(object):
+    """
+    L{IAsyncTransaction} emulator that records the SQL executed on it.
+    """
+    counter = 0
 
+    def __init__(self, dialect=SQLITE_DIALECT, paramstyle='numeric'):
+        self.execed = []
+        self.pendingResults = []
+        self.dialect = dialect
+        self.paramstyle = paramstyle
+
+
+    def nextResult(self, result):
+        """
+        Make it so that the next result from L{execSQL} will be the argument.
+        """
+        self.pendingResults.append(result)
+
+
+    def execSQL(self, sql, args, raiseOnZeroRowCount=None):
+        """
+        Implement L{IAsyncTransaction} by recording C{sql} and C{args} in
+        C{self.execed}, and return a L{Deferred} firing either an integer or a
+        value pre-supplied by L{CatchSQL.nextResult}.
+        """
+        self.execed.append([sql, args])
+        self.counter += 1
+        if self.pendingResults:
+            result = self.pendingResults.pop(0)
+        else:
+            result = self.counter
+        return succeed(result)
+
+
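+# Editor's usage sketch (illustrative; assumes a 'schema' like the one used
+# by GenerationTests below): a statement's 'on' method can be probed
+# directly against CatchSQL, e.g.:
+#
+#     csql = CatchSQL()
+#     Delete(From=schema.FOO, Where=schema.FOO.BAZ == 7).on(csql)
+#     # csql.execed == [["delete from FOO where BAZ = :1", [7]]]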
+
 class NullTestingOracleTxn(object):
     """
     Fake transaction for testing oracle NULL behavior.
@@ -92,7 +129,7 @@
 
 
 
-class GenerationTests(ExampleSchemaHelper, TestCase):
+class GenerationTests(ExampleSchemaHelper, TestCase, AssertResultHelper):
     """
     Tests for syntactic helpers to generate SQL queries.
     """
@@ -623,7 +660,6 @@
         """
         L{SetExpression} in a From sub-select.
         """
-        
         # Simple UNION
         self.assertEquals(
             Select(
@@ -943,6 +979,137 @@
         )
 
 
+    def test_insertMultiReturnSQLite(self):
+        """
+        In SQLite's SQL dialect, there is no 'returning' clause, but given that
+        SQLite serializes all SQL transactions, a 'select' issued after a
+        write operation will reliably see exactly what was just modified.
+        Therefore, although 'toSQL' won't include any indication of
+        the return value, the 'on' method will execute a 'select' statement
+        following the insert to retrieve the value.
+        """
+        insertStatement = Insert(
+            {self.schema.FOO.BAR: 39, self.schema.FOO.BAZ: 82},
+            Return=(self.schema.FOO.BAR, self.schema.FOO.BAZ)
+        )
+        qg = lambda : QueryGenerator(SQLITE_DIALECT, NumericPlaceholder())
+        self.assertEquals(insertStatement.toSQL(qg()),
+            SQLFragment("insert into FOO (BAR, BAZ) values (:1, :2)",
+                        [39, 82])
+        )
+        result = []
+        csql = CatchSQL()
+        insertStatement.on(csql).addCallback(result.append)
+        self.assertEqual(result, [2])
+        self.assertEqual(
+            csql.execed,
+            [["insert into FOO (BAR, BAZ) values (:1, :2)", [39, 82]],
+             ["select BAR, BAZ from FOO where rowid = last_insert_rowid()", []]]
+        )
+
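+    # Editor's note (illustrative, not part of this changeset): the behavior
+    # relied upon above can be reproduced with the stdlib sqlite3 module,
+    # since last_insert_rowid() is scoped to the issuing connection:
+    #
+    #     import sqlite3
+    #     con = sqlite3.connect(":memory:")
+    #     con.execute("create table FOO (BAR integer, BAZ integer)")
+    #     con.execute("insert into FOO (BAR, BAZ) values (?, ?)", (39, 82))
+    #     rows = con.execute("select BAR, BAZ from FOO "
+    #                        "where rowid = last_insert_rowid()").fetchall()
+    #     assert rows == [(39, 82)]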
+
+    def test_insertNoReturnSQLite(self):
+        """
+        Inserting a row I{without} a C{Return=} parameter should also work
+        as normal in SQLite.
+        """
+        statement = Insert({self.schema.FOO.BAR: 12,
+                            self.schema.FOO.BAZ: 48})
+        csql = CatchSQL()
+        statement.on(csql)
+        self.assertEqual(
+            csql.execed,
+            [["insert into FOO (BAR, BAZ) values (:1, :2)", [12, 48]]]
+        )
+
+
+    def test_updateReturningSQLite(self):
+        """
+        Since SQLite does not support the SQL 'returning' syntax extension, in
+        order to preserve the rows that will be modified during an UPDATE
+        statement, we must first find the rows that will be affected, update
+        them, and finally select the updated rows to return.  Since we might
+        be changing even part of the primary key, we use the internal 'rowid'
+        column to uniquely and reliably identify rows in the sqlite database
+        that have been modified.
+        """
+        csql = CatchSQL()
+        stmt = Update({self.schema.FOO.BAR: 4321},
+                      Where=self.schema.FOO.BAZ == 1234,
+                      Return=self.schema.FOO.BAR)
+        csql.nextResult([["sample row id"]])
+        result = resultOf(stmt.on(csql))
+        # Three statements were executed; make sure that the result returned was
+        # the result of executing the 3rd (and final) one.
+        self.assertResultList(result, 3)
+        # Check that they were the right statements.
+        self.assertEqual(len(csql.execed), 3)
+        self.assertEqual(
+            csql.execed[0],
+            ["select rowid from FOO where BAZ = :1", [1234]]
+        )
+        self.assertEqual(
+            csql.execed[1],
+            ["update FOO set BAR = :1 where BAZ = :2", [4321, 1234]]
+        )
+        self.assertEqual(
+            csql.execed[2],
+            ["select BAR from FOO where rowid = :1", ["sample row id"]]
+        )
+
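+    # Editor's sketch (stdlib sqlite3 only; illustrative): the same
+    # select-update-select sequence, written by hand:
+    #
+    #     ids = [row[0] for row in con.execute(
+    #         "select rowid from FOO where BAZ = ?", (1234,))]
+    #     con.execute("update FOO set BAR = ? where BAZ = ?", (4321, 1234))
+    #     clause = " or ".join("rowid = ?" for _ in ids)
+    #     returned = con.execute(
+    #         "select BAR from FOO where " + clause, ids).fetchall()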
+
+    def test_updateReturningMultipleValuesSQLite(self):
+        """
+        When SQLite updates multiple rows, it must embed the row ID of each
+        affected row into the 'where' clause of the final 'select' statement,
+        as there is no way to pass a list of values to a single placeholder.
+        """
+        csql = CatchSQL()
+        stmt = Update({self.schema.FOO.BAR: 4321},
+                      Where=self.schema.FOO.BAZ == 1234,
+                      Return=self.schema.FOO.BAR)
+        csql.nextResult([["one row id"], ["and another"], ["and one more"]])
+        result = resultOf(stmt.on(csql))
+        # Three statements were executed; make sure that the result returned was
+        # the result of executing the 3rd (and final) one.
+        self.assertResultList(result, 3)
+        # Check that they were the right statements.
+        self.assertEqual(len(csql.execed), 3)
+        self.assertEqual(
+            csql.execed[0],
+            ["select rowid from FOO where BAZ = :1", [1234]]
+        )
+        self.assertEqual(
+            csql.execed[1],
+            ["update FOO set BAR = :1 where BAZ = :2", [4321, 1234]]
+        )
+        self.assertEqual(
+            csql.execed[2],
+            ["select BAR from FOO where rowid = :1 or rowid = :2 or rowid = :3",
+             ["one row id", "and another", "and one more"]]
+        )
+
+
+    def test_deleteReturningSQLite(self):
+        """
+        When SQLite deletes rows with a C{Return=} parameter, the returned
+        columns must be selected I{before} the delete is executed, since
+        there is no 'returning' clause to capture them afterwards.
+        """
+        csql = CatchSQL()
+        stmt = Delete(From=self.schema.FOO, Where=self.schema.FOO.BAZ == 1234,
+                      Return=self.schema.FOO.BAR)
+        result = resultOf(stmt.on(csql))
+        self.assertResultList(result, 1)
+        self.assertEqual(len(csql.execed), 2)
+        self.assertEqual(
+            csql.execed[0],
+            ["select BAR from FOO where BAZ = :1", [1234]]
+        )
+        self.assertEqual(
+            csql.execed[1],
+            ["delete from FOO where BAZ = :1", [1234]]
+        )
+
+
     def test_insertMismatch(self):
         """
         L{Insert} raises L{TableMismatch} if the columns specified aren't all

Modified: CalendarServer/trunk/twext/enterprise/ienterprise.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/ienterprise.py	2012-08-11 08:55:55 UTC (rev 9688)
+++ CalendarServer/trunk/twext/enterprise/ienterprise.py	2012-08-11 09:01:06 UTC (rev 9689)
@@ -20,6 +20,16 @@
 
 __all__ = [
     "IAsyncTransaction",
+    "ISQLExecutor",
+    "ICommandBlock",
+    "IQueuer",
+    "IDerivedParameter",
+    "AlreadyFinishedError",
+    "ConnectionError",
+    "POSTGRES_DIALECT",
+    "SQLITE_DIALECT",
+    "ORACLE_DIALECT",
+    "ORACLE_TABLE_NAME_MAX",
 ]
 
 from zope.interface import Interface, Attribute
@@ -42,6 +52,7 @@
 
 POSTGRES_DIALECT = 'postgres-dialect'
 ORACLE_DIALECT = 'oracle-dialect'
+SQLITE_DIALECT = 'sqlite-dialect'
 ORACLE_TABLE_NAME_MAX = 30
 
 
@@ -102,6 +113,18 @@
         """
 
 
+    def postCommit(operation):
+        """
+        Perform the given operation only after this L{IAsyncTransaction}
+        commits.  These will be invoked before the L{Deferred} returned by
+        L{IAsyncTransaction.commit} fires.
+
+        @param operation: a 0-argument callable that may return a L{Deferred}.
+            If it does, then the subsequent operations added by L{postCommit}
+            will not fire until that L{Deferred} fires.
+        """
+
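+    # Editor's usage sketch (illustrative; assumes twisted.python.log is
+    # imported as 'log'):
+    #
+    #     txn.postCommit(lambda: log.msg("committed!"))
+    #     d = txn.commit()   # "committed!" is logged before d fires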
+
     def abort():
         """
         Roll back changes caused by this transaction.
@@ -111,6 +134,17 @@
         """
 
 
+    def postAbort(operation):
+        """
+        Invoke a callback after abort.
+
+        @see: L{IAsyncTransaction.postCommit}
+
+        @param operation: 0-argument callable, potentially returning a
+            L{Deferred}.
+        """
+
+
     def commandBlock():
         """
         Create an object which will cause the commands executed on it to be
@@ -218,3 +252,32 @@
 
         @return: C{None}
         """
+
+
+
+class IQueuer(Interface):
+    """
+    An L{IQueuer} can enqueue work for later execution.
+    """
+
+    def enqueueWork(transaction, workItemType, **kw):
+        """
+        Perform some work, eventually.
+
+        @param transaction: an L{IAsyncTransaction} within which to I{commit}
+            to doing the work.  Note that this work will likely be done later
+            (but depending on various factors, may actually be done within this
+            transaction as well).
+
+        @param workItemType: the type of work item to create.
+        @type workItemType: L{type}, specifically, a subtype of L{WorkItem
+            <twext.enterprise.queue.WorkItem>}
+
+        @param kw: The keyword parameters are relayed to C{workItemType.create}
+            to create an appropriately initialized item.
+
+        @return: a work proposal that allows tracking of the various phases of
+            completion of the work item.
+        @rtype: L{twext.enterprise.queue.WorkProposal}
+        """
+

Copied: CalendarServer/trunk/twext/enterprise/queue.py (from rev 9688, CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py)
===================================================================
--- CalendarServer/trunk/twext/enterprise/queue.py	                        (rev 0)
+++ CalendarServer/trunk/twext/enterprise/queue.py	2012-08-11 09:01:06 UTC (rev 9689)
@@ -0,0 +1,1230 @@
+# -*- test-case-name: twext.enterprise.test.test_queue -*-
+##
+# Copyright (c) 2012 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+L{twext.enterprise.queue} is a task-queueing system for use by applications
+with multiple front-end servers talking to a single database instance that
+want to defer and parallelize work that involves storing the results of
+computation.
+
+By enqueuing with L{twext.enterprise.queue}, you may guarantee that the work
+will I{eventually} be done, and reliably commit to doing it in the future, but
+defer it if it does not need to be done I{now}.
+
+To pick a hypothetical example, let's say that you have a store which wants to
+issue a promotional coupon based on a customer loyalty program, in response to
+an administrator clicking on a button.  Determining the list of customers to
+send the coupon to is quick: a simple query will get you all their names.
+However, analyzing each user's historical purchase data is (A) time consuming
+and (B) relatively isolated, so it would be good to do that in parallel, and it
+would also be acceptable to have that happen at a later time, outside the
+critical path.
+
+Such an application might be implemented with this queueing system like so::
+
+    from twext.enterprise.queue import WorkItem
+    from twext.enterprise.dal.parseschema import addSQLToSchema
+    from twext.enterprise.dal.syntax import SchemaSyntax, Select
+    from twext.enterprise.dal.model import Schema
+    from twext.enterprise.dal.record import Record, fromTable
+    from twisted.internet.defer import inlineCallbacks
+
+    schemaModel = Schema()
+    addSQLToSchema('''
+        create table CUSTOMER (NAME varchar(255), ID integer primary key);
+        create table PRODUCT (NAME varchar(255), ID integer primary key);
+        create table PURCHASE (NAME varchar(255), WHEN timestamp,
+                               CUSTOMER_ID integer references CUSTOMER,
+                               PRODUCT_ID integer references PRODUCT);
+        create table COUPON_WORK (WORK_ID integer primary key,
+                                  CUSTOMER_ID integer references CUSTOMER);
+        create table COUPON (ID integer primary key,
+                            CUSTOMER_ID integer references customer,
+                            AMOUNT integer);
+    ''')
+    schema = SchemaSyntax(schemaModel)
+
+    class Coupon(Record, fromTable(schema.COUPON)):
+        pass
+
+    class CouponWork(WorkItem, fromTable(schema.COUPON_WORK)):
+        @inlineCallbacks
+        def doWork(self):
+            purchases = yield Select(
+                From=schema.PURCHASE,
+                Where=schema.PURCHASE.CUSTOMER_ID == self.customerID
+            ).on(self.transaction)
+            couponAmount = yield doSomeMathThatTakesAWhile(purchases)
+            yield Coupon.create(self.transaction,
+                                customerID=self.customerID,
+                                amount=couponAmount)
+
+    @inlineCallbacks
+    def makeSomeCoupons(txn):
+        # Note, txn was started before, will be committed later...
+        for customerID in (yield Select([schema.CUSTOMER.ID],
+                                        From=schema.CUSTOMER).on(txn)):
+            # queuer is a provider of IQueuer, of which there are several
+            # implementations in this module.
+            queuer.enqueueWork(txn, CouponWork, customerID=customerID)
+"""
+
+from socket import getfqdn
+from functools import wraps
+from os import getpid
+from datetime import datetime, timedelta
+
+from zope.interface import implements
+
+from twisted.application.service import Service
+from twisted.internet.protocol import Factory
+from twisted.internet.defer import (
+    inlineCallbacks, returnValue, Deferred, succeed
+)
+from twisted.internet.endpoints import TCP4ClientEndpoint
+from twisted.protocols.amp import AMP, Command, Integer, Argument, String
+from twisted.python.reflect import qual
+from twisted.python import log
+
+from twisted.python.failure import Failure
+from twisted.internet.defer import passthru
+from twisted.internet.endpoints import TCP4ServerEndpoint
+
+from twext.enterprise.dal.model import (
+    ProcedureCall, Table, Schema, SQLType, Constraint
+)
+from twext.enterprise.dal.record import Record, fromTable
+from twext.enterprise.dal.syntax import (
+    TableSyntax, SchemaSyntax, NamedValue, Lock
+)
+from twext.enterprise.ienterprise import IQueuer
+
+
+def makeNodeSchema(inSchema):
+    """
+    Create a self-contained schema for L{NodeInfo} to use.
+
+    @return: a schema with just the one table.
+    """
+    # Initializing this duplicate schema avoids a circular dependency, but this
+    # should really be accomplished with independent schema objects that the
+    # transaction is made aware of somehow.
+    NodeTable = Table(inSchema, 'NODE_INFO')
+    NodeTable.addColumn("HOSTNAME", SQLType("varchar", 255))
+    NodeTable.addColumn("PID", SQLType("integer", None))
+    NodeTable.addColumn("PORT", SQLType("integer", None))
+    NodeTable.addColumn("TIME", SQLType("timestamp", None)).setDefaultValue(
+        # Note: in the real data structure, this is actually a not-cleaned-up
+        # sqlparse internal data structure, but it *should* look closer to this.
+        ProcedureCall("timezone", ["UTC", NamedValue('CURRENT_TIMESTAMP')])
+    )
+    for column in NodeTable.columns:
+        NodeTable.tableConstraint(Constraint.NOT_NULL, [column.name])
+    NodeTable.primaryKey = [NodeTable.columnNamed("HOSTNAME"),
+                            NodeTable.columnNamed("PORT")]
+    return inSchema
+
+NodeInfoSchema = SchemaSyntax(makeNodeSchema(Schema(__file__)))
+
+
+@inlineCallbacks
+def inTransaction(transactionCreator, operation):
+    """
+    Perform the given operation in a transaction, committing or aborting as
+    required.
+
+    @param transactionCreator: a 0-arg callable that returns an
+        L{IAsyncTransaction}
+
+    @param operation: a 1-arg callable that takes an L{IAsyncTransaction} and
+        returns a value.
+
+    @return: a L{Deferred} that fires with C{operation}'s result or fails with
+        its error, unless there is an error creating, aborting or committing
+        the transaction.
+    """
+    txn = transactionCreator()
+    try:
+        result = yield operation(txn)
+    except:
+        f = Failure()
+        yield txn.abort()
+        returnValue(f)
+    else:
+        yield txn.commit()
+        returnValue(result)
+
+
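+# Editor's usage sketch ('addCustomer', 'pool' and 'schema' are hypothetical
+# names, shown for illustration only):
+#
+#     def addCustomer(txn):
+#         return Insert({schema.CUSTOMER.NAME: u"alice"}).on(txn)
+#
+#     d = inTransaction(pool.connection, addCustomer)
+#     # ...commits on success; aborts and fires with the Failure on error.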
+
+class TableSyntaxByName(Argument):
+    """
+    Serialize and deserialize L{TableSyntax} objects for an AMP protocol with
+    an attached schema.
+    """
+
+    def fromStringProto(self, inString, proto):
+        """
+        Convert the name of the table into a table, given a C{proto} with an
+        attached C{schema}.
+
+        @param inString: the name of a table, as utf-8 encoded bytes
+        @type inString: L{bytes}
+
+        @param proto: an L{SchemaAMP}
+        """
+        return TableSyntax(proto.schema.tableNamed(inString.decode("UTF-8")))
+
+
+    def toString(self, inObject):
+        """
+        Convert a L{TableSyntax} object into just its name for wire transport.
+
+        @param inObject: a table.
+        @type inObject: L{TableSyntax}
+
+        @return: the name of that table
+        @rtype: L{bytes}
+        """
+        return inObject.model.name.encode("UTF-8")
+
+
+
+class NodeInfo(Record, fromTable(NodeInfoSchema.NODE_INFO)):
+    """
+    A L{NodeInfo} is information about a currently-active Node process.
+    """
+
+    def endpoint(self, reactor):
+        """
+        Create an L{IStreamClientEndpoint} that will talk to the node process
+        that is described by this L{NodeInfo}.
+
+        @return: an endpoint that will connect to this host.
+        @rtype: L{IStreamClientEndpoint}
+        """
+        return TCP4ClientEndpoint(reactor, self.hostname, self.port)
+
+
+
+def abstract(thunk):
+    """
+    The decorated function is abstract.
+
+    @note: only methods are currently supported.
+    """
+    @classmethod
+    @wraps(thunk)
+    def inner(cls, *a, **k):
+        raise NotImplementedError(qual(cls) + " does not implement " +
+                                  thunk.func_name)
+    return inner
+
+
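+# Editor's usage sketch (illustrative):
+#
+#     class Base(object):
+#         @abstract
+#         def frob(self):
+#             "Subclasses must implement frob."
+#
+#     Base.frob()   # raises NotImplementedError: ... does not implement frob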
+
+class WorkItem(Record):
+    """
+    An item of work.
+
+    @ivar workID: the unique identifier (primary key) for items of this type.
+        There must be a corresponding column in the database.
+    @type workID: L{int}
+
+    @cvar created: the timestamp that a given item was created, or the column
+        describing its creation time, on the class.
+    @type created: L{datetime.datetime}
+    """
+
+    @abstract
+    def doWork(self):
+        """
+        Subclasses must implement this to actually perform the queued work.
+
+        This method will be invoked in a worker process.
+
+        This method does I{not} need to delete the row referencing it; that
+        will be taken care of by the job queueing machinery.
+        """
+
+
+    @classmethod
+    def forTable(cls, table):
+        """
+        Look up a work-item class given a particular L{TableSyntax}.  Factoring
+        this correctly may place it into L{twext.enterprise.dal.record.Record}
+        instead; it is probably generally useful to be able to look up a mapped
+        class from a table.
+
+        @param table: the table to look up
+        @type table: L{twext.enterprise.dal.model.Table}
+
+        @return: the relevant subclass
+        @rtype: L{type}
+        """
+        for subcls in cls.__subclasses__():
+            if table == getattr(subcls, "table", None):
+                return subcls
+        raise KeyError("No mapped {0} class for {1}.".format(
+            cls, table
+        ))
+
+
+
+class PerformWork(Command):
+    """
+    Notify another process that it must do some work that has been persisted to
+    the database, by informing it of the table and the ID where said work has
+    been persisted.
+    """
+
+    arguments = [
+        ("table", TableSyntaxByName()),
+        ("workID", Integer()),
+    ]
+    response = []
+
+
+
+class ReportLoad(Command):
+    """
+    Notify another node of the total, current load for this whole node (all of
+    its workers).
+    """
+    arguments = [
+        ("load", Integer())
+    ]
+    response = []
+
+
+class IdentifyNode(Command):
+    """
+    Identify this node to its peer.  The connector knows which hostname it's
+    looking for, and which hostname it considers itself to be; only the
+    initiator (not the listener) issues this command.  This command is
+    necessary because if reverse DNS isn't set up perfectly, the listener may
+    not be able to identify its peer.
+    """
+
+    arguments = [
+        ("host", String()),
+        ("port", Integer()),
+    ]
+
+
+
+class SchemaAMP(AMP):
+    """
+    An AMP instance which also has a L{Schema} attached to it.
+
+    @ivar schema: The schema to look up L{TableSyntaxByName} arguments in.
+    @type schema: L{Schema}
+    """
+
+    def __init__(self, schema, boxReceiver=None, locator=None):
+        self.schema = schema
+        super(SchemaAMP, self).__init__(boxReceiver, locator)
+
+
+
+class ConnectionFromPeerNode(SchemaAMP):
+    """
+    A connection to a peer node.  Symmetric; since the 'client' and the
+    'server' both serve the same role, the logic is the same in every node.
+    """
+
+    def __init__(self, peerPool, boxReceiver=None, locator=None):
+        """
+        Initialize this L{ConnectionFromPeerNode} with a reference to the
+        L{PeerConnectionPool} it belongs to, from which the pool of local
+        worker processes is obtained.
+
+        @param peerPool: the connection pool that this peer connection
+            participates in.
+        @type peerPool: L{PeerConnectionPool}
+
+        @see: L{AMP.__init__}
+        """
+        self.peerPool = peerPool
+        self.localWorkerPool = peerPool.workerPool
+        self._bonusLoad = 0
+        self._reportedLoad = 0
+        super(ConnectionFromPeerNode, self).__init__(peerPool.schema,
+                                                     boxReceiver, locator)
+
+
+    def reportCurrentLoad(self):
+        """
+        Report the current load for the local worker pool to this peer.
+        """
+        return self.callRemote(ReportLoad,
+                               load=self.localWorkerPool.totalLoad())
+
+
+    @ReportLoad.responder
+    def reportedLoad(self, load):
+        """
+        The peer reports its load.
+        """
+        self._reportedLoad = (load - self._bonusLoad)
+        return {}
+
+
+    def startReceivingBoxes(self, sender):
+        """
+        Connection is up and running; add this to the list of active peers.
+        """
+        r = super(ConnectionFromPeerNode, self).startReceivingBoxes(sender)
+        self.peerPool.addPeerConnection(self)
+        return r
+
+
+    def stopReceivingBoxes(self, reason):
+        """
+        The connection has shut down; remove this from the list of active
+        peers.
+        """
+        self.peerPool.removePeerConnection(self)
+        r = super(ConnectionFromPeerNode, self).stopReceivingBoxes(reason)
+        return r
+
+
+    def currentLoadEstimate(self):
+        """
+        What is the current load estimate for this peer?
+
+        @return: The number of full "slots", i.e. currently-being-processed
+            queue items (and other items which may contribute to this process's
+            load, such as currently-being-processed client requests).
+        @rtype: L{int}
+        """
+        return self._reportedLoad + self._bonusLoad
+
+
+    def performWork(self, table, workID):
+        """
+        A L{local worker connection <ConnectionFromWorker>} is asking this
+        specific peer node-controller process to perform some work, having
+        already determined that it's appropriate.
+
+        @param table: The table where work is waiting.
+        @type table: L{TableSyntax}
+
+        @param workID: The primary key identifier of the given work.
+        @type workID: L{int}
+
+        @return: a L{Deferred} firing with an empty dictionary when the work is
+            complete.
+        @rtype: L{Deferred} firing L{dict}
+        """
+        d = self.callRemote(PerformWork, table=table, workID=workID)
+        self._bonusLoad += 1
+        @d.addBoth
+        def performed(result):
+            self._bonusLoad -= 1
+            return result
+        return d
+
+
+    @PerformWork.responder
+    def dispatchToWorker(self, table, workID):
+        """
+        A remote peer node has asked this node to do some work; dispatch it to
+        a local worker on this node.
+
+        @param table: the table to work on.
+        @type table: L{TableSyntax}
+
+        @param workID: the identifier within the table.
+        @type workID: L{int}
+
+        @return: a L{Deferred} that fires when the work has been completed.
+        """
+        return self.localWorkerPool.performWork(table, workID)
+
+
+    @IdentifyNode.responder
+    def identifyPeer(self, host, port):
+        """
+        The initiating peer has identified itself: record the mapping.
+        """
+        self.peerPool.mapPeer(host, port, self)
+        return {}
+
+
+
+class WorkerConnectionPool(object):
+    """
+    A pool of L{ConnectionFromWorker}s.
+
+    L{WorkerConnectionPool} also implements the same implicit protocol as a
+    L{ConnectionFromPeerNode}, but one that dispenses work to the local worker
+    processes rather than to a remote connection pool.
+    """
+
+    def __init__(self, maximumLoadPerWorker=0):
+        self.workers = []
+        self.maximumLoadPerWorker = maximumLoadPerWorker
+
+
+    def addWorker(self, worker):
+        """
+        Add a L{ConnectionFromWorker} to this L{WorkerConnectionPool} so that
+        it can be selected.
+        """
+        self.workers.append(worker)
+
+
+    def removeWorker(self, worker):
+        """
+        Remove a L{ConnectionFromWorker} from this L{WorkerConnectionPool} that
+        was previously added.
+        """
+        self.workers.remove(worker)
+
+
+    def hasAvailableCapacity(self):
+        """
+        Does this worker connection pool have any local workers who have
+        spare capacity to process another queue item?
+        """
+        for worker in self.workers:
+            if worker.currentLoad() < self.maximumLoadPerWorker:
+                return True
+        return False
+
+
+    def totalLoad(self):
+        """
+        The total load of all currently connected workers.
+        """
+        return sum(worker.currentLoad() for worker in self.workers)
+
+
+    def _selectLowestLoadWorker(self):
+        """
+        Select the local connection with the lowest current load.
+
+        @return: a worker connection with the lowest current load.
+        @rtype: L{ConnectionFromWorker}
+        """
+        return sorted(self.workers, key=lambda w: w.currentLoad())[0]
+
+
+    def performWork(self, table, workID):
+        """
+        Select a local worker that is idle enough to perform the given work,
+        then ask them to perform it.
+
+        @param table: The table where work is waiting.
+        @type table: L{TableSyntax}
+
+        @param workID: The primary key identifier of the given work.
+        @type workID: L{int}
+
+        @return: a L{Deferred} firing with an empty dictionary when the work is
+            complete.
+        @rtype: L{Deferred} firing L{dict}
+        """
+        return self._selectLowestLoadWorker().performWork(table, workID)
+
+
+
+class ConnectionFromWorker(SchemaAMP):
+    """
+    An individual connection from a worker, as seen from the master's
+    perspective.  L{ConnectionFromWorker}s go into a L{WorkerConnectionPool}.
+
+    @ivar workerPool: The connection pool that this individual connection is
+        participating in.
+    @type workerPool: L{WorkerConnectionPool}
+    """
+
+    def __init__(self, schema, workerPool, boxReceiver=None, locator=None):
+        self.workerPool = workerPool
+        super(ConnectionFromWorker, self).__init__(schema, boxReceiver, locator)
+        self._load = 0
+
+
+    def currentLoad(self):
+        """
+        What is the current load of this worker?
+
+        @return: the number of requests currently dispatched to this worker.
+        @rtype: L{int}
+        """
+        return self._load
+
+
+    def startReceivingBoxes(self, sender):
+        """
+        Start receiving AMP boxes from the peer.  Initialize all necessary
+        state.
+        """
+        result = super(ConnectionFromWorker, self).startReceivingBoxes(sender)
+        self.workerPool.addWorker(self)
+        return result
+
+
+    def stopReceivingBoxes(self, reason):
+        """
+        AMP boxes will no longer be received.
+        """
+        result = super(ConnectionFromWorker, self).stopReceivingBoxes(reason)
+        self.workerPool.removeWorker(self)
+        return result
+
+
+    def performWork(self, table, workID):
+        """
+        Dispatch work to this worker.
+
+        @see: The responder for this should always be
+            L{ConnectionFromController.actuallyReallyExecuteWorkHere}.
+        """
+        d = self.callRemote(PerformWork, table=table, workID=workID)
+        self._load += 1
+        @d.addBoth
+        def f(result):
+            self._load -= 1
+            return result
+        return d
+
+
+
+class ConnectionFromController(SchemaAMP):
+    """
+    A L{ConnectionFromController} is the connection to a node-controller
+    process, in a worker process.  It processes requests from its own
+    controller to do work.  It is the opposite end of the connection from
+    L{ConnectionFromWorker}.
+    """
+    implements(IQueuer)
+
+    def __init__(self, transactionFactory, schema, whenConnected,
+                 boxReceiver=None, locator=None):
+        super(ConnectionFromController, self).__init__(schema,
+                                                       boxReceiver, locator)
+        self.transactionFactory = transactionFactory
+        self.whenConnected = whenConnected
+
+
+    def startReceivingBoxes(self, sender):
+        """
+        The connection to the controller is up; invoke the C{whenConnected}
+        callback with this protocol.
+        """
+        result = super(ConnectionFromController,
+                       self).startReceivingBoxes(sender)
+        self.whenConnected(self)
+        return result
+
+
+    def choosePerformer(self):
+        """
+        To conform with L{WorkProposal}'s expectations, which may run in either
+        a controller (against a L{PeerConnectionPool}) or in a worker (against
+        a L{ConnectionFromController}), this is implemented to always return
+        C{self}, since C{self} is also an object that has a C{performWork}
+        method.
+        """
+        return succeed(self)
+
+
+    def performWork(self, table, workID):
+        """
+        Ask the controller to perform some work on our behalf.
+        """
+        return self.callRemote(PerformWork, table=table, workID=workID)
+
+
+    def enqueueWork(self, txn, workItemType, **kw):
+        """
+        There is some work to do.  Do it, someplace else, ideally in parallel.
+        Later, let the caller know that the work has been completed by firing a
+        L{Deferred}.
+
+        @param workItemType: The type of work item to be enqueued.
+        @type workItemType: A subtype of L{WorkItem}
+
+        @param kw: The parameters to construct a work item.
+        @type kw: keyword parameters to C{workItemType.create}, i.e.
+            C{workItemType.__init__}
+
+        @return: an object that can track the enqueuing and remote execution of
+            this work.
+        @rtype: L{WorkProposal}
+        """
+        wp = WorkProposal(self, txn, workItemType, kw)
+        wp._start()
+        return wp
+
+
+    @PerformWork.responder
+    def actuallyReallyExecuteWorkHere(self, table, workID):
+        """
+        This is where it's time to actually do the work.  The controller
+        process has instructed this worker to do it; so, look up the data in
+        the row, and do it.
+        """
+        @inlineCallbacks
+        def work(txn):
+            workItemClass = WorkItem.forTable(table)
+            workItem = yield workItemClass.load(txn, workID)
+            # TODO: what if we fail?  error-handling should be recorded
+            # someplace, the row should probably be marked, re-tries should be
+            # triggerable administratively.
+            yield workItem.delete()
+            # TODO: verify that workID is the primary key someplace.
+            yield workItem.doWork()
+            returnValue({})
+        return inTransaction(self.transactionFactory, work)
+
+
+
+class WorkerFactory(Factory, object):
+    """
+    Factory, to be used as the client to connect from the worker to the
+    controller.
+    """
+
+    def __init__(self, transactionFactory, schema, whenConnected):
+        """
+        Create a L{WorkerFactory} with a transaction factory and a schema.
+        """
+        self.transactionFactory = transactionFactory
+        self.schema = schema
+        self.whenConnected = whenConnected
+
+
+    def buildProtocol(self, addr):
+        """
+        Create a L{ConnectionFromController} connected to the
+        transactionFactory and store.
+        """
+        return ConnectionFromController(self.transactionFactory, self.schema,
+                                        self.whenConnected)
+
+
+
+class TransactionFailed(Exception):
+    """
+    A transaction failed.
+    """
+
+
+
+def _cloneDeferred(d):
+    """
+    Make a new Deferred, adding callbacks to C{d}.
+
+    @return: another L{Deferred} that fires with C{d's} result when C{d} fires.
+    @rtype: L{Deferred}
+    """
+    d2 = Deferred()
+    d.chainDeferred(d2)
+    return d2
+
+
+
+class WorkProposal(object):
+    """
+    A L{WorkProposal} is a proposal for work that will be executed, perhaps on
+    another node, perhaps in the future.
+
+    @ivar pool: the connection pool which this L{WorkProposal} will use to
+        submit its work.
+    @type pool: L{PeerConnectionPool}
+
+    @ivar txn: The transaction where the work will be enqueued.
+    @type txn: L{IAsyncTransaction}
+
+    @ivar workItemType: The type of work to be enqueued by this L{WorkProposal}
+    @type workItemType: L{WorkItem} subclass
+
+    @ivar kw: The keyword arguments to pass to C{self.workItemType.create} to
+        construct it.
+    @type kw: L{dict}
+    """
+
+    def __init__(self, pool, txn, workItemType, kw):
+        self.pool = pool
+        self.txn = txn
+        self.workItemType = workItemType
+        self.kw = kw
+        self._whenProposed = Deferred()
+        self._whenExecuted = Deferred()
+        self._whenCommitted = Deferred()
+
+
+    def _start(self):
+        """
+        Execute this L{WorkProposal} by creating the work item in the database,
+        waiting for the transaction where that addition was completed to
+        commit, and asking the local node controller process to do the work.
+        """
+        @passthru(self.workItemType.create(self.txn, **self.kw).addCallback)
+        def created(item):
+            self._whenProposed.callback(None)
+            @self.txn.postCommit
+            def whenDone():
+                self._whenCommitted.callback(None)
+                @passthru(self.pool.choosePerformer().addCallback)
+                def performerChosen(performer):
+                    @passthru(performer.performWork(item.table, item.workID))
+                    def performed(result):
+                        self._whenExecuted.callback(None)
+                    @performed.addErrback
+                    def notPerformed(why):
+                        self._whenExecuted.errback(why)
+                @performerChosen.addErrback
+                def notChosen(whyNot):
+                    self._whenExecuted.errback(whyNot)
+            @self.txn.postAbort
+            def whenFailed():
+                self._whenCommitted.errback(TransactionFailed)
+
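+    # Editor's usage sketch (illustrative; CouponWork is the example from the
+    # module docstring): the three accessors below observe successive phases
+    # of the proposal's lifecycle:
+    #
+    #     wp = queuer.enqueueWork(txn, CouponWork, customerID=3)
+    #     wp.whenProposed().addCallback(...)    # INSERT sent to the database
+    #     wp.whenCommitted().addCallback(...)   # enqueueing txn committed
+    #     wp.whenExecuted().addCallback(...)    # work completed somewhere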
+
+    def whenExecuted(self):
+        """
+        Let the caller know when the proposed work has been fully executed.
+
+        @note: The L{Deferred} returned by C{whenExecuted} should be used with
+            extreme caution.  If an application decides to do any
+            database-persistent work as a result of this L{Deferred} firing,
+            that work I{may be lost} as a result of a service being normally
+            shut down between the time that the work is scheduled and the time
+            that it is executed.  So, the only things that should be added as
+            callbacks to this L{Deferred} are those which are ephemeral, in
+            memory, and reflect only presentation state associated with the
+            user's perception of the completion of work, not logical chains of
+            work which need to be completed in sequence; those should all be
+            completed within the transaction of the L{WorkItem.doWork} that
+            gets executed.
+
+        @return: a L{Deferred} that fires with C{None} when the work has been
+            completed remotely.
+        """
+        return _cloneDeferred(self._whenExecuted)
+
+
+    def whenProposed(self):
+        """
+        Let the caller know when the work has been proposed; i.e. when the work
+        is first transmitted to the database.
+
+        @return: a L{Deferred} that fires with C{None} when the relevant
+            commands have been sent to the database to create the L{WorkItem},
+            and fails if those commands do not succeed for some reason.
+        """
+        return _cloneDeferred(self._whenProposed)
+
+
+    def whenCommitted(self):
+        """
+        Let the caller know when the work has been committed to; i.e. when the
+        transaction where the work was proposed has been committed to the
+        database.
+
+        @return: a L{Deferred} that fires with C{None} when the relevant
+            transaction has been committed, or fails if the transaction is not
+            committed for any reason.
+        """
+        return _cloneDeferred(self._whenCommitted)
+
+
+
+class PeerConnectionPool(Service, object):
+    """
+    Each node has a L{PeerConnectionPool} connecting it to all the other nodes
+    currently active on the same database.
+
+    @ivar hostname: The hostname where this node process is running, as
+        reported by the local host's configuration.  Possibly this should be
+        obtained via C{config.ServerHostName} instead of C{socket.getfqdn()},
+        although hosts within a cluster may be configured with the same
+        C{ServerHostName}; TODO: need to confirm.
+    @type hostname: L{bytes}
+
+    @ivar thisProcess: a L{NodeInfo} representing this process, which is
+        initialized when this L{PeerConnectionPool} service is started via
+        C{startService}.  May be C{None} if this service is not fully started
+        up or if it is shutting down.
+    @type thisProcess: L{NodeInfo}
+
+    @ivar queueProcessTimeout: The maximum amount of time allowed for a queue
+        item to be processed.  By default, 10 minutes.
+    @type queueProcessTimeout: L{float} (in seconds)
+
+    @ivar queueDelayedProcessInterval: The amount of time between database
+        pings, i.e. checks for over-due queue items that might have been
+        orphaned by a controller process that died mid-transaction.  This is
+        how often the shared database should be pinged by I{all} nodes (i.e.,
+        all controller processes, or each instance of L{PeerConnectionPool});
+        each individual node will ping commensurately less often as more nodes
+        join the database.
+    @type queueDelayedProcessInterval: L{float} (in seconds)
+
+    @ivar reactor: The reactor used for scheduling timed events.
+    @type reactor: L{IReactorTime} provider.
+
+    @ivar peers: The list of currently connected peers.
+    @type peers: L{list} of L{ConnectionFromPeerNode}
+    """
+    implements(IQueuer)
+
+    getfqdn = staticmethod(getfqdn)
+    getpid = staticmethod(getpid)
+
+    queueProcessTimeout = (10.0 * 60.0)
+    queueDelayedProcessInterval = (60.0)
+
+    def __init__(self, reactor, transactionFactory, ampPort, schema):
+        """
+        Initialize a L{PeerConnectionPool}.
+
+        @param ampPort: The AMP TCP port number to listen on for inter-host
+            communication.  This must be an integer (and not, say, an endpoint,
+            or an endpoint description) because we need to communicate it to
+            the other peers in the cluster in a way that will be meaningful to
+            them as clients.
+        @type ampPort: L{int}
+
+        @param transactionFactory: a 0- or 1-argument callable that produces an
+            L{IAsyncTransaction}
+
+        @param schema: The schema which contains all the tables associated with
+            the L{WorkItem}s that this L{PeerConnectionPool} will process.
+        @type schema: L{Schema}
+        """
+        self.reactor = reactor
+        self.transactionFactory = transactionFactory
+        self.hostname = self.getfqdn()
+        self.pid = self.getpid()
+        self.ampPort = ampPort
+        self.thisProcess = None
+        self.workerPool = WorkerConnectionPool()
+        self.peers = []
+        self.mappedPeers = {}
+        self.schema = schema
+        self._startingUp = None
+        self._listeningPortObject = None
+        self._lastSeenTotalNodes = 1
+        self._lastSeenNodeIndex = 1
+
+
+    def addPeerConnection(self, peer):
+        """
+        Add a L{ConnectionFromPeerNode} to the active list of peers.
+        """
+        self.peers.append(peer)
+
+
+    def workerListenerFactory(self):
+        """
+        Factory that listens for connections from workers.
+        """
+        f = Factory()
+        f.buildProtocol = lambda addr: ConnectionFromWorker(self.schema,
+                                                            self.workerPool)
+        return f
+
+
+    def removePeerConnection(self, peer):
+        """
+        Remove a L{ConnectionFromPeerNode} from the active list of peers.
+        """
+        self.peers.remove(peer)
+
+
+    def choosePerformer(self):
+        """
+        Choose a peer to distribute work to based on the current known slot
+        occupancy of the other nodes.  Note that this will prefer distributing
+        work to local workers until the current node is full, because that
+        should be lower-latency.  Also, if no peers are available, work will be
+        submitted locally even if the worker pool is already over-subscribed.
+
+        @return: a L{Deferred <twisted.internet.defer.Deferred>} which fires
+            with the chosen 'peer', i.e. object with a C{performWork} method,
+            as soon as one is available.  Normally this will be synchronous,
+            but we need to account for the possibility that we may need to
+            connect to other hosts.
+        @rtype: L{Deferred <twisted.internet.defer.Deferred>} firing
+            L{ConnectionFromPeerNode} or L{WorkerConnectionPool}
+        """
+        if not self.workerPool.hasAvailableCapacity() and self.peers:
+            return succeed(sorted(self.peers,
+                                  key=lambda p: p.currentLoadEstimate())[0])
+        else:
+            return succeed(self.workerPool)
+
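+    # Editor's worked example (illustrative): with the local pool saturated
+    # and two peers whose currentLoadEstimate() values are 3 and 1, the peer
+    # reporting 1 is chosen; with any spare local capacity, the local pool
+    # always wins, since local dispatch avoids an extra network hop.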
+
+    def enqueueWork(self, txn, workItemType, **kw):
+        """
+        There is some work to do.  Do it, someplace else, ideally in parallel.
+        Later, let the caller know that the work has been completed by firing a
+        L{Deferred}.
+
+        @param workItemType: The type of work item to be enqueued.
+        @type workItemType: A subtype of L{WorkItem}
+
+        @param kw: The parameters to construct a work item.
+        @type kw: keyword parameters to C{workItemType.create}, i.e.
+            C{workItemType.__init__}
+
+        @return: an object that can track the enqueuing and remote execution of
+            this work.
+        @rtype: L{WorkProposal}
+        """
+        wp = WorkProposal(self, txn, workItemType, kw)
+        wp._start()
+        return wp
+
+
+    def allWorkItemTypes(self):
+        """
+        Load all the L{WorkItem} types that this node can process and return
+        them.
+
+        @return: L{list} of L{type}
+        """
+        # TODO: For completeness, this may need to involve a plugin query to
+        # make sure that all WorkItem subclasses are imported first.
+        return WorkItem.__subclasses__()
+
+
+    def totalNumberOfNodes(self):
+        """
+        How many nodes are there, total?
+
+        @return: the maximum number of other L{PeerConnectionPool} instances
+            that may be connected to the database described by
+            C{self.transactionFactory}.  Note that this is not the current count
+            by connectivity, but the count according to the database.
+        @rtype: L{int}
+        """
+        # TODO
+        return self._lastSeenTotalNodes
+
+
+    def nodeIndex(self):
+        """
+        What ordinal does this node, i.e. this instance of
+        L{PeerConnectionPool}, occupy within the ordered set of all nodes
+        connected to the database described by C{self.transactionFactory}?
+
+        @return: the index of this node within the total collection.  For
+            example, if this L{PeerConnectionPool} is 6 out of 30, this method
+            will return C{6}.
+        @rtype: L{int}
+        """
+        # TODO
+        return self._lastSeenNodeIndex
+
+
+    def _periodicLostWorkCheck(self):
+        """
+        Periodically, every node controller has to check to make sure that work
+        hasn't been dropped on the floor by someone.  In order to do that it
+        queries each work-item table.
+        """
+        @inlineCallbacks
+        def workCheck(txn):
+
+            nodes = [(node.hostname, node.port) for node in
+                     (yield self.activeNodes(txn))]
+            nodes.sort()
+            self._lastSeenTotalNodes = len(nodes)
+            self._lastSeenNodeIndex = nodes.index((self.thisProcess.hostname,
+                                                   self.thisProcess.port))
+            for itemType in self.allWorkItemTypes():
+                # An item is overdue if it was created more than
+                # queueProcessTimeout seconds ago and is still in its table.
+                tooOld = (datetime.utcnow() -
+                          timedelta(seconds=self.queueProcessTimeout))
+                for overdueItem in (
+                        yield itemType.query(txn, itemType.created < tooOld)):
+                    peer = yield self.choosePerformer()
+                    yield peer.performWork(overdueItem.table,
+                                           overdueItem.workID)
+        return inTransaction(self.transactionFactory, workCheck)
+
+
+    _currentWorkDeferred = None
+    _lostWorkCheckCall = None
+
+    def _lostWorkCheckLoop(self):
+        """
+        While the service is running, keep checking for any overdue / lost work
+        items and re-submit them to the cluster for processing.  Space out
+        those checks in time based on the size of the cluster.
+        """
+        self._lostWorkCheckCall = None
+        @passthru(self._periodicLostWorkCheck().addErrback(log.err)
+                  .addCallback)
+        def scheduleNext(result):
+            self._currentWorkDeferred = None
+            if not self.running:
+                return
+            index = self.nodeIndex()
+            now = self.reactor.seconds()
+
+            interval = self.queueDelayedProcessInterval
+            count = self.totalNumberOfNodes()
+            when = (now - (now % interval)) + (interval * (count + index))
+            delay = when - now
+            self._lostWorkCheckCall = self.reactor.callLater(
+                delay, self._lostWorkCheckLoop
+            )
+        self._currentWorkDeferred = scheduleNext
+
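+    # Editor's worked example (illustrative): with interval = 60s, count = 3
+    # nodes and index = 1, a check finishing at now = 1000s computes
+    # when = (1000 - 40) + 60 * (3 + 1) = 1200, so the next check runs 200s
+    # later; distinct node indices land in distinct 60-second slots, spacing
+    # the cluster's polling evenly over time.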
+
+    def startService(self):
+        """
+        Register ourselves with the database and establish all outgoing
+        connections to other servers in the cluster.
+        """
+        @inlineCallbacks
+        def startup(txn):
+            endpoint = TCP4ServerEndpoint(self.reactor, self.ampPort)
+            f = Factory()
+            f.buildProtocol = self.createPeerConnection
+            # If this fails, the failure mode is going to be ugly, just like all
+            # conflicted-port failures.  But, at least it won't proceed.
+            self._listeningPortObject = yield endpoint.listen(f)
+            yield Lock.exclusive(NodeInfo.table).on(txn)
+            nodes = yield self.activeNodes(txn)
+            selves = [node for node in nodes
+                      if ((node.hostname == self.hostname) and
+                          (node.port == self.ampPort))]
+            if selves:
+                self.thisProcess = selves[0]
+                nodes.remove(self.thisProcess)
+                yield self.thisProcess.update(pid=self.pid,
+                                              time=datetime.now())
+            else:
+                self.thisProcess = yield NodeInfo.create(
+                    txn, hostname=self.hostname, port=self.ampPort,
+                    pid=self.pid, time=datetime.now()
+                )
+            for node in nodes:
+                self._startConnectingTo(node)
+
+        self._startingUp = inTransaction(self.transactionFactory, startup)
+        @self._startingUp.addBoth
+        def done(result):
+            self._startingUp = None
+            return result
+
+
+    @inlineCallbacks
+    def stopService(self):
+        """
+        Stop this service, terminating any incoming or outgoing connections.
+        """
+        yield super(PeerConnectionPool, self).stopService()
+        if self._startingUp is not None:
+            yield self._startingUp
+        if self._listeningPortObject is not None:
+            yield self._listeningPortObject.stopListening()
+        if self._lostWorkCheckCall is not None:
+            self._lostWorkCheckCall.cancel()
+        if self._currentWorkDeferred is not None:
+            yield self._currentWorkDeferred
+        for peer in self.peers:
+            peer.transport.loseConnection()
+
+
+    def activeNodes(self, txn):
+        """
+        Load information about all other nodes.
+        """
+        return NodeInfo.all(txn)
+
+
+    def mapPeer(self, host, port, peer):
+        """
+        A peer has been identified as belonging to the given host/port
+        combination.  Disconnect any other peer previously mapped to the
+        same host and port.
+        """
+        # if (host, port) in self.mappedPeers:
+            # TODO: think about this for race conditions
+            # self.mappedPeers.pop((host, port)).transport.loseConnection()
+        self.mappedPeers[(host, port)] = peer
+
+
+    def _startConnectingTo(self, node):
+        """
+        Start an outgoing connection to another master process.
+
+        @param node: a description of the master to connect to.
+        @type node: L{NodeInfo}
+        """
+        f = Factory()
+        f.buildProtocol = self.createPeerConnection
+        @passthru(node.endpoint(self.reactor).connect(f).addCallback)
+        def connected(proto):
+            self.mapPeer(node.hostname, node.port, proto)
+            proto.callRemote(IdentifyNode, self.thisProcess)
+
+
+    def createPeerConnection(self, addr):
+        return ConnectionFromPeerNode(self)
+
+
+
+class ImmediateWorkProposal(object):
+    """
+    Like L{WorkProposal}, but for items that must be executed immediately
+    because no real queue is set up yet.
+
+    @see: L{WorkProposal}, L{NullQueuer.enqueueWork}
+    """
+    def __init__(self, proposed, done):
+        self.proposed = proposed
+        self.done = done
+
+
+    def whenExecuted(self):
+        return _cloneDeferred(self.done)
+
+
+    def whenProposed(self):
+        return _cloneDeferred(self.proposed)
+
+
+    def whenCommitted(self):
+        return _cloneDeferred(self.done)
+
+
+
+class NullQueuer(object):
+    """
+    When work is enqueued with this queuer, it is just executed immediately,
+    within the same transaction.  While this is technically correct, it is not
+    very efficient.
+    """
+    implements(IQueuer)
+
+    def enqueueWork(self, txn, workItemType, **kw):
+        """
+        Do this work immediately.
+
+        @see: L{PeerConnectionPool.enqueueWork}
+
+        @return: a pseudo work proposal, since everything completes at the same
+            time.
+        @rtype: L{ImmediateWorkProposal}
+        """
+        proposed = Deferred()
+        done = Deferred()
+        @inlineCallbacks
+        def doit():
+            item = yield workItemType.create(txn, **kw)
+            proposed.callback(True)
+            yield item.delete()
+            yield item.doWork()
+        # Run the work now, within the enqueueing transaction itself.
+        doit()
+        @txn.postCommit
+        def committed():
+            done.callback(True)
+        @txn.postAbort
+        def aborted():
+            tf = TransactionFailed()
+            done.errback(tf)
+            if not proposed.called:
+                proposed.errback(tf)
+        return ImmediateWorkProposal(proposed, done)
+
+
+
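
A usage sketch for NullQueuer; the ScheduleWork work-item class, the
homeID parameter, and the enclosing @inlineCallbacks context are
hypothetical, for illustration only:

    queuer = NullQueuer()
    proposal = queuer.enqueueWork(txn, ScheduleWork, homeID=42)
    yield proposal.whenProposed()   # the work row has been created
    d = proposal.whenCommitted()    # fires later, from txn's postCommit
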

Modified: CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py	2012-08-11 08:55:55 UTC (rev 9688)
+++ CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py	2012-08-11 09:01:06 UTC (rev 9689)
@@ -19,19 +19,19 @@
 """
 
 from itertools import count
+from Queue import Empty
 
 from zope.interface.verify import verifyClass, verifyObject
 from zope.interface.declarations import implements
 
 from twisted.python.threadpool import ThreadPool
+from twisted.python.failure import Failure
 
 from twisted.trial.unittest import TestCase
 
-from twisted.internet.defer import execute
 from twisted.internet.task import Clock
 
 from twisted.internet.interfaces import IReactorThreads
-from twisted.internet.defer import Deferred
 
 from twisted.test.proto_helpers import StringTransport
 
@@ -45,6 +45,7 @@
 from twext.enterprise.adbapi2 import FailsafeException
 from twext.enterprise.adbapi2 import DEFAULT_PARAM_STYLE
 from twext.enterprise.adbapi2 import ConnectionPool
+from twext.internet.threadutils import ThreadHolder
 
 
 def resultOf(deferred, propagate=False):
@@ -65,6 +66,22 @@
 
 
 
+class AssertResultHelper(object):
+    """
+    Mixin for asserting about synchronous Deferred results.
+    """
+
+    def assertResultList(self, resultList, expected):
+        if not resultList:
+            self.fail("No result; Deferred didn't fire yet.")
+        else:
+            if isinstance(resultList[0], Failure):
+                resultList[0].raiseException()
+            else:
+                self.assertEqual(resultList, [expected])
+
+
+
 class Child(object):
     """
     An object with a L{Parent}, in its list of C{children}.
@@ -326,59 +343,75 @@
 
 
 
-class FakeThreadHolder(object):
+class FakeThreadHolder(ThreadHolder):
     """
-    Run things submitted to this ThreadHolder on the main thread, so that
+    Run things submitted to this ThreadHolder on the main thread, so that
     execution is easier to control.
     """
 
     def __init__(self, test):
+        super(FakeThreadHolder, self).__init__(self)
+        self.test = test
         self.started = False
         self.stopped = False
-        self.test = test
-        self.queue = []
+        self._workerIsRunning = False
 
 
     def start(self):
-        """
-        Mark this L{FakeThreadHolder} as not started.
-        """
         self.started = True
+        return super(FakeThreadHolder, self).start()
 
 
     def stop(self):
-        """
-        Mark this L{FakeThreadHolder} as stopped.
-        """
-        def stopped(nothing):
-            self.stopped = True
-        return self.submit(lambda : None).addCallback(stopped)
+        result = super(FakeThreadHolder, self).stop()
+        self.stopped = True
+        return result
 
 
-    def submit(self, work):
+    @property
+    def _q(self):
+        return self._q_
+
+
+    @_q.setter
+    def _q(self, newq):
+        if newq is not None:
+            oget = newq.get
+            newq.get = lambda: oget(timeout=0)
+            oput = newq.put
+            def putit(x):
+                p = oput(x)
+                if not self.test.paused:
+                    self.flush()
+                return p
+            newq.put = putit
+        self._q_ = newq
+
+
+    def callFromThread(self, f, *a, **k):
+        result = f(*a, **k)
+        return result
+
+
+    def callInThread(self, f, *a, **k):
         """
-        Call the function (or queue it)
+        This should be called only once, to start the worker function that
+        dedicates a thread to this L{ThreadHolder}.
         """
-        if self.test.paused:
-            d = Deferred()
-            self.queue.append((d, work))
-            return d
-        else:
-            return execute(work)
+        self._workerIsRunning = True
 
 
     def flush(self):
         """
         Fire all deferreds previously returned from submit.
         """
-        self.queue, queue = [], self.queue
-        for (d, work) in queue:
-            try:
-                result = work()
-            except:
-                d.errback()
+        try:
+            while self._workerIsRunning and self._qpull():
+                pass
             else:
-                d.callback(result)
+                self._workerIsRunning = False
+        except Empty:
+            pass
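
The _q setter above is the heart of this fake: get() is rewrapped to time
out immediately (raising Queue.Empty rather than blocking the test's main
thread) and put() flushes eagerly unless the test is paused.  The
non-blocking half of that trick in isolation, as a standalone sketch:

    from Queue import Queue, Empty

    q = Queue()
    oget = q.get
    q.get = lambda: oget(timeout=0)   # never block; raise Empty instead
    q.put("work")
    assert q.get() == "work"
    try:
        q.get()                       # queue is empty now
    except Empty:
        pass                          # raised instead of hanging forever
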
 
 
 
@@ -525,7 +558,7 @@
 
 
 
-class ConnectionPoolTests(ConnectionPoolHelper, TestCase):
+class ConnectionPoolTests(ConnectionPoolHelper, TestCase, AssertResultHelper):
     """
     Tests for L{ConnectionPool}.
     """
@@ -634,7 +667,7 @@
         self.assertEquals(len(errors), 1)
         stopd = []
         self.pool.stopService().addBoth(stopd.append)
-        self.assertEquals([None], stopd)
+        self.assertResultList(stopd, None)
         self.assertEquals(self.clock.calls, [])
         [holder] = self.holders
         self.assertEquals(holder.started, True)
@@ -643,8 +676,8 @@
 
     def test_shutdownDuringAttemptSuccess(self):
         """
-        If L{ConnectionPool.stopService} is called while a connection attempt is
-        outstanding, the resulting L{Deferred} won't be fired until the
+        If L{ConnectionPool.stopService} is called while a connection attempt
+        is outstanding, the resulting L{Deferred} won't be fired until the
         connection attempt has finished; in this case, succeeded.
         """
         self.pauseHolders()
@@ -653,7 +686,7 @@
         self.pool.stopService().addBoth(stopd.append)
         self.assertEquals(stopd, [])
         self.flushHolders()
-        self.assertEquals(stopd, [None])
+        self.assertResultList(stopd, None)
         [holder] = self.holders
         self.assertEquals(holder.started, True)
         self.assertEquals(holder.stopped, True)
@@ -661,8 +694,8 @@
 
     def test_shutdownDuringAttemptFailed(self):
         """
-        If L{ConnectionPool.stopService} is called while a connection attempt is
-        outstanding, the resulting L{Deferred} won't be fired until the
+        If L{ConnectionPool.stopService} is called while a connection attempt
+        is outstanding, the resulting L{Deferred} won't be fired until the
         connection attempt has finished; in this case, failed.
         """
         self.factory.defaultFail()
@@ -674,7 +707,7 @@
         self.flushHolders()
         errors = self.flushLoggedErrors(FakeConnectionError)
         self.assertEquals(len(errors), 1)
-        self.assertEquals(stopd, [None])
+        self.assertResultList(stopd, None)
         [holder] = self.holders
         self.assertEquals(holder.started, True)
         self.assertEquals(holder.stopped, True)
@@ -699,7 +732,7 @@
         self.assertEquals(stopResult, [])
         self.flushHolders()
         #self.assertEquals(abortResult, [None])
-        self.assertEquals(stopResult, [None])
+        self.assertResultList(stopResult, None)
 
 
     def test_stopServiceWithSpooled(self):
@@ -845,10 +878,10 @@
         abortResult = self.resultOf(it.abort())
 
         # steal it from the queue so we can do it out of order
-        d, work = self.holders[0].queue.pop()
+        d, work = self.holders[0]._q.get()
         # that should be the only work unit so don't continue if something else
         # got in there
-        self.assertEquals(self.holders[0].queue, [])
+        self.assertEquals(list(self.holders[0]._q.queue), [])
         self.assertEquals(len(self.holders), 1)
         self.flushHolders()
         stopResult = self.resultOf(self.pool.stopService())

Modified: CalendarServer/trunk/twext/internet/threadutils.py
===================================================================
--- CalendarServer/trunk/twext/internet/threadutils.py	2012-08-11 08:55:55 UTC (rev 9688)
+++ CalendarServer/trunk/twext/internet/threadutils.py	2012-08-11 09:01:06 UTC (rev 9689)
@@ -45,22 +45,32 @@
         """
         Worker function which runs in a non-reactor thread.
         """
-        while True:
-            work = self._q.get()
-            if work is _DONE:
-                def finishStopping():
-                    self._state = _STATE_STOPPED
-                    self._q = None
-                    s = self._stopper
-                    self._stopper = None
-                    s.callback(None)
-                self._reactor.callFromThread(finishStopping)
-                return
-            self._oneWorkUnit(*work)
+        while self._qpull():
+            pass
 
 
+    def _qpull(self):
+        """
+        Pull one item off the queue and react appropriately.
+
+        Return whether or not to keep going.
+        """
+        work = self._q.get()
+        if work is _DONE:
+            def finishStopping():
+                self._state = _STATE_STOPPED
+                self._q = None
+                s = self._stopper
+                self._stopper = None
+                s.callback(None)
+            self._reactor.callFromThread(finishStopping)
+            return False
+        self._oneWorkUnit(*work)
+        return True
+
+
     def _oneWorkUnit(self, deferred, instruction):
-        try: 
+        try:
             result = instruction()
         except:
             etype, evalue, etb = sys.exc_info()
@@ -80,6 +90,8 @@
 
         @return: L{Deferred} that fires with the result of L{work}
         """
+        if self._state != _STATE_RUNNING:
+            raise RuntimeError("not running")
         d = Deferred()
         self._q.put((d, work))
         return d
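
A usage sketch for ThreadHolder under the API shown in this file: start()
dedicates a worker thread, submit() returns a Deferred that fires back in
the reactor thread, and stop() drains the queue before shutting the
worker down.  Illustrative only:

    from twisted.internet import reactor
    from twext.internet.threadutils import ThreadHolder

    holder = ThreadHolder(reactor)
    holder.start()
    d = holder.submit(lambda: 2 + 2)             # runs in the held thread
    d.addCallback(lambda result: holder.stop())  # result == 4
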

Modified: CalendarServer/trunk/txdav/base/datastore/file.py
===================================================================
--- CalendarServer/trunk/txdav/base/datastore/file.py	2012-08-11 08:55:55 UTC (rev 9688)
+++ CalendarServer/trunk/txdav/base/datastore/file.py	2012-08-11 09:01:06 UTC (rev 9689)
@@ -187,15 +187,19 @@
                         self.log_error("Cannot undo DataStoreTransaction")
                 raise
 
-        for (operation, ignored) in self._postCommitOperations:
+        for operation in self._postCommitOperations:
             operation()
 
-    def postCommit(self, operation, immediately=False):
-        self._postCommitOperations.append((operation, immediately))
 
+    def postCommit(self, operation):
+        self._postCommitOperations.append(operation)
+
+
     def postAbort(self, operation):
         self._postAbortOperations.append(operation)
 
+
+
 class FileMetaDataMixin(object):
     
     implements(IDataStoreObject)

Modified: CalendarServer/trunk/txdav/base/datastore/util.py
===================================================================
--- CalendarServer/trunk/txdav/base/datastore/util.py	2012-08-11 08:55:55 UTC (rev 9688)
+++ CalendarServer/trunk/txdav/base/datastore/util.py	2012-08-11 09:01:06 UTC (rev 9689)
@@ -70,13 +70,19 @@
 
 
     def setAfterCommit(self, transaction, key, value):
-        transaction.postCommit(lambda: self.set(key, value), immediately=True)
+        def setit():
+            # Don't return Deferred; let the postCommit chain continue.
+            self.set(key, value)
+        transaction.postCommit(setit)
 
     def invalidateAfterCommit(self, transaction, key):
         # Invalidate now (so that operations within this transaction see it)
         # and *also* post-commit (because there could be a scheduled setAfterCommit
         # for this key)
-        transaction.postCommit(lambda: self.delete(key), immediately=True)
+        def delit():
+            # Don't return Deferred; let the postCommit chain continue.
+            self.delete(key)
+        transaction.postCommit(delit)
         return self.delete(key)
 
     # Home child objects by name

Modified: CalendarServer/trunk/txdav/common/datastore/sql.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/sql.py	2012-08-11 08:55:55 UTC (rev 9688)
+++ CalendarServer/trunk/txdav/common/datastore/sql.py	2012-08-11 09:01:06 UTC (rev 9689)
@@ -85,6 +85,7 @@
 from txdav.xml.rfc2518 import DisplayName
 
 from txdav.base.datastore.util import normalizeUUIDOrNot
+from twext.enterprise.queue import NullQueuer
 
 from pycalendar.datetime import PyCalendarDateTime
 
@@ -132,8 +133,13 @@
 
     @ivar quota: the amount of space granted to each calendar home (in bytes)
         for storing attachments, or C{None} if quota should not be enforced.
+    @type quota: C{int} or C{NoneType}
 
-    @type quota: C{int} or C{NoneType}
+    @ivar queuer: An object with an C{enqueueWork} method, from
+        L{twext.enterprise.queue}.  Initially, this is a L{NullQueuer}, so it
+        is always usable, but in a properly configured environment it will be
+        upgraded to a more capable object that can distribute work throughout a
+        cluster.
     """
 
     implements(ICalendarStore)
@@ -160,6 +166,7 @@
         self.logSQL = logSQL
         self.logTransactionWaits = logTransactionWaits
         self.timeoutTransactions = timeoutTransactions
+        self.queuer = NullQueuer()
         self._migrating = False
         self._enableNotifications = True
 
@@ -335,6 +342,8 @@
         if self.timeoutSeconds:
             self.delayedTimeout = self.callLater(self.timeoutSeconds, _forceAbort)
 
+
+
 class CommonStoreTransaction(object):
     """
     Transaction implementation for SQL database.
@@ -350,8 +359,6 @@
         self._calendarHomes = {}
         self._addressbookHomes = {}
         self._notificationHomes = {}
-        self._postCommitOperations = []
-        self._postAbortOperations = []
         self._notifierFactory = notifierFactory
         self._notifiedAlready = set()
         self._bumpedAlready = set()
@@ -382,7 +389,10 @@
         self.paramstyle = sqlTxn.paramstyle
         self.dialect = sqlTxn.dialect
 
-        self._stats = TransactionStatsCollector(self._label, self._store.logStatsLogFile) if self._store.logStats else None
+        self._stats = (
+            TransactionStatsCollector(self._label, self._store.logStatsLogFile)
+            if self._store.logStats else None
+        )
         self.statementCount = 0
         self.iudCount = 0
         self.currentStatement = None
@@ -402,6 +412,22 @@
             __import__("txdav.carddav.datastore.sql")
 
 
+    def enqueue(self, workItem, **kw):
+        """
+        Enqueue a L{twext.enterprise.queue.WorkItem} for later execution.
+
+        For example::
+
+            yield (txn.enqueue(MyWorkItem, workDescription="some work to do")
+                   .whenProposed())
+
+        @return: a work proposal describing various events in the work's
+            life-cycle.
+        @rtype: L{twext.enterprise.queue.WorkProposal}
+        """
+        return self._store.queuer.enqueueWork(self, workItem, **kw)
+
+
     def store(self):
         return self._store
 
@@ -606,18 +632,18 @@
         return self._apnSubscriptionsBySubscriberQuery.on(self, subscriberGUID=guid)
 
 
-    def postCommit(self, operation, immediately=False):
+    def postCommit(self, operation):
         """
         Run things after C{commit}.
         """
-        self._postCommitOperations.append((operation, immediately))
+        return self._sqlTxn.postCommit(operation)
 
 
     def postAbort(self, operation):
         """
         Run things after C{abort}.
         """
-        self._postAbortOperations.append(operation)
+        return self._sqlTxn.postAbort(operation)
 
 
     def isNotifiedAlready(self, obj):
@@ -774,30 +800,16 @@
         """
         Commit the transaction and execute any post-commit hooks.
         """
-        @inlineCallbacks
-        def postCommit(ignored):
-            for operation, immediately in self._postCommitOperations:
-                if immediately:
-                    yield operation()
-                else:
-                    operation()
-            returnValue(ignored)
-
         if self._stats:
             self._stats.printReport()
+        return self._sqlTxn.commit()
 
-        return self._sqlTxn.commit().addCallback(postCommit)
 
-
     def abort(self):
         """
         Abort the transaction.
         """
-        def postAbort(ignored):
-            for operation in self._postAbortOperations:
-                operation()
-            return ignored
-        return self._sqlTxn.abort().addCallback(postAbort)
+        return self._sqlTxn.abort()
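
With this delegation, hooks registered through postCommit fire inside the
underlying SQL transaction's own commit(), rather than in a wrapper
callback here.  A sketch, assuming the in-order execution documented for
these hooks and a live transaction txn inside an @inlineCallbacks
function:

    results = []
    txn.postCommit(lambda: results.append("first"))
    txn.postCommit(lambda: results.append("second"))
    yield txn.commit()
    assert results == ["first", "second"]
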
 
 
     def _oldEventsBase(limited): #@NoSelf

Modified: CalendarServer/trunk/txdav/common/datastore/sql_schema/current.sql
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/sql_schema/current.sql	2012-08-11 08:55:55 UTC (rev 9688)
+++ CalendarServer/trunk/txdav/common/datastore/sql_schema/current.sql	2012-08-11 09:01:06 UTC (rev 9689)
@@ -22,7 +22,23 @@
 
 create sequence RESOURCE_ID_SEQ;
 
+-------------------------
+-- Cluster Bookkeeping --
+-------------------------
 
+-- Information about a process connected to this database.
+
+-- Note that this must match the node info schema in twext.enterprise.queue.
+create table NODE_INFO (
+  HOSTNAME  varchar(255) not null,
+  PID       integer not null,
+  PORT      integer not null,
+  TIME      timestamp not null default timezone('UTC', CURRENT_TIMESTAMP),
+
+  primary key(HOSTNAME, PORT)
+);
+
+
 -------------------
 -- Calendar Home --
 -------------------
@@ -497,6 +513,6 @@
   VALUE                         varchar(255)
 );
 
-insert into CALENDARSERVER values ('VERSION', '11');
+insert into CALENDARSERVER values ('VERSION', '12');
 insert into CALENDARSERVER values ('CALENDAR-DATAVERSION', '3');
 insert into CALENDARSERVER values ('ADDRESSBOOK-DATAVERSION', '1');
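
The queue machinery reads and writes this table through the NodeInfo
record class (see PeerConnectionPool.startService in
twext/enterprise/queue.py above).  In outline, with hypothetical host and
port values:

    # Within a transaction txn, using the imports from that module:
    yield Lock.exclusive(NodeInfo.table).on(txn)
    nodes = yield NodeInfo.all(txn)
    if not nodes:
        yield NodeInfo.create(txn, hostname="cal01.example.com",
                              port=7654, pid=1234, time=datetime.utcnow())
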

Copied: CalendarServer/trunk/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_11_to_12.sql (from rev 9688, CalendarServer/branches/users/glyph/q/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_11_to_12.sql)
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_11_to_12.sql	                        (rev 0)
+++ CalendarServer/trunk/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_11_to_12.sql	2012-08-11 09:01:06 UTC (rev 9689)
@@ -0,0 +1,31 @@
+----
+-- Copyright (c) 2012 Apple Inc. All rights reserved.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+----
+
+---------------------------------------------------
+-- Upgrade database schema from VERSION 11 to 12 --
+---------------------------------------------------
+
+create table NODE_INFO (
+    "HOSTNAME" nvarchar2(255),
+    "PID" integer not null,
+    "PORT" integer not null,
+    "TIME" timestamp default CURRENT_TIMESTAMP at time zone 'UTC' not null,
+    primary key("HOSTNAME", "PORT")
+);
+
+-- Now update the version
+-- No data upgrades
+update CALENDARSERVER set VALUE = '12' where NAME = 'VERSION';

Copied: CalendarServer/trunk/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_11_to_12.sql (from rev 9688, CalendarServer/branches/users/glyph/q/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_11_to_12.sql)
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_11_to_12.sql	                        (rev 0)
+++ CalendarServer/trunk/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_11_to_12.sql	2012-08-11 09:01:06 UTC (rev 9689)
@@ -0,0 +1,31 @@
+----
+-- Copyright (c) 2012 Apple Inc. All rights reserved.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+----
+
+---------------------------------------------------
+-- Upgrade database schema from VERSION 11 to 12 --
+---------------------------------------------------
+
+create table NODE_INFO (
+  HOSTNAME  varchar(255) not null,
+  PID       integer not null,
+  PORT      integer not null,
+  TIME      timestamp not null default timezone('UTC', CURRENT_TIMESTAMP),
+  primary key(HOSTNAME, PORT)
+);
+
+-- Now update the version
+-- No data upgrades
+update CALENDARSERVER set VALUE = '12' where NAME = 'VERSION';

Modified: CalendarServer/trunk/txdav/idav.py
===================================================================
--- CalendarServer/trunk/txdav/idav.py	2012-08-11 08:55:55 UTC (rev 9688)
+++ CalendarServer/trunk/txdav/idav.py	2012-08-11 09:01:06 UTC (rev 9689)
@@ -207,29 +207,15 @@
         """
 
 
-    def postCommit(operation, immediately=False):
+    def postCommit(operation):
         """
-        Registers an operation to be executed after the transaction is
-        committed.
-
-        postCommit can be called multiple times, and operations are executed
-        in the order which they were registered.
-
-        @param operation: a callable.
-        @param immediately: a boolean; True means finish this operation *before* the
-            commit( ) call completes, defaults to False.
+        @see: L{IAsyncTransaction.postCommit}
         """
 
 
     def postAbort(operation):
         """
-        Registers an operation to be executed after the transaction is
-        aborted.
-
-        postAbort can be called multiple times, and operations are executed
-        in the order which they were registered.
-
-        @param operation: a callable.
+        @see: L{IAsyncTransaction.postAbort}
         """
 
 