[CalendarServer-changes] [4927] CalendarServer/branches/users/sagen/deployment-inspection/ twistedcaldav

source_changes at macosforge.org source_changes at macosforge.org
Thu Jan 14 22:05:52 PST 2010


Revision: 4927
          http://trac.macosforge.org/projects/calendarserver/changeset/4927
Author:   sagen at apple.com
Date:     2010-01-14 22:05:50 -0800 (Thu, 14 Jan 2010)
Log Message:
-----------
Adds an inspection port and fixes fake memcache timeout problem

Modified Paths:
--------------
    CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/accesslog.py
    CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/cluster.py
    CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/config.py
    CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/httpfactory.py
    CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/memcache.py
    CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/memcachepool.py
    CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/tap.py
    CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/test/test_memcache.py
    CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/test/test_memcachepool.py

Modified: CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/accesslog.py
===================================================================
--- CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/accesslog.py	2010-01-15 06:04:33 UTC (rev 4926)
+++ CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/accesslog.py	2010-01-15 06:05:50 UTC (rev 4927)
@@ -64,6 +64,10 @@
             request = eventDict['request']
             response = eventDict['response']
             loginfo = eventDict['loginfo']
+
+            channel = request.chanRequest.channel
+            if channel._inspection:
+                channel._inspection.add("access_log")
     
             # Try to determine authentication and authorization identifiers
             uid = "-"

Modified: CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/cluster.py
===================================================================
--- CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/cluster.py	2010-01-15 06:04:33 UTC (rev 4926)
+++ CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/cluster.py	2010-01-15 06:05:50 UTC (rev 4927)
@@ -117,6 +117,7 @@
              '-o', 'PIDFile=None',
              '-o', 'ErrorLogFile=None',
              '-o', 'LogID=%s' % (self.id,),
+             '-o', 'InspectionPort=%s' % (config.BaseInspectionPort + self.id,),
              '-o', 'MultiProcess/ProcessCount=%d' % (
                     config.MultiProcess['ProcessCount'],)])
 

Modified: CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/config.py
===================================================================
--- CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/config.py	2010-01-15 06:04:33 UTC (rev 4926)
+++ CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/config.py	2010-01-15 06:05:50 UTC (rev 4927)
@@ -173,6 +173,8 @@
     "DefaultLogLevel"   : "",
     "LogLevels"         : {},
     "LogID"          : "",
