[CalendarServer-changes] [8244] CalendarServer/branches/users/glyph/parallel-sim/contrib/performance /loadtest

source_changes at macosforge.org source_changes at macosforge.org
Tue Nov 1 17:51:14 PDT 2011


Revision: 8244
          http://trac.macosforge.org/projects/calendarserver/changeset/8244
Author:   glyph at apple.com
Date:     2011-11-01 17:51:14 -0700 (Tue, 01 Nov 2011)
Log Message:
-----------
first cut at distributed sim

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/sim.py

Added Paths:
-----------
    CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/ampsim.py
    CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/config.dist.plist

Added: CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/ampsim.py
===================================================================
--- CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/ampsim.py	                        (rev 0)
+++ CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/ampsim.py	2011-11-02 00:51:14 UTC (rev 8244)
@@ -0,0 +1,127 @@
+##
+# Copyright (c) 2011 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+##
+
+"""
+AMP-based simulator.
+"""
+
+if __name__ == '__main__':
+    # When run as a script, this is the worker process, receiving commands over
+    # stdin.
+    import traceback
+    try:
+        from twisted.python.log import startLogging
+        from sys import stderr, exit
+
+        startLogging(stderr)
+
+        from twisted.internet import reactor
+        from twisted.internet.stdio import StandardIO
+
+        from contrib.performance.loadtest.ampsim import Worker
+
+        StandardIO(Worker(reactor))
+        reactor.run()
+    except:
+        traceback.print_exc()
+        exit(1)
+    else:
+        exit(0)
+
+
+from copy import deepcopy
+
+from plistlib import writePlistToString
+from twisted.protocols.amp import AMP, Command, String
+from twext.enterprise.adbapi2 import Pickle
+
+from twisted.python.log import msg, addObserver
+
+
+class Configure(Command):
+    """
+    Configure this worker process with the text of an XML property list.
+
+    Sent by a L{Manager} to its L{Worker} as soon as the connection is made;
+    C{plist} is the serialized configuration for that single worker.
+    """
+    arguments = [("plist", String())]
+
+
+
+class LogMessage(Command):
+    """
+    A log message was received.
+
+    Sent from a L{Worker} to its L{Manager} for each log event observed in
+    the worker process, so that the manager can re-emit it in its own log.
+    """
+    # NOTE(review): the event dict travels via the Pickle argument type
+    # (presumably pickle-serialized); that is only acceptable because workers
+    # are trusted local subprocesses -- confirm.
+    arguments = [("event", Pickle())]
+
+
+
+class Worker(AMP):
+    """
+    Protocol to be run in the worker process, to handle messages from its
+    manager.
+    """
+
+    def __init__(self, reactor):
+        super(Worker, self).__init__()
+        self.reactor = reactor
+
+
+    @Configure.responder
+    def config(self, plist):
+        from plistlib import readPlistFromString
+        from contrib.performance.loadtest.sim import LoadSimulator
+        cfg = readPlistFromString(plist)
+        addObserver(self.emit)
+        sim = LoadSimulator.fromConfig(cfg)
+        sim.attachServices()
+        return {}
+
+
+    def emit(self, eventDict):
+        self.reactor.callFromThread(
+            self.callRemote, LogMessage, event=eventDict
+        )
+
+
+
+class Manager(AMP):
+    """
+    Protocol to be run in the coordinating process, to respond to messages from
+    a single worker.
+    """
+
+    def __init__(self, loadsim, whichWorker, numWorkers):
+        super(Manager, self).__init__()
+        self.loadsim = loadsim
+        self.whichWorker = whichWorker
+        self.numWorkers = numWorkers
+
+
+    def connectionMade(self):
+        super(Manager, self).connectionMade()
+        workerConfig = deepcopy(self.loadsim.configTemplate)
+        del workerConfig["workers"]
+        workerConfig["workerID"] = self.whichWorker
+        workerConfig["workerCount"] = self.numWorkers
+        self.callRemote(Configure, plist=writePlistToString(workerConfig))
+
+
+    @LogMessage.responder
+    def observed(self, event):
+        msg(**event)
+        return {}
+
+

