[CalendarServer-changes] [11102] CalendarServer/branches/users/cdaboo/store-scheduling

source_changes at macosforge.org
Fri Apr 26 09:45:09 PDT 2013


Revision: 11102
          http://trac.calendarserver.org//changeset/11102
Author:   cdaboo at apple.com
Date:     2013-04-26 09:45:09 -0700 (Fri, 26 Apr 2013)
Log Message:
-----------
Merge from trunk.

Modified Paths:
--------------
    CalendarServer/branches/users/cdaboo/store-scheduling/bin/caldavd
    CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/platform/darwin/wiki.py
    CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tap/caldav.py
    CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tap/test/test_util.py
    CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tap/util.py
    CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/changeip_calendar.py
    CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/config.py
    CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/principals.py
    CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/test/gateway/caldavd.plist
    CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/test/principals/caldavd.plist
    CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/test/test_config.py
    CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/util.py
    CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/webcal/resource.py
    CalendarServer/branches/users/cdaboo/store-scheduling/conf/caldavd-apple.plist
    CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/dal/syntax.py
    CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/fixtures.py
    CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/queue.py
    CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/test/test_adbapi2.py
    CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/test/test_queue.py
    CalendarServer/branches/users/cdaboo/store-scheduling/twext/python/sendmsg.c
    CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/directory/directory.py
    CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/directory/test/test_directory.py
    CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/stdconfig.py
    CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/test/test_config.py
    CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/test/test_stdconfig.py
    CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/util.py
    CalendarServer/branches/users/cdaboo/store-scheduling/txdav/base/datastore/subpostgres.py
    CalendarServer/branches/users/cdaboo/store-scheduling/txdav/base/datastore/test/test_subpostgres.py
    CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/sql.py
    CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_16_to_17.sql
    CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/sql_tables.py
    CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/test/test_sql_tables.py

Added Paths:
-----------
    CalendarServer/branches/users/cdaboo/store-scheduling/bin/calendarserver_monitor_work
    CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/workitems.py

Property Changed:
----------------
    CalendarServer/branches/users/cdaboo/store-scheduling/


Property changes on: CalendarServer/branches/users/cdaboo/store-scheduling
___________________________________________________________________
Modified: svn:mergeinfo
   - /CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/release/CalendarServer-4.3-dev:10180-10190,10192
/CalendarServer/branches/users/cdaboo/batchupload-6699:6700-7198
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/component-set-fixes:8130-8346
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/implicituidrace:8137-8141
/CalendarServer/branches/users/cdaboo/ischedule-dkim:9747-9979
/CalendarServer/branches/users/cdaboo/managed-attachments:9985-10145
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/pods:7297-7377
/CalendarServer/branches/users/cdaboo/pycalendar:7085-7206
/CalendarServer/branches/users/cdaboo/pycard:7227-7237
/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes:7740-8287
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/cdaboo/timezones:7443-7699
/CalendarServer/branches/users/cdaboo/txn-debugging:8730-8743
/CalendarServer/branches/users/glyph/always-abort-txn-on-error:9958-9969
/CalendarServer/branches/users/glyph/case-insensitive-uid:8772-8805
/CalendarServer/branches/users/glyph/conn-limit:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/dalify:6932-7023
/CalendarServer/branches/users/glyph/db-reconnect:6824-6876
/CalendarServer/branches/users/glyph/deploybuild:7563-7572
/CalendarServer/branches/users/glyph/digest-auth-redux:10624-10635
/CalendarServer/branches/users/glyph/disable-quota:7718-7727
/CalendarServer/branches/users/glyph/dont-start-postgres:6592-6614
/CalendarServer/branches/users/glyph/imip-and-admin-html:7866-7984
/CalendarServer/branches/users/glyph/ipv6-client:9054-9105
/CalendarServer/branches/users/glyph/linux-tests:6893-6900
/CalendarServer/branches/users/glyph/migrate-merge:8690-8713
/CalendarServer/branches/users/glyph/misc-portability-fixes:7365-7374
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/multiget-delete:8321-8330
/CalendarServer/branches/users/glyph/new-export:7444-7485
/CalendarServer/branches/users/glyph/one-home-list-api:10048-10073
/CalendarServer/branches/users/glyph/oracle:7106-7155
/CalendarServer/branches/users/glyph/oracle-nulls:7340-7351
/CalendarServer/branches/users/glyph/other-html:8062-8091
/CalendarServer/branches/users/glyph/parallel-sim:8240-8251
/CalendarServer/branches/users/glyph/parallel-upgrade:8376-8400
/CalendarServer/branches/users/glyph/parallel-upgrade_to_1:8571-8583
/CalendarServer/branches/users/glyph/q:9560-9688
/CalendarServer/branches/users/glyph/queue-locking-and-timing:10204-10289
/CalendarServer/branches/users/glyph/quota:7604-7637
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/shared-pool-fixes:8436-8443
/CalendarServer/branches/users/glyph/shared-pool-take2:8155-8174
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/sharing-api:9192-9205
/CalendarServer/branches/users/glyph/skip-lonely-vtimezones:8524-8535
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/subtransactions:7248-7258
/CalendarServer/branches/users/glyph/table-alias:8651-8664
/CalendarServer/branches/users/glyph/uidexport:7673-7676
/CalendarServer/branches/users/glyph/unshare-when-access-revoked:10562-10595
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/glyph/uuid-normalize:9268-9296
/CalendarServer/branches/users/glyph/xattrs-from-files:7757-7769
/CalendarServer/branches/users/sagen/applepush:8126-8184
/CalendarServer/branches/users/sagen/inboxitems:7380-7381
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/sagen/testing:10827-10851,10853-10855
/CalendarServer/branches/users/wsanchez/transations:5515-5593
/CalendarServer/trunk:10876-11028
   + /CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/release/CalendarServer-4.3-dev:10180-10190,10192
/CalendarServer/branches/users/cdaboo/batchupload-6699:6700-7198
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/component-set-fixes:8130-8346
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/implicituidrace:8137-8141
/CalendarServer/branches/users/cdaboo/ischedule-dkim:9747-9979
/CalendarServer/branches/users/cdaboo/managed-attachments:9985-10145
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/pods:7297-7377
/CalendarServer/branches/users/cdaboo/pycalendar:7085-7206
/CalendarServer/branches/users/cdaboo/pycard:7227-7237
/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes:7740-8287
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/cdaboo/timezones:7443-7699
/CalendarServer/branches/users/cdaboo/txn-debugging:8730-8743
/CalendarServer/branches/users/glyph/always-abort-txn-on-error:9958-9969
/CalendarServer/branches/users/glyph/case-insensitive-uid:8772-8805
/CalendarServer/branches/users/glyph/conn-limit:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/dalify:6932-7023
/CalendarServer/branches/users/glyph/db-reconnect:6824-6876
/CalendarServer/branches/users/glyph/deploybuild:7563-7572
/CalendarServer/branches/users/glyph/digest-auth-redux:10624-10635
/CalendarServer/branches/users/glyph/disable-quota:7718-7727
/CalendarServer/branches/users/glyph/dont-start-postgres:6592-6614
/CalendarServer/branches/users/glyph/imip-and-admin-html:7866-7984
/CalendarServer/branches/users/glyph/ipv6-client:9054-9105
/CalendarServer/branches/users/glyph/linux-tests:6893-6900
/CalendarServer/branches/users/glyph/migrate-merge:8690-8713
/CalendarServer/branches/users/glyph/misc-portability-fixes:7365-7374
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/multiget-delete:8321-8330
/CalendarServer/branches/users/glyph/new-export:7444-7485
/CalendarServer/branches/users/glyph/one-home-list-api:10048-10073
/CalendarServer/branches/users/glyph/oracle:7106-7155
/CalendarServer/branches/users/glyph/oracle-nulls:7340-7351
/CalendarServer/branches/users/glyph/other-html:8062-8091
/CalendarServer/branches/users/glyph/parallel-sim:8240-8251
/CalendarServer/branches/users/glyph/parallel-upgrade:8376-8400
/CalendarServer/branches/users/glyph/parallel-upgrade_to_1:8571-8583
/CalendarServer/branches/users/glyph/q:9560-9688
/CalendarServer/branches/users/glyph/queue-locking-and-timing:10204-10289
/CalendarServer/branches/users/glyph/quota:7604-7637
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/shared-pool-fixes:8436-8443
/CalendarServer/branches/users/glyph/shared-pool-take2:8155-8174
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/sharing-api:9192-9205
/CalendarServer/branches/users/glyph/skip-lonely-vtimezones:8524-8535
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/start-service-start-loop:11060-11065
/CalendarServer/branches/users/glyph/subtransactions:7248-7258
/CalendarServer/branches/users/glyph/table-alias:8651-8664
/CalendarServer/branches/users/glyph/uidexport:7673-7676
/CalendarServer/branches/users/glyph/unshare-when-access-revoked:10562-10595
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/glyph/uuid-normalize:9268-9296
/CalendarServer/branches/users/glyph/xattrs-from-files:7757-7769
/CalendarServer/branches/users/sagen/applepush:8126-8184
/CalendarServer/branches/users/sagen/inboxitems:7380-7381
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/sagen/testing:10827-10851,10853-10855
/CalendarServer/branches/users/wsanchez/transations:5515-5593
/CalendarServer/trunk:10876-11101

Modified: CalendarServer/branches/users/cdaboo/store-scheduling/bin/caldavd
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/bin/caldavd	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/bin/caldavd	2013-04-26 16:45:09 UTC (rev 11102)
@@ -125,4 +125,6 @@
 
 export PYTHONPATH
 
-exec "${python}" "${twistdpath}" ${twistd_profile} ${twistd_reactor} ${daemonize} ${username} ${groupname} "${plugin_name}" ${configfile} ${service_type} ${errorlogenabled} ${profile} ${child_reactor};
+extra="-o FailIfUpgradeNeeded=False";
+
+exec "${python}" "${twistdpath}" ${twistd_profile} ${twistd_reactor} ${daemonize} ${username} ${groupname} "${plugin_name}" ${configfile} ${service_type} ${errorlogenabled} ${profile} ${child_reactor} ${extra};

Copied: CalendarServer/branches/users/cdaboo/store-scheduling/bin/calendarserver_monitor_work (from rev 11101, CalendarServer/trunk/bin/calendarserver_monitor_work)
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/bin/calendarserver_monitor_work	                        (rev 0)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/bin/calendarserver_monitor_work	2013-04-26 16:45:09 UTC (rev 11102)
@@ -0,0 +1,38 @@
+#!/usr/bin/env python
+
+##
+# Copyright (c) 2006-2013 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+import os
+import sys
+
+# In OS X Server context, add to PATH to find Postgres utilities (initdb, pg_ctl)
+if "Server.app" in sys.argv[0]:
+    os.environ["PATH"] += ":" + os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), "bin")
+
+#PYTHONPATH
+
+if __name__ == "__main__":
+    if "PYTHONPATH" in globals():
+        sys.path.insert(0, PYTHONPATH)
+    else:
+        try:
+            import _calendarserver_preamble
+        except ImportError:
+            sys.exc_clear()
+
+    from calendarserver.tools.workitems import main
+    main()

Modified: CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/platform/darwin/wiki.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/platform/darwin/wiki.py	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/platform/darwin/wiki.py	2013-04-26 16:45:09 UTC (rev 11102)
@@ -48,7 +48,7 @@
             (jsonResponse, str(e)))
         raise WebAuthError("Could not look up token: %s" % (token,))
     if response["succeeded"]:
-        returnValue(response["shortname"])
+        returnValue(response["shortName"])
     else:
         raise WebAuthError("Could not look up token: %s" % (token,))
 

Modified: CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tap/caldav.py	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tap/caldav.py	2013-04-26 16:45:09 UTC (rev 11102)
@@ -53,6 +53,8 @@
 from twisted.application.service import MultiService, IServiceMaker
 from twisted.application.service import Service
 
+from twistedcaldav.config import config, ConfigurationError
+from twistedcaldav.stdconfig import DEFAULT_CONFIG, DEFAULT_CONFIG_FILE
 from twext.web2.server import Site
 from twext.python.log import Logger, LoggingMixIn
 from twext.python.log import logLevelForNamespace, setLogLevelForNamespace
@@ -68,13 +70,10 @@
 )
 from txdav.common.datastore.upgrade.migrate import UpgradeToDatabaseService
 
-from twistedcaldav.config import ConfigurationError
-from twistedcaldav.config import config
 from twistedcaldav.directory import calendaruserproxy
 from twistedcaldav.directory.directory import GroupMembershipCacheUpdater
 from twistedcaldav.localization import processLocalizationFiles
 from twistedcaldav import memcachepool
-from twistedcaldav.stdconfig import DEFAULT_CONFIG, DEFAULT_CONFIG_FILE
 from twistedcaldav.upgrade import UpgradeFileSystemFormatService, PostDBImportService
 
 from calendarserver.tap.util import pgServiceFromConfig, getDBPool, MemoryLimitService
@@ -102,8 +101,7 @@
 from calendarserver.accesslog import AMPCommonAccessLoggingObserver
 from calendarserver.accesslog import AMPLoggingFactory
 from calendarserver.accesslog import RotatingFileAccessLoggingObserver
-from calendarserver.tap.util import getRootResource, computeProcessCount
-
+from calendarserver.tap.util import getRootResource
 from calendarserver.tap.util import storeFromConfig
 from calendarserver.tap.util import pgConnectorFromConfig
 from calendarserver.tap.util import oracleConnectorFromConfig
@@ -1270,18 +1268,6 @@
                 monitor.addProcess('memcached-%s' % (name,), memcachedArgv,
                                    env=PARENT_ENVIRONMENT)
 
-        #
-        # Calculate the number of processes to spawn
-        #
-        if config.MultiProcess.ProcessCount == 0:
-            # TODO: this should probably be happening in a configuration hook.
-            processCount = computeProcessCount(
-                config.MultiProcess.MinProcessCount,
-                config.MultiProcess.PerCPU,
-                config.MultiProcess.PerGB,
-            )
-            config.MultiProcess.ProcessCount = processCount
-            self.log_info("Configuring %d processes." % (processCount,))
 
         # Open the socket(s) to be inherited by the slaves
         inheritFDs = []
@@ -1431,7 +1417,7 @@
 
             return multi
 
-        ssvc = self.storageService(spawnerSvcCreator, uid, gid)
+        ssvc = self.storageService(spawnerSvcCreator, None, uid, gid)
         ssvc.setServiceParent(s)
         return s
 

Modified: CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tap/test/test_util.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tap/test/test_util.py	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tap/test/test_util.py	2013-04-26 16:45:09 UTC (rev 11102)
@@ -14,7 +14,8 @@
 # limitations under the License.
 ##
 
-from calendarserver.tap.util import computeProcessCount, directoryFromConfig, MemoryLimitService
+from calendarserver.tap.util import directoryFromConfig, MemoryLimitService
+from twistedcaldav.util import computeProcessCount
 from twistedcaldav.test.util import TestCase
 from twistedcaldav.config import config
 from twistedcaldav.directory.augment import AugmentXMLDB

Modified: CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tap/util.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tap/util.py	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tap/util.py	2013-04-26 16:45:09 UTC (rev 11102)
@@ -67,7 +67,6 @@
 from twistedcaldav.timezones import TimezoneCache
 from twistedcaldav.timezoneservice import TimezoneServiceResource
 from twistedcaldav.timezonestdservice import TimezoneStdServiceResource
-from twistedcaldav.util import getMemorySize, getNCPU
 from twext.enterprise.ienterprise import POSTGRES_DIALECT
 from twext.enterprise.ienterprise import ORACLE_DIALECT
 from twext.enterprise.adbapi2 import ConnectionPool, ConnectionPoolConnection
@@ -770,37 +769,8 @@
 
 
 
-def computeProcessCount(minimum, perCPU, perGB, cpuCount=None, memSize=None):
-    """
-    Determine how many process to spawn based on installed RAM and CPUs,
-    returning at least "mininum"
-    """
 
-    if cpuCount is None:
-        try:
-            cpuCount = getNCPU()
-        except NotImplementedError, e:
-            log.error("Unable to detect number of CPUs: %s" % (str(e),))
-            return minimum
 
-    if memSize is None:
-        try:
-            memSize = getMemorySize()
-        except NotImplementedError, e:
-            log.error("Unable to detect amount of installed RAM: %s" % (str(e),))
-            return minimum
-
-    countByCore = perCPU * cpuCount
-    countByMemory = perGB * (memSize / (1024 * 1024 * 1024))
-
-    # Pick the smaller of the two:
-    count = min(countByCore, countByMemory)
-
-    # ...but at least "minimum"
-    return max(count, minimum)
-
-
-
 class FakeRequest(object):
 
     def __init__(self, rootResource, method, path, uri='/', transaction=None):
@@ -1010,17 +980,16 @@
             access=os.W_OK,
             create=(0750, config.UserName, config.GroupName),
         )
-    if config.LogRoot.startswith(config.ServerRoot + os.sep):
-        checkDirectory(
-            config.LogRoot,
-            "Log root",
-            access=os.W_OK,
-            create=(0750, config.UserName, config.GroupName),
-        )
-    if config.RunRoot.startswith(config.ServerRoot + os.sep):
-        checkDirectory(
-            config.RunRoot,
-            "Run root",
-            access=os.W_OK,
-            create=(0770, config.UserName, config.GroupName),
-        )
+    # Always create these:
+    checkDirectory(
+        config.LogRoot,
+        "Log root",
+        access=os.W_OK,
+        create=(0750, config.UserName, config.GroupName),
+    )
+    checkDirectory(
+        config.RunRoot,
+        "Run root",
+        access=os.W_OK,
+        create=(0770, config.UserName, config.GroupName),
+    )
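The hunk above deletes `computeProcessCount` from `calendarserver/tap/util.py`; per the import change in `test_util.py` earlier in this changeset, it now lives in `twistedcaldav.util`. The heuristic it implements is worth stating plainly: spawn workers per CPU core and per GiB of RAM, take the smaller of the two counts, but never go below the configured minimum. A standalone sketch of that logic (names mirror the deleted code; the CPU/memory probes `getNCPU`/`getMemorySize` are replaced here by plain parameters, so this is an illustration, not the relocated implementation):

```python
def compute_process_count(minimum, per_cpu, per_gb, cpu_count, mem_size_bytes):
    """Sketch of the relocated computeProcessCount heuristic:
    per_cpu workers per core, or per_gb workers per GiB of RAM,
    whichever is smaller -- floored at `minimum`."""
    count_by_core = per_cpu * cpu_count
    count_by_memory = per_gb * (mem_size_bytes // (1024 * 1024 * 1024))
    # Pick the smaller of the two, but at least "minimum":
    return max(min(count_by_core, count_by_memory), minimum)

# 8 cores, 4 GiB RAM, 1 worker/core, 2 workers/GiB, minimum 2:
# min(8, 8) floored at 2 -> 8 workers
print(compute_process_count(2, 1, 2, 8, 4 * 1024 ** 3))
```

Note that memory is the binding constraint on small-RAM machines: with 1 GiB and 2 cores, `min(2, 2)` would still be raised to any larger configured minimum.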

Modified: CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/changeip_calendar.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/changeip_calendar.py	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/changeip_calendar.py	2013-04-26 16:45:09 UTC (rev 11102)
@@ -54,7 +54,7 @@
         sys.exit(1)
 
     verbose = False
-    configFile = "/Library/Server/Calendar and Contacts/Config/caldavd.plist"
+    configFile = "/Library/Server/Calendar and Contacts/Config/caldavd-system.plist"
 
     for opt, arg in optargs:
         if opt in ("-h", "--help"):

Modified: CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/config.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/config.py	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/config.py	2013-04-26 16:45:09 UTC (rev 11102)
@@ -32,43 +32,47 @@
 from twistedcaldav.config import config, ConfigDict, ConfigurationError, mergeData
 from twistedcaldav.stdconfig import DEFAULT_CONFIG_FILE
 WRITABLE_CONFIG_KEYS = [
-    "ServerHostName",
-    "HTTPPort",
-    "SSLPort",
-    "EnableSSL",
-    "RedirectHTTPToHTTPS",
-    "EnableCalDAV",
-    "EnableCardDAV",
-    "DataRoot",
-    "SSLCertificate",
-    "SSLPrivateKey",
-    "SSLAuthorityChain",
-    "EnableSearchAddressBook",
+    "Authentication.Basic.AllowedOverWireUnencrypted",
     "Authentication.Basic.Enabled",
-    "Authentication.Basic.AllowedOverWireUnencrypted",
+    "Authentication.Digest.AllowedOverWireUnencrypted",
     "Authentication.Digest.Enabled",
-    "Authentication.Digest.AllowedOverWireUnencrypted",
+    "Authentication.Kerberos.AllowedOverWireUnencrypted",
     "Authentication.Kerberos.Enabled",
-    "Authentication.Kerberos.AllowedOverWireUnencrypted",
     "Authentication.Wiki.Enabled",
+    "DataRoot",
+    "DefaultLogLevel",
+    "DirectoryAddressBook.params.queryPeopleRecords",
+    "DirectoryAddressBook.params.queryUserRecords",
+    "EnableCalDAV",
+    "EnableCardDAV",
+    "EnableSearchAddressBook",
+    "EnableSSL",
+    "HTTPPort",
+    "LogLevels",
+    "Notifications.Services.APNS.CalDAV.AuthorityChainPath",
+    "Notifications.Services.APNS.CalDAV.CertificatePath",
+    "Notifications.Services.APNS.CalDAV.PrivateKeyPath",
+    "Notifications.Services.APNS.CardDAV.AuthorityChainPath",
+    "Notifications.Services.APNS.CardDAV.CertificatePath",
+    "Notifications.Services.APNS.CardDAV.PrivateKeyPath",
+    "Notifications.Services.APNS.Enabled",
+    "RedirectHTTPToHTTPS",
     "Scheduling.iMIP.Enabled",
-    "Scheduling.iMIP.Receiving.Username",
+    "Scheduling.iMIP.Receiving.Port",
     "Scheduling.iMIP.Receiving.Server",
-    "Scheduling.iMIP.Receiving.Port",
     "Scheduling.iMIP.Receiving.Type",
+    "Scheduling.iMIP.Receiving.Username",
     "Scheduling.iMIP.Receiving.UseSSL",
+    "Scheduling.iMIP.Sending.Address",
+    "Scheduling.iMIP.Sending.Port",
+    "Scheduling.iMIP.Sending.Server",
     "Scheduling.iMIP.Sending.Username",
-    "Scheduling.iMIP.Sending.Server",
-    "Scheduling.iMIP.Sending.Port",
     "Scheduling.iMIP.Sending.UseSSL",
-    "Scheduling.iMIP.Sending.Address",
-    "Notifications.Services.APNS.Enabled",
-    "Notifications.Services.APNS.CalDAV.CertificatePath",
-    "Notifications.Services.APNS.CalDAV.AuthorityChainPath",
-    "Notifications.Services.APNS.CalDAV.PrivateKeyPath",
-    "Notifications.Services.APNS.CardDAV.CertificatePath",
-    "Notifications.Services.APNS.CardDAV.AuthorityChainPath",
-    "Notifications.Services.APNS.CardDAV.PrivateKeyPath",
+    "ServerHostName",
+    "SSLAuthorityChain",
+    "SSLCertificate",
+    "SSLPort",
+    "SSLPrivateKey",
 ]
 
 def usage(e=None):
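The `WRITABLE_CONFIG_KEYS` churn above is mostly a re-sort: the whitelist is now alphabetized (case-insensitively), with `DataRoot` kept and `DefaultLogLevel`, `LogLevels`, and the `DirectoryAddressBook.params.*` query flags newly admitted. A hypothetical regression check (not part of this commit) for keeping such a list sorted, shown with a shortened stand-in list, could look like:

```python
# Stand-in excerpt of the whitelist; the real list in
# calendarserver/tools/config.py is much longer.
WRITABLE_CONFIG_KEYS = [
    "Authentication.Basic.AllowedOverWireUnencrypted",
    "Authentication.Basic.Enabled",
    "DataRoot",
    "DefaultLogLevel",
    "HTTPPort",
]

def is_alphabetized(keys):
    """True when the key list is in case-insensitive sorted order,
    matching the ordering this changeset applies."""
    return keys == sorted(keys, key=str.lower)

print(is_alphabetized(WRITABLE_CONFIG_KEYS))  # True
```

Case-insensitive ordering matters here: the commit places `ServerHostName` before `SSLAuthorityChain`, which a case-sensitive `sorted()` would reverse.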

Modified: CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/principals.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/principals.py	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/principals.py	2013-04-26 16:45:09 UTC (rev 11102)
@@ -31,7 +31,7 @@
 
 from twistedcaldav.config import config
 from twistedcaldav.directory.directory import UnknownRecordTypeError, DirectoryError
-from twistedcaldav.directory.directory import scheduleNextGroupCachingUpdate
+from twistedcaldav.directory.directory import schedulePolledGroupCachingUpdate
 
 from calendarserver.tools.util import (
     booleanArgument, proxySubprincipal, action_addProxyPrincipal,
@@ -510,7 +510,8 @@
         membersProperty = davxml.GroupMemberSet(*memberURLs)
         yield subPrincipal.writeProperty(membersProperty, None)
         if store is not None:
-            yield scheduleNextGroupCachingUpdate(store, 0)
+            # Schedule work the PeerConnectionPool will pick up as overdue
+            yield schedulePolledGroupCachingUpdate(store)
 
 
 

Modified: CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/test/gateway/caldavd.plist
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/test/gateway/caldavd.plist	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/test/gateway/caldavd.plist	2013-04-26 16:45:09 UTC (rev 11102)
@@ -718,6 +718,10 @@
     <integer>30</integer> <!-- in minutes -->
 
 
+    <!-- For unit tests, enable SharedConnectionPool so we don't use up shared memory -->
+    <key>SharedConnectionPool</key>
+    <true/>
+
     <!--
         Twisted
       -->

Modified: CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/test/principals/caldavd.plist
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/test/principals/caldavd.plist	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/test/principals/caldavd.plist	2013-04-26 16:45:09 UTC (rev 11102)
@@ -743,6 +743,9 @@
     <key>ResponseCacheTimeout</key>
     <integer>30</integer> <!-- in minutes -->
 
+    <!-- For unit tests, enable SharedConnectionPool so we don't use up shared memory -->
+    <key>SharedConnectionPool</key>
+    <true/>
 
     <!--
         Twisted

Modified: CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/test/test_config.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/test/test_config.py	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/test/test_config.py	2013-04-26 16:45:09 UTC (rev 11102)
@@ -105,6 +105,8 @@
         self.assertEquals(results["result"]["EnableCalDAV"], True)
         self.assertEquals(results["result"]["EnableCardDAV"], True)
         self.assertEquals(results["result"]["EnableSSL"], False)
+        self.assertEquals(results["result"]["DefaultLogLevel"], "warn")
+
         self.assertEquals(results["result"]["Notifications"]["Services"]["APNS"]["Enabled"], False)
         self.assertEquals(results["result"]["Notifications"]["Services"]["APNS"]["CalDAV"]["CertificatePath"], "/example/calendar.cer")
 

Modified: CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/util.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/util.py	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/util.py	2013-04-26 16:45:09 UTC (rev 11102)
@@ -50,7 +50,7 @@
 from twistedcaldav.directory import calendaruserproxy
 from twistedcaldav.directory.aggregate import AggregateDirectoryService
 from twistedcaldav.directory.directory import DirectoryService, DirectoryRecord
-from twistedcaldav.directory.directory import scheduleNextGroupCachingUpdate
+from twistedcaldav.directory.directory import schedulePolledGroupCachingUpdate
 from calendarserver.push.notifier import NotifierFactory
 
 from txdav.common.datastore.file import CommonDataStore
@@ -462,7 +462,8 @@
     (yield action_removeProxyPrincipal(rootResource, directory, store,
         principal, proxyPrincipal, proxyTypes=proxyTypes))
 
-    yield scheduleNextGroupCachingUpdate(store, 0)
+    # Schedule work the PeerConnectionPool will pick up as overdue
+    yield schedulePolledGroupCachingUpdate(store)
 
 
 
@@ -495,7 +496,8 @@
         (yield subPrincipal.writeProperty(membersProperty, None))
 
     if removed:
-        yield scheduleNextGroupCachingUpdate(store, 0)
+        # Schedule work the PeerConnectionPool will pick up as overdue
+        yield schedulePolledGroupCachingUpdate(store)
     returnValue(removed)
 
 

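The replacement call above no longer passes an explicit zero delay; instead it enqueues a group-caching work item that the PeerConnectionPool's next polling pass treats as already overdue. The queue mechanics can be sketched in isolation (the `WorkQueue` class and names below are illustrative, not CalendarServer's API):

```python
import datetime

class WorkQueue(object):
    """Toy stand-in for the store-backed work queue (illustrative only)."""

    def __init__(self):
        self.items = []

    def enqueue(self, name, notBefore):
        # Work items carry a notBefore timestamp rather than a delay.
        self.items.append((name, notBefore))

    def pickUpOverdue(self, now):
        """Return and remove every item whose notBefore has passed."""
        due = [item for item in self.items if item[1] <= now]
        self.items = [item for item in self.items if item[1] > now]
        return due

queue = WorkQueue()
now = datetime.datetime(2013, 4, 26, 12, 0, 0)
# Scheduling with a notBefore in the past makes the item immediately overdue:
queue.enqueue("groupCacherPolling", now - datetime.timedelta(seconds=1))
overdue = queue.pickUpOverdue(now)
```

The point of the pattern is that the caller never runs the work inline; it only records intent, and whichever node polls next picks it up.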
Copied: CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/workitems.py (from rev 11101, CalendarServer/trunk/calendarserver/tools/workitems.py)
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/workitems.py	                        (rev 0)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/workitems.py	2013-04-26 16:45:09 UTC (rev 11102)
@@ -0,0 +1,188 @@
+#!/usr/bin/env python
+
+##
+# Copyright (c) 2006-2013 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+from __future__ import print_function
+
+from getopt import getopt, GetoptError
+import os
+import sys
+import curses
+import datetime
+
+from twisted.internet.defer import inlineCallbacks
+from calendarserver.tools.cmdline import utilityMain
+from twisted.application.service import Service
+from calendarserver.push.notifier import PushNotificationWork
+from twistedcaldav.directory.directory import GroupCacherPollingWork
+from twistedcaldav.scheduling.imip.inbound import IMIPPollingWork, IMIPReplyWork
+
+def usage(e=None):
+
+    name = os.path.basename(sys.argv[0])
+    print("usage: %s [options]" % (name,))
+    print("")
+    print("  TODO: describe usage")
+    print("")
+    print("options:")
+    print("  -h --help: print this help and exit")
+    print("  -e --error: send stderr to stdout")
+    print("  -f --config <path>: Specify caldavd.plist configuration path")
+    print("")
+
+    if e:
+        print(e)
+        sys.exit(64)
+    else:
+        sys.exit(0)
+
+
+
+def main():
+
+    try:
+        (optargs, _ignore_args) = getopt(
+            sys.argv[1:], "hef:", [
+                "help",
+                "error",
+                "config=",
+            ],
+        )
+    except GetoptError, e:
+        usage(e)
+
+    #
+    # Get configuration
+    #
+    configFileName = None
+    debug = False
+
+    for opt, arg in optargs:
+        if opt in ("-h", "--help"):
+            usage()
+
+        if opt in ("-e", "--error"):
+            debug = True
+
+        elif opt in ("-f", "--config"):
+            configFileName = arg
+
+        else:
+            raise NotImplementedError(opt)
+
+    utilityMain(configFileName, WorkItemMonitorService, verbose=debug)
+
+class WorkItemMonitorService(Service):
+
+    def __init__(self, store):
+        self.store = store
+        from twisted.internet import reactor
+        self.reactor = reactor
+
+
+    def startService(self):
+        self.screen = curses.initscr()
+        self.windows = []
+        self.updateScreenGeometry()
+        self.reactor.callLater(0, self.updateDisplay)
+
+    def updateScreenGeometry(self):
+        # Drop any previously created windows before recomputing geometry
+        self.windows = []
+        winY, winX = self.screen.getmaxyx()
+        seencolumns = [1]
+        seenrows = [1]
+        heightSoFar = 0
+        begin_x = 0
+        begin_y = 0
+        # Specify height and width of each window as one of:
+        #    absolute value (int), e.g.: 42
+        #    percentage of window height / width (string), e.g.: "42%"
+        # Specify row and column for each window as though it is a cell in an invisible html table
+        # Itemize windows in ascending order by row, col
+        for title, height, width, row, col, workItemClass, fmt, attrs in (
+            ("Group Membership Indexing",   "10%", "50%",  1, 1, GroupCacherPollingWork, "", ()),
+            ("IMIP Reply Polling",          "10%", "50%",  1, 2, IMIPPollingWork, "", ()),
+            ("IMIP Reply Processing",       "20%", "100%", 2, 1, IMIPReplyWork, "%s %s", ("organizer", "attendee")),
+            ("Push Notifications",          "69%", "100%", 3, 1, PushNotificationWork, "%s", ("pushID",)),
+        ):
+            if (isinstance(height, basestring)):
+                height = max(int(winY * (float(height.strip("%")) / 100.0)), 3)
+            if (isinstance(width, basestring)):
+                width = max(int(winX * (float(width.strip("%")) / 100.0)), 10)
+            if col not in seencolumns:
+                heightSoFar = max(height, heightSoFar)
+                seencolumns.append(col)
+            if row not in seenrows:
+                begin_y = heightSoFar
+                heightSoFar += height
+                begin_x = 0
+                seenrows.append(row)
+                seencolumns = [col]
+            window = WorkWindow(height, width, begin_y, begin_x,
+                self.store, title, workItemClass, fmt, attrs)
+            self.windows.append(window)
+            begin_x += width
+
+
+    @inlineCallbacks
+    def updateDisplay(self):
+        for window in self.windows:
+            yield window.update()
+
+        self.reactor.callLater(1, self.updateDisplay)
+
+class WorkWindow(object):
+    def __init__(self, nlines, ncols, begin_y, begin_x,
+        store, title, workItemClass, fmt, attrs):
+        self.window = curses.newwin(nlines, ncols, begin_y, begin_x)
+        self.ncols = ncols
+        self.store = store
+        self.title = title
+        self.workItemClass = workItemClass
+        self.fmt = fmt
+        self.attrs = attrs
+
+    @inlineCallbacks
+    def update(self):
+        self.window.erase()
+        self.window.border()
+        self.window.addstr(0, 2, self.title)
+        txn = self.store.newTransaction()
+        records = (yield self.workItemClass.all(txn))
+
+        x = 1
+        y = 1
+        for record in records:
+            seconds = record.notBefore - datetime.datetime.utcnow()
+            try:
+                self.window.addstr(y, x, "%d seconds" % int(seconds.total_seconds()))
+            except curses.error:
+                continue
+            y += 1
+            if self.attrs:
+                try:
+                    s = self.fmt % tuple([getattr(record, str(a)) for a in self.attrs])
+                except Exception, e:
+                    s = "Error: %s" % (str(e),)
+                try:
+                    self.window.addnstr(y, x, s, self.ncols-2)
+                except curses.error:
+                    pass
+                y += 1
+        self.window.refresh()
+
+if __name__ == "__main__":
+    main()

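The new workitems.py sizes its curses windows from either absolute cell counts or percentage strings, clamped to a minimum so tiny terminals still render. That conversion is a small pure function; a sketch under our own naming (`resolveExtent` is not the tool's):

```python
def resolveExtent(spec, total, minimum):
    """Turn an int or a 'NN%' string into an absolute number of cells."""
    if isinstance(spec, str):
        # Percentage of the terminal extent, floored at a usable minimum.
        return max(int(total * (float(spec.strip("%")) / 100.0)), minimum)
    return spec

# For an 80x24 terminal:
height = resolveExtent("10%", 24, 3)   # 10% of 24 rows, but never below 3
width = resolveExtent("50%", 80, 10)   # 50% of 80 columns
```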
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/webcal/resource.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/webcal/resource.py	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/webcal/resource.py	2013-04-26 16:45:09 UTC (rev 11102)
@@ -91,11 +91,11 @@
     def htmlContent(self, debug=False):
         if debug:
             cacheAttr = "_htmlContentDebug"
-            templateFileName = "debug_template.html"
+            templateFileName = "debug_standalone.html"
         else:
             cacheAttr = "_htmlContent"
-            templateFileName = "template.html"
-        templateFileName = os.path.join(config.WebCalendarRoot, "calendar", templateFileName)
+            templateFileName = "standalone.html"
+        templateFileName = os.path.join(config.WebCalendarRoot, templateFileName)
 
         #
         # See if the file changed, and dump the cached template if so.

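The resource above keeps the rendered template cached and, per the comment in context, dumps the cache when the file on disk changes. A minimal sketch of that mtime-based invalidation, with illustrative names (not the resource's actual attributes):

```python
import os
import tempfile

class TemplateCache(object):
    """Re-read a template file only when its mtime changes."""

    def __init__(self, path):
        self.path = path
        self._mtime = None
        self._content = None

    def content(self):
        mtime = os.path.getmtime(self.path)
        if self._content is None or mtime != self._mtime:
            # File is new or changed on disk: dump the cached copy.
            with open(self.path) as f:
                self._content = f.read()
            self._mtime = mtime
        return self._content

# Demonstrate the dump-on-change behaviour:
fd, path = tempfile.mkstemp()
os.close(fd)
with open(path, "w") as f:
    f.write("<html>v1</html>")
cache = TemplateCache(path)
first = cache.content()
with open(path, "w") as f:
    f.write("<html>v2</html>")
os.utime(path, (1, 1))          # force a visibly different mtime
second = cache.content()
os.remove(path)
```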
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/conf/caldavd-apple.plist
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/conf/caldavd-apple.plist	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/conf/caldavd-apple.plist	2013-04-26 16:45:09 UTC (rev 11102)
@@ -112,6 +112,8 @@
             <string>-c deadlock_timeout=10</string>
             <string>-c log_line_prefix='%m [%p] '</string>
         </array>
+        <key>ExtraConnections</key>
+        <integer>20</integer>
     </dict>
 
     <!-- Data root -->

Modified: CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/dal/syntax.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/dal/syntax.py	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/dal/syntax.py	2013-04-26 16:45:09 UTC (rev 11102)
@@ -147,7 +147,8 @@
 
     _paramstyles = {
         'pyformat': partial(FixedPlaceholder, "%s"),
-        'numeric': NumericPlaceholder
+        'numeric': NumericPlaceholder,
+        'qmark': defaultPlaceholder,
     }
 
 
@@ -1673,6 +1674,12 @@
 
 
     def _toSQL(self, queryGenerator):
+        if queryGenerator.dialect == SQLITE_DIALECT:
+            # FIXME - this is only stubbed out for testing right now, actual
+            # concurrency would require some kind of locking statement here.
+            # BEGIN IMMEDIATE maybe, if that's okay in the middle of a
+            # transaction or repeatedly?
+            return SQLFragment('select null')
         return SQLFragment('lock table ').append(
             self.table.subSQL(queryGenerator, [self.table])).append(
             SQLFragment(' in %s mode' % (self.mode,)))

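The hunk above makes table locking a no-op under SQLite, which has no `LOCK TABLE` statement, while PostgreSQL keeps the real lock. Stripped of the DAL machinery, the dialect switch reduces to a small function; the names below are ours, but the emitted SQL mirrors the change:

```python
POSTGRES_DIALECT = "postgres-dialect"
SQLITE_DIALECT = "sqlite-dialect"

def lockSQL(dialect, table, mode):
    """Render a table-lock statement for the given dialect."""
    if dialect == SQLITE_DIALECT:
        # SQLite cannot lock a single table; emit a harmless no-op
        # statement so the surrounding execution path still runs SQL.
        return "select null"
    return "lock table %s in %s mode" % (table, mode)
```

As the FIXME in the diff notes, this stub is only adequate for testing; real concurrency under SQLite would need an actual locking strategy.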
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/fixtures.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/fixtures.py	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/fixtures.py	2013-04-26 16:45:09 UTC (rev 11102)
@@ -20,10 +20,24 @@
 """
 
 import sqlite3
+from Queue import Empty
+from itertools import count
 
+from zope.interface import implementer
+from zope.interface.verify import verifyClass
+
+from twisted.internet.interfaces import IReactorThreads
+from twisted.python.threadpool import ThreadPool
+
+from twisted.internet.task import Clock
+
 from twext.enterprise.adbapi2 import ConnectionPool
 from twext.enterprise.ienterprise import SQLITE_DIALECT
+from twext.enterprise.ienterprise import POSTGRES_DIALECT
+from twext.enterprise.adbapi2 import DEFAULT_PARAM_STYLE
+from twext.internet.threadutils import ThreadHolder
 
+
 def buildConnectionPool(testCase, schemaText="", dialect=SQLITE_DIALECT):
     """
     Build a L{ConnectionPool} for testing purposes, with the given C{testCase}.
@@ -57,3 +71,513 @@
     pool.startService()
     testCase.addCleanup(pool.stopService)
     return pool
+
+
+
+def resultOf(deferred, propagate=False):
+    """
+    Add a callback and errback which will capture the result of a L{Deferred} in
+    a list, and return that list.  If 'propagate' is True, pass through the
+    results.
+    """
+    results = []
+    if propagate:
+        def cb(r):
+            results.append(r)
+            return r
+    else:
+        cb = results.append
+    deferred.addBoth(cb)
+    return results
+
+
+
+class FakeThreadHolder(ThreadHolder):
+    """
+    Run things submitted to this ThreadHolder on the main thread, so that
+    execution is easier to control.
+    """
+
+    def __init__(self, test):
+        super(FakeThreadHolder, self).__init__(self)
+        self.test = test
+        self.started = False
+        self.stopped = False
+        self._workerIsRunning = False
+
+
+    def start(self):
+        self.started = True
+        return super(FakeThreadHolder, self).start()
+
+
+    def stop(self):
+        result = super(FakeThreadHolder, self).stop()
+        self.stopped = True
+        return result
+
+
+    @property
+    def _q(self):
+        return self._q_
+
+
+    @_q.setter
+    def _q(self, newq):
+        if newq is not None:
+            oget = newq.get
+            newq.get = lambda: oget(timeout=0)
+            oput = newq.put
+            def putit(x):
+                p = oput(x)
+                if not self.test.paused:
+                    self.flush()
+                return p
+            newq.put = putit
+        self._q_ = newq
+
+
+    def callFromThread(self, f, *a, **k):
+        result = f(*a, **k)
+        return result
+
+
+    def callInThread(self, f, *a, **k):
+        """
+        This should be called only once, to start the worker function that
+        dedicates a thread to this L{ThreadHolder}.
+        """
+        self._workerIsRunning = True
+
+
+    def flush(self):
+        """
+        Fire all deferreds previously returned from submit.
+        """
+        try:
+            while self._workerIsRunning and self._qpull():
+                pass
+            else:
+                self._workerIsRunning = False
+        except Empty:
+            pass
+
+
+
+@implementer(IReactorThreads)
+class ClockWithThreads(Clock):
+    """
+    A testing reactor that supplies L{IReactorTime} and L{IReactorThreads}.
+    """
+
+    def __init__(self):
+        super(ClockWithThreads, self).__init__()
+        self._pool = ThreadPool()
+
+
+    def getThreadPool(self):
+        """
+        Get the threadpool.
+        """
+        return self._pool
+
+
+    def suggestThreadPoolSize(self, size):
+        """
+        Approximate the behavior of a 'real' reactor.
+        """
+        self._pool.adjustPoolsize(maxthreads=size)
+
+
+    def callInThread(self, thunk, *a, **kw):
+        """
+        No implementation.
+        """
+
+
+    def callFromThread(self, thunk, *a, **kw):
+        """
+        No implementation.
+        """
+
+verifyClass(IReactorThreads, ClockWithThreads)
+
+
+
+class ConnectionPoolHelper(object):
+    """
+    Connection pool setting-up facilities for tests that need a
+    L{ConnectionPool}.
+    """
+
+    dialect = POSTGRES_DIALECT
+    paramstyle = DEFAULT_PARAM_STYLE
+
+    def setUp(self, test=None, connect=None):
+        """
+        Support inheritance by L{TestCase} classes.
+        """
+        if test is None:
+            test = self
+        if connect is None:
+            self.factory = ConnectionFactory()
+            connect = self.factory.connect
+        self.connect = connect
+        self.paused             = False
+        self.holders            = []
+        self.pool               = ConnectionPool(connect,
+                                                 maxConnections=2,
+                                                 dialect=self.dialect,
+                                                 paramstyle=self.paramstyle)
+        self.pool._createHolder = self.makeAHolder
+        self.clock              = self.pool.reactor = ClockWithThreads()
+        self.pool.startService()
+        test.addCleanup(self.flushHolders)
+
+
+    def flushHolders(self):
+        """
+        Flush all pending C{submit}s since C{pauseHolders} was called.  This
+        makes sure the service is stopped and the fake ThreadHolders are all
+        executing their queues so failed tests can exit cleanly.
+        """
+        self.paused = False
+        for holder in self.holders:
+            holder.flush()
+
+
+    def pauseHolders(self):
+        """
+        Pause all L{FakeThreadHolder}s, causing C{submit} to return an unfired
+        L{Deferred}.
+        """
+        self.paused = True
+
+
+    def makeAHolder(self):
+        """
+        Make a ThreadHolder-alike.
+        """
+        fth = FakeThreadHolder(self)
+        self.holders.append(fth)
+        return fth
+
+
+    def resultOf(self, it):
+        return resultOf(it)
+
+
+    def createTransaction(self):
+        return self.pool.connection()
+
+
+    def translateError(self, err):
+        return err
+
+
+
+class SteppablePoolHelper(ConnectionPoolHelper):
+    """
+    A version of L{ConnectionPoolHelper} that can set up a connection pool
+    capable of firing all its L{Deferred}s on demand, synchronously, by using
+    SQLite.
+    """
+    dialect = SQLITE_DIALECT
+    paramstyle = sqlite3.paramstyle
+
+    def __init__(self, schema):
+        self.schema = schema
+
+
+    def setUp(self, test):
+        connect = synchronousConnectionFactory(test)
+        con = connect()
+        cur = con.cursor()
+        cur.executescript(self.schema)
+        con.commit()
+        super(SteppablePoolHelper, self).setUp(test, connect)
+
+
+    def rows(self, sql):
+        """
+        Get some rows from the database to compare in a test.
+        """
+        con = self.connect()
+        cur = con.cursor()
+        cur.execute(sql)
+        result = cur.fetchall()
+        con.commit()
+        return result
+
+
+
+def synchronousConnectionFactory(test):
+    tmpdb = test.mktemp()
+    def connect():
+        return sqlite3.connect(tmpdb)
+    return connect
+
+
+
+class Child(object):
+    """
+    An object with a L{Parent}, in its list of C{children}.
+    """
+    def __init__(self, parent):
+        self.closed = False
+        self.parent = parent
+        self.parent.children.append(self)
+
+
+    def close(self):
+        if self.parent._closeFailQueue:
+            raise self.parent._closeFailQueue.pop(0)
+        self.closed = True
+
+
+
+class Parent(object):
+    """
+    An object with a list of L{Child}ren.
+    """
+
+    def __init__(self):
+        self.children = []
+        self._closeFailQueue = []
+
+
+    def childCloseWillFail(self, exception):
+        """
+        Closing children of this object will result in the given exception.
+
+        @see: L{ConnectionFactory}
+        """
+        self._closeFailQueue.append(exception)
+
+
+
+class FakeConnection(Parent, Child):
+    """
+    Fake Stand-in for DB-API 2.0 connection.
+
+    @ivar executions: the number of statements which have been executed.
+
+    """
+
+    executions = 0
+
+    def __init__(self, factory):
+        """
+        Initialize list of cursors
+        """
+        Parent.__init__(self)
+        Child.__init__(self, factory)
+        self.id = factory.idcounter.next()
+        self._executeFailQueue = []
+        self._commitCount = 0
+        self._rollbackCount = 0
+
+
+    def executeWillFail(self, thunk):
+        """
+        The next call to L{FakeCursor.execute} will fail with an exception
+        returned from the given callable.
+        """
+        self._executeFailQueue.append(thunk)
+
+
+    @property
+    def cursors(self):
+        "Alias to make tests more readable."
+        return self.children
+
+
+    def cursor(self):
+        return FakeCursor(self)
+
+
+    def commit(self):
+        self._commitCount += 1
+        if self.parent.commitFail:
+            self.parent.commitFail = False
+            raise CommitFail()
+
+
+    def rollback(self):
+        self._rollbackCount += 1
+        if self.parent.rollbackFail:
+            self.parent.rollbackFail = False
+            raise RollbackFail()
+
+
+
+class RollbackFail(Exception):
+    """
+    Sample rollback-failure exception.
+    """
+
+
+
+class CommitFail(Exception):
+    """
+    Sample Commit-failure exception.
+    """
+
+
+
+class FakeCursor(Child):
+    """
+    Fake stand-in for a DB-API 2.0 cursor.
+    """
+    def __init__(self, connection):
+        Child.__init__(self, connection)
+        self.rowcount = 0
+        # not entirely correct, but all we care about is its truth value.
+        self.description = False
+        self.variables = []
+        self.allExecutions = []
+
+
+    @property
+    def connection(self):
+        "Alias to make tests more readable."
+        return self.parent
+
+
+    def execute(self, sql, args=()):
+        self.connection.executions += 1
+        if self.connection._executeFailQueue:
+            raise self.connection._executeFailQueue.pop(0)()
+        self.allExecutions.append((sql, args))
+        self.sql = sql
+        factory = self.connection.parent
+        self.description = factory.hasResults
+        if factory.hasResults and factory.shouldUpdateRowcount:
+            self.rowcount = 1
+        else:
+            self.rowcount = 0
+        return
+
+
+    def var(self, type, *args):
+        """
+        Return a database variable in the style of the cx_Oracle bindings.
+        """
+        v = FakeVariable(self, type, args)
+        self.variables.append(v)
+        return v
+
+
+    def fetchall(self):
+        """
+        Just echo the SQL that was executed in the last query.
+        """
+        if self.connection.parent.hasResults:
+            return [[self.connection.id, self.sql]]
+        if self.description:
+            return []
+        return None
+
+
+
+class FakeVariable(object):
+    def __init__(self, cursor, type, args):
+        self.cursor = cursor
+        self.type = type
+        self.args = args
+
+
+    def getvalue(self):
+        vv = self.cursor.connection.parent.varvals
+        if vv:
+            return vv.pop(0)
+        return self.cursor.variables.index(self) + 300
+
+
+    def __reduce__(self):
+        raise RuntimeError("Not pickleable (since oracle vars aren't)")
+
+
+
+class ConnectionFactory(Parent):
+    """
+    A factory for L{FakeConnection} objects.
+
+    @ivar shouldUpdateRowcount: Should C{execute} on cursors produced by
+        connections produced by this factory update their C{rowcount} or just
+        their C{description} attribute?
+
+    @ivar hasResults: should cursors produced by connections by this factory
+        have any results returned by C{fetchall()}?
+    """
+
+    rollbackFail = False
+    commitFail = False
+
+    def __init__(self, shouldUpdateRowcount=True, hasResults=True):
+        Parent.__init__(self)
+        self.idcounter = count(1)
+        self._connectResultQueue = []
+        self.defaultConnect()
+        self.varvals = []
+        self.shouldUpdateRowcount = shouldUpdateRowcount
+        self.hasResults = hasResults
+
+
+    @property
+    def connections(self):
+        "Alias to make tests more readable."
+        return self.children
+
+
+    def connect(self):
+        """
+        Implement the C{ConnectionFactory} callable expected by
+        L{ConnectionPool}.
+        """
+        if self._connectResultQueue:
+            thunk = self._connectResultQueue.pop(0)
+        else:
+            thunk = self._default
+        return thunk()
+
+
+    def willConnect(self):
+        """
+        Used by tests to queue a successful result for connect().
+        """
+        def thunk():
+            return FakeConnection(self)
+        self._connectResultQueue.append(thunk)
+
+
+    def willFail(self):
+        """
+        Used by tests to queue a failed result for connect().
+        """
+        def thunk():
+            raise FakeConnectionError()
+        self._connectResultQueue.append(thunk)
+
+
+    def defaultConnect(self):
+        """
+        By default, connection attempts will succeed.
+        """
+        self.willConnect()
+        self._default = self._connectResultQueue.pop()
+
+
+    def defaultFail(self):
+        """
+        By default, connection attempts will fail.
+        """
+        self.willFail()
+        self._default = self._connectResultQueue.pop()
+
+
+
+class FakeConnectionError(Exception):
+    """
+    Synthetic error that might occur during connection.
+    """

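The fakes moved into fixtures.py exist mainly so multiple test modules can queue up connection successes and failures deterministically. The core of that failure-queue pattern, stripped of the pool machinery (`TinyConnectionFactory` is an illustrative reduction, not the module's `ConnectionFactory`):

```python
class FakeConnectError(Exception):
    """Synthetic error standing in for a real connection failure."""

class TinyConnectionFactory(object):
    def __init__(self):
        self._queue = []        # queued thunks consumed by connect()

    def willFail(self):
        # Queue one failure for the next connect() call.
        def thunk():
            raise FakeConnectError()
        self._queue.append(thunk)

    def willConnect(self, conn):
        # Queue one success returning the given connection object.
        self._queue.append(lambda: conn)

    def connect(self):
        if self._queue:
            return self._queue.pop(0)()
        return "default-connection"

factory = TinyConnectionFactory()
factory.willFail()
factory.willConnect("conn-1")
try:
    factory.connect()
    outcome = "connected"
except FakeConnectError:
    outcome = "failed"
```

Each queued thunk is consumed exactly once, so a test can script "fail, then succeed" and assert how the pool reacts to each step.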
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/queue.py	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/queue.py	2013-04-26 16:45:09 UTC (rev 11102)
@@ -1342,6 +1342,7 @@
         def done(result):
             self._startingUp = None
             super(PeerConnectionPool, self).startService()
+            self._lostWorkCheckLoop()
             return result
 
 

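The one-line addition above kicks off the lost-work check as soon as the pool's startup deferred fires. The self-rescheduling shape of such a loop, driven here by a fake clock so it can be stepped synchronously (a sketch, not PeerConnectionPool's actual internals):

```python
class FakeClock(object):
    """Minimal callLater stand-in so the loop can be driven in tests."""

    def __init__(self):
        self.calls = []

    def callLater(self, delay, f):
        self.calls.append((delay, f))

    def advance(self):
        # Fire the oldest pending call, as though its delay elapsed.
        delay, f = self.calls.pop(0)
        f()

class Pool(object):
    checkInterval = 10

    def __init__(self, clock):
        self.clock = clock
        self.checks = 0

    def _lostWorkCheck(self):
        self.checks += 1            # would re-dispatch orphaned work here

    def _lostWorkCheckLoop(self):
        # Run one check, then reschedule ourselves for the next interval.
        self._lostWorkCheck()
        self.clock.callLater(self.checkInterval, self._lostWorkCheckLoop)

clock = FakeClock()
pool = Pool(clock)
pool._lostWorkCheckLoop()           # started once at service startup
clock.advance()                     # simulate the interval elapsing
```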
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/test/test_adbapi2.py	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/test/test_adbapi2.py	2013-04-26 16:45:09 UTC (rev 11102)
@@ -18,22 +18,14 @@
 Tests for L{twext.enterprise.adbapi2}.
 """
 
-from itertools import count
-from Queue import Empty
+from zope.interface.verify import verifyObject
 
-from zope.interface.verify import verifyClass, verifyObject
-from zope.interface.declarations import implements
-
-from twisted.python.threadpool import ThreadPool
 from twisted.python.failure import Failure
 
 from twisted.trial.unittest import TestCase
 
-from twisted.internet.task import Clock
 from twisted.internet.defer import Deferred, fail
 
-from twisted.internet.interfaces import IReactorThreads
-
 from twisted.test.proto_helpers import StringTransport
 
 from twext.enterprise.ienterprise import ConnectionError
@@ -41,33 +33,17 @@
 from twext.enterprise.adbapi2 import ConnectionPoolClient
 from twext.enterprise.adbapi2 import ConnectionPoolConnection
 from twext.enterprise.ienterprise import IAsyncTransaction
-from twext.enterprise.ienterprise import POSTGRES_DIALECT
 from twext.enterprise.ienterprise import ICommandBlock
 from twext.enterprise.adbapi2 import FailsafeException
-from twext.enterprise.adbapi2 import DEFAULT_PARAM_STYLE
 from twext.enterprise.adbapi2 import ConnectionPool
-from twext.internet.threadutils import ThreadHolder
+from twext.enterprise.fixtures import ConnectionPoolHelper
+from twext.enterprise.fixtures import resultOf
+from twext.enterprise.fixtures import ClockWithThreads
+from twext.enterprise.fixtures import FakeConnectionError
+from twext.enterprise.fixtures import RollbackFail
+from twext.enterprise.fixtures import CommitFail
 from twext.enterprise.adbapi2 import Commit
 
-
-def resultOf(deferred, propagate=False):
-    """
-    Add a callback and errback which will capture the result of a L{Deferred} in
-    a list, and return that list.  If 'propagate' is True, pass through the
-    results.
-    """
-    results = []
-    if propagate:
-        def cb(r):
-            results.append(r)
-            return r
-    else:
-        cb = results.append
-    deferred.addBoth(cb)
-    return results
-
-
-
 class AssertResultHelper(object):
     """
     Mixin for asserting about synchronous Deferred results.
@@ -97,384 +73,6 @@
 
 
 
-class Child(object):
-    """
-    An object with a L{Parent}, in its list of C{children}.
-    """
-    def __init__(self, parent):
-        self.closed = False
-        self.parent = parent
-        self.parent.children.append(self)
-
-
-    def close(self):
-        if self.parent._closeFailQueue:
-            raise self.parent._closeFailQueue.pop(0)
-        self.closed = True
-
-
-
-class Parent(object):
-    """
-    An object with a list of L{Child}ren.
-    """
-
-    def __init__(self):
-        self.children = []
-        self._closeFailQueue = []
-
-
-    def childCloseWillFail(self, exception):
-        """
-        Closing children of this object will result in the given exception.
-
-        @see: L{ConnectionFactory}
-        """
-        self._closeFailQueue.append(exception)
-
-
-
-class FakeConnection(Parent, Child):
-    """
-    Fake Stand-in for DB-API 2.0 connection.
-
-    @ivar executions: the number of statements which have been executed.
-
-    """
-
-    executions = 0
-
-    def __init__(self, factory):
-        """
-        Initialize list of cursors
-        """
-        Parent.__init__(self)
-        Child.__init__(self, factory)
-        self.id = factory.idcounter.next()
-        self._executeFailQueue = []
-        self._commitCount = 0
-        self._rollbackCount = 0
-
-
-    def executeWillFail(self, thunk):
-        """
-        The next call to L{FakeCursor.execute} will fail with an exception
-        returned from the given callable.
-        """
-        self._executeFailQueue.append(thunk)
-
-
-    @property
-    def cursors(self):
-        "Alias to make tests more readable."
-        return self.children
-
-
-    def cursor(self):
-        return FakeCursor(self)
-
-
-    def commit(self):
-        self._commitCount += 1
-        if self.parent.commitFail:
-            self.parent.commitFail = False
-            raise CommitFail()
-
-
-    def rollback(self):
-        self._rollbackCount += 1
-        if self.parent.rollbackFail:
-            self.parent.rollbackFail = False
-            raise RollbackFail()
-
-
-
-class RollbackFail(Exception):
-    """
-    Sample rollback-failure exception.
-    """
-
-
-
-class CommitFail(Exception):
-    """
-    Sample Commit-failure exception.
-    """
-
-
-
-class FakeCursor(Child):
-    """
-    Fake stand-in for a DB-API 2.0 cursor.
-    """
-    def __init__(self, connection):
-        Child.__init__(self, connection)
-        self.rowcount = 0
-        # not entirely correct, but all we care about is its truth value.
-        self.description = False
-        self.variables = []
-        self.allExecutions = []
-
-
-    @property
-    def connection(self):
-        "Alias to make tests more readable."
-        return self.parent
-
-
-    def execute(self, sql, args=()):
-        self.connection.executions += 1
-        if self.connection._executeFailQueue:
-            raise self.connection._executeFailQueue.pop(0)()
-        self.allExecutions.append((sql, args))
-        self.sql = sql
-        factory = self.connection.parent
-        self.description = factory.hasResults
-        if factory.hasResults and factory.shouldUpdateRowcount:
-            self.rowcount = 1
-        else:
-            self.rowcount = 0
-        return
-
-
-    def var(self, type, *args):
-        """
-        Return a database variable in the style of the cx_Oracle bindings.
-        """
-        v = FakeVariable(self, type, args)
-        self.variables.append(v)
-        return v
-
-
-    def fetchall(self):
-        """
-        Just echo the SQL that was executed in the last query.
-        """
-        if self.connection.parent.hasResults:
-            return [[self.connection.id, self.sql]]
-        if self.description:
-            return []
-        return None
-
-
-
-class FakeVariable(object):
-    def __init__(self, cursor, type, args):
-        self.cursor = cursor
-        self.type = type
-        self.args = args
-
-
-    def getvalue(self):
-        vv = self.cursor.connection.parent.varvals
-        if vv:
-            return vv.pop(0)
-        return self.cursor.variables.index(self) + 300
-
-
-    def __reduce__(self):
-        raise RuntimeError("Not pickleable (since oracle vars aren't)")
-
-
-
-class ConnectionFactory(Parent):
-    """
-    A factory for L{FakeConnection} objects.
-
-    @ivar shouldUpdateRowcount: Should C{execute} on cursors produced by
-        connections produced by this factory update their C{rowcount} or just
-        their C{description} attribute?
-
-    @ivar hasResults: Should cursors produced by connections from this factory
-        have any results returned by C{fetchall()}?
-    """
-
-    rollbackFail = False
-    commitFail = False
-
-    def __init__(self, shouldUpdateRowcount=True, hasResults=True):
-        Parent.__init__(self)
-        self.idcounter = count(1)
-        self._connectResultQueue = []
-        self.defaultConnect()
-        self.varvals = []
-        self.shouldUpdateRowcount = shouldUpdateRowcount
-        self.hasResults = hasResults
-
-
-    @property
-    def connections(self):
-        "Alias to make tests more readable."
-        return self.children
-
-
-    def connect(self):
-        """
-        Implement the C{ConnectionFactory} callable expected by
-        L{ConnectionPool}.
-        """
-        if self._connectResultQueue:
-            thunk = self._connectResultQueue.pop(0)
-        else:
-            thunk = self._default
-        return thunk()
-
-
-    def willConnect(self):
-        """
-        Used by tests to queue a successful result for connect().
-        """
-        def thunk():
-            return FakeConnection(self)
-        self._connectResultQueue.append(thunk)
-
-
-    def willFail(self):
-        """
-        Used by tests to queue a failure result for connect().
-        """
-        def thunk():
-            raise FakeConnectionError()
-        self._connectResultQueue.append(thunk)
-
-
-    def defaultConnect(self):
-        """
-        By default, connection attempts will succeed.
-        """
-        self.willConnect()
-        self._default = self._connectResultQueue.pop()
-
-
-    def defaultFail(self):
-        """
-        By default, connection attempts will fail.
-        """
-        self.willFail()
-        self._default = self._connectResultQueue.pop()
-
-
-
-class FakeConnectionError(Exception):
-    """
-    Synthetic error that might occur during connection.
-    """
-
-
-
-class FakeThreadHolder(ThreadHolder):
-    """
-    Run things submitted to this ThreadHolder on the main thread, so that
-    execution is easier to control.
-    """
-
-    def __init__(self, test):
-        super(FakeThreadHolder, self).__init__(self)
-        self.test = test
-        self.started = False
-        self.stopped = False
-        self._workerIsRunning = False
-
-
-    def start(self):
-        self.started = True
-        return super(FakeThreadHolder, self).start()
-
-
-    def stop(self):
-        result = super(FakeThreadHolder, self).stop()
-        self.stopped = True
-        return result
-
-
-    @property
-    def _get_q(self):
-        return self._q_
-
-
-    @_get_q.setter
-    def _q(self, newq):
-        if newq is not None:
-            oget = newq.get
-            newq.get = lambda: oget(timeout=0)
-            oput = newq.put
-            def putit(x):
-                p = oput(x)
-                if not self.test.paused:
-                    self.flush()
-                return p
-            newq.put = putit
-        self._q_ = newq
-
-
-    def callFromThread(self, f, *a, **k):
-        result = f(*a, **k)
-        return result
-
-
-    def callInThread(self, f, *a, **k):
-        """
-        This should be called only once, to start the worker function that
-        dedicates a thread to this L{ThreadHolder}.
-        """
-        self._workerIsRunning = True
-
-
-    def flush(self):
-        """
-        Fire all deferreds previously returned from submit.
-        """
-        try:
-            while self._workerIsRunning and self._qpull():
-                pass
-            else:
-                self._workerIsRunning = False
-        except Empty:
-            pass
-
-
-
-class ClockWithThreads(Clock):
-    """
-    A testing reactor that supplies L{IReactorTime} and L{IReactorThreads}.
-    """
-    implements(IReactorThreads)
-
-    def __init__(self):
-        super(ClockWithThreads, self).__init__()
-        self._pool = ThreadPool()
-
-
-    def getThreadPool(self):
-        """
-        Get the threadpool.
-        """
-        return self._pool
-
-
-    def suggestThreadPoolSize(self, size):
-        """
-        Approximate the behavior of a 'real' reactor.
-        """
-        self._pool.adjustPoolsize(maxthreads=size)
-
-
-    def callInThread(self, thunk, *a, **kw):
-        """
-        No implementation.
-        """
-
-
-    def callFromThread(self, thunk, *a, **kw):
-        """
-        No implementation.
-        """
-
-
-verifyClass(IReactorThreads, ClockWithThreads)
-
-
-
 class ConnectionPoolBootTests(TestCase):
     """
     Tests for the start-up phase of L{ConnectionPool}.
@@ -521,74 +119,6 @@
 
 
 
-class ConnectionPoolHelper(object):
-    """
-    Connection pool setting-up facilities for tests that need a
-    L{ConnectionPool}.
-    """
-
-    dialect = POSTGRES_DIALECT
-    paramstyle = DEFAULT_PARAM_STYLE
-
-    def setUp(self):
-        """
-        Create a L{ConnectionPool} attached to a C{ConnectionFactory}.  Start
-        the L{ConnectionPool}.
-        """
-        self.paused             = False
-        self.holders            = []
-        self.factory            = ConnectionFactory()
-        self.pool               = ConnectionPool(self.factory.connect,
-                                                 maxConnections=2,
-                                                 dialect=self.dialect,
-                                                 paramstyle=self.paramstyle)
-        self.pool._createHolder = self.makeAHolder
-        self.clock              = self.pool.reactor = ClockWithThreads()
-        self.pool.startService()
-        self.addCleanup(self.flushHolders)
-
-
-    def flushHolders(self):
-        """
-        Flush all pending C{submit}s since C{pauseHolders} was called.  This
-        makes sure the service is stopped and the fake ThreadHolders are all
-        executing their queues so failed tests can exit cleanly.
-        """
-        self.paused = False
-        for holder in self.holders:
-            holder.flush()
-
-
-    def pauseHolders(self):
-        """
-        Pause all L{FakeThreadHolder}s, causing C{submit} to return an unfired
-        L{Deferred}.
-        """
-        self.paused = True
-
-
-    def makeAHolder(self):
-        """
-        Make a ThreadHolder-alike.
-        """
-        fth = FakeThreadHolder(self)
-        self.holders.append(fth)
-        return fth
-
-
-    def resultOf(self, it):
-        return resultOf(it)
-
-
-    def createTransaction(self):
-        return self.pool.connection()
-
-
-    def translateError(self, err):
-        return err
-
-
-
 class ConnectionPoolTests(ConnectionPoolHelper, TestCase, AssertResultHelper):
     """
     Tests for L{ConnectionPool}.

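The block removed above (moved into twext.enterprise.fixtures) is built around a fake DB-API 2.0 cursor that records every statement so tests can assert on the SQL that reached the "database" without a real backend. A minimal standalone sketch of the same idea, with illustrative names rather than the fixtures' actual API:

```python
# Fake DB-API 2.0 cursor/connection pair for tests: execute() records
# each (sql, args) pair and fetchall() echoes the last statement back,
# so assertions can inspect exactly what was "sent" to the database.
# Names here are illustrative, not the twext.enterprise.fixtures API.

class FakeCursor(object):
    def __init__(self, connection):
        self.connection = connection
        self.allExecutions = []   # (sql, args) pairs, for assertions
        self.rowcount = 0

    def execute(self, sql, args=()):
        self.connection.executions += 1
        self.allExecutions.append((sql, args))

    def fetchall(self):
        # Echo back the last executed statement, as the real fixture does.
        return [self.allExecutions[-1]] if self.allExecutions else []


class FakeConnection(object):
    def __init__(self):
        self.executions = 0

    def cursor(self):
        return FakeCursor(self)


conn = FakeConnection()
cur = conn.cursor()
cur.execute("select * from DUMMY_WORK_ITEM", (1,))
```

Keeping the per-connection `executions` counter separate from the per-cursor `allExecutions` log is what lets tests assert both "how busy was this connection" and "what exactly ran", mirroring the fixture above.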
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/test/test_queue.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/test/test_queue.py	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/test/test_queue.py	2013-04-26 16:45:09 UTC (rev 11102)
@@ -52,7 +52,8 @@
 from twext.enterprise.queue import ConnectionFromPeerNode
 from twext.enterprise.fixtures import buildConnectionPool
 from zope.interface.verify import verifyObject
-from twisted.test.proto_helpers import StringTransport
+from twisted.test.proto_helpers import StringTransport, MemoryReactor
+from twext.enterprise.fixtures import SteppablePoolHelper
 
 from twext.enterprise.queue import _BaseQueuer, NonPerformingQueuer
 import twext.enterprise.queue
@@ -70,6 +71,16 @@
 
 
 
+class MemoryReactorWithClock(MemoryReactor, Clock):
+    """
+    Simulate a real reactor.
+    """
+    def __init__(self):
+        MemoryReactor.__init__(self)
+        Clock.__init__(self)
+
+
+
 def transactionally(transactionCreator):
     """
     Perform the decorated function immediately in a transaction, replacing its
@@ -213,7 +224,6 @@
 
 
 
-
 class WorkItemTests(TestCase):
     """
     A L{WorkItem} is an item of work that can be executed.
@@ -467,7 +477,64 @@
         self.assertIdentical(result, proposal)
 
 
+    def test_workerConnectionPoolPerformWork(self):
+        """
+        L{WorkerConnectionPool.performWork} performs work by selecting a
+        L{ConnectionFromWorker} and sending it a L{PerformWork} command.
+        """
+        clock = Clock()
+        peerPool = PeerConnectionPool(clock, None, 4322, schema)
+        factory = peerPool.workerListenerFactory()
+        def peer():
+            p = factory.buildProtocol(None)
+            t = StringTransport()
+            p.makeConnection(t)
+            return p, t
+        worker1, trans1 = peer()
+        worker2, trans2 = peer()
+        # Ask the worker to do something.
+        worker1.performWork(schema.DUMMY_WORK_ITEM, 1)
+        self.assertEquals(worker1.currentLoad, 1)
+        self.assertEquals(worker2.currentLoad, 0)
 
+        # Now ask the pool to do something
+        peerPool.workerPool.performWork(schema.DUMMY_WORK_ITEM, 2)
+        self.assertEquals(worker1.currentLoad, 1)
+        self.assertEquals(worker2.currentLoad, 1)
+
+
+    def test_poolStartServiceChecksForWork(self):
+        """
+        L{PeerConnectionPool.startService} kicks off the idle work-check loop.
+        """
+        reactor = MemoryReactorWithClock()
+        cph = SteppablePoolHelper(nodeSchema + schemaText)
+        then = datetime.datetime(2012, 12, 12, 12, 12, 0)
+        reactor.advance(astimestamp(then))
+        cph.setUp(self)
+        pcp = PeerConnectionPool(reactor, cph.pool.connection, 4321, schema)
+        now = then + datetime.timedelta(seconds=pcp.queueProcessTimeout * 2)
+        @transactionally(cph.pool.connection)
+        def createOldWork(txn):
+            one = DummyWorkItem.create(txn, workID=1, a=3, b=4, notBefore=then)
+            two = DummyWorkItem.create(txn, workID=2, a=7, b=9, notBefore=now)
+            return gatherResults([one, two])
+        pcp.startService()
+        cph.flushHolders()
+        reactor.advance(pcp.queueProcessTimeout * 2)
+        self.assertEquals(
+            cph.rows("select * from DUMMY_WORK_DONE"),
+            [(1, 7)]
+        )
+        cph.rows("delete from DUMMY_WORK_DONE")
+        reactor.advance(pcp.queueProcessTimeout * 2)
+        self.assertEquals(
+            cph.rows("select * from DUMMY_WORK_DONE"),
+            [(2, 16)]
+        )
+
+
+
 class HalfConnection(object):
     def __init__(self, protocol):
         self.protocol = protocol

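test_poolStartServiceChecksForWork above drives the queue's polling loop deterministically by pairing MemoryReactor with Clock and calling reactor.advance(). A standalone sketch of that fake-clock technique, in the spirit of twisted.internet.task.Clock but not the Twisted implementation:

```python
# Fake clock for deterministic tests of time-driven code: callLater()
# queues a delayed call, and advance() fires whatever has come due.
# This mirrors twisted.internet.task.Clock in spirit only.
import heapq
import itertools

class FakeClock(object):
    def __init__(self):
        self.now = 0.0
        self._counter = itertools.count()   # tie-breaker for equal times
        self._calls = []

    def callLater(self, delay, f, *args):
        heapq.heappush(self._calls,
                       (self.now + delay, next(self._counter), f, args))

    def advance(self, amount):
        self.now += amount
        while self._calls and self._calls[0][0] <= self.now:
            _, _, f, args = heapq.heappop(self._calls)
            f(*args)

fired = []
clock = FakeClock()
clock.callLater(10, fired.append, "polled")
clock.advance(5)   # nothing due yet
clock.advance(5)   # the delayed call fires now
```

Because time only moves when the test says so, a polling interval like queueProcessTimeout can be crossed exactly once per `advance()`, which is how the test above checks that only the overdue work item runs on each pass.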
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/twext/python/sendmsg.c
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/twext/python/sendmsg.c	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/twext/python/sendmsg.c	2013-04-26 16:45:09 UTC (rev 11102)
@@ -117,6 +117,7 @@
     struct msghdr message_header;
     struct iovec iov[1];
     PyObject *ancillary = NULL;
+    PyObject *ultimate_result = NULL;
     static char *kwlist[] = {"fd", "data", "flags", "ancillary", NULL};
 
     if (!PyArg_ParseTupleAndKeywords(
@@ -148,14 +149,14 @@
             PyErr_Format(PyExc_TypeError,
                          "sendmsg argument 3 expected list, got %s",
                          ancillary->ob_type->tp_name);
-            return NULL;
+            goto finished;
         }
 
         PyObject *iterator = PyObject_GetIter(ancillary);
         PyObject *item = NULL;
 
         if (iterator == NULL) {
-            return NULL;
+            goto finished;
         }
 
         size_t all_data_len = 0;
@@ -172,7 +173,7 @@
                         &level, &type, &data, &data_len)) {
                 Py_DECREF(item);
                 Py_DECREF(iterator);
-                return NULL;
+                goto finished;
             }
 
             prev_all_data_len = all_data_len;
@@ -185,7 +186,7 @@
                 PyErr_Format(PyExc_OverflowError,
                              "Too much msg_control to fit in a size_t: %zu",
                              prev_all_data_len);
-                return NULL;
+                goto finished;
             }
         }
 
@@ -199,15 +200,13 @@
                 PyErr_Format(PyExc_OverflowError,
                              "Too much msg_control to fit in a socklen_t: %zu",
                              all_data_len);
-                return NULL;
+                goto finished;
             }
             message_header.msg_control = malloc(all_data_len);
             if (!message_header.msg_control) {
                 PyErr_NoMemory();
-                return NULL;
+                goto finished;
             }
-        } else {
-            message_header.msg_control = NULL;
         }
         message_header.msg_controllen = (socklen_t) all_data_len;
 
@@ -215,8 +214,7 @@
         item = NULL;
 
         if (!iterator) {
-            free(message_header.msg_control);
-            return NULL;
+            goto finished;
         }
 
         /* Unpack the tuples into the control message. */
@@ -239,8 +237,7 @@
                                   &data_len)) {
                 Py_DECREF(item);
                 Py_DECREF(iterator);
-                free(message_header.msg_control);
-                return NULL;
+                goto finished;
             }
 
             control_message->cmsg_level = level;
@@ -250,12 +247,9 @@
             if (data_size > SOCKLEN_MAX) {
                 Py_DECREF(item);
                 Py_DECREF(iterator);
-                free(message_header.msg_control);
-
                 PyErr_Format(PyExc_OverflowError,
                              "CMSG_LEN(%zd) > SOCKLEN_MAX", data_len);
-
-                return NULL;
+                goto finished;
             }
 
             control_message->cmsg_len = (socklen_t) data_size;
@@ -271,8 +265,7 @@
         Py_DECREF(iterator);
 
         if (PyErr_Occurred()) {
-            free(message_header.msg_control);
-            return NULL;
+            goto finished;
         }
     }
 
@@ -280,13 +273,16 @@
 
     if (sendmsg_result < 0) {
         PyErr_SetFromErrno(sendmsg_socket_error);
-        if (message_header.msg_control) {
-            free(message_header.msg_control);
-        }
-        return NULL;
+        goto finished;
+    } else {
+        ultimate_result = Py_BuildValue("n", sendmsg_result);
     }
 
-    return Py_BuildValue("n", sendmsg_result);
+  finished:
+    if (message_header.msg_control) {
+        free(message_header.msg_control);
+    }
+    return ultimate_result;
 }
 
 static PyObject *sendmsg_recvmsg(PyObject *self, PyObject *args, PyObject *keywds) {


Modified: CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/directory/directory.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/directory/directory.py	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/directory/directory.py	2013-04-26 16:45:09 UTC (rev 11102)
@@ -33,7 +33,7 @@
 
 from twext.enterprise.dal.record import fromTable
 from twext.enterprise.dal.syntax import Delete
-from twext.enterprise.queue import WorkItem
+from twext.enterprise.queue import WorkItem, PeerConnectionPool
 from twext.python.log import Logger, LoggingMixIn
 from twext.web2.dav.auth import IPrincipalCredentials
 from twext.web2.dav.util import joinURL
@@ -998,11 +998,22 @@
     txn = store.newTransaction()
     notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds)
     log.debug("Scheduling next group cacher update: %s" % (notBefore,))
-    yield txn.enqueue(GroupCacherPollingWork, notBefore=notBefore)
+    wp = (yield txn.enqueue(GroupCacherPollingWork, notBefore=notBefore))
     yield txn.commit()
+    returnValue(wp)
 
 
 
+def schedulePolledGroupCachingUpdate(store):
+    """
+    Schedules a group caching update work item in "the past" so PeerConnectionPool's
+    overdue-item logic picks it up quickly.
+    """
+    seconds = -PeerConnectionPool.queueProcessTimeout
+    return scheduleNextGroupCachingUpdate(store, seconds)
+
+
+
 def diffAssignments(old, new):
     """
     Compare two proxy assignment lists and return their differences in the form of

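The new schedulePolledGroupCachingUpdate relies on one trick: enqueuing with a notBefore that is already in the past, so the queue's overdue-item check becomes eligible to run the item on its next pass. A small sketch of that timestamp arithmetic, where queueProcessTimeout is an illustrative stand-in for PeerConnectionPool's value:

```python
# Sketch of the "schedule in the past" trick: negate the polling
# timeout so notBefore lands before "now" and the item is immediately
# overdue.  queueProcessTimeout is a stand-in value, not the real
# PeerConnectionPool attribute.
import datetime

queueProcessTimeout = 60  # seconds (illustrative)

def overdueNotBefore(now=None):
    """Return a notBefore timestamp that is already in the past."""
    if now is None:
        now = datetime.datetime.utcnow()
    return now + datetime.timedelta(seconds=-queueProcessTimeout)

now = datetime.datetime(2013, 4, 26, 12, 0, 0)
notBefore = overdueNotBefore(now)
```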
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/directory/test/test_directory.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/directory/test/test_directory.py	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/directory/test/test_directory.py	2013-04-26 16:45:09 UTC (rev 11102)
@@ -20,12 +20,13 @@
 from twistedcaldav.test.util import TestCase
 from twistedcaldav.test.util import xmlFile, augmentsFile, proxiesFile, dirTest
 from twistedcaldav.config import config
-from twistedcaldav.directory.directory import DirectoryService, DirectoryRecord, GroupMembershipCache, GroupMembershipCacheUpdater, diffAssignments
+from twistedcaldav.directory.directory import DirectoryService, DirectoryRecord, GroupMembershipCache, GroupMembershipCacheUpdater, diffAssignments, schedulePolledGroupCachingUpdate
 from twistedcaldav.directory.xmlfile import XMLDirectoryService
 from twistedcaldav.directory.calendaruserproxyloader import XMLCalendarUserProxyLoader
 from twistedcaldav.directory import augment, calendaruserproxy
 from twistedcaldav.directory.util import normalizeUUID
 from twistedcaldav.directory.principal import DirectoryPrincipalProvisioningResource
+from txdav.common.datastore.test.util import buildStore
 
 import cPickle as pickle
 import uuid
@@ -738,8 +739,34 @@
             ])
         )
 
+    @inlineCallbacks
+    def testScheduling(self):
+        """
+        Exercise schedulePolledGroupCachingUpdate
+        """
 
+        groupCacher = StubGroupCacher()
 
+        def decorateTransaction(txn):
+            txn._groupCacher = groupCacher
+
+        store = yield buildStore(self, None)
+        store.callWithNewTransactions(decorateTransaction)
+        wp = (yield schedulePolledGroupCachingUpdate(store))
+        yield wp.whenExecuted()
+        self.assertTrue(groupCacher.called)
+
+    testScheduling.skip = "Fix WorkProposal to track delayed calls and cancel them"
+
+class StubGroupCacher(object):
+    def __init__(self):
+        self.called = False
+        self.updateSeconds = 99
+
+    def updateCache(self):
+        self.called = True
+
+
 class RecordsMatchingTokensTests(TestCase):
 
     @inlineCallbacks

Modified: CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/stdconfig.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/stdconfig.py	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/stdconfig.py	2013-04-26 16:45:09 UTC (rev 11102)
@@ -37,7 +37,9 @@
 from twisted.python.runtime import platform
 
 from calendarserver.push.util import getAPNTopicFromCertificate
+from twistedcaldav.util import computeProcessCount
 
+
 log = Logger()
 
 if platform.isMacOSX():
@@ -324,8 +326,8 @@
     "ConfigRoot"              : "Config",
     "LogRoot"                 : "/var/log/caldavd",
     "RunRoot"                 : "/var/run/caldavd",
-    "WebCalendarRoot"         : "/Applications/Server.app/Contents/ServerRoot/usr/share/collabd",
-
+    "WebCalendarRoot"         : "/Applications/Server.app/Contents/ServerRoot/usr/share/collabd/webcal/public",
+    
     #
     # Quotas
     #
@@ -430,7 +432,7 @@
         },
         "Wiki": {
             "Enabled": False,
-            "Cookie": "apple_webauth_token",
+            "Cookie": "cc.collabd_session_guid",
             "URL": "http://127.0.0.1:8089/RPC2",
             "UserMethod": "userForSession",
             "WikiMethod": "accessLevelForUserWikiCalendar",
@@ -907,8 +909,14 @@
         "DatabaseName": "caldav",
         "LogFile": "postgres.log",
         "ListenAddresses": [],
-        "SharedBuffers": 30,
-        "MaxConnections": 20,
+        "SharedBuffers": 0, # BuffersToConnectionsRatio * MaxConnections
+                            # Note: don't set this, it will be computed dynamically
+                            # See _updateMultiProcess( ) below for details
+        "MaxConnections": 0, # Dynamically computed based on ProcessCount, etc.
+                             # Note: don't set this, it will be computed dynamically
+                             # See _updateMultiProcess( ) below for details
+        "ExtraConnections": 3, # how many extra connections to leave for utilities
+        "BuffersToConnectionsRatio": 1.5,
         "Options": [
             "-c standard_conforming_strings=on",
         ],
@@ -1079,9 +1087,11 @@
     # Remove possible trailing slash from ServerRoot
     try:
         configDict["ServerRoot"] = configDict["ServerRoot"].rstrip("/")
+        configDict["ServerRoot"] = os.path.abspath(configDict["ServerRoot"])
     except KeyError:
         pass
 
+
     for root, relativePath in RELATIVE_PATHS:
         if root in configDict:
             if isinstance(relativePath, str):
@@ -1128,7 +1138,38 @@
         configDict.ServerHostName = hostname
 
 
+def _updateMultiProcess(configDict, reloading=False):
+    """
+    Dynamically compute ProcessCount if it's set to 0.  Always compute
+    MaxConnections and SharedBuffers based on ProcessCount, ExtraConnections,
+    SharedConnectionPool, MaxDBConnectionsPerPool, and BuffersToConnectionsRatio
+    """
+    if configDict.MultiProcess.ProcessCount == 0:
+        processCount = computeProcessCount(
+            configDict.MultiProcess.MinProcessCount,
+            configDict.MultiProcess.PerCPU,
+            configDict.MultiProcess.PerGB,
+        )
+        configDict.MultiProcess.ProcessCount = processCount
 
+    # Start off with extra connections to be used by command line utilities and
+    # administration/inspection tools
+    maxConnections = configDict.Postgres.ExtraConnections
+
+    if configDict.SharedConnectionPool:
+        # If SharedConnectionPool is enabled, then only the master process will
+        # be connecting to the database, therefore use MaxDBConnectionsPerPool
+        maxConnections += configDict.MaxDBConnectionsPerPool
+    else:
+        # Otherwise the master *and* each worker process will be connecting
+        maxConnections += ((configDict.MultiProcess.ProcessCount + 1) *
+            configDict.MaxDBConnectionsPerPool)
+
+    configDict.Postgres.MaxConnections = maxConnections
+    configDict.Postgres.SharedBuffers = int(configDict.Postgres.MaxConnections *
+        configDict.Postgres.BuffersToConnectionsRatio)
+
+
 def _preUpdateDirectoryService(configDict, items, reloading=False):
     # Special handling for directory services configs
     dsType = items.get("DirectoryService", {}).get("type", None)
@@ -1502,6 +1543,7 @@
     _preUpdateDirectoryAddressBookBackingDirectoryService,
     )
 POST_UPDATE_HOOKS = (
+    _updateMultiProcess,
     _updateDataStore,
     _updateHostName,
     _postUpdateDirectoryService,

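The arithmetic in _updateMultiProcess above can be checked in isolation. This standalone sketch mirrors the computation with simplified names (it is not the stdconfig function itself):

```python
# Mirror of _updateMultiProcess's sizing rule: reserve extra
# connections for utilities, then add either one pool's worth (shared
# pool) or one pool per process including the master, and size
# shared_buffers as a ratio of the connection count.
def computePostgresSettings(processCount, maxPerPool, extra,
                            buffersRatio, sharedConnectionPool=False):
    # Start with connections reserved for command-line utilities and
    # administration/inspection tools.
    maxConnections = extra
    if sharedConnectionPool:
        # Only the master process connects to the database.
        maxConnections += maxPerPool
    else:
        # The master *and* each worker process connect.
        maxConnections += (processCount + 1) * maxPerPool
    sharedBuffers = int(maxConnections * buffersRatio)
    return maxConnections, sharedBuffers

# The values exercised by test_updateMultiProcess: 3 workers, 10 per
# pool, 5 extra, ratio 1.5.
settings = computePostgresSettings(3, 10, 5, 1.5)
```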
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/test/test_config.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/test/test_config.py	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/test/test_config.py	2013-04-26 16:45:09 UTC (rev 11102)
@@ -83,7 +83,8 @@
 
     def testDefaults(self):
         for key, value in DEFAULT_CONFIG.iteritems():
-            if key in ("ServerHostName", "Notifications"):
+            if key in ("ServerHostName", "Notifications", "MultiProcess",
+                "Postgres"):
                 # Value is calculated and may vary
                 continue
             for item in RELATIVE_PATHS:

Modified: CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/test/test_stdconfig.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/test/test_stdconfig.py	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/test/test_stdconfig.py	2013-04-26 16:45:09 UTC (rev 11102)
@@ -15,14 +15,16 @@
 # limitations under the License.
 ##
 
+import os
 from cStringIO import StringIO
 
 from twext.python.filepath import CachingFilePath as FilePath
 from twisted.trial.unittest import TestCase
 
-from twistedcaldav.config import Config
+from twistedcaldav.config import Config, ConfigDict
 from twistedcaldav.stdconfig import NoUnicodePlistParser, PListConfigProvider,\
-    _updateDataStore
+    _updateDataStore, _updateMultiProcess
+import twistedcaldav.stdconfig
 
 nonASCIIValue = "→←"
 nonASCIIPlist = "<plist version='1.0'><string>%s</string></plist>" % (
@@ -149,3 +151,31 @@
         }
         _updateDataStore(configDict)
         self.assertEquals(configDict["ServerRoot"], "/a/b/c")
+
+        configDict = {
+            "ServerRoot" : "./a",
+        }
+        _updateDataStore(configDict)
+        self.assertEquals(configDict["ServerRoot"], os.path.join(os.getcwd(), "a"))
+
+    def test_updateMultiProcess(self):
+        def stubProcessCount(*args):
+            return 3
+        self.patch(twistedcaldav.stdconfig, "computeProcessCount", stubProcessCount)
+        configDict = ConfigDict({
+            "MultiProcess" : {
+                "ProcessCount" : 0,
+                "MinProcessCount" : 2,
+                "PerCPU" : 1,
+                "PerGB" : 1,
+            },
+            "Postgres" : {
+                "ExtraConnections" : 5,
+                "BuffersToConnectionsRatio" : 1.5,
+            },
+            "SharedConnectionPool" : False,
+            "MaxDBConnectionsPerPool" : 10,
+        })
+        _updateMultiProcess(configDict)
+        self.assertEquals(45, configDict.Postgres.MaxConnections)
+        self.assertEquals(67, configDict.Postgres.SharedBuffers)

Modified: CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/util.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/util.py	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/util.py	2013-04-26 16:45:09 UTC (rev 11102)
@@ -111,6 +111,37 @@
 
 
 
+def computeProcessCount(minimum, perCPU, perGB, cpuCount=None, memSize=None):
+    """
+    Determine how many processes to spawn based on installed RAM and CPUs,
+    returning at least "minimum".
+    """
+
+    if cpuCount is None:
+        try:
+            cpuCount = getNCPU()
+        except NotImplementedError, e:
+            log.error("Unable to detect number of CPUs: %s" % (str(e),))
+            return minimum
+
+    if memSize is None:
+        try:
+            memSize = getMemorySize()
+        except NotImplementedError, e:
+            log.error("Unable to detect amount of installed RAM: %s" % (str(e),))
+            return minimum
+
+    countByCore = perCPU * cpuCount
+    countByMemory = perGB * (memSize / (1024 * 1024 * 1024))
+
+    # Pick the smaller of the two:
+    count = min(countByCore, countByMemory)
+
+    # ...but at least "minimum"
+    return max(count, minimum)
+
+
+
 ##
 # Module management
 ##

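computeProcessCount's sizing rule is simple enough to state as a one-liner: take the smaller of the CPU-derived and RAM-derived counts, but never go below the configured minimum. A standalone sketch with explicit inputs (no hardware probing):

```python
# Sketch of computeProcessCount's rule, with cpuCount/memBytes passed
# in directly instead of probed via getNCPU()/getMemorySize().
def processCount(minimum, perCPU, perGB, cpuCount, memBytes):
    countByCore = perCPU * cpuCount
    countByMemory = perGB * (memBytes // (1024 * 1024 * 1024))
    # Pick the smaller of the two, but at least "minimum".
    return max(min(countByCore, countByMemory), minimum)

# 8 cores but only 4 GB of RAM: memory is the limiting factor.
count = processCount(2, 1, 1, 8, 4 * 1024 ** 3)
```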
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/txdav/base/datastore/subpostgres.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/txdav/base/datastore/subpostgres.py	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/txdav/base/datastore/subpostgres.py	2013-04-26 16:45:09 UTC (rev 11102)
@@ -27,14 +27,12 @@
 from twisted.python.procutils import which
 from twisted.internet.protocol import ProcessProtocol
 
-from twisted.python.reflect import namedAny
 from twext.python.log import Logger
 from twext.python.filepath import CachingFilePath
 
-pgdb = namedAny("pgdb")
+import pgdb
 
 from twisted.protocols.basic import LineReceiver
-from twisted.internet import reactor
 from twisted.internet.defer import Deferred
 from txdav.base.datastore.dbapiclient import DBAPIConnector
 from txdav.base.datastore.dbapiclient import postgresPreflight
@@ -168,7 +166,8 @@
                  spawnedDBUser="caldav",
                  importFileName=None,
                  pgCtl="pg_ctl",
-                 initDB="initdb"):
+                 initDB="initdb",
+                 reactor=None):
         """
         Initialize a L{PostgresService} pointed at a data store directory.
 
@@ -194,6 +193,7 @@
         MultiService.__init__(self)
         self.subServiceFactory = subServiceFactory
         self.dataStoreDirectory = dataStoreDirectory
+        self.workingDir = self.dataStoreDirectory.child("working")
         self.resetSchema = resetSchema
 
         # In order to delay a shutdown until database initialization has
@@ -239,8 +239,16 @@
         self.openConnections = []
         self._pgCtl = pgCtl
         self._initdb = initDB
+        self._reactor = reactor
 
+    @property
+    def reactor(self):
+        if self._reactor is None:
+            from twisted.internet import reactor
+            self._reactor = reactor
+        return self._reactor
 
+
     def pgCtl(self):
         """
         Locate the path to pg_ctl.
@@ -300,17 +308,12 @@
         return self._connectorFor(databaseName).connect(label)
 
 
-    def ready(self):
+    def ready(self, createDatabaseConn, createDatabaseCursor):
         """
         Subprocess is ready.  Time to initialize the subservice.
         If the database has not been created and there is a dump file,
         then the dump file is imported.
         """
-        createDatabaseConn = self.produceConnection(
-            'schema creation', 'postgres'
-        )
-        createDatabaseCursor = createDatabaseConn.cursor()
-        createDatabaseCursor.execute("commit")
 
         if self.resetSchema:
             try:
@@ -347,9 +350,6 @@
             connection.commit()
             connection.close()
 
-        # TODO: anyone know why these two lines are here?
-        connection = self.produceConnection()
-        cursor = connection.cursor()
 
         if self.shutdownDeferred is None:
             # Only continue startup if we've not begun shutdown
@@ -383,6 +383,30 @@
         """
         Start the database and initialize the subservice.
         """
+
+        def createConnection():
+            createDatabaseConn = self.produceConnection(
+                'schema creation', 'postgres'
+            )
+            createDatabaseCursor = createDatabaseConn.cursor()
+            createDatabaseCursor.execute("commit")
+            return createDatabaseConn, createDatabaseCursor
+
+        try:
+            createDatabaseConn, createDatabaseCursor = createConnection()
+        except pgdb.DatabaseError:
+            # We could not connect to the database, so attempt to start it
+            pass
+        except Exception, e:
+            # Some other unexpected error is preventing us from connecting
+            # to the database
+            log.warn("Failed to connect to Postgres: %s" % (str(e)))
+        else:
+            # Database is running, so just use our connection
+            self.ready(createDatabaseConn, createDatabaseCursor)
+            self.deactivateDelayedShutdown()
+            return
+
         monitor = _PostgresMonitor(self)
         pgCtl = self.pgCtl()
         # check consistency of initdb and postgres?
@@ -400,7 +424,7 @@
         options.append("-c standard_conforming_strings=on")
         options.extend(self.options)
 
-        reactor.spawnProcess(
+        self.reactor.spawnProcess(
             monitor, pgCtl,
             [
                 pgCtl,
@@ -412,13 +436,13 @@
                 "-o",
                 " ".join(options),
             ],
-            self.env,
+            env=self.env, path=self.workingDir.path,
             uid=self.uid, gid=self.gid,
         )
         self.monitor = monitor
         def gotReady(result):
             self.shouldStopDatabase = result
-            self.ready()
+            self.ready(*createConnection())
             self.deactivateDelayedShutdown()
         def reportit(f):
             log.err(f)
@@ -432,7 +456,6 @@
         MultiService.startService(self)
         self.activateDelayedShutdown()
         clusterDir = self.dataStoreDirectory.child("cluster")
-        workingDir = self.dataStoreDirectory.child("working")
         env = self.env = os.environ.copy()
         env.update(PGDATA=clusterDir.path,
                    PGHOST=self.host,
@@ -448,15 +471,16 @@
         else:
             if not self.dataStoreDirectory.isdir():
                 self.dataStoreDirectory.createDirectory()
-            if not workingDir.isdir():
-                workingDir.createDirectory()
+            if not self.workingDir.isdir():
+                self.workingDir.createDirectory()
             if self.uid and self.gid:
                 os.chown(self.dataStoreDirectory.path, self.uid, self.gid)
-                os.chown(workingDir.path, self.uid, self.gid)
+                os.chown(self.workingDir.path, self.uid, self.gid)
             dbInited = Deferred()
-            reactor.spawnProcess(
+            self.reactor.spawnProcess(
                 CapturingProcessProtocol(dbInited, None),
-                initdb, [initdb, "-E", "UTF8", "-U", self.spawnedDBUser], env, workingDir.path,
+                initdb, [initdb, "-E", "UTF8", "-U", self.spawnedDBUser],
+                env=env, path=self.workingDir.path,
                 uid=self.uid, gid=self.gid,
             )
             def doCreate(result):
@@ -486,9 +510,9 @@
             if self.shouldStopDatabase:
                 monitor = _PostgresMonitor()
                 pgCtl = self.pgCtl()
-                reactor.spawnProcess(monitor, pgCtl,
+                self.reactor.spawnProcess(monitor, pgCtl,
                     [pgCtl, '-l', 'logfile', 'stop'],
-                    self.env,
+                    env=self.env, path=self.workingDir.path,
                     uid=self.uid, gid=self.gid,
                 )
                 return monitor.completionDeferred
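
The refactored startDatabase above first probes for an already-running server and only falls back to spawning pg_ctl when the connection attempt raises a database-level error. A minimal sketch of that control flow, where DatabaseError, connect, spawn, and onReady are hypothetical stand-ins for pgdb.DatabaseError, produceConnection, reactor.spawnProcess, and ready:

```python
class DatabaseError(Exception):
    """Stand-in for pgdb.DatabaseError."""


def startDatabase(connect, spawn, onReady):
    """
    Try an already-running database first; only spawn the server when the
    initial connection attempt fails with a database-level error.
    """
    try:
        conn = connect()
    except DatabaseError:
        # Database is not running: fall back to starting it ourselves.
        spawn()
        return "spawned"
    else:
        # Database is already running: initialize using our connection.
        onReady(conn)
        return "connected"
```

This mirrors why ready() now takes the connection and cursor as arguments: both the connected path and the spawned path can hand it a freshly created connection.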

Modified: CalendarServer/branches/users/cdaboo/store-scheduling/txdav/base/datastore/test/test_subpostgres.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/txdav/base/datastore/test/test_subpostgres.py	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/txdav/base/datastore/test/test_subpostgres.py	2013-04-26 16:45:09 UTC (rev 11102)
@@ -20,12 +20,19 @@
 
 from twisted.trial.unittest import TestCase
 
+# NOTE: This import will eventually fail, once this functionality is added
+# to MemoryReactor:
+from twisted.runner.test.test_procmon import DummyProcessReactor
+
+from twisted.python.filepath import FilePath
 from twext.python.filepath import CachingFilePath
 
 from txdav.base.datastore.subpostgres import PostgresService
 from twisted.internet.defer import inlineCallbacks, Deferred
 from twisted.application.service import Service
 
+import pgdb
+
 class SubprocessStartup(TestCase):
     """
     Tests for starting and stopping the subprocess.
@@ -185,3 +192,73 @@
         values = cursor.fetchall()
         self.assertEquals(values, [["value1"],["value2"]])
 
+    def test_startDatabaseRunning(self):
+        """ Ensure that if we can connect to postgres we don't spawn pg_ctl """
+
+        self.cursorHistory = []
+
+        class DummyCursor(object):
+            def __init__(self, historyHolder):
+                self.historyHolder = historyHolder 
+
+            def execute(self, *args):
+                self.historyHolder.cursorHistory.append(args)
+
+            def close(self):
+                pass
+
+        class DummyConnection(object):
+            def __init__(self, historyHolder):
+                self.historyHolder = historyHolder
+
+            def cursor(self):
+                return DummyCursor(self.historyHolder)
+
+            def commit(self):
+                pass
+
+            def close(self):
+                pass
+
+        def produceConnection(*args):
+            return DummyConnection(self)
+
+        dummyReactor = DummyProcessReactor()
+        svc = PostgresService(
+            FilePath("postgres_4.pgdb"),
+            lambda x : Service(),
+            "",
+            reactor=dummyReactor,
+        )
+        svc.produceConnection = produceConnection
+        svc.env = {}
+        svc.startDatabase()
+        self.assertEquals(
+            self.cursorHistory,
+            [
+                ('commit',),
+                ("create database subpostgres with encoding 'UTF8'",),
+                ('',)
+            ]
+        )
+        self.assertEquals(dummyReactor.spawnedProcesses, [])
+
+
+    def test_startDatabaseNotRunning(self):
+        """ Ensure that if we can't connect to postgres we spawn pg_ctl """
+
+        def produceConnection(*args):
+            raise pgdb.DatabaseError
+
+        dummyReactor = DummyProcessReactor()
+        svc = PostgresService(
+            FilePath("postgres_4.pgdb"),
+            lambda x : Service(),
+            "",
+            reactor=dummyReactor,
+        )
+        svc.produceConnection = produceConnection
+        svc.env = {}
+        svc.startDatabase()
+        self.assertEquals(len(dummyReactor.spawnedProcesses), 1)
+        self.assertTrue(dummyReactor.spawnedProcesses[0]._executable.endswith("pg_ctl"))
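
The tests above work because PostgresService now accepts a reactor= argument, so a recording fake (here DummyProcessReactor) can stand in for the global reactor. A minimal sketch of that injection pattern; RecordingReactor and SpawningService are illustrative names, not CalendarServer classes:

```python
class RecordingReactor(object):
    """Records spawnProcess calls instead of executing them."""
    def __init__(self):
        self.spawnedProcesses = []

    def spawnProcess(self, protocol, executable, args, **kwargs):
        self.spawnedProcesses.append((executable, list(args)))


class SpawningService(object):
    """A service that starts its helper process via the injected reactor."""
    def __init__(self, reactor):
        self._reactor = reactor

    def startDatabase(self):
        # The real service would run pg_ctl here; the fake just records it.
        self._reactor.spawnProcess(None, "/usr/bin/pg_ctl",
                                   ["pg_ctl", "start"])


reactor = RecordingReactor()
SpawningService(reactor).startDatabase()
```

Asserting on reactor.spawnedProcesses then verifies whether the spawn path was taken, exactly as test_startDatabaseRunning and test_startDatabaseNotRunning do.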

Modified: CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/sql.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/sql.py	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/sql.py	2013-04-26 16:45:09 UTC (rev 11102)
@@ -57,7 +57,7 @@
 from txdav.carddav.iaddressbookstore import IAddressBookTransaction
 
 from txdav.common.datastore.common import HomeChildBase
-from txdav.common.datastore.sql_tables import schema
+from txdav.common.datastore.sql_tables import schema, splitSQLString
 from txdav.common.datastore.sql_tables import _BIND_MODE_OWN, \
     _BIND_STATUS_ACCEPTED, _BIND_STATUS_DECLINED, \
     NOTIFICATION_OBJECT_REVISIONS_TABLE
@@ -72,7 +72,6 @@
 
 from twext.python.clsprop import classproperty
 from twext.enterprise.ienterprise import AlreadyFinishedError
-from twext.enterprise.dal.parseschema import significant
 
 from twext.enterprise.dal.syntax import \
     Delete, utcNowSQL, Union, Insert, Len, Max, Parameter, SavepointAction, \
@@ -95,7 +94,6 @@
 from pycalendar.datetime import PyCalendarDateTime
 
 from cStringIO import StringIO
-from sqlparse import parse
 import time
 
 current_sql_schema = getModule(__name__).filePath.sibling("sql_schema").child("current.sql").getContent()
@@ -1015,19 +1013,11 @@
     @inlineCallbacks
     def execSQLBlock(self, sql):
         """
-        Execute a block of SQL by parsing it out into individual statements and execute
-        each of those.
-
+        Execute SQL statements parsed by splitSQLString.
         FIXME: temporary measure for handling large schema upgrades. This should NOT be used
         for regular SQL operations - only upgrades.
         """
-        parsed = parse(sql)
-        for stmt in parsed:
-            while stmt.tokens and not significant(stmt.tokens[0]):
-                stmt.tokens.pop(0)
-            if not stmt.tokens:
-                continue
-            stmt = str(stmt).rstrip(";")
+        for stmt in splitSQLString(sql):
             yield self.execSQL(stmt)
 
 

Modified: CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_16_to_17.sql
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_16_to_17.sql	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_16_to_17.sql	2013-04-26 16:45:09 UTC (rev 11102)
@@ -23,6 +23,13 @@
 -- CALENDAR_OBJECT clean-up --
 ------------------------------
 
+begin
+for i in (select constraint_name from user_cons_columns where column_name = 'ORGANIZER_OBJECT')
+loop
+execute immediate 'alter table calendar_object drop constraint ' || i.constraint_name;
+end loop;
+end;
+
 alter table CALENDAR_OBJECT
  drop (ORGANIZER_OBJECT);
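
The anonymous PL/SQL block is needed because Oracle auto-generates the constraint name, so it has to be looked up in user_cons_columns before it can be dropped. A hedged DB-API sketch of the same two-step logic; the function name and FakeCursor are illustrative only, not part of the upgrade:

```python
def dropOrganizerObjectConstraints(cursor):
    """
    Look up the auto-generated constraint name(s) on ORGANIZER_OBJECT and
    drop each one, mirroring the anonymous PL/SQL block above.
    """
    cursor.execute(
        "select constraint_name from user_cons_columns"
        " where column_name = 'ORGANIZER_OBJECT'"
    )
    for (name,) in cursor.fetchall():
        cursor.execute(
            "alter table calendar_object drop constraint %s" % (name,)
        )


class FakeCursor(object):
    """Minimal DB-API-style cursor that records executed statements."""
    def __init__(self, rows):
        self.rows = rows
        self.executed = []

    def execute(self, sql):
        self.executed.append(sql)

    def fetchall(self):
        return self.rows


cursor = FakeCursor([("SYS_C0042",)])
dropOrganizerObjectConstraints(cursor)
```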
 

Modified: CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/sql_tables.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/sql_tables.py	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/sql_tables.py	2013-04-26 16:45:09 UTC (rev 11102)
@@ -27,8 +27,9 @@
 from twext.enterprise.ienterprise import ORACLE_DIALECT, POSTGRES_DIALECT
 from twext.enterprise.dal.syntax import Insert
 from twext.enterprise.ienterprise import ORACLE_TABLE_NAME_MAX
-from twext.enterprise.dal.parseschema import schemaFromPath
-
+from twext.enterprise.dal.parseschema import schemaFromPath, significant
+from sqlparse import parse
+from re import compile
 import hashlib
 
 
@@ -384,6 +385,44 @@
         out.write('\n);\n\n')
 
 
+
+def splitSQLString(sqlString):
+    """
+    Strings which mix zero or more sql statements with zero or more pl/sql
+    statements need to be split into individual sql statements for execution.
+    This function was written to allow execution of pl/sql during Oracle schema
+    upgrades.
+    """
+    aggregated = ''
+    inPlSQL = None
+    parsed = parse(sqlString)
+    for stmt in parsed:
+        while stmt.tokens and not significant(stmt.tokens[0]):
+            stmt.tokens.pop(0)
+        if not stmt.tokens:
+            continue
+        if inPlSQL is not None:
+            agg = str(stmt).strip()
+            if "end;".lower() in agg.lower():
+                inPlSQL = None
+                aggregated += agg
+                rex = compile("\n +")
+                aggregated = rex.sub('\n', aggregated)
+                yield aggregated.strip()
+                continue
+            aggregated += agg
+            continue
+        if inPlSQL is None:
+            if 'begin'.lower() in str(stmt).lower():
+                inPlSQL = True
+                aggregated += str(stmt)
+                continue
+        else:
+            continue
+        yield str(stmt).rstrip().rstrip(";")
+
+
 if __name__ == '__main__':
     import sys
     if len(sys.argv) == 2:
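
splitSQLString above leans on sqlparse for tokenization; the core aggregation idea can be sketched without it. A simplified version under stated assumptions: splitSimpleSQLString is a hypothetical name, it recognizes only a bare "end;" terminator, and it ignores semicolons inside string literals and comments, which the real implementation handles via sqlparse:

```python
def splitSimpleSQLString(sqlString):
    """
    Naive sketch of the idea behind splitSQLString: plain statements end at
    ";", but a pl/sql "begin ... end;" block must be kept together as one
    statement.
    """
    statements = []
    plsql = []
    for piece in sqlString.split(";"):
        stripped = piece.strip()
        if not stripped:
            continue
        if plsql:
            plsql.append(stripped)
            # A bare "end" (not "end loop"/"end if") closes the block.
            if stripped.lower() == "end":
                statements.append("; ".join(plsql) + ";")
                plsql = []
        elif stripped.lower().startswith("begin"):
            plsql = [stripped]
        else:
            statements.append(stripped)
    return statements
```

For example, "select 1; begin insert into t values (1); end; select 2;" splits into three statements, with the begin/end block kept whole.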

Modified: CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/test/test_sql_tables.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/test/test_sql_tables.py	2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/test/test_sql_tables.py	2013-04-26 16:45:09 UTC (rev 11102)
@@ -31,10 +31,12 @@
 from twext.enterprise.dal.syntax import SchemaSyntax
 
 from txdav.common.datastore.sql_tables import schema, _translateSchema
-from txdav.common.datastore.sql_tables import SchemaBroken
+from txdav.common.datastore.sql_tables import SchemaBroken, splitSQLString
 
 from twext.enterprise.dal.test.test_parseschema import SchemaTestHelper
 
+from textwrap import dedent
+
 class SampleSomeColumns(TestCase, SchemaTestHelper):
     """
     Sample some columns from the tables defined by L{schema} and verify that
@@ -268,3 +270,175 @@
 
 
 
+class SQLSplitterTests(TestCase):
+    """
+    Test that strings which mix zero or more sql statements with zero or more
+    pl/sql statements are split into individual statements.
+    """
+
+    def test_dontSplitOneStatement(self):
+        """
+        A single sql statement yields a single string
+        """
+        result = splitSQLString("select * from foo;")
+        r1 = result.next()
+        self.assertEquals(r1, "select * from foo")
+        self.assertRaises(StopIteration, result.next)
+
+
+    def test_returnTwoSimpleStatements(self):
+        """
+        Two simple sql statements yield two separate strings
+        """
+        result = splitSQLString("select count(*) from baz; select bang from boop;")
+        r1 = result.next()
+        self.assertEquals(r1, "select count(*) from baz")
+        r2 = result.next()
+        self.assertEquals(r2, "select bang from boop")
+        self.assertRaises(StopIteration, result.next)
+        
+
+    def test_returnOneComplexStatement(self):
+        """
+        One complex sql statement yields a single string
+        """
+        bigSQL = dedent(
+        '''SELECT
+              CL.CODE,
+              CL.CATEGORY,
+           FROM
+              CLIENTS_SUPPLIERS CL
+              INVOICES I
+           WHERE
+              CL.CODE = I.CODE AND
+              CL.CATEGORY = I.CATEGORY AND
+              CL.UP_DATE = 
+                (SELECT
+                   MAX(CL2.UP_DATE)
+                 FROM
+                   CLIENTS_SUPPLIERS CL2
+                 WHERE
+                   CL2.CODE = I.CODE AND
+                   CL2.CATEGORY = I.CATEGORY AND
+                   CL2.UP_DATE <= I.EMISSION
+                ) AND
+                I.EMISSION BETWEEN DATE1 AND DATE2;''')
+        result = splitSQLString(bigSQL)
+        r1 = result.next()
+        self.assertEquals(r1, bigSQL.rstrip(";")) 
+        self.assertRaises(StopIteration, result.next)
+
+
+    def test_returnOnePlSQL(self):
+        """
+        One pl/sql block yields a single string
+        """
+        plsql = dedent(
+        '''BEGIN
+           LOOP
+               INSERT INTO T1 VALUES(i,i);
+               i := i+1;
+               EXIT WHEN i>100;
+           END LOOP;
+           END;''')
+        s1 = 'BEGIN\nLOOP\nINSERT INTO T1 VALUES(i,i);i := i+1;EXIT WHEN i>100;END LOOP;END;'
+        result = splitSQLString(plsql)
+        r1 = result.next()
+        self.assertEquals(r1, s1)
+        self.assertRaises(StopIteration, result.next)
+
+
+    def test_returnOnePlSQLAndOneSQL(self):
+        """
+        One sql statement and one pl/sql statement yields two separate strings
+        """
+        sql = dedent(
+        '''SELECT EGM.Name, BioEntity.BioEntityId INTO AUX
+            FROM EGM 
+            INNER JOIN BioEntity 
+                ON EGM.name LIKE BioEntity.Name AND EGM.TypeId = BioEntity.TypeId
+            OPTION (MERGE JOIN);''')
+        plsql = dedent(
+        '''BEGIN
+           FOR i IN 1..10 LOOP
+               IF MOD(i,2) = 0 THEN
+                   INSERT INTO temp VALUES (i, x, 'i is even');
+               ELSE
+                   INSERT INTO temp VALUES (i, x, 'i is odd');
+               END IF;
+               x := x + 100;
+           END LOOP;
+           COMMIT;
+           END;''')
+        s2 = "BEGIN\nFOR i IN 1..10 LOOP\nIF MOD(i,2) = 0 THEN\nINSERT INTO temp VALUES (i, x, 'i is even');ELSE\nINSERT INTO temp VALUES (i, x, 'i is odd');END IF;x := x + 100;END LOOP;COMMIT;END;"
+        result = splitSQLString(sql+plsql)
+        r1 = result.next()
+        self.assertEquals(r1, sql.rstrip(";"))
+        r2 = result.next()
+        self.assertEquals(r2, s2)
+        self.assertRaises(StopIteration, result.next)
+
+    def test_actualSchemaUpgrade(self):
+        """
+        A real-world schema upgrade is split into the expected number of statements,
+        ignoring comments
+        """
+        realsql = dedent(
+        '''
+        ----
+        -- Copyright (c) 2011-2013 Apple Inc. All rights reserved.
+        --
+        -- Licensed under the Apache License, Version 2.0 (the "License");
+        -- you may not use this file except in compliance with the License.
+        -- You may obtain a copy of the License at
+        --
+        -- http://www.apache.org/licenses/LICENSE-2.0
+        --
+        -- Unless required by applicable law or agreed to in writing, software
+        -- distributed under the License is distributed on an "AS IS" BASIS,
+        -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+        -- See the License for the specific language governing permissions and
+        -- limitations under the License.
+        ----
+
+        ---------------------------------------------------
+        -- Upgrade database schema from VERSION 16 to 17 --
+        ---------------------------------------------------
+
+
+        ------------------------------
+        -- CALENDAR_OBJECT clean-up --
+        ------------------------------
+
+        begin
+        for i in (select constraint_name from user_cons_columns where column_name = 'ORGANIZER_OBJECT')
+        loop
+        execute immediate 'alter table calendar_object drop constraint ' || i.constraint_name;
+        end loop;
+        end;
+
+        alter table CALENDAR_OBJECT
+         drop (ORGANIZER_OBJECT);
+
+        create index CALENDAR_OBJECT_ICALE_82e731d5 on CALENDAR_OBJECT (
+            ICALENDAR_UID
+        );
+
+
+        -- Now update the version
+        update CALENDARSERVER set VALUE = '17' where NAME = 'VERSION';
+        ''')
+        s1 = "begin\nfor i in (select constraint_name from user_cons_columns where column_name = 'ORGANIZER_OBJECT')\nloop\nexecute immediate 'alter table calendar_object drop constraint ' || i.constraint_name;end loop;end;"
+        s2 = 'alter table CALENDAR_OBJECT\n drop (ORGANIZER_OBJECT)'
+        s3 = 'create index CALENDAR_OBJECT_ICALE_82e731d5 on CALENDAR_OBJECT (\n    ICALENDAR_UID\n)'
+        s4 = "update CALENDARSERVER set VALUE = '17' where NAME = 'VERSION'"
+        result = splitSQLString(realsql)
+        r1 = result.next()
+        self.assertEquals(r1, s1)
+        r2 = result.next()
+        self.assertEquals(r2, s2)
+        r3 = result.next()
+        self.assertEquals(r3, s3)
+        r4 = result.next()
+        self.assertEquals(r4, s4)
+        self.assertRaises(StopIteration, result.next)