[CalendarServer-changes] [8252] CalendarServer/trunk

source_changes at macosforge.org source_changes at macosforge.org
Tue Nov 1 17:54:55 PDT 2011


Revision: 8252
          http://trac.macosforge.org/projects/calendarserver/changeset/8252
Author:   glyph at apple.com
Date:     2011-11-01 17:54:54 -0700 (Tue, 01 Nov 2011)
Log Message:
-----------
Make the load sim capable of parallel execution.

Modified Paths:
--------------
    CalendarServer/trunk/contrib/performance/loadtest/population.py
    CalendarServer/trunk/contrib/performance/loadtest/profiles.py
    CalendarServer/trunk/contrib/performance/loadtest/sim.py
    CalendarServer/trunk/contrib/performance/loadtest/test_sim.py

Added Paths:
-----------
    CalendarServer/trunk/contrib/performance/loadtest/ampsim.py
    CalendarServer/trunk/contrib/performance/loadtest/config.dist.plist

Property Changed:
----------------
    CalendarServer/trunk/


Property changes on: CalendarServer/trunk
___________________________________________________________________
Modified: svn:mergeinfo
   - /CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/users/cdaboo/batchupload-6699:6700-7198
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/implicituidrace:8137-8141
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/pods:7297-7377
/CalendarServer/branches/users/cdaboo/pycalendar:7085-7206
/CalendarServer/branches/users/cdaboo/pycard:7227-7237
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/cdaboo/timezones:7443-7699
/CalendarServer/branches/users/glyph/conn-limit:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/dalify:6932-7023
/CalendarServer/branches/users/glyph/db-reconnect:6824-6876
/CalendarServer/branches/users/glyph/deploybuild:7563-7572
/CalendarServer/branches/users/glyph/disable-quota:7718-7727
/CalendarServer/branches/users/glyph/dont-start-postgres:6592-6614
/CalendarServer/branches/users/glyph/imip-and-admin-html:7866-7984
/CalendarServer/branches/users/glyph/linux-tests:6893-6900
/CalendarServer/branches/users/glyph/misc-portability-fixes:7365-7374
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/new-export:7444-7485
/CalendarServer/branches/users/glyph/oracle:7106-7155
/CalendarServer/branches/users/glyph/oracle-nulls:7340-7351
/CalendarServer/branches/users/glyph/other-html:8062-8091
/CalendarServer/branches/users/glyph/quota:7604-7637
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/shared-pool-take2:8155-8174
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/subtransactions:7248-7258
/CalendarServer/branches/users/glyph/uidexport:7673-7676
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/glyph/xattrs-from-files:7757-7769
/CalendarServer/branches/users/sagen/applepush:8126-8184
/CalendarServer/branches/users/sagen/inboxitems:7380-7381
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/wsanchez/transations:5515-5593
   + /CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/users/cdaboo/batchupload-6699:6700-7198
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/implicituidrace:8137-8141
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/pods:7297-7377
/CalendarServer/branches/users/cdaboo/pycalendar:7085-7206
/CalendarServer/branches/users/cdaboo/pycard:7227-7237
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/cdaboo/timezones:7443-7699
/CalendarServer/branches/users/glyph/conn-limit:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/dalify:6932-7023
/CalendarServer/branches/users/glyph/db-reconnect:6824-6876
/CalendarServer/branches/users/glyph/deploybuild:7563-7572
/CalendarServer/branches/users/glyph/disable-quota:7718-7727
/CalendarServer/branches/users/glyph/dont-start-postgres:6592-6614
/CalendarServer/branches/users/glyph/imip-and-admin-html:7866-7984
/CalendarServer/branches/users/glyph/linux-tests:6893-6900
/CalendarServer/branches/users/glyph/misc-portability-fixes:7365-7374
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/new-export:7444-7485
/CalendarServer/branches/users/glyph/oracle:7106-7155
/CalendarServer/branches/users/glyph/oracle-nulls:7340-7351
/CalendarServer/branches/users/glyph/other-html:8062-8091
/CalendarServer/branches/users/glyph/parallel-sim:8240-8251
/CalendarServer/branches/users/glyph/quota:7604-7637
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/shared-pool-take2:8155-8174
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/subtransactions:7248-7258
/CalendarServer/branches/users/glyph/uidexport:7673-7676
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/glyph/xattrs-from-files:7757-7769
/CalendarServer/branches/users/sagen/applepush:8126-8184
/CalendarServer/branches/users/sagen/inboxitems:7380-7381
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/wsanchez/transations:5515-5593