Added: CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/config.dist.plist
===================================================================
--- CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/config.dist.plist	                        (rev 0)
+++ CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/config.dist.plist	2011-11-02 00:51:14 UTC (rev 8244)
@@ -0,0 +1,290 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+    Copyright (c) 2011 Apple Inc. All rights reserved.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+  -->
+
+<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
+<plist version="1.0">
+	<dict>
+		<!-- This is a distributed orchestrator configuration: 'workers' is a list of
+			shell commands, each of which starts one worker sub-process. Each worker
+			receives this configuration without the 'workers' key. -->
+		<key>workers</key>
+		<array>
+			<string>./python contrib/performance/loadtest/ampsim.py</string>
+			<string>./python contrib/performance/loadtest/ampsim.py</string>
+			<string>./python contrib/performance/loadtest/ampsim.py</string>
+			<string>./python contrib/performance/loadtest/ampsim.py</string>
+			<string>./python contrib/performance/loadtest/ampsim.py</string>
+			<string>./python contrib/performance/loadtest/ampsim.py</string>
+		</array>
+		<!-- Identify the server to be load tested. -->
+		<key>server</key>
+		<string>https://127.0.0.1:8443/</string>
+
+		<!-- Define the credentials of the clients which will be used to load test 
+			the server. These credentials must already be valid on the server. -->
+		<key>accounts</key>
+		<dict>
+			<!-- The loader is the fully-qualified Python name of a callable which 
+				returns a list of directory service records defining all of the client accounts 
+				to use. contrib.performance.loadtest.sim.recordsFromCSVFile reads username, 
+				password, mailto triples from a CSV file and returns them as a list of faked 
+				directory service records. -->
+			<key>loader</key>
+			<string>contrib.performance.loadtest.sim.recordsFromCSVFile</string>
+
+			<!-- Keyword arguments may be passed to the loader. -->
+			<key>params</key>
+			<dict>
+				<!-- recordsFromCSVFile interprets the path relative to the config.plist, 
+					to make it independent of the script's working directory while still allowing 
+					a relative path. This isn't a great solution. -->
+				<key>path</key>
+				<string>contrib/performance/loadtest/accounts.csv</string>
+			</dict>
+		</dict>
+
+		<!-- Define how many clients will participate in the load test and how 
+			they will show up. -->
+		<key>arrival</key>
+		<dict>
+
+			<!-- Specify a class which creates new clients and introduces them into 
+				the test. contrib.performance.loadtest.population.SmoothRampUp introduces 
+				groups of new clients at fixed intervals up to a maximum. The size of the 
+				group, interval, and maximum are configured by the parameters below. The 
+				total number of clients is groups * groupSize, which needs to be no larger 
+				than the number of credentials created in the accounts section. -->
+			<key>factory</key>
+			<string>contrib.performance.loadtest.population.SmoothRampUp</string>
+
+			<key>params</key>
+			<dict>
+				<!-- groups gives the total number of groups of clients to introduce. -->
+				<key>groups</key>
+				<integer>99</integer>
+
+				<!-- groupSize is the number of clients in each group of clients. It's 
+					really only a "smooth" ramp up if this is pretty small. -->
+				<key>groupSize</key>
+				<integer>1</integer>
+
+				<!-- Number of seconds between the introduction of each group. -->
+				<key>interval</key>
+				<integer>3</integer>
+			</dict>
+
+		</dict>
+
+		<!-- Define the kinds of software and user behavior the load simulation 
+			will simulate. -->
+		<key>clients</key>
+
+		<!-- Have as many different kinds of software and user behavior configurations 
+			as you want. Each one is a dict in the array below. -->
+		<array>
+
+			<dict>
+
+				<!-- Here is a Snow Leopard iCal simulator. -->
+				<key>software</key>
+				<string>contrib.performance.loadtest.ical.SnowLeopard</string>
+
+				<!-- Arguments to use to initialize the SnowLeopard instance. -->
+				<key>params</key>
+				<dict>
+					<!-- SnowLeopard can poll the calendar home at some interval. This is 
+						in seconds. -->
+					<key>calendarHomePollInterval</key>
+					<integer>30</integer>
+
+					<!-- If the server advertises xmpp push, SnowLeopard can wait for notifications 
+						about calendar home changes instead of polling for them periodically. If 
+						this option is true, then look for the server advertisement for xmpp push 
+						and use it if possible. Still fall back to polling if there is no xmpp push 
+						advertised. -->
+					<key>supportPush</key>
+					<false />
+				</dict>
+
+				<!-- The profiles define certain types of user behavior on top of the 
+					client software being simulated. -->
+				<key>profiles</key>
+				<array>
+
+					<!-- First an event-creating profile, which will periodically create 
+						new events at a random time on a random calendar. -->
+					<dict>
+						<key>class</key>
+						<string>contrib.performance.loadtest.profiles.Eventer</string>
+
+						<key>params</key>
+						<dict>
+							<key>enabled</key>
+							<true/>
+
+							<!-- Define the interval (in seconds) at which this profile will use 
+								its client to create a new event. -->
+							<key>interval</key>
+							<integer>60</integer>
+
+							<!-- Define how start times (DTSTART) for the randomly generated events 
+								will be selected. This is an example of a "Distribution" parameter. The values 
+								for most "Distribution" parameters are interchangeable and extensible. -->
+							<key>eventStartDistribution</key>
+							<dict>
+
+								<!-- This distribution is pretty specialized. It produces timestamps 
+									in the near future, limited to certain days of the week and certain hours 
+									of the day. -->
+								<key>type</key>
+								<string>contrib.performance.stats.WorkDistribution</string>
+
+								<key>params</key>
+								<dict>
+									<!-- These are the days of the week the distribution will use. -->
+									<key>daysOfWeek</key>
+									<array>
+										<string>mon</string>
+										<string>tue</string>
+										<string>wed</string>
+										<string>thu</string>
+										<string>fri</string>
+									</array>
+
+									<!-- The earliest hour of a day at which an event might be scheduled. -->
+									<key>beginHour</key>
+									<integer>8</integer>
+
+									<!-- And the latest hour of a day (at which an event will be scheduled 
+										to begin!). -->
+									<key>endHour</key>
+									<integer>16</integer>
+
+									<!-- The timezone in which the event is scheduled. (XXX Does this 
+										really work right?) -->
+									<key>tzname</key>
+									<string>America/Los_Angeles</string>
+								</dict>
+							</dict>
+						</dict>
+					</dict>
+
+					<!-- This profile invites new attendees to existing events. -->
+					<dict>
+						<key>class</key>
+						<string>contrib.performance.loadtest.profiles.Inviter</string>
+
+						<key>params</key>
+						<dict>
+							<key>enabled</key>
+							<true/>
+
+							<!-- Define the frequency at which new invitations will be sent out. -->
+							<key>sendInvitationDistribution</key>
+							<dict>
+								<key>type</key>
+								<string>contrib.performance.stats.NormalDistribution</string>
+								<key>params</key>
+								<dict>
+									<!-- mu gives the mean of the normal distribution (in seconds). -->
+									<key>mu</key>
+									<integer>60</integer>
+
+									<!-- and sigma gives its standard deviation. -->
+									<key>sigma</key>
+									<integer>5</integer>
+								</dict>
+							</dict>
+
+							<!-- Define the distribution of who will be invited to an event. Each 
+								set of credentials loaded by the load tester has an index; samples from this 
+								distribution will be added to that index to arrive at the index of some other 
+								credentials, which will be the target of the invitation. -->
+							<key>inviteeDistanceDistribution</key>
+							<dict>
+								<key>type</key>
+								<string>contrib.performance.stats.UniformIntegerDistribution</string>
+								<key>params</key>
+								<dict>
+									<!-- The minimum value (inclusive) of the uniform distribution. -->
+									<key>min</key>
+									<integer>-100</integer>
+									<!-- The maximum value (exclusive) of the uniform distribution. -->
+									<key>max</key>
+									<integer>101</integer>
+								</dict>
+							</dict>
+						</dict>
+					</dict>
+
+					<!-- This profile accepts invitations to events, handles cancels, and
+					     handles replies received. -->
+					<dict>
+						<key>class</key>
+						<string>contrib.performance.loadtest.profiles.Accepter</string>
+
+						<key>params</key>
+						<dict>
+							<key>enabled</key>
+							<true/>
+
+							<!-- Define how long to wait after seeing a new invitation before 
+								accepting it. -->
+							<key>acceptDelayDistribution</key>
+							<dict>
+								<key>type</key>
+								<string>contrib.performance.stats.NormalDistribution</string>
+								<key>params</key>
+								<dict>
+									<!-- mean -->
+									<key>mu</key>
+									<integer>360</integer>
+									<!-- standard deviation -->
+									<key>sigma</key>
+									<integer>60</integer>
+								</dict>
+							</dict>
+						</dict>
+					</dict>
+				</array>
+
+				<!-- Determine the frequency at which this client configuration will 
+					appear in the clients which are created by the load tester. -->
+				<key>weight</key>
+				<integer>1</integer>
+			</dict>
+		</array>
+
+		<!-- Define some log observers to report on the load test. -->
+		<key>observers</key>
+		<array>
+			<!-- ReportStatistics generates an end-of-run summary of the HTTP requests 
+				made, their timings, and their results. -->
+			<string>contrib.performance.loadtest.population.ReportStatistics</string>
+
+			<!-- RequestLogger generates a realtime log of all HTTP requests made 
+				during the load test. -->
+			<string>contrib.performance.loadtest.ical.RequestLogger</string>
+
+			<!-- OperationLogger generates an end-of-run summary of the gross operations 
+				performed (logical operations which may span more than one HTTP request, 
+				such as inviting an attendee to an event). -->
+			<string>contrib.performance.loadtest.profiles.OperationLogger</string>
+		</array>
+	</dict>
+</plist>

