[CalendarServer-changes] [6551] CalendarServer/trunk

source_changes at macosforge.org
Mon Nov 1 15:38:50 PDT 2010


Revision: 6551
          http://trac.macosforge.org/projects/calendarserver/changeset/6551
Author:   glyph at apple.com
Date:     2010-11-01 15:38:48 -0700 (Mon, 01 Nov 2010)
Log Message:
-----------
Add a connection pool.

Modified Paths:
--------------
    CalendarServer/trunk/calendarserver/sidecar/task.py
    CalendarServer/trunk/calendarserver/tap/caldav.py
    CalendarServer/trunk/calendarserver/tap/test/test_caldav.py
    CalendarServer/trunk/calendarserver/tap/util.py
    CalendarServer/trunk/calendarserver/tools/purge.py
    CalendarServer/trunk/calendarserver/tools/test/test_principals.py
    CalendarServer/trunk/calendarserver/tools/test/test_purge.py
    CalendarServer/trunk/twistedcaldav/directory/util.py
    CalendarServer/trunk/twistedcaldav/localization.py
    CalendarServer/trunk/twistedcaldav/mail.py
    CalendarServer/trunk/twistedcaldav/stdconfig.py
    CalendarServer/trunk/twistedcaldav/storebridge.py
    CalendarServer/trunk/twistedcaldav/test/test_mail.py
    CalendarServer/trunk/twistedcaldav/test/test_wrapping.py
    CalendarServer/trunk/twistedcaldav/timezoneservice.py
    CalendarServer/trunk/txdav/base/datastore/subpostgres.py
    CalendarServer/trunk/txdav/base/propertystore/test/test_sql.py
    CalendarServer/trunk/txdav/caldav/datastore/sql.py
    CalendarServer/trunk/txdav/caldav/datastore/test/common.py
    CalendarServer/trunk/txdav/caldav/datastore/test/test_sql.py
    CalendarServer/trunk/txdav/carddav/datastore/sql.py
    CalendarServer/trunk/txdav/carddav/datastore/test/common.py
    CalendarServer/trunk/txdav/carddav/datastore/test/test_sql.py
    CalendarServer/trunk/txdav/common/datastore/sql.py
    CalendarServer/trunk/txdav/common/datastore/test/util.py
    CalendarServer/trunk/txdav/idav.py

Added Paths:
-----------
    CalendarServer/trunk/txdav/base/datastore/asyncsqlpool.py
    CalendarServer/trunk/txdav/base/datastore/threadutils.py

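The new txdav/base/datastore/asyncsqlpool.py added above is where the commit's headline connection pool lives. Its core idea can be sketched synchronously (MiniConnectionPool is a hypothetical stand-in for illustration; the real ConnectionPool is reactor-driven and dispenses Twisted transactions over AMP, not raw DB-API connections):

```python
import queue
import sqlite3

class MiniConnectionPool:
    """Synchronous sketch of a connection pool: hand out up to
    maxConnections connections from a factory and reuse released ones,
    instead of opening a fresh connection per transaction."""

    def __init__(self, connectionFactory, maxConnections=10):
        self._factory = connectionFactory
        self._idle = queue.Queue()
        self._created = 0
        self._max = maxConnections

    def acquire(self):
        try:
            return self._idle.get_nowait()   # reuse an idle connection
        except queue.Empty:
            if self._created < self._max:
                self._created += 1
                return self._factory()       # grow the pool
            return self._idle.get()          # at capacity: block for a release

    def release(self, conn):
        self._idle.put(conn)

# sqlite3 stands in for the PostgresService connection factory:
pool = MiniConnectionPool(lambda: sqlite3.connect(":memory:"))
conn = pool.acquire()
conn.execute("CREATE TABLE t (x INTEGER)")
pool.release(conn)
```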
Property Changed:
----------------
    CalendarServer/trunk/
    CalendarServer/trunk/txdav/caldav/datastore/index_file.py
    CalendarServer/trunk/txdav/caldav/datastore/test/test_index_file.py
    CalendarServer/trunk/txdav/carddav/datastore/index_file.py
    CalendarServer/trunk/txdav/carddav/datastore/test/test_index_file.py


Property changes on: CalendarServer/trunk
___________________________________________________________________
Modified: svn:mergeinfo
   - /CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/wsanchez/transations:5515-5593
   + /CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/wsanchez/transations:5515-5593

Modified: CalendarServer/trunk/calendarserver/sidecar/task.py
===================================================================
--- CalendarServer/trunk/calendarserver/sidecar/task.py	2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/calendarserver/sidecar/task.py	2010-11-01 22:38:48 UTC (rev 6551)
@@ -27,7 +27,7 @@
 
 from zope.interface import implements
 
-from twisted.application.service import Service, IServiceMaker
+from twisted.application.service import MultiService, Service, IServiceMaker
 from twisted.internet.defer import DeferredList, inlineCallbacks, returnValue
 from twisted.internet.reactor import callLater
 from twisted.plugin import IPlugin
@@ -318,6 +318,7 @@
 
     def makeService(self, options):
 
+        svc = MultiService()
         #
         # The task sidecar doesn't care about system SACLs
         #
@@ -330,11 +331,12 @@
         oldLogLevel = logLevelForNamespace(None)
         setLogLevelForNamespace(None, "info")
 
-        rootResource = getRootResource(config)
+        rootResource = getRootResource(config, svc)
 
-        service = CalDAVTaskService(rootResource)
+        CalDAVTaskService(rootResource).setServiceParent(svc)
 
         # Change log level back to what it was before
         setLogLevelForNamespace(None, oldLogLevel)
 
-        return service
+        return svc
+

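The task.py hunks above change makeService to return a MultiService with CalDAVTaskService parented to it, so getRootResource can attach further services (such as the pool) to the same parent and share its lifecycle. A minimal pure-Python stand-in for that parent/child pattern (not the real Twisted classes, whose API this mimics only in outline):

```python
class Service:
    """Minimal stand-in for twisted.application.service.Service."""
    parent = None
    running = False

    def setServiceParent(self, parent):
        self.parent = parent
        parent.addService(self)

    def startService(self):
        self.running = True

    def stopService(self):
        self.running = False


class MultiService(Service):
    """Minimal stand-in for MultiService: starts and stops children together."""

    def __init__(self):
        self.services = []

    def addService(self, service):
        self.services.append(service)

    def startService(self):
        Service.startService(self)
        for svc in self.services:
            svc.startService()

    def stopService(self):
        for svc in reversed(self.services):
            svc.stopService()
        Service.stopService(self)


# Composition as in the new makeService(): the task service becomes a child,
# so anything else parented to `svc` shares startup and shutdown with it.
svc = MultiService()
task = Service()          # stand-in for CalDAVTaskService(rootResource)
task.setServiceParent(svc)
svc.startService()
```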
Modified: CalendarServer/trunk/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/caldav.py	2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/calendarserver/tap/caldav.py	2010-11-01 22:38:48 UTC (rev 6551)
@@ -29,7 +29,6 @@
 from subprocess import Popen, PIPE
 from pwd import getpwuid, getpwnam
 from grp import getgrnam
-from inspect import getargspec
 import OpenSSL
 from OpenSSL.SSL import Error as SSLError
 
@@ -41,14 +40,14 @@
 from twisted.python.reflect import namedClass
 from twisted.plugin import IPlugin
 from twisted.internet.defer import gatherResults
-from twisted.internet import reactor
+from twisted.internet import reactor as _reactor
 from twisted.internet.reactor import addSystemEventTrigger
 from twisted.internet.process import ProcessExitedAlready
 from twisted.internet.protocol import Protocol, Factory
+from twisted.internet.protocol import ProcessProtocol
 from twisted.application.internet import TCPServer, UNIXServer
 from twisted.application.service import MultiService, IServiceMaker
-from twisted.scripts.mktap import getid
-from twisted.runner import procmon
+from twisted.application.service import Service
 
 import twext
 from twext.web2.server import Site
@@ -60,8 +59,6 @@
 from twext.web2.channel.http import LimitingHTTPFactory, SSLRedirectRequest
 from twext.web2.metafd import ConnectionLimiter, ReportingHTTPService
 
-from txdav.common.datastore.sql import v1_schema
-from txdav.base.datastore.subpostgres import PostgresService
 from txdav.common.datastore.util import UpgradeToDatabaseService
 
 from twistedcaldav.config import ConfigurationError
@@ -72,7 +69,13 @@
 from twistedcaldav.mail import IMIPReplyInboxResource
 from twistedcaldav.stdconfig import DEFAULT_CONFIG, DEFAULT_CONFIG_FILE
 from twistedcaldav.upgrade import upgradeData
+from txdav.base.datastore.subpostgres import PostgresService
 
+from calendarserver.tap.util import pgServiceFromConfig
+from txdav.base.datastore.asyncsqlpool import ConnectionPool
+
+from txdav.base.datastore.asyncsqlpool import ConnectionPoolConnection
+
 try:
     from twistedcaldav.authkerb import NegotiateCredentialFactory
     NegotiateCredentialFactory  # pacify pyflakes
@@ -83,10 +86,12 @@
 from calendarserver.accesslog import AMPLoggingFactory
 from calendarserver.accesslog import RotatingFileAccessLoggingObserver
 from calendarserver.tap.util import getRootResource, computeProcessCount
+from calendarserver.tap.util import ConnectionWithPeer
 from calendarserver.tools.util import checkDirectory
 
 try:
     from calendarserver.version import version
+    version
 except ImportError:
     sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "support"))
     from version import version as getVersion
@@ -97,22 +102,30 @@
 
 log = Logger()
 
+from twisted.python.util import uidFromString, gidFromString
 
-class CalDAVStatisticsProtocol (Protocol): 
+def getid(uid, gid):
+    if uid is not None:
+        uid = uidFromString(uid)
+    if gid is not None:
+        gid = gidFromString(gid)
+    return (uid, gid)
 
-    def connectionMade(self): 
-        stats = self.factory.logger.observer.getGlobalHits() 
-        self.transport.write("%s\r\n" % (stats,)) 
-        self.transport.loseConnection() 
+class CalDAVStatisticsProtocol (Protocol):
 
-class CalDAVStatisticsServer (Factory): 
+    def connectionMade(self):
+        stats = self.factory.logger.observer.getGlobalHits()
+        self.transport.write("%s\r\n" % (stats,))
+        self.transport.loseConnection()
 
-    protocol = CalDAVStatisticsProtocol 
+class CalDAVStatisticsServer (Factory):
 
-    def __init__(self, logObserver): 
-        self.logger = logObserver 
+    protocol = CalDAVStatisticsProtocol
 
+    def __init__(self, logObserver):
+        self.logger = logObserver
 
+
 class ErrorLoggingMultiService(MultiService):
     """ Registers a rotating file logger for error logging, iff
         config.ErrorLogEnabled is True. """
@@ -233,7 +246,7 @@
     def postOptions(self):
         self.loadConfiguration()
         self.checkConfiguration()
-            
+
     def loadConfiguration(self):
         if not os.path.exists(self["config"]):
             print "Config file %s not found. Exiting." % (self["config"],)
@@ -248,7 +261,7 @@
             sys.exit(1)
 
         config.updateDefaults(self.overrides)
-        
+
     def checkDirectory(self, dirpath, description, access=None, create=None):
         checkDirectory(dirpath, description, access=access, create=create)
 
@@ -282,7 +295,7 @@
             # Require write access because one might not allow editing on /
             access=os.W_OK,
         )
-        
+
         #
         # Verify that other root paths are OK
         #
@@ -325,7 +338,7 @@
                 access=os.W_OK,
                 create=(0750, config.UserName, config.GroupName),
             )
-            
+
         #
         # Nuke the file log observer's time format.
         #
@@ -497,7 +510,6 @@
         additional = []
         if config.Scheduling.iMIP.Enabled:
             additional.append(("inbox", IMIPReplyInboxResource, [], "digest"))
-        rootResource = getRootResource(config, additional)
 
         #
         # Configure the service
@@ -528,6 +540,8 @@
 
         service = CalDAVService(logObserver)
 
+        rootResource = getRootResource(config, service, additional)
+
         underlyingSite = Site(rootResource)
         requestFactory = underlyingSite
 
@@ -670,16 +684,22 @@
         return service
 
 
+    def scheduleOnDiskUpgrade(self):
+        """
+        Schedule any on disk upgrades we might need.  Note that this will only
+        do the filesystem-format upgrades; migration to the database needs to
+        be done when the connection and possibly server is already up and
+        running.
+        """
+        addSystemEventTrigger("before", "startup", upgradeData, config)
+
+
     def makeService_Single(self, options):
         """
         Create a service to be used in a single-process, stand-alone
         configuration.
         """
-        # Schedule any on disk upgrades we might need.  Note that this
-        # will only do the filesystem-format upgrades; migration to the
-        # database needs to be done when the connection and possibly
-        # server is already up and running. -glyph
-        addSystemEventTrigger("before", "startup", upgradeData, config)
+        self.scheduleOnDiskUpgrade()
 
         return self.storageService(self.makeService_Slave(options))
 
@@ -721,7 +741,13 @@
                 attachmentsRoot = dbRoot.child("attachments")
                 return UpgradeToDatabaseService.wrapService(
                     CachingFilePath(config.DocumentRoot), mainService,
-                    connectionFactory, attachmentsRoot,
+                    # FIXME: somehow, this should be a connection pool too, not
+                    # unpooled connections; this only runs in the master
+                    # process, so this would be a good point to bootstrap that
+                    # whole process.  However, it's somewhat tricky to do that
+                    # right.  The upgrade needs to run in the master, before
+                    # any other things have run.
+                    pgserv.produceLocalTransaction, attachmentsRoot,
                     uid=postgresUID, gid=postgresGID
                 )
             if os.getuid() == 0: # Only override if root
@@ -730,16 +756,8 @@
             else:
                 postgresUID = None
                 postgresGID = None
-            pgserv = PostgresService(
-                dbRoot, subServiceFactory, v1_schema,
-                databaseName=config.Postgres.DatabaseName,
-                logFile=config.Postgres.LogFile,
-                socketDir=config.RunRoot,
-                listenAddresses=config.Postgres.ListenAddresses,
-                sharedBuffers=config.Postgres.SharedBuffers,
-                maxConnections=config.Postgres.MaxConnections,
-                options=config.Postgres.Options,
-                uid=postgresUID, gid=postgresGID
+            pgserv = pgServiceFromConfig(
+                config, subServiceFactory, postgresUID, postgresGID
             )
             return pgserv
         else:
@@ -753,11 +771,7 @@
         """
         s = ErrorLoggingMultiService()
 
-        # Schedule any on disk upgrades we might need.  Note that this
-        # will only do the filesystem-format upgrades; migration to the
-        # database needs to be done when the connection and possibly
-        # server is already up and running. -glyph
-        addSystemEventTrigger("before", "startup", upgradeData, config)
+        self.scheduleOnDiskUpgrade()
 
         # Make sure no old socket files are lying around.
         self.deleteStaleSocketFiles()
@@ -799,8 +813,18 @@
         monitor = DelayedStartupProcessMonitor()
         s.processMonitor = monitor
 
-        self.storageService(monitor, uid, gid).setServiceParent(s)
+        ssvc = self.storageService(monitor, uid, gid)
+        ssvc.setServiceParent(s)
 
+        if isinstance(ssvc, PostgresService):
+            # TODO: better way of doing this conditional.  Look at the config
+            # again, possibly?
+            pool = ConnectionPool(ssvc.produceConnection)
+            pool.setServiceParent(s)
+            dispenser = ConnectionDispenser(pool)
+        else:
+            dispenser = None
+
         parentEnv = {
             "PATH": os.environ.get("PATH", ""),
             "PYTHONPATH": os.environ.get("PYTHONPATH", ""),
@@ -889,6 +913,8 @@
             else:
                 extraArgs = dict(inheritFDs=inheritFDs,
                                  inheritSSLFDs=inheritSSLFDs)
+            if dispenser is not None:
+                extraArgs.update(ampSQLDispenser=dispenser)
             process = TwistdSlaveProcess(
                 sys.argv[0],
                 self.tapname,
@@ -902,21 +928,21 @@
         for name, pool in config.Memcached.Pools.items():
             if pool.ServerEnabled:
                 self.log_info("Adding memcached service for pool: %s" % (name,))
-        
+
                 memcachedArgv = [
                     config.Memcached.memcached,
                     "-p", str(pool.Port),
                     "-l", pool.BindAddress,
                     "-U", "0",
                 ]
-        
+
                 if config.Memcached.MaxMemory is not 0:
                     memcachedArgv.extend(["-m", str(config.Memcached.MaxMemory)])
                 if config.UserName:
                     memcachedArgv.extend(["-u", config.UserName])
-        
+
                 memcachedArgv.extend(config.Memcached.Options)
-        
+
                 monitor.addProcess('memcached-%s' % (name,), memcachedArgv, env=parentEnv)
 
         if (
@@ -981,7 +1007,7 @@
         monitor.addProcess("caldav_task", taskArgv, env=parentEnv)
 
 
-        stats = CalDAVStatisticsServer(logger) 
+        stats = CalDAVStatisticsServer(logger)
         statsService = GroupOwnedUNIXServer(
             gid, config.GlobalStatsSocket, stats, mode=0440
         )
@@ -992,10 +1018,10 @@
 
 
     def deleteStaleSocketFiles(self):
-        
+
         # Check all socket files we use.
         for checkSocket in [config.ControlSocket, config.GlobalStatsSocket] :
-    
+
             # See if the file exists.
             if (os.path.exists(checkSocket)):
                 # See if the file represents a socket.  If not, delete it.
@@ -1021,6 +1047,28 @@
 
 
 
+class ConnectionDispenser(object):
+
+    def __init__(self, connectionPool):
+        self.pool = connectionPool
+
+
+    def dispense(self):
+        """
+        Dispense a file descriptor, already connected to a server, for a
+        client.
+        """
+        # FIXME: these sockets need to be re-dispensed when the process is
+        # respawned, and they currently won't be.
+        c, s = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
+        protocol = ConnectionPoolConnection(self.pool)
+        transport = ConnectionWithPeer(s, protocol)
+        protocol.makeConnection(transport)
+        transport.startReading()
+        return c
+
+
+
 class TwistdSlaveProcess(object):
     """
     A L{TwistdSlaveProcess} is information about how to start a slave process
@@ -1053,11 +1101,16 @@
         subprocess and used to accept incoming connections.
 
     @type metaSocket: L{socket.socket}
+
+    @ivar ampSQLDispenser: a factory for AF_UNIX/SOCK_STREAM sockets that are
+        to be inherited by subprocesses and used for sending AMP SQL commands
+        back to its parent.
     """
     prefix = "caldav"
 
     def __init__(self, twistd, tapname, configFile, id, interfaces,
-                 inheritFDs=None, inheritSSLFDs=None, metaSocket=None):
+                 inheritFDs=None, inheritSSLFDs=None, metaSocket=None,
+                 ampSQLDispenser=None):
 
         self.twistd = twistd
 
@@ -1075,7 +1128,10 @@
         self.inheritSSLFDs = emptyIfNone(inheritSSLFDs)
         self.metaSocket = metaSocket
         self.interfaces = interfaces
+        self.ampSQLDispenser = ampSQLDispenser
+        self.ampDBSocket = None
 
+
     def getName(self):
         return '%s-%s' % (self.prefix, self.id)
 
@@ -1086,10 +1142,13 @@
             process to file descriptor numbers in the current (master) process.
         """
         fds = {}
-        maybeMetaFD = []
+        extraFDs = []
         if self.metaSocket is not None:
-            maybeMetaFD.append(self.metaSocket.fileno())
-        for fd in self.inheritSSLFDs + self.inheritFDs + maybeMetaFD:
+            extraFDs.append(self.metaSocket.fileno())
+        if self.ampSQLDispenser is not None:
+            self.ampDBSocket = self.ampSQLDispenser.dispense()
+            extraFDs.append(self.ampDBSocket.fileno())
+        for fd in self.inheritSSLFDs + self.inheritFDs + extraFDs:
             fds[fd] = fd
         return fds
 
@@ -1146,7 +1205,10 @@
             args.extend([
                     "-o", "MetaFD=%s" % (self.metaSocket.fileno(),)
                 ])
-
+        if self.ampDBSocket is not None:
+            args.extend([
+                    "-o", "DBAMPFD=%s" % (self.ampDBSocket.fileno(),)
+                ])
         return args
 
 
@@ -1162,7 +1224,7 @@
 
 
 
-class DelayedStartupProcessMonitor(procmon.ProcessMonitor):
+class DelayedStartupProcessMonitor(Service, object):
     """
     A L{DelayedStartupProcessMonitor} is a L{procmon.ProcessMonitor} that
     defers building its command lines until the service is actually ready to
@@ -1174,15 +1236,6 @@
     L{Deferred} which fires only when all processes have shut down, to allow
     for a clean service shutdown.
 
-    @ivar processObjects: a C{list} of L{TwistdSlaveProcess} to add using
-        C{self.addProcess} when this service starts up.
-
-    @ivar _extraFDs: a mapping from process names to extra file-descriptor
-        maps.  (By default, all processes will have the standard stdio mapping,
-        so all file descriptors here should be >2.)  This is updated during
-        L{DelayedStartupProcessMonitor.startService}, by inspecting the result
-        of L{TwistdSlaveProcess.getFileDescriptors}.
-
     @ivar reactor: an L{IReactorProcess} for spawning processes, defaulting to
         the global reactor.
 
@@ -1193,22 +1246,20 @@
         Deferreds that track service shutdown.
     """
 
-    _shouldPassReactor = (
-        len(getargspec(procmon.ProcessMonitor.__init__)[0]) > 1
-    )
+    threshold = 1
+    killTime = 5
+    minRestartDelay = 1
+    maxRestartDelay = 3600
 
-    def __init__(self, *args, **kwargs):
-        reactorToUse = kwargs.get("reactor", reactor)
-        if not self._shouldPassReactor:
-            # Try to do this the right way if we can, otherwise, let the tests
-            # monkeypatch.  (Our superclass does not accept a 'reactor'
-            # argument in Twisted 10.0.0, but does in Twisted 10.1.0 and
-            # later.)
-            kwargs.pop('reactor', None)
-        procmon.ProcessMonitor.__init__(self, *args, **kwargs)
-        self.processObjects = []
-        self._extraFDs = {}
-        self.reactor = reactorToUse
+    def __init__(self, reactor=_reactor):
+        super(DelayedStartupProcessMonitor, self).__init__()
+        self._reactor = reactor
+        self.processes = {}
+        self.protocols = {}
+        self.delay = {}
+        self.timeStarted = {}
+        self.murder = {}
+        self.restart = {}
         self.stopping = False
         if config.MultiProcess.StaggeredStartup.Enabled:
             self.delayInterval = config.MultiProcess.StaggeredStartup.Interval
@@ -1216,8 +1267,43 @@
             self.delayInterval = 0
 
 
-    def addProcessObject(self, process, env):
+    def addProcess(self, name, args, uid=None, gid=None, env={}):
         """
+        Add a new monitored process and start it immediately if the
+        L{DelayedStartupProcessMonitor} service is running.
+
+        Note that args are passed to the system call, not to the shell. If
+        running the shell is desired, the common idiom is to use
+        C{ProcessMonitor.addProcess("name", ['/bin/sh', '-c', shell_script])}
+
+        @param name: A name for this process.  This value must be
+            unique across all processes added to this monitor.
+        @type name: C{str}
+        @param args: The argv sequence for the process to launch.
+        @param uid: The user ID to use to run the process.  If C{None},
+            the current UID is used.
+        @type uid: C{int}
+        @param gid: The group ID to use to run the process.  If C{None},
+            the current GID is used.
+        @type gid: C{int}
+        @param env: The environment to give to the launched process. See
+            L{IReactorProcess.spawnProcess}'s C{env} parameter.
+        @type env: C{dict}
+        @raises: C{KeyError} if a process with the given name already
+            exists
+        """
+        class SimpleProcessObject(object):
+            def getName(self):
+                return name
+            def getCommandLine(self):
+                return args
+            def getFileDescriptors(self):
+                return {}
+        self.addProcessObject(SimpleProcessObject(), env, uid, gid)
+
+
+    def addProcessObject(self, process, env, uid=None, gid=None):
+        """
         Add a process object to be run when this service is started.
 
         @param env: a dictionary of environment variables.
@@ -1225,19 +1311,19 @@
         @param process: a L{TwistdSlaveProcesses} object to be started upon
             service startup.
         """
-        self.processObjects.append((process, env))
+        name = process.getName()
+        self.processes[name] = (process, env, uid, gid)
+        self.delay[name] = self.minRestartDelay
+        if self.running:
+            self.startProcess(name)
 
 
     def startService(self):
         # Now we're ready to build the command lines and actually add the
         # processes to procmon.
-        for processObject, env in self.processObjects:
-            name = processObject.getName()
-            cmdline = processObject.getCommandLine()
-            filedes = processObject.getFileDescriptors()
-            self._extraFDs[name] = filedes
-            self.addProcess(name, cmdline, env=env)
-        procmon.ProcessMonitor.startService(self)
+        super(DelayedStartupProcessMonitor, self).startService()
+        for name in self.processes:
+            self.startProcess(name)
 
 
     def stopService(self):
@@ -1246,21 +1332,96 @@
         """
         self.stopping = True
         self.deferreds = {}
-        procmon.ProcessMonitor.stopService(self)
+        super(DelayedStartupProcessMonitor, self).stopService()
+
+        # Cancel any outstanding restarts
+        for name, delayedCall in self.restart.items():
+            if delayedCall.active():
+                delayedCall.cancel()
+
+        for name in self.processes:
+            self.stopProcess(name)
         return gatherResults(self.deferreds.values())
 
 
+    def removeProcess(self, name):
+        """
+        Stop the named process and remove it from the list of monitored
+        processes.
+
+        @type name: C{str}
+        @param name: A string that uniquely identifies the process.
+        """
+        self.stopProcess(name)
+        del self.processes[name]
+
+
+    def stopProcess(self, name):
+        """
+        @param name: The name of the process to be stopped
+        """
+        if name not in self.processes:
+            raise KeyError('Unrecognized process name: %s' % (name,))
+
+        proto = self.protocols.get(name, None)
+        if proto is not None:
+            proc = proto.transport
+            try:
+                proc.signalProcess('TERM')
+            except ProcessExitedAlready:
+                pass
+            else:
+                self.murder[name] = self._reactor.callLater(
+                                            self.killTime,
+                                            self._forceStopProcess, proc)
+
+
     def processEnded(self, name):
         """
         When a child process has ended it calls me so I can fire the
         appropriate deferred which was created in stopService
         """
+        # Cancel the scheduled _forceStopProcess function if the process
+        # dies naturally
+        if name in self.murder:
+            if self.murder[name].active():
+                self.murder[name].cancel()
+            del self.murder[name]
+
+        del self.protocols[name]
+
+        if self._reactor.seconds() - self.timeStarted[name] < self.threshold:
+            # The process died too fast - backoff
+            nextDelay = self.delay[name]
+            self.delay[name] = min(self.delay[name] * 2, self.maxRestartDelay)
+
+        else:
+            # Process had been running for a significant amount of time
+            # restart immediately
+            nextDelay = 0
+            self.delay[name] = self.minRestartDelay
+
+        # Schedule a process restart if the service is running
+        if self.running and name in self.processes:
+            self.restart[name] = self._reactor.callLater(nextDelay,
+                                                         self.startProcess,
+                                                         name)
         if self.stopping:
             deferred = self.deferreds.get(name, None)
             if deferred is not None:
                 deferred.callback(None)
 
 
+    def _forceStopProcess(self, proc):
+        """
+        @param proc: An L{IProcessTransport} provider
+        """
+        try:
+            proc.signalProcess('KILL')
+        except ProcessExitedAlready:
+            pass
+
+
     def signalAll(self, signal, startswithname=None):
         """
         Send a signal to all child processes.
@@ -1306,14 +1467,16 @@
         p = self.protocols[name] = DelayedStartupLoggingProtocol()
         p.service = self
         p.name = name
-        args, uid, gid, env = self.processes[name]
+        procObj, env, uid, gid = self.processes[name]
         self.timeStarted[name] = time()
 
         childFDs = { 0 : "w", 1 : "r", 2 : "r" }
 
-        childFDs.update(self._extraFDs.get(name, {}))
+        childFDs.update(procObj.getFileDescriptors())
 
-        self.reactor.spawnProcess(
+        args = procObj.getCommandLine()
+
+        self._reactor.spawnProcess(
             p, args[0], args, uid=uid, gid=gid, env=env,
             childFDs=childFDs
         )
@@ -1333,10 +1496,38 @@
         def delayedStart():
             self._pendingStarts -= 1
             self.reallyStartProcess(name)
-        self.reactor.callLater(interval, delayedStart)
+        self._reactor.callLater(interval, delayedStart)
 
 
+    def restartAll(self):
+        """
+        Restart all processes. This is useful for third party management
+        services to allow a user to restart servers because of an outside change
+        in circumstances -- for example, a new version of a library is
+        installed.
+        """
+        for name in self.processes:
+            self.stopProcess(name)
 
+
+    def __repr__(self):
+        l = []
+        for name, (procObj, env, uid, gid) in self.processes.items():
+            uidgid = ''
+            if uid is not None:
+                uidgid = str(uid)
+            if gid is not None:
+                uidgid += ':'+str(gid)
+
+            if uidgid:
+                uidgid = '(' + uidgid + ')'
+            l.append('%r%s: %r' % (name, uidgid, procObj))
+        return ('<' + self.__class__.__name__ + ' '
+                + ' '.join(l)
+                + '>')
+
+
+
 class DelayedStartupLineLogger(object):
     """
     A line logger that can handle very long lines.
@@ -1389,28 +1580,42 @@
 
 
 
-class DelayedStartupLoggingProtocol(procmon.LoggingProtocol, object):
+class DelayedStartupLoggingProtocol(ProcessProtocol):
     """
     Logging protocol that handles lines which are too long.
     """
 
+    service = None
+    name = None
+    empty = 1
+
     def connectionMade(self):
         """
         Replace the superclass's output monitoring logic with one that can
         handle lineLengthExceeded.
         """
-        super(DelayedStartupLoggingProtocol, self).connectionMade()
         self.output = DelayedStartupLineLogger()
+        self.output.makeConnection(self.transport)
         self.output.tag = self.name
 
+
+    def outReceived(self, data):
+        self.output.dataReceived(data)
+        self.empty = data[-1] == '\n'
+
+    errReceived = outReceived
+
+
     def processEnded(self, reason):
         """
         Let the service know that this child process has ended
         """
-        procmon.LoggingProtocol.processEnded(self, reason)
+        if not self.empty:
+            self.output.dataReceived('\n')
         self.service.processEnded(self.name)
 
 
+
 def getSSLPassphrase(*ignored):
 
     if not config.SSLPrivateKey:

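The core change in caldav.py above is that the monitor now stores process *objects* and calls `getCommandLine()` at every spawn, rather than caching an argument list at registration time. A minimal stdlib sketch of that behaviour (all names here are hypothetical stand-ins, not the real CalendarServer classes; `FakeSpawner` plays the role `InMemoryProcessSpawner` plays in the tests):

```python
class FakeSpawner:
    """Records each spawn request instead of forking; a test stand-in."""
    def __init__(self):
        self.spawned = []

    def spawnProcess(self, args, env):
        self.spawned.append((list(args), dict(env)))


class DummyProcessObject:
    """Hypothetical process object: just a name and mutable arguments."""
    def __init__(self, scriptname, *args):
        self.scriptname = scriptname
        self.args = list(args)

    def getName(self):
        return self.scriptname

    def getCommandLine(self):
        return [self.scriptname] + list(self.args)


class ProcessMonitor:
    """Minimal monitor: stores process objects and re-queries
    getCommandLine() at every (re)spawn, so argument changes take
    effect the next time the child is started."""
    def __init__(self, spawner):
        self.spawner = spawner
        self.processes = {}

    def addProcessObject(self, procObj, env):
        self.processes[procObj.getName()] = (procObj, env)

    def startProcess(self, name):
        procObj, env = self.processes[name]
        # Re-reading the command line here is the key behaviour.
        self.spawner.spawnProcess(procObj.getCommandLine(), env)


spawner = FakeSpawner()
monitor = ProcessMonitor(spawner)
slave = DummyProcessObject('scriptname', 'first')
monitor.addProcessObject(slave, {})
monitor.startProcess('scriptname')

slave.args = ['second']             # configuration changed...
monitor.startProcess('scriptname')  # ...so the restart picks it up
```

This is exactly the property `test_changedArgumentEachSpawn` below exercises against the real `DelayedStartupProcessMonitor`.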
Modified: CalendarServer/trunk/calendarserver/tap/test/test_caldav.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/test/test_caldav.py	2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/calendarserver/tap/test/test_caldav.py	2010-11-01 22:38:48 UTC (rev 6551)
@@ -359,7 +359,8 @@
         """
         service = self.makeService()
 
-        return service.services[0].args[1].protocolArgs["requestFactory"]
+        # FIXME: should at least use service name, not index
+        return service.services[2].args[1].protocolArgs["requestFactory"]
 
 
 
@@ -489,20 +490,23 @@
         """
         service = self.makeService()
 
-        expectedSubServices = (
+        expectedSubServices = dict((
             (MaxAcceptTCPServer, self.config["HTTPPort"]),
             (MaxAcceptSSLServer, self.config["SSLPort"]),
-        )
+        ))
 
-        configuredSubServices = [(s.__class__, s.args) for s in service.services]
-
+        configuredSubServices = [(s.__class__, getattr(s, 'args', None))
+                                 for s in service.services]
+        checked = 0
         for serviceClass, serviceArgs in configuredSubServices:
-            self.failUnless(serviceClass in (s[0] for s in expectedSubServices))
+            if serviceClass in expectedSubServices:
+                checked += 1
+                self.assertEquals(
+                    serviceArgs[0],
+                    dict(expectedSubServices)[serviceClass]
+                )
+        self.assertEquals(checked, len(expectedSubServices))
 
-            self.assertEquals(
-                serviceArgs[0],
-                dict(expectedSubServices)[serviceClass]
-            )
 
     def test_SSLKeyConfiguration(self):
         """
@@ -570,7 +574,8 @@
         service = self.makeService()
 
         for s in service.services:
-            self.assertEquals(s.kwargs["interface"], "127.0.0.1")
+            if isinstance(s, (internet.TCPServer, internet.SSLServer)):
+                self.assertEquals(s.kwargs["interface"], "127.0.0.1")
 
     def test_multipleBindAddresses(self):
         """
@@ -619,7 +624,8 @@
         service = self.makeService()
 
         for s in service.services:
-            self.assertEquals(s.kwargs["backlog"], 1024)
+            if isinstance(s, (internet.TCPServer, internet.SSLServer)):
+                self.assertEquals(s.kwargs["backlog"], 1024)
 
 
 class ServiceHTTPFactoryTests(BaseServiceMakerTests):
@@ -888,22 +894,22 @@
 
 class DummyProcessObject(object):
     """
-    Simple stub for the Process Object API that will run a test script.
+    Simple stub for Process Object API which just has an executable and some
+    arguments.
 
     This is a stand in for L{TwistdSlaveProcess}.
     """
 
     def __init__(self, scriptname, *args):
         self.scriptname = scriptname
-        self.args = list(args)
+        self.args = args
 
 
     def getCommandLine(self):
         """
-        Get the command line to invoke this script.
+        Simple command line.
         """
-        return [sys.executable,
-                FilePath(__file__).sibling(self.scriptname).path] + self.args
+        return [self.scriptname] + list(self.args)
 
 
     def getFileDescriptors(self):
@@ -920,23 +926,29 @@
         return 'Dummy'
 
 
-
-class DelayedStartupProcessMonitorTests(TestCase):
+class ScriptProcessObject(DummyProcessObject):
     """
-    Test cases for L{DelayedStartupProcessMonitor}.
+    Simple stub for the Process Object API that will run a test script.
     """
 
-    def useFakeReactor(self, fakeReactor):
+    def getCommandLine(self):
         """
-        Earlier versions of Twisted used a global reactor in
-        L{twisted.runner.procmon}, so we need to take that into account when
-        testing.
+        Get the command line to invoke this script.
         """
-        if not DelayedStartupProcessMonitor._shouldPassReactor:
-            import twisted.runner.procmon as inherited
-            self.patch(inherited, "reactor", fakeReactor)
+        return [
+            sys.executable,
+            FilePath(__file__).sibling(self.scriptname).path
+        ] + list(self.args)
 
 
+
+
+
+class DelayedStartupProcessMonitorTests(TestCase):
+    """
+    Test cases for L{DelayedStartupProcessMonitor}.
+    """
+
     def test_lineAfterLongLine(self):
         """
         A "long" line of output from a monitored process (longer than
@@ -944,7 +956,7 @@
         at once, to avoid resource exhaustion.
         """
         dspm = DelayedStartupProcessMonitor()
-        dspm.addProcessObject(DummyProcessObject(
+        dspm.addProcessObject(ScriptProcessObject(
                 'longlines.py', str(DelayedStartupLineLogger.MAX_LENGTH)),
                           os.environ)
         dspm.startService()
@@ -966,7 +978,7 @@
                 logged.append(event)
                 if m == '[Dummy] z':
                     d.callback("done")
-            
+
         log.addObserver(tempObserver)
         self.addCleanup(log.removeObserver, tempObserver)
         d = Deferred()
@@ -990,9 +1002,8 @@
         If a L{TwistdSlaveProcess} specifies some file descriptors to be
         inherited, they should be inherited by the subprocess.
         """
-        dspm                = DelayedStartupProcessMonitor()
-        imps = dspm.reactor = InMemoryProcessSpawner()
-        self.useFakeReactor(imps)
+        imps = InMemoryProcessSpawner()
+        dspm = DelayedStartupProcessMonitor(imps)
 
         # Most arguments here will be ignored, so these are bogus values.
         slave = TwistdSlaveProcess(
@@ -1017,6 +1028,27 @@
                            19: 19, 25: 25})
 
 
+    def test_changedArgumentEachSpawn(self):
+        """
+        If the result of C{getCommandLine} changes on subsequent calls,
+        subsequent calls should result in different arguments being passed to
+        C{spawnProcess} each time.
+        """
+        imps = InMemoryProcessSpawner()
+        dspm = DelayedStartupProcessMonitor(imps)
+        slave = DummyProcessObject('scriptname', 'first')
+        dspm.addProcessObject(slave, {})
+        dspm.startService()
+        oneProcessTransport = imps.waitForOneProcess()
+        self.assertEquals(oneProcessTransport.args,
+                          ['scriptname', 'first'])
+        slave.args = ['second']
+        oneProcessTransport.processProtocol.processEnded(None)
+        twoProcessTransport = imps.waitForOneProcess()
+        self.assertEquals(twoProcessTransport.args,
+                          ['scriptname', 'second'])
+
+
     def test_metaDescriptorInheritance(self):
         """
         If a L{TwistdSlaveProcess} specifies a meta-file-descriptor to be
@@ -1024,9 +1056,8 @@
         configuration argument should be passed that indicates to the
         subprocess.
         """
-        dspm                = DelayedStartupProcessMonitor()
-        imps = dspm.reactor = InMemoryProcessSpawner()
-        self.useFakeReactor(imps)
+        imps = InMemoryProcessSpawner()
+        dspm = DelayedStartupProcessMonitor(imps)
         # Most arguments here will be ignored, so these are bogus values.
         slave = TwistdSlaveProcess(
             twistd     = "bleh",
@@ -1057,10 +1088,9 @@
         objects that have been added to it being started once per
         delayInterval.
         """
-        dspm                = DelayedStartupProcessMonitor()
+        imps = InMemoryProcessSpawner()
+        dspm = DelayedStartupProcessMonitor(imps)
         dspm.delayInterval = 3.0
-        imps = dspm.reactor = InMemoryProcessSpawner()
-        self.useFakeReactor(imps)
         sampleCounter = range(0, 5)
         for counter in sampleCounter:
             slave = TwistdSlaveProcess(

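`test_lineAfterLongLine` above exercises the `DelayedStartupLineLogger.MAX_LENGTH` behaviour: an overlong line from a child must be emitted in bounded chunks rather than buffered whole. A hedged sketch of that splitting idea, not the actual Twisted implementation (the `MAX_LENGTH` value here is assumed):

```python
MAX_LENGTH = 80  # assumed cap; the real limit lives on DelayedStartupLineLogger

def split_long_line(line, max_length=MAX_LENGTH):
    """Break one overlong line into bounded chunks so a single huge line
    from a child process cannot exhaust the parent's line buffer."""
    if len(line) <= max_length:
        return [line]
    return [line[i:i + max_length] for i in range(0, len(line), max_length)]

chunks = split_long_line("x" * 200, max_length=80)
```

Each chunk can then be logged as its own event, which is why the test waits for a short line (`'z'`) logged *after* the long one.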
Modified: CalendarServer/trunk/calendarserver/tap/util.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/util.py	2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/calendarserver/tap/util.py	2010-11-01 22:38:48 UTC (rev 6551)
@@ -14,6 +14,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ##
+
+"""
+Utilities for assembling the service and resource hierarchy.
+"""
+
 __all__ = [
     "getRootResource",
     "FakeRequest",
@@ -22,6 +27,7 @@
 import errno
 import os
 from time import sleep
+from socket import fromfd, AF_UNIX, SOCK_STREAM
 
 from twext.python.filepath import CachingFilePath as FilePath
 from twext.python.log import Logger
@@ -34,6 +40,7 @@
 from twisted.cred.portal import Portal
 from twisted.internet.defer import inlineCallbacks, returnValue
 from twisted.internet.reactor import addSystemEventTrigger
+from twisted.internet.tcp import Connection
 from twisted.python.reflect import namedClass
 
 from twistedcaldav import memcachepool
@@ -61,6 +68,8 @@
     NegotiateCredentialFactory  # pacify pyflakes
 except ImportError:
     NegotiateCredentialFactory = None
+from txdav.base.datastore.asyncsqlpool import ConnectionPool
+from txdav.base.datastore.asyncsqlpool import ConnectionPoolClient
 
 from calendarserver.accesslog import DirectoryLogWrapperResource
 from calendarserver.provision.root import RootResource
@@ -77,77 +86,95 @@
 log = Logger()
 
 
+def pgServiceFromConfig(config, subServiceFactory, uid=None, gid=None):
+    """
+    Construct a L{PostgresService} from a given configuration and subservice.
 
-def storeFromConfig(config, notifierFactory=None):
+    @param config: the configuration to derive postgres configuration
+        parameters from.
+
+    @param subServiceFactory: A factory for the service to start once the
+        L{PostgresService} has been initialized.
+
+    @param uid: The user-ID to run the PostgreSQL server as.
+
+    @param gid: The group-ID to run the PostgreSQL server as.
+
+    @return: a service which can start postgres.
+
+    @rtype: L{PostgresService}
     """
+    dbRoot = CachingFilePath(config.DatabaseRoot)
+    # Construct a PostgresService exactly as the parent would, so that we
+    # can establish connection information.
+    return PostgresService(
+        dbRoot, subServiceFactory, v1_schema,
+        databaseName=config.Postgres.DatabaseName,
+        logFile=config.Postgres.LogFile,
+        socketDir=config.RunRoot,
+        listenAddresses=config.Postgres.ListenAddresses,
+        sharedBuffers=config.Postgres.SharedBuffers,
+        maxConnections=config.Postgres.MaxConnections,
+        options=config.Postgres.Options,
+        uid=uid, gid=gid
+    )
+
+
+class ConnectionWithPeer(Connection):
+
+    connected = True
+
+    def getPeer(self):
+        return "<peer: %r %r>" % (self.socket.fileno(), id(self))
+
+    def getHost(self):
+        return "<host: %r %r>" % (self.socket.fileno(), id(self))
+
+def storeFromConfig(config, serviceParent, notifierFactory=None):
+    """
     Produce an L{IDataStore} from the given configuration and notifier factory.
     """
     if config.UseDatabase:
-        dbRoot = CachingFilePath(config.DatabaseRoot)
-        postgresService = PostgresService(
-            dbRoot, None, v1_schema,
-            databaseName=config.Postgres.DatabaseName,
-            logFile=config.Postgres.LogFile,
-            socketDir=config.RunRoot,
-            listenAddresses=config.Postgres.ListenAddresses,
-            sharedBuffers=config.Postgres.SharedBuffers,
-            maxConnections=config.Postgres.MaxConnections,
-            options=config.Postgres.Options,
+        postgresService = pgServiceFromConfig(config, None)
+        if config.DBAMPFD == 0:
+            cp = ConnectionPool(postgresService.produceConnection)
+            cp.setServiceParent(serviceParent)
+            txnFactory = cp.connection
+        else:
+            # TODO: something to do with loseConnection here, maybe?  I don't
+            # think it actually needs to be shut down, though.
+            skt = fromfd(int(config.DBAMPFD), AF_UNIX, SOCK_STREAM)
+            os.close(config.DBAMPFD)
+            protocol = ConnectionPoolClient()
+            transport = ConnectionWithPeer(skt, protocol)
+            protocol.makeConnection(transport)
+            transport.startReading()
+            txnFactory = protocol.newTransaction
+        dataStore = CommonSQLDataStore(
+            txnFactory, notifierFactory,
+            postgresService.dataStoreDirectory.child("attachments"),
+            config.EnableCalDAV, config.EnableCardDAV
         )
-        return CommonSQLDataStore(postgresService.produceConnection,
-            notifierFactory, dbRoot.child("attachments"),
-            config.EnableCalDAV, config.EnableCardDAV)
+        dataStore.setServiceParent(serviceParent)
+        return dataStore
     else:
         return CommonFileDataStore(FilePath(config.DocumentRoot),
             notifierFactory, config.EnableCalDAV, config.EnableCardDAV) 
 
 
 
-def getRootResource(config, resources=None):
+def directoryFromConfig(config):
     """
-    Set up directory service and resource hierarchy based on config.
-    Return root resource.
-
-    Additional resources can be added to the hierarchy by passing a list of
-    tuples containing: path, resource class, __init__ args list, and optional
-    authentication scheme ("basic" or "digest").
+    Create an L{AggregateDirectoryService} from the given configuration.
     """
-    
-    # FIXME: this is only here to workaround circular imports
-    doBind()
 
     #
-    # Default resource classes
-    #
-    rootResourceClass            = RootResource
-    principalResourceClass       = DirectoryPrincipalProvisioningResource
-    calendarResourceClass        = DirectoryCalendarHomeProvisioningResource
-    iScheduleResourceClass       = IScheduleInboxResource
-    timezoneServiceResourceClass = TimezoneServiceResource
-    webCalendarResourceClass     = WebCalendarResource
-    webAdminResourceClass        = WebAdminResource
-    addressBookResourceClass     = DirectoryAddressBookHomeProvisioningResource
-    directoryBackedAddressBookResourceClass = DirectoryBackedAddressBookResource
-
-    #
-    # Setup the Augment Service
-    #
-    augmentClass = namedClass(config.AugmentService.type)
-
-    log.info("Configuring augment service of type: %s" % (augmentClass,))
-
-    try:
-        augment.AugmentService = augmentClass(**config.AugmentService.params)
-    except IOError:
-        log.error("Could not start augment service")
-        raise
-
-    #
     # Setup the Directory
     #
     directories = []
 
     directoryClass = namedClass(config.DirectoryService.type)
+    principalResourceClass       = DirectoryPrincipalProvisioningResource
 
     log.info("Configuring directory service of type: %s"
         % (config.DirectoryService.type,))
@@ -223,8 +250,54 @@
         directory.setRealm(realmName)
     except ImportError:
         pass
+    log.info("Setting up principal collection: %r"
+                  % (principalResourceClass,))
+    principalResourceClass("/principals/", directory)
+    return directory
 
+
+def getRootResource(config, serviceParent, resources=None):
+    """
+    Set up directory service and resource hierarchy based on config.
+    Return root resource.
+
+    Additional resources can be added to the hierarchy by passing a list of
+    tuples containing: path, resource class, __init__ args list, and optional
+    authentication scheme ("basic" or "digest").
+    """
+
+    # FIXME: this is only here to workaround circular imports
+    doBind()
+
     #
+    # Default resource classes
+    #
+    rootResourceClass            = RootResource
+    calendarResourceClass        = DirectoryCalendarHomeProvisioningResource
+    iScheduleResourceClass       = IScheduleInboxResource
+    timezoneServiceResourceClass = TimezoneServiceResource
+    webCalendarResourceClass     = WebCalendarResource
+    webAdminResourceClass        = WebAdminResource
+    addressBookResourceClass     = DirectoryAddressBookHomeProvisioningResource
+    directoryBackedAddressBookResourceClass = DirectoryBackedAddressBookResource
+
+    #
+    # Setup the Augment Service
+    #
+    augmentClass = namedClass(config.AugmentService.type)
+
+    log.info("Configuring augment service of type: %s" % (augmentClass,))
+
+    try:
+        augment.AugmentService = augmentClass(**config.AugmentService.params)
+    except IOError:
+        log.error("Could not start augment service")
+        raise
+
+
+    directory = directoryFromConfig(config)
+
+    #
     # Setup the ProxyDB Service
     #
     proxydbClass = namedClass(config.ProxyDBService.type)
@@ -311,10 +384,8 @@
     #
     log.info("Setting up document root at: %s"
                   % (config.DocumentRoot,))
-    log.info("Setting up principal collection: %r"
-                  % (principalResourceClass,))
 
-    principalCollection = principalResourceClass("/principals/", directory)
+    principalCollection = directory.principalCollection
 
     #
     # Configure NotifierFactory
@@ -327,7 +398,7 @@
     else:
         notifierFactory = None
 
-    newStore = storeFromConfig(config, notifierFactory)
+    newStore = storeFromConfig(config, serviceParent, notifierFactory)
 
     if config.EnableCalDAV:
         log.info("Setting up calendar collection: %r" % (calendarResourceClass,))

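The `DBAMPFD` branch in `storeFromConfig` adopts a file descriptor inherited from the master process as the slave's AMP connection to the shared pool: `socket.fromfd()` duplicates the descriptor, so the raw fd is closed immediately after. A runnable stdlib sketch of that adoption dance, with a `socketpair` standing in for the master/slave link (the payload string is illustrative only):

```python
import os
import socket

def adopt_inherited_socket(fd):
    """Wrap an inherited UNIX-socket descriptor in a socket object.

    socket.fromfd() dup()s the descriptor, so the original raw fd is
    closed afterwards -- the same dance storeFromConfig does with
    config.DBAMPFD."""
    skt = socket.fromfd(fd, socket.AF_UNIX, socket.SOCK_STREAM)
    os.close(fd)
    return skt

# A socketpair stands in for the pipe the master set up before forking:
master_side, slave_side = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
inherited_fd = slave_side.fileno()
adopted = adopt_inherited_socket(inherited_fd)
slave_side.detach()  # pretend the slave only ever saw the raw fd number

master_side.sendall(b"begin-transaction")
data = adopted.recv(1024)
```

In the real code the adopted socket is then wrapped in a `ConnectionWithPeer` transport and handed to `ConnectionPoolClient.makeConnection`.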
Modified: CalendarServer/trunk/calendarserver/tools/purge.py
===================================================================
--- CalendarServer/trunk/calendarserver/tools/purge.py	2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/calendarserver/tools/purge.py	2010-11-01 22:38:48 UTC (rev 6551)
@@ -97,6 +97,7 @@
         os.umask(config.umask)
 
         try:
+            # TODO: getRootResource needs a parent service now.
             rootResource = getRootResource(config)
             directory = rootResource.getDirectory()
         except DirectoryError, e:

Modified: CalendarServer/trunk/calendarserver/tools/test/test_principals.py
===================================================================
--- CalendarServer/trunk/calendarserver/tools/test/test_principals.py	2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/calendarserver/tools/test/test_principals.py	2010-11-01 22:38:48 UTC (rev 6551)
@@ -24,7 +24,7 @@
 from twistedcaldav.directory.directory import DirectoryError
 from twistedcaldav.test.util import TestCase, CapturingProcessProtocol
 
-from calendarserver.tap.util import getRootResource
+from calendarserver.tap.util import directoryFromConfig
 from calendarserver.tools.principals import parseCreationArgs, matchStrings, updateRecord, principalForPrincipalID, getProxies, setProxies
 
 
@@ -226,7 +226,7 @@
 
     @inlineCallbacks
     def test_updateRecord(self):
-        directory = getRootResource(config).getDirectory()
+        directory = directoryFromConfig(config)
         guid = "eee28807-a8c5-46c8-a558-a08281c558a7"
 
         (yield updateRecord(True, directory, "locations",
@@ -264,7 +264,7 @@
         """
         Read and Write proxies can be set en masse
         """
-        directory = getRootResource(config).getDirectory()
+        directory = directoryFromConfig(config)
 
         principal = principalForPrincipalID("users:user01", directory=directory)
         readProxies, writeProxies = (yield getProxies(principal, directory=directory))

Modified: CalendarServer/trunk/calendarserver/tools/test/test_purge.py
===================================================================
--- CalendarServer/trunk/calendarserver/tools/test/test_purge.py	2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/calendarserver/tools/test/test_purge.py	2010-11-01 22:38:48 UTC (rev 6551)
@@ -47,7 +47,11 @@
     def setUp(self):
         super(PurgeOldEventsTestCase, self).setUp()
 
-        config.DirectoryService.params['xmlFile'] = os.path.join(os.path.dirname(__file__), "purge", "accounts.xml")
+        config.DirectoryService.params['xmlFile'] = os.path.join(
+            os.path.dirname(__file__), "purge", "accounts.xml"
+        )
+        # TODO: when rewriting for new-store, getRootResource needs a parent
+        # service argument now, to know when to start/stop the connection pool.
         self.rootResource = getRootResource(config)
         self.directory = self.rootResource.getDirectory()
 
@@ -381,6 +385,8 @@
         copyAugmentFile = FilePath(config.DataRoot).child("augments.xml")
         origAugmentFile.copyTo(copyAugmentFile)
 
+        # TODO: when rewriting for new-store, getRootResource needs a parent
+        # service argument now, to know when to start/stop the connection pool.
         self.rootResource = getRootResource(config)
         self.directory = self.rootResource.getDirectory()
 

Modified: CalendarServer/trunk/twistedcaldav/directory/util.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/directory/util.py	2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/twistedcaldav/directory/util.py	2010-11-01 22:38:48 UTC (rev 6551)
@@ -66,6 +66,10 @@
         transaction = newStore.newTransaction(repr(request))
         def abortIfUncommitted(request, response):
             try:
+                # TODO: missing 'yield' here.  For formal correctness as per
+                # the interface, this should be allowed to be a Deferred.  (The
+                # actual implementation still raises synchronously, so there's
+                # no bug currently.)
                 transaction.abort()
             except AlreadyFinishedError:
                 pass

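The TODO added above notes that, per the interface, `transaction.abort()` should be allowed to return a Deferred even though the current implementation raises synchronously. A sketch of handling both cases uniformly, using asyncio awaitables as a stand-in for Twisted Deferreds (class names here are illustrative, not the real store API):

```python
import asyncio
import inspect

class AlreadyFinishedError(Exception):
    """Stand-in for the store's 'transaction already finished' error."""

async def abort_if_uncommitted(transaction):
    """Abort a transaction, tolerating both a synchronous abort() and an
    abort() that returns an awaitable -- the formal-correctness point the
    TODO raises about the interface."""
    try:
        result = transaction.abort()
        if inspect.isawaitable(result):
            await result
    except AlreadyFinishedError:
        pass  # the request already committed; nothing left to roll back

class _AsyncTxn:
    def __init__(self):
        self.aborted = False
    async def abort(self):
        self.aborted = True

class _FinishedTxn:
    def abort(self):
        raise AlreadyFinishedError()

demo_txn = _AsyncTxn()
asyncio.run(abort_if_uncommitted(demo_txn))
asyncio.run(abort_if_uncommitted(_FinishedTxn()))  # error swallowed, as here
```

Twisted's own `maybeDeferred` solves the same problem for Deferred-or-plain return values.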
Modified: CalendarServer/trunk/twistedcaldav/localization.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/localization.py	2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/twistedcaldav/localization.py	2010-11-01 22:38:48 UTC (rev 6551)
@@ -26,7 +26,9 @@
 from twext.python.log import Logger
 
 try:
-    from Foundation import *
+    from Foundation import (
+        NSPropertyListImmutable, NSPropertyListSerialization, NSData
+    )
     foundationImported = True
 except ImportError:
     foundationImported = False

Modified: CalendarServer/trunk/twistedcaldav/mail.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/mail.py	2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/twistedcaldav/mail.py	2010-11-01 22:38:48 UTC (rev 6551)
@@ -27,6 +27,7 @@
 
 try:
     from cStringIO import StringIO
+    StringIO
 except ImportError:
     from StringIO import StringIO
 
@@ -37,7 +38,7 @@
 from zope.interface import implements
 
 from twisted.application import internet, service
-from twisted.internet import protocol, defer, ssl, reactor
+from twisted.internet import protocol, defer, ssl, reactor as _reactor
 from twisted.internet.defer import inlineCallbacks, returnValue, succeed
 from twisted.mail import pop3client, imap4
 from twisted.mail.smtp import messageid, rfc822date, ESMTPSenderFactory
@@ -60,7 +61,7 @@
 from twistedcaldav.config import config
 from twistedcaldav.directory.util import transactionFromRequest
 from twistedcaldav.ical import Property
-from twistedcaldav.localization import translationTo
+from twistedcaldav.localization import translationTo, _
 from twistedcaldav.resource import CalDAVResource
 from twistedcaldav.schedule import deliverSchedulePrivilegeSet
 from twistedcaldav.scheduling.cuaddress import normalizeCUAddr
@@ -350,7 +351,7 @@
 def injectMessage(organizer, attendee, calendar, msgId, reactor=None):
 
     if reactor is None:
-        from twisted.internet import reactor
+        reactor = _reactor
 
     headers = {
         'Content-Type' : 'text/calendar',
@@ -586,16 +587,18 @@
         return multiService
 
 
-#
-# ISchedule Inbox
-#
-class IScheduleService(service.Service, LoggingMixIn):
+class IScheduleService(service.MultiService, LoggingMixIn):
+    """
+    ISchedule Inbox
+    """
 
     def __init__(self, settings, mailer):
         self.settings = settings
         self.mailer = mailer
 
-        rootResource = getRootResource(config,
+        rootResource = getRootResource(
+            config,
+            self,
             (
                 ("inbox", IMIPInvitationInboxResource, (mailer,), "digest"),
             )
@@ -604,16 +607,10 @@
         self.factory = HTTPFactory(server.Site(rootResource))
         self.server = internet.TCPServer(settings['MailGatewayPort'],
             self.factory)
+        self.server.setServiceParent(self)
 
-    def startService(self):
-        self.server.startService()
 
-    def stopService(self):
-        self.server.stopService()
 
-
-
-
 class MailHandler(LoggingMixIn):
 
     def __init__(self, dataRoot=None):
@@ -767,7 +764,7 @@
                 requireTransportSecurity=settings["UseSSL"],
             )
 
-            reactor.connectTCP(settings["Server"], settings["Port"], factory)
+            _reactor.connectTCP(settings["Server"], settings["Port"], factory)
             return deferred
 
 
@@ -954,7 +951,7 @@
                 requireAuthentication=False,
                 requireTransportSecurity=settings["UseSSL"])
 
-            reactor.connectTCP(settings['Server'], settings['Port'], factory)
+            _reactor.connectTCP(settings['Server'], settings['Port'], factory)
             deferred.addCallback(_success, msgId, fromAddr, toAddr)
             deferred.addErrback(_failure, msgId, fromAddr, toAddr)
             return deferred

Modified: CalendarServer/trunk/twistedcaldav/stdconfig.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/stdconfig.py	2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/twistedcaldav/stdconfig.py	2010-11-01 22:38:48 UTC (rev 6551)
@@ -139,17 +139,27 @@
     #    This configures the actual network address that the server binds to.
     #
     "BindAddresses": [],   # List of IP addresses to bind to [empty = all]
-    "BindHTTPPorts": [],   # List of port numbers to bind to for HTTP [empty = same as "Port"]
-    "BindSSLPorts" : [],   # List of port numbers to bind to for SSL [empty = same as "SSLPort"]
-    "InheritFDs"   : [],   # File descriptors to inherit for HTTP requests (empty = don't inherit)
-    "InheritSSLFDs": [],   # File descriptors to inherit for HTTPS requests (empty = don't inherit)
-    "MetaFD": 0,        # Inherited file descriptor to call recvmsg() on to receive sockets (none = don't inherit)
+    "BindHTTPPorts": [],   # List of port numbers to bind to for HTTP
+                           # [empty = same as "Port"]
+    "BindSSLPorts" : [],   # List of port numbers to bind to for SSL
+                           # [empty = same as "SSLPort"]
+    "InheritFDs"   : [],   # File descriptors to inherit for HTTP requests
+                           # (empty = don't inherit)
+    "InheritSSLFDs": [],   # File descriptors to inherit for HTTPS requests
+                           # (empty = don't inherit)
+    "MetaFD"       : 0,    # Inherited file descriptor to call recvmsg() on to
+                           # receive sockets (none = don't inherit)
 
-    "UseMetaFD": True,         # Use a 'meta' FD, i.e. an FD to transmit other
-                               # FDs to slave processes.
+    "UseMetaFD"    : True, # Use a 'meta' FD, i.e. an FD to transmit other FDs
+                           # to slave processes.
 
-    "UseDatabase" : True,      # True: postgres; False: files
+    "UseDatabase"  : True, # True: postgres; False: files
 
+    "DBAMPFD"      : 0,    # Internally used by database to tell slave
+                           # processes to inherit a file descriptor and use it
+                           # as an AMP connection over a UNIX socket; see
+                           # txdav.base.datastore.asyncsqlpool.
+
     #
     # Types of service provided
     #

Modified: CalendarServer/trunk/twistedcaldav/storebridge.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/storebridge.py	2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/twistedcaldav/storebridge.py	2010-11-01 22:38:48 UTC (rev 6551)
@@ -664,9 +664,10 @@
             content_type = MimeType("application", "octet-stream")
 
         t = self._newStoreAttachment.store(content_type)
+        @inlineCallbacks
         def done(ignored):
-            t.loseConnection()
-            return NO_CONTENT
+            yield t.loseConnection()
+            returnValue(NO_CONTENT)
         return readStream(request.stream, t.write).addCallback(done)
 
 

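The storebridge.py hunk converts a synchronous callback into an `inlineCallbacks` generator so the response waits for `t.loseConnection()` to complete, since with the connection pool that close may now be asynchronous. The same shape in asyncio terms, as a self-contained sketch (the sink class is a stand-in, not the real attachment API):

```python
import asyncio

NO_CONTENT = 204  # HTTP status sent once the attachment is fully stored

class AttachmentSink:
    """Stand-in attachment stream whose close completes asynchronously."""
    def __init__(self):
        self.chunks = []
        self.closed = False
    def write(self, data):
        self.chunks.append(data)
    async def lose_connection(self):
        await asyncio.sleep(0)  # e.g. flushing to backing storage
        self.closed = True

async def store_attachment(stream, sink):
    for chunk in stream:
        sink.write(chunk)
    # The patch's point: wait for the close to finish *before* replying,
    # instead of returning NO_CONTENT while the flush may still be pending.
    await sink.lose_connection()
    return NO_CONTENT

sink = AttachmentSink()
status = asyncio.run(
    store_attachment([b"BEGIN:VCALENDAR", b"END:VCALENDAR"], sink)
)
```

Replying only after the awaited close mirrors `yield t.loseConnection()` followed by `returnValue(NO_CONTENT)` in the patch.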
Modified: CalendarServer/trunk/twistedcaldav/test/test_mail.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/test/test_mail.py	2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/twistedcaldav/test/test_mail.py	2010-11-01 22:38:48 UTC (rev 6551)
@@ -14,7 +14,6 @@
 # limitations under the License.
 ##
 
-from twistedcaldav.mail import *
 from twistedcaldav.test.util import TestCase
 from twistedcaldav.ical import Component
 from twistedcaldav.config import config
@@ -23,6 +22,8 @@
 
 from twisted.internet.defer import inlineCallbacks
 import email
+from twistedcaldav.mail import MailHandler
+from twistedcaldav.mail import MailGatewayTokensDatabase
 import os
 
 

Modified: CalendarServer/trunk/twistedcaldav/test/test_wrapping.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/test/test_wrapping.py	2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/twistedcaldav/test/test_wrapping.py	2010-11-01 22:38:48 UTC (rev 6551)
@@ -113,7 +113,9 @@
         txn = self.calendarCollection._newStore.newTransaction()
         home = yield txn.calendarHomeWithUID(uid, True)
         cal = yield home.calendarWithName("calendar")
-        cal.createCalendarObjectWithName(objectName, VComponent.fromString(objectText))
+        yield cal.createCalendarObjectWithName(
+            objectName, VComponent.fromString(objectText)
+        )
         yield txn.commit()
 
 
@@ -141,7 +143,9 @@
         if adbk is None:
             yield home.createAddressBookWithName("addressbook")
             adbk = yield home.addressbookWithName("addressbook")
-        adbk.createAddressBookObjectWithName(objectName, VCComponent.fromString(objectText))
+        yield adbk.createAddressBookObjectWithName(
+            objectName, VCComponent.fromString(objectText)
+        )
         yield txn.commit()
 
 
@@ -287,14 +291,14 @@
     def test_lookupNewCalendar(self):
         """
         When a L{CalDAVResource} which represents a not-yet-created calendar
-        collection is looked up in a L{CalendarHomeResource} representing a calendar
-        home, it will initially have a new storage backend set to C{None}, but
-        when the calendar is created via a protocol action, the backend will be
-        initialized to match.
+        collection is looked up in a L{CalendarHomeResource} representing a
+        calendar home, it will initially have a new storage backend set to
+        C{None}, but when the calendar is created via a protocol action, the
+        backend will be initialized to match.
         """
         calDavFile = yield self.getResource("calendars/users/wsanchez/frobozz")
         self.assertIsInstance(calDavFile, ProtoCalendarCollectionResource)
-        calDavFile.createCalendarCollection()
+        yield calDavFile.createCalendarCollection()
         yield self.commit()
 
 
@@ -417,7 +421,7 @@
         """
         calDavFile = yield self.getResource("addressbooks/users/wsanchez/frobozz")
         self.assertIsInstance(calDavFile, ProtoAddressBookCollectionResource)
-        calDavFile.createAddressBookCollection()
+        yield calDavFile.createAddressBookCollection()
         yield self.commit()
         self.assertEquals(calDavFile._principalCollections,
                           frozenset([self.principalsResource]))
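Several call sites in this file gain a `yield`: createCalendarObjectWithName(), createAddressBookObjectWithName(), createCalendarCollection() and createAddressBookCollection() now return Deferreds, and a Deferred that is never waited on silently swallows any failure it carries. The same pitfall can be sketched with the standard library's futures (an analogy only — the real code uses Twisted Deferreds inside inlineCallbacks; the names below are invented):

```python
from concurrent.futures import ThreadPoolExecutor

def create_object():
    # Simulates a storage call that now runs asynchronously and fails.
    raise ValueError("constraint violation")

executor = ThreadPoolExecutor(max_workers=1)

# Fire-and-forget: the exception is captured inside the future and never
# surfaces, which is how the un-yielded create...() calls could hide errors.
unwaited = executor.submit(create_object)

# Waiting on the result (the analogue of 'yield') propagates the failure.
waited = executor.submit(create_object)
try:
    waited.result()
    surfaced = False
except ValueError:
    surfaced = True

executor.shutdown()
print(surfaced)                           # True: waiting surfaced the error
print(unwaited.exception() is not None)   # True: the dropped error was only stored
```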

Modified: CalendarServer/trunk/twistedcaldav/timezoneservice.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/timezoneservice.py	2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/twistedcaldav/timezoneservice.py	2010-11-01 22:38:48 UTC (rev 6551)
@@ -91,6 +91,9 @@
             ),
         )
 
+    def contentType(self):
+        return MimeType.fromString("text/xml")
+
     def resourceType(self):
         return davxml.ResourceType.timezones
 

Copied: CalendarServer/trunk/txdav/base/datastore/asyncsqlpool.py (from rev 6550, CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/asyncsqlpool.py)
===================================================================
--- CalendarServer/trunk/txdav/base/datastore/asyncsqlpool.py	                        (rev 0)
+++ CalendarServer/trunk/txdav/base/datastore/asyncsqlpool.py	2010-11-01 22:38:48 UTC (rev 6551)
@@ -0,0 +1,557 @@
+# -*- test-case-name: txdav.caldav.datastore -*-
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Asynchronous multi-process connection pool.
+"""
+
+import sys
+
+from cStringIO import StringIO
+from cPickle import dumps, loads
+from itertools import count
+
+from zope.interface import implements
+
+from twisted.internet.defer import inlineCallbacks
+from twisted.internet.defer import returnValue
+from txdav.idav import IAsyncTransaction
+from twisted.internet.defer import Deferred
+from twisted.protocols.amp import Boolean
+from twisted.python.failure import Failure
+from twisted.protocols.amp import Argument, String, Command, AMP, Integer
+from twisted.internet import reactor as _reactor
+from twisted.application.service import Service
+from txdav.base.datastore.threadutils import ThreadHolder
+from txdav.idav import AlreadyFinishedError
+from twisted.python import log
+from twisted.python.components import proxyForInterface
+
+
+class BaseSqlTxn(object):
+    """
+    L{IAsyncTransaction} implementation based on a L{ThreadHolder} in the
+    current process.
+    """
+    implements(IAsyncTransaction)
+
+    def __init__(self, connectionFactory, reactor=_reactor):
+        """
+        @param connectionFactory: A 0-argument callable which returns a DB-API
+            2.0 connection.
+        """
+        self._completed = False
+        self._holder = ThreadHolder(reactor)
+        self._holder.start()
+        def initCursor():
+            # support threadlevel=1; we can't necessarily cursor() in a
+            # different thread than we do transactions in.
+
+            # TODO: Re-try connect when it fails.  Specify a timeout.  That
+            # should happen in this layer because we need to be able to stop
+            # the reconnect attempt if it's hanging.
+            self._connection = connectionFactory()
+            self._cursor = self._connection.cursor()
+
+        # Note: no locking necessary here; since this gets submitted first, all
+        # subsequent submitted work-units will be in line behind it and the
+        # cursor will already have been initialized.
+        self._holder.submit(initCursor)
+
+
+    def _reallyExecSQL(self, sql, args=None, raiseOnZeroRowCount=None):
+        if args is None:
+            args = []
+        self._cursor.execute(sql, args)
+        if raiseOnZeroRowCount is not None and self._cursor.rowcount == 0:
+            raise raiseOnZeroRowCount()
+        if self._cursor.description:
+            return self._cursor.fetchall()
+        else:
+            return None
+
+
+    noisy = False
+
+    def execSQL(self, *args, **kw):
+        result = self._holder.submit(
+            lambda : self._reallyExecSQL(*args, **kw)
+        )
+        if self.noisy:
+            def reportResult(results):
+                sys.stdout.write("\n".join([
+                    "",
+                    "SQL: %r %r" % (args, kw),
+                    "Results: %r" % (results,),
+                    "",
+                    ]))
+                return results
+            result.addBoth(reportResult)
+        return result
+
+
+    def commit(self):
+        if not self._completed:
+            self._completed = True
+            def reallyCommit():
+                self._connection.commit()
+            result = self._holder.submit(reallyCommit)
+            return result
+        else:
+            raise AlreadyFinishedError()
+
+
+    def abort(self):
+        if not self._completed:
+            self._completed = True
+            def reallyAbort():
+                self._connection.rollback()
+            result = self._holder.submit(reallyAbort)
+            return result
+        else:
+            raise AlreadyFinishedError()
+
+
+    def __del__(self):
+        if not self._completed:
+            print 'BaseSqlTxn.__del__: OK'
+            self.abort()
+
+
+    def reset(self):
+        """
+        Call this when placing this transaction back into the pool.
+
+        @raise RuntimeError: if the transaction has not been committed or
+            aborted.
+        """
+        if not self._completed:
+            raise RuntimeError("Attempt to re-set active transaction.")
+        self._completed = False
+
+
+    def stop(self):
+        """
+        Release the thread and database connection associated with this
+        transaction.
+        """
+        self._completed = True
+        self._stopped = True
+        holder = self._holder
+        self._holder = None
+        holder.submit(self._connection.close)
+        return holder.stop()
+
+
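BaseSqlTxn funnels every operation on a DB-API connection through one dedicated thread (via ThreadHolder), and _reallyExecSQL encodes a small result convention: raise the caller-supplied exception class when rowcount is zero, return fetchall() when a result set is available, otherwise return None. A minimal sketch of that convention against sqlite3 (the function and table names here are illustrative, not from the CalendarServer code):

```python
import sqlite3

def really_exec_sql(cursor, sql, args=(), raise_on_zero_rowcount=None):
    # Sketch of BaseSqlTxn._reallyExecSQL's result convention: rows for
    # SELECT-like statements, None for statements with no result set, and an
    # optional marker exception when an UPDATE/DELETE touched zero rows.
    cursor.execute(sql, args)
    if raise_on_zero_rowcount is not None and cursor.rowcount == 0:
        raise raise_on_zero_rowcount()
    if cursor.description:      # a result set is available
        return cursor.fetchall()
    return None                 # DDL/DML with nothing to return

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
r_create = really_exec_sql(cur, "create table t (x integer)")
print(r_create)                 # None
really_exec_sql(cur, "insert into t values (1)")
rows = really_exec_sql(cur, "select x from t")
print(rows)                     # [(1,)]

class Missing(Exception):
    pass

try:
    really_exec_sql(cur, "update t set x = 2 where x = 99",
                    raise_on_zero_rowcount=Missing)
    zero = False
except Missing:
    zero = True
print(zero)                     # True: rowcount was 0, so the marker fired
```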
+
+class PooledSqlTxn(proxyForInterface(iface=IAsyncTransaction,
+                                     originalAttribute='_baseTxn')):
+    """
+    This is a temporary throw-away wrapper for the longer-lived L{BaseSqlTxn},
+    so that if a badly-behaved API client accidentally hangs on to one of
+    these and, for example, C{.abort()}s it multiple times once another
+    client is using that connection, it will get some harmless tracebacks.
+    """
+
+    def __init__(self, pool, baseTxn):
+        self._pool     = pool
+        self._baseTxn  = baseTxn
+        self._complete = False
+
+
+    def execSQL(self, *a, **kw):
+        self._checkComplete()
+        return super(PooledSqlTxn, self).execSQL(*a, **kw)
+
+
+    def commit(self):
+        self._markComplete()
+        return self._repoolAfter(super(PooledSqlTxn, self).commit())
+
+
+    def abort(self):
+        self._markComplete()
+        return self._repoolAfter(super(PooledSqlTxn, self).abort())
+
+
+    def _checkComplete(self):
+        """
+        If the transaction is complete, raise L{AlreadyFinishedError}
+        """
+        if self._complete:
+            raise AlreadyFinishedError()
+
+
+    def _markComplete(self):
+        """
+        Mark the transaction as complete, raising L{AlreadyFinishedError} if it is already complete.
+        """
+        self._checkComplete()
+        self._complete = True
+
+
+    def _repoolAfter(self, d):
+        def repool(result):
+            self._pool.reclaim(self)
+            return result
+        return d.addCallback(repool)
+
+
+
+class ConnectionPool(Service, object):
+    """
+    This is a central service that has a threadpool and executes SQL statements
+    asynchronously, in a pool.
+    """
+
+    reactor = _reactor
+
+    def __init__(self, connectionFactory):
+        super(ConnectionPool, self).__init__()
+        self.free = []
+        self.busy = []
+        self.connectionFactory = connectionFactory
+
+
+    def startService(self):
+        """
+        No startup necessary.
+        """
+
+
+    @inlineCallbacks
+    def stopService(self):
+        """
+        Forcibly abort any outstanding transactions.
+        """
+        for busy in self.busy[:]:
+            try:
+                yield busy.abort()
+            except:
+                log.err()
+        # all transactions should now be in the free list, since 'abort()' will
+        # have put them there.
+        for free in self.free:
+            yield free.stop()
+
+
+    def connection(self, label="<unlabeled>"):
+        """
+        Find a transaction; either retrieve a free one from the list or
+        allocate a new one if no free ones are available.
+
+        @return: an L{IAsyncTransaction}
+        """
+        if self.free:
+            basetxn = self.free.pop(0)
+        else:
+            basetxn = BaseSqlTxn(
+                connectionFactory=self.connectionFactory,
+                reactor=self.reactor
+            )
+        txn = PooledSqlTxn(self, basetxn)
+        self.busy.append(txn)
+        return txn
+
+
+    def reclaim(self, txn):
+        """
+        Shuck the L{PooledSqlTxn} wrapper off, and put the BaseSqlTxn into the
+        free list.
+        """
+        baseTxn = txn._baseTxn
+        baseTxn.reset()
+        self.free.append(baseTxn)
+        self.busy.remove(txn)
+
+
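ConnectionPool's bookkeeping is two lists: connection() pops a free BaseSqlTxn (or allocates one) and wraps it in a fresh PooledSqlTxn; reclaim() resets the base transaction and moves it back to the free list, so the thread and database connection are reused. A stripped-down sketch of that lifecycle, with no threads, wrappers, or AMP (MiniPool and FakeBaseTxn are invented names):

```python
class FakeBaseTxn:
    """Stand-in for BaseSqlTxn: tracks only the completed flag."""
    def __init__(self, n):
        self.n = n
        self.completed = False
    def commit(self):
        self.completed = True
    def reset(self):
        # Mirrors BaseSqlTxn.reset(): only a finished txn may be repooled.
        assert self.completed
        self.completed = False

class MiniPool:
    """Sketch of ConnectionPool's free/busy list bookkeeping."""
    def __init__(self):
        self.free, self.busy, self._count = [], [], 0
    def connection(self):
        if self.free:
            base = self.free.pop(0)       # reuse an idle transaction
        else:
            self._count += 1
            base = FakeBaseTxn(self._count)  # or allocate a new one
        self.busy.append(base)
        return base
    def reclaim(self, base):
        base.reset()
        self.busy.remove(base)
        self.free.append(base)

pool = MiniPool()
t1 = pool.connection()
t1.commit()
pool.reclaim(t1)
t2 = pool.connection()
print(t2 is t1)     # True: the underlying transaction object is reused
print(pool._count)  # 1: only one "connection" was ever allocated
```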
+
+def txnarg():
+    return [('transactionID', Integer())]
+
+
+CHUNK_MAX = 0xffff
+
+class BigArgument(Argument):
+    """
+    An argument whose payload can be larger than L{CHUNK_MAX}, by splitting
+    across multiple AMP keys.
+    """
+    def fromBox(self, name, strings, objects, proto):
+        value = StringIO()
+        for counter in count():
+            chunk = strings.get("%s.%d" % (name, counter))
+            if chunk is None:
+                break
+            value.write(chunk)
+        objects[name] = self.fromString(value.getvalue())
+
+
+    def toBox(self, name, strings, objects, proto):
+        value = StringIO(self.toString(objects[name]))
+        for counter in count():
+            nextChunk = value.read(CHUNK_MAX)
+            if not nextChunk:
+                break
+            strings["%s.%d" % (name, counter)] = nextChunk
+
+
+
+class Pickle(BigArgument):
+    """
+    A pickle sent over AMP.  This is to serialize the 'args' argument to
+    C{execSQL}, which is the dynamically-typed 'args' list argument to a DB-API
+    C{execute} function, as well as its dynamically-typed result ('rows').
+
+    This should be cleaned up into a nicer structure, but this is not a network
+    protocol, so we can be a little relaxed about security.
+
+    This is a L{BigArgument} rather than a regular L{Argument} because
+    individual arguments and query results need to contain entire vCard or
+    iCalendar documents, which can easily be greater than 64k.
+    """
+
+    def toString(self, inObject):
+        return dumps(inObject)
+
+    def fromString(self, inString):
+        return loads(inString)
+
+
+
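AMP limits each value in a box to 0xffff bytes, so BigArgument fans a large payload out across numbered keys (name.0, name.1, ...) and fromBox reads keys back in order until one is missing — which is what lets Pickle carry whole vCards and iCalendar documents. The round trip can be sketched without AMP itself (to_box/from_box below are simplified stand-ins for the methods above, operating on a plain dict):

```python
import io
from itertools import count

CHUNK_MAX = 0xffff  # AMP's per-value size limit

def to_box(name, payload, strings):
    # Split payload across keys name.0, name.1, ... (toBox sketch).
    value = io.BytesIO(payload)
    for counter in count():
        chunk = value.read(CHUNK_MAX)
        if not chunk:
            break
        strings["%s.%d" % (name, counter)] = chunk

def from_box(name, strings):
    # Reassemble chunks in order until a key is missing (fromBox sketch).
    out = io.BytesIO()
    for counter in count():
        chunk = strings.get("%s.%d" % (name, counter))
        if chunk is None:
            break
        out.write(chunk)
    return out.getvalue()

box = {}
payload = b"x" * (CHUNK_MAX + 100)     # larger than one AMP value
to_box("args", payload, box)
print(sorted(box))                     # ['args.0', 'args.1']
print(from_box("args", box) == payload)  # True: lossless round trip
```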
+
+class StartTxn(Command):
+    """
+    Start a transaction, identified with an ID generated by the client.
+    """
+    arguments = txnarg()
+
+
+
+class ExecSQL(Command):
+    """
+    Execute an SQL statement.
+    """
+    arguments = [('sql', String()),
+                 ('queryID', String()),
+                 ('args', Pickle())] + txnarg()
+
+
+
+class Row(Command):
+    """
+    A row has been returned.  Sent from server to client in response to
+    L{ExecSQL}.
+    """
+
+    arguments = [('queryID', String()),
+                 ('row', Pickle())]
+
+
+
+class QueryComplete(Command):
+    """
+    A query issued with ExecSQL is complete.
+    """
+
+    arguments = [('queryID', String()),
+                 ('norows', Boolean())]
+
+
+
+class Commit(Command):
+    arguments = txnarg()
+
+
+
+class Abort(Command):
+    arguments = txnarg()
+
+
+
+class _NoRows(Exception):
+    """
+    Placeholder exception to report zero rows.
+    """
+
+
+class ConnectionPoolConnection(AMP):
+    """
+    A L{ConnectionPoolConnection} is a single connection to a
+    L{ConnectionPool}.
+    """
+
+    def __init__(self, pool):
+        """
+        Initialize a mapping of transaction IDs to transaction objects.
+        """
+        super(ConnectionPoolConnection, self).__init__()
+        self.pool = pool
+        self._txns = {}
+
+
+    @StartTxn.responder
+    def start(self, transactionID):
+        self._txns[transactionID] = self.pool.connection()
+        return {}
+
+
+    @ExecSQL.responder
+    @inlineCallbacks
+    def receivedSQL(self, transactionID, queryID, sql, args):
+        try:
+            rows = yield self._txns[transactionID].execSQL(sql, args, _NoRows)
+        except _NoRows:
+            norows = True
+        else:
+            norows = False
+            if rows is not None:
+                for row in rows:
+                    # Either this should be yielded or it should be
+                    # requiresAnswer=False
+                    self.callRemote(Row, queryID=queryID, row=row)
+        self.callRemote(QueryComplete, queryID=queryID, norows=norows)
+        returnValue({})
+
+
+    def _complete(self, transactionID, thunk):
+        txn = self._txns.pop(transactionID)
+        return thunk(txn).addCallback(lambda ignored: {})
+
+
+    @Commit.responder
+    def commit(self, transactionID):
+        """
+        Successfully complete the given transaction.
+        """
+        return self._complete(transactionID, lambda x: x.commit())
+
+
+    @Abort.responder
+    def abort(self, transactionID):
+        """
+        Roll back the given transaction.
+        """
+        return self._complete(transactionID, lambda x: x.abort())
+
+
+
+class ConnectionPoolClient(AMP):
+    """
+    A client which can execute SQL.
+    """
+    def __init__(self):
+        super(ConnectionPoolClient, self).__init__()
+        self._nextID = count().next
+        self._txns = {}
+        self._queries = {}
+
+
+    def newTransaction(self):
+        txnid = str(self._nextID())
+        self.callRemote(StartTxn, transactionID=txnid)
+        txn = Transaction(client=self, transactionID=txnid)
+        self._txns[txnid] = txn
+        return txn
+
+
+    @Row.responder
+    def row(self, queryID, row):
+        self._queries[queryID].row(row)
+        return {}
+
+
+    @QueryComplete.responder
+    def complete(self, queryID, norows):
+        self._queries.pop(queryID).done(norows)
+        return {}
+
+
+
+class _Query(object):
+    def __init__(self, raiseOnZeroRowCount):
+        self.results = []
+        self.deferred = Deferred()
+        self.raiseOnZeroRowCount = raiseOnZeroRowCount
+
+
+    def row(self, row):
+        """
+        A row was received.
+        """
+        self.results.append(row)
+
+
+    def done(self, norows):
+        """
+        The query is complete.
+
+        @param norows: A boolean; C{True} if the query returned no rows.
+        """
+        if norows and (self.raiseOnZeroRowCount is not None):
+            exc = self.raiseOnZeroRowCount()
+            self.deferred.errback(Failure(exc))
+        else:
+            self.deferred.callback(self.results)
+
+
+
+
+class Transaction(object):
+    """
+    Async protocol-based transaction implementation.
+    """
+
+    implements(IAsyncTransaction)
+
+    def __init__(self, client, transactionID):
+        """
+        Initialize a transaction with a L{ConnectionPoolClient} and a unique
+        transaction identifier.
+        """
+        self._client = client
+        self._transactionID = transactionID
+        self._completed = False
+
+
+    def execSQL(self, sql, args=None, raiseOnZeroRowCount=None):
+        if args is None:
+            args = []
+        queryID = str(self._client._nextID())
+        query = self._client._queries[queryID] = _Query(raiseOnZeroRowCount)
+        self._client.callRemote(ExecSQL, queryID=queryID, sql=sql, args=args,
+                                transactionID=self._transactionID)
+        return query.deferred
+
+
+    def _complete(self, command):
+        if self._completed:
+            raise AlreadyFinishedError()
+        self._completed = True
+        return self._client.callRemote(
+            command, transactionID=self._transactionID
+            ).addCallback(lambda x: None)
+
+
+    def commit(self):
+        return self._complete(Commit)
+
+
+    def abort(self):
+        return self._complete(Abort)
+
+
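On the client side, each ExecSQL call allocates a queryID; Row responses append to a _Query's result list, and QueryComplete either errbacks (norows with raiseOnZeroRowCount set) or callbacks with the accumulated rows. A sketch of that accumulation logic, minus Deferreds and AMP (Query below stands in for the module's _Query; the outcome tuple is an invented convention):

```python
class Query:
    """Sketch of _Query: rows stream in one at a time, and done(norows)
    either reports an error or delivers the accumulated list."""
    def __init__(self, raise_on_zero=None):
        self.results = []
        self.raise_on_zero = raise_on_zero
        self.outcome = None     # stands in for the Deferred's result
    def row(self, row):
        self.results.append(row)
    def done(self, norows):
        if norows and self.raise_on_zero is not None:
            self.outcome = ("error", self.raise_on_zero())
        else:
            self.outcome = ("ok", self.results)

q = Query()
q.row((1, "a"))
q.row((2, "b"))
q.done(norows=False)
print(q.outcome)    # ('ok', [(1, 'a'), (2, 'b')])

class NoRows(Exception):
    pass

q2 = Query(raise_on_zero=NoRows)
q2.done(norows=True)
print(isinstance(q2.outcome[1], NoRows))  # True: zero rows became an error
```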

Modified: CalendarServer/trunk/txdav/base/datastore/subpostgres.py
===================================================================
--- CalendarServer/trunk/txdav/base/datastore/subpostgres.py	2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/txdav/base/datastore/subpostgres.py	2010-11-01 22:38:48 UTC (rev 6551)
@@ -18,9 +18,9 @@
 """
 Run and manage PostgreSQL as a subprocess.
 """
+
 import os
 import pwd
-#import thread
 
 from hashlib import md5
 
@@ -36,10 +36,11 @@
 from twisted.protocols.basic import LineReceiver
 from twisted.internet import reactor
 from twisted.internet.defer import Deferred
+from txdav.base.datastore.asyncsqlpool import BaseSqlTxn
 
 from twisted.application.service import MultiService
 
-log = Logger()                                                                                                                     
+log = Logger()
 
 # This appears in the postgres log to indicate that it is accepting
 # connections.
@@ -167,6 +168,7 @@
         self.completionDeferred.callback(None)
 
 
+
 class ErrorOutput(Exception):
     """
     The process produced some error output and exited with a non-zero exit
@@ -174,6 +176,7 @@
     """
 
 
+
 class CapturingProcessProtocol(ProcessProtocol):
     """
     A L{ProcessProtocol} that captures its output and error.
@@ -212,6 +215,7 @@
         """
         self.output.append(data)
 
+
     def errReceived(self, data):
         """
         Some output was received on stderr.
@@ -226,6 +230,24 @@
         self.deferred.callback(''.join(self.output))
 
 
+
+class UnpooledSqlTxn(BaseSqlTxn):
+    """
+    Unpooled variant (releases thread immediately on commit or abort),
+    currently exclusively for testing.
+    """
+    def commit(self):
+        result = super(UnpooledSqlTxn, self).commit()
+        self.stop()
+        return result
+
+    def abort(self):
+        result = super(UnpooledSqlTxn, self).abort()
+        self.stop()
+        return result
+
+
+
 class PostgresService(MultiService):
 
     def __init__(self, dataStoreDirectory, subServiceFactory,
@@ -290,6 +312,7 @@
         self.monitor = None
         self.openConnections = []
 
+
     def activateDelayedShutdown(self):
         """
         Call this when starting database initialization code to protect against
@@ -301,6 +324,7 @@
         """
         self.delayedShutdown = True
 
+
     def deactivateDelayedShutdown(self):
         """
         Call this when database initialization code has completed so that the
@@ -310,6 +334,7 @@
         if self.shutdownDeferred:
             self.shutdownDeferred.callback(None)
 
+
     def produceConnection(self, label="<unlabeled>", databaseName=None):
         """
         Produce a DB-API 2.0 connection pointed at this database.
@@ -326,6 +351,7 @@
 
         w = DiagnosticConnectionWrapper(connection, label)
         c = w.cursor()
+
         # Turn on standard conforming strings.  This option is _required_ if
         # you want to get correct behavior out of parameter-passing with the
         # pgdb module.  If it is not set then the server is potentially
@@ -340,11 +366,24 @@
         # preferable to see some exceptions while we're in this state than to
         # have the entire worker process hang.
         c.execute("set statement_timeout=30000")
+
+        # pgdb (as per DB-API 2.0) automatically puts the connection into a
+        # 'executing a transaction' state when _any_ statement is executed on
+        # it (even these not-touching-any-data statements); make sure to commit
+        # first so that the application sees a fresh transaction, and the
+        # connection can safely be pooled without executing anything on it.
         w.commit()
         c.close()
         return w
 
 
+    def produceLocalTransaction(self, label="<unlabeled>"):
+        """
+        Create a L{IAsyncTransaction} based on a thread in the current process.
+        """
+        return UnpooledSqlTxn(lambda : self.produceConnection(label))
+
+
     def ready(self):
         """
         Subprocess is ready.  Time to initialize the subservice.
@@ -389,6 +428,7 @@
             # Only continue startup if we've not begun shutdown
             self.subServiceFactory(self.produceConnection).setServiceParent(self)
 
+
     def pauseMonitor(self):
         """
         Pause monitoring.  This is a testing hook for when (if) we are
@@ -402,7 +442,7 @@
     def unpauseMonitor(self):
         """
         Unpause monitoring.
-        
+
         @see: L{pauseMonitor} 
         """
 #        for pipe in self.monitor.transport.pipes.values():
@@ -417,9 +457,11 @@
         monitor = _PostgresMonitor(self)
         pg_ctl = which("pg_ctl")[0]
         # check consistency of initdb and postgres?
-        
+
         options = []
-        options.append("-c listen_addresses='%s'" % (",".join(self.listenAddresses)))
+        options.append(
+            "-c listen_addresses='%s'" % (",".join(self.listenAddresses))
+        )
         if self.socketDir:
             options.append("-k '%s'" % (self.socketDir.path,))
         options.append("-c shared_buffers=%d" % (self.sharedBuffers,))
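The new comment in produceConnection() is worth underlining: under DB-API 2.0, a connection enters an implicit transaction as soon as any statement executes, so the setup statements must be committed before the connection is handed out or pooled. sqlite3 exposes the same behavior and makes it easy to observe (an analogy only — the server uses pgdb, and the statements below are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("create table settings (k text)")
conn.execute("insert into settings values ('statement_timeout')")

# The setup statements opened an implicit transaction on the connection.
before = conn.in_transaction
conn.commit()       # what produceConnection() does before returning
after = conn.in_transaction
print(before, after)  # True False: the pooled connection starts fresh
```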

Copied: CalendarServer/trunk/txdav/base/datastore/threadutils.py (from rev 6550, CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/threadutils.py)
===================================================================
--- CalendarServer/trunk/txdav/base/datastore/threadutils.py	                        (rev 0)
+++ CalendarServer/trunk/txdav/base/datastore/threadutils.py	2010-11-01 22:38:48 UTC (rev 6551)
@@ -0,0 +1,111 @@
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+import sys
+from Queue import Queue
+
+
+from twisted.python.failure import Failure
+from twisted.internet.defer import Deferred
+
+
+_DONE = object()
+
+_STATE_STOPPED = 'STOPPED'
+_STATE_RUNNING = 'RUNNING'
+_STATE_STOPPING = 'STOPPING'
+
+class ThreadHolder(object):
+    """
+    A queue which will hold a reactor threadpool thread open until all of the
+    work in that queue is done.
+    """
+
+    def __init__(self, reactor):
+        self._reactor = reactor
+        self._state = _STATE_STOPPED
+        self._stopper = None
+        self._q = None
+
+
+    def _run(self):
+        """
+        Worker function which runs in a non-reactor thread.
+        """
+        while True:
+            work = self._q.get()
+            if work is _DONE:
+                def finishStopping():
+                    self._state = _STATE_STOPPED
+                    self._q = None
+                    s = self._stopper
+                    self._stopper = None
+                    s.callback(None)
+                self._reactor.callFromThread(finishStopping)
+                return
+            self._oneWorkUnit(*work)
+
+
+    def _oneWorkUnit(self, deferred, instruction):
+        try: 
+            result = instruction()
+        except:
+            etype, evalue, etb = sys.exc_info()
+            def relayFailure():
+                f = Failure(evalue, etype, etb)
+                deferred.errback(f)
+            self._reactor.callFromThread(relayFailure)
+        else:
+            self._reactor.callFromThread(deferred.callback, result)
+
+
+    def submit(self, work):
+        """
+        Submit some work to be run.
+
+        @param work: a 0-argument callable, which will be run in a thread.
+
+        @return: L{Deferred} that fires with the result of L{work}
+        """
+        d = Deferred()
+        self._q.put((d, work))
+        return d
+
+
+    def start(self):
+        """
+        Start this thing, if it's stopped.
+        """
+        if self._state != _STATE_STOPPED:
+            raise RuntimeError("Not stopped.")
+        self._state = _STATE_RUNNING
+        self._q = Queue(0)
+        self._reactor.callInThread(self._run)
+
+
+    def stop(self):
+        """
+        Stop this thing and release its thread, if it's running.
+        """
+        if self._state != _STATE_RUNNING:
+            raise RuntimeError("Not running.")
+        s = self._stopper = Deferred()
+        self._state = _STATE_STOPPING
+        self._q.put(_DONE)
+        return s
+
+
+
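ThreadHolder's shutdown is a queue handshake: stop() enqueues the _DONE sentinel, so every previously submitted work unit drains in FIFO order before the worker thread exits. A thread-and-queue sketch of that ordering guarantee (MiniHolder is an invented name; the real class relays results back to the reactor thread via callFromThread and a Deferred per work unit):

```python
import queue
import threading

DONE = object()  # sentinel, like threadutils' _DONE

class MiniHolder:
    """Sketch of ThreadHolder: one worker thread draining a FIFO queue."""
    def __init__(self):
        self._q = queue.Queue()
        self._thread = threading.Thread(target=self._run)
        self._thread.start()
    def _run(self):
        while True:
            work = self._q.get()
            if work is DONE:
                return          # queue fully drained; thread exits
            work()
    def submit(self, work):
        self._q.put(work)
    def stop(self):
        self._q.put(DONE)
        self._thread.join()     # the real code fires a Deferred instead

log = []
holder = MiniHolder()
for i in range(3):
    holder.submit(lambda i=i: log.append(i))
holder.stop()   # returns only after all queued work has run
print(log)      # [0, 1, 2]
```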

Modified: CalendarServer/trunk/txdav/base/propertystore/test/test_sql.py
===================================================================
--- CalendarServer/trunk/txdav/base/propertystore/test/test_sql.py	2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/txdav/base/propertystore/test/test_sql.py	2010-11-01 22:38:48 UTC (rev 6551)
@@ -41,6 +41,7 @@
     def setUp(self):
         self.notifierFactory = StubNotifierFactory()
         self.store = yield buildStore(self, self.notifierFactory)
+        self.addCleanup(self.maybeCommitLast)
         self._txn = self.store.newTransaction()
         self.propertyStore = self.propertyStore1 = yield PropertyStore.load(
             "user01", self._txn, 1
@@ -50,7 +51,7 @@
 
 
     @inlineCallbacks
-    def tearDown(self):
+    def maybeCommitLast(self):
         if hasattr(self, "_txn"):
             result = yield self._txn.commit()
             delattr(self, "_txn")
@@ -66,7 +67,7 @@
             yield self._txn.commit()
             delattr(self, "_txn")
         self._txn = self.store.newTransaction()
-        
+
         store = self.propertyStore1
         self.propertyStore = self.propertyStore1 = yield PropertyStore.load(
             "user01", self._txn, 1


Property changes on: CalendarServer/trunk/txdav/caldav/datastore/index_file.py
___________________________________________________________________
Modified: svn:mergeinfo
   - /CalendarServer/branches/config-separation/txdav/caldav/datastore/index_file.py:4379-4443
/CalendarServer/branches/egg-info-351/txdav/caldav/datastore/index_file.py:4589-4625
/CalendarServer/branches/generic-sqlstore/txdav/caldav/datastore/index_file.py:6167-6191
/CalendarServer/branches/new-store/txdav/caldav/datastore/index_file.py:5594-5934
/CalendarServer/branches/new-store-no-caldavfile/txdav/caldav/datastore/index_file.py:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2/txdav/caldav/datastore/index_file.py:5936-5981
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692/txdav/caldav/datastore/index_file.py:5693-5702
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627/txdav/caldav/datastore/index_file.py:3628-3644
/CalendarServer/branches/users/cdaboo/more-sharing-5591/txdav/caldav/datastore/index_file.py:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464/txdav/caldav/datastore/index_file.py:4465-4957
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070/txdav/caldav/datastore/index_file.py:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187/txdav/caldav/datastore/index_file.py:5188-5440
/CalendarServer/branches/users/glyph/contacts-server-merge/txdav/caldav/datastore/index_file.py:4971-5080
/CalendarServer/branches/users/glyph/more-deferreds-6/txdav/caldav/datastore/index_file.py:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7/txdav/caldav/datastore/index_file.py:6369-6445
/CalendarServer/branches/users/glyph/sendfdport/txdav/caldav/datastore/index_file.py:5388-5424
/CalendarServer/branches/users/glyph/sql-store/txdav/caldav/datastore/index_file.py:5929-6073
/CalendarServer/branches/users/glyph/use-system-twisted/txdav/caldav/datastore/index_file.py:5084-5149
/CalendarServer/branches/users/sagen/locations-resources/txdav/caldav/datastore/index_file.py:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2/txdav/caldav/datastore/index_file.py:5052-5061
/CalendarServer/branches/users/sagen/resource-delegates-4038/txdav/caldav/datastore/index_file.py:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066/txdav/caldav/datastore/index_file.py:4068-4075
/CalendarServer/branches/users/sagen/resources-2/txdav/caldav/datastore/index_file.py:5084-5093
/CalendarServer/branches/users/wsanchez/transations/txdav/caldav/datastore/index_file.py:5515-5593
/CalendarServer/trunk/twistedcaldav/index.py:6322-6394
   + /CalendarServer/branches/config-separation/txdav/caldav/datastore/index_file.py:4379-4443
/CalendarServer/branches/egg-info-351/txdav/caldav/datastore/index_file.py:4589-4625
/CalendarServer/branches/generic-sqlstore/txdav/caldav/datastore/index_file.py:6167-6191
/CalendarServer/branches/new-store/txdav/caldav/datastore/index_file.py:5594-5934
/CalendarServer/branches/new-store-no-caldavfile/txdav/caldav/datastore/index_file.py:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2/txdav/caldav/datastore/index_file.py:5936-5981
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692/txdav/caldav/datastore/index_file.py:5693-5702
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627/txdav/caldav/datastore/index_file.py:3628-3644
/CalendarServer/branches/users/cdaboo/more-sharing-5591/txdav/caldav/datastore/index_file.py:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464/txdav/caldav/datastore/index_file.py:4465-4957
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070/txdav/caldav/datastore/index_file.py:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187/txdav/caldav/datastore/index_file.py:5188-5440
/CalendarServer/branches/users/glyph/contacts-server-merge/txdav/caldav/datastore/index_file.py:4971-5080
/CalendarServer/branches/users/glyph/more-deferreds-6/txdav/caldav/datastore/index_file.py:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7/txdav/caldav/datastore/index_file.py:6369-6445
/CalendarServer/branches/users/glyph/sendfdport/txdav/caldav/datastore/index_file.py:5388-5424
/CalendarServer/branches/users/glyph/sharedpool/txdav/caldav/datastore/index_file.py:6490-6550
/CalendarServer/branches/users/glyph/sql-store/txdav/caldav/datastore/index_file.py:5929-6073
/CalendarServer/branches/users/glyph/use-system-twisted/txdav/caldav/datastore/index_file.py:5084-5149
/CalendarServer/branches/users/sagen/locations-resources/txdav/caldav/datastore/index_file.py:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2/txdav/caldav/datastore/index_file.py:5052-5061
/CalendarServer/branches/users/sagen/resource-delegates-4038/txdav/caldav/datastore/index_file.py:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066/txdav/caldav/datastore/index_file.py:4068-4075
/CalendarServer/branches/users/sagen/resources-2/txdav/caldav/datastore/index_file.py:5084-5093
/CalendarServer/branches/users/wsanchez/transations/txdav/caldav/datastore/index_file.py:5515-5593
/CalendarServer/trunk/twistedcaldav/index.py:6322-6394

Modified: CalendarServer/trunk/txdav/caldav/datastore/sql.py
===================================================================
--- CalendarServer/trunk/txdav/caldav/datastore/sql.py	2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/txdav/caldav/datastore/sql.py	2010-11-01 22:38:48 UTC (rev 6551)
@@ -589,32 +589,38 @@
 
     @inlineCallbacks
     def loseConnection(self):
-        
+
         old_size = self.attachment.size()
 
         self.attachment._path.setContent(self.buf)
         self.attachment._contentType = self.contentType
         self.attachment._md5 = self.hash.hexdigest()
         self.attachment._size = len(self.buf)
-        self.attachment._created, self.attachment._modified = (yield self._txn.execSQL(
-            """
-            update ATTACHMENT set CONTENT_TYPE = %s, SIZE = %s, MD5 = %s,
-             MODIFIED = timezone('UTC', CURRENT_TIMESTAMP)
-            where PATH = %s
-            returning CREATED, MODIFIED
-            """,
-            [
-                generateContentType(self.contentType),
-                self.attachment._size,
-                self.attachment._md5,
-                self.attachment.name()
-            ]
-        ))[0]
+        self.attachment._created, self.attachment._modified = map(
+            sqltime,
+            (yield self._txn.execSQL(
+                """
+                update ATTACHMENT set CONTENT_TYPE = %s, SIZE = %s, MD5 = %s,
+                 MODIFIED = timezone('UTC', CURRENT_TIMESTAMP)
+                where PATH = %s
+                returning CREATED, MODIFIED
+                """,
+                [
+                    generateContentType(self.contentType),
+                    self.attachment._size,
+                    self.attachment._md5,
+                    self.attachment.name()
+                ]
+            ))[0]
+        )
 
         # Adjust quota
         yield self.attachment._calendarObject._calendar._home.adjustQuotaUsedBytes(self.attachment.size() - old_size)
 
 
+def sqltime(value):
+    return datetimeMktime(datetime.datetime.strptime(value, "%Y-%m-%d %H:%M:%S.%f"))
+
 class Attachment(object):
 
     implements(IAttachment)
@@ -648,8 +654,8 @@
         self._contentType = MimeType.fromString(rows[0][0])
         self._size = rows[0][1]
         self._md5 = rows[0][2]
-        self._created = datetimeMktime(datetime.datetime.strptime(rows[0][3], "%Y-%m-%d %H:%M:%S.%f"))
-        self._modified = datetimeMktime(datetime.datetime.strptime(rows[0][4], "%Y-%m-%d %H:%M:%S.%f"))
+        self._created = sqltime(rows[0][3])
+        self._modified = sqltime(rows[0][4])
         returnValue(self)
 
 
@@ -692,3 +698,5 @@
 
     def modified(self):
         return self._modified
+
+
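The new `sqltime` helper above centralizes converting PostgreSQL UTC timestamp strings (as returned by `timezone('UTC', CURRENT_TIMESTAMP)`) into epoch seconds, replacing two inline `strptime` calls. A standalone sketch of the same conversion using only the standard library, with `calendar.timegm` standing in for the project's `datetimeMktime` helper (an assumption; `datetimeMktime` lives in twistedcaldav.dateops and may differ in local-time handling):

```python
import calendar
import datetime

def sqltime(value):
    """Convert a PostgreSQL timestamp string such as
    '2010-11-01 22:38:48.123456' into UTC epoch seconds."""
    dt = datetime.datetime.strptime(value, "%Y-%m-%d %H:%M:%S.%f")
    # timegm treats the struct_time as UTC, matching the
    # timezone('UTC', ...) used in the SQL above.
    return calendar.timegm(dt.timetuple())

epoch_plus_one = sqltime("1970-01-01 00:00:01.000000")  # 1
```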

Modified: CalendarServer/trunk/txdav/caldav/datastore/test/common.py
===================================================================
--- CalendarServer/trunk/txdav/caldav/datastore/test/common.py	2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/txdav/caldav/datastore/test/common.py	2010-11-01 22:38:48 UTC (rev 6551)
@@ -993,7 +993,7 @@
         # Sanity check; make sure the test has the right idea of the subject.
         self.assertNotEquals(event1_text, event1_text_withDifferentSubject)
         newComponent = VComponent.fromString(event1_text_withDifferentSubject)
-        obj.setComponent(newComponent)
+        yield obj.setComponent(newComponent)
 
         # Putting everything into a separate transaction to account for any
         # caching that may take place.
@@ -1067,7 +1067,7 @@
         """
         objName = "with-dropbox.ics"
         cal = yield self.calendarUnderTest()
-        cal.createCalendarObjectWithName(
+        yield cal.createCalendarObjectWithName(
             objName, VComponent.fromString(
                 self.eventWithDropbox
             )
@@ -1091,7 +1091,7 @@
         )
         t.write("new attachment")
         t.write(" text")
-        t.loseConnection()
+        yield t.loseConnection()
         obj = yield refresh(obj)
         class CaptureProtocol(Protocol):
             buf = ''
@@ -1181,7 +1181,7 @@
             "new.attachment", MimeType("text", "plain")
         )
         t.write("new attachment text")
-        t.loseConnection()
+        yield t.loseConnection()
         yield self.commit()
         home = (yield self.homeUnderTest())
         calendars = (yield home.calendars())
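The test fixes above all add `yield` in front of calls that now return Deferreds (`setComponent`, `createCalendarObjectWithName`, `loseConnection`); inside an `@inlineCallbacks` generator, a Deferred that is not yielded is never waited on, so the operation may not have completed (and its errors are silently dropped) when the next assertion runs. A rough modern analogue of that failure mode, using asyncio rather than Twisted (purely illustrative, not code from this changeset):

```python
import asyncio

results = []

async def set_component(value):
    # simulates an asynchronous store write
    await asyncio.sleep(0)
    results.append(value)

async def broken_test():
    set_component("new")        # coroutine created but never awaited: no write

async def fixed_test():
    await set_component("new")  # analogue of "yield obj.setComponent(...)"

asyncio.run(broken_test())
missed = list(results)          # still empty: the write never happened
asyncio.run(fixed_test())       # now results == ["new"]
```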


Property changes on: CalendarServer/trunk/txdav/caldav/datastore/test/test_index_file.py
___________________________________________________________________
Modified: svn:mergeinfo
   - /CalendarServer/branches/config-separation/txdav/caldav/datastore/test/test_index_file.py:4379-4443
/CalendarServer/branches/egg-info-351/txdav/caldav/datastore/test/test_index_file.py:4589-4625
/CalendarServer/branches/generic-sqlstore/txdav/caldav/datastore/test/test_index_file.py:6167-6191
/CalendarServer/branches/new-store/txdav/caldav/datastore/test/test_index_file.py:5594-5934
/CalendarServer/branches/new-store-no-caldavfile/txdav/caldav/datastore/test/test_index_file.py:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2/txdav/caldav/datastore/test/test_index_file.py:5936-5981
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692/txdav/caldav/datastore/test/test_index_file.py:5693-5702
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627/txdav/caldav/datastore/test/test_index_file.py:3628-3644
/CalendarServer/branches/users/cdaboo/more-sharing-5591/txdav/caldav/datastore/test/test_index_file.py:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464/txdav/caldav/datastore/test/test_index_file.py:4465-4957
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070/txdav/caldav/datastore/test/test_index_file.py:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187/txdav/caldav/datastore/test/test_index_file.py:5188-5440
/CalendarServer/branches/users/glyph/contacts-server-merge/txdav/caldav/datastore/test/test_index_file.py:4971-5080
/CalendarServer/branches/users/glyph/more-deferreds-6/txdav/caldav/datastore/test/test_index_file.py:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7/txdav/caldav/datastore/test/test_index_file.py:6369-6445
/CalendarServer/branches/users/glyph/sendfdport/txdav/caldav/datastore/test/test_index_file.py:5388-5424
/CalendarServer/branches/users/glyph/sql-store/txdav/caldav/datastore/test/test_index_file.py:5929-6073
/CalendarServer/branches/users/glyph/use-system-twisted/txdav/caldav/datastore/test/test_index_file.py:5084-5149
/CalendarServer/branches/users/sagen/locations-resources/txdav/caldav/datastore/test/test_index_file.py:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2/txdav/caldav/datastore/test/test_index_file.py:5052-5061
/CalendarServer/branches/users/sagen/resource-delegates-4038/txdav/caldav/datastore/test/test_index_file.py:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066/txdav/caldav/datastore/test/test_index_file.py:4068-4075
/CalendarServer/branches/users/sagen/resources-2/txdav/caldav/datastore/test/test_index_file.py:5084-5093
/CalendarServer/branches/users/wsanchez/transations/txdav/caldav/datastore/test/test_index_file.py:5515-5593
/CalendarServer/trunk/twistedcaldav/test/test_index.py:6322-6394
   + /CalendarServer/branches/config-separation/txdav/caldav/datastore/test/test_index_file.py:4379-4443
/CalendarServer/branches/egg-info-351/txdav/caldav/datastore/test/test_index_file.py:4589-4625
/CalendarServer/branches/generic-sqlstore/txdav/caldav/datastore/test/test_index_file.py:6167-6191
/CalendarServer/branches/new-store/txdav/caldav/datastore/test/test_index_file.py:5594-5934
/CalendarServer/branches/new-store-no-caldavfile/txdav/caldav/datastore/test/test_index_file.py:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2/txdav/caldav/datastore/test/test_index_file.py:5936-5981
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692/txdav/caldav/datastore/test/test_index_file.py:5693-5702
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627/txdav/caldav/datastore/test/test_index_file.py:3628-3644
/CalendarServer/branches/users/cdaboo/more-sharing-5591/txdav/caldav/datastore/test/test_index_file.py:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464/txdav/caldav/datastore/test/test_index_file.py:4465-4957
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070/txdav/caldav/datastore/test/test_index_file.py:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187/txdav/caldav/datastore/test/test_index_file.py:5188-5440
/CalendarServer/branches/users/glyph/contacts-server-merge/txdav/caldav/datastore/test/test_index_file.py:4971-5080
/CalendarServer/branches/users/glyph/more-deferreds-6/txdav/caldav/datastore/test/test_index_file.py:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7/txdav/caldav/datastore/test/test_index_file.py:6369-6445
/CalendarServer/branches/users/glyph/sendfdport/txdav/caldav/datastore/test/test_index_file.py:5388-5424
/CalendarServer/branches/users/glyph/sharedpool/txdav/caldav/datastore/test/test_index_file.py:6490-6550
/CalendarServer/branches/users/glyph/sql-store/txdav/caldav/datastore/test/test_index_file.py:5929-6073
/CalendarServer/branches/users/glyph/use-system-twisted/txdav/caldav/datastore/test/test_index_file.py:5084-5149
/CalendarServer/branches/users/sagen/locations-resources/txdav/caldav/datastore/test/test_index_file.py:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2/txdav/caldav/datastore/test/test_index_file.py:5052-5061
/CalendarServer/branches/users/sagen/resource-delegates-4038/txdav/caldav/datastore/test/test_index_file.py:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066/txdav/caldav/datastore/test/test_index_file.py:4068-4075
/CalendarServer/branches/users/sagen/resources-2/txdav/caldav/datastore/test/test_index_file.py:5084-5093
/CalendarServer/branches/users/wsanchez/transations/txdav/caldav/datastore/test/test_index_file.py:5515-5593
/CalendarServer/trunk/twistedcaldav/test/test_index.py:6322-6394

Modified: CalendarServer/trunk/txdav/caldav/datastore/test/test_sql.py
===================================================================
--- CalendarServer/trunk/txdav/caldav/datastore/test/test_sql.py	2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/txdav/caldav/datastore/test/test_sql.py	2010-11-01 22:38:48 UTC (rev 6551)
@@ -44,7 +44,7 @@
 
     @inlineCallbacks
     def setUp(self):
-        super(CalendarSQLStorageTests, self).setUp()
+        yield super(CalendarSQLStorageTests, self).setUp()
         self._sqlCalendarStore = yield buildStore(self, self.notifierFactory)
         yield self.populate()
 
@@ -86,7 +86,7 @@
         """
         Assert that two objects with C{properties} methods have similar
         properties.
-        
+
         @param disregard: a list of L{PropertyName} keys to discard from both
             input and output.
         """
@@ -201,7 +201,7 @@
         operations, that we do not block other reads of the table.
         """
 
-        calendarStore = yield buildStore(self, self.notifierFactory)
+        calendarStore = self._sqlCalendarStore
 
         txn1 = calendarStore.newTransaction()
         txn2 = calendarStore.newTransaction()
@@ -211,7 +211,7 @@
         # reads of existing data in the table
         home_uid2 = yield txn3.homeWithUID(ECALENDARTYPE, "uid2", create=True)
         self.assertNotEqual(home_uid2, None)
-        txn3.commit()
+        yield txn3.commit()
 
         home_uid1_1 = yield txn1.homeWithUID(
             ECALENDARTYPE, "uid1", create=True
@@ -237,7 +237,7 @@
         txn4 = calendarStore.newTransaction()
         home_uid2 = yield txn4.homeWithUID(ECALENDARTYPE, "uid2", create=True)
         self.assertNotEqual(home_uid2, None)
-        txn4.commit()
+        yield txn4.commit()
 
         # Now do the concurrent provision attempt
         yield d2
@@ -250,21 +250,20 @@
     @inlineCallbacks
     def test_putConcurrency(self):
         """
-        Test that two concurrent attempts to PUT different address book object resources to the
-        same address book home does not cause a deadlock.
+        Test that two concurrent attempts to PUT different calendar object
+        resources to the same calendar home do not cause a deadlock.
         """
 
-        calendarStore1 = yield buildStore(self, self.notifierFactory)
-        calendarStore2 = yield buildStore(self, self.notifierFactory)
+        calendarStore = self._sqlCalendarStore
 
         # Provision the home now
-        txn = calendarStore1.newTransaction()
+        txn = calendarStore.newTransaction()
         home = yield txn.homeWithUID(ECALENDARTYPE, "uid1", create=True)
         self.assertNotEqual(home, None)
-        txn.commit()
+        yield txn.commit()
 
-        txn1 = calendarStore1.newTransaction()
-        txn2 = calendarStore2.newTransaction()
+        txn1 = calendarStore.newTransaction()
+        txn2 = calendarStore.newTransaction()
 
         home1 = yield txn1.homeWithUID(ECALENDARTYPE, "uid1", create=True)
         home2 = yield txn2.homeWithUID(ECALENDARTYPE, "uid1", create=True)
@@ -364,3 +363,6 @@
 
         yield d1
         yield d2
+
+
+
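The test changes above drop the second `buildStore` call and reuse the single `self._sqlCalendarStore`, so both concurrent transactions exercise the new shared connection pool instead of each store holding a private connection. A minimal blocking connection-pool sketch of that idea (an illustration only; the actual implementation added in txdav/base/datastore/asyncsqlpool.py is asynchronous and Deferred-based):

```python
import queue

class ConnectionPool:
    """Hand out a fixed number of connections; callers block until
    one is free, so concurrent transactions share the pool rather
    than each opening its own connection."""

    def __init__(self, connection_factory, size=2):
        self._free = queue.Queue()
        for _ in range(size):
            self._free.put(connection_factory())

    def acquire(self):
        return self._free.get()   # blocks while the pool is exhausted

    def release(self, conn):
        self._free.put(conn)

# usage: two "transactions" share a two-connection pool
pool = ConnectionPool(lambda: object(), size=2)
held = [pool.acquire(), pool.acquire()]
exhausted = pool._free.empty()    # True: no free connections remain
for conn in held:
    pool.release(conn)
```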


Property changes on: CalendarServer/trunk/txdav/carddav/datastore/index_file.py
___________________________________________________________________
Modified: svn:mergeinfo
   - /CalendarServer/branches/config-separation/txdav/carddav/datastore/index_file.py:4379-4443
/CalendarServer/branches/egg-info-351/txdav/carddav/datastore/index_file.py:4589-4625
/CalendarServer/branches/generic-sqlstore/txdav/carddav/datastore/index_file.py:6167-6191
/CalendarServer/branches/new-store/txdav/carddav/datastore/index_file.py:5594-5934
/CalendarServer/branches/new-store-no-caldavfile/txdav/carddav/datastore/index_file.py:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2/txdav/carddav/datastore/index_file.py:5936-5981
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692/txdav/carddav/datastore/index_file.py:5693-5702
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627/txdav/carddav/datastore/index_file.py:3628-3644
/CalendarServer/branches/users/cdaboo/more-sharing-5591/txdav/carddav/datastore/index_file.py:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464/txdav/carddav/datastore/index_file.py:4465-4957
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070/txdav/carddav/datastore/index_file.py:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187/txdav/carddav/datastore/index_file.py:5188-5440
/CalendarServer/branches/users/glyph/contacts-server-merge/txdav/carddav/datastore/index_file.py:4971-5080
/CalendarServer/branches/users/glyph/more-deferreds-6/txdav/carddav/datastore/index_file.py:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7/txdav/carddav/datastore/index_file.py:6369-6445
/CalendarServer/branches/users/glyph/sendfdport/txdav/carddav/datastore/index_file.py:5388-5424
/CalendarServer/branches/users/glyph/sql-store/txdav/carddav/datastore/index_file.py:5929-6073
/CalendarServer/branches/users/glyph/use-system-twisted/txdav/carddav/datastore/index_file.py:5084-5149
/CalendarServer/branches/users/sagen/locations-resources/txdav/carddav/datastore/index_file.py:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2/txdav/carddav/datastore/index_file.py:5052-5061
/CalendarServer/branches/users/sagen/resource-delegates-4038/txdav/carddav/datastore/index_file.py:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066/txdav/carddav/datastore/index_file.py:4068-4075
/CalendarServer/branches/users/sagen/resources-2/txdav/carddav/datastore/index_file.py:5084-5093
/CalendarServer/branches/users/wsanchez/transations/txdav/carddav/datastore/index_file.py:5515-5593
/CalendarServer/trunk/twistedcaldav/vcardindex.py:6322-6394
   + /CalendarServer/branches/config-separation/txdav/carddav/datastore/index_file.py:4379-4443
/CalendarServer/branches/egg-info-351/txdav/carddav/datastore/index_file.py:4589-4625
/CalendarServer/branches/generic-sqlstore/txdav/carddav/datastore/index_file.py:6167-6191
/CalendarServer/branches/new-store/txdav/carddav/datastore/index_file.py:5594-5934
/CalendarServer/branches/new-store-no-caldavfile/txdav/carddav/datastore/index_file.py:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2/txdav/carddav/datastore/index_file.py:5936-5981
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692/txdav/carddav/datastore/index_file.py:5693-5702
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627/txdav/carddav/datastore/index_file.py:3628-3644
/CalendarServer/branches/users/cdaboo/more-sharing-5591/txdav/carddav/datastore/index_file.py:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464/txdav/carddav/datastore/index_file.py:4465-4957
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070/txdav/carddav/datastore/index_file.py:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187/txdav/carddav/datastore/index_file.py:5188-5440
/CalendarServer/branches/users/glyph/contacts-server-merge/txdav/carddav/datastore/index_file.py:4971-5080
/CalendarServer/branches/users/glyph/more-deferreds-6/txdav/carddav/datastore/index_file.py:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7/txdav/carddav/datastore/index_file.py:6369-6445
/CalendarServer/branches/users/glyph/sendfdport/txdav/carddav/datastore/index_file.py:5388-5424
/CalendarServer/branches/users/glyph/sharedpool/txdav/carddav/datastore/index_file.py:6490-6550
/CalendarServer/branches/users/glyph/sql-store/txdav/carddav/datastore/index_file.py:5929-6073
/CalendarServer/branches/users/glyph/use-system-twisted/txdav/carddav/datastore/index_file.py:5084-5149
/CalendarServer/branches/users/sagen/locations-resources/txdav/carddav/datastore/index_file.py:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2/txdav/carddav/datastore/index_file.py:5052-5061
/CalendarServer/branches/users/sagen/resource-delegates-4038/txdav/carddav/datastore/index_file.py:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066/txdav/carddav/datastore/index_file.py:4068-4075
/CalendarServer/branches/users/sagen/resources-2/txdav/carddav/datastore/index_file.py:5084-5093
/CalendarServer/branches/users/wsanchez/transations/txdav/carddav/datastore/index_file.py:5515-5593
/CalendarServer/trunk/twistedcaldav/vcardindex.py:6322-6394

Modified: CalendarServer/trunk/txdav/carddav/datastore/sql.py
===================================================================
--- CalendarServer/trunk/txdav/carddav/datastore/sql.py	2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/txdav/carddav/datastore/sql.py	2010-11-01 22:38:48 UTC (rev 6551)
@@ -102,7 +102,7 @@
         will eventually have on disk.
         @type realName: C{str}
         """
-        
+
         super(AddressBook, self).__init__(home, name, resourceID, notifier)
 
         self._index = PostgresLegacyABIndexEmulator(self)

Modified: CalendarServer/trunk/txdav/carddav/datastore/test/common.py
===================================================================
--- CalendarServer/trunk/txdav/carddav/datastore/test/common.py	2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/txdav/carddav/datastore/test/common.py	2010-11-01 22:38:48 UTC (rev 6551)
@@ -867,7 +867,7 @@
         # Sanity check; make sure the test has the right idea of the subject.
         self.assertNotEquals(vcard1_text, vcard1_text_withDifferentNote)
         newComponent = VComponent.fromString(vcard1_text_withDifferentNote)
-        obj.setComponent(newComponent)
+        yield obj.setComponent(newComponent)
 
         # Putting everything into a separate transaction to account for any
         # caching that may take place.


Property changes on: CalendarServer/trunk/txdav/carddav/datastore/test/test_index_file.py
___________________________________________________________________
Modified: svn:mergeinfo
   - /CalendarServer/branches/config-separation/txdav/carddav/datastore/test/test_index_file.py:4379-4443
/CalendarServer/branches/egg-info-351/txdav/carddav/datastore/test/test_index_file.py:4589-4625
/CalendarServer/branches/generic-sqlstore/txdav/carddav/datastore/test/test_index_file.py:6167-6191
/CalendarServer/branches/new-store/txdav/carddav/datastore/test/test_index_file.py:5594-5934
/CalendarServer/branches/new-store-no-caldavfile/txdav/carddav/datastore/test/test_index_file.py:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2/txdav/carddav/datastore/test/test_index_file.py:5936-5981
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692/txdav/carddav/datastore/test/test_index_file.py:5693-5702
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627/txdav/carddav/datastore/test/test_index_file.py:3628-3644
/CalendarServer/branches/users/cdaboo/more-sharing-5591/txdav/carddav/datastore/test/test_index_file.py:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464/txdav/carddav/datastore/test/test_index_file.py:4465-4957
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070/txdav/carddav/datastore/test/test_index_file.py:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187/txdav/carddav/datastore/test/test_index_file.py:5188-5440
/CalendarServer/branches/users/glyph/contacts-server-merge/txdav/carddav/datastore/test/test_index_file.py:4971-5080
/CalendarServer/branches/users/glyph/more-deferreds-6/txdav/carddav/datastore/test/test_index_file.py:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7/txdav/carddav/datastore/test/test_index_file.py:6369-6445
/CalendarServer/branches/users/glyph/sendfdport/txdav/carddav/datastore/test/test_index_file.py:5388-5424
/CalendarServer/branches/users/glyph/sql-store/txdav/carddav/datastore/test/test_index_file.py:5929-6073
/CalendarServer/branches/users/glyph/use-system-twisted/txdav/carddav/datastore/test/test_index_file.py:5084-5149
/CalendarServer/branches/users/sagen/locations-resources/txdav/carddav/datastore/test/test_index_file.py:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2/txdav/carddav/datastore/test/test_index_file.py:5052-5061
/CalendarServer/branches/users/sagen/resource-delegates-4038/txdav/carddav/datastore/test/test_index_file.py:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066/txdav/carddav/datastore/test/test_index_file.py:4068-4075
/CalendarServer/branches/users/sagen/resources-2/txdav/carddav/datastore/test/test_index_file.py:5084-5093
/CalendarServer/branches/users/wsanchez/transations/txdav/carddav/datastore/test/test_index_file.py:5515-5593
/CalendarServer/trunk/twistedcaldav/test/test_vcardindex.py:6322-6394
   + /CalendarServer/branches/config-separation/txdav/carddav/datastore/test/test_index_file.py:4379-4443
/CalendarServer/branches/egg-info-351/txdav/carddav/datastore/test/test_index_file.py:4589-4625
/CalendarServer/branches/generic-sqlstore/txdav/carddav/datastore/test/test_index_file.py:6167-6191
/CalendarServer/branches/new-store/txdav/carddav/datastore/test/test_index_file.py:5594-5934
/CalendarServer/branches/new-store-no-caldavfile/txdav/carddav/datastore/test/test_index_file.py:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2/txdav/carddav/datastore/test/test_index_file.py:5936-5981
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692/txdav/carddav/datastore/test/test_index_file.py:5693-5702
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627/txdav/carddav/datastore/test/test_index_file.py:3628-3644
/CalendarServer/branches/users/cdaboo/more-sharing-5591/txdav/carddav/datastore/test/test_index_file.py:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464/txdav/carddav/datastore/test/test_index_file.py:4465-4957
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070/txdav/carddav/datastore/test/test_index_file.py:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187/txdav/carddav/datastore/test/test_index_file.py:5188-5440
/CalendarServer/branches/users/glyph/contacts-server-merge/txdav/carddav/datastore/test/test_index_file.py:4971-5080
/CalendarServer/branches/users/glyph/more-deferreds-6/txdav/carddav/datastore/test/test_index_file.py:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7/txdav/carddav/datastore/test/test_index_file.py:6369-6445
/CalendarServer/branches/users/glyph/sendfdport/txdav/carddav/datastore/test/test_index_file.py:5388-5424
/CalendarServer/branches/users/glyph/sharedpool/txdav/carddav/datastore/test/test_index_file.py:6490-6550
/CalendarServer/branches/users/glyph/sql-store/txdav/carddav/datastore/test/test_index_file.py:5929-6073
/CalendarServer/branches/users/glyph/use-system-twisted/txdav/carddav/datastore/test/test_index_file.py:5084-5149
/CalendarServer/branches/users/sagen/locations-resources/txdav/carddav/datastore/test/test_index_file.py:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2/txdav/carddav/datastore/test/test_index_file.py:5052-5061
/CalendarServer/branches/users/sagen/resource-delegates-4038/txdav/carddav/datastore/test/test_index_file.py:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066/txdav/carddav/datastore/test/test_index_file.py:4068-4075
/CalendarServer/branches/users/sagen/resources-2/txdav/carddav/datastore/test/test_index_file.py:5084-5093
/CalendarServer/branches/users/wsanchez/transations/txdav/carddav/datastore/test/test_index_file.py:5515-5593
/CalendarServer/trunk/twistedcaldav/test/test_vcardindex.py:6322-6394

Modified: CalendarServer/trunk/txdav/carddav/datastore/test/test_sql.py
===================================================================
--- CalendarServer/trunk/txdav/carddav/datastore/test/test_sql.py	2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/txdav/carddav/datastore/test/test_sql.py	2010-11-01 22:38:48 UTC (rev 6551)
@@ -41,7 +41,7 @@
 
     @inlineCallbacks
     def setUp(self):
-        super(AddressBookSQLStorageTests, self).setUp()
+        yield super(AddressBookSQLStorageTests, self).setUp()
         self._sqlStore = yield buildStore(self, self.notifierFactory)
         yield self.populate()
 
@@ -195,18 +195,16 @@
         Test that two concurrent attempts to PUT different address book object resources to the
         same address book home does not cause a deadlock.
         """
+        addressbookStore = yield buildStore(self, self.notifierFactory)
 
-        addressbookStore1 = yield buildStore(self, self.notifierFactory)
-        addressbookStore2 = yield buildStore(self, self.notifierFactory)
-
         # Provision the home now
-        txn = addressbookStore1.newTransaction()
+        txn = addressbookStore.newTransaction()
         home = yield txn.homeWithUID(EADDRESSBOOKTYPE, "uid1", create=True)
         self.assertNotEqual(home, None)
         yield txn.commit()
 
-        txn1 = addressbookStore1.newTransaction()
-        txn2 = addressbookStore2.newTransaction()
+        txn1 = addressbookStore.newTransaction()
+        txn2 = addressbookStore.newTransaction()
 
         home1 = yield txn1.homeWithUID(EADDRESSBOOKTYPE, "uid1", create=True)
         home2 = yield txn2.homeWithUID(EADDRESSBOOKTYPE, "uid1", create=True)

Modified: CalendarServer/trunk/txdav/common/datastore/sql.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/sql.py	2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/txdav/common/datastore/sql.py	2010-11-01 22:38:48 UTC (rev 6551)
@@ -25,26 +25,23 @@
     "CommonHome",
 ]
 
-import sys
 import datetime
-from Queue import Queue
 
-from zope.interface.declarations import implements, directlyProvides
+from zope.interface import implements, directlyProvides
 
+from twext.python.log import Logger, LoggingMixIn
+from twext.web2.dav.element.rfc2518 import ResourceType
+from twext.web2.http_headers import MimeType
+
 from twisted.python import hashlib
 from twisted.python.modules import getModule
 from twisted.python.util import FancyEqMixin
-from twisted.python.failure import Failure
 
-from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
+from twisted.internet.defer import inlineCallbacks, returnValue
 
 from twisted.application.service import Service
 
-from twext.python.log import Logger, LoggingMixIn
 from twext.internet.decorate import memoizedKey
-from twext.web2.dav.element.rfc2518 import ResourceType
-from twext.web2.http_headers import MimeType
 
 from txdav.common.datastore.sql_legacy import PostgresLegacyNotificationsEmulator
 from txdav.caldav.icalendarstore import ICalendarTransaction, ICalendarStore
@@ -61,9 +58,8 @@
 from txdav.common.inotifications import INotificationCollection, \
     INotificationObject
 
-from txdav.idav import AlreadyFinishedError
-from txdav.base.propertystore.base import PropertyName
 from txdav.base.propertystore.sql import PropertyStore
+from txdav.base.propertystore.base import PropertyName
 
 from twistedcaldav.customxml import NotificationType
 from twistedcaldav.dateops import datetimeMktime
@@ -87,15 +83,17 @@
 
     implements(ICalendarStore)
 
-    def __init__(self, connectionFactory, notifierFactory, attachmentsPath,
-                 enableCalendars=True, enableAddressBooks=True):
+    def __init__(self, sqlTxnFactory, notifierFactory, attachmentsPath,
+                 enableCalendars=True, enableAddressBooks=True,
+                 label="unlabeled"):
         assert enableCalendars or enableAddressBooks
 
-        self.connectionFactory = connectionFactory
+        self.sqlTxnFactory = sqlTxnFactory
         self.notifierFactory = notifierFactory
         self.attachmentsPath = attachmentsPath
         self.enableCalendars = enableCalendars
         self.enableAddressBooks = enableAddressBooks
+        self.label = label
 
 
     def eachCalendarHome(self):
@@ -113,9 +111,12 @@
 
 
     def newTransaction(self, label="unlabeled", migrating=False):
+        """
+        @see L{IDataStore.newTransaction}
+        """
         return CommonStoreTransaction(
             self,
-            self.connectionFactory,
+            self.sqlTxnFactory(),
             self.enableCalendars,
             self.enableAddressBooks,
             self.notifierFactory,
@@ -124,93 +125,7 @@
         )
 
 
-_DONE = object()
 
-_STATE_STOPPED = "STOPPED"
-_STATE_RUNNING = "RUNNING"
-_STATE_STOPPING = "STOPPING"
-
-class ThreadHolder(object):
-    """
-    A queue which will hold a reactor threadpool thread open until all of the
-    work in that queue is done.
-    """
-
-    def __init__(self, reactor):
-        self._reactor = reactor
-        self._state = _STATE_STOPPED
-        self._stopper = None
-        self._q = None
-
-
-    def _run(self):
-        """
-        Worker function which runs in a non-reactor thread.
-        """
-        while True:
-            work = self._q.get()
-            if work is _DONE:
-                def finishStopping():
-                    self._state = _STATE_STOPPED
-                    self._q = None
-                    s = self._stopper
-                    self._stopper = None
-                    s.callback(None)
-                self._reactor.callFromThread(finishStopping)
-                return
-            self._oneWorkUnit(*work)
-
-
-    def _oneWorkUnit(self, deferred, instruction):
-        try: 
-            result = instruction()
-        except:
-            etype, evalue, etb = sys.exc_info()
-            def relayFailure():
-                f = Failure(evalue, etype, etb)
-                deferred.errback(f)
-            self._reactor.callFromThread(relayFailure)
-        else:
-            self._reactor.callFromThread(deferred.callback, result)
-
-
-    def submit(self, work):
-        """
-        Submit some work to be run.
-
-        @param work: a 0-argument callable, which will be run in a thread.
-
-        @return: L{Deferred} that fires with the result of L{work}
-        """
-        d = Deferred()
-        self._q.put((d, work))
-        return d
-
-
-    def start(self):
-        """
-        Start this thing, if it's stopped.
-        """
-        if self._state != _STATE_STOPPED:
-            raise RuntimeError("Not stopped.")
-        self._state = _STATE_RUNNING
-        self._q = Queue(0)
-        self._reactor.callInThread(self._run)
-
-
-    def stop(self):
-        """
-        Stop this thing and release its thread, if it's running.
-        """
-        if self._state != _STATE_RUNNING:
-            raise RuntimeError("Not running.")
-        s = self._stopper = Deferred()
-        self._state = _STATE_STOPPING
-        self._q.put(_DONE)
-        return s
-
-
-
 class CommonStoreTransaction(object):
     """
     Transaction implementation for SQL database.
@@ -218,14 +133,12 @@
     _homeClass = {}
     _homeTable = {}
 
-    noisy = False
     id = 0
 
-    def __init__(self, store, connectionFactory,
+    def __init__(self, store, sqlTxn,
                  enableCalendars, enableAddressBooks,
                  notifierFactory, label, migrating=False):
         self._store = store
-        self._completed = False
         self._calendarHomes = {}
         self._addressbookHomes = {}
         self._notificationHomes = {}
@@ -233,6 +146,7 @@
         self._notifierFactory = notifierFactory
         self._label = label
         self._migrating = migrating
+
         CommonStoreTransaction.id += 1
         self._txid = CommonStoreTransaction.id
 
@@ -249,62 +163,18 @@
         CommonStoreTransaction._homeClass[EADDRESSBOOKTYPE] = AddressBookHome
         CommonStoreTransaction._homeTable[ECALENDARTYPE] = CALENDAR_HOME_TABLE
         CommonStoreTransaction._homeTable[EADDRESSBOOKTYPE] = ADDRESSBOOK_HOME_TABLE
-        self._holder = ThreadHolder(reactor)
-        self._holder.start()
-        def initCursor():
-            # support threadlevel=1; we can't necessarily cursor() in a
-            # different thread than we do transactions in.
+        self._sqlTxn = sqlTxn
 
-            # FIXME: may need to be pooling ThreadHolders along with
-            # connections, if threadlevel=1 requires connect() be called in the
-            # same thread as cursor() et. al.
-            self._connection = connectionFactory()
-            self._cursor = self._connection.cursor()
-        self._holder.submit(initCursor)
 
-
     def store(self):
         return self._store
 
 
     def __repr__(self):
-        return "PG-TXN<%s>" % (self._label,)
+        return 'PG-TXN<%s>' % (self._label,)
 
 
-    def _reallyExecSQL(self, sql, args=[], raiseOnZeroRowCount=None):
-        self._cursor.execute(sql, args)
-        if raiseOnZeroRowCount is not None and self._cursor.rowcount == 0:
-            raise raiseOnZeroRowCount()
-        if self._cursor.description:
-            return self._cursor.fetchall()
-        else:
-            return None
-
-
-    def execSQL(self, *args, **kw):
-        result = self._holder.submit(
-            lambda : self._reallyExecSQL(*args, **kw)
-        )
-        if self.noisy:
-            def reportResult(results):
-                sys.stdout.write("\n".join([
-                    "",
-                    "SQL (%d): %r %r" % (self._txid, args, kw),
-                    "Results (%d): %r" % (self._txid, results,),
-                    "",
-                    ]))
-                return results
-            result.addBoth(reportResult)
-        return result
-
-
-    def __del__(self):
-        if not self._completed:
-            print "CommonStoreTransaction.__del__: OK"
-            self.abort()
-
-
-    @memoizedKey("uid", "_calendarHomes")
+    @memoizedKey('uid', '_calendarHomes')
     def calendarHomeWithUID(self, uid, create=False):
         return self.homeWithUID(ECALENDARTYPE, uid, create=create)
 
@@ -409,40 +279,36 @@
         returnValue(collection)
 
 
-    def abort(self):
-        if not self._completed:
-            def reallyAbort():
-                self._connection.rollback()
-                self._connection.close()
-            self._completed = True
-            result = self._holder.submit(reallyAbort)
-            self._holder.stop()
-            return result
-        else:
-            raise AlreadyFinishedError()
+    def postCommit(self, operation):
+        """
+        Run things after C{commit}.
+        """
+        self._postCommitOperations.append(operation)
 
 
+    def execSQL(self, *a, **kw):
+        """
+        Execute some SQL (delegate to L{IAsyncTransaction}).
+        """
+        return self._sqlTxn.execSQL(*a, **kw)
+
+
     def commit(self):
-        if not self._completed:
-            self._completed = True
-            def postCommit(ignored):
-                for operation in self._postCommitOperations:
-                    operation()
-            def reallyCommit():
-                self._connection.commit()
-                self._connection.close()
-            result = self._holder.submit(reallyCommit).addCallback(postCommit)
-            self._holder.stop()
-            return result
-        else:
-            raise AlreadyFinishedError()
+        """
+        Commit the transaction and execute any post-commit hooks.
+        """
+        def postCommit(ignored):
+            for operation in self._postCommitOperations:
+                operation()
+            return ignored
+        return self._sqlTxn.commit().addCallback(postCommit)
 
 
-    def postCommit(self, operation):
+    def abort(self):
         """
-        Run things after C{commit}.
+        Abort the transaction.
         """
-        self._postCommitOperations.append(operation)
+        return self._sqlTxn.abort()
 
 
 
@@ -717,13 +583,14 @@
             raise NoSuchHomeChildError()
         yield child._deletedSyncToken()
 
-        yield self._txn.execSQL(
-            "delete from %(name)s where %(column_RESOURCE_ID)s = %%s" % self._childTable,
-            [child._resourceID]
-        )
-        self._children.pop(name, None)
-        if self._txn._cursor.rowcount == 0:
-            raise NoSuchHomeChildError()
+        try:
+            yield self._txn.execSQL(
+                "delete from %(name)s where %(column_RESOURCE_ID)s = %%s" % self._childTable,
+                [child._resourceID],
+                raiseOnZeroRowCount=NoSuchHomeChildError
+            )
+        finally:
+            self._children.pop(name, None)
 
         child.notifyChanged()
 
@@ -881,6 +748,7 @@
             [self._ownerUID]
         ))[0][0])
 
+
     @inlineCallbacks
     def adjustQuotaUsedBytes(self, delta):
         """
@@ -888,7 +756,6 @@
         is done atomically. It is important to do the 'select ... for update' because a race also
         exists in the 'update ... x = x + 1' case as seen via unit tests.
         """
-        
         yield self._txn.execSQL("""
             select * from %(name)s
             where %(column_RESOURCE_ID)s = %%s
@@ -905,7 +772,6 @@
             """ % self._homeTable,
             [delta, self._resourceID]
         ))[0][0]
-        
         # Double check integrity
         if quotaUsedBytes < 0:
             log.error("Fixing quota adjusted below zero to %s by change amount %s" % (quotaUsedBytes, delta,))
@@ -916,8 +782,8 @@
                 """ % self._homeTable,
                 [self._resourceID]
             )
-            
 
+
     def notifierID(self, label="default"):
         if self._notifier:
             return self._notifier.getID(label)
@@ -1137,7 +1003,7 @@
 
     @inlineCallbacks
     def removeObjectResourceWithName(self, name):
-        
+
         uid, old_size = (yield self._txn.execSQL(
             "delete from %(name)s "
             "where %(column_RESOURCE_NAME)s = %%s and %(column_PARENT_RESOURCE_ID)s = %%s "

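The hunks above strip the cursor and ThreadHolder plumbing out of CommonStoreTransaction and reduce it to delegation: execSQL/commit/abort are forwarded to the underlying IAsyncTransaction, and post-commit hooks run only after the underlying commit fires. A dependency-free sketch of that shape (class names here are hypothetical stand-ins, and commit() is synchronous; the real code chains Twisted Deferreds):

```python
# Sketch only: stand-ins for the delegation pattern introduced above.

class FakeSQLTxn(object):
    """Stands in for an IAsyncTransaction provider."""
    def __init__(self):
        self.executed = []
        self.committed = False

    def execSQL(self, sql, args=()):
        self.executed.append((sql, tuple(args)))

    def commit(self):
        self.committed = True


class StoreTransaction(object):
    """Mirrors CommonStoreTransaction's new shape: no cursor or thread
    management, just delegation plus post-commit bookkeeping."""
    def __init__(self, sqlTxn):
        self._sqlTxn = sqlTxn
        self._postCommitOperations = []

    def postCommit(self, operation):
        # Queue a 0-argument callable to run after a successful commit.
        self._postCommitOperations.append(operation)

    def execSQL(self, *a, **kw):
        # Delegate straight to the underlying transaction.
        return self._sqlTxn.execSQL(*a, **kw)

    def commit(self):
        # Hooks run only once the underlying commit has succeeded.
        self._sqlTxn.commit()
        for operation in self._postCommitOperations:
            operation()
```

The point of the ordering is that anything queued via postCommit (e.g. change notifications) observes only committed state, never a transaction that subsequently aborts.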
Modified: CalendarServer/trunk/txdav/common/datastore/test/util.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/test/util.py	2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/txdav/common/datastore/test/util.py	2010-11-01 22:38:48 UTC (rev 6551)
@@ -28,14 +28,17 @@
 from twext.python.vcomponent import VComponent
 
 from twisted.internet import reactor
-from twisted.internet.defer import Deferred, succeed, inlineCallbacks
+from twisted.internet.defer import Deferred, inlineCallbacks
 from twisted.internet.task import deferLater
 from twisted.python import log
+from twisted.application.service import Service
 
 from txdav.common.datastore.sql import CommonDataStore, v1_schema
 from txdav.base.datastore.subpostgres import PostgresService,\
     DiagnosticConnectionWrapper
 from txdav.common.icommondatastore import NoSuchHomeChildError
+from txdav.base.datastore.asyncsqlpool import ConnectionPool
+from twisted.internet.defer import returnValue
 from twistedcaldav.notify import Notifier
 
 
@@ -52,6 +55,8 @@
         print connection.label, connection.state
     print '--- CONNECTIONS END ---'
 
+
+
 class SQLStoreBuilder(object):
     """
     Test-fixture-builder which can construct a PostgresStore.
@@ -67,31 +72,15 @@
 
         @return: a L{Deferred} which fires with an L{IDataStore}.
         """
-        currentTestID = testCase.id()
         dbRoot = CachingFilePath(self.SHARED_DB_PATH)
+        attachmentRoot = dbRoot.child("attachments")
         if self.sharedService is None:
             ready = Deferred()
             def getReady(connectionFactory):
-                attachmentRoot = dbRoot.child("attachments")
-                try:
-                    attachmentRoot.createDirectory()
-                except OSError:
-                    pass
-                try:
-                    self.store = CommonDataStore(
-                        lambda label=None: connectionFactory(
-                            label or currentTestID
-                        ),
-                        notifierFactory,
-                        attachmentRoot
-                    )
-                except:
-                    ready.errback()
-                    raise
-                else:
-                    self.cleanDatabase(testCase)
-                    ready.callback(self.store)
-                return self.store
+                self.makeAndCleanStore(
+                    testCase, notifierFactory, attachmentRoot
+                ).chainDeferred(ready)
+                return Service()
             self.sharedService = PostgresService(
                 dbRoot, getReady, v1_schema, resetSchema=True,
                 databaseName="caldav",
@@ -106,13 +95,10 @@
                 "before", "shutdown", startStopping)
             result = ready
         else:
-            self.store.notifierFactory = notifierFactory
-            self.cleanDatabase(testCase)
-            result = succeed(self.store)
-
+            result = self.makeAndCleanStore(
+                testCase, notifierFactory, attachmentRoot
+            )
         def cleanUp():
-            # FIXME: clean up any leaked connections and report them with an
-            # immediate test failure.
             def stopit():
                 self.sharedService.pauseMonitor()
             return deferLater(reactor, 0.1, stopit)
@@ -120,11 +106,42 @@
         return result
 
 
-    def cleanDatabase(self, testCase):
-        cleanupConn = self.store.connectionFactory(
+    @inlineCallbacks
+    def makeAndCleanStore(self, testCase, notifierFactory, attachmentRoot):
+        """
+        Create a L{CommonDataStore} specific to the given L{TestCase}.
+
+        This also creates a L{ConnectionPool} that gets stopped when the test
+        finishes, to make sure that any test which fails will terminate
+        cleanly.
+
+        @return: a L{Deferred} that fires with a L{CommonDataStore}
+        """
+        try:
+            attachmentRoot.createDirectory()
+        except OSError:
+            pass
+        cp = ConnectionPool(self.sharedService.produceConnection)
+        store = CommonDataStore(
+            cp.connection, notifierFactory, attachmentRoot
+        )
+        currentTestID = testCase.id()
+        store.label = currentTestID
+        cp.startService()
+        def stopIt():
+            return cp.stopService()
+        testCase.addCleanup(stopIt)
+        yield self.cleanStore(testCase, store)
+        returnValue(store)
+
+
+    @inlineCallbacks
+    def cleanStore(self, testCase, storeToClean):
+        cleanupTxn = storeToClean.sqlTxnFactory(
             "%s schema-cleanup" % (testCase.id(),)
         )
-        cursor = cleanupConn.cursor()
+        # TODO: should be getting these tables from a declaration of the schema
+        # somewhere.
         tables = ['INVITE',
                   'RESOURCE_PROPERTY',
                   'ATTACHMENT',
@@ -143,16 +160,16 @@
                   'NOTIFICATION_HOME']
         for table in tables:
             try:
-                cursor.execute("delete from "+table)
+                yield cleanupTxn.execSQL("delete from "+table, [])
             except:
                 log.err()
-        cleanupConn.commit()
-        cleanupConn.close()
+        yield cleanupTxn.commit()
 
 theStoreBuilder = SQLStoreBuilder()
 buildStore = theStoreBuilder.buildStore
 
 
+
 @inlineCallbacks
 def populateCalendarsFrom(requirements, store):
     """
@@ -215,6 +232,7 @@
     lastTransaction = None
     savedStore = None
     assertProvides = assertProvides
+    lastCommitSetUp = False
 
     def transactionUnderTest(self):
         """
@@ -222,12 +240,17 @@
         C{lastTransaction}.  Also makes sure to use the same store, saving the
         value from C{storeUnderTest}.
         """
+        if not self.lastCommitSetUp:
+            self.lastCommitSetUp = True
+            self.addCleanup(self.commitLast)
         if self.lastTransaction is not None:
             return self.lastTransaction
         if self.savedStore is None:
             self.savedStore = self.storeUnderTest()
         self.counter += 1
-        txn = self.lastTransaction = self.savedStore.newTransaction(self.id() + " #" + str(self.counter))
+        txn = self.lastTransaction = self.savedStore.newTransaction(
+            self.id() + " #" + str(self.counter)
+        )
         return txn
 
 
@@ -250,11 +273,12 @@
         self.lastTransaction = None
         return result
 
+
     def setUp(self):
         self.counter = 0
         self.notifierFactory = StubNotifierFactory()
 
-    def tearDown(self):
+    def commitLast(self):
         if self.lastTransaction is not None:
             return self.commit()
 

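In makeAndCleanStore above, the ConnectionPool's shutdown is registered with testCase.addCleanup immediately after startService, so a test that fails partway through still stops its pool. A minimal stand-in (hypothetical names; just enough of trial's addCleanup contract for illustration):

```python
# Sketch only: the start-then-register-cleanup wiring from the diff.

class FakePool(object):
    """Stands in for asyncsqlpool.ConnectionPool's service behaviour."""
    def __init__(self):
        self.running = False

    def startService(self):
        self.running = True

    def stopService(self):
        self.running = False


class FakeTestCase(object):
    """Just enough of a trial TestCase for this sketch."""
    def __init__(self):
        self._cleanups = []

    def addCleanup(self, f):
        self._cleanups.append(f)

    def runCleanups(self):
        # Cleanups run in reverse registration order after the test
        # body, whether the test passed or failed.
        for f in reversed(self._cleanups):
            f()


def makeStoreForTest(testCase, pool):
    # Mirrors makeAndCleanStore: start the pool, then guarantee it is
    # stopped when the test finishes.
    pool.startService()
    testCase.addCleanup(pool.stopService)
    return pool
```

Registering the cleanup on the test case, rather than in tearDown, is what makes the "any test which fails will terminate cleanly" guarantee in the docstring hold.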
Modified: CalendarServer/trunk/txdav/idav.py
===================================================================
--- CalendarServer/trunk/txdav/idav.py	2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/txdav/idav.py	2010-11-01 22:38:48 UTC (rev 6551)
@@ -102,7 +102,7 @@
     def newTransaction(label=None):
         """
         Create a new transaction.
-        
+
         @param label: A label to assign to this transaction for diagnostic
             purposes.
         @type label: C{str}
@@ -172,6 +172,53 @@
 
 
 
+class IAsyncTransaction(Interface):
+    """
+    Asynchronous execution of SQL.
+
+    Note that there is no C{begin()} method; if an L{IAsyncTransaction} exists,
+    it is assumed to have been started.
+    """
+
+    def execSQL(sql, args=(), raiseOnZeroRowCount=None):
+        """
+        Execute some SQL.
+
+        @param sql: an SQL string.
+
+        @type sql: C{str}
+
+        @param args: C{list} of arguments to interpolate into C{sql}.
+
+        @param raiseOnZeroRowCount: a 0-argument callable which returns an
+            exception to raise if the executed SQL does not affect any rows.
+
+        @return: a L{Deferred} which fires with a C{list} of C{tuple}s
+
+        @raise: C{raiseOnZeroRowCount} if it was specified and no rows were
+            affected.
+        """
+
+
+    def commit():
+        """
+        Commit changes caused by this transaction.
+
+        @return: L{Deferred} which fires with C{None} upon successful
+            completion of this transaction.
+        """
+
+
+    def abort():
+        """
+        Roll back changes caused by this transaction.
+
+        @return: L{Deferred} which fires with C{None} upon successful
+            rollback of this transaction.
+        """
+
+
+
 class ITransaction(Interface):
     """
     Transaction that can be aborted and either succeeds or fails in
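The raiseOnZeroRowCount contract documented in IAsyncTransaction.execSQL above can be illustrated with a synchronous sqlite3-backed sketch, modeled on the _reallyExecSQL method removed earlier in this diff (real providers return Deferreds; this stand-in is illustrative only):

```python
# Sketch only: synchronous stand-in for an IAsyncTransaction provider.
import sqlite3

class InMemoryTxn(object):
    def __init__(self):
        self._conn = sqlite3.connect(":memory:")
        self._cursor = self._conn.cursor()

    def execSQL(self, sql, args=(), raiseOnZeroRowCount=None):
        self._cursor.execute(sql, args)
        # raiseOnZeroRowCount is a 0-argument callable that *constructs*
        # the exception; it is raised when no rows were affected.
        if raiseOnZeroRowCount is not None and self._cursor.rowcount == 0:
            raise raiseOnZeroRowCount()
        if self._cursor.description:
            return self._cursor.fetchall()
        return None

    def commit(self):
        self._conn.commit()

    def abort(self):
        self._conn.rollback()
```

This is the shape the delete-from-child-table hunk above relies on: passing raiseOnZeroRowCount=NoSuchHomeChildError replaces the old pattern of reaching into the transaction's cursor to inspect rowcount after the fact.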