Copied: CalendarServer/trunk/contrib/performance/loadtest/ampsim.py (from rev 8251, CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/ampsim.py)
===================================================================
--- CalendarServer/trunk/contrib/performance/loadtest/ampsim.py	                        (rev 0)
+++ CalendarServer/trunk/contrib/performance/loadtest/ampsim.py	2011-11-02 00:54:54 UTC (rev 8252)
@@ -0,0 +1,188 @@
+##
+# Copyright (c) 2011 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+##
+
+"""
+AMP-based simulator.
+"""
+
+if __name__ == '__main__':
+    # When run as a script, this is the worker process, receiving commands over
+    # stdin.
+    def runmain():
+        import traceback
+        try:
+            from twisted.python.log import startLogging
+            from sys import exit, stderr
+
+            startLogging(stderr)
+
+            from twisted.internet import reactor
+            from twisted.internet.stdio import StandardIO
+
+            from contrib.performance.loadtest.ampsim import Worker
+            from contrib.performance.loadtest.sim import LagTrackingReactor
+
+            StandardIO(Worker(LagTrackingReactor(reactor)))
+            reactor.run()
+        except:
+            traceback.print_exc()
+            exit(1)
+        else:
+            exit(0)
+    runmain()
+
+
+from copy import deepcopy
+
+from plistlib import writePlistToString, readPlistFromString
+
+from twisted.python.log import msg, addObserver
+from twisted.protocols.amp import AMP, Command, String, Unicode
+
+from twext.enterprise.adbapi2 import Pickle
+
+from contrib.performance.loadtest.sim import _DirectoryRecord,  LoadSimulator
+
+class Configure(Command):
+    """
+    Configure this worker process with the text of an XML property list.
+    """
+    arguments = [("plist", String())]
+
+
+
+class LogMessage(Command):
+    """
+    This message represents an observed log message being relayed from a worker
+    process to the manager process.
+    """
+    arguments = [("event", Pickle())]
+
+
+
+class Account(Command):
+    """
+    This message represents a L{_DirectoryRecord} loaded by the manager process
+    being relayed to a worker.
+    """
+    arguments = [
+        ("uid", Unicode()),
+        ("password", Unicode()),
+        ("commonName", Unicode()),
+        ("email", Unicode()),
+    ]
+
+
+
+class Worker(AMP):
+    """
+    Protocol to be run in the worker process, to handle messages from its
+    manager.
+    """
+
+    def __init__(self, reactor):
+        super(Worker, self).__init__()
+        self.reactor = reactor
+        self.records = []
+
+
+    @Account.responder
+    def account(self, **kw):
+        self.records.append(_DirectoryRecord(**kw))
+        return {}
+
+
+    @Configure.responder
+    def config(self, plist):
+        from sys import stderr
+        cfg = readPlistFromString(plist)
+        addObserver(self.emit)
+        sim = LoadSimulator.fromConfig(cfg)
+        sim.records = self.records
+        sim.attachServices(stderr)
+        return {}
+
+
+    def emit(self, eventDict):
+        if 'type' in eventDict:
+            self.reactor.callFromThread(
+                self.callRemote, LogMessage, event=eventDict
+            )
+
+
+    def connectionLost(self, reason):
+        super(Worker, self).connectionLost(reason)
+        msg("Standard IO connection lost.")
+        self.reactor.stop()
+
+
+
+class Manager(AMP):
+    """
+    Protocol to be run in the coordinating process, to respond to messages from
+    a single worker.
+    """
+
+    def __init__(self, loadsim, whichWorker, numWorkers, output):
+        super(Manager, self).__init__()
+        self.loadsim = loadsim
+        self.whichWorker = whichWorker
+        self.numWorkers = numWorkers
+        self.output = output
+
+
+    def connectionMade(self):
+        super(Manager, self).connectionMade()
+
+        for record in self.loadsim.records:
+            self.callRemote(Account,
+                            uid=record.uid,
+                            password=record.password,
+                            commonName=record.commonName,
+                            email=record.email)
+
+        workerConfig = deepcopy(self.loadsim.configTemplate)
+        # The list of workers is for the manager only; the workers themselves
+        # know they're workers because they _don't_ receive this list.
+        del workerConfig["workers"]
+        # The manager loads the accounts via the configured loader, then sends
+        # them out to the workers (right above), which look at the state at an
+        # instance level and therefore don't need a globally-named directory
+        # record loader.
+        del workerConfig["accounts"]
+
+        workerConfig["workerID"] = self.whichWorker
+        workerConfig["workerCount"] = self.numWorkers
+        workerConfig["observers"] = []
+        workerConfig.pop("accounts", None)
+
+        plist = writePlistToString(workerConfig)
+        self.output.write("Initiating worker configuration\n")
+        def completed(x):
+            self.output.write("Worker configuration complete.\n")
+        (self.callRemote(Configure, plist=plist)
+         .addCallback(completed))
+
+
+    @LogMessage.responder
+    def observed(self, event):
+        #from pprint import pformat
+        #self.output.write(pformat(event)+"\n")
+        msg(**event)
+        return {}
+
+