Modified: CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/sim.py
===================================================================
--- CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/sim.py	2011-11-02 00:50:59 UTC (rev 8243)
+++ CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/sim.py	2011-11-02 00:51:14 UTC (rev 8244)
@@ -16,6 +16,7 @@
 #
 ##
 
+from os import environ
 from xml.parsers.expat import ExpatError
 from sys import argv, stdout
 from random import Random
@@ -31,11 +32,16 @@
 from twisted.application.service import Service
 from twisted.application.service import MultiService
 
+from twisted.internet.protocol import ProcessProtocol
+
 from contrib.performance.loadtest.ical import SnowLeopard
 from contrib.performance.loadtest.profiles import Eventer, Inviter, Accepter
 from contrib.performance.loadtest.population import (
     Populator, ProfileType, ClientType, PopulationParameters, SmoothRampUp,
     CalendarClientSimulator)
+from twisted.internet.defer import Deferred
+from twisted.internet.defer import gatherResults
+from contrib.performance.loadtest.ampsim import Manager
 
 
 class _DirectoryRecord(object):
@@ -176,7 +182,8 @@
         under load.
     """
     def __init__(self, server, arrival, parameters, observers=None,
-                 records=None, reactor=None, runtime=None):
+                 records=None, reactor=None, runtime=None, workers=None,
+                 configTemplate=None, workerID=None):
         if reactor is None:
             from twisted.internet import reactor
         self.server = server
@@ -186,6 +193,9 @@
         self.records = records
         self.reactor = LagTrackingReactor(reactor)
         self.runtime = runtime
+        self.workers = workers
+        self.configTemplate = configTemplate
+        self.workerID = workerID
 
 
     @classmethod
@@ -203,40 +213,52 @@
 
 
     @classmethod
-    def fromConfig(cls, config, runtime=None, output=stdout):
+    def fromConfig(cls, config, runtime=None, output=stdout, reactor=None):
         """
         Create a L{LoadSimulator} from a parsed instance of a configuration
         property list.
         """
 
-        server = 'http://127.0.0.1:8008/'
-        if 'server' in config:
-            server = config['server']
+        workers = config.get("workers")
+        if workers is None:
+            # Client / place where the simulator actually runs configuration
+            workerID = config.get("workerID")
+            configTemplate = None
+            server = 'http://127.0.0.1:8008/'
+            if 'server' in config:
+                server = config['server']
 
-        if 'arrival' in config:
-            arrival = Arrival(
-                namedAny(config['arrival']['factory']), 
-                config['arrival']['params'])
+            if 'arrival' in config:
+                arrival = Arrival(
+                    namedAny(config['arrival']['factory']), 
+                    config['arrival']['params'])
+            else:
+                arrival = Arrival(
+                    SmoothRampUp, dict(groups=10, groupSize=1, interval=3))
+
+            parameters = PopulationParameters()
+            if 'clients' in config:
+                for clientConfig in config['clients']:
+                    parameters.addClient(
+                        clientConfig["weight"],
+                        ClientType(
+                            namedAny(clientConfig["software"]),
+                            cls._convertParams(clientConfig["params"]),
+                            [ProfileType(
+                                    namedAny(profile["class"]),
+                                    cls._convertParams(profile["params"]))
+                             for profile in clientConfig["profiles"]]))
+            if not parameters.clients:
+                parameters.addClient(1,
+                                     ClientType(SnowLeopard, {},
+                                                [Eventer, Inviter, Accepter]))
         else:
-            arrival = Arrival(
-                SmoothRampUp, dict(groups=10, groupSize=1, interval=3))
+            # Manager / observer process.
+            server = ''
+            arrival = None
+            parameters = None
+            configTemplate = config
 
-        parameters = PopulationParameters()
-        if 'clients' in config:
-            for clientConfig in config['clients']:
-                parameters.addClient(
-                    clientConfig["weight"],
-                    ClientType(
-                        namedAny(clientConfig["software"]),
-                        cls._convertParams(clientConfig["params"]),
-                        [ProfileType(
-                                namedAny(profile["class"]),
-                                cls._convertParams(profile["params"]))
-                         for profile in clientConfig["profiles"]]))
-        if not parameters.clients:
-            parameters.addClient(
-                1, ClientType(SnowLeopard, {}, [Eventer, Inviter, Accepter]))
-
         observers = []
         if 'observers' in config:
             for observerName in config['observers']:
@@ -249,9 +271,10 @@
             records.extend(namedAny(loader)(**params))
             output.write("Loaded {0} accounts.\n".format(len(records)))
 
-        return cls(server, arrival, parameters,
-                   observers=observers, records=records,
-                   runtime=runtime)
+        return cls(server, arrival, parameters, observers=observers,
+                   records=records, runtime=runtime, reactor=reactor,
+                   workers=workers, configTemplate=configTemplate,
+                   workerID=workerID)
 
 
     @classmethod
@@ -295,24 +318,42 @@
         return self.arrival.factory(self.reactor, **self.arrival.parameters)
 
 
-    def run(self, output=stdout):
+    def serviceClasses(self):
+        """
+        Return a list of L{SimService} subclasses for C{attachServices} to
+        instantiate and attach to the reactor.
+        """
+        if self.workers is not None:
+            return [
+                WorkerSpawnerService,
+            ]
+        return [
+            ObserverService,
+            SimulatorService,
+            ReporterService,
+        ]
+
+
+    def attachServices(self, output):
+        """
+        Instantiate each class returned by C{serviceClasses} with this
+        simulator and C{output}, group the instances under one
+        L{MultiService}, and attach that to this simulator's reactor with
+        L{attachService}.
+
+        @param output: the stream passed to every service (for example,
+            L{ReporterService} writes its end-of-run report to it).  There is
+            no default, so callers must always supply one.
+        """
         ms = MultiService()
-        for svcclass in [
-                ObserverService,
-                SimulatorService,
-                ReporterService,
-            ]:
+        for svcclass in self.serviceClasses():
             svcclass(self, output).setServiceParent(ms)
-
         attachService(self.reactor, ms)
 
+
+    def run(self, output=stdout):
+        """
+        Attach this simulator's services and run the reactor.
+
+        If a C{runtime} was configured, C{reactor.stop} is scheduled for that
+        many seconds later.
+
+        @param output: the stream passed on to C{attachServices}.
+        """
+        self.attachServices(output)
+
         if self.runtime is not None:
             self.reactor.callLater(self.runtime, self.reactor.stop)
 
         self.reactor.run()
 
 
+# Module-level entry point; the __main__ guard at the bottom of this module
+# calls it.
+main = LoadSimulator.main
 
+
+
 def attachService(reactor, service):
     """
     Attach a given L{IService} provider to the given L{IReactorCore}; cause it