+    "BaseInspectionPort" : 10000,
+    "InspectionPort" : "",
 
     "AccountingCategories": {
         "iTIP": False,

Modified: CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/httpfactory.py
===================================================================
--- CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/httpfactory.py	2010-01-15 06:04:33 UTC (rev 4926)
+++ CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/httpfactory.py	2010-01-15 06:05:50 UTC (rev 4927)
@@ -19,6 +19,8 @@
 from twisted.internet import protocol
 from twisted.python import log
 from twisted.web2.channel.http import HTTPFactory, HTTPChannel
+from twisted.web2.server import Request, Site
+from twistedcaldav.inspection import Inspector
 
 from twistedcaldav.config import config
 
@@ -72,29 +74,90 @@
 class LimitingHTTPChannel(HTTPChannel):
 
     def connectionMade(self):
-        HTTPChannel.connectionMade(self)
+        if self._inspection:
+            self._inspection.add("conn_made")
+
+        retVal = HTTPChannel.connectionMade(self)
         if self.factory.outstandingRequests >= self.factory.maxRequests:
             # log.msg("Overloaded")
             self.factory.myServer.myPort.stopReading()
+        return retVal
 
     def connectionLost(self, reason):
-        HTTPChannel.connectionLost(self, reason)
+        if self._inspection:
+            self._inspection.add("conn_lost")
+            self._inspection.complete()
+
+        retVal = HTTPChannel.connectionLost(self, reason)
         if self.factory.outstandingRequests < self.factory.resumeRequests:
             # log.msg("Resuming")
             self.factory.myServer.myPort.startReading()
+        return retVal
 
+    _firstChunkReceived = False
+
+    def dataReceived(self, data):
+        if not self._firstChunkReceived:
+            self._firstChunkReceived = True
+            if self._inspection:
+                self._inspection.add("first_byte")
+
+        return HTTPChannel.dataReceived(self, data)
+
+    def requestReadFinished(self, request):
+        if self._inspection:
+            self._inspection.add("body_received")
+
+        return HTTPChannel.requestReadFinished(self, request)
+
+    def requestWriteFinished(self, request):
+        if self._inspection:
+            self._inspection.add("resp_finish")
+
+        return HTTPChannel.requestWriteFinished(self, request)
+
+
 class LimitingHTTPFactory(HTTPFactory):
     protocol = LimitingHTTPChannel
 
+
     def __init__(self, requestFactory, maxRequests=600, maxAccepts=100, resumeRequests=550,
         **kwargs):
         HTTPFactory.__init__(self, requestFactory, maxRequests, **kwargs)
         self.maxAccepts = maxAccepts
         self.resumeRequests = resumeRequests
+        self.channelCounter = 0
 
     def buildProtocol(self, addr):
 
         p = protocol.ServerFactory.buildProtocol(self, addr)
         for arg, value in self.protocolArgs.iteritems():
             setattr(p, arg, value)
+        self.channelCounter += 1
+
+        p._inspection = Inspector.getInspection(self.channelCounter)
         return p
+
+class LimitingRequest(Request):
+
+    def __init__(self, *args, **kwargs):
+        Request.__init__(self, *args, **kwargs)
+        self.extendedLogItems = {}
+        channel = self.chanRequest.channel
+        if channel._inspection:
+            channel._inspection.add("headers_received")
+            self.extendedLogItems['insp'] = channel._inspection.id
+
+    def writeResponse(self, response):
+        channel = self.chanRequest.channel
+        if channel._inspection:
+            channel._inspection.add("resp_start")
+
+        Request.writeResponse(self, response)
+
+
+class LimitingSite(Site):
+
+    def __call__(self, *args, **kwargs):
+        return LimitingRequest(site=self, *args, **kwargs)
+

Modified: CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/memcache.py
===================================================================
--- CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/memcache.py	2010-01-15 06:04:33 UTC (rev 4926)
+++ CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/memcache.py	2010-01-15 06:05:50 UTC (rev 4927)
@@ -34,7 +34,6 @@
 
 
 from twisted.protocols.basic import LineReceiver
-from twisted.protocols.policies import TimeoutMixin
 from twisted.internet.defer import Deferred, fail, TimeoutError
 from twisted.python import log
 
@@ -109,13 +108,10 @@
 
 
 
-class MemCacheProtocol(LineReceiver, TimeoutMixin):
+class MemCacheProtocol(LineReceiver):
     """
     MemCache protocol: connect to a memcached server to store/retrieve values.
 
-    @ivar persistentTimeOut: the timeout period used to wait for a response.
-    @type persistentTimeOut: C{int}
-
     @ivar _current: current list of requests waiting for an answer from the
         server.
     @type _current: C{deque} of L{Command}
@@ -133,44 +129,20 @@
     """
     MAX_KEY_LENGTH = 250
 
-    def __init__(self, timeOut=60):
+    def __init__(self):
         """
         Create the protocol.
-
-        @param timeOut: the timeout to wait before detecting that the
-            connection is dead and close it. It's expressed in seconds.
-        @type timeOut: C{int}
         """
         self._current = deque()
         self._lenExpected = None
         self._getBuffer = None
         self._bufferLength = None
-        self.persistentTimeOut = self.timeOut = timeOut
 
 
-    def timeoutConnection(self):
-        """
-        Close the connection in case of timeout.
-        """
-        for cmd in self._current:
-            cmd.fail(TimeoutError("Connection timeout"))
-        self.transport.loseConnection()
-
-
-    def sendLine(self, line):
-        """
-        Override sendLine to add a timeout to response.
-        """
-        if not self._current:
-            self.setTimeout(self.persistentTimeOut)
-        LineReceiver.sendLine(self, line)
-
-
     def rawDataReceived(self, data):
         """
         Collect data for a get.
         """
-        self.resetTimeout()
         self._getBuffer.append(data)
         self._bufferLength += len(data)
         if self._bufferLength >= self._lenExpected + 2:
@@ -310,7 +282,6 @@
         """
         Receive line commands from the server.
         """
-        self.resetTimeout()
         token = line.split(" ", 1)[0]
         # First manage standard commands without space
         cmd = getattr(self, "cmd_%s" % (token,), None)
@@ -331,9 +302,6 @@
                 cmd = self._current.popleft()
                 val = int(line)
                 cmd.success(val)
-        if not self._current:
-            # No pending request, remove timeout
-            self.setTimeout(None)
 
 
     def increment(self, key, val=1):

Modified: CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/memcachepool.py
===================================================================
--- CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/memcachepool.py	2010-01-15 06:04:33 UTC (rev 4926)
+++ CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/memcachepool.py	2010-01-15 06:05:50 UTC (rev 4927)
@@ -16,7 +16,8 @@
 
 from twisted.python.failure import Failure
 from twisted.internet.defer import Deferred, fail
-from twisted.internet.protocol import ReconnectingClientFactory
+from twisted.internet.error import ConnectionLost, ConnectionDone
+from twisted.internet.protocol import ClientFactory
 
 from twistedcaldav.log import LoggingMixIn
 from twistedcaldav.memcache import MemCacheProtocol, NoSuchCommand
@@ -43,7 +44,7 @@
 
 
 
-class MemCacheClientFactory(ReconnectingClientFactory, LoggingMixIn):
+class MemCacheClientFactory(ClientFactory, LoggingMixIn):
     """
     A client factory for MemCache that reconnects and notifies a pool of it's
     state.
@@ -70,12 +71,6 @@
         if self._protocolInstance is not None:
             self.connectionPool.clientBusy(self._protocolInstance)
 
-        ReconnectingClientFactory.clientConnectionLost(
-            self,
-            connector,
-            reason)
-
-
     def clientConnectionFailed(self, connector, reason):
         """
         Notify the connectionPool that we're unable to connect
@@ -84,12 +79,6 @@
         if self._protocolInstance is not None:
             self.connectionPool.clientBusy(self._protocolInstance)
 
-        ReconnectingClientFactory.clientConnectionFailed(
-            self,
-            connector,
-            reason)
-
-
     def buildProtocol(self, addr):
         """
         Attach the C{self.connectionPool} to the protocol so it can tell it,
@@ -196,6 +185,12 @@
             self.clientFree(client)
             return result
 
+        def _freeClientAfterError(error):
+            if not error.check(ConnectionLost, ConnectionDone):
+                self.clientFree(client)
+            return error
+
+
         self.clientBusy(client)
         method = getattr(client, command, None)
         if method is not None:
@@ -203,7 +198,7 @@
         else:
             d = fail(Failure(NoSuchCommand()))
 
-        d.addCallback(_freeClientAfterRequest)
+        d.addCallbacks(_freeClientAfterRequest, _freeClientAfterError)
 
         return d
 

Modified: CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/tap.py
===================================================================
--- CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/tap.py	2010-01-15 06:04:33 UTC (rev 4926)
+++ CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/tap.py	2010-01-15 06:05:50 UTC (rev 4927)
@@ -38,7 +38,6 @@
 from twisted.web2.dav import auth
 from twisted.web2.auth.basic import BasicCredentialFactory
 
-from twisted.web2.server import Site
 
 from twistedcaldav.log import Logger, logLevelForNamespace, setLogLevelForNamespace
 from twistedcaldav.accesslog import DirectoryLogWrapperResource
@@ -52,7 +51,8 @@
 from twistedcaldav.directory.principal import DirectoryPrincipalProvisioningResource
 from twistedcaldav.directory.aggregate import AggregateDirectoryService
 from twistedcaldav.directory.sudo import SudoDirectoryService
-from twistedcaldav.httpfactory import HTTP503LoggingFactory, LimitingHTTPFactory
+from twistedcaldav.httpfactory import HTTP503LoggingFactory, LimitingHTTPFactory, LimitingSite
+from twistedcaldav.inspection import InspectionFactory
 from twistedcaldav.static import CalendarHomeProvisioningFile
 from twistedcaldav.static import TimezoneServiceFile
 from twistedcaldav.timezones import TimezoneCache
@@ -714,7 +714,7 @@
 
         service = CalDAVService(logObserver)
 
-        site = Site(realRoot)
+        site = LimitingSite(realRoot)
 
 
         # If inheriting file descriptors from the master, use those to handle
@@ -837,6 +837,12 @@
 
         signal.signal(signal.SIGUSR1, sigusr1_handler)
 
+        internet.TCPServer(
+            int(config.InspectionPort),
+            InspectionFactory(),
+            interface="127.0.0.1"
+        ).setServiceParent(service)
+
         return service
 
     makeService_Combined = makeService_Combined

Modified: CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/test/test_memcache.py
===================================================================
--- CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/test/test_memcache.py	2010-01-15 06:04:33 UTC (rev 4926)
+++ CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/test/test_memcache.py	2010-01-15 06:05:50 UTC (rev 4927)
@@ -11,7 +11,7 @@
 from twisted.trial.unittest import TestCase
 from twisted.test.proto_helpers import StringTransportWithDisconnection
 from twisted.internet.task import Clock
-from twisted.internet.defer import Deferred, gatherResults, TimeoutError
+from twisted.internet.defer import Deferred, gatherResults
 
 
 
@@ -222,119 +222,7 @@
             "VALUE bar 0 %s\r\n%s\r\nEND\r\n" % (len(s), s))
 
 
-    def test_timeOut(self):
-        """
-        Test the timeout on outgoing requests: when timeout is detected, all
-        current commands should fail with a L{TimeoutError}, and the
-        connection should be closed.
-        """
-        d1 = self.proto.get("foo")
-        d2 = self.proto.get("bar")
-        d3 = Deferred()
-        self.proto.connectionLost = d3.callback
 
-        self.clock.advance(self.proto.persistentTimeOut)
-        self.assertFailure(d1, TimeoutError)
-        self.assertFailure(d2, TimeoutError)
-        def checkMessage(error):
-            self.assertEquals(str(error), "Connection timeout")
-        d1.addCallback(checkMessage)
-        return gatherResults([d1, d2, d3])
-
-
-    def test_timeoutRemoved(self):
-        """
-        When a request gets a response, no pending timeout call should remain
-        around.
-        """
-        d = self.proto.get("foo")
-
-        self.clock.advance(self.proto.persistentTimeOut - 1)
-        self.proto.dataReceived("VALUE foo 0 3\r\nbar\r\nEND\r\n")
-
-        def check(result):
-            self.assertEquals(result, (0, "bar"))
-            self.assertEquals(len(self.clock.calls), 0)
-        d.addCallback(check)
-        return d
-
-
-    def test_timeOutRaw(self):
-        """
-        Test the timeout when raw mode was started: the timeout should not be
-        reset until all the data has been received, so we can have a
-        L{TimeoutError} when waiting for raw data.
-        """
-        d1 = self.proto.get("foo")
-        d2 = Deferred()
-        self.proto.connectionLost = d2.callback
-
-        self.proto.dataReceived("VALUE foo 0 10\r\n12345")
-        self.clock.advance(self.proto.persistentTimeOut)
-        self.assertFailure(d1, TimeoutError)
-        return gatherResults([d1, d2])
-
-
-    def test_timeOutStat(self):
-        """
-        Test the timeout when stat command has started: the timeout should not
-        be reset until the final B{END} is received.
-        """
-        d1 = self.proto.stats()
-        d2 = Deferred()
-        self.proto.connectionLost = d2.callback
-
-        self.proto.dataReceived("STAT foo bar\r\n")
-        self.clock.advance(self.proto.persistentTimeOut)
-        self.assertFailure(d1, TimeoutError)
-        return gatherResults([d1, d2])
-
-
-    def test_timeoutPipelining(self):
-        """
-        When two requests are sent, a timeout call should remain around for the
-        second request, and its timeout time should be correct.
-        """
-        d1 = self.proto.get("foo")
-        d2 = self.proto.get("bar")
-        d3 = Deferred()
-        self.proto.connectionLost = d3.callback
-
-        self.clock.advance(self.proto.persistentTimeOut - 1)
-        self.proto.dataReceived("VALUE foo 0 3\r\nbar\r\nEND\r\n")
-
-        def check(result):
-            self.assertEquals(result, (0, "bar"))
-            self.assertEquals(len(self.clock.calls), 1)
-            for i in range(self.proto.persistentTimeOut):
-                self.clock.advance(1)
-            return self.assertFailure(d2, TimeoutError).addCallback(checkTime)
-        def checkTime(ignored):
-            # Check that the timeout happened C{self.proto.persistentTimeOut}
-            # after the last response
-            self.assertEquals(self.clock.seconds(),
-                    2 * self.proto.persistentTimeOut - 1)
-        d1.addCallback(check)
-        return d1
-
-
-    def test_timeoutNotReset(self):
-        """
-        Check that timeout is not resetted for every command, but keep the
-        timeout from the first command without response.
-        """
-        d1 = self.proto.get("foo")
-        d3 = Deferred()
-        self.proto.connectionLost = d3.callback
-
-        self.clock.advance(self.proto.persistentTimeOut - 1)
-        d2 = self.proto.get("bar")
-        self.clock.advance(1)
-        self.assertFailure(d1, TimeoutError)
-        self.assertFailure(d2, TimeoutError)
-        return gatherResults([d1, d2, d3])
-
-
     def test_tooLongKey(self):
         """
         Test that an error is raised when trying to use a too long key: the

Modified: CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/test/test_memcachepool.py
===================================================================
--- CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/test/test_memcachepool.py	2010-01-15 06:04:33 UTC (rev 4926)
+++ CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/test/test_memcachepool.py	2010-01-15 06:05:50 UTC (rev 4927)
@@ -176,12 +176,6 @@
                           [('gone', self.protocol)])
 
 
-    def tearDown(self):
-        """
-        Make sure the L{MemCacheClientFactory} isn't trying to reconnect
-        anymore.
-        """
-        self.factory.stopTrying()
 
 
 
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20100114/13899563/attachment-0001.html>


More information about the calendarserver-changes mailing list