Copied: CalendarServer/trunk/contrib/performance/loadtest/config.dist.plist (from rev 8251, CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/config.dist.plist)
===================================================================
--- CalendarServer/trunk/contrib/performance/loadtest/config.dist.plist	                        (rev 0)
+++ CalendarServer/trunk/contrib/performance/loadtest/config.dist.plist	2011-11-02 00:54:54 UTC (rev 8252)
@@ -0,0 +1,290 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+    Copyright (c) 2011 Apple Inc. All rights reserved.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+  -->
+
+<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
+<plist version="1.0">
+	<dict>
+		<!-- This is a distributed orchestrator configuration; 'workers' is a list of
+							shell commands to run sub-processes.
+							-->
+		<key>workers</key>
+		<array>
+			<string>./python contrib/performance/loadtest/ampsim.py</string>
+			<string>./python contrib/performance/loadtest/ampsim.py</string>
+			<string>./python contrib/performance/loadtest/ampsim.py</string>
+			<string>./python contrib/performance/loadtest/ampsim.py</string>
+			<string>./python contrib/performance/loadtest/ampsim.py</string>
+			<string>./python contrib/performance/loadtest/ampsim.py</string>
+		</array>
+		<!-- Identify the server to be load tested. -->
+		<key>server</key>
+		<string>https://127.0.0.1:8443/</string>
+
+		<!-- Define the credentials of the clients which will be used to load test 
+			the server. These credentials must already be valid on the server. -->
+		<key>accounts</key>
+		<dict>
+			<!-- The loader is the fully-qualified Python name of a callable which 
+				returns a list of directory service records defining all of the client accounts 
+				to use. contrib.performance.loadtest.sim.recordsFromCSVFile reads username, 
+				password, mailto triples from a CSV file and returns them as a list of faked 
+				directory service records. -->
+			<key>loader</key>
+			<string>contrib.performance.loadtest.sim.recordsFromCSVFile</string>
+
+			<!-- Keyword arguments may be passed to the loader. -->
+			<key>params</key>
+			<dict>
+				<!-- recordsFromCSVFile interprets the path relative to the config.plist, 
+					to make it independent of the script's working directory while still allowing 
+					a relative path. This isn't a great solution. -->
+				<key>path</key>
+				<string>contrib/performance/loadtest/accounts.csv</string>
+			</dict>
+		</dict>
+
+		<!-- Define how many clients will participate in the load test and how 
+			they will show up. -->
+		<key>arrival</key>
+		<dict>
+
+			<!-- Specify a class which creates new clients and introduces them into 
+				the test. contrib.performance.loadtest.population.SmoothRampUp introduces 
+				groups of new clients at fixed intervals up to a maximum. The size of the 
+				group, interval, and maximum are configured by the parameters below. The 
+				total number of clients is groups * groupSize, which needs to be no larger 
+				than the number of credentials created in the accounts section. -->
+			<key>factory</key>
+			<string>contrib.performance.loadtest.population.SmoothRampUp</string>
+
+			<key>params</key>
+			<dict>
+				<!-- groups gives the total number of groups of clients to introduce. -->
+				<key>groups</key>
+				<integer>99</integer>
+
+				<!-- groupSize is the number of clients in each group of clients. It's 
+					really only a "smooth" ramp up if this is pretty small. -->
+				<key>groupSize</key>
+				<integer>1</integer>
+
+				<!-- Number of seconds between the introduction of each group. -->
+				<key>interval</key>
+				<integer>3</integer>
+			</dict>
+
+		</dict>
+
+		<!-- Define the kinds of software and user behavior the load simulation 
+			will simulate. -->
+		<key>clients</key>
+
+		<!-- Have as many different kinds of software and user behavior configurations 
+			as you want. Each is a dict -->
+		<array>
+
+			<dict>
+
+				<!-- Here is a Snow Leopard iCal simulator. -->
+				<key>software</key>
+				<string>contrib.performance.loadtest.ical.SnowLeopard</string>
+
+				<!-- Arguments to use to initialize the SnowLeopard instance. -->
+				<key>params</key>
+				<dict>
+					<!-- SnowLeopard can poll the calendar home at some interval. This is 
+						in seconds. -->
+					<key>calendarHomePollInterval</key>
+					<integer>30</integer>
+
+					<!-- If the server advertises xmpp push, SnowLeopard can wait for notifications 
+						about calendar home changes instead of polling for them periodically. If 
+						this option is true, then look for the server advertisement for xmpp push 
+						and use it if possible. Still fall back to polling if there is no xmpp push 
+						advertised. -->
+					<key>supportPush</key>
+					<false />
+				</dict>
+
+				<!-- The profiles define certain types of user behavior on top of the 
+					client software being simulated. -->
+				<key>profiles</key>
+				<array>
+
+					<!-- First an event-creating profile, which will periodically create 
+						new events at a random time on a random calendar. -->
+					<dict>
+						<key>class</key>
+						<string>contrib.performance.loadtest.profiles.Eventer</string>
+
+						<key>params</key>
+						<dict>
+							<key>enabled</key>
+							<true/>
+
+							<!-- Define the interval (in seconds) at which this profile will use 
+								its client to create a new event. -->
+							<key>interval</key>
+							<integer>60</integer>
+
+							<!-- Define how start times (DTSTART) for the randomly generated events 
+								will be selected. This is an example of a "Distribution" parameter. The value 
+								for most "Distribution" parameters are interchangeable and extensible. -->
+							<key>eventStartDistribution</key>
+							<dict>
+
+								<!-- This distribution is pretty specialized. It produces timestamps 
+									in the near future, limited to certain days of the week and certain hours 
+									of the day. -->
+								<key>type</key>
+								<string>contrib.performance.stats.WorkDistribution</string>
+
+								<key>params</key>
+								<dict>
+									<!-- These are the days of the week the distribution will use. -->
+									<key>daysOfWeek</key>
+									<array>
+										<string>mon</string>
+										<string>tue</string>
+										<string>wed</string>
+										<string>thu</string>
+										<string>fri</string>
+									</array>
+
+									<!-- The earliest hour of a day at which an event might be scheduled. -->
+									<key>beginHour</key>
+									<integer>8</integer>
+
+									<!-- And the latest hour of a day (at which an event will be scheduled 
+										to begin!). -->
+									<key>endHour</key>
+									<integer>16</integer>
+
+									<!-- The timezone in which the event is scheduled. (XXX Does this 
+										really work right?) -->
+									<key>tzname</key>
+									<string>America/Los_Angeles</string>
+								</dict>
+							</dict>
+						</dict>
+					</dict>
+
+					<!-- This profile invites new attendees to existing events. -->
+					<dict>
+						<key>class</key>
+						<string>contrib.performance.loadtest.profiles.Inviter</string>
+
+						<key>params</key>
+						<dict>
+							<key>enabled</key>
+							<true/>
+
+							<!-- Define the frequency at which new invitations will be sent out. -->
+							<key>sendInvitationDistribution</key>
+							<dict>
+								<key>type</key>
+								<string>contrib.performance.stats.NormalDistribution</string>
+								<key>params</key>
+								<dict>
+									<!-- mu gives the mean of the normal distribution (in seconds). -->
+									<key>mu</key>
+									<integer>60</integer>
+
+									<!-- and sigma gives its standard deviation. -->
+									<key>sigma</key>
+									<integer>5</integer>
+								</dict>
+							</dict>
+
+							<!-- Define the distribution of who will be invited to an event. Each 
+								set of credentials loaded by the load tester has an index; samples from this 
+								distribution will be added to that index to arrive at the index of some other 
+								credentials, which will be the target of the invitation. -->
+							<key>inviteeDistanceDistribution</key>
+							<dict>
+								<key>type</key>
+								<string>contrib.performance.stats.UniformIntegerDistribution</string>
+								<key>params</key>
+								<dict>
+									<!-- The minimum value (inclusive) of the uniform distribution. -->
+									<key>min</key>
+									<integer>-100</integer>
+									<!-- The maximum value (exclusive) of the uniform distribution. -->
+									<key>max</key>
+									<integer>101</integer>
+								</dict>
+							</dict>
+						</dict>
+					</dict>
+
+					<!-- This profile accepts invitations to events, handles cancels, and
+					     handles replies received. -->
+					<dict>
+						<key>class</key>
+						<string>contrib.performance.loadtest.profiles.Accepter</string>
+
+						<key>params</key>
+						<dict>
+							<key>enabled</key>
+							<true/>
+
+							<!-- Define how long to wait after seeing a new invitation before 
+								accepting it. -->
+							<key>acceptDelayDistribution</key>
+							<dict>
+								<key>type</key>
+								<string>contrib.performance.stats.NormalDistribution</string>
+								<key>params</key>
+								<dict>
+									<!-- mean -->
+									<key>mu</key>
+									<integer>360</integer>
+									<!-- standard deviation -->
+									<key>sigma</key>
+									<integer>60</integer>
+								</dict>
+							</dict>
+						</dict>
+					</dict>
+				</array>
+
+				<!-- Determine the frequency at which this client configuration will 
+					appear in the clients which are created by the load tester. -->
+				<key>weight</key>
+				<integer>1</integer>
+			</dict>
+		</array>
+
+		<!-- Define some log observers to report on the load test. -->
+		<key>observers</key>
+		<array>
+			<!-- ReportStatistics generates an end-of-run summary of the HTTP requests 
+				made, their timings, and their results. -->
+			<string>contrib.performance.loadtest.population.ReportStatistics</string>
+
+			<!-- RequestLogger generates a realtime log of all HTTP requests made 
+				during the load test. -->
+			<string>contrib.performance.loadtest.ical.RequestLogger</string>
+
+			<!-- OperationLogger generates an end-of-run summary of the gross operations 
+				performed (logical operations which may span more than one HTTP request, 
+				such as inviting an attendee to an event). -->
+			<string>contrib.performance.loadtest.profiles.OperationLogger</string>
+		</array>
+	</dict>
+</plist>

