[CalendarServer-changes] [8245] CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest

source_changes at macosforge.org source_changes at macosforge.org
Tue Nov 1 17:51:29 PDT 2011


Revision: 8245
          http://trac.macosforge.org/projects/calendarserver/changeset/8245
Author:   glyph at apple.com
Date:     2011-11-01 17:51:29 -0700 (Tue, 01 Nov 2011)
Log Message:
-----------
first functional parallel simulator

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/ampsim.py
    CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/population.py
    CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/profiles.py
    CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/sim.py

Modified: CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/ampsim.py
===================================================================
--- CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/ampsim.py	2011-11-02 00:51:14 UTC (rev 8244)
+++ CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/ampsim.py	2011-11-02 00:51:29 UTC (rev 8245)
@@ -33,8 +33,9 @@
         from twisted.internet.stdio import StandardIO
 
         from contrib.performance.loadtest.ampsim import Worker
+        from contrib.performance.loadtest.sim import LagTrackingReactor
 
-        StandardIO(Worker(reactor))
+        StandardIO(Worker(LagTrackingReactor(reactor)))
         reactor.run()
     except:
         traceback.print_exc()
@@ -83,17 +84,19 @@
     def config(self, plist):
         from plistlib import readPlistFromString
         from contrib.performance.loadtest.sim import LoadSimulator
+        from sys import stderr
         cfg = readPlistFromString(plist)
         addObserver(self.emit)
         sim = LoadSimulator.fromConfig(cfg)
-        sim.attachServices()
+        sim.attachServices(stderr)
         return {}
 
 
     def emit(self, eventDict):
-        self.reactor.callFromThread(
-            self.callRemote, LogMessage, event=eventDict
-        )
+        if 'type' in eventDict:
+            self.reactor.callFromThread(
+                self.callRemote, LogMessage, event=eventDict
+            )
 
 
 
@@ -103,11 +106,12 @@
     a single worker.
     """
 
-    def __init__(self, loadsim, whichWorker, numWorkers):
+    def __init__(self, loadsim, whichWorker, numWorkers, output):
         super(Manager, self).__init__()
         self.loadsim = loadsim
         self.whichWorker = whichWorker
         self.numWorkers = numWorkers
+        self.output = output
 
 
     def connectionMade(self):
@@ -116,11 +120,19 @@
         del workerConfig["workers"]
         workerConfig["workerID"] = self.whichWorker
         workerConfig["workerCount"] = self.numWorkers
-        self.callRemote(Configure, plist=writePlistToString(workerConfig))
+        workerConfig["observers"] = []
+        plist = writePlistToString(workerConfig)
+        self.output.write("Initiating worker configuration\n")
+        def completed(x):
+            self.output.write("Worker configuration complete.\n")
+        (self.callRemote(Configure, plist=plist)
+         .addCallback(completed))
 
 
     @LogMessage.responder
     def observed(self, event):
+        #from pprint import pformat
+        #self.output.write(pformat(event)+"\n")
         msg(**event)
         return {}
 

Modified: CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/population.py
===================================================================
--- CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/population.py	2011-11-02 00:51:14 UTC (rev 8244)
+++ CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/population.py	2011-11-02 00:51:29 UTC (rev 8245)
@@ -150,7 +150,8 @@
 
 
 class CalendarClientSimulator(object):
-    def __init__(self, records, populator, parameters, reactor, server):
+    def __init__(self, records, populator, parameters, reactor, server,
+                 workerIndex=0, workerCount=1):
         self._records = records
         self.populator = populator
         self.reactor = reactor
@@ -158,6 +159,8 @@
         self._pop = self.populator.populate(parameters)
         self._user = 0
         self._stopped = False
+        self.workerIndex = workerIndex
+        self.workerCount = workerCount
 
         TimezoneCache.create()
 
@@ -197,9 +200,15 @@
     def add(self, numClients):
         for _ignore_n in range(numClients):
             number = self._nextUserNumber()
+            clientType = self._pop.next()
+            if (number % self.workerCount) != self.workerIndex:
+                # If we're in a distributed work scenario and we are worker N,
+                # we have to skip all but every Nth request (since every node
+                # runs the same arrival policy).
+                continue
+
             _ignore_user, auth = self._createUser(number)
 
-            clientType = self._pop.next()
             reactor = loggedReactor(self.reactor)
             client = clientType.new(
                 reactor, self.server, self.getUserRecord(number), auth)
@@ -211,6 +220,8 @@
                 if profile.enabled:
                     d = profile.run()
                     d.addErrback(self._profileFailure, profileType, reactor)
+        # XXX this status message is prone to be slightly inaccurate, but isn't
+        # really used by much anyway.
         msg(type="status", clientCount=self._user - 1)
 
 

Modified: CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/profiles.py
===================================================================
--- CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/profiles.py	2011-11-02 00:51:14 UTC (rev 8244)
+++ CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/profiles.py	2011-11-02 00:51:29 UTC (rev 8245)
@@ -499,6 +499,7 @@
 
     def observe(self, event):
         if event.get("type") == "operation":
+            event = event.copy()
             lag = event.get('lag')
             if lag is None:
                 event['lag'] = ''

Modified: CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/sim.py
===================================================================
--- CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/sim.py	2011-11-02 00:51:14 UTC (rev 8244)
+++ CalendarServer/branches/users/glyph/parallel-sim/contrib/performance/loadtest/sim.py	2011-11-02 00:51:29 UTC (rev 8245)
@@ -183,7 +183,7 @@
     """
     def __init__(self, server, arrival, parameters, observers=None,
                  records=None, reactor=None, runtime=None, workers=None,
-                 configTemplate=None, workerID=None):
+                 configTemplate=None, workerID=None, workerCount=1):
         if reactor is None:
             from twisted.internet import reactor
         self.server = server
