[CalendarServer-changes] [8244] CalendarServer/branches/users/glyph/parallel-sim/contrib/performance /loadtest
source_changes at macosforge.org
source_changes at macosforge.org
Tue Nov 1 17:51:14 PDT 2011
Revision: 8244
http://trac.macosforge.org/projects/calendarserver/changeset/8244
Author: glyph at apple.com
Date: 2011-11-01 17:51:14 -0700 (Tue, 01 Nov 2011)
Log Message:
-----------
first cut at distributed sim
Modified Paths:
--------------
CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/sim.py
Added Paths:
-----------
CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/ampsim.py
CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/config.dist.plist
Added: CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/ampsim.py
===================================================================
--- CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/ampsim.py (rev 0)
+++ CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/ampsim.py 2011-11-02 00:51:14 UTC (rev 8244)
@@ -0,0 +1,127 @@
+##
+# Copyright (c) 2011 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+##
+
+"""
+AMP-based simulator.
+"""
+
+if __name__ == '__main__':
+ # When run as a script, this is the worker process, receiving commands over
+ # stdin.
+ import traceback
+ try:
+ from twisted.python.log import startLogging
+ from sys import stderr, exit
+
+ startLogging(stderr)
+
+ from twisted.internet import reactor
+ from twisted.internet.stdio import StandardIO
+
+ from contrib.performance.loadtest.ampsim import Worker
+
+ StandardIO(Worker(reactor))
+ reactor.run()
+ except:
+ traceback.print_exc()
+ exit(1)
+ else:
+ exit(0)
+
+
+from copy import deepcopy
+
+from plistlib import writePlistToString
+from twisted.protocols.amp import AMP, Command, String
+from twext.enterprise.adbapi2 import Pickle
+
+from twisted.python.log import msg, addObserver
+
+
+class Configure(Command):
+ """
+ Configure this worker process with the text of an XML property list.
+ """
+ arguments = [("plist", String())]
+
+
+
+class LogMessage(Command):
+ """
+ A log message was received.
+ """
+ arguments = [("event", Pickle())]
+
+
+
+class Worker(AMP):
+ """
+ Protocol to be run in the worker process, to handle messages from its
+ manager.
+ """
+
+ def __init__(self, reactor):
+ super(Worker, self).__init__()
+ self.reactor = reactor
+
+
+ @Configure.responder
+ def config(self, plist):
+ from plistlib import readPlistFromString
+ from contrib.performance.loadtest.sim import LoadSimulator
+ cfg = readPlistFromString(plist)
+ addObserver(self.emit)
+ sim = LoadSimulator.fromConfig(cfg)
+ sim.attachServices()
+ return {}
+
+
+ def emit(self, eventDict):
+ self.reactor.callFromThread(
+ self.callRemote, LogMessage, event=eventDict
+ )
+
+
+
+class Manager(AMP):
+ """
+ Protocol to be run in the coordinating process, to respond to messages from
+ a single worker.
+ """
+
+ def __init__(self, loadsim, whichWorker, numWorkers):
+ super(Manager, self).__init__()
+ self.loadsim = loadsim
+ self.whichWorker = whichWorker
+ self.numWorkers = numWorkers
+
+
+ def connectionMade(self):
+ super(Manager, self).connectionMade()
+ workerConfig = deepcopy(self.loadsim.configTemplate)
+ del workerConfig["workers"]
+ workerConfig["workerID"] = self.whichWorker
+ workerConfig["workerCount"] = self.numWorkers
+ self.callRemote(Configure, plist=writePlistToString(workerConfig))
+
+
+ @LogMessage.responder
+ def observed(self, event):
+ msg(**event)
+ return {}
+
+
Added: CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/config.dist.plist
===================================================================
--- CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/config.dist.plist (rev 0)
+++ CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/config.dist.plist 2011-11-02 00:51:14 UTC (rev 8244)
@@ -0,0 +1,290 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Copyright (c) 2011 Apple Inc. All rights reserved.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
+<plist version="1.0">
+ <dict>
+ <!-- This is a distributed orchestrator configuration; 'workers' is a list of
+ shell commands to run sub-processes.
+ -->
+ <key>workers</key>
+ <array>
+ <string>./python contrib/performance/loadtest/ampsim.py</string>
+ <string>./python contrib/performance/loadtest/ampsim.py</string>
+ <string>./python contrib/performance/loadtest/ampsim.py</string>
+ <string>./python contrib/performance/loadtest/ampsim.py</string>
+ <string>./python contrib/performance/loadtest/ampsim.py</string>
+ <string>./python contrib/performance/loadtest/ampsim.py</string>
+ </array>
+ <!-- Identify the server to be load tested. -->
+ <key>server</key>
+ <string>https://127.0.0.1:8443/</string>
+
+ <!-- Define the credentials of the clients which will be used to load test
+ the server. These credentials must already be valid on the server. -->
+ <key>accounts</key>
+ <dict>
+ <!-- The loader is the fully-qualified Python name of a callable which
+ returns a list of directory service records defining all of the client accounts
+ to use. contrib.performance.loadtest.sim.recordsFromCSVFile reads username,
+ password, mailto triples from a CSV file and returns them as a list of faked
+ directory service records. -->
+ <key>loader</key>
+ <string>contrib.performance.loadtest.sim.recordsFromCSVFile</string>
+
+ <!-- Keyword arguments may be passed to the loader. -->
+ <key>params</key>
+ <dict>
+ <!-- recordsFromCSVFile interprets the path relative to the config.plist,
+ to make it independent of the script's working directory while still allowing
+ a relative path. This isn't a great solution. -->
+ <key>path</key>
+ <string>contrib/performance/loadtest/accounts.csv</string>
+ </dict>
+ </dict>
+
+ <!-- Define how many clients will participate in the load test and how
+ they will show up. -->
+ <key>arrival</key>
+ <dict>
+
+ <!-- Specify a class which creates new clients and introduces them into
+ the test. contrib.performance.loadtest.population.SmoothRampUp introduces
+ groups of new clients at fixed intervals up to a maximum. The size of the
+ group, interval, and maximum are configured by the parameters below. The
+ total number of clients is groups * groupSize, which needs to be no larger
+ than the number of credentials created in the accounts section. -->
+ <key>factory</key>
+ <string>contrib.performance.loadtest.population.SmoothRampUp</string>
+
+ <key>params</key>
+ <dict>
+ <!-- groups gives the total number of groups of clients to introduce. -->
+ <key>groups</key>
+ <integer>99</integer>
+
+ <!-- groupSize is the number of clients in each group of clients. It's
+ really only a "smooth" ramp up if this is pretty small. -->
+ <key>groupSize</key>
+ <integer>1</integer>
+
+ <!-- Number of seconds between the introduction of each group. -->
+ <key>interval</key>
+ <integer>3</integer>
+ </dict>
+
+ </dict>
+
+ <!-- Define the kinds of software and user behavior the load simulation
+ will simulate. -->
+ <key>clients</key>
+
+ <!-- Have as many different kinds of software and user behavior configurations
+ as you want. Each is a dict -->
+ <array>
+
+ <dict>
+
+ <!-- Here is a Snow Leopard iCal simulator. -->
+ <key>software</key>
+ <string>contrib.performance.loadtest.ical.SnowLeopard</string>
+
+ <!-- Arguments to use to initialize the SnowLeopard instance. -->
+ <key>params</key>
+ <dict>
+ <!-- SnowLeopard can poll the calendar home at some interval. This is
+ in seconds. -->
+ <key>calendarHomePollInterval</key>
+ <integer>30</integer>
+
+ <!-- If the server advertises xmpp push, SnowLeopard can wait for notifications
+ about calendar home changes instead of polling for them periodically. If
+ this option is true, then look for the server advertisement for xmpp push
+ and use it if possible. Still fall back to polling if there is no xmpp push
+ advertised. -->
+ <key>supportPush</key>
+ <false />
+ </dict>
+
+ <!-- The profiles define certain types of user behavior on top of the
+ client software being simulated. -->
+ <key>profiles</key>
+ <array>
+
+ <!-- First an event-creating profile, which will periodically create
+ new events at a random time on a random calendar. -->
+ <dict>
+ <key>class</key>
+ <string>contrib.performance.loadtest.profiles.Eventer</string>
+
+ <key>params</key>
+ <dict>
+ <key>enabled</key>
+ <true/>
+
+ <!-- Define the interval (in seconds) at which this profile will use
+ its client to create a new event. -->
+ <key>interval</key>
+ <integer>60</integer>
+
+ <!-- Define how start times (DTSTART) for the randomly generated events
+ will be selected. This is an example of a "Distribution" parameter. The value
+ for most "Distribution" parameters are interchangeable and extensible. -->
+ <key>eventStartDistribution</key>
+ <dict>
+
+ <!-- This distribution is pretty specialized. It produces timestamps
+ in the near future, limited to certain days of the week and certain hours
+ of the day. -->
+ <key>type</key>
+ <string>contrib.performance.stats.WorkDistribution</string>
+
+ <key>params</key>
+ <dict>
+ <!-- These are the days of the week the distribution will use. -->
+ <key>daysOfWeek</key>
+ <array>
+ <string>mon</string>
+ <string>tue</string>
+ <string>wed</string>
+ <string>thu</string>
+ <string>fri</string>
+ </array>
+
+ <!-- The earliest hour of a day at which an event might be scheduled. -->
+ <key>beginHour</key>
+ <integer>8</integer>
+
+ <!-- And the latest hour of a day (at which an event will be scheduled
+ to begin!). -->
+ <key>endHour</key>
+ <integer>16</integer>
+
+ <!-- The timezone in which the event is scheduled. (XXX Does this
+ really work right?) -->
+ <key>tzname</key>
+ <string>America/Los_Angeles</string>
+ </dict>
+ </dict>
+ </dict>
+ </dict>
+
+ <!-- This profile invites new attendees to existing events. -->
+ <dict>
+ <key>class</key>
+ <string>contrib.performance.loadtest.profiles.Inviter</string>
+
+ <key>params</key>
+ <dict>
+ <key>enabled</key>
+ <true/>
+
+ <!-- Define the frequency at which new invitations will be sent out. -->
+ <key>sendInvitationDistribution</key>
+ <dict>
+ <key>type</key>
+ <string>contrib.performance.stats.NormalDistribution</string>
+ <key>params</key>
+ <dict>
+ <!-- mu gives the mean of the normal distribution (in seconds). -->
+ <key>mu</key>
+ <integer>60</integer>
+
+ <!-- and sigma gives its standard deviation. -->
+ <key>sigma</key>
+ <integer>5</integer>
+ </dict>
+ </dict>
+
+ <!-- Define the distribution of who will be invited to an event. Each
+ set of credentials loaded by the load tester has an index; samples from this
+ distribution will be added to that index to arrive at the index of some other
+ credentials, which will be the target of the invitation. -->
+ <key>inviteeDistanceDistribution</key>
+ <dict>
+ <key>type</key>
+ <string>contrib.performance.stats.UniformIntegerDistribution</string>
+ <key>params</key>
+ <dict>
+ <!-- The minimum value (inclusive) of the uniform distribution. -->
+ <key>min</key>
+ <integer>-100</integer>
+ <!-- The maximum value (exclusive) of the uniform distribution. -->
+ <key>max</key>
+ <integer>101</integer>
+ </dict>
+ </dict>
+ </dict>
+ </dict>
+
+ <!-- This profile accepts invitations to events, handles cancels, and
+ handles replies received. -->
+ <dict>
+ <key>class</key>
+ <string>contrib.performance.loadtest.profiles.Accepter</string>
+
+ <key>params</key>
+ <dict>
+ <key>enabled</key>
+ <true/>
+
+ <!-- Define how long to wait after seeing a new invitation before
+ accepting it. -->
+ <key>acceptDelayDistribution</key>
+ <dict>
+ <key>type</key>
+ <string>contrib.performance.stats.NormalDistribution</string>
+ <key>params</key>
+ <dict>
+ <!-- mean -->
+ <key>mu</key>
+ <integer>360</integer>
+ <!-- standard deviation -->
+ <key>sigma</key>
+ <integer>60</integer>
+ </dict>
+ </dict>
+ </dict>
+ </dict>
+ </array>
+
+ <!-- Determine the frequency at which this client configuration will
+ appear in the clients which are created by the load tester. -->
+ <key>weight</key>
+ <integer>1</integer>
+ </dict>
+ </array>
+
+ <!-- Define some log observers to report on the load test. -->
+ <key>observers</key>
+ <array>
+ <!-- ReportStatistics generates an end-of-run summary of the HTTP requests
+ made, their timings, and their results. -->
+ <string>contrib.performance.loadtest.population.ReportStatistics</string>
+
+ <!-- RequestLogger generates a realtime log of all HTTP requests made
+ during the load test. -->
+ <string>contrib.performance.loadtest.ical.RequestLogger</string>
+
+ <!-- OperationLogger generates an end-of-run summary of the gross operations
+ performed (logical operations which may span more than one HTTP request,
+ such as inviting an attendee to an event). -->
+ <string>contrib.performance.loadtest.profiles.OperationLogger</string>
+ </array>
+ </dict>
+</plist>
Modified: CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/sim.py
===================================================================
--- CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/sim.py 2011-11-02 00:50:59 UTC (rev 8243)
+++ CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/sim.py 2011-11-02 00:51:14 UTC (rev 8244)
@@ -16,6 +16,7 @@
#
##
+from os import environ
from xml.parsers.expat import ExpatError
from sys import argv, stdout
from random import Random
@@ -31,11 +32,16 @@
from twisted.application.service import Service
from twisted.application.service import MultiService
+from twisted.internet.protocol import ProcessProtocol
+
from contrib.performance.loadtest.ical import SnowLeopard
from contrib.performance.loadtest.profiles import Eventer, Inviter, Accepter
from contrib.performance.loadtest.population import (
Populator, ProfileType, ClientType, PopulationParameters, SmoothRampUp,
CalendarClientSimulator)
+from twisted.internet.defer import Deferred
+from twisted.internet.defer import gatherResults
+from contrib.performance.loadtest.ampsim import Manager
class _DirectoryRecord(object):
@@ -176,7 +182,8 @@
under load.
"""
def __init__(self, server, arrival, parameters, observers=None,
- records=None, reactor=None, runtime=None):
+ records=None, reactor=None, runtime=None, workers=None,
+ configTemplate=None, workerID=None):
if reactor is None:
from twisted.internet import reactor
self.server = server
@@ -186,6 +193,9 @@
self.records = records
self.reactor = LagTrackingReactor(reactor)
self.runtime = runtime
+ self.workers = workers
+ self.configTemplate = configTemplate
+ self.workerID = workerID
@classmethod
@@ -203,40 +213,52 @@
@classmethod
- def fromConfig(cls, config, runtime=None, output=stdout):
+ def fromConfig(cls, config, runtime=None, output=stdout, reactor=None):
"""
Create a L{LoadSimulator} from a parsed instance of a configuration
property list.
"""
- server = 'http://127.0.0.1:8008/'
- if 'server' in config:
- server = config['server']
+ workers = config.get("workers")
+ if workers is None:
+ # Client / place where the simulator actually runs configuration
+ workerID = config.get("workerID")
+ configTemplate = None
+ server = 'http://127.0.0.1:8008/'
+ if 'server' in config:
+ server = config['server']
- if 'arrival' in config:
- arrival = Arrival(
- namedAny(config['arrival']['factory']),
- config['arrival']['params'])
+ if 'arrival' in config:
+ arrival = Arrival(
+ namedAny(config['arrival']['factory']),
+ config['arrival']['params'])
+ else:
+ arrival = Arrival(
+ SmoothRampUp, dict(groups=10, groupSize=1, interval=3))
+
+ parameters = PopulationParameters()
+ if 'clients' in config:
+ for clientConfig in config['clients']:
+ parameters.addClient(
+ clientConfig["weight"],
+ ClientType(
+ namedAny(clientConfig["software"]),
+ cls._convertParams(clientConfig["params"]),
+ [ProfileType(
+ namedAny(profile["class"]),
+ cls._convertParams(profile["params"]))
+ for profile in clientConfig["profiles"]]))
+ if not parameters.clients:
+ parameters.addClient(1,
+ ClientType(SnowLeopard, {},
+ [Eventer, Inviter, Accepter]))
else:
- arrival = Arrival(
- SmoothRampUp, dict(groups=10, groupSize=1, interval=3))
+ # Manager / observer process.
+ server = ''
+ arrival = None
+ parameters = None
+ configTemplate = config
- parameters = PopulationParameters()
- if 'clients' in config:
- for clientConfig in config['clients']:
- parameters.addClient(
- clientConfig["weight"],
- ClientType(
- namedAny(clientConfig["software"]),
- cls._convertParams(clientConfig["params"]),
- [ProfileType(
- namedAny(profile["class"]),
- cls._convertParams(profile["params"]))
- for profile in clientConfig["profiles"]]))
- if not parameters.clients:
- parameters.addClient(
- 1, ClientType(SnowLeopard, {}, [Eventer, Inviter, Accepter]))
-
observers = []
if 'observers' in config:
for observerName in config['observers']:
@@ -249,9 +271,10 @@
records.extend(namedAny(loader)(**params))
output.write("Loaded {0} accounts.\n".format(len(records)))
- return cls(server, arrival, parameters,
- observers=observers, records=records,
- runtime=runtime)
+ return cls(server, arrival, parameters, observers=observers,
+ records=records, runtime=runtime, reactor=reactor,
+ workers=workers, configTemplate=configTemplate,
+ workerID=workerID)
@classmethod
@@ -295,24 +318,42 @@
return self.arrival.factory(self.reactor, **self.arrival.parameters)
- def run(self, output=stdout):
+ def serviceClasses(self):
+ """
+ Return a list of L{SimService} subclasses for C{attachServices} to
+ instantiate and attach to the reactor.
+ """
+ if self.workers is not None:
+ return [
+ WorkerSpawnerService,
+ ]
+ return [
+ ObserverService,
+ SimulatorService,
+ ReporterService,
+ ]
+
+
+ def attachServices(self, output):
ms = MultiService()
- for svcclass in [
- ObserverService,
- SimulatorService,
- ReporterService,
- ]:
+ for svcclass in self.serviceClasses():
svcclass(self, output).setServiceParent(ms)
-
attachService(self.reactor, ms)
+
+ def run(self, output=stdout):
+ self.attachServices(output)
+
if self.runtime is not None:
self.reactor.callLater(self.runtime, self.reactor.stop)
self.reactor.run()
+main = LoadSimulator.main
+
+
def attachService(reactor, service):
"""
Attach a given L{IService} provider to the given L{IReactorCore}; cause it
@@ -383,6 +424,9 @@
"""
def stopService(self):
+ """
+ Emit the report to the specified output file.
+ """
super(ReporterService, self).stopService()
failures = []
for obs in self.loadsim.observers:
@@ -396,9 +440,57 @@
self.output.write('PASS\n')
-main = LoadSimulator.main
+class ProcessProtocolBridge(ProcessProtocol):
+ def __init__(self, spawner, proto):
+ self.spawner = spawner
+ self.proto = proto
+ self.deferred = Deferred()
+
+ def connectionMade(self):
+ self.proto.makeConnection(self.transport)
+
+
+ def dataReceived(self, data):
+ self.proto.dataReceived(data)
+
+
+ def connectionLost(self, reactor):
+ self.proto.connectionLost(reactor)
+ self.deferred.callback(None)
+ self.spawner.bridges.remove(self)
+
+
+
+class WorkerSpawnerService(SimService):
+
+ def startService(self):
+ super(WorkerSpawnerService, self).startService()
+ self.bridges = []
+ for workerID, worker in enumerate(self.loadsim.workers):
+ bridge = ProcessProtocolBridge(
+ self, Manager(self.loadsim, workerID, len(self.loadsim.workers))
+ )
+ self.bridges.append(bridge)
+ sh = '/bin/sh'
+ self.reactor.spawnProcess(
+ bridge, sh, [sh, "-c", worker], env=environ
+ )
+
+
+ def stopService(self):
+ TERMINATE_TIMEOUT = 30.0
+ def killThemAll(name):
+ for bridge in self.bridges:
+ bridge.transport.signalProcess(name)
+ killThemAll("TERM")
+ self.reactor.callLater(TERMINATE_TIMEOUT, killThemAll, "KILL")
+ return gatherResults([bridge.deferred for bridge in self.bridges])
+
+
+
if __name__ == '__main__':
main()
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20111101/2261209d/attachment-0001.html>
More information about the calendarserver-changes
mailing list