Modified: CalendarServer/trunk/contrib/performance/loadtest/population.py
===================================================================
--- CalendarServer/trunk/contrib/performance/loadtest/population.py	2011-11-02 00:52:59 UTC (rev 8251)
+++ CalendarServer/trunk/contrib/performance/loadtest/population.py	2011-11-02 00:54:54 UTC (rev 8252)
@@ -1,3 +1,4 @@
+# -*- test-case-name: contrib.performance.loadtest.test_population -*-
 ##
 # Copyright (c) 2010 Apple Inc. All rights reserved.
 #
@@ -149,7 +150,8 @@
 
 
 class CalendarClientSimulator(object):
-    def __init__(self, records, populator, parameters, reactor, server):
+    def __init__(self, records, populator, parameters, reactor, server,
+                 workerIndex=0, workerCount=1):
         self._records = records
         self.populator = populator
         self.reactor = reactor
@@ -157,6 +159,8 @@
         self._pop = self.populator.populate(parameters)
         self._user = 0
         self._stopped = False
+        self.workerIndex = workerIndex
+        self.workerCount = workerCount
 
         TimezoneCache.create()
 
@@ -196,9 +200,15 @@
     def add(self, numClients):
         for _ignore_n in range(numClients):
             number = self._nextUserNumber()
