[CalendarServer-changes] [8245] CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest
source_changes at macosforge.org
source_changes at macosforge.org
Tue Nov 1 17:51:29 PDT 2011
Revision: 8245
http://trac.macosforge.org/projects/calendarserver/changeset/8245
Author: glyph at apple.com
Date: 2011-11-01 17:51:29 -0700 (Tue, 01 Nov 2011)
Log Message:
-----------
first functional parallel simulator
Modified Paths:
--------------
CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/ampsim.py
CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/population.py
CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/profiles.py
CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/sim.py
Modified: CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/ampsim.py
===================================================================
--- CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/ampsim.py 2011-11-02 00:51:14 UTC (rev 8244)
+++ CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/ampsim.py 2011-11-02 00:51:29 UTC (rev 8245)
@@ -33,8 +33,9 @@
from twisted.internet.stdio import StandardIO
from contrib.performance.loadtest.ampsim import Worker
+ from contrib.performance.loadtest.sim import LagTrackingReactor
- StandardIO(Worker(reactor))
+ StandardIO(Worker(LagTrackingReactor(reactor)))
reactor.run()
except:
traceback.print_exc()
@@ -83,17 +84,19 @@
def config(self, plist):
from plistlib import readPlistFromString
from contrib.performance.loadtest.sim import LoadSimulator
+ from sys import stderr
cfg = readPlistFromString(plist)
addObserver(self.emit)
sim = LoadSimulator.fromConfig(cfg)
- sim.attachServices()
+ sim.attachServices(stderr)
return {}
def emit(self, eventDict):
- self.reactor.callFromThread(
- self.callRemote, LogMessage, event=eventDict
- )
+ if 'type' in eventDict:
+ self.reactor.callFromThread(
+ self.callRemote, LogMessage, event=eventDict
+ )
@@ -103,11 +106,12 @@
a single worker.
"""
- def __init__(self, loadsim, whichWorker, numWorkers):
+ def __init__(self, loadsim, whichWorker, numWorkers, output):
super(Manager, self).__init__()
self.loadsim = loadsim
self.whichWorker = whichWorker
self.numWorkers = numWorkers
+ self.output = output
def connectionMade(self):
@@ -116,11 +120,19 @@
del workerConfig["workers"]
workerConfig["workerID"] = self.whichWorker
workerConfig["workerCount"] = self.numWorkers
- self.callRemote(Configure, plist=writePlistToString(workerConfig))
+ workerConfig["observers"] = []
+ plist = writePlistToString(workerConfig)
+ self.output.write("Initiating worker configuration\n")
+ def completed(x):
+ self.output.write("Worker configuration complete.\n")
+ (self.callRemote(Configure, plist=plist)
+ .addCallback(completed))
@LogMessage.responder
def observed(self, event):
+ #from pprint import pformat
+ #self.output.write(pformat(event)+"\n")
msg(**event)
return {}
Modified: CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/population.py
===================================================================
--- CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/population.py 2011-11-02 00:51:14 UTC (rev 8244)
+++ CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/population.py 2011-11-02 00:51:29 UTC (rev 8245)
@@ -150,7 +150,8 @@
class CalendarClientSimulator(object):
- def __init__(self, records, populator, parameters, reactor, server):
+ def __init__(self, records, populator, parameters, reactor, server,
+ workerIndex=0, workerCount=1):
self._records = records
self.populator = populator
self.reactor = reactor
@@ -158,6 +159,8 @@
self._pop = self.populator.populate(parameters)
self._user = 0
self._stopped = False
+ self.workerIndex = workerIndex
+ self.workerCount = workerCount
TimezoneCache.create()
@@ -197,9 +200,15 @@
def add(self, numClients):
for _ignore_n in range(numClients):
number = self._nextUserNumber()
+ clientType = self._pop.next()
+ if (number % self.workerCount) != self.workerIndex:
+ # If we're in a distributed work scenario and we are worker N,
+ # we have to skip all but every Nth request (since every node
+ # runs the same arrival policy).
+ continue
+
_ignore_user, auth = self._createUser(number)
- clientType = self._pop.next()
reactor = loggedReactor(self.reactor)
client = clientType.new(
reactor, self.server, self.getUserRecord(number), auth)
@@ -211,6 +220,8 @@
if profile.enabled:
d = profile.run()
d.addErrback(self._profileFailure, profileType, reactor)
+ # XXX this status message is prone to be slightly inaccurate, but isn't
+ # really used by much anyway.
msg(type="status", clientCount=self._user - 1)
Modified: CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/profiles.py
===================================================================
--- CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/profiles.py 2011-11-02 00:51:14 UTC (rev 8244)
+++ CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/profiles.py 2011-11-02 00:51:29 UTC (rev 8245)
@@ -499,6 +499,7 @@
def observe(self, event):
if event.get("type") == "operation":
+ event = event.copy()
lag = event.get('lag')
if lag is None:
event['lag'] = ''
Modified: CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/sim.py
===================================================================
--- CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/sim.py 2011-11-02 00:51:14 UTC (rev 8244)
+++ CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/sim.py 2011-11-02 00:51:29 UTC (rev 8245)
@@ -183,7 +183,7 @@
"""
def __init__(self, server, arrival, parameters, observers=None,
records=None, reactor=None, runtime=None, workers=None,
- configTemplate=None, workerID=None):
+ configTemplate=None, workerID=None, workerCount=1):
if reactor is None:
from twisted.internet import reactor
self.server = server
@@ -196,6 +196,7 @@
self.workers = workers
self.configTemplate = configTemplate
self.workerID = workerID
+ self.workerCount = workerCount
@classmethod
@@ -222,7 +223,8 @@
workers = config.get("workers")
if workers is None:
# Client / place where the simulator actually runs configuration
- workerID = config.get("workerID")
+ workerID = config.get("workerID", 0)
+ workerCount = config.get("workerCount", 1)
configTemplate = None
server = 'http://127.0.0.1:8008/'
if 'server' in config:
@@ -257,7 +259,9 @@
server = ''
arrival = None
parameters = None
+ workerID = 0
configTemplate = config
+ workerCount = 1
observers = []
if 'observers' in config:
@@ -274,7 +278,7 @@
return cls(server, arrival, parameters, observers=observers,
records=records, runtime=runtime, reactor=reactor,
workers=workers, configTemplate=configTemplate,
- workerID=workerID)
+ workerID=workerID, workerCount=workerCount)
@classmethod
@@ -311,7 +315,9 @@
def createSimulator(self):
populator = Populator(Random())
return CalendarClientSimulator(
- self.records, populator, self.parameters, self.reactor, self.server)
+ self.records, populator, self.parameters, self.reactor, self.server,
+ self.workerID, self.workerCount
+ )
def createArrivalPolicy(self):
@@ -325,7 +331,9 @@
"""
if self.workers is not None:
return [
+ ObserverService,
WorkerSpawnerService,
+ ReporterService,
]
return [
ObserverService,
@@ -343,10 +351,8 @@
def run(self, output=stdout):
self.attachServices(output)
-
if self.runtime is not None:
self.reactor.callLater(self.runtime, self.reactor.stop)
-
self.reactor.run()
@@ -450,15 +456,31 @@
def connectionMade(self):
+ self.transport.getPeer = self.getPeer
+ self.transport.getHost = self.getHost
self.proto.makeConnection(self.transport)
- def dataReceived(self, data):
+ def getPeer(self):
+ return "Peer:PID:" + str(self.transport.pid)
+
+
+ def getHost(self):
+ return "Host:PID:" + str(self.transport.pid)
+
+
+ def outReceived(self, data):
self.proto.dataReceived(data)
- def connectionLost(self, reactor):
- self.proto.connectionLost(reactor)
+ def errReceived(self, error):
+ from twisted.python.log import msg
+ msg("stderr received from " + str(self.transport.pid))
+ msg(" " + repr(error))
+
+
+ def processEnded(self, reason):
+ self.proto.connectionLost(reason)
self.deferred.callback(None)
self.spawner.bridges.remove(self)
@@ -471,11 +493,12 @@
self.bridges = []
for workerID, worker in enumerate(self.loadsim.workers):
bridge = ProcessProtocolBridge(
- self, Manager(self.loadsim, workerID, len(self.loadsim.workers))
+ self, Manager(self.loadsim, workerID, len(self.loadsim.workers),
+ self.output)
)
self.bridges.append(bridge)
sh = '/bin/sh'
- self.reactor.spawnProcess(
+ self.loadsim.reactor.spawnProcess(
bridge, sh, [sh, "-c", worker], env=environ
)
@@ -486,7 +509,7 @@
for bridge in self.bridges:
bridge.transport.signalProcess(name)
killThemAll("TERM")
- self.reactor.callLater(TERMINATE_TIMEOUT, killThemAll, "KILL")
+ self.loadsim.reactor.callLater(TERMINATE_TIMEOUT, killThemAll, "KILL")
return gatherResults([bridge.deferred for bridge in self.bridges])
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20111101/27928f5c/attachment-0001.html>
More information about the calendarserver-changes mailing list