@@ -196,6 +196,7 @@
         self.workers = workers
         self.configTemplate = configTemplate
         self.workerID = workerID
+        self.workerCount = workerCount
 
 
     @classmethod
@@ -222,7 +223,8 @@
         workers = config.get("workers")
         if workers is None:
             # Client / place where the simulator actually runs configuration
-            workerID = config.get("workerID")
+            workerID = config.get("workerID", 0)
+            workerCount = config.get("workerCount", 1)
             configTemplate = None
             server = 'http://127.0.0.1:8008/'
             if 'server' in config:
@@ -257,7 +259,9 @@
             server = ''
             arrival = None
             parameters = None
+            workerID = 0
             configTemplate = config
+            workerCount = 1
 
         observers = []
         if 'observers' in config:
@@ -274,7 +278,7 @@
         return cls(server, arrival, parameters, observers=observers,
                    records=records, runtime=runtime, reactor=reactor,
                    workers=workers, configTemplate=configTemplate,
-                   workerID=workerID)
+                   workerID=workerID, workerCount=workerCount)
 
 
     @classmethod
@@ -311,7 +315,9 @@
     def createSimulator(self):
         populator = Populator(Random())
         return CalendarClientSimulator(
-            self.records, populator, self.parameters, self.reactor, self.server)
+            self.records, populator, self.parameters, self.reactor, self.server,
+            self.workerID, self.workerCount
+        )
 
 
     def createArrivalPolicy(self):
@@ -325,7 +331,9 @@
         """
         if self.workers is not None:
             return [
+                ObserverService,
                 WorkerSpawnerService,
+                ReporterService,
             ]
         return [
             ObserverService,
@@ -343,10 +351,8 @@
 
     def run(self, output=stdout):
         self.attachServices(output)
-
         if self.runtime is not None:
             self.reactor.callLater(self.runtime, self.reactor.stop)
-
         self.reactor.run()
 
 
@@ -450,15 +456,31 @@
 
 
     def connectionMade(self):
+        self.transport.getPeer = self.getPeer
+        self.transport.getHost = self.getHost
         self.proto.makeConnection(self.transport)
 
 
-    def dataReceived(self, data):
+    def getPeer(self):
+        return "Peer:PID:" + str(self.transport.pid)
+
+
+    def getHost(self):
+        return "Host:PID:" + str(self.transport.pid)
+
+
+    def outReceived(self, data):
         self.proto.dataReceived(data)
 
 
-    def connectionLost(self, reactor):
-        self.proto.connectionLost(reactor)
+    def errReceived(self, error):
+        from twisted.python.log import msg
+        msg("stderr received from " + str(self.transport.pid))
+        msg("    " + repr(error))
+
+
+    def processEnded(self, reason):
+        self.proto.connectionLost(reason)
         self.deferred.callback(None)
         self.spawner.bridges.remove(self)
 
@@ -471,11 +493,12 @@
         self.bridges = []
         for workerID, worker in enumerate(self.loadsim.workers):
             bridge = ProcessProtocolBridge(
-                self, Manager(self.loadsim, workerID, len(self.loadsim.workers))
+                self, Manager(self.loadsim, workerID, len(self.loadsim.workers),
+                              self.output)
             )
             self.bridges.append(bridge)
             sh = '/bin/sh'
-            self.reactor.spawnProcess(
+            self.loadsim.reactor.spawnProcess(
                 bridge, sh, [sh, "-c", worker], env=environ
             )
 
@@ -486,7 +509,7 @@
             for bridge in self.bridges:
                 bridge.transport.signalProcess(name)
         killThemAll("TERM")
-        self.reactor.callLater(TERMINATE_TIMEOUT, killThemAll, "KILL")
+        self.loadsim.reactor.callLater(TERMINATE_TIMEOUT, killThemAll, "KILL")
         return gatherResults([bridge.deferred for bridge in self.bridges])
 
 
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20111101/27928f5c/attachment-0001.html>


More information about the calendarserver-changes mailing list