+            clientType = self._pop.next()
+            if (number % self.workerCount) != self.workerIndex:
+                # If we're in a distributed work scenario and we are worker N,
+                # we have to skip all but every Nth request (since every node
+                # runs the same arrival policy).
+                continue
+
             _ignore_user, auth = self._createUser(number)
 
-            clientType = self._pop.next()
             reactor = loggedReactor(self.reactor)
             client = clientType.new(
                 reactor, self.server, self.getUserRecord(number), auth)
@@ -210,6 +220,8 @@
                 if profile.enabled:
                     d = profile.run()
                     d.addErrback(self._profileFailure, profileType, reactor)
+        # XXX this status message is prone to be slightly inaccurate, but isn't
+        # really used by much anyway.
         msg(type="status", clientCount=self._user - 1)
 
 

Modified: CalendarServer/trunk/contrib/performance/loadtest/profiles.py
===================================================================
--- CalendarServer/trunk/contrib/performance/loadtest/profiles.py	2011-11-02 00:52:59 UTC (rev 8251)
+++ CalendarServer/trunk/contrib/performance/loadtest/profiles.py	2011-11-02 00:54:54 UTC (rev 8252)
@@ -499,6 +499,7 @@
 
     def observe(self, event):
         if event.get("type") == "operation":
+            event = event.copy()
             lag = event.get('lag')
             if lag is None:
                 event['lag'] = ''

Modified: CalendarServer/trunk/contrib/performance/loadtest/sim.py
===================================================================
--- CalendarServer/trunk/contrib/performance/loadtest/sim.py	2011-11-02 00:52:59 UTC (rev 8251)
+++ CalendarServer/trunk/contrib/performance/loadtest/sim.py	2011-11-02 00:54:54 UTC (rev 8252)
@@ -1,3 +1,4 @@
+# -*- test-case-name: contrib.performance.loadtest.test_sim -*-
 ##
 # Copyright (c) 2011 Apple Inc. All rights reserved.
 #
@@ -15,6 +16,7 @@
 #
 ##
 
+from os import environ
 from xml.parsers.expat import ExpatError
 from sys import argv, stdout
 from random import Random
@@ -27,11 +29,18 @@
 from twisted.python.usage import UsageError, Options
 from twisted.python.reflect import namedAny
 
+from twisted.application.service import Service
+from twisted.application.service import MultiService
+
+from twisted.internet.protocol import ProcessProtocol
+
 from contrib.performance.loadtest.ical import SnowLeopard
 from contrib.performance.loadtest.profiles import Eventer, Inviter, Accepter
 from contrib.performance.loadtest.population import (
     Populator, ProfileType, ClientType, PopulationParameters, SmoothRampUp,
     CalendarClientSimulator)
+from twisted.internet.defer import Deferred
+from twisted.internet.defer import gatherResults
 
 
 class _DirectoryRecord(object):
@@ -64,6 +73,15 @@
         in pathObj.getContent().splitlines()]
 
 
+
+def recordsFromCount(count, uid=u"user%02d", password=u"user%02d",
+                     commonName=u"User %02d", email=u"user%02d at example.com"):
+    for i in range(1, count + 1):
+        yield _DirectoryRecord(uid % i, password % i,
+                               commonName % i, email % i)
+
+
+
 class LagTrackingReactor(object):
     """
     This reactor wraps another reactor and proxies all attribute
@@ -167,12 +185,13 @@
     @type arrival: L{Arrival}
     @type parameters: L{PopulationParameters}
 
-    @ivar records: A C{list} of L{DirectoryRecord} instances giving
+    @ivar records: A C{list} of L{_DirectoryRecord} instances giving
         user information about the accounts on the server being put
         under load.
     """
     def __init__(self, server, arrival, parameters, observers=None,
-                 records=None, reactor=None, runtime=None):
+                 records=None, reactor=None, runtime=None, workers=None,
+                 configTemplate=None, workerID=None, workerCount=1):
         if reactor is None:
             from twisted.internet import reactor
         self.server = server
