[CalendarServer-changes] [15091] CalendarServer/trunk
source_changes at macosforge.org
source_changes at macosforge.org
Wed Sep 2 13:34:57 PDT 2015
Revision: 15091
http://trac.calendarserver.org//changeset/15091
Author: cdaboo at apple.com
Date: 2015-09-02 13:34:57 -0700 (Wed, 02 Sep 2015)
Log Message:
-----------
Add a tool to look at the processing rate of TestWork items. Make it easy to use the loadtest tool with other server accounts.
Modified Paths:
--------------
CalendarServer/trunk/calendarserver/dashboard_service.py
CalendarServer/trunk/contrib/performance/jobqueue/loadtest.py
Added Paths:
-----------
CalendarServer/trunk/contrib/performance/jobqueue/workrate.py
Modified: CalendarServer/trunk/calendarserver/dashboard_service.py
===================================================================
--- CalendarServer/trunk/calendarserver/dashboard_service.py 2015-09-02 19:54:05 UTC (rev 15090)
+++ CalendarServer/trunk/calendarserver/dashboard_service.py 2015-09-02 20:34:57 UTC (rev 15091)
@@ -24,9 +24,12 @@
from twistedcaldav.config import config
+from txdav.common.datastore.work.load_work import TestWork
from txdav.dps.client import DirectoryService as DirectoryProxyClientService
from txdav.who.cache import CachingDirectoryService
+from twext.enterprise.jobs.queue import WorkerConnectionPool
+
import json
"""
@@ -161,6 +164,28 @@
return succeed({"workers": loads, "level": level})
+    @inlineCallbacks
+    def data_test_work(self):
+        """
+        Report the number of TestWork items in the job queue together with
+        the completed count recorded by L{WorkerConnectionPool} for that
+        work type.
+
+        @return: a mapping with the keys C{"queued"} and C{"completed"};
+            both values are zero when the factory has no store.
+        @rtype: L{dict}
+        """
+
+        if self.factory.store:
+            txn = self.factory.store.newTransaction()
+            # NOTE(review): if the count fails the transaction is neither
+            # committed nor aborted here - confirm whether that matters.
+            queued = yield TestWork.count(txn)
+            completed = WorkerConnectionPool.completed.get(TestWork.workType(), 0)
+            yield txn.commit()
+        else:
+            queued = completed = 0
+
+        returnValue({"queued": queued, "completed": completed})
+
+
def data_directory(self):
"""
Return a summary of directory service calls.
Modified: CalendarServer/trunk/contrib/performance/jobqueue/loadtest.py
===================================================================
--- CalendarServer/trunk/contrib/performance/jobqueue/loadtest.py 2015-09-02 19:54:05 UTC (rev 15090)
+++ CalendarServer/trunk/contrib/performance/jobqueue/loadtest.py 2015-09-02 20:34:57 UTC (rev 15091)
@@ -23,6 +23,7 @@
import random
import sys
import time
+from urlparse import urlparse
PRIORITY = {
"low": 0,
@@ -35,14 +36,23 @@
# Random time delay
time.sleep(random.randint(0, config["interval"]) / 1000.0)
+ url = urlparse(config["server"])
+ use_ssl = url[0] == "https"
+ if "@" in url[1]:
+ auth, net_loc = url[1].split("@")
+ else:
+ auth = "admin:admin"
+ net_loc = url[1]
+ host, port = net_loc.split(":")
+ port = int(port)
+ user, pswd = auth.split(":")
+
headers = {}
headers["User-Agent"] = "httploop/1"
headers["Depth"] = "1"
- headers["Authorization"] = "Basic " + "admin:admin".encode("base64")[:-1]
+ headers["Authorization"] = "Basic " + "{}:{}".format(user, pswd).encode("base64")[:-1]
headers["Content-Type"] = "application/json"
- host, port = config["server"].split(":")
- port = int(port)
interval = config["interval"] / 1000.0
total = config["limit"] / config["numProcesses"]
@@ -59,7 +69,7 @@
base_time = time.time()
while not complete.value:
- http = SmartHTTPConnection(host, port, True, False)
+ http = SmartHTTPConnection(host, port, use_ssl, False)
try:
count += 1
@@ -101,7 +111,7 @@
-i MSEC Millisecond delay between each request [1000]
-r RATE Requests/second rate [10]
-j JOBS Number of jobs per HTTP request [1]
- -s HOST:PORT Host/port to connect to [localhost:8443]
+ -s URL URL to connect to [https://localhost:8443]
-b SEC Number of seconds for notBefore [0]
-d MSEC Number of milliseconds for the work [10]
-l NUM Total number of requests from all processes
@@ -139,7 +149,7 @@
"numProcesses": 10,
"interval": 1000,
"jobs": 1,
- "server": "localhost:8443",
+ "server": "https://localhost:8443",
"when": 0,
"delay": 10,
"priority": "high",
Added: CalendarServer/trunk/contrib/performance/jobqueue/workrate.py
===================================================================
--- CalendarServer/trunk/contrib/performance/jobqueue/workrate.py (rev 0)
+++ CalendarServer/trunk/contrib/performance/jobqueue/workrate.py 2015-09-02 20:34:57 UTC (rev 15091)
@@ -0,0 +1,224 @@
+#!/usr/bin/env python
+##
+# Copyright (c) 2012-2015 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+from __future__ import print_function
+
+from getopt import getopt, GetoptError
+import errno
+import json
+import os
+import sched
+import socket
+import sys
+import time
+
+
+def usage(e=None):
+    """
+    Print the command line help and exit.
+
+    @param e: optional error (for example the L{GetoptError} raised while
+        parsing the arguments); when given it is printed ahead of the help
+        text so the user can see what was rejected.
+    @raise SystemExit: always - status 64 when C{e} is given, otherwise 0.
+    """
+    if e:
+        # Say what was wrong before showing the usage text.
+        print(e)
+        print("")
+
+    name = os.path.basename(sys.argv[0])
+    print("usage: %s [options]" % (name,))
+    print("")
+    print("options:")
+    print("  -h --help: print this help and exit")
+    print("  -s: server host (and optional port) [localhost:8100]")
+    print("      or unix socket path prefixed by 'unix:'")
+    print("")
+    print("This tool monitors the server's job assignment rate.")
+
+    if e:
+        sys.exit(64)
+    else:
+        sys.exit(0)
+
+
+
+def main():
+    """
+    Parse the command line, then create a L{Monitor} for the selected
+    server and run it.
+
+    The C{-s} option takes either C{host[:port]} (the port defaults to
+    8100) or a unix socket path prefixed by C{unix:}.
+    """
+    try:
+        (optargs, _ignore_args) = getopt(
+            sys.argv[1:], "hs:", [
+                "help",
+            ],
+        )
+    except GetoptError as e:
+        # usage() exits, so optargs is always bound below.
+        usage(e)
+
+    #
+    # Get configuration
+    #
+    server = ("localhost", 8100)
+
+    for opt, arg in optargs:
+        if opt in ("-h", "--help"):
+            usage()
+
+        elif opt in ("-s",):
+            if not arg.startswith("unix:"):
+                # TCP: build a (host, port) tuple suitable for connect()
+                server = arg.split(":")
+                if len(server) == 1:
+                    server.append(8100)
+                else:
+                    server[1] = int(server[1])
+                server = tuple(server)
+            else:
+                # Unix socket: pass the "unix:" prefixed string through
+                server = arg
+
+        else:
+            raise NotImplementedError(opt)
+
+    d = Monitor(server)
+    d.run()
+
+
+
+class Monitor(object):
+    """
+    Polls the server at a fixed interval and prints one line per poll with
+    the current TestWork queue size and the per-second change in the queued
+    and completed counts. Python's L{sched} module drives the polling.
+    """
+
+    screen = None
+    registered_windows = {}
+    registered_order = []
+
+    def __init__(self, server):
+        """
+        @param server: the server address, passed straight through to
+            L{MonitorClient}.
+        """
+        self.paused = False
+        self.seconds = 1.0
+        self.sched = sched.scheduler(time.time, time.sleep)
+
+        self.client = MonitorClient(server)
+        self.client.addItem("test_work")
+
+        # Previous sample; all None until the first poll has completed.
+        self.last_queued = None
+        self.last_completed = None
+        self.last_time = None
+
+
+    def run(self):
+        """
+        Schedule the first poll and run the scheduler.
+        """
+        self.sched.enter(self.seconds, 0, self.updateResults, ())
+        self.sched.run()
+
+
+    def updateResults(self):
+        """
+        Fetch one sample from the server, print it along with the rates
+        since the previous sample, and schedule the next poll. Exits the
+        process with status 1 when the server returned no data.
+        """
+
+        started = time.time()
+        self.client.update()
+        if len(self.client.currentData) == 0:
+            print("Failed to read any valid data from the server - exiting")
+            sys.exit(1)
+
+        queued = self.client.currentData["test_work"]["queued"]
+        completed = self.client.currentData["test_work"]["completed"]
+
+        if self.last_queued is None:
+            # First sample: nothing to compare against yet
+            drain_rate = 0
+            complete_rate = 0
+        else:
+            drain_rate = (self.last_queued - queued) / (started - self.last_time)
+            complete_rate = (completed - self.last_completed) / (started - self.last_time)
+
+        self.last_queued = queued
+        self.last_completed = completed
+        self.last_time = started
+        print("{}\t{:.1f}\t{:.1f}".format(queued, drain_rate, complete_rate,))
+
+        # Deduct the time this poll took so that polls stay one interval apart
+        delay = max(self.seconds - (time.time() - started), 0)
+        self.sched.enter(delay, 0, self.updateResults, ())
+
+
+
+class MonitorClient(object):
+    """
+    Client that connects to a server and fetches information.
+
+    The connection is opened on the first request and kept for later ones.
+    It is closed and forgotten whenever a request fails or the server
+    closes its end, so that the next request reconnects.
+    """
+
+    # Seconds to wait for a complete response before giving up.
+    TIMEOUT = 5
+
+    def __init__(self, sockname):
+        """
+        @param sockname: a C{(host, port)} tuple for a TCP connection, or a
+            string of the form C{"unix:<path>"} for a unix socket.
+        """
+        self.socket = None
+        if isinstance(sockname, str):
+            # Strip the leading "unix:" to leave the socket path
+            self.sockname = sockname[5:]
+            self.useTCP = False
+        else:
+            self.sockname = sockname
+            self.useTCP = True
+        self.currentData = {}
+        self.items = []
+
+
+    def _dropSocket(self):
+        """
+        Close and forget the cached socket, if any.
+        """
+        if self.socket is not None:
+            try:
+                self.socket.close()
+            except socket.error:
+                # Best effort: the connection is being discarded anyway
+                pass
+            self.socket = None
+
+
+    def readSock(self, items):
+        """
+        Open a socket (if not already open), send the specified request, and
+        retrieve the response. Keep the socket open on success.
+
+        @param items: the list of item names to request; sent as JSON.
+        @return: the decoded JSON response, or an empty C{dict} when the
+            request failed, timed out, or the response was not valid JSON.
+        """
+        try:
+            if self.socket is None:
+                self.socket = socket.socket(
+                    socket.AF_INET if self.useTCP else socket.AF_UNIX,
+                    socket.SOCK_STREAM,
+                )
+                self.socket.connect(self.sockname)
+            self.socket.settimeout(self.TIMEOUT)
+            self.socket.sendall(json.dumps(items) + "\r\n")
+            data = ""
+            deadline = time.time() + self.TIMEOUT
+            # The response is terminated by a newline
+            while not data.endswith("\n"):
+                remaining = deadline - time.time()
+                if remaining <= 0:
+                    raise socket.timeout("timed out waiting for response")
+                # Wait in recv() for whatever is left of the deadline rather
+                # than spinning on a non-blocking socket.
+                self.socket.settimeout(remaining)
+                d = self.socket.recv(1024)
+                if not d:
+                    # Server closed the connection - do not reuse it
+                    self._dropSocket()
+                    break
+                data += d
+            data = json.loads(data)
+        except socket.error:
+            # Includes socket.timeout; reconnect on the next request
+            data = {}
+            self._dropSocket()
+        except ValueError:
+            data = {}
+        return data
+
+
+    def update(self):
+        """
+        Update the current data from the server.
+        """
+
+        # Only read each item once
+        self.currentData = self.readSock(list(set(self.items)))
+
+
+    def getOneItem(self, item):
+        """
+        Fetch a single item from the server without touching
+        C{currentData}.
+
+        @return: the data for C{item}, or C{None} when the request failed
+            or the response did not include it.
+        """
+        data = self.readSock([item])
+        return data.get(item) if data else None
+
+
+    def addItem(self, item):
+        """
+        Add a server data item to monitor.
+        """
+        self.items.append(item)
+
+
+    def removeItem(self, item):
+        """
+        No need to monitor this item.
+        """
+        self.items.remove(item)
+
+# Run the monitor only when executed as a script, not when imported.
+if __name__ == "__main__":
+ main()
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20150902/196a0a18/attachment-0001.html>
More information about the calendarserver-changes
mailing list