[CalendarServer-changes] [15091] CalendarServer/trunk

source_changes at macosforge.org source_changes at macosforge.org
Wed Sep 2 13:34:57 PDT 2015


Revision: 15091
          http://trac.calendarserver.org//changeset/15091
Author:   cdaboo at apple.com
Date:     2015-09-02 13:34:57 -0700 (Wed, 02 Sep 2015)
Log Message:
-----------
Add a tool to look at the processing rate of TestWork items. Make it easy to use the loadtest tool with other server accounts.

Modified Paths:
--------------
    CalendarServer/trunk/calendarserver/dashboard_service.py
    CalendarServer/trunk/contrib/performance/jobqueue/loadtest.py

Added Paths:
-----------
    CalendarServer/trunk/contrib/performance/jobqueue/workrate.py

Modified: CalendarServer/trunk/calendarserver/dashboard_service.py
===================================================================
--- CalendarServer/trunk/calendarserver/dashboard_service.py	2015-09-02 19:54:05 UTC (rev 15090)
+++ CalendarServer/trunk/calendarserver/dashboard_service.py	2015-09-02 20:34:57 UTC (rev 15091)
@@ -24,9 +24,12 @@
 
 from twistedcaldav.config import config
 
+from txdav.common.datastore.work.load_work import TestWork
 from txdav.dps.client import DirectoryService as DirectoryProxyClientService
 from txdav.who.cache import CachingDirectoryService
 
+from twext.enterprise.jobs.queue import WorkerConnectionPool
+
 import json
 
 """
@@ -161,6 +164,28 @@
         return succeed({"workers": loads, "level": level})
 
 
+    @inlineCallbacks
+    def data_test_work(self):
+        """
+        Return the number of TEST_WORK items in the job queue.
+
+        @return: a string containing the JSON result.
+        @rtype: L{str}
+        """
+
+        results = {}
+        if self.factory.store:
+            txn = self.factory.store.newTransaction()
+            results["queued"] = yield TestWork.count(txn)
+            results["completed"] = WorkerConnectionPool.completed.get(TestWork.workType(), 0)
+            yield txn.commit()
+        else:
+            results["queued"] = 0
+            results["completed"] = 0
+
+        returnValue(results)
+
+
     def data_directory(self):
         """
         Return a summary of directory service calls.

Modified: CalendarServer/trunk/contrib/performance/jobqueue/loadtest.py
===================================================================
--- CalendarServer/trunk/contrib/performance/jobqueue/loadtest.py	2015-09-02 19:54:05 UTC (rev 15090)
+++ CalendarServer/trunk/contrib/performance/jobqueue/loadtest.py	2015-09-02 20:34:57 UTC (rev 15091)
@@ -23,6 +23,7 @@
 import random
 import sys
 import time