@@ -182,10 +201,14 @@
         self.records = records
         self.reactor = LagTrackingReactor(reactor)
         self.runtime = runtime
+        self.workers = workers
+        self.configTemplate = configTemplate
+        self.workerID = workerID
+        self.workerCount = workerCount
 
 
     @classmethod
-    def fromCommandLine(cls, args=None):
+    def fromCommandLine(cls, args=None, output=stdout):
         if args is None:
             args = argv[1:]
 
@@ -195,50 +218,77 @@
         except UsageError, e:
             raise SystemExit(str(e))
 
-        server = 'http://127.0.0.1:8008/'
-        if 'server' in options.config:
-            server = options.config['server']
+        return cls.fromConfig(options.config, options['runtime'], output)
 
-        if 'arrival' in options.config:
-            arrival = Arrival(
-                namedAny(options.config['arrival']['factory']), 
-                options.config['arrival']['params'])
+
+    @classmethod
+    def fromConfig(cls, config, runtime=None, output=stdout, reactor=None):
+        """
+        Create a L{LoadSimulator} from a parsed instance of a configuration
+        property list.
+        """
+
+        workers = config.get("workers")
+        if workers is None:
+            # Client / place where the simulator actually runs configuration
+            workerID = config.get("workerID", 0)
+            workerCount = config.get("workerCount", 1)
+            configTemplate = None
+            server = 'http://127.0.0.1:8008/'
+            if 'server' in config:
+                server = config['server']
+
+            if 'arrival' in config:
+                arrival = Arrival(
+                    namedAny(config['arrival']['factory']), 
+                    config['arrival']['params'])
+            else:
+                arrival = Arrival(
+                    SmoothRampUp, dict(groups=10, groupSize=1, interval=3))
+
+            parameters = PopulationParameters()
+            if 'clients' in config:
+                for clientConfig in config['clients']:
+                    parameters.addClient(
+                        clientConfig["weight"],
+                        ClientType(
+                            namedAny(clientConfig["software"]),
+                            cls._convertParams(clientConfig["params"]),
+                            [ProfileType(
+                                    namedAny(profile["class"]),
+                                    cls._convertParams(profile["params"]))
+                             for profile in clientConfig["profiles"]]))
+            if not parameters.clients:
+                parameters.addClient(1,
+                                     ClientType(SnowLeopard, {},
+                                                [Eventer, Inviter, Accepter]))
         else:
-            arrival = Arrival(
-                SmoothRampUp, dict(groups=10, groupSize=1, interval=3))
+            # Manager / observer process.
+            server = ''
+            arrival = None
+            parameters = None
+            workerID = 0
+            configTemplate = config
+            workerCount = 1
 
-        parameters = PopulationParameters()
-        if 'clients' in options.config:
-            for clientConfig in options.config['clients']:
-                parameters.addClient(
-                    clientConfig["weight"],
-                    ClientType(
-                        namedAny(clientConfig["software"]),
-                        cls._convertParams(clientConfig["params"]),
-                        [ProfileType(
-                                namedAny(profile["class"]),
-                                cls._convertParams(profile["params"]))
-                         for profile in clientConfig["profiles"]]))
-        if not parameters.clients:
-            parameters.addClient(
-                1, ClientType(SnowLeopard, {}, [Eventer, Inviter, Accepter]))
-
         observers = []
-        if 'observers' in options.config:
-            for observerName in options.config['observers']:
+        if 'observers' in config:
+            for observerName in config['observers']:
                 observers.append(namedAny(observerName)())
 
         records = []
-        if 'accounts' in options.config:
-            loader = options.config['accounts']['loader']
-            params = options.config['accounts']['params']
+        if 'accounts' in config:
+            loader = config['accounts']['loader']
+            params = config['accounts']['params']
             records.extend(namedAny(loader)(**params))
-            print 'Loaded', len(records), 'accounts.'
+            output.write("Loaded {0} accounts.\n".format(len(records)))
 
-        return cls(server, arrival, parameters,
-                   observers=observers, records=records,
-                   runtime=options['runtime'])
+        return cls(server, arrival, parameters, observers=observers,
+                   records=records, runtime=runtime, reactor=reactor,
+                   workers=workers, configTemplate=configTemplate,
+                   workerID=workerID, workerCount=workerCount)
 
