[CalendarServer-changes] [11102] CalendarServer/branches/users/cdaboo/store-scheduling
source_changes at macosforge.org
source_changes at macosforge.org
Fri Apr 26 09:45:09 PDT 2013
Revision: 11102
http://trac.calendarserver.org//changeset/11102
Author: cdaboo at apple.com
Date: 2013-04-26 09:45:09 -0700 (Fri, 26 Apr 2013)
Log Message:
-----------
Merge from trunk.
Modified Paths:
--------------
CalendarServer/branches/users/cdaboo/store-scheduling/bin/caldavd
CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/platform/darwin/wiki.py
CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tap/caldav.py
CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tap/test/test_util.py
CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tap/util.py
CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/changeip_calendar.py
CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/config.py
CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/principals.py
CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/test/gateway/caldavd.plist
CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/test/principals/caldavd.plist
CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/test/test_config.py
CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/util.py
CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/webcal/resource.py
CalendarServer/branches/users/cdaboo/store-scheduling/conf/caldavd-apple.plist
CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/dal/syntax.py
CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/fixtures.py
CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/queue.py
CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/test/test_adbapi2.py
CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/test/test_queue.py
CalendarServer/branches/users/cdaboo/store-scheduling/twext/python/sendmsg.c
CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/directory/directory.py
CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/directory/test/test_directory.py
CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/stdconfig.py
CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/test/test_config.py
CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/test/test_stdconfig.py
CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/util.py
CalendarServer/branches/users/cdaboo/store-scheduling/txdav/base/datastore/subpostgres.py
CalendarServer/branches/users/cdaboo/store-scheduling/txdav/base/datastore/test/test_subpostgres.py
CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/sql.py
CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_16_to_17.sql
CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/sql_tables.py
CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/test/test_sql_tables.py
Added Paths:
-----------
CalendarServer/branches/users/cdaboo/store-scheduling/bin/calendarserver_monitor_work
CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/workitems.py
Property Changed:
----------------
CalendarServer/branches/users/cdaboo/store-scheduling/
Property changes on: CalendarServer/branches/users/cdaboo/store-scheduling
___________________________________________________________________
Modified: svn:mergeinfo
- /CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/release/CalendarServer-4.3-dev:10180-10190,10192
/CalendarServer/branches/users/cdaboo/batchupload-6699:6700-7198
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/component-set-fixes:8130-8346
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/implicituidrace:8137-8141
/CalendarServer/branches/users/cdaboo/ischedule-dkim:9747-9979
/CalendarServer/branches/users/cdaboo/managed-attachments:9985-10145
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/pods:7297-7377
/CalendarServer/branches/users/cdaboo/pycalendar:7085-7206
/CalendarServer/branches/users/cdaboo/pycard:7227-7237
/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes:7740-8287
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/cdaboo/timezones:7443-7699
/CalendarServer/branches/users/cdaboo/txn-debugging:8730-8743
/CalendarServer/branches/users/glyph/always-abort-txn-on-error:9958-9969
/CalendarServer/branches/users/glyph/case-insensitive-uid:8772-8805
/CalendarServer/branches/users/glyph/conn-limit:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/dalify:6932-7023
/CalendarServer/branches/users/glyph/db-reconnect:6824-6876
/CalendarServer/branches/users/glyph/deploybuild:7563-7572
/CalendarServer/branches/users/glyph/digest-auth-redux:10624-10635
/CalendarServer/branches/users/glyph/disable-quota:7718-7727
/CalendarServer/branches/users/glyph/dont-start-postgres:6592-6614
/CalendarServer/branches/users/glyph/imip-and-admin-html:7866-7984
/CalendarServer/branches/users/glyph/ipv6-client:9054-9105
/CalendarServer/branches/users/glyph/linux-tests:6893-6900
/CalendarServer/branches/users/glyph/migrate-merge:8690-8713
/CalendarServer/branches/users/glyph/misc-portability-fixes:7365-7374
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/multiget-delete:8321-8330
/CalendarServer/branches/users/glyph/new-export:7444-7485
/CalendarServer/branches/users/glyph/one-home-list-api:10048-10073
/CalendarServer/branches/users/glyph/oracle:7106-7155
/CalendarServer/branches/users/glyph/oracle-nulls:7340-7351
/CalendarServer/branches/users/glyph/other-html:8062-8091
/CalendarServer/branches/users/glyph/parallel-sim:8240-8251
/CalendarServer/branches/users/glyph/parallel-upgrade:8376-8400
/CalendarServer/branches/users/glyph/parallel-upgrade_to_1:8571-8583
/CalendarServer/branches/users/glyph/q:9560-9688
/CalendarServer/branches/users/glyph/queue-locking-and-timing:10204-10289
/CalendarServer/branches/users/glyph/quota:7604-7637
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/shared-pool-fixes:8436-8443
/CalendarServer/branches/users/glyph/shared-pool-take2:8155-8174
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/sharing-api:9192-9205
/CalendarServer/branches/users/glyph/skip-lonely-vtimezones:8524-8535
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/subtransactions:7248-7258
/CalendarServer/branches/users/glyph/table-alias:8651-8664
/CalendarServer/branches/users/glyph/uidexport:7673-7676
/CalendarServer/branches/users/glyph/unshare-when-access-revoked:10562-10595
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/glyph/uuid-normalize:9268-9296
/CalendarServer/branches/users/glyph/xattrs-from-files:7757-7769
/CalendarServer/branches/users/sagen/applepush:8126-8184
/CalendarServer/branches/users/sagen/inboxitems:7380-7381
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/sagen/testing:10827-10851,10853-10855
/CalendarServer/branches/users/wsanchez/transations:5515-5593
/CalendarServer/trunk:10876-11028
+ /CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/release/CalendarServer-4.3-dev:10180-10190,10192
/CalendarServer/branches/users/cdaboo/batchupload-6699:6700-7198
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/component-set-fixes:8130-8346
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/implicituidrace:8137-8141
/CalendarServer/branches/users/cdaboo/ischedule-dkim:9747-9979
/CalendarServer/branches/users/cdaboo/managed-attachments:9985-10145
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/pods:7297-7377
/CalendarServer/branches/users/cdaboo/pycalendar:7085-7206
/CalendarServer/branches/users/cdaboo/pycard:7227-7237
/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes:7740-8287
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/cdaboo/timezones:7443-7699
/CalendarServer/branches/users/cdaboo/txn-debugging:8730-8743
/CalendarServer/branches/users/glyph/always-abort-txn-on-error:9958-9969
/CalendarServer/branches/users/glyph/case-insensitive-uid:8772-8805
/CalendarServer/branches/users/glyph/conn-limit:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/dalify:6932-7023
/CalendarServer/branches/users/glyph/db-reconnect:6824-6876
/CalendarServer/branches/users/glyph/deploybuild:7563-7572
/CalendarServer/branches/users/glyph/digest-auth-redux:10624-10635
/CalendarServer/branches/users/glyph/disable-quota:7718-7727
/CalendarServer/branches/users/glyph/dont-start-postgres:6592-6614
/CalendarServer/branches/users/glyph/imip-and-admin-html:7866-7984
/CalendarServer/branches/users/glyph/ipv6-client:9054-9105
/CalendarServer/branches/users/glyph/linux-tests:6893-6900
/CalendarServer/branches/users/glyph/migrate-merge:8690-8713
/CalendarServer/branches/users/glyph/misc-portability-fixes:7365-7374
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/multiget-delete:8321-8330
/CalendarServer/branches/users/glyph/new-export:7444-7485
/CalendarServer/branches/users/glyph/one-home-list-api:10048-10073
/CalendarServer/branches/users/glyph/oracle:7106-7155
/CalendarServer/branches/users/glyph/oracle-nulls:7340-7351
/CalendarServer/branches/users/glyph/other-html:8062-8091
/CalendarServer/branches/users/glyph/parallel-sim:8240-8251
/CalendarServer/branches/users/glyph/parallel-upgrade:8376-8400
/CalendarServer/branches/users/glyph/parallel-upgrade_to_1:8571-8583
/CalendarServer/branches/users/glyph/q:9560-9688
/CalendarServer/branches/users/glyph/queue-locking-and-timing:10204-10289
/CalendarServer/branches/users/glyph/quota:7604-7637
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/shared-pool-fixes:8436-8443
/CalendarServer/branches/users/glyph/shared-pool-take2:8155-8174
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/sharing-api:9192-9205
/CalendarServer/branches/users/glyph/skip-lonely-vtimezones:8524-8535
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/start-service-start-loop:11060-11065
/CalendarServer/branches/users/glyph/subtransactions:7248-7258
/CalendarServer/branches/users/glyph/table-alias:8651-8664
/CalendarServer/branches/users/glyph/uidexport:7673-7676
/CalendarServer/branches/users/glyph/unshare-when-access-revoked:10562-10595
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/glyph/uuid-normalize:9268-9296
/CalendarServer/branches/users/glyph/xattrs-from-files:7757-7769
/CalendarServer/branches/users/sagen/applepush:8126-8184
/CalendarServer/branches/users/sagen/inboxitems:7380-7381
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/sagen/testing:10827-10851,10853-10855
/CalendarServer/branches/users/wsanchez/transations:5515-5593
/CalendarServer/trunk:10876-11101
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/bin/caldavd
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/bin/caldavd 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/bin/caldavd 2013-04-26 16:45:09 UTC (rev 11102)
@@ -125,4 +125,6 @@
export PYTHONPATH
-exec "${python}" "${twistdpath}" ${twistd_profile} ${twistd_reactor} ${daemonize} ${username} ${groupname} "${plugin_name}" ${configfile} ${service_type} ${errorlogenabled} ${profile} ${child_reactor};
+extra="-o FailIfUpgradeNeeded=False";
+
+exec "${python}" "${twistdpath}" ${twistd_profile} ${twistd_reactor} ${daemonize} ${username} ${groupname} "${plugin_name}" ${configfile} ${service_type} ${errorlogenabled} ${profile} ${child_reactor} ${extra};
Copied: CalendarServer/branches/users/cdaboo/store-scheduling/bin/calendarserver_monitor_work (from rev 11101, CalendarServer/trunk/bin/calendarserver_monitor_work)
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/bin/calendarserver_monitor_work (rev 0)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/bin/calendarserver_monitor_work 2013-04-26 16:45:09 UTC (rev 11102)
@@ -0,0 +1,38 @@
+#!/usr/bin/env python
+
+##
+# Copyright (c) 2006-2013 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+import os
+import sys
+
+# In OS X Server context, add to PATH to find Postgres utilities (initdb, pg_ctl)
+if "Server.app" in sys.argv[0]:
+ os.environ["PATH"] += ":" + os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), "bin")
+
+#PYTHONPATH
+
+if __name__ == "__main__":
+ if "PYTHONPATH" in globals():
+ sys.path.insert(0, PYTHONPATH)
+ else:
+ try:
+ import _calendarserver_preamble
+ except ImportError:
+ sys.exc_clear()
+
+ from calendarserver.tools.workitems import main
+ main()
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/platform/darwin/wiki.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/platform/darwin/wiki.py 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/platform/darwin/wiki.py 2013-04-26 16:45:09 UTC (rev 11102)
@@ -48,7 +48,7 @@
(jsonResponse, str(e)))
raise WebAuthError("Could not look up token: %s" % (token,))
if response["succeeded"]:
- returnValue(response["shortname"])
+ returnValue(response["shortName"])
else:
raise WebAuthError("Could not look up token: %s" % (token,))
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tap/caldav.py 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tap/caldav.py 2013-04-26 16:45:09 UTC (rev 11102)
@@ -53,6 +53,8 @@
from twisted.application.service import MultiService, IServiceMaker
from twisted.application.service import Service
+from twistedcaldav.config import config, ConfigurationError
+from twistedcaldav.stdconfig import DEFAULT_CONFIG, DEFAULT_CONFIG_FILE
from twext.web2.server import Site
from twext.python.log import Logger, LoggingMixIn
from twext.python.log import logLevelForNamespace, setLogLevelForNamespace
@@ -68,13 +70,10 @@
)
from txdav.common.datastore.upgrade.migrate import UpgradeToDatabaseService
-from twistedcaldav.config import ConfigurationError
-from twistedcaldav.config import config
from twistedcaldav.directory import calendaruserproxy
from twistedcaldav.directory.directory import GroupMembershipCacheUpdater
from twistedcaldav.localization import processLocalizationFiles
from twistedcaldav import memcachepool
-from twistedcaldav.stdconfig import DEFAULT_CONFIG, DEFAULT_CONFIG_FILE
from twistedcaldav.upgrade import UpgradeFileSystemFormatService, PostDBImportService
from calendarserver.tap.util import pgServiceFromConfig, getDBPool, MemoryLimitService
@@ -102,8 +101,7 @@
from calendarserver.accesslog import AMPCommonAccessLoggingObserver
from calendarserver.accesslog import AMPLoggingFactory
from calendarserver.accesslog import RotatingFileAccessLoggingObserver
-from calendarserver.tap.util import getRootResource, computeProcessCount
-
+from calendarserver.tap.util import getRootResource
from calendarserver.tap.util import storeFromConfig
from calendarserver.tap.util import pgConnectorFromConfig
from calendarserver.tap.util import oracleConnectorFromConfig
@@ -1270,18 +1268,6 @@
monitor.addProcess('memcached-%s' % (name,), memcachedArgv,
env=PARENT_ENVIRONMENT)
- #
- # Calculate the number of processes to spawn
- #
- if config.MultiProcess.ProcessCount == 0:
- # TODO: this should probably be happening in a configuration hook.
- processCount = computeProcessCount(
- config.MultiProcess.MinProcessCount,
- config.MultiProcess.PerCPU,
- config.MultiProcess.PerGB,
- )
- config.MultiProcess.ProcessCount = processCount
- self.log_info("Configuring %d processes." % (processCount,))
# Open the socket(s) to be inherited by the slaves
inheritFDs = []
@@ -1431,7 +1417,7 @@
return multi
- ssvc = self.storageService(spawnerSvcCreator, uid, gid)
+ ssvc = self.storageService(spawnerSvcCreator, None, uid, gid)
ssvc.setServiceParent(s)
return s
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tap/test/test_util.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tap/test/test_util.py 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tap/test/test_util.py 2013-04-26 16:45:09 UTC (rev 11102)
@@ -14,7 +14,8 @@
# limitations under the License.
##
-from calendarserver.tap.util import computeProcessCount, directoryFromConfig, MemoryLimitService
+from calendarserver.tap.util import directoryFromConfig, MemoryLimitService
+from twistedcaldav.util import computeProcessCount
from twistedcaldav.test.util import TestCase
from twistedcaldav.config import config
from twistedcaldav.directory.augment import AugmentXMLDB
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tap/util.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tap/util.py 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tap/util.py 2013-04-26 16:45:09 UTC (rev 11102)
@@ -67,7 +67,6 @@
from twistedcaldav.timezones import TimezoneCache
from twistedcaldav.timezoneservice import TimezoneServiceResource
from twistedcaldav.timezonestdservice import TimezoneStdServiceResource
-from twistedcaldav.util import getMemorySize, getNCPU
from twext.enterprise.ienterprise import POSTGRES_DIALECT
from twext.enterprise.ienterprise import ORACLE_DIALECT
from twext.enterprise.adbapi2 import ConnectionPool, ConnectionPoolConnection
@@ -770,37 +769,8 @@
-def computeProcessCount(minimum, perCPU, perGB, cpuCount=None, memSize=None):
- """
- Determine how many process to spawn based on installed RAM and CPUs,
- returning at least "mininum"
- """
- if cpuCount is None:
- try:
- cpuCount = getNCPU()
- except NotImplementedError, e:
- log.error("Unable to detect number of CPUs: %s" % (str(e),))
- return minimum
- if memSize is None:
- try:
- memSize = getMemorySize()
- except NotImplementedError, e:
- log.error("Unable to detect amount of installed RAM: %s" % (str(e),))
- return minimum
-
- countByCore = perCPU * cpuCount
- countByMemory = perGB * (memSize / (1024 * 1024 * 1024))
-
- # Pick the smaller of the two:
- count = min(countByCore, countByMemory)
-
- # ...but at least "minimum"
- return max(count, minimum)
-
-
-
class FakeRequest(object):
def __init__(self, rootResource, method, path, uri='/', transaction=None):
@@ -1010,17 +980,16 @@
access=os.W_OK,
create=(0750, config.UserName, config.GroupName),
)
- if config.LogRoot.startswith(config.ServerRoot + os.sep):
- checkDirectory(
- config.LogRoot,
- "Log root",
- access=os.W_OK,
- create=(0750, config.UserName, config.GroupName),
- )
- if config.RunRoot.startswith(config.ServerRoot + os.sep):
- checkDirectory(
- config.RunRoot,
- "Run root",
- access=os.W_OK,
- create=(0770, config.UserName, config.GroupName),
- )
+ # Always create these:
+ checkDirectory(
+ config.LogRoot,
+ "Log root",
+ access=os.W_OK,
+ create=(0750, config.UserName, config.GroupName),
+ )
+ checkDirectory(
+ config.RunRoot,
+ "Run root",
+ access=os.W_OK,
+ create=(0770, config.UserName, config.GroupName),
+ )
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/changeip_calendar.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/changeip_calendar.py 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/changeip_calendar.py 2013-04-26 16:45:09 UTC (rev 11102)
@@ -54,7 +54,7 @@
sys.exit(1)
verbose = False
- configFile = "/Library/Server/Calendar and Contacts/Config/caldavd.plist"
+ configFile = "/Library/Server/Calendar and Contacts/Config/caldavd-system.plist"
for opt, arg in optargs:
if opt in ("-h", "--help"):
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/config.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/config.py 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/config.py 2013-04-26 16:45:09 UTC (rev 11102)
@@ -32,43 +32,47 @@
from twistedcaldav.config import config, ConfigDict, ConfigurationError, mergeData
from twistedcaldav.stdconfig import DEFAULT_CONFIG_FILE
WRITABLE_CONFIG_KEYS = [
- "ServerHostName",
- "HTTPPort",
- "SSLPort",
- "EnableSSL",
- "RedirectHTTPToHTTPS",
- "EnableCalDAV",
- "EnableCardDAV",
- "DataRoot",
- "SSLCertificate",
- "SSLPrivateKey",
- "SSLAuthorityChain",
- "EnableSearchAddressBook",
+ "Authentication.Basic.AllowedOverWireUnencrypted",
"Authentication.Basic.Enabled",
- "Authentication.Basic.AllowedOverWireUnencrypted",
+ "Authentication.Digest.AllowedOverWireUnencrypted",
"Authentication.Digest.Enabled",
- "Authentication.Digest.AllowedOverWireUnencrypted",
+ "Authentication.Kerberos.AllowedOverWireUnencrypted",
"Authentication.Kerberos.Enabled",
- "Authentication.Kerberos.AllowedOverWireUnencrypted",
"Authentication.Wiki.Enabled",
+ "DataRoot",
+ "DefaultLogLevel",
+ "DirectoryAddressBook.params.queryPeopleRecords",
+ "DirectoryAddressBook.params.queryUserRecords",
+ "EnableCalDAV",
+ "EnableCardDAV",
+ "EnableSearchAddressBook",
+ "EnableSSL",
+ "HTTPPort",
+ "LogLevels",
+ "Notifications.Services.APNS.CalDAV.AuthorityChainPath",
+ "Notifications.Services.APNS.CalDAV.CertificatePath",
+ "Notifications.Services.APNS.CalDAV.PrivateKeyPath",
+ "Notifications.Services.APNS.CardDAV.AuthorityChainPath",
+ "Notifications.Services.APNS.CardDAV.CertificatePath",
+ "Notifications.Services.APNS.CardDAV.PrivateKeyPath",
+ "Notifications.Services.APNS.Enabled",
+ "RedirectHTTPToHTTPS",
"Scheduling.iMIP.Enabled",
- "Scheduling.iMIP.Receiving.Username",
+ "Scheduling.iMIP.Receiving.Port",
"Scheduling.iMIP.Receiving.Server",
- "Scheduling.iMIP.Receiving.Port",
"Scheduling.iMIP.Receiving.Type",
+ "Scheduling.iMIP.Receiving.Username",
"Scheduling.iMIP.Receiving.UseSSL",
+ "Scheduling.iMIP.Sending.Address",
+ "Scheduling.iMIP.Sending.Port",
+ "Scheduling.iMIP.Sending.Server",
"Scheduling.iMIP.Sending.Username",
- "Scheduling.iMIP.Sending.Server",
- "Scheduling.iMIP.Sending.Port",
"Scheduling.iMIP.Sending.UseSSL",
- "Scheduling.iMIP.Sending.Address",
- "Notifications.Services.APNS.Enabled",
- "Notifications.Services.APNS.CalDAV.CertificatePath",
- "Notifications.Services.APNS.CalDAV.AuthorityChainPath",
- "Notifications.Services.APNS.CalDAV.PrivateKeyPath",
- "Notifications.Services.APNS.CardDAV.CertificatePath",
- "Notifications.Services.APNS.CardDAV.AuthorityChainPath",
- "Notifications.Services.APNS.CardDAV.PrivateKeyPath",
+ "ServerHostName",
+ "SSLAuthorityChain",
+ "SSLCertificate",
+ "SSLPort",
+ "SSLPrivateKey",
]
def usage(e=None):
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/principals.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/principals.py 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/principals.py 2013-04-26 16:45:09 UTC (rev 11102)
@@ -31,7 +31,7 @@
from twistedcaldav.config import config
from twistedcaldav.directory.directory import UnknownRecordTypeError, DirectoryError
-from twistedcaldav.directory.directory import scheduleNextGroupCachingUpdate
+from twistedcaldav.directory.directory import schedulePolledGroupCachingUpdate
from calendarserver.tools.util import (
booleanArgument, proxySubprincipal, action_addProxyPrincipal,
@@ -510,7 +510,8 @@
membersProperty = davxml.GroupMemberSet(*memberURLs)
yield subPrincipal.writeProperty(membersProperty, None)
if store is not None:
- yield scheduleNextGroupCachingUpdate(store, 0)
+ # Schedule work the PeerConnectionPool will pick up as overdue
+ yield schedulePolledGroupCachingUpdate(store)
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/test/gateway/caldavd.plist
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/test/gateway/caldavd.plist 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/test/gateway/caldavd.plist 2013-04-26 16:45:09 UTC (rev 11102)
@@ -718,6 +718,10 @@
<integer>30</integer> <!-- in minutes -->
+ <!-- For unit tests, enable SharedConnectionPool so we don't use up shared memory -->
+ <key>SharedConnectionPool</key>
+ <true/>
+
<!--
Twisted
-->
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/test/principals/caldavd.plist
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/test/principals/caldavd.plist 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/test/principals/caldavd.plist 2013-04-26 16:45:09 UTC (rev 11102)
@@ -743,6 +743,9 @@
<key>ResponseCacheTimeout</key>
<integer>30</integer> <!-- in minutes -->
+ <!-- For unit tests, enable SharedConnectionPool so we don't use up shared memory -->
+ <key>SharedConnectionPool</key>
+ <true/>
<!--
Twisted
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/test/test_config.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/test/test_config.py 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/test/test_config.py 2013-04-26 16:45:09 UTC (rev 11102)
@@ -105,6 +105,8 @@
self.assertEquals(results["result"]["EnableCalDAV"], True)
self.assertEquals(results["result"]["EnableCardDAV"], True)
self.assertEquals(results["result"]["EnableSSL"], False)
+ self.assertEquals(results["result"]["DefaultLogLevel"], "warn")
+
self.assertEquals(results["result"]["Notifications"]["Services"]["APNS"]["Enabled"], False)
self.assertEquals(results["result"]["Notifications"]["Services"]["APNS"]["CalDAV"]["CertificatePath"], "/example/calendar.cer")
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/util.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/util.py 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/util.py 2013-04-26 16:45:09 UTC (rev 11102)
@@ -50,7 +50,7 @@
from twistedcaldav.directory import calendaruserproxy
from twistedcaldav.directory.aggregate import AggregateDirectoryService
from twistedcaldav.directory.directory import DirectoryService, DirectoryRecord
-from twistedcaldav.directory.directory import scheduleNextGroupCachingUpdate
+from twistedcaldav.directory.directory import schedulePolledGroupCachingUpdate
from calendarserver.push.notifier import NotifierFactory
from txdav.common.datastore.file import CommonDataStore
@@ -462,7 +462,8 @@
(yield action_removeProxyPrincipal(rootResource, directory, store,
principal, proxyPrincipal, proxyTypes=proxyTypes))
- yield scheduleNextGroupCachingUpdate(store, 0)
+ # Schedule work the PeerConnectionPool will pick up as overdue
+ yield schedulePolledGroupCachingUpdate(store)
@@ -495,7 +496,8 @@
(yield subPrincipal.writeProperty(membersProperty, None))
if removed:
- yield scheduleNextGroupCachingUpdate(store, 0)
+ # Schedule work the PeerConnectionPool will pick up as overdue
+ yield schedulePolledGroupCachingUpdate(store)
returnValue(removed)
Copied: CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/workitems.py (from rev 11101, CalendarServer/trunk/calendarserver/tools/workitems.py)
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/workitems.py (rev 0)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/tools/workitems.py 2013-04-26 16:45:09 UTC (rev 11102)
@@ -0,0 +1,188 @@
+#!/usr/bin/env python
+
+##
+# Copyright (c) 2006-2013 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+from __future__ import print_function
+
+from getopt import getopt, GetoptError
+import os
+import sys
+import curses
+import datetime
+
+from twisted.internet.defer import inlineCallbacks
+from calendarserver.tools.cmdline import utilityMain
+from twisted.application.service import Service
+from calendarserver.push.notifier import PushNotificationWork
+from twistedcaldav.directory.directory import GroupCacherPollingWork
+from twistedcaldav.scheduling.imip.inbound import IMIPPollingWork, IMIPReplyWork
+
+def usage(e=None):
+
+ name = os.path.basename(sys.argv[0])
+ print("usage: %s [options]" % (name,))
+ print("")
+ print(" TODO: describe usage")
+ print("")
+ print("options:")
+ print(" -h --help: print this help and exit")
+ print(" -e --error: send stderr to stdout")
+ print(" -f --config <path>: Specify caldavd.plist configuration path")
+ print("")
+
+ if e:
+ sys.exit(64)
+ else:
+ sys.exit(0)
+
+
+
+def main():
+
+ try:
+ (optargs, _ignore_args) = getopt(
+ sys.argv[1:], "hef:", [
+ "help",
+ "error",
+ "config=",
+ ],
+ )
+ except GetoptError, e:
+ usage(e)
+
+ #
+ # Get configuration
+ #
+ configFileName = None
+ debug = False
+
+ for opt, arg in optargs:
+ if opt in ("-h", "--help"):
+ usage()
+
+ if opt in ("-e", "--error"):
+ debug = True
+
+ elif opt in ("-f", "--config"):
+ configFileName = arg
+
+ else:
+ raise NotImplementedError(opt)
+
+ utilityMain(configFileName, WorkItemMonitorService, verbose=debug)
+
+class WorkItemMonitorService(Service):
+
+ def __init__(self, store):
+ self.store = store
+ from twisted.internet import reactor
+ self.reactor = reactor
+
+
+ def startService(self):
+ self.screen = curses.initscr()
+ self.windows = []
+ self.updateScreenGeometry()
+ self.reactor.callLater(0, self.updateDisplay)
+
+ def updateScreenGeometry(self):
+ for win in self.windows:
+ del win
+ winY, winX = self.screen.getmaxyx()
+ seencolumns = [1]
+ seenrows = [1]
+ heightSoFar = 0
+ begin_x = 0
+ begin_y = 0
+ # Specify height and width of each window as one of:
+ # absolute value (int), e.g.: 42
+ # percentage of window height / width (string), e.g.: "42%"
+ # Specify row and column for each window as though it is a cell in an invisible html table
+ # Itemize windows in ascending order by row, col
+ for title, height, width, row, col, workItemClass, fmt, attrs in (
+ ("Group Membership Indexing", "10%", "50%", 1, 1, GroupCacherPollingWork, "", ()),
+ ("IMIP Reply Polling", "10%", "50%", 1, 2, IMIPPollingWork, "", ()),
+ ("IMIP Reply Processing", "20%", "100%", 2, 1, IMIPReplyWork, "%s %s", ("organizer", "attendee")),
+ ("Push Notifications", "69%", "100%", 3, 1, PushNotificationWork, "%s", ("pushID",)),
+ ):
+ if (isinstance(height, basestring)):
+ height = max(int(winY * (float(height.strip("%")) / 100.0)), 3)
+ if (isinstance(width, basestring)):
+ width = max(int(winX * (float(width.strip("%")) / 100.0)), 10)
+ if col not in seencolumns:
+ heightSoFar = max(height, heightSoFar)
+ seencolumns.append(col)
+ if row not in seenrows:
+ begin_y = heightSoFar
+ heightSoFar += height
+ begin_x = 0
+ seenrows.append(row)
+ seencolumns = [col]
+ window = WorkWindow(height, width, begin_y, begin_x,
+ self.store, title, workItemClass, fmt, attrs)
+ self.windows.append(window)
+ begin_x += width
+
+
+ @inlineCallbacks
+ def updateDisplay(self):
+ for window in self.windows:
+ yield window.update()
+
+ self.reactor.callLater(1, self.updateDisplay)
+
+class WorkWindow(object):
+ def __init__(self, nlines, ncols, begin_y, begin_x,
+ store, title, workItemClass, fmt, attrs):
+ self.window = curses.newwin(nlines, ncols, begin_y, begin_x)
+ self.ncols = ncols
+ self.store = store
+ self.title = title
+ self.workItemClass = workItemClass
+ self.fmt = fmt
+ self.attrs = attrs
+
+ @inlineCallbacks
+ def update(self):
+ self.window.erase()
+ self.window.border()
+ self.window.addstr(0, 2, self.title)
+ txn = self.store.newTransaction()
+ records = (yield self.workItemClass.all(txn))
+
+ x = 1
+ y = 1
+ for record in records:
+ seconds = record.notBefore - datetime.datetime.utcnow()
+ try:
+ self.window.addstr(y, x, "%d seconds" % int(seconds.total_seconds()))
+ except curses.error:
+ continue
+ y += 1
+ if self.attrs:
+ try:
+ s = self.fmt % tuple([getattr(record, str(a)) for a in self.attrs])
+ except Exception, e:
+ s = "Error: %s" % (str(e),)
+ try:
+ self.window.addnstr(y, x, s, self.ncols-2)
+ except curses.error:
+ pass
+ y += 1
+ self.window.refresh()
+
+if __name__ == "__main__":
+ main()
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/webcal/resource.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/webcal/resource.py 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/calendarserver/webcal/resource.py 2013-04-26 16:45:09 UTC (rev 11102)
@@ -91,11 +91,11 @@
def htmlContent(self, debug=False):
if debug:
cacheAttr = "_htmlContentDebug"
- templateFileName = "debug_template.html"
+ templateFileName = "debug_standalone.html"
else:
cacheAttr = "_htmlContent"
- templateFileName = "template.html"
- templateFileName = os.path.join(config.WebCalendarRoot, "calendar", templateFileName)
+ templateFileName = "standalone.html"
+ templateFileName = os.path.join(config.WebCalendarRoot, templateFileName)
#
# See if the file changed, and dump the cached template if so.
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/conf/caldavd-apple.plist
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/conf/caldavd-apple.plist 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/conf/caldavd-apple.plist 2013-04-26 16:45:09 UTC (rev 11102)
@@ -112,6 +112,8 @@
<string>-c deadlock_timeout=10</string>
<string>-c log_line_prefix='%m [%p] '</string>
</array>
+ <key>ExtraConnections</key>
+ <integer>20</integer>
</dict>
<!-- Data root -->
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/dal/syntax.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/dal/syntax.py 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/dal/syntax.py 2013-04-26 16:45:09 UTC (rev 11102)
@@ -147,7 +147,8 @@
_paramstyles = {
'pyformat': partial(FixedPlaceholder, "%s"),
- 'numeric': NumericPlaceholder
+ 'numeric': NumericPlaceholder,
+ 'qmark': defaultPlaceholder,
}
@@ -1673,6 +1674,12 @@
def _toSQL(self, queryGenerator):
+ if queryGenerator.dialect == SQLITE_DIALECT:
+ # FIXME - this is only stubbed out for testing right now, actual
+ # concurrency would require some kind of locking statement here.
+ # BEGIN IMMEDIATE maybe, if that's okay in the middle of a
+ # transaction or repeatedly?
+ return SQLFragment('select null')
return SQLFragment('lock table ').append(
self.table.subSQL(queryGenerator, [self.table])).append(
SQLFragment(' in %s mode' % (self.mode,)))
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/fixtures.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/fixtures.py 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/fixtures.py 2013-04-26 16:45:09 UTC (rev 11102)
@@ -20,10 +20,24 @@
"""
import sqlite3
+from Queue import Empty
+from itertools import count
+from zope.interface import implementer
+from zope.interface.verify import verifyClass
+
+from twisted.internet.interfaces import IReactorThreads
+from twisted.python.threadpool import ThreadPool
+
+from twisted.internet.task import Clock
+
from twext.enterprise.adbapi2 import ConnectionPool
from twext.enterprise.ienterprise import SQLITE_DIALECT
+from twext.enterprise.ienterprise import POSTGRES_DIALECT
+from twext.enterprise.adbapi2 import DEFAULT_PARAM_STYLE
+from twext.internet.threadutils import ThreadHolder
+
def buildConnectionPool(testCase, schemaText="", dialect=SQLITE_DIALECT):
"""
Build a L{ConnectionPool} for testing purposes, with the given C{testCase}.
@@ -57,3 +71,513 @@
pool.startService()
testCase.addCleanup(pool.stopService)
return pool
+
+
+
+def resultOf(deferred, propagate=False):
+ """
+ Add a callback and errback which will capture the result of a L{Deferred} in
+ a list, and return that list. If 'propagate' is True, pass through the
+ results.
+ """
+ results = []
+ if propagate:
+ def cb(r):
+ results.append(r)
+ return r
+ else:
+ cb = results.append
+ deferred.addBoth(cb)
+ return results
+
+
+
+class FakeThreadHolder(ThreadHolder):
+ """
+ Run things submitted to this ThreadHolder on the main thread, so that
+ execution is easier to control.
+ """
+
+ def __init__(self, test):
+ super(FakeThreadHolder, self).__init__(self)
+ self.test = test
+ self.started = False
+ self.stopped = False
+ self._workerIsRunning = False
+
+
+ def start(self):
+ self.started = True
+ return super(FakeThreadHolder, self).start()
+
+
+ def stop(self):
+ result = super(FakeThreadHolder, self).stop()
+ self.stopped = True
+ return result
+
+
+ @property
+ def _get_q(self):
+ return self._q_
+
+
+ @_get_q.setter
+ def _q(self, newq):
+ if newq is not None:
+ oget = newq.get
+ newq.get = lambda: oget(timeout=0)
+ oput = newq.put
+ def putit(x):
+ p = oput(x)
+ if not self.test.paused:
+ self.flush()
+ return p
+ newq.put = putit
+ self._q_ = newq
+
+
+ def callFromThread(self, f, *a, **k):
+ result = f(*a, **k)
+ return result
+
+
+ def callInThread(self, f, *a, **k):
+ """
+ This should be called only once, to start the worker function that
+ dedicates a thread to this L{ThreadHolder}.
+ """
+ self._workerIsRunning = True
+
+
+ def flush(self):
+ """
+ Fire all deferreds previously returned from submit.
+ """
+ try:
+ while self._workerIsRunning and self._qpull():
+ pass
+ else:
+ self._workerIsRunning = False
+ except Empty:
+ pass
+
+
+
+@implementer(IReactorThreads)
+class ClockWithThreads(Clock):
+ """
+ A testing reactor that supplies L{IReactorTime} and L{IReactorThreads}.
+ """
+
+ def __init__(self):
+ super(ClockWithThreads, self).__init__()
+ self._pool = ThreadPool()
+
+
+ def getThreadPool(self):
+ """
+ Get the threadpool.
+ """
+ return self._pool
+
+
+ def suggestThreadPoolSize(self, size):
+ """
+ Approximate the behavior of a 'real' reactor.
+ """
+ self._pool.adjustPoolsize(maxthreads=size)
+
+
+ def callInThread(self, thunk, *a, **kw):
+ """
+ No implementation.
+ """
+
+
+ def callFromThread(self, thunk, *a, **kw):
+ """
+ No implementation.
+ """
+
+verifyClass(IReactorThreads, ClockWithThreads)
+
+
+
+class ConnectionPoolHelper(object):
+ """
+ Connection pool setting-up facilities for tests that need a
+ L{ConnectionPool}.
+ """
+
+ dialect = POSTGRES_DIALECT
+ paramstyle = DEFAULT_PARAM_STYLE
+
+ def setUp(self, test=None, connect=None):
+ """
+ Support inheritance by L{TestCase} classes.
+ """
+ if test is None:
+ test = self
+ if connect is None:
+ self.factory = ConnectionFactory()
+ connect = self.factory.connect
+ self.connect = connect
+ self.paused = False
+ self.holders = []
+ self.pool = ConnectionPool(connect,
+ maxConnections=2,
+ dialect=self.dialect,
+ paramstyle=self.paramstyle)
+ self.pool._createHolder = self.makeAHolder
+ self.clock = self.pool.reactor = ClockWithThreads()
+ self.pool.startService()
+ test.addCleanup(self.flushHolders)
+
+
+ def flushHolders(self):
+ """
+ Flush all pending C{submit}s since C{pauseHolders} was called. This
+ makes sure the service is stopped and the fake ThreadHolders are all
+ executing their queues so failed tests can exit cleanly.
+ """
+ self.paused = False
+ for holder in self.holders:
+ holder.flush()
+
+
+ def pauseHolders(self):
+ """
+ Pause all L{FakeThreadHolder}s, causing C{submit} to return an unfired
+ L{Deferred}.
+ """
+ self.paused = True
+
+
+ def makeAHolder(self):
+ """
+ Make a ThreadHolder-alike.
+ """
+ fth = FakeThreadHolder(self)
+ self.holders.append(fth)
+ return fth
+
+
+ def resultOf(self, it):
+ return resultOf(it)
+
+
+ def createTransaction(self):
+ return self.pool.connection()
+
+
+ def translateError(self, err):
+ return err
+
+
+
+class SteppablePoolHelper(ConnectionPoolHelper):
+ """
+ A version of L{ConnectionPoolHelper} that can set up a connection pool
+ capable of firing all its L{Deferred}s on demand, synchronously, by using
+ SQLite.
+ """
+ dialect = SQLITE_DIALECT
+ paramstyle = sqlite3.paramstyle
+
+ def __init__(self, schema):
+ self.schema = schema
+
+
+ def setUp(self, test):
+ connect = synchronousConnectionFactory(test)
+ con = connect()
+ cur = con.cursor()
+ cur.executescript(self.schema)
+ con.commit()
+ super(SteppablePoolHelper, self).setUp(test, connect)
+
+
+ def rows(self, sql):
+ """
+ Get some rows from the database to compare in a test.
+ """
+ con = self.connect()
+ cur = con.cursor()
+ cur.execute(sql)
+ result = cur.fetchall()
+ con.commit()
+ return result
+
+
+
+def synchronousConnectionFactory(test):
+ tmpdb = test.mktemp()
+ def connect():
+ return sqlite3.connect(tmpdb)
+ return connect
+
+
+
+class Child(object):
+ """
+ An object with a L{Parent}, in its list of C{children}.
+ """
+ def __init__(self, parent):
+ self.closed = False
+ self.parent = parent
+ self.parent.children.append(self)
+
+
+ def close(self):
+ if self.parent._closeFailQueue:
+ raise self.parent._closeFailQueue.pop(0)
+ self.closed = True
+
+
+
+class Parent(object):
+ """
+ An object with a list of L{Child}ren.
+ """
+
+ def __init__(self):
+ self.children = []
+ self._closeFailQueue = []
+
+
+ def childCloseWillFail(self, exception):
+ """
+ Closing children of this object will result in the given exception.
+
+ @see: L{ConnectionFactory}
+ """
+ self._closeFailQueue.append(exception)
+
+
+
+class FakeConnection(Parent, Child):
+ """
+ Fake Stand-in for DB-API 2.0 connection.
+
+ @ivar executions: the number of statements which have been executed.
+
+ """
+
+ executions = 0
+
+ def __init__(self, factory):
+ """
+ Initialize list of cursors
+ """
+ Parent.__init__(self)
+ Child.__init__(self, factory)
+ self.id = factory.idcounter.next()
+ self._executeFailQueue = []
+ self._commitCount = 0
+ self._rollbackCount = 0
+
+
+ def executeWillFail(self, thunk):
+ """
+ The next call to L{FakeCursor.execute} will fail with an exception
+ returned from the given callable.
+ """
+ self._executeFailQueue.append(thunk)
+
+
+ @property
+ def cursors(self):
+ "Alias to make tests more readable."
+ return self.children
+
+
+ def cursor(self):
+ return FakeCursor(self)
+
+
+ def commit(self):
+ self._commitCount += 1
+ if self.parent.commitFail:
+ self.parent.commitFail = False
+ raise CommitFail()
+
+
+ def rollback(self):
+ self._rollbackCount += 1
+ if self.parent.rollbackFail:
+ self.parent.rollbackFail = False
+ raise RollbackFail()
+
+
+
+class RollbackFail(Exception):
+ """
+ Sample rollback-failure exception.
+ """
+
+
+
+class CommitFail(Exception):
+ """
+ Sample Commit-failure exception.
+ """
+
+
+
+class FakeCursor(Child):
+ """
+ Fake stand-in for a DB-API 2.0 cursor.
+ """
+ def __init__(self, connection):
+ Child.__init__(self, connection)
+ self.rowcount = 0
+ # not entirely correct, but all we care about is its truth value.
+ self.description = False
+ self.variables = []
+ self.allExecutions = []
+
+
+ @property
+ def connection(self):
+ "Alias to make tests more readable."
+ return self.parent
+
+
+ def execute(self, sql, args=()):
+ self.connection.executions += 1
+ if self.connection._executeFailQueue:
+ raise self.connection._executeFailQueue.pop(0)()
+ self.allExecutions.append((sql, args))
+ self.sql = sql
+ factory = self.connection.parent
+ self.description = factory.hasResults
+ if factory.hasResults and factory.shouldUpdateRowcount:
+ self.rowcount = 1
+ else:
+ self.rowcount = 0
+ return
+
+
+ def var(self, type, *args):
+ """
+ Return a database variable in the style of the cx_Oracle bindings.
+ """
+ v = FakeVariable(self, type, args)
+ self.variables.append(v)
+ return v
+
+
+ def fetchall(self):
+ """
+ Just echo the SQL that was executed in the last query.
+ """
+ if self.connection.parent.hasResults:
+ return [[self.connection.id, self.sql]]
+ if self.description:
+ return []
+ return None
+
+
+
+class FakeVariable(object):
+ def __init__(self, cursor, type, args):
+ self.cursor = cursor
+ self.type = type
+ self.args = args
+
+
+ def getvalue(self):
+ vv = self.cursor.connection.parent.varvals
+ if vv:
+ return vv.pop(0)
+ return self.cursor.variables.index(self) + 300
+
+
+ def __reduce__(self):
+ raise RuntimeError("Not pickleable (since oracle vars aren't)")
+
+
+
+class ConnectionFactory(Parent):
+ """
+ A factory for L{FakeConnection} objects.
+
+ @ivar shouldUpdateRowcount: Should C{execute} on cursors produced by
+ connections produced by this factory update their C{rowcount} or just
+ their C{description} attribute?
+
+ @ivar hasResults: should cursors produced by connections by this factory
+ have any results returned by C{fetchall()}?
+ """
+
+ rollbackFail = False
+ commitFail = False
+
+ def __init__(self, shouldUpdateRowcount=True, hasResults=True):
+ Parent.__init__(self)
+ self.idcounter = count(1)
+ self._connectResultQueue = []
+ self.defaultConnect()
+ self.varvals = []
+ self.shouldUpdateRowcount = shouldUpdateRowcount
+ self.hasResults = hasResults
+
+
+ @property
+ def connections(self):
+ "Alias to make tests more readable."
+ return self.children
+
+
+ def connect(self):
+ """
+ Implement the C{ConnectionFactory} callable expected by
+ L{ConnectionPool}.
+ """
+ if self._connectResultQueue:
+ thunk = self._connectResultQueue.pop(0)
+ else:
+ thunk = self._default
+ return thunk()
+
+
+ def willConnect(self):
+ """
+ Used by tests to queue a successful result for connect().
+ """
+ def thunk():
+ return FakeConnection(self)
+ self._connectResultQueue.append(thunk)
+
+
+ def willFail(self):
+ """
+ Used by tests to queue a failing result for connect().
+ """
+ def thunk():
+ raise FakeConnectionError()
+ self._connectResultQueue.append(thunk)
+
+
+ def defaultConnect(self):
+ """
+ By default, connection attempts will succeed.
+ """
+ self.willConnect()
+ self._default = self._connectResultQueue.pop()
+
+
+ def defaultFail(self):
+ """
+ By default, connection attempts will fail.
+ """
+ self.willFail()
+ self._default = self._connectResultQueue.pop()
+
+
+
+class FakeConnectionError(Exception):
+ """
+ Synthetic error that might occur during connection.
+ """
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/queue.py 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/queue.py 2013-04-26 16:45:09 UTC (rev 11102)
@@ -1342,6 +1342,7 @@
def done(result):
self._startingUp = None
super(PeerConnectionPool, self).startService()
+ self._lostWorkCheckLoop()
return result
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/test/test_adbapi2.py 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/test/test_adbapi2.py 2013-04-26 16:45:09 UTC (rev 11102)
@@ -18,22 +18,14 @@
Tests for L{twext.enterprise.adbapi2}.
"""
-from itertools import count
-from Queue import Empty
+from zope.interface.verify import verifyObject
-from zope.interface.verify import verifyClass, verifyObject
-from zope.interface.declarations import implements
-
-from twisted.python.threadpool import ThreadPool
from twisted.python.failure import Failure
from twisted.trial.unittest import TestCase
-from twisted.internet.task import Clock
from twisted.internet.defer import Deferred, fail
-from twisted.internet.interfaces import IReactorThreads
-
from twisted.test.proto_helpers import StringTransport
from twext.enterprise.ienterprise import ConnectionError
@@ -41,33 +33,17 @@
from twext.enterprise.adbapi2 import ConnectionPoolClient
from twext.enterprise.adbapi2 import ConnectionPoolConnection
from twext.enterprise.ienterprise import IAsyncTransaction
-from twext.enterprise.ienterprise import POSTGRES_DIALECT
from twext.enterprise.ienterprise import ICommandBlock
from twext.enterprise.adbapi2 import FailsafeException
-from twext.enterprise.adbapi2 import DEFAULT_PARAM_STYLE
from twext.enterprise.adbapi2 import ConnectionPool
-from twext.internet.threadutils import ThreadHolder
+from twext.enterprise.fixtures import ConnectionPoolHelper
+from twext.enterprise.fixtures import resultOf
+from twext.enterprise.fixtures import ClockWithThreads
+from twext.enterprise.fixtures import FakeConnectionError
+from twext.enterprise.fixtures import RollbackFail
+from twext.enterprise.fixtures import CommitFail
from twext.enterprise.adbapi2 import Commit
-
-def resultOf(deferred, propagate=False):
- """
- Add a callback and errback which will capture the result of a L{Deferred} in
- a list, and return that list. If 'propagate' is True, pass through the
- results.
- """
- results = []
- if propagate:
- def cb(r):
- results.append(r)
- return r
- else:
- cb = results.append
- deferred.addBoth(cb)
- return results
-
-
-
class AssertResultHelper(object):
"""
Mixin for asserting about synchronous Deferred results.
@@ -97,384 +73,6 @@
-class Child(object):
- """
- An object with a L{Parent}, in its list of C{children}.
- """
- def __init__(self, parent):
- self.closed = False
- self.parent = parent
- self.parent.children.append(self)
-
-
- def close(self):
- if self.parent._closeFailQueue:
- raise self.parent._closeFailQueue.pop(0)
- self.closed = True
-
-
-
-class Parent(object):
- """
- An object with a list of L{Child}ren.
- """
-
- def __init__(self):
- self.children = []
- self._closeFailQueue = []
-
-
- def childCloseWillFail(self, exception):
- """
- Closing children of this object will result in the given exception.
-
- @see: L{ConnectionFactory}
- """
- self._closeFailQueue.append(exception)
-
-
-
-class FakeConnection(Parent, Child):
- """
- Fake Stand-in for DB-API 2.0 connection.
-
- @ivar executions: the number of statements which have been executed.
-
- """
-
- executions = 0
-
- def __init__(self, factory):
- """
- Initialize list of cursors
- """
- Parent.__init__(self)
- Child.__init__(self, factory)
- self.id = factory.idcounter.next()
- self._executeFailQueue = []
- self._commitCount = 0
- self._rollbackCount = 0
-
-
- def executeWillFail(self, thunk):
- """
- The next call to L{FakeCursor.execute} will fail with an exception
- returned from the given callable.
- """
- self._executeFailQueue.append(thunk)
-
-
- @property
- def cursors(self):
- "Alias to make tests more readable."
- return self.children
-
-
- def cursor(self):
- return FakeCursor(self)
-
-
- def commit(self):
- self._commitCount += 1
- if self.parent.commitFail:
- self.parent.commitFail = False
- raise CommitFail()
-
-
- def rollback(self):
- self._rollbackCount += 1
- if self.parent.rollbackFail:
- self.parent.rollbackFail = False
- raise RollbackFail()
-
-
-
-class RollbackFail(Exception):
- """
- Sample rollback-failure exception.
- """
-
-
-
-class CommitFail(Exception):
- """
- Sample Commit-failure exception.
- """
-
-
-
-class FakeCursor(Child):
- """
- Fake stand-in for a DB-API 2.0 cursor.
- """
- def __init__(self, connection):
- Child.__init__(self, connection)
- self.rowcount = 0
- # not entirely correct, but all we care about is its truth value.
- self.description = False
- self.variables = []
- self.allExecutions = []
-
-
- @property
- def connection(self):
- "Alias to make tests more readable."
- return self.parent
-
-
- def execute(self, sql, args=()):
- self.connection.executions += 1
- if self.connection._executeFailQueue:
- raise self.connection._executeFailQueue.pop(0)()
- self.allExecutions.append((sql, args))
- self.sql = sql
- factory = self.connection.parent
- self.description = factory.hasResults
- if factory.hasResults and factory.shouldUpdateRowcount:
- self.rowcount = 1
- else:
- self.rowcount = 0
- return
-
-
- def var(self, type, *args):
- """
- Return a database variable in the style of the cx_Oracle bindings.
- """
- v = FakeVariable(self, type, args)
- self.variables.append(v)
- return v
-
-
- def fetchall(self):
- """
- Just echo the SQL that was executed in the last query.
- """
- if self.connection.parent.hasResults:
- return [[self.connection.id, self.sql]]
- if self.description:
- return []
- return None
-
-
-
-class FakeVariable(object):
- def __init__(self, cursor, type, args):
- self.cursor = cursor
- self.type = type
- self.args = args
-
-
- def getvalue(self):
- vv = self.cursor.connection.parent.varvals
- if vv:
- return vv.pop(0)
- return self.cursor.variables.index(self) + 300
-
-
- def __reduce__(self):
- raise RuntimeError("Not pickleable (since oracle vars aren't)")
-
-
-
-class ConnectionFactory(Parent):
- """
- A factory for L{FakeConnection} objects.
-
- @ivar shouldUpdateRowcount: Should C{execute} on cursors produced by
- connections produced by this factory update their C{rowcount} or just
- their C{description} attribute?
-
- @ivar hasResults: should cursors produced by connections by this factory
- have any results returned by C{fetchall()}?
- """
-
- rollbackFail = False
- commitFail = False
-
- def __init__(self, shouldUpdateRowcount=True, hasResults=True):
- Parent.__init__(self)
- self.idcounter = count(1)
- self._connectResultQueue = []
- self.defaultConnect()
- self.varvals = []
- self.shouldUpdateRowcount = shouldUpdateRowcount
- self.hasResults = hasResults
-
-
- @property
- def connections(self):
- "Alias to make tests more readable."
- return self.children
-
-
- def connect(self):
- """
- Implement the C{ConnectionFactory} callable expected by
- L{ConnectionPool}.
- """
- if self._connectResultQueue:
- thunk = self._connectResultQueue.pop(0)
- else:
- thunk = self._default
- return thunk()
-
-
- def willConnect(self):
- """
- Used by tests to queue a successful result for connect().
- """
- def thunk():
- return FakeConnection(self)
- self._connectResultQueue.append(thunk)
-
-
- def willFail(self):
- """
- Used by tests to queue a successful result for connect().
- """
- def thunk():
- raise FakeConnectionError()
- self._connectResultQueue.append(thunk)
-
-
- def defaultConnect(self):
- """
- By default, connection attempts will succeed.
- """
- self.willConnect()
- self._default = self._connectResultQueue.pop()
-
-
- def defaultFail(self):
- """
- By default, connection attempts will fail.
- """
- self.willFail()
- self._default = self._connectResultQueue.pop()
-
-
-
-class FakeConnectionError(Exception):
- """
- Synthetic error that might occur during connection.
- """
-
-
-
-class FakeThreadHolder(ThreadHolder):
- """
- Run things to submitted this ThreadHolder on the main thread, so that
- execution is easier to control.
- """
-
- def __init__(self, test):
- super(FakeThreadHolder, self).__init__(self)
- self.test = test
- self.started = False
- self.stopped = False
- self._workerIsRunning = False
-
-
- def start(self):
- self.started = True
- return super(FakeThreadHolder, self).start()
-
-
- def stop(self):
- result = super(FakeThreadHolder, self).stop()
- self.stopped = True
- return result
-
-
- @property
- def _get_q(self):
- return self._q_
-
-
- @_get_q.setter
- def _q(self, newq):
- if newq is not None:
- oget = newq.get
- newq.get = lambda: oget(timeout=0)
- oput = newq.put
- def putit(x):
- p = oput(x)
- if not self.test.paused:
- self.flush()
- return p
- newq.put = putit
- self._q_ = newq
-
-
- def callFromThread(self, f, *a, **k):
- result = f(*a, **k)
- return result
-
-
- def callInThread(self, f, *a, **k):
- """
- This should be called only once, to start the worker function that
- dedicates a thread to this L{ThreadHolder}.
- """
- self._workerIsRunning = True
-
-
- def flush(self):
- """
- Fire all deferreds previously returned from submit.
- """
- try:
- while self._workerIsRunning and self._qpull():
- pass
- else:
- self._workerIsRunning = False
- except Empty:
- pass
-
-
-
-class ClockWithThreads(Clock):
- """
- A testing reactor that supplies L{IReactorTime} and L{IReactorThreads}.
- """
- implements(IReactorThreads)
-
- def __init__(self):
- super(ClockWithThreads, self).__init__()
- self._pool = ThreadPool()
-
-
- def getThreadPool(self):
- """
- Get the threadpool.
- """
- return self._pool
-
-
- def suggestThreadPoolSize(self, size):
- """
- Approximate the behavior of a 'real' reactor.
- """
- self._pool.adjustPoolsize(maxthreads=size)
-
-
- def callInThread(self, thunk, *a, **kw):
- """
- No implementation.
- """
-
-
- def callFromThread(self, thunk, *a, **kw):
- """
- No implementation.
- """
-
-
-verifyClass(IReactorThreads, ClockWithThreads)
-
-
-
class ConnectionPoolBootTests(TestCase):
"""
Tests for the start-up phase of L{ConnectionPool}.
@@ -521,74 +119,6 @@
-class ConnectionPoolHelper(object):
- """
- Connection pool setting-up facilities for tests that need a
- L{ConnectionPool}.
- """
-
- dialect = POSTGRES_DIALECT
- paramstyle = DEFAULT_PARAM_STYLE
-
- def setUp(self):
- """
- Create a L{ConnectionPool} attached to a C{ConnectionFactory}. Start
- the L{ConnectionPool}.
- """
- self.paused = False
- self.holders = []
- self.factory = ConnectionFactory()
- self.pool = ConnectionPool(self.factory.connect,
- maxConnections=2,
- dialect=self.dialect,
- paramstyle=self.paramstyle)
- self.pool._createHolder = self.makeAHolder
- self.clock = self.pool.reactor = ClockWithThreads()
- self.pool.startService()
- self.addCleanup(self.flushHolders)
-
-
- def flushHolders(self):
- """
- Flush all pending C{submit}s since C{pauseHolders} was called. This
- makes sure the service is stopped and the fake ThreadHolders are all
- executing their queues so failed tests can exit cleanly.
- """
- self.paused = False
- for holder in self.holders:
- holder.flush()
-
-
- def pauseHolders(self):
- """
- Pause all L{FakeThreadHolder}s, causing C{submit} to return an unfired
- L{Deferred}.
- """
- self.paused = True
-
-
- def makeAHolder(self):
- """
- Make a ThreadHolder-alike.
- """
- fth = FakeThreadHolder(self)
- self.holders.append(fth)
- return fth
-
-
- def resultOf(self, it):
- return resultOf(it)
-
-
- def createTransaction(self):
- return self.pool.connection()
-
-
- def translateError(self, err):
- return err
-
-
-
class ConnectionPoolTests(ConnectionPoolHelper, TestCase, AssertResultHelper):
"""
Tests for L{ConnectionPool}.
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/test/test_queue.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/test/test_queue.py 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/twext/enterprise/test/test_queue.py 2013-04-26 16:45:09 UTC (rev 11102)
@@ -52,7 +52,8 @@
from twext.enterprise.queue import ConnectionFromPeerNode
from twext.enterprise.fixtures import buildConnectionPool
from zope.interface.verify import verifyObject
-from twisted.test.proto_helpers import StringTransport
+from twisted.test.proto_helpers import StringTransport, MemoryReactor
+from twext.enterprise.fixtures import SteppablePoolHelper
from twext.enterprise.queue import _BaseQueuer, NonPerformingQueuer
import twext.enterprise.queue
@@ -70,6 +71,16 @@
+class MemoryReactorWithClock(MemoryReactor, Clock):
+ """
+ Simulate a real reactor.
+ """
+ def __init__(self):
+ MemoryReactor.__init__(self)
+ Clock.__init__(self)
+
+
+
def transactionally(transactionCreator):
"""
Perform the decorated function immediately in a transaction, replacing its
@@ -213,7 +224,6 @@
-
class WorkItemTests(TestCase):
"""
A L{WorkItem} is an item of work that can be executed.
@@ -467,7 +477,64 @@
self.assertIdentical(result, proposal)
+ def test_workerConnectionPoolPerformWork(self):
+ """
+ L{WorkerConnectionPool.performWork} performs work by selecting a
+ L{ConnectionFromWorker} and sending it a L{PerformWork} command.
+ """
+ clock = Clock()
+ peerPool = PeerConnectionPool(clock, None, 4322, schema)
+ factory = peerPool.workerListenerFactory()
+ def peer():
+ p = factory.buildProtocol(None)
+ t = StringTransport()
+ p.makeConnection(t)
+ return p, t
+ worker1, trans1 = peer()
+ worker2, trans2 = peer()
+ # Ask the worker to do something.
+ worker1.performWork(schema.DUMMY_WORK_ITEM, 1)
+ self.assertEquals(worker1.currentLoad, 1)
+ self.assertEquals(worker2.currentLoad, 0)
+ # Now ask the pool to do something
+ peerPool.workerPool.performWork(schema.DUMMY_WORK_ITEM, 2)
+ self.assertEquals(worker1.currentLoad, 1)
+ self.assertEquals(worker2.currentLoad, 1)
+
+
+ def test_poolStartServiceChecksForWork(self):
+ """
+ L{PeerConnectionPool.startService} kicks off the idle work-check loop.
+ """
+ reactor = MemoryReactorWithClock()
+ cph = SteppablePoolHelper(nodeSchema + schemaText)
+ then = datetime.datetime(2012, 12, 12, 12, 12, 0)
+ reactor.advance(astimestamp(then))
+ cph.setUp(self)
+ pcp = PeerConnectionPool(reactor, cph.pool.connection, 4321, schema)
+ now = then + datetime.timedelta(seconds=pcp.queueProcessTimeout * 2)
+ @transactionally(cph.pool.connection)
+ def createOldWork(txn):
+ one = DummyWorkItem.create(txn, workID=1, a=3, b=4, notBefore=then)
+ two = DummyWorkItem.create(txn, workID=2, a=7, b=9, notBefore=now)
+ return gatherResults([one, two])
+ pcp.startService()
+ cph.flushHolders()
+ reactor.advance(pcp.queueProcessTimeout * 2)
+ self.assertEquals(
+ cph.rows("select * from DUMMY_WORK_DONE"),
+ [(1, 7)]
+ )
+ cph.rows("delete from DUMMY_WORK_DONE")
+ reactor.advance(pcp.queueProcessTimeout * 2)
+ self.assertEquals(
+ cph.rows("select * from DUMMY_WORK_DONE"),
+ [(2, 16)]
+ )
+
+
+
class HalfConnection(object):
def __init__(self, protocol):
self.protocol = protocol
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/twext/python/sendmsg.c
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/twext/python/sendmsg.c 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/twext/python/sendmsg.c 2013-04-26 16:45:09 UTC (rev 11102)
@@ -117,6 +117,7 @@
struct msghdr message_header;
struct iovec iov[1];
PyObject *ancillary = NULL;
+ PyObject *ultimate_result = NULL;
static char *kwlist[] = {"fd", "data", "flags", "ancillary", NULL};
if (!PyArg_ParseTupleAndKeywords(
@@ -148,14 +149,14 @@
PyErr_Format(PyExc_TypeError,
"sendmsg argument 3 expected list, got %s",
ancillary->ob_type->tp_name);
- return NULL;
+ goto finished;
}
PyObject *iterator = PyObject_GetIter(ancillary);
PyObject *item = NULL;
if (iterator == NULL) {
- return NULL;
+ goto finished;
}
size_t all_data_len = 0;
@@ -172,7 +173,7 @@
&level, &type, &data, &data_len)) {
Py_DECREF(item);
Py_DECREF(iterator);
- return NULL;
+ goto finished;
}
prev_all_data_len = all_data_len;
@@ -185,7 +186,7 @@
PyErr_Format(PyExc_OverflowError,
"Too much msg_control to fit in a size_t: %zu",
prev_all_data_len);
- return NULL;
+ goto finished;
}
}
@@ -199,15 +200,13 @@
PyErr_Format(PyExc_OverflowError,
"Too much msg_control to fit in a socklen_t: %zu",
all_data_len);
- return NULL;
+ goto finished;
}
message_header.msg_control = malloc(all_data_len);
if (!message_header.msg_control) {
PyErr_NoMemory();
- return NULL;
+ goto finished;
}
- } else {
- message_header.msg_control = NULL;
}
message_header.msg_controllen = (socklen_t) all_data_len;
@@ -215,8 +214,7 @@
item = NULL;
if (!iterator) {
- free(message_header.msg_control);
- return NULL;
+ goto finished;
}
/* Unpack the tuples into the control message. */
@@ -239,8 +237,7 @@
&data_len)) {
Py_DECREF(item);
Py_DECREF(iterator);
- free(message_header.msg_control);
- return NULL;
+ goto finished;
}
control_message->cmsg_level = level;
@@ -250,12 +247,9 @@
if (data_size > SOCKLEN_MAX) {
Py_DECREF(item);
Py_DECREF(iterator);
- free(message_header.msg_control);
-
PyErr_Format(PyExc_OverflowError,
"CMSG_LEN(%zd) > SOCKLEN_MAX", data_len);
-
- return NULL;
+ goto finished;
}
control_message->cmsg_len = (socklen_t) data_size;
@@ -271,8 +265,7 @@
Py_DECREF(iterator);
if (PyErr_Occurred()) {
- free(message_header.msg_control);
- return NULL;
+ goto finished;
}
}
@@ -280,13 +273,16 @@
if (sendmsg_result < 0) {
PyErr_SetFromErrno(sendmsg_socket_error);
- if (message_header.msg_control) {
- free(message_header.msg_control);
- }
- return NULL;
+ goto finished;
+ } else {
+ ultimate_result = Py_BuildValue("n", sendmsg_result);
}
- return Py_BuildValue("n", sendmsg_result);
+ finished:
+ if (message_header.msg_control) {
+ free(message_header.msg_control);
+ }
+ return ultimate_result;
}
static PyObject *sendmsg_recvmsg(PyObject *self, PyObject *args, PyObject *keywds) {
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/directory/directory.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/directory/directory.py 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/directory/directory.py 2013-04-26 16:45:09 UTC (rev 11102)
@@ -33,7 +33,7 @@
from twext.enterprise.dal.record import fromTable
from twext.enterprise.dal.syntax import Delete
-from twext.enterprise.queue import WorkItem
+from twext.enterprise.queue import WorkItem, PeerConnectionPool
from twext.python.log import Logger, LoggingMixIn
from twext.web2.dav.auth import IPrincipalCredentials
from twext.web2.dav.util import joinURL
@@ -998,11 +998,22 @@
txn = store.newTransaction()
notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds)
log.debug("Scheduling next group cacher update: %s" % (notBefore,))
- yield txn.enqueue(GroupCacherPollingWork, notBefore=notBefore)
+ wp = (yield txn.enqueue(GroupCacherPollingWork, notBefore=notBefore))
yield txn.commit()
+ returnValue(wp)
+def schedulePolledGroupCachingUpdate(store):
+ """
+ Schedules a group caching update work item in "the past" so PeerConnectionPool's
+ overdue-item logic picks it up quickly.
+ """
+ seconds = -PeerConnectionPool.queueProcessTimeout
+ return scheduleNextGroupCachingUpdate(store, seconds)
+
+
+
def diffAssignments(old, new):
"""
Compare two proxy assignment lists and return their differences in the form of
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/directory/test/test_directory.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/directory/test/test_directory.py 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/directory/test/test_directory.py 2013-04-26 16:45:09 UTC (rev 11102)
@@ -20,12 +20,13 @@
from twistedcaldav.test.util import TestCase
from twistedcaldav.test.util import xmlFile, augmentsFile, proxiesFile, dirTest
from twistedcaldav.config import config
-from twistedcaldav.directory.directory import DirectoryService, DirectoryRecord, GroupMembershipCache, GroupMembershipCacheUpdater, diffAssignments
+from twistedcaldav.directory.directory import DirectoryService, DirectoryRecord, GroupMembershipCache, GroupMembershipCacheUpdater, diffAssignments, schedulePolledGroupCachingUpdate
from twistedcaldav.directory.xmlfile import XMLDirectoryService
from twistedcaldav.directory.calendaruserproxyloader import XMLCalendarUserProxyLoader
from twistedcaldav.directory import augment, calendaruserproxy
from twistedcaldav.directory.util import normalizeUUID
from twistedcaldav.directory.principal import DirectoryPrincipalProvisioningResource
+from txdav.common.datastore.test.util import buildStore
import cPickle as pickle
import uuid
@@ -738,8 +739,34 @@
])
)
+ @inlineCallbacks
+ def testScheduling(self):
+ """
+ Exercise schedulePolledGroupCachingUpdate
+ """
+ groupCacher = StubGroupCacher()
+ def decorateTransaction(txn):
+ txn._groupCacher = groupCacher
+
+ store = yield buildStore(self, None)
+ store.callWithNewTransactions(decorateTransaction)
+ wp = (yield schedulePolledGroupCachingUpdate(store))
+ yield wp.whenExecuted()
+ self.assertTrue(groupCacher.called)
+
+ testScheduling.skip = "Fix WorkProposal to track delayed calls and cancel them"
+
+class StubGroupCacher(object):
+ def __init__(self):
+ self.called = False
+ self.updateSeconds = 99
+
+ def updateCache(self):
+ self.called = True
+
+
class RecordsMatchingTokensTests(TestCase):
@inlineCallbacks
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/stdconfig.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/stdconfig.py 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/stdconfig.py 2013-04-26 16:45:09 UTC (rev 11102)
@@ -37,7 +37,9 @@
from twisted.python.runtime import platform
from calendarserver.push.util import getAPNTopicFromCertificate
+from twistedcaldav.util import computeProcessCount
+
log = Logger()
if platform.isMacOSX():
@@ -324,8 +326,8 @@
"ConfigRoot" : "Config",
"LogRoot" : "/var/log/caldavd",
"RunRoot" : "/var/run/caldavd",
- "WebCalendarRoot" : "/Applications/Server.app/Contents/ServerRoot/usr/share/collabd",
-
+ "WebCalendarRoot" : "/Applications/Server.app/Contents/ServerRoot/usr/share/collabd/webcal/public",
+
#
# Quotas
#
@@ -430,7 +432,7 @@
},
"Wiki": {
"Enabled": False,
- "Cookie": "apple_webauth_token",
+ "Cookie": "cc.collabd_session_guid",
"URL": "http://127.0.0.1:8089/RPC2",
"UserMethod": "userForSession",
"WikiMethod": "accessLevelForUserWikiCalendar",
@@ -907,8 +909,14 @@
"DatabaseName": "caldav",
"LogFile": "postgres.log",
"ListenAddresses": [],
- "SharedBuffers": 30,
- "MaxConnections": 20,
+ "SharedBuffers": 0, # BuffersToConnectionsRatio * MaxConnections
+ # Note: don't set this, it will be computed dynamically
+ # See _updateMultiProcess( ) below for details
+ "MaxConnections": 0, # Dynamically computed based on ProcessCount, etc.
+ # Note: don't set this, it will be computed dynamically
+ # See _updateMultiProcess( ) below for details
+ "ExtraConnections": 3, # how many extra connections to leave for utilities
+ "BuffersToConnectionsRatio": 1.5,
"Options": [
"-c standard_conforming_strings=on",
],
@@ -1079,9 +1087,11 @@
# Remove possible trailing slash from ServerRoot
try:
configDict["ServerRoot"] = configDict["ServerRoot"].rstrip("/")
+ configDict["ServerRoot"] = os.path.abspath(configDict["ServerRoot"])
except KeyError:
pass
+
for root, relativePath in RELATIVE_PATHS:
if root in configDict:
if isinstance(relativePath, str):
@@ -1128,7 +1138,38 @@
configDict.ServerHostName = hostname
+def _updateMultiProcess(configDict, reloading=False):
+ """
+ Dynamically compute ProcessCount if it's set to 0. Always compute
+ MaxConnections and SharedBuffers based on ProcessCount, ExtraConnections,
+ SharedConnectionPool, MaxDBConnectionsPerPool, and BuffersToConnectionsRatio
+ """
+ if configDict.MultiProcess.ProcessCount == 0:
+ processCount = computeProcessCount(
+ configDict.MultiProcess.MinProcessCount,
+ configDict.MultiProcess.PerCPU,
+ configDict.MultiProcess.PerGB,
+ )
+ configDict.MultiProcess.ProcessCount = processCount
+ # Start off with extra connections to be used by command line utilities and
+ # administration/inspection tools
+ maxConnections = configDict.Postgres.ExtraConnections
+
+ if configDict.SharedConnectionPool:
+ # If SharedConnectionPool is enabled, then only the master process will
+ # be connecting to the database, therefore use MaxDBConnectionsPerPool
+ maxConnections += configDict.MaxDBConnectionsPerPool
+ else:
+ # Otherwise the master *and* each worker process will be connecting
+ maxConnections += ((configDict.MultiProcess.ProcessCount + 1) *
+ configDict.MaxDBConnectionsPerPool)
+
+ configDict.Postgres.MaxConnections = maxConnections
+ configDict.Postgres.SharedBuffers = int(configDict.Postgres.MaxConnections *
+ configDict.Postgres.BuffersToConnectionsRatio)
+
+
def _preUpdateDirectoryService(configDict, items, reloading=False):
# Special handling for directory services configs
dsType = items.get("DirectoryService", {}).get("type", None)
@@ -1502,6 +1543,7 @@
_preUpdateDirectoryAddressBookBackingDirectoryService,
)
POST_UPDATE_HOOKS = (
+ _updateMultiProcess,
_updateDataStore,
_updateHostName,
_postUpdateDirectoryService,
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/test/test_config.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/test/test_config.py 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/test/test_config.py 2013-04-26 16:45:09 UTC (rev 11102)
@@ -83,7 +83,8 @@
def testDefaults(self):
for key, value in DEFAULT_CONFIG.iteritems():
- if key in ("ServerHostName", "Notifications"):
+ if key in ("ServerHostName", "Notifications", "MultiProcess",
+ "Postgres"):
# Value is calculated and may vary
continue
for item in RELATIVE_PATHS:
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/test/test_stdconfig.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/test/test_stdconfig.py 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/test/test_stdconfig.py 2013-04-26 16:45:09 UTC (rev 11102)
@@ -15,14 +15,16 @@
# limitations under the License.
##
+import os
from cStringIO import StringIO
from twext.python.filepath import CachingFilePath as FilePath
from twisted.trial.unittest import TestCase
-from twistedcaldav.config import Config
+from twistedcaldav.config import Config, ConfigDict
from twistedcaldav.stdconfig import NoUnicodePlistParser, PListConfigProvider,\
- _updateDataStore
+ _updateDataStore, _updateMultiProcess
+import twistedcaldav.stdconfig
nonASCIIValue = "→←"
nonASCIIPlist = "<plist version='1.0'><string>%s</string></plist>" % (
@@ -149,3 +151,31 @@
}
_updateDataStore(configDict)
self.assertEquals(configDict["ServerRoot"], "/a/b/c")
+
+ configDict = {
+ "ServerRoot" : "./a",
+ }
+ _updateDataStore(configDict)
+ self.assertEquals(configDict["ServerRoot"], os.path.join(os.getcwd(), "a"))
+
+ def test_updateMultiProcess(self):
+ def stubProcessCount(*args):
+ return 3
+ self.patch(twistedcaldav.stdconfig, "computeProcessCount", stubProcessCount)
+ configDict = ConfigDict({
+ "MultiProcess" : {
+ "ProcessCount" : 0,
+ "MinProcessCount" : 2,
+ "PerCPU" : 1,
+ "PerGB" : 1,
+ },
+ "Postgres" : {
+ "ExtraConnections" : 5,
+ "BuffersToConnectionsRatio" : 1.5,
+ },
+ "SharedConnectionPool" : False,
+ "MaxDBConnectionsPerPool" : 10,
+ })
+ _updateMultiProcess(configDict)
+ self.assertEquals(45, configDict.Postgres.MaxConnections)
+ self.assertEquals(67, configDict.Postgres.SharedBuffers)
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/util.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/util.py 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/twistedcaldav/util.py 2013-04-26 16:45:09 UTC (rev 11102)
@@ -111,6 +111,37 @@
+def computeProcessCount(minimum, perCPU, perGB, cpuCount=None, memSize=None):
+ """
+ Determine how many processes to spawn based on installed RAM and CPUs,
+ returning at least "minimum"
+ """
+
+ if cpuCount is None:
+ try:
+ cpuCount = getNCPU()
+ except NotImplementedError, e:
+ log.error("Unable to detect number of CPUs: %s" % (str(e),))
+ return minimum
+
+ if memSize is None:
+ try:
+ memSize = getMemorySize()
+ except NotImplementedError, e:
+ log.error("Unable to detect amount of installed RAM: %s" % (str(e),))
+ return minimum
+
+ countByCore = perCPU * cpuCount
+ countByMemory = perGB * (memSize / (1024 * 1024 * 1024))
+
+ # Pick the smaller of the two:
+ count = min(countByCore, countByMemory)
+
+ # ...but at least "minimum"
+ return max(count, minimum)
+
+
+
##
# Module management
##
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/txdav/base/datastore/subpostgres.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/txdav/base/datastore/subpostgres.py 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/txdav/base/datastore/subpostgres.py 2013-04-26 16:45:09 UTC (rev 11102)
@@ -27,14 +27,12 @@
from twisted.python.procutils import which
from twisted.internet.protocol import ProcessProtocol
-from twisted.python.reflect import namedAny
from twext.python.log import Logger
from twext.python.filepath import CachingFilePath
-pgdb = namedAny("pgdb")
+import pgdb
from twisted.protocols.basic import LineReceiver
-from twisted.internet import reactor
from twisted.internet.defer import Deferred
from txdav.base.datastore.dbapiclient import DBAPIConnector
from txdav.base.datastore.dbapiclient import postgresPreflight
@@ -168,7 +166,8 @@
spawnedDBUser="caldav",
importFileName=None,
pgCtl="pg_ctl",
- initDB="initdb"):
+ initDB="initdb",
+ reactor=None):
"""
Initialize a L{PostgresService} pointed at a data store directory.
@@ -194,6 +193,7 @@
MultiService.__init__(self)
self.subServiceFactory = subServiceFactory
self.dataStoreDirectory = dataStoreDirectory
+ self.workingDir = self.dataStoreDirectory.child("working")
self.resetSchema = resetSchema
# In order to delay a shutdown until database initialization has
@@ -239,8 +239,16 @@
self.openConnections = []
self._pgCtl = pgCtl
self._initdb = initDB
+ self._reactor = reactor
+ @property
+ def reactor(self):
+ if self._reactor is None:
+ from twisted.internet import reactor
+ self._reactor = reactor
+ return self._reactor
+
def pgCtl(self):
"""
Locate the path to pg_ctl.
@@ -300,17 +308,12 @@
return self._connectorFor(databaseName).connect(label)
- def ready(self):
+ def ready(self, createDatabaseConn, createDatabaseCursor):
"""
Subprocess is ready. Time to initialize the subservice.
If the database has not been created and there is a dump file,
then the dump file is imported.
"""
- createDatabaseConn = self.produceConnection(
- 'schema creation', 'postgres'
- )
- createDatabaseCursor = createDatabaseConn.cursor()
- createDatabaseCursor.execute("commit")
if self.resetSchema:
try:
@@ -347,9 +350,6 @@
connection.commit()
connection.close()
- # TODO: anyone know why these two lines are here?
- connection = self.produceConnection()
- cursor = connection.cursor()
if self.shutdownDeferred is None:
# Only continue startup if we've not begun shutdown
@@ -383,6 +383,30 @@
"""
Start the database and initialize the subservice.
"""
+
+ def createConnection():
+ createDatabaseConn = self.produceConnection(
+ 'schema creation', 'postgres'
+ )
+ createDatabaseCursor = createDatabaseConn.cursor()
+ createDatabaseCursor.execute("commit")
+ return createDatabaseConn, createDatabaseCursor
+
+ try:
+ createDatabaseConn, createDatabaseCursor = createConnection()
+ except pgdb.DatabaseError:
+ # We could not connect to the database, so attempt to start it
+ pass
+ except Exception, e:
+ # Some other unexpected error is preventing us from connecting
+ # to the database
+ log.warn("Failed to connect to Postgres: %s" % (str(e)))
+ else:
+ # Database is running, so just use our connection
+ self.ready(createDatabaseConn, createDatabaseCursor)
+ self.deactivateDelayedShutdown()
+ return
+
monitor = _PostgresMonitor(self)
pgCtl = self.pgCtl()
# check consistency of initdb and postgres?
@@ -400,7 +424,7 @@
options.append("-c standard_conforming_strings=on")
options.extend(self.options)
- reactor.spawnProcess(
+ self.reactor.spawnProcess(
monitor, pgCtl,
[
pgCtl,
@@ -412,13 +436,13 @@
"-o",
" ".join(options),
],
- self.env,
+ env=self.env, path=self.workingDir.path,
uid=self.uid, gid=self.gid,
)
self.monitor = monitor
def gotReady(result):
self.shouldStopDatabase = result
- self.ready()
+ self.ready(*createConnection())
self.deactivateDelayedShutdown()
def reportit(f):
log.err(f)
@@ -432,7 +456,6 @@
MultiService.startService(self)
self.activateDelayedShutdown()
clusterDir = self.dataStoreDirectory.child("cluster")
- workingDir = self.dataStoreDirectory.child("working")
env = self.env = os.environ.copy()
env.update(PGDATA=clusterDir.path,
PGHOST=self.host,
@@ -448,15 +471,16 @@
else:
if not self.dataStoreDirectory.isdir():
self.dataStoreDirectory.createDirectory()
- if not workingDir.isdir():
- workingDir.createDirectory()
+ if not self.workingDir.isdir():
+ self.workingDir.createDirectory()
if self.uid and self.gid:
os.chown(self.dataStoreDirectory.path, self.uid, self.gid)
- os.chown(workingDir.path, self.uid, self.gid)
+ os.chown(self.workingDir.path, self.uid, self.gid)
dbInited = Deferred()
- reactor.spawnProcess(
+ self.reactor.spawnProcess(
CapturingProcessProtocol(dbInited, None),
- initdb, [initdb, "-E", "UTF8", "-U", self.spawnedDBUser], env, workingDir.path,
+ initdb, [initdb, "-E", "UTF8", "-U", self.spawnedDBUser],
+ env=env, path=self.workingDir.path,
uid=self.uid, gid=self.gid,
)
def doCreate(result):
@@ -486,9 +510,9 @@
if self.shouldStopDatabase:
monitor = _PostgresMonitor()
pgCtl = self.pgCtl()
- reactor.spawnProcess(monitor, pgCtl,
+ self.reactor.spawnProcess(monitor, pgCtl,
[pgCtl, '-l', 'logfile', 'stop'],
- self.env,
+ env=self.env, path=self.workingDir.path,
uid=self.uid, gid=self.gid,
)
return monitor.completionDeferred
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/txdav/base/datastore/test/test_subpostgres.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/txdav/base/datastore/test/test_subpostgres.py 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/txdav/base/datastore/test/test_subpostgres.py 2013-04-26 16:45:09 UTC (rev 11102)
@@ -20,12 +20,19 @@
from twisted.trial.unittest import TestCase
+# NOTE: This import will fail eventually when this functionality is added to
+# MemoryReactor:
+from twisted.runner.test.test_procmon import DummyProcessReactor
+
+from twisted.python.filepath import FilePath
from twext.python.filepath import CachingFilePath
from txdav.base.datastore.subpostgres import PostgresService
from twisted.internet.defer import inlineCallbacks, Deferred
from twisted.application.service import Service
+import pgdb
+
class SubprocessStartup(TestCase):
"""
Tests for starting and stopping the subprocess.
@@ -185,3 +192,73 @@
values = cursor.fetchall()
self.assertEquals(values, [["value1"],["value2"]])
+ def test_startDatabaseRunning(self):
+ """ Ensure that if we can connect to postgres we don't spawn pg_ctl """
+
+ self.cursorHistory = []
+
+ class DummyCursor(object):
+ def __init__(self, historyHolder):
+ self.historyHolder = historyHolder
+
+ def execute(self, *args):
+ self.historyHolder.cursorHistory.append(args)
+
+ def close(self):
+ pass
+
+ class DummyConnection(object):
+ def __init__(self, historyHolder):
+ self.historyHolder = historyHolder
+
+ def cursor(self):
+ return DummyCursor(self.historyHolder)
+
+ def commit(self):
+ pass
+
+ def close(self):
+ pass
+
+ def produceConnection(*args):
+ return DummyConnection(self)
+
+ dummyReactor = DummyProcessReactor()
+ svc = PostgresService(
+ FilePath("postgres_4.pgdb"),
+ lambda x : Service(),
+ "",
+ reactor=dummyReactor,
+ )
+ svc.produceConnection = produceConnection
+ svc.env = {}
+ svc.startDatabase()
+ self.assertEquals(
+ self.cursorHistory,
+ [
+ ('commit',),
+ ("create database subpostgres with encoding 'UTF8'",),
+ ('',)
+ ]
+ )
+ self.assertEquals(dummyReactor.spawnedProcesses, [])
+
+
+ def test_startDatabaseNotRunning(self):
+ """ Ensure that if we can't connect to postgres we spawn pg_ctl """
+
+ def produceConnection(*args):
+ raise pgdb.DatabaseError
+
+ dummyReactor = DummyProcessReactor()
+ svc = PostgresService(
+ FilePath("postgres_4.pgdb"),
+ lambda x : Service(),
+ "",
+ reactor=dummyReactor,
+ )
+ svc.produceConnection = produceConnection
+ svc.env = {}
+ svc.startDatabase()
+ self.assertEquals(len(dummyReactor.spawnedProcesses), 1)
+ self.assertTrue(dummyReactor.spawnedProcesses[0]._executable.endswith("pg_ctl"))
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/sql.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/sql.py 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/sql.py 2013-04-26 16:45:09 UTC (rev 11102)
@@ -57,7 +57,7 @@
from txdav.carddav.iaddressbookstore import IAddressBookTransaction
from txdav.common.datastore.common import HomeChildBase
-from txdav.common.datastore.sql_tables import schema
+from txdav.common.datastore.sql_tables import schema, splitSQLString
from txdav.common.datastore.sql_tables import _BIND_MODE_OWN, \
_BIND_STATUS_ACCEPTED, _BIND_STATUS_DECLINED, \
NOTIFICATION_OBJECT_REVISIONS_TABLE
@@ -72,7 +72,6 @@
from twext.python.clsprop import classproperty
from twext.enterprise.ienterprise import AlreadyFinishedError
-from twext.enterprise.dal.parseschema import significant
from twext.enterprise.dal.syntax import \
Delete, utcNowSQL, Union, Insert, Len, Max, Parameter, SavepointAction, \
@@ -95,7 +94,6 @@
from pycalendar.datetime import PyCalendarDateTime
from cStringIO import StringIO
-from sqlparse import parse
import time
current_sql_schema = getModule(__name__).filePath.sibling("sql_schema").child("current.sql").getContent()
@@ -1015,19 +1013,11 @@
@inlineCallbacks
def execSQLBlock(self, sql):
"""
- Execute a block of SQL by parsing it out into individual statements and execute
- each of those.
-
+ Execute SQL statements parsed by splitSQLString.
FIXME: temporary measure for handling large schema upgrades. This should NOT be used
for regular SQL operations - only upgrades.
"""
- parsed = parse(sql)
- for stmt in parsed:
- while stmt.tokens and not significant(stmt.tokens[0]):
- stmt.tokens.pop(0)
- if not stmt.tokens:
- continue
- stmt = str(stmt).rstrip(";")
+ for stmt in splitSQLString(sql):
yield self.execSQL(stmt)
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_16_to_17.sql
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_16_to_17.sql 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_16_to_17.sql 2013-04-26 16:45:09 UTC (rev 11102)
@@ -23,6 +23,13 @@
-- CALENDAR_OBJECT clean-up --
------------------------------
+begin
+for i in (select constraint_name from user_cons_columns where column_name = 'ORGANIZER_OBJECT')
+loop
+execute immediate 'alter table calendar_object drop constraint ' || i.constraint_name;
+end loop;
+end;
+
alter table CALENDAR_OBJECT
drop (ORGANIZER_OBJECT);
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/sql_tables.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/sql_tables.py 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/sql_tables.py 2013-04-26 16:45:09 UTC (rev 11102)
@@ -27,8 +27,9 @@
from twext.enterprise.ienterprise import ORACLE_DIALECT, POSTGRES_DIALECT
from twext.enterprise.dal.syntax import Insert
from twext.enterprise.ienterprise import ORACLE_TABLE_NAME_MAX
-from twext.enterprise.dal.parseschema import schemaFromPath
-
+from twext.enterprise.dal.parseschema import schemaFromPath, significant
+from sqlparse import parse
+from re import compile
import hashlib
@@ -384,6 +385,44 @@
out.write('\n);\n\n')
+
+def splitSQLString(sqlString):
+ """
+ Strings which mix zero or more sql statements with zero or more pl/sql
+ statements need to be split into individual sql statements for execution.
+ This function was written to allow execution of pl/sql during Oracle schema
+ upgrades.
+ """
+ aggregated = ''
+ inPlSQL = None
+ parsed = parse(sqlString)
+ for stmt in parsed:
+ while stmt.tokens and not significant(stmt.tokens[0]):
+ stmt.tokens.pop(0)
+ if not stmt.tokens:
+ continue
+ if inPlSQL is not None:
+ agg = str(stmt).strip()
+ if "end;".lower() in agg.lower():
+ inPlSQL = None
+ aggregated += agg
+ rex = compile("\n +")
+ aggregated = rex.sub('\n', aggregated)
+ yield aggregated.strip()
+ continue
+ aggregated += agg
+ continue
+ if inPlSQL is None:
+ #if 'begin'.lower() in str(stmt).split()[0].lower():
+ if 'begin'.lower() in str(stmt).lower():
+ inPlSQL = True
+ aggregated += str(stmt)
+ continue
+ else:
+ continue
+ yield str(stmt).rstrip().rstrip(";")
+
+
if __name__ == '__main__':
import sys
if len(sys.argv) == 2:
Modified: CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/test/test_sql_tables.py
===================================================================
--- CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/test/test_sql_tables.py 2013-04-26 15:54:08 UTC (rev 11101)
+++ CalendarServer/branches/users/cdaboo/store-scheduling/txdav/common/datastore/test/test_sql_tables.py 2013-04-26 16:45:09 UTC (rev 11102)
@@ -31,10 +31,12 @@
from twext.enterprise.dal.syntax import SchemaSyntax
from txdav.common.datastore.sql_tables import schema, _translateSchema
-from txdav.common.datastore.sql_tables import SchemaBroken
+from txdav.common.datastore.sql_tables import SchemaBroken, splitSQLString
from twext.enterprise.dal.test.test_parseschema import SchemaTestHelper
+from textwrap import dedent
+
class SampleSomeColumns(TestCase, SchemaTestHelper):
"""
Sample some columns from the tables defined by L{schema} and verify that
@@ -268,3 +270,175 @@
+class SQLSplitterTests(TestCase):
+ """
+ Test that strings which mix zero or more sql statements with zero or more
+ pl/sql statements are split into individual statements.
+ """
+
+ def test_dontSplitOneStatement(self):
+ """
+ A single sql statement yields a single string
+ """
+ result = splitSQLString("select * from foo;")  # splitSQLString is a generator; statements are pulled one at a time
+ r1 = result.next()
+ self.assertEquals(r1, "select * from foo")  # the trailing ";" is stripped from a plain statement
+ self.assertRaises(StopIteration, result.next)  # nothing further may be yielded
+
+
+ def test_returnTwoSimpleStatements(self):
+ """
+ Two simple sql statements yield two separate strings
+ """
+ result = splitSQLString("select count(*) from baz; select bang from boop;")
+ r1 = result.next()
+ self.assertEquals(r1, "select count(*) from baz")
+ r2 = result.next()
+ self.assertEquals(r2, "select bang from boop")  # the whitespace separating the two statements is not part of the result
+ self.assertRaises(StopIteration, result.next)
+
+
+ def test_returnOneComplexStatement(self):
+ """
+ One complex sql statement yields a single string
+ """
+ bigSQL = dedent(
+ '''SELECT
+ CL.CODE,
+ CL.CATEGORY,
+ FROM
+ CLIENTS_SUPPLIERS CL
+ INVOICES I
+ WHERE
+ CL.CODE = I.CODE AND
+ CL.CATEGORY = I.CATEGORY AND
+ CL.UP_DATE =
+ (SELECT
+ MAX(CL2.UP_DATE)
+ FROM
+ CLIENTS_SUPPLIERS CL2
+ WHERE
+ CL2.CODE = I.CODE AND
+ CL2.CATEGORY = I.CATEGORY AND
+ CL2.UP_DATE <= I.EMISSION
+ ) AND
+ I.EMISSION BETWEEN DATE1 AND DATE2;''')
+ result = splitSQLString(bigSQL)
+ r1 = result.next()
+ self.assertEquals(r1, bigSQL.rstrip(";"))  # expected text is the input minus its final ";"
+ self.assertRaises(StopIteration, result.next)  # the nested SELECT must not become a statement of its own
+
+
+ def test_returnOnePlSQL(self):
+ """
+ One pl/sql block yields a single string
+ """
+ plsql = dedent(
+ '''BEGIN
+ LOOP
+ INSERT INTO T1 VALUES(i,i);
+ i := i+1;
+ EXIT WHEN i>100;
+ END LOOP;
+ END;''')
+ s1 = 'BEGIN\nLOOP\nINSERT INTO T1 VALUES(i,i);i := i+1;EXIT WHEN i>100;END LOOP;END;'  # expected: sub-statements joined with no separator, final ";" kept
+ result = splitSQLString(plsql)
+ r1 = result.next()
+ self.assertEquals(r1, s1)
+ self.assertRaises(StopIteration, result.next)
+
+
+ def test_returnOnePlSQLAndOneSQL(self):
+ """
+ One sql statement and one pl/sql statement yields two separate strings
+ """
+ sql = dedent(
+ '''SELECT EGM.Name, BioEntity.BioEntityId INTO AUX
+ FROM EGM
+ INNER JOIN BioEntity
+ ON EGM.name LIKE BioEntity.Name AND EGM.TypeId = BioEntity.TypeId
+ OPTION (MERGE JOIN);''')
+ plsql = dedent(
+ '''BEGIN
+ FOR i IN 1..10 LOOP
+ IF MOD(i,2) = 0 THEN
+ INSERT INTO temp VALUES (i, x, 'i is even');
+ ELSE
+ INSERT INTO temp VALUES (i, x, 'i is odd');
+ END IF;
+ x := x + 100;
+ END LOOP;
+ COMMIT;
+ END;''')
+ s2 = "BEGIN\nFOR i IN 1..10 LOOP\nIF MOD(i,2) = 0 THEN\nINSERT INTO temp VALUES (i, x, 'i is even');ELSE\nINSERT INTO temp VALUES (i, x, 'i is odd');END IF;x := x + 100;END LOOP;COMMIT;END;"  # expected form of the pl/sql block; "END IF;" and "END LOOP;" must not end it early
+ result = splitSQLString(sql+plsql)  # the two inputs are concatenated directly, with nothing between ";" and "BEGIN"
+ r1 = result.next()
+ self.assertEquals(r1, sql.rstrip(";"))  # the plain statement comes back minus its final ";"
+ r2 = result.next()
+ self.assertEquals(r2, s2)
+ self.assertRaises(StopIteration, result.next)
+
+ def test_actualSchemaUpgrade(self):
+ """
+ A real-world schema upgrade is split into the expected number of statements,
+ ignoring comments
+ """
+ realsql = dedent(
+ '''
+ ----
+ -- Copyright (c) 2011-2013 Apple Inc. All rights reserved.
+ --
+ -- Licensed under the Apache License, Version 2.0 (the "License");
+ -- you may not use this file except in compliance with the License.
+ -- You may obtain a copy of the License at
+ --
+ -- http://www.apache.org/licenses/LICENSE-2.0
+ --
+ -- Unless required by applicable law or agreed to in writing, software
+ -- distributed under the License is distributed on an "AS IS" BASIS,
+ -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ -- See the License for the specific language governing permissions and
+ -- limitations under the License.
+ ----
+
+ ---------------------------------------------------
+ -- Upgrade database schema from VERSION 16 to 17 --
+ ---------------------------------------------------
+
+
+ ------------------------------
+ -- CALENDAR_OBJECT clean-up --
+ ------------------------------
+
+ begin
+ for i in (select constraint_name from user_cons_columns where column_name = 'ORGANIZER_OBJECT')
+ loop
+ execute immediate 'alter table calendar_object drop constraint ' || i.constraint_name;
+ end loop;
+ end;
+
+ alter table CALENDAR_OBJECT
+ drop (ORGANIZER_OBJECT);
+
+ create index CALENDAR_OBJECT_ICALE_82e731d5 on CALENDAR_OBJECT (
+ ICALENDAR_UID
+ );
+
+
+ -- Now update the version
+ update CALENDARSERVER set VALUE = '17' where NAME = 'VERSION';
+ ''')
+ s1 = "begin\nfor i in (select constraint_name from user_cons_columns where column_name = 'ORGANIZER_OBJECT')\nloop\nexecute immediate 'alter table calendar_object drop constraint ' || i.constraint_name;end loop;end;"  # the pl/sql block is expected as one string that keeps its final ";"
+ s2 = 'alter table CALENDAR_OBJECT\n drop (ORGANIZER_OBJECT)'  # plain statements are expected without their final ";"
+ s3 = 'create index CALENDAR_OBJECT_ICALE_82e731d5 on CALENDAR_OBJECT (\n ICALENDAR_UID\n)'
+ s4 = "update CALENDARSERVER set VALUE = '17' where NAME = 'VERSION'"
+ result = splitSQLString(realsql)  # the leading "--" comment lines must not produce statements of their own
+ r1 = result.next()
+ self.assertEquals(r1, s1)
+ r2 = result.next()
+ self.assertEquals(r2, s2)
+ r3 = result.next()
+ self.assertEquals(r3, s3)
+ r4 = result.next()
+ self.assertEquals(r4, s4)
+ self.assertRaises(StopIteration, result.next)  # exactly four statements, no more
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130426/ecab7912/attachment-0001.html>
More information about the calendarserver-changes
mailing list