[CalendarServer-changes] [1357]
CalendarServer/branches/users/dreid/tap-tests-3/twistedcaldav/test/
test_tap.py
source_changes at macosforge.org
source_changes at macosforge.org
Fri Mar 9 14:17:27 PST 2007
Revision: 1357
http://trac.macosforge.org/projects/calendarserver/changeset/1357
Author: dreid at apple.com
Date: 2007-03-09 14:17:27 -0800 (Fri, 09 Mar 2007)
Log Message:
-----------
Merge forward
Added Paths:
-----------
CalendarServer/branches/users/dreid/tap-tests-3/twistedcaldav/test/test_tap.py
Copied: CalendarServer/branches/users/dreid/tap-tests-3/twistedcaldav/test/test_tap.py (from rev 1355, CalendarServer/branches/users/dreid/tap-tests-2/twistedcaldav/test/test_tap.py)
===================================================================
--- CalendarServer/branches/users/dreid/tap-tests-3/twistedcaldav/test/test_tap.py (rev 0)
+++ CalendarServer/branches/users/dreid/tap-tests-3/twistedcaldav/test/test_tap.py 2007-03-09 22:17:27 UTC (rev 1357)
@@ -0,0 +1,583 @@
+##
+# Copyright (c) 2007 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# DRI: David Reid, dreid at apple.com
+##
+
+import os
+from copy import deepcopy
+
+from twisted.trial import unittest
+
+from twisted.python.usage import Options, UsageError
+from twisted.python.util import sibpath
+from twisted.python.reflect import namedAny
+from twisted.application.service import IService
+from twisted.application import internet
+
+from twisted.web2.dav import auth
+from twisted.web2.log import LogWrapperResource
+
+from twistedcaldav.tap import CalDAVOptions, CalDAVServiceMaker
+from twistedcaldav import tap
+
+from twistedcaldav.config import config
+from twistedcaldav import config as config_mod
+from twistedcaldav.py.plistlib import writePlist
+
+from twistedcaldav.directory.aggregate import AggregateDirectoryService
+from twistedcaldav.directory.sudo import SudoDirectoryService
+from twistedcaldav.directory.directory import UnknownRecordTypeError
+
+
+class TestCalDAVOptions(CalDAVOptions):
+ """
+ A fake that implementation of CalDAVOptions that provides
+ empty implementations of checkDirectory and checkFile.
+ """
+
+ def checkDirectory(*args, **kwargs):
+ pass
+
+ def checkFile(*args, **kwargs):
+ pass
+
+
+class CalDAVOptionsTest(unittest.TestCase):
+ """
+ Test various parameters of our usage.Options subclass
+ """
+
+ def setUp(self):
+ """
+ Set up our options object, giving it a parent, and forcing the
+ global config to be loaded from defaults.
+ """
+
+ self.config = TestCalDAVOptions()
+ self.config.parent = Options()
+ self.config.parent['uid'] = None
+ self.config.parent['gid'] = None
+
+ config_mod.parseConfig('non-existant-config')
+
+ def test_overridesConfig(self):
+ """
+ Test that values on the command line's -o and --option options
+ overide the config file
+ """
+
+ argv = ['-o', 'EnableSACLs',
+ '-o', 'HTTPPort=80',
+ '-o', 'BindAddresses=127.0.0.1,127.0.0.2,127.0.0.3',
+ '-o', 'DocumentRoot=/dev/null',
+ '-o', 'UserName=None',
+ '-o', 'EnableProxyPrincipals=False']
+
+ self.config.parseOptions(argv)
+
+ self.assertEquals(config.EnableSACLs, True)
+ self.assertEquals(config.HTTPPort, 80)
+ self.assertEquals(config.BindAddresses, ['127.0.0.1',
+ '127.0.0.2',
+ '127.0.0.3'])
+ self.assertEquals(config.DocumentRoot, '/dev/null')
+ self.assertEquals(config.UserName, None)
+ self.assertEquals(config.EnableProxyPrincipals, False)
+
+ argv = ['-o', 'Authentication=This Doesn\'t Matter']
+
+ self.assertRaises(UsageError, self.config.parseOptions, argv)
+
+ def test_setsParent(self):
+ """
+ Test that certain values are set on the parent (i.e. twistd's
+ Option's object)
+ """
+
+ argv = ['-o', 'ErrorLogFile=/dev/null',
+ '-o', 'PIDFile=/dev/null']
+
+ self.config.parseOptions(argv)
+
+ self.assertEquals(self.config.parent['logfile'], '/dev/null')
+
+ self.assertEquals(self.config.parent['pidfile'], '/dev/null')
+
+ def test_specifyConfigFile(self):
+ """
+ Test that specifying a config file from the command line
+ loads the global config with those values properly.
+ """
+
+ myConfig = deepcopy(config_mod.defaultConfig)
+
+ myConfig['Authentication']['Basic']['Enabled'] = False
+
+ myConfig['MultiProcess']['LoadBalancer']['Enabled'] = False
+
+ myConfig['HTTPPort'] = 80
+
+ myConfig['ServerHostName'] = 'calendar.calenderserver.org'
+
+ myConfigFile = self.mktemp()
+ writePlist(myConfig, myConfigFile)
+
+ args = ['-f', myConfigFile]
+
+ self.config.parseOptions(args)
+
+ self.assertEquals(config.ServerHostName, myConfig['ServerHostName'])
+
+ self.assertEquals(config.MultiProcess['LoadBalancer']['Enabled'],
+ myConfig['MultiProcess']['LoadBalancer']['Enabled'])
+
+ self.assertEquals(config.HTTPPort, myConfig['HTTPPort'])
+
+ self.assertEquals(config.Authentication['Basic']['Enabled'],
+ myConfig['Authentication']['Basic']['Enabled'])
+
+
+class BaseServiceMakerTests(unittest.TestCase):
+ """
+ Utility class for ServiceMaker tests.
+ """
+
+ def setUp(self):
+ self.options = TestCalDAVOptions()
+ self.options.parent = Options()
+ self.options.parent['gid'] = None
+ self.options.parent['uid'] = None
+
+ self.config = deepcopy(config_mod.defaultConfig)
+
+ accountsFile = sibpath(os.path.dirname(__file__),
+ 'directory/test/accounts.xml')
+
+ self.config['DirectoryService'] = {
+ 'params': {'xmlFile': accountsFile},
+ 'type': 'twistedcaldav.directory.xmlfile.XMLDirectoryService'
+ }
+
+ self.config['DocumentRoot'] = self.mktemp()
+
+ self.config['HTTPPort'] = 8008
+ self.config['SSLPort'] = 8443
+
+ self.config['SSLPrivateKey'] = sibpath(__file__, 'data/server.pem')
+ self.config['SSLCertificate'] = sibpath(__file__, 'data/server.pem')
+
+ os.mkdir(self.config['DocumentRoot'])
+
+ self.configFile = self.mktemp()
+
+ self.writeConfig()
+
+ def writeConfig(self):
+ """
+ Flush self.config out to self.configFile
+ """
+
+ writePlist(self.config, self.configFile)
+
+ def makeService(self):
+ """
+ Create a service by calling into CalDAVServiceMaker with
+ self.configFile
+ """
+
+ self.options.parseOptions(['-f', self.configFile])
+
+ return CalDAVServiceMaker().makeService(self.options)
+
+ def getSite(self):
+ """
+ Get the server.Site from the service by finding the HTTPFactory
+ """
+
+ service = self.makeService()
+
+ return service.services[0].args[1].protocolArgs['requestFactory']
+
+
+
+class CalDAVServiceMakerTests(BaseServiceMakerTests):
+ """
+ Test the service maker's behavior
+ """
+
+ def test_makeServiceDispatcher(self):
+ """
+ Test the default options of the dispatching makeService
+ """
+ validServices = ['singleprocess', 'slave', 'master', 'multiprocess']
+
+ for service in validServices:
+ self.config['ServerType'] = service
+ self.writeConfig()
+ self.makeService()
+
+ self.config['ServerType'] = 'Unknown Service'
+ self.writeConfig()
+ self.assertRaises(UsageError, self.makeService)
+
+
+class SingleServiceTest(BaseServiceMakerTests):
+ """
+ Test various configurations of the single service
+ """
+
+ def test_defaultService(self):
+ """
+ Test the value of a single process service in it's simplest
+ configuration.
+ """
+
+ service = self.makeService()
+
+ self.failUnless(IService(service))
+
+ self.failUnless(service.services)
+
+ self.failUnless(isinstance(service, tap.CalDAVService))
+
+ def test_defaultListeners(self):
+ """
+ Test that the single process service has sub services with the
+ default TCP and SSL configuration
+ """
+
+ service = self.makeService()
+
+ expectedSubServices = ((internet.TCPServer, self.config['HTTPPort']),
+ (internet.SSLServer, self.config['SSLPort']))
+
+ configuredSubServices = [(s.__class__, s.args)
+ for s in service.services]
+
+ for serviceClass, serviceArgs in configuredSubServices:
+ self.failUnless(
+ serviceClass in (s[0] for s in expectedSubServices))
+
+ self.assertEquals(serviceArgs[0],
+ dict(expectedSubServices)[serviceClass])
+
+ def test_SSLKeyConfiguration(self):
+ """
+ Test that the configuration of the SSLServer reflect the config file's
+ SSL Private Key and SSL Certificate
+ """
+
+ service = self.makeService()
+
+ sslService = None
+ for s in service.services:
+ if isinstance(s, internet.SSLServer):
+ sslService = s
+ break
+
+ context = sslService.args[2]
+
+ self.assertEquals(self.config['SSLPrivateKey'],
+ context.privateKeyFileName)
+
+ self.assertEquals(self.config['SSLCertificate'],
+ context.certificateFileName)
+
+ def test_noSSL(self):
+ """
+ Test the single service to make sure there is no SSL Service when SSL
+ is disabled
+ """
+
+ del self.config['SSLPort']
+ self.writeConfig()
+ service = self.makeService()
+ import pdb; pdb.set_trace()
+
+ self.config['SSLPort']
+
+ self.failIf(
+ internet.SSLServer in [s.__class__ for s in service.services])
+
+ def test_SSLOnly(self):
+ """
+ Test the single service to make sure there is no TCPServer when
+ SSLOnly is turned on.
+ """
+
+ del self.config['HTTPPort']
+ self.writeConfig()
+ service = self.makeService()
+
+ self.failIf(
+ internet.TCPServer in [s.__class__ for s in service.services])
+
+ def test_singleBindAddresses(self):
+ """
+ Test that the TCPServer and SSLServers are bound to the proper address
+ """
+
+ self.config['BindAddresses'] = ['127.0.0.1']
+ self.writeConfig()
+ service = self.makeService()
+
+ for s in service.services:
+ self.assertEquals(s.kwargs['interface'], '127.0.0.1')
+
+ def test_multipleBindAddresses(self):
+ """
+ Test that the TCPServer and SSLServers are bound to the proper
+ addresses.
+ """
+
+ self.config['BindAddresses'] = ['127.0.0.1', '10.0.0.2', '172.53.13.123']
+ self.writeConfig()
+ service = self.makeService()
+
+ tcpServers = []
+ sslServers = []
+
+ for s in service.services:
+ if isinstance(s, internet.TCPServer):
+ tcpServers.append(s)
+ elif isinstance(s, internet.SSLServer):
+ sslServers.append(s)
+
+ self.assertEquals(len(tcpServers), len(self.config['BindAddresses']))
+ self.assertEquals(len(sslServers), len(self.config['BindAddresses']))
+
+ for addr in self.config['BindAddresses']:
+ for s in tcpServers:
+ if s.kwargs['interface'] == addr:
+ tcpServers.remove(s)
+
+ for s in sslServers:
+ if s.kwargs['interface'] == addr:
+ sslServers.remove(s)
+
+ self.assertEquals(len(tcpServers), 0)
+ self.assertEquals(len(sslServers), 0)
+
+
+class ServiceHTTPFactoryTests(BaseServiceMakerTests):
+ """
+ Test the configuration of the initial resource hierarchy of the
+ single service
+ """
+
+ def test_AuthWrapperAllEnabled(self):
+ """
+ Test the configuration of the authentication wrapper
+ when all schemes are enabled.
+ """
+ self.config['Authentication']['Digest']['Enabled'] = True
+ self.config['Authentication']['Kerberos']['Enabled'] = True
+ self.config['Authentication']['Basic']['Enabled'] = True
+
+ self.writeConfig()
+ site = self.getSite()
+
+ self.failUnless(isinstance(
+ site.resource.resource,
+ auth.AuthenticationWrapper))
+
+ authWrapper = site.resource.resource
+
+ expectedSchemes = ['negotiate', 'digest', 'basic']
+
+ for scheme in authWrapper.credentialFactories:
+ self.failUnless(scheme in expectedSchemes)
+
+ self.assertEquals(len(expectedSchemes),
+ len(authWrapper.credentialFactories))
+
+ def test_AuthWrapperPartialEnabled(self):
+ """
+ Test that the expected credential factories exist when
+ only a partial set of authentication schemes is
+ enabled.
+ """
+
+ self.config['Authentication']['Basic']['Enabled'] = False
+ self.config['Authentication']['Kerberos']['Enabled'] = False
+
+ self.writeConfig()
+ site = self.getSite()
+
+ authWrapper = site.resource.resource
+
+ expectedSchemes = ['digest']
+
+ for scheme in authWrapper.credentialFactories:
+ self.failUnless(scheme in expectedSchemes)
+
+ self.assertEquals(len(expectedSchemes),
+ len(authWrapper.credentialFactories))
+
+ def test_LogWrapper(self):
+ """
+ Test the configuration of the log wrapper
+ """
+
+ site = self.getSite()
+
+ self.failUnless(isinstance(
+ site.resource,
+ LogWrapperResource))
+
+ def test_rootResource(self):
+ """
+ Test the root resource
+ """
+ site = self.getSite()
+ root = site.resource.resource.resource
+
+ self.failUnless(isinstance(root, CalDAVServiceMaker.rootResourceClass))
+
+ def test_principalResource(self):
+ """
+ Test the principal resource
+ """
+ site = self.getSite()
+ root = site.resource.resource.resource
+
+ self.failUnless(isinstance(
+ root.getChild('principals'),
+ CalDAVServiceMaker.principalResourceClass))
+
+ def test_calendarResource(self):
+ """
+ Test the calendar resource
+ """
+ site = self.getSite()
+ root = site.resource.resource.resource
+
+ self.failUnless(isinstance(
+ root.getChild('calendars'),
+ CalDAVServiceMaker.calendarResourceClass))
+
+
+class DirectoryServiceTest(BaseServiceMakerTests):
+ """
+ Tests of the directory service
+ """
+
+ def test_sameDirectory(self):
+ """
+ Test that the principal hierarchy has a reference
+ to the same DirectoryService as the calendar hierarchy
+ """
+
+ site = self.getSite()
+ principals = site.resource.resource.resource.getChild('principals')
+ calendars = site.resource.resource.resource.getChild('calendars')
+
+ self.assertEquals(principals.directory,
+ calendars.directory)
+
+ def test_aggregateDirectory(self):
+ """
+ Assert that the base directory service is actually
+ an AggregateDirectoryService
+ """
+ site = self.getSite()
+ principals = site.resource.resource.resource.getChild('principals')
+ directory = principals.directory
+
+ self.failUnless(isinstance(
+ directory,
+ AggregateDirectoryService))
+
+ def test_sudoDirectoryService(self):
+ """
+ Test that a sudo directory service is available if the
+ SudoersFile is set and exists
+ """
+ site = self.getSite()
+ principals = site.resource.resource.resource.getChild('principals')
+ directory = principals.directory
+
+ self.failUnless(self.config['SudoersFile'])
+
+ sudoService = directory.serviceForRecordType(
+ SudoDirectoryService.recordType_sudoers)
+
+ self.assertEquals(sudoService.plistFile.path,
+ os.path.abspath(self.config['SudoersFile']))
+
+ self.failUnless(SudoDirectoryService.recordType_sudoers in
+ directory.userRecordTypes)
+
+ def test_sudoDirectoryServiceNoFile(self):
+ """
+ Test that there is no SudoDirectoryService if
+ the SudoersFile does not exist.
+ """
+ self.config['SudoersFile'] = self.mktemp()
+
+ self.writeConfig()
+ site = self.getSite()
+ principals = site.resource.resource.resource.getChild('principals')
+ directory = principals.directory
+
+ self.failUnless(self.config['SudoersFile'])
+
+ self.assertRaises(
+ UnknownRecordTypeError,
+ directory.serviceForRecordType,
+ SudoDirectoryService.recordType_sudoers)
+
+ def test_sudoDirectoryServiceNotConfigured(self):
+ """
+ Test that there is no SudoDirectoryService if
+ the SudoersFile is not configured
+ """
+
+ self.config['SudoersFile'] = ''
+ self.writeConfig()
+
+ site = self.getSite()
+ principals = site.resource.resource.resource.getChild('principals')
+ directory = principals.directory
+
+ self.failIf(self.config['SudoersFile'])
+
+ self.assertRaises(
+ UnknownRecordTypeError,
+ directory.serviceForRecordType,
+ SudoDirectoryService.recordType_sudoers)
+
+ def test_configuredDirectoryService(self):
+ """
+ Test that the real directory service is the directory service
+ set in the configuration file.
+ """
+
+ self.config['SudoersFile'] = ''
+ self.writeConfig()
+
+ site = self.getSite()
+ principals = site.resource.resource.resource.getChild('principals')
+ directory = principals.directory
+
+ realDirectory = directory.serviceForRecordType('users')
+
+ configuredDirectory = namedAny(
+ self.config['DirectoryService']['type'])
+
+ self.failUnless(isinstance(
+ realDirectory,
+ configuredDirectory))
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20070309/0669582b/attachment.html
More information about the calendarserver-changes
mailing list