+
     @classmethod
     def _convertParams(cls, params):
         """
@@ -273,40 +323,206 @@
     def createSimulator(self):
         populator = Populator(Random())
         return CalendarClientSimulator(
-            self.records, populator, self.parameters, self.reactor, self.server)
+            self.records, populator, self.parameters, self.reactor, self.server,
+            self.workerID, self.workerCount
+        )
 
 
     def createArrivalPolicy(self):
         return self.arrival.factory(self.reactor, **self.arrival.parameters)
-        
 
-    def run(self):
-        for obs in self.observers:
-            addObserver(obs.observe)
-        sim = self.createSimulator()
 
-        def stop():
-            for obs in self.observers:
-                removeObserver(obs.observe)
-            sim.stop()
-        self.reactor.addSystemEventTrigger('before', 'shutdown', stop)
+    def serviceClasses(self):
+        """
+        Return a list of L{SimService} subclasses for C{attachServices} to
+        instantiate and attach to the reactor.
+        """
+        if self.workers is not None:
+            return [
+                ObserverService,
+                WorkerSpawnerService,
+                ReporterService,
+            ]
+        return [
+            ObserverService,
+            SimulatorService,
+            ReporterService,
+        ]
 
-        arrivalPolicy = self.createArrivalPolicy()
-        arrivalPolicy.run(sim)
+
+    def attachServices(self, output):
+        ms = MultiService()
+        for svcclass in self.serviceClasses():
+            svcclass(self, output).setServiceParent(ms)
+        attachService(self.reactor, ms)
+
+
+    def run(self, output=stdout):
+        self.attachServices(output)
         if self.runtime is not None:
             self.reactor.callLater(self.runtime, self.reactor.stop)
         self.reactor.run()
+
+
+main = LoadSimulator.main
+
+
+
+def attachService(reactor, service):
+    """
+    Attach a given L{IService} provider to the given L{IReactorCore}; cause it
+    to be started when the reactor starts, and stopped when the reactor stops.
+    """
+    reactor.callWhenRunning(service.startService)
+    reactor.addSystemEventTrigger('before', 'shutdown', service.stopService)
+
+
+
+class SimService(Service, object):
+    """
+    Base class for services associated with the L{LoadSimulator}.
+    """
+
+    def __init__(self, loadsim, output):
+        super(SimService, self).__init__()
+        self.loadsim = loadsim
+        self.output = output
+
+
+
+class ObserverService(SimService):
+    """
+    A service that adds and removes a L{LoadSimulator}'s set of observers at
+    start and stop time.
+    """
+
+    def startService(self):
+        """
+        Start observing.
+        """
+        super(ObserverService, self).startService()
+        for obs in self.loadsim.observers:
+            addObserver(obs.observe)
+
+
+    def stopService(self):
+        super(ObserverService, self).startService()
+        for obs in self.loadsim.observers:
+            removeObserver(obs.observe)
+
+
+
+class SimulatorService(SimService):
+    """
+    A service that starts the L{CalendarClientSimulator} associated with the
+    L{LoadSimulator} and stops it at shutdown.
+    """
+
+    def startService(self):
+        super(SimulatorService, self).startService()
+        self.clientsim = self.loadsim.createSimulator()
+        arrivalPolicy = self.loadsim.createArrivalPolicy()
+        arrivalPolicy.run(self.clientsim)
+
+
+    def stopService(self):
+        super(SimulatorService, self).stopService()
+        return self.clientsim.stop()
+
+
+
+class ReporterService(SimService):
+    """
+    A service which reports all the results from all the observers on a load
+    simulator when it is stopped.
+    """
+
+    def stopService(self):
+        """
+        Emit the report to the specified output file.
+        """
+        super(ReporterService, self).stopService()
         failures = []
-        for obs in self.observers:
+        for obs in self.loadsim.observers:
             obs.report()
             failures.extend(obs.failures())
         if failures:
-            print 'FAIL'
-            print '\n'.join(failures)
+            self.output.write('FAIL\n')
+            self.output.write('\n'.join(failures))
+            self.output.write('\n')
         else:
-            print 'PASS'
+            self.output.write('PASS\n')
 
-main = LoadSimulator.main
 
+
+class ProcessProtocolBridge(ProcessProtocol):
+
+    def __init__(self, spawner, proto):
+        self.spawner = spawner
+        self.proto = proto
+        self.deferred = Deferred()
+
+
+    def connectionMade(self):
+        self.transport.getPeer = self.getPeer
+        self.transport.getHost = self.getHost
+        self.proto.makeConnection(self.transport)
+
+
+    def getPeer(self):
+        return "Peer:PID:" + str(self.transport.pid)
+
+
+    def getHost(self):
+        return "Host:PID:" + str(self.transport.pid)
+
+
+    def outReceived(self, data):
+        self.proto.dataReceived(data)
+
+
+    def errReceived(self, error):
+        from twisted.python.log import msg
+        msg("stderr received from " + str(self.transport.pid))
+        msg("    " + repr(error))
+
+
+    def processEnded(self, reason):
+        self.proto.connectionLost(reason)
+        self.deferred.callback(None)
+        self.spawner.bridges.remove(self)
+
+
+
+class WorkerSpawnerService(SimService):
+
+    def startService(self):
+        from contrib.performance.loadtest.ampsim import Manager
+        super(WorkerSpawnerService, self).startService()
+        self.bridges = []
+        for workerID, worker in enumerate(self.loadsim.workers):
+            bridge = ProcessProtocolBridge(
+                self, Manager(self.loadsim, workerID, len(self.loadsim.workers),
+                              self.output)
+            )
+            self.bridges.append(bridge)
+            sh = '/bin/sh'
+            self.loadsim.reactor.spawnProcess(
+                bridge, sh, [sh, "-c", worker], env=environ
+            )
+
+
+    def stopService(self):
+        TERMINATE_TIMEOUT = 30.0
+        def killThemAll(name):
+            for bridge in self.bridges:
+                bridge.transport.signalProcess(name)
+        killThemAll("TERM")
+        self.loadsim.reactor.callLater(TERMINATE_TIMEOUT, killThemAll, "KILL")
+        return gatherResults([bridge.deferred for bridge in self.bridges])
+
+
+
 if __name__ == '__main__':
     main()
+

Modified: CalendarServer/trunk/contrib/performance/loadtest/test_sim.py
===================================================================
--- CalendarServer/trunk/contrib/performance/loadtest/test_sim.py	2011-11-02 00:52:59 UTC (rev 8251)
+++ CalendarServer/trunk/contrib/performance/loadtest/test_sim.py	2011-11-02 00:54:54 UTC (rev 8252)
@@ -16,6 +16,7 @@
 ##
 
 from plistlib import writePlistToString
+from cStringIO import StringIO
 
 from twisted.python.log import msg
 from twisted.python.usage import UsageError
@@ -176,14 +177,28 @@
 class Reactor(object):
     message = "some event to be observed"
 
+    def __init__(self):
+        self._triggers = []
+        self._whenRunning = []
+
+
     def run(self):
+        for thunk in self._whenRunning:
+            thunk()
         msg(self.message)
+        for phase, event, thunk in self._triggers:
+            if event == 'shutdown':
+                thunk()
 
 
-    def addSystemEventTrigger(self, *args):
-        pass
+    def callWhenRunning(self, thunk):
+        self._whenRunning.append(thunk)
 
 
+    def addSystemEventTrigger(self, phase, event, thunk):
+        self._triggers.append((phase, event, thunk))
+
+
 class Observer(object):
     def __init__(self):
         self.reported = False
@@ -261,7 +276,9 @@
             }
         configpath = FilePath(self.mktemp())
         configpath.setContent(writePlistToString(config))
-        sim = LoadSimulator.fromCommandLine(['--config', configpath.path])
+        io = StringIO()
+        sim = LoadSimulator.fromCommandLine(['--config', configpath.path], io)
+        self.assertEquals(io.getvalue(), "Loaded 2 accounts.\n")
         self.assertEqual(2, len(sim.records))
         self.assertEqual(sim.records[0].uid, 'foo')
         self.assertEqual(sim.records[0].password, 'bar')
@@ -287,7 +304,8 @@
             }
         configpath = FilePath(self.mktemp())
         configpath.setContent(writePlistToString(config))
-        sim = LoadSimulator.fromCommandLine(['--config', configpath.path])
+        sim = LoadSimulator.fromCommandLine(['--config', configpath.path],
+                                            StringIO())
         self.assertEqual(99, len(sim.records))
         self.assertEqual(sim.records[0].uid, 'user01')
         self.assertEqual(sim.records[0].password, 'user01')
@@ -313,7 +331,8 @@
         }
         configpath = FilePath(self.mktemp())
         configpath.setContent(writePlistToString(config))
-        sim = LoadSimulator.fromCommandLine(['--config', configpath.path])
+        sim = LoadSimulator.fromCommandLine(['--config', configpath.path],
+                                            StringIO())
         self.assertEqual(2, len(sim.records))
         self.assertEqual(sim.records[0].uid, 'user1')
         self.assertEqual(sim.records[0].password, 'user1')
@@ -343,7 +362,8 @@
         }
         configpath = FilePath(self.mktemp())
         configpath.setContent(writePlistToString(config))
-        sim = LoadSimulator.fromCommandLine(['--config', configpath.path])
+        sim = LoadSimulator.fromCommandLine(['--config', configpath.path],
+                                            StringIO())
         self.assertEqual(3, len(sim.records))
         self.assertEqual(sim.records[0].uid, 'USER001')
         self.assertEqual(sim.records[0].password, 'PASSWORD001')
@@ -491,7 +511,10 @@
             "http://example.com:123/",
             Arrival(lambda reactor: NullArrival(), {}),
             None, observers, reactor=Reactor())
-        sim.run()
+        io = StringIO()
+        sim.run(io)
+        self.assertEquals(io.getvalue(), "PASS\n")
         self.assertTrue(observers[0].reported)
         self.assertEquals(
             observers[0].events[0]['message'], (Reactor.message,))
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20111101/be99b3f5/attachment-0001.html>


More information about the calendarserver-changes mailing list