@@ -383,6 +424,9 @@
     """
 
     def stopService(self):
+        """
+        Emit the report to the specified output file.
+        """
         super(ReporterService, self).stopService()
         failures = []
         for obs in self.loadsim.observers:
@@ -396,9 +440,57 @@
             self.output.write('PASS\n')
 
 
-main = LoadSimulator.main
 
+class ProcessProtocolBridge(ProcessProtocol):
 
+    def __init__(self, spawner, proto):
+        self.spawner = spawner
+        self.proto = proto
+        self.deferred = Deferred()
 
+
+    def connectionMade(self):
+        self.proto.makeConnection(self.transport)
+
+
+    def dataReceived(self, data):
+        self.proto.dataReceived(data)
+
+
+    def connectionLost(self, reactor):
+        self.proto.connectionLost(reactor)
+        self.deferred.callback(None)
+        self.spawner.bridges.remove(self)
+
+
+
+class WorkerSpawnerService(SimService):
+
+    def startService(self):
+        super(WorkerSpawnerService, self).startService()
+        self.bridges = []
+        for workerID, worker in enumerate(self.loadsim.workers):
+            bridge = ProcessProtocolBridge(
+                self, Manager(self.loadsim, workerID, len(self.loadsim.workers))
+            )
+            self.bridges.append(bridge)
+            sh = '/bin/sh'
+            self.reactor.spawnProcess(
+                bridge, sh, [sh, "-c", worker], env=environ
+            )
+
+
+    def stopService(self):
+        TERMINATE_TIMEOUT = 30.0
+        def killThemAll(name):
+            for bridge in self.bridges:
+                bridge.transport.signalProcess(name)
+        killThemAll("TERM")
+        self.reactor.callLater(TERMINATE_TIMEOUT, killThemAll, "KILL")
+        return gatherResults([bridge.deferred for bridge in self.bridges])
+
+
+
+# Running this module as a script invokes LoadSimulator.main (via `main`).
 if __name__ == '__main__':
     main()
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20111101/2261209d/attachment-0001.html>


More information about the calendarserver-changes mailing list