[CalendarServer-changes] [4780] CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/ client/pool.py
source_changes at macosforge.org
source_changes at macosforge.org
Thu Nov 19 11:06:49 PST 2009
Revision: 4780
http://trac.macosforge.org/projects/calendarserver/changeset/4780
Author: cdaboo at apple.com
Date: 2009-11-19 11:06:49 -0800 (Thu, 19 Nov 2009)
Log Message:
-----------
Get rid of inlineCallbacks and fix some race conditions.
Modified Paths:
--------------
CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/client/pool.py
Modified: CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/client/pool.py
===================================================================
--- CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/client/pool.py 2009-11-19 18:33:37 UTC (rev 4779)
+++ CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/client/pool.py 2009-11-19 19:06:49 UTC (rev 4780)
@@ -22,7 +22,7 @@
from twext.internet.ssl import ChainingOpenSSLContextFactory
from twisted.internet.address import IPv4Address
-from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
+from twisted.internet.defer import Deferred
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.web2 import responsecode
from twisted.web2.client.http import HTTPClientProtocol
@@ -46,10 +46,10 @@
connectionPool = None
maxRetries = 2
- def __init__(self, reactor, deferred):
+ def __init__(self, reactor):
self.reactor = reactor
self.instance = None
- self.deferred = deferred
+ self.deferred = Deferred()
def clientConnectionLost(self, connector, reason):
"""
@@ -63,28 +63,12 @@
self.log_error("ReverseProxy connection lost: %s" % (reason,))
if self.instance is not None:
self.connectionPool.clientGone(self.instance)
-# if self.instance is not None:
-# self.connectionPool.clientBusy(self.instance)
-#
-# ReconnectingClientFactory.clientConnectionLost(
-# self,
-# connector,
-# reason
-# )
def clientConnectionFailed(self, connector, reason):
"""
Notify the connectionPool that we're unable to connect
"""
self.log_error("ReverseProxy connection failed: %s" % (reason,))
-# if self.instance is not None:
-# self.connectionPool.clientBusy(self.instance)
-
-# ReconnectingClientFactory.clientConnectionFailed(
-# self,
-# connector,
-# reason
-# )
if hasattr(self, "deferred"):
self.reactor.callLater(0, self.deferred.errback, reason)
del self.deferred
@@ -157,7 +141,6 @@
self.shutdown_deferred = Deferred()
return self.shutdown_deferred
- @inlineCallbacks
def _newClientConnection(self):
"""
Create a new client connection.
@@ -174,28 +157,28 @@
return client
- d = Deferred()
- factory = self.clientFactory(self._reactor, d)
+ def _badGateway(f):
+ raise HTTPError(StatusResponse(responsecode.BAD_GATEWAY, "Could not connect to reverse proxy host."))
+
+ factory = self.clientFactory(self._reactor)
factory.noisy = False
factory.connectionPool = self
- try:
- if self._scheme == "https":
- context = ChainingOpenSSLContextFactory(config.SSLPrivateKey, config.SSLCertificate, certificateChainFile=config.SSLAuthorityChain, sslmethod=getattr(OpenSSL.SSL, config.SSLMethod))
- self._reactor.connectSSL(self._serverAddress.host, self._serverAddress.port, factory, context)
- elif self._scheme == "http":
- self._reactor.connectTCP(self._serverAddress.host, self._serverAddress.port, factory)
- else:
- raise ValueError("URL scheme for client pool not supported")
- client = (yield d)
- except:
- raise HTTPError(StatusResponse(responsecode.BAD_GATEWAY, "Could not connect to reverse proxy host."))
- finally:
- self._pendingConnects -= 1
- returnValue(client)
+ if self._scheme == "https":
+ context = ChainingOpenSSLContextFactory(config.SSLPrivateKey, config.SSLCertificate, certificateChainFile=config.SSLAuthorityChain, sslmethod=getattr(OpenSSL.SSL, config.SSLMethod))
+ self._reactor.connectSSL(self._serverAddress.host, self._serverAddress.port, factory, context)
+ elif self._scheme == "http":
+ self._reactor.connectTCP(self._serverAddress.host, self._serverAddress.port, factory)
+ else:
+ raise ValueError("URL scheme for client pool not supported")
- @inlineCallbacks
+ d = factory.deferred
+
+ d.addCallback(_connected)
+ d.addErrback(_badGateway)
+ return d
+
def _performRequestOnClient(self, client, request, *args, **kwargs):
"""
Perform the given request on the given client.
@@ -213,15 +196,15 @@
@return: A L{Deferred} that fires with the result of the given command.
"""
- self.clientBusy(client)
- try:
- response = (yield client.submitRequest(request, closeAfter=False))
- finally:
+ def _freeClientAfterRequest(result):
self.clientFree(client)
+ return result
- returnValue(response)
+ self.clientBusy(client)
+ d = client.submitRequest(request, closeAfter=False)
+ d.addBoth(_freeClientAfterRequest)
+ return d
- @inlineCallbacks
def submitRequest(self, request, *args, **kwargs):
"""
Select an available client and perform the given request on it.
@@ -240,20 +223,20 @@
if len(self._freeClients) > 0:
client = self._freeClients.pop()
+ d = self._performRequestOnClient(client, request, *args, **kwargs)
+
elif len(self._busyClients) + self._pendingConnects >= self._maxClients:
d = Deferred()
self._commands.append((d, request, args, kwargs))
self.log_debug("Request queued: %s, %r, %r" % (request, args, kwargs))
self._logClientStats()
- response = (yield d)
+
else:
- client = (yield self._newClientConnection())
+ d = self._newClientConnection()
+ d.addCallback(self._performRequestOnClient, request, *args, **kwargs)
- if client:
- response = (yield self._performRequestOnClient(client, request, *args, **kwargs))
+ return d
- returnValue(response)
-
def _logClientStats(self):
self.log_debug("Clients #free: %d, #busy: %d, "
"#pending: %d, #queued: %d" % (
@@ -307,13 +290,13 @@
self.shutdown_deferred.callback(None)
if len(self._commands) > 0:
- d, command, args, kwargs = self._commands.pop(0)
+ d, request, args, kwargs = self._commands.pop(0)
- self.log_debug("Performing Queued Command: %s, %r, %r" % (
- command, args, kwargs))
+ self.log_debug("Performing Queued Request: %s, %r, %r" % (
+ request, args, kwargs))
self._logClientStats()
- _ign_d = self.performRequest(command, *args, **kwargs)
+ _ign_d = self.submitRequest(request, *args, **kwargs)
_ign_d.addCallback(d.callback)
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20091119/38951781/attachment.html>
More information about the calendarserver-changes
mailing list