[CalendarServer-changes] [4927] CalendarServer/branches/users/sagen/deployment-inspection/ twistedcaldav
source_changes at macosforge.org
source_changes at macosforge.org
Thu Jan 14 22:05:52 PST 2010
Revision: 4927
http://trac.macosforge.org/projects/calendarserver/changeset/4927
Author: sagen at apple.com
Date: 2010-01-14 22:05:50 -0800 (Thu, 14 Jan 2010)
Log Message:
-----------
Adds an inspection port and fixes fake memcache timeout problem
Modified Paths:
--------------
CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/accesslog.py
CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/cluster.py
CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/config.py
CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/httpfactory.py
CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/memcache.py
CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/memcachepool.py
CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/tap.py
CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/test/test_memcache.py
CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/test/test_memcachepool.py
Modified: CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/accesslog.py
===================================================================
--- CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/accesslog.py 2010-01-15 06:04:33 UTC (rev 4926)
+++ CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/accesslog.py 2010-01-15 06:05:50 UTC (rev 4927)
@@ -64,6 +64,10 @@
request = eventDict['request']
response = eventDict['response']
loginfo = eventDict['loginfo']
+
+ channel = request.chanRequest.channel
+ if channel._inspection:
+ channel._inspection.add("access_log")
# Try to determine authentication and authorization identifiers
uid = "-"
Modified: CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/cluster.py
===================================================================
--- CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/cluster.py 2010-01-15 06:04:33 UTC (rev 4926)
+++ CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/cluster.py 2010-01-15 06:05:50 UTC (rev 4927)
@@ -117,6 +117,7 @@
'-o', 'PIDFile=None',
'-o', 'ErrorLogFile=None',
'-o', 'LogID=%s' % (self.id,),
+ '-o', 'InspectionPort=%s' % (config.BaseInspectionPort + self.id,),
'-o', 'MultiProcess/ProcessCount=%d' % (
config.MultiProcess['ProcessCount'],)])
Modified: CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/config.py
===================================================================
--- CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/config.py 2010-01-15 06:04:33 UTC (rev 4926)
+++ CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/config.py 2010-01-15 06:05:50 UTC (rev 4927)
@@ -173,6 +173,8 @@
"DefaultLogLevel" : "",
"LogLevels" : {},
"LogID" : "",
+ "BaseInspectionPort" : 10000,
+ "InspectionPort" : "",
"AccountingCategories": {
"iTIP": False,
Modified: CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/httpfactory.py
===================================================================
--- CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/httpfactory.py 2010-01-15 06:04:33 UTC (rev 4926)
+++ CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/httpfactory.py 2010-01-15 06:05:50 UTC (rev 4927)
@@ -19,6 +19,8 @@
from twisted.internet import protocol
from twisted.python import log
from twisted.web2.channel.http import HTTPFactory, HTTPChannel
+from twisted.web2.server import Request, Site
+from twistedcaldav.inspection import Inspector
from twistedcaldav.config import config
@@ -72,29 +74,90 @@
class LimitingHTTPChannel(HTTPChannel):
def connectionMade(self):
- HTTPChannel.connectionMade(self)
+ if self._inspection:
+ self._inspection.add("conn_made")
+
+ retVal = HTTPChannel.connectionMade(self)
if self.factory.outstandingRequests >= self.factory.maxRequests:
# log.msg("Overloaded")
self.factory.myServer.myPort.stopReading()
+ return retVal
def connectionLost(self, reason):
- HTTPChannel.connectionLost(self, reason)
+ if self._inspection:
+ self._inspection.add("conn_lost")
+ self._inspection.complete()
+
+ retVal = HTTPChannel.connectionLost(self, reason)
if self.factory.outstandingRequests < self.factory.resumeRequests:
# log.msg("Resuming")
self.factory.myServer.myPort.startReading()
+ return retVal
+ _firstChunkReceived = False
+
+ def dataReceived(self, data):
+ if not self._firstChunkReceived:
+ self._firstChunkReceived = True
+ if self._inspection:
+ self._inspection.add("first_byte")
+
+ return HTTPChannel.dataReceived(self, data)
+
+ def requestReadFinished(self, request):
+ if self._inspection:
+ self._inspection.add("body_received")
+
+ return HTTPChannel.requestReadFinished(self, request)
+
+ def requestWriteFinished(self, request):
+ if self._inspection:
+ self._inspection.add("resp_finish")
+
+ return HTTPChannel.requestWriteFinished(self, request)
+
+
class LimitingHTTPFactory(HTTPFactory):
protocol = LimitingHTTPChannel
+
def __init__(self, requestFactory, maxRequests=600, maxAccepts=100, resumeRequests=550,
**kwargs):
HTTPFactory.__init__(self, requestFactory, maxRequests, **kwargs)
self.maxAccepts = maxAccepts
self.resumeRequests = resumeRequests
+ self.channelCounter = 0
def buildProtocol(self, addr):
p = protocol.ServerFactory.buildProtocol(self, addr)
for arg, value in self.protocolArgs.iteritems():
setattr(p, arg, value)
+ self.channelCounter += 1
+
+ p._inspection = Inspector.getInspection(self.channelCounter)
return p
+
+class LimitingRequest(Request):
+
+ def __init__(self, *args, **kwargs):
+ Request.__init__(self, *args, **kwargs)
+ self.extendedLogItems = {}
+ channel = self.chanRequest.channel
+ if channel._inspection:
+ channel._inspection.add("headers_received")
+ self.extendedLogItems['insp'] = channel._inspection.id
+
+ def writeResponse(self, response):
+ channel = self.chanRequest.channel
+ if channel._inspection:
+ channel._inspection.add("resp_start")
+
+ Request.writeResponse(self, response)
+
+
+class LimitingSite(Site):
+
+ def __call__(self, *args, **kwargs):
+ return LimitingRequest(site=self, *args, **kwargs)
+
Modified: CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/memcache.py
===================================================================
--- CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/memcache.py 2010-01-15 06:04:33 UTC (rev 4926)
+++ CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/memcache.py 2010-01-15 06:05:50 UTC (rev 4927)
@@ -34,7 +34,6 @@
from twisted.protocols.basic import LineReceiver
-from twisted.protocols.policies import TimeoutMixin
from twisted.internet.defer import Deferred, fail, TimeoutError
from twisted.python import log
@@ -109,13 +108,10 @@
-class MemCacheProtocol(LineReceiver, TimeoutMixin):
+class MemCacheProtocol(LineReceiver):
"""
MemCache protocol: connect to a memcached server to store/retrieve values.
- @ivar persistentTimeOut: the timeout period used to wait for a response.
- @type persistentTimeOut: C{int}
-
@ivar _current: current list of requests waiting for an answer from the
server.
@type _current: C{deque} of L{Command}
@@ -133,44 +129,20 @@
"""
MAX_KEY_LENGTH = 250
- def __init__(self, timeOut=60):
+ def __init__(self):
"""
Create the protocol.
-
- @param timeOut: the timeout to wait before detecting that the
- connection is dead and close it. It's expressed in seconds.
- @type timeOut: C{int}
"""
self._current = deque()
self._lenExpected = None
self._getBuffer = None
self._bufferLength = None
- self.persistentTimeOut = self.timeOut = timeOut
- def timeoutConnection(self):
- """
- Close the connection in case of timeout.
- """
- for cmd in self._current:
- cmd.fail(TimeoutError("Connection timeout"))
- self.transport.loseConnection()
-
-
- def sendLine(self, line):
- """
- Override sendLine to add a timeout to response.
- """
- if not self._current:
- self.setTimeout(self.persistentTimeOut)
- LineReceiver.sendLine(self, line)
-
-
def rawDataReceived(self, data):
"""
Collect data for a get.
"""
- self.resetTimeout()
self._getBuffer.append(data)
self._bufferLength += len(data)
if self._bufferLength >= self._lenExpected + 2:
@@ -310,7 +282,6 @@
"""
Receive line commands from the server.
"""
- self.resetTimeout()
token = line.split(" ", 1)[0]
# First manage standard commands without space
cmd = getattr(self, "cmd_%s" % (token,), None)
@@ -331,9 +302,6 @@
cmd = self._current.popleft()
val = int(line)
cmd.success(val)
- if not self._current:
- # No pending request, remove timeout
- self.setTimeout(None)
def increment(self, key, val=1):
Modified: CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/memcachepool.py
===================================================================
--- CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/memcachepool.py 2010-01-15 06:04:33 UTC (rev 4926)
+++ CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/memcachepool.py 2010-01-15 06:05:50 UTC (rev 4927)
@@ -16,7 +16,8 @@
from twisted.python.failure import Failure
from twisted.internet.defer import Deferred, fail
-from twisted.internet.protocol import ReconnectingClientFactory
+from twisted.internet.error import ConnectionLost, ConnectionDone
+from twisted.internet.protocol import ClientFactory
from twistedcaldav.log import LoggingMixIn
from twistedcaldav.memcache import MemCacheProtocol, NoSuchCommand
@@ -43,7 +44,7 @@
-class MemCacheClientFactory(ReconnectingClientFactory, LoggingMixIn):
+class MemCacheClientFactory(ClientFactory, LoggingMixIn):
"""
A client factory for MemCache that reconnects and notifies a pool of it's
state.
@@ -70,12 +71,6 @@
if self._protocolInstance is not None:
self.connectionPool.clientBusy(self._protocolInstance)
- ReconnectingClientFactory.clientConnectionLost(
- self,
- connector,
- reason)
-
-
def clientConnectionFailed(self, connector, reason):
"""
Notify the connectionPool that we're unable to connect
@@ -84,12 +79,6 @@
if self._protocolInstance is not None:
self.connectionPool.clientBusy(self._protocolInstance)
- ReconnectingClientFactory.clientConnectionFailed(
- self,
- connector,
- reason)
-
-
def buildProtocol(self, addr):
"""
Attach the C{self.connectionPool} to the protocol so it can tell it,
@@ -196,6 +185,12 @@
self.clientFree(client)
return result
+ def _freeClientAfterError(error):
+ if not error.check(ConnectionLost, ConnectionDone):
+ self.clientFree(client)
+ return error
+
+
self.clientBusy(client)
method = getattr(client, command, None)
if method is not None:
@@ -203,7 +198,7 @@
else:
d = fail(Failure(NoSuchCommand()))
- d.addCallback(_freeClientAfterRequest)
+ d.addCallbacks(_freeClientAfterRequest, _freeClientAfterError)
return d
Modified: CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/tap.py
===================================================================
--- CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/tap.py 2010-01-15 06:04:33 UTC (rev 4926)
+++ CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/tap.py 2010-01-15 06:05:50 UTC (rev 4927)
@@ -38,7 +38,6 @@
from twisted.web2.dav import auth
from twisted.web2.auth.basic import BasicCredentialFactory
-from twisted.web2.server import Site
from twistedcaldav.log import Logger, logLevelForNamespace, setLogLevelForNamespace
from twistedcaldav.accesslog import DirectoryLogWrapperResource
@@ -52,7 +51,8 @@
from twistedcaldav.directory.principal import DirectoryPrincipalProvisioningResource
from twistedcaldav.directory.aggregate import AggregateDirectoryService
from twistedcaldav.directory.sudo import SudoDirectoryService
-from twistedcaldav.httpfactory import HTTP503LoggingFactory, LimitingHTTPFactory
+from twistedcaldav.httpfactory import HTTP503LoggingFactory, LimitingHTTPFactory, LimitingSite
+from twistedcaldav.inspection import InspectionFactory
from twistedcaldav.static import CalendarHomeProvisioningFile
from twistedcaldav.static import TimezoneServiceFile
from twistedcaldav.timezones import TimezoneCache
@@ -714,7 +714,7 @@
service = CalDAVService(logObserver)
- site = Site(realRoot)
+ site = LimitingSite(realRoot)
# If inheriting file descriptors from the master, use those to handle
@@ -837,6 +837,12 @@
signal.signal(signal.SIGUSR1, sigusr1_handler)
+ internet.TCPServer(
+ int(config.InspectionPort),
+ InspectionFactory(),
+ interface="127.0.0.1"
+ ).setServiceParent(service)
+
return service
makeService_Combined = makeService_Combined
Modified: CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/test/test_memcache.py
===================================================================
--- CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/test/test_memcache.py 2010-01-15 06:04:33 UTC (rev 4926)
+++ CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/test/test_memcache.py 2010-01-15 06:05:50 UTC (rev 4927)
@@ -11,7 +11,7 @@
from twisted.trial.unittest import TestCase
from twisted.test.proto_helpers import StringTransportWithDisconnection
from twisted.internet.task import Clock
-from twisted.internet.defer import Deferred, gatherResults, TimeoutError
+from twisted.internet.defer import Deferred, gatherResults
@@ -222,119 +222,7 @@
"VALUE bar 0 %s\r\n%s\r\nEND\r\n" % (len(s), s))
- def test_timeOut(self):
- """
- Test the timeout on outgoing requests: when timeout is detected, all
- current commands should fail with a L{TimeoutError}, and the
- connection should be closed.
- """
- d1 = self.proto.get("foo")
- d2 = self.proto.get("bar")
- d3 = Deferred()
- self.proto.connectionLost = d3.callback
- self.clock.advance(self.proto.persistentTimeOut)
- self.assertFailure(d1, TimeoutError)
- self.assertFailure(d2, TimeoutError)
- def checkMessage(error):
- self.assertEquals(str(error), "Connection timeout")
- d1.addCallback(checkMessage)
- return gatherResults([d1, d2, d3])
-
-
- def test_timeoutRemoved(self):
- """
- When a request gets a response, no pending timeout call should remain
- around.
- """
- d = self.proto.get("foo")
-
- self.clock.advance(self.proto.persistentTimeOut - 1)
- self.proto.dataReceived("VALUE foo 0 3\r\nbar\r\nEND\r\n")
-
- def check(result):
- self.assertEquals(result, (0, "bar"))
- self.assertEquals(len(self.clock.calls), 0)
- d.addCallback(check)
- return d
-
-
- def test_timeOutRaw(self):
- """
- Test the timeout when raw mode was started: the timeout should not be
- reset until all the data has been received, so we can have a
- L{TimeoutError} when waiting for raw data.
- """
- d1 = self.proto.get("foo")
- d2 = Deferred()
- self.proto.connectionLost = d2.callback
-
- self.proto.dataReceived("VALUE foo 0 10\r\n12345")
- self.clock.advance(self.proto.persistentTimeOut)
- self.assertFailure(d1, TimeoutError)
- return gatherResults([d1, d2])
-
-
- def test_timeOutStat(self):
- """
- Test the timeout when stat command has started: the timeout should not
- be reset until the final B{END} is received.
- """
- d1 = self.proto.stats()
- d2 = Deferred()
- self.proto.connectionLost = d2.callback
-
- self.proto.dataReceived("STAT foo bar\r\n")
- self.clock.advance(self.proto.persistentTimeOut)
- self.assertFailure(d1, TimeoutError)
- return gatherResults([d1, d2])
-
-
- def test_timeoutPipelining(self):
- """
- When two requests are sent, a timeout call should remain around for the
- second request, and its timeout time should be correct.
- """
- d1 = self.proto.get("foo")
- d2 = self.proto.get("bar")
- d3 = Deferred()
- self.proto.connectionLost = d3.callback
-
- self.clock.advance(self.proto.persistentTimeOut - 1)
- self.proto.dataReceived("VALUE foo 0 3\r\nbar\r\nEND\r\n")
-
- def check(result):
- self.assertEquals(result, (0, "bar"))
- self.assertEquals(len(self.clock.calls), 1)
- for i in range(self.proto.persistentTimeOut):
- self.clock.advance(1)
- return self.assertFailure(d2, TimeoutError).addCallback(checkTime)
- def checkTime(ignored):
- # Check that the timeout happened C{self.proto.persistentTimeOut}
- # after the last response
- self.assertEquals(self.clock.seconds(),
- 2 * self.proto.persistentTimeOut - 1)
- d1.addCallback(check)
- return d1
-
-
- def test_timeoutNotReset(self):
- """
- Check that timeout is not resetted for every command, but keep the
- timeout from the first command without response.
- """
- d1 = self.proto.get("foo")
- d3 = Deferred()
- self.proto.connectionLost = d3.callback
-
- self.clock.advance(self.proto.persistentTimeOut - 1)
- d2 = self.proto.get("bar")
- self.clock.advance(1)
- self.assertFailure(d1, TimeoutError)
- self.assertFailure(d2, TimeoutError)
- return gatherResults([d1, d2, d3])
-
-
def test_tooLongKey(self):
"""
Test that an error is raised when trying to use a too long key: the
Modified: CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/test/test_memcachepool.py
===================================================================
--- CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/test/test_memcachepool.py 2010-01-15 06:04:33 UTC (rev 4926)
+++ CalendarServer/branches/users/sagen/deployment-inspection/twistedcaldav/test/test_memcachepool.py 2010-01-15 06:05:50 UTC (rev 4927)
@@ -176,12 +176,6 @@
[('gone', self.protocol)])
- def tearDown(self):
- """
- Make sure the L{MemCacheClientFactory} isn't trying to reconnect
- anymore.
- """
- self.factory.stopTrying()
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20100114/13899563/attachment-0001.html>
More information about the calendarserver-changes
mailing list