+from urlparse import urlparse
 
 PRIORITY = {
     "low": 0,
@@ -35,14 +36,23 @@
     # Random time delay
     time.sleep(random.randint(0, config["interval"]) / 1000.0)
 
+    url = urlparse(config["server"])
+    use_ssl = url[0] == "https"
+    if "@" in url[1]:
+        auth, net_loc = url[1].split("@")
+    else:
+        auth = "admin:admin"
+        net_loc = url[1]
+    host, port = net_loc.split(":")
+    port = int(port)
+    user, pswd = auth.split(":")
+
     headers = {}
     headers["User-Agent"] = "httploop/1"
     headers["Depth"] = "1"
-    headers["Authorization"] = "Basic " + "admin:admin".encode("base64")[:-1]
+    headers["Authorization"] = "Basic " + "{}:{}".format(user, pswd).encode("base64")[:-1]
     headers["Content-Type"] = "application/json"
 
-    host, port = config["server"].split(":")
-    port = int(port)
     interval = config["interval"] / 1000.0
     total = config["limit"] / config["numProcesses"]
 
@@ -59,7 +69,7 @@
 
     base_time = time.time()
     while not complete.value:
-        http = SmartHTTPConnection(host, port, True, False)
+        http = SmartHTTPConnection(host, port, use_ssl, False)
 
         try:
             count += 1
@@ -101,7 +111,7 @@
     -i MSEC        Millisecond delay between each request [1000]
     -r RATE        Requests/second rate [10]
     -j JOBS        Number of jobs per HTTP request [1]
-    -s HOST:PORT   Host/port to connect to [localhost:8443]
+    -s URL         URL to connect to [https://localhost:8443]
     -b SEC         Number of seconds for notBefore [0]
     -d MSEC        Number of milliseconds for the work [10]
     -l NUM         Total number of requests from all processes
@@ -139,7 +149,7 @@
         "numProcesses": 10,
         "interval": 1000,
         "jobs": 1,
-        "server": "localhost:8443",
+        "server": "https://localhost:8443",
         "when": 0,
         "delay": 10,
         "priority": "high",

Added: CalendarServer/trunk/contrib/performance/jobqueue/workrate.py
===================================================================
--- CalendarServer/trunk/contrib/performance/jobqueue/workrate.py	                        (rev 0)
+++ CalendarServer/trunk/contrib/performance/jobqueue/workrate.py	2015-09-02 20:34:57 UTC (rev 15091)
@@ -0,0 +1,224 @@
+#!/usr/bin/env python
+##
+# Copyright (c) 2012-2015 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+from __future__ import print_function
+
+from getopt import getopt, GetoptError
+import errno
+import json
+import os
+import sched
+import socket
+import sys
+import time
+
+
+def usage(e=None):
+    name = os.path.basename(sys.argv[0])
+    print("usage: %s [options]" % (name,))
+    print("")
+    print("options:")
+    print("  -h --help: print this help and exit")
+    print("  -s: server host (and optional port) [localhost:8100]")
+    print("      or unix socket path prefixed by 'unix:'")
+    print("")
+    print("This tool monitors the server's job assignment rate.")
+
+    if e:
+        sys.exit(64)
+    else:
+        sys.exit(0)
+
+
+
+def main():
+    try:
+        (optargs, _ignore_args) = getopt(
+            sys.argv[1:], "hs:", [
+                "help",
+            ],
+        )
+    except GetoptError, e:
+        usage(e)
+
+    #
+    # Get configuration
+    #
+    server = ("localhost", 8100)
+
+    for opt, arg in optargs:
+        if opt in ("-h", "--help"):
+            usage()
+
+        elif opt in ("-s"):
+            if not arg.startswith("unix:"):
+                server = arg.split(":")
+                if len(server) == 1:
+                    server.append(8100)
+                else:
+                    server[1] = int(server[1])
+                server = tuple(server)
+            else:
+                server = arg
+
+        else:
+            raise NotImplementedError(opt)
+
+    d = Monitor(server)
+    d.run()
+
+
+
+class Monitor(object):
+    """
+    Main monitor controller. Use Python's L{sched} feature to schedule
+    updates.
+    """
+
+    screen = None
+    registered_windows = {}
+    registered_order = []
+
+    def __init__(self, server):
+        self.paused = False
+        self.seconds = 1.0
+        self.sched = sched.scheduler(time.time, time.sleep)
+        self.client = MonitorClient(server)
+        self.client.addItem("test_work")
+        self.last_queued = None
+        self.last_completed = None
+        self.last_time = None
+
+
+    def run(self):
+        """
+        Create the initial window and run the L{scheduler}.
+        """
+        self.sched.enter(self.seconds, 0, self.updateResults, ())
+        self.sched.run()
+
+
+    def updateResults(self):
+        """
+        Periodic update of the current window and check for a key press.
+        """
+
+        t = time.time()
+        self.client.update()
+        if len(self.client.currentData) == 0:
+            print("Failed to read any valid data from the server - exiting")
+            sys.exit(1)
+
+        queued = self.client.currentData["test_work"]["queued"]
+        completed = self.client.currentData["test_work"]["completed"]
+        if self.last_queued is not None:
+            diff_queued = (self.last_queued - queued) / (t - self.last_time)
+            diff_completed = (completed - self.last_completed) / (t - self.last_time)
+        else:
+            diff_queued = 0
+            diff_completed = 0
+        self.last_queued = queued
+        self.last_completed = completed
+        self.last_time = t
+        print("{}\t{:.1f}\t{:.1f}".format(queued, diff_queued, diff_completed,))
+
+        self.sched.enter(max(self.seconds - (time.time() - t), 0), 0, self.updateResults, ())
+
+
+
+class MonitorClient(object):
+    """
+    Client that connects to a server and fetches information.
+    """
+
+    def __init__(self, sockname):
+        self.socket = None
+        if isinstance(sockname, str):
+            self.sockname = sockname[5:]
+            self.useTCP = False
+        else:
+            self.sockname = sockname
+            self.useTCP = True
+        self.currentData = {}
+        self.items = []
+
+
+    def readSock(self, items):
+        """
+        Open a socket, send the specified request, and retrieve the response. Keep the socket open.
+        """
+        try:
+            if self.socket is None:
+                self.socket = socket.socket(socket.AF_INET if self.useTCP else socket.AF_UNIX, socket.SOCK_STREAM)
+                self.socket.connect(self.sockname)
+                self.socket.setblocking(0)
+            self.socket.sendall(json.dumps(items) + "\r\n")
+            data = ""
+            t = time.time()
+            while not data.endswith("\n"):
+                try:
+                    d = self.socket.recv(1024)
+                except socket.error as se:
+                    if se.args[0] != errno.EWOULDBLOCK:
+                        raise
+                    if time.time() - t > 5:
+                        raise socket.error
+                    continue
+                if d:
+                    data += d
+                else:
+                    break
+            data = json.loads(data)
+        except socket.error:
+            data = {}
+            self.socket = None
+        except ValueError:
+            data = {}
+        return data
+
+
+    def update(self):
+        """
+        Update the current data from the server.
+        """
+
+        # Only read each item once
+        self.currentData = self.readSock(list(set(self.items)))
+
+
+    def getOneItem(self, item):
+        """
+        Update the current data from the server.
+        """
+        data = self.readSock([item])
+        return data[item] if data else None
+
+
+    def addItem(self, item):
+        """
+        Add a server data item to monitor.
+        """
+        self.items.append(item)
+
+
+    def removeItem(self, item):
+        """
+        No need to monitor this item.
+        """
+        self.items.remove(item)
+
+if __name__ == "__main__":
+    main()
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20150902/196a0a18/attachment-0001.html>


More information about the calendarserver-changes mailing list