[CalendarServer-changes] [4804] CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav
source_changes at macosforge.org
source_changes at macosforge.org
Tue Nov 24 12:54:20 PST 2009
Revision: 4804
http://trac.macosforge.org/projects/calendarserver/changeset/4804
Author: cdaboo at apple.com
Date: 2009-11-24 12:53:14 -0800 (Tue, 24 Nov 2009)
Log Message:
-----------
Catching up to changes made on deployment branch.
Modified Paths:
--------------
CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/client/pool.py
CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/client/reverseproxy.py
CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/directory/directory.py
CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/scheduling/ischedule.py
CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/scheduling/scheduler.py
Modified: CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/client/pool.py
===================================================================
--- CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/client/pool.py 2009-11-24 16:30:24 UTC (rev 4803)
+++ CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/client/pool.py 2009-11-24 20:53:14 UTC (rev 4804)
@@ -17,13 +17,14 @@
__all__ = [
"installPools",
"installPool",
- "getReverseProxyPool",
+ "getHTTPClientPool",
]
from twext.internet.ssl import ChainingOpenSSLContextFactory
from twisted.internet.address import IPv4Address
-from twisted.internet.defer import Deferred
-from twisted.internet.protocol import ReconnectingClientFactory
+from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
+from twisted.internet.error import ConnectionLost, ConnectionDone, ConnectError
+from twisted.internet.protocol import ClientFactory
from twisted.web2 import responsecode
from twisted.web2.client.http import HTTPClientProtocol
from twisted.web2.http import StatusResponse, HTTPError
@@ -32,57 +33,55 @@
import OpenSSL
import urlparse
-class ReverseProxyClientFactory(ReconnectingClientFactory, LoggingMixIn):
+class PooledHTTPClientFactory(ClientFactory, LoggingMixIn):
"""
- A client factory for HTTPClient that reconnects and notifies a pool of it's
- state.
+ A client factory for HTTPClient that notifies a pool of it's state. It the connection
+ fails in the middle of a request it will retry the request.
+ @ivar protocol: The current instance of our protocol that we pass
+ to our connectionPool.
@ivar connectionPool: A managing connection pool that we notify of events.
- @ivar deferred: A L{Deferred} that represents the initial connection.
- @ivar _protocolInstance: The current instance of our protocol that we pass
- to our connectionPool.
"""
protocol = HTTPClientProtocol
connectionPool = None
- maxRetries = 2
def __init__(self, reactor):
self.reactor = reactor
self.instance = None
- self.deferred = Deferred()
+ self.onConnect = Deferred()
+ self.afterConnect = Deferred()
def clientConnectionLost(self, connector, reason):
"""
Notify the connectionPool that we've lost our connection.
"""
+ if hasattr(self, "afterConnect"):
+ self.reactor.callLater(0, self.afterConnect.errback, reason)
+ del self.afterConnect
+
if self.connectionPool.shutdown_requested:
# The reactor is stopping; don't reconnect
return
- self.log_error("ReverseProxy connection lost: %s" % (reason,))
- if self.instance is not None:
- self.connectionPool.clientGone(self.instance)
-
def clientConnectionFailed(self, connector, reason):
"""
Notify the connectionPool that we're unable to connect
"""
- self.log_error("ReverseProxy connection failed: %s" % (reason,))
- if hasattr(self, "deferred"):
- self.reactor.callLater(0, self.deferred.errback, reason)
- del self.deferred
+ if hasattr(self, "onConnect"):
+ self.reactor.callLater(0, self.onConnect.errback, reason)
+ del self.onConnect
+ elif hasattr(self, "afterConnect"):
+ self.reactor.callLater(0, self.afterConnect.errback, reason)
+ del self.afterConnect
def buildProtocol(self, addr):
- if self.instance is not None:
- self.connectionPool.clientGone(self.instance)
-
self.instance = self.protocol()
- self.reactor.callLater(0, self.deferred.callback, self.instance)
- del self.deferred
+ self.reactor.callLater(0, self.onConnect.callback, self.instance)
+ del self.onConnect
return self.instance
-class ReverseProxyPool(LoggingMixIn):
+class HTTPClientPool(LoggingMixIn):
"""
A connection pool for HTTPClientProtocol instances.
@@ -100,9 +99,10 @@
@ivar _pendingConnects: A C{int} indicating how many connections are in
progress.
"""
- clientFactory = ReverseProxyClientFactory
+ clientFactory = PooledHTTPClientFactory
+ maxRetries = 2
- def __init__(self, scheme, serverAddress, maxClients=5, reactor=None):
+ def __init__(self, name, scheme, serverAddress, maxClients=5, reactor=None):
"""
@param serverAddress: An L{IPv4Address} indicating the server to
connect to.
@@ -110,6 +110,8 @@
@param reactor: An L{IReactorTCP{ provider used to initiate new
connections.
"""
+
+ self._name = name
self._scheme = scheme
self._serverAddress = serverAddress
self._maxClients = maxClients
@@ -125,12 +127,12 @@
self._busyClients = set([])
self._freeClients = set([])
self._pendingConnects = 0
- self._commands = []
+ self._pendingRequests = []
def _isIdle(self):
return (
len(self._busyClients) == 0 and
- len(self._commands) == 0 and
+ len(self._pendingRequests) == 0 and
self._pendingConnects == 0
)
@@ -152,17 +154,7 @@
self._pendingConnects += 1
- def _connected(client):
- self._pendingConnects -= 1
-
- return client
-
- def _badGateway(f):
- raise HTTPError(StatusResponse(responsecode.BAD_GATEWAY, "Could not connect to reverse proxy host."))
-
factory = self.clientFactory(self._reactor)
- factory.noisy = False
-
factory.connectionPool = self
if self._scheme == "https":
@@ -173,10 +165,24 @@
else:
raise ValueError("URL scheme for client pool not supported")
- d = factory.deferred
+ def _doneOK(client):
+ self._pendingConnects -= 1
- d.addCallback(_connected)
- d.addErrback(_badGateway)
+ def _goneClientAfterError(f, client):
+ f.trap(ConnectionLost, ConnectionDone, ConnectError)
+ self.clientGone(client)
+
+ d2 = factory.afterConnect
+ d2.addErrback(_goneClientAfterError, client)
+ return client
+
+ def _doneError(result):
+ self._pendingConnects -= 1
+ return result
+
+ d = factory.onConnect
+ d.addCallbacks(_doneOK, _doneError)
+
return d
def _performRequestOnClient(self, client, request, *args, **kwargs):
@@ -200,11 +206,16 @@
self.clientFree(client)
return result
+ def _goneClientAfterError(result):
+ self.clientGone(client)
+ return result
+
self.clientBusy(client)
d = client.submitRequest(request, closeAfter=False)
- d.addBoth(_freeClientAfterRequest)
+ d.addCallbacks(_freeClientAfterRequest, _goneClientAfterError)
return d
+ @inlineCallbacks
def submitRequest(self, request, *args, **kwargs):
"""
Select an available client and perform the given request on it.
@@ -219,15 +230,39 @@
@return: A L{Deferred} that fires with the result of the given command.
"""
- client = None
+ # Try this maxRetries times
+ for ctr in xrange(self.maxRetries + 1):
+ try:
+ response = (yield self._submitRequest(request, args, kwargs))
+ except (ConnectionLost, ConnectionDone, ConnectError), e:
+ self.log_error("HTTP pooled client connection error (attempt: %d) - retrying: %s" % (ctr+1, e,))
+ continue
+ else:
+ returnValue(response)
+ else:
+ self.log_error("HTTP pooled client connection error - exhausted retry attempts.")
+ raise HTTPError(StatusResponse(responsecode.BAD_GATEWAY, "Could not connect to HTTP pooled client host."))
+
+ def _submitRequest(self, request, *args, **kwargs):
+ """
+ Select an available client and perform the given request on it.
+
+ @param command: A C{str} representing an attribute of
+ L{MemCacheProtocol}.
+ @parma args: Any positional arguments that should be passed to
+ C{command}.
+ @param kwargs: Any keyword arguments that should be passed to
+ C{command}.
+
+ @return: A L{Deferred} that fires with the result of the given command.
+ """
+
if len(self._freeClients) > 0:
- client = self._freeClients.pop()
+ d = self._performRequestOnClient(self._freeClients.pop(), request, *args, **kwargs)
- d = self._performRequestOnClient(client, request, *args, **kwargs)
-
elif len(self._busyClients) + self._pendingConnects >= self._maxClients:
d = Deferred()
- self._commands.append((d, request, args, kwargs))
+ self._pendingRequests.append((d, request, args, kwargs))
self.log_debug("Request queued: %s, %r, %r" % (request, args, kwargs))
self._logClientStats()
@@ -243,7 +278,7 @@
len(self._freeClients),
len(self._busyClients),
self._pendingConnects,
- len(self._commands)))
+ len(self._pendingRequests)))
def clientGone(self, client):
"""
@@ -260,6 +295,8 @@
self.log_debug("Removed client: %r" % (client,))
self._logClientStats()
+ self._processPending()
+
def clientBusy(self, client):
"""
Notify that the given client is being used to complete a request.
@@ -289,20 +326,23 @@
if self.shutdown_deferred and self._isIdle():
self.shutdown_deferred.callback(None)
- if len(self._commands) > 0:
- d, request, args, kwargs = self._commands.pop(0)
+ self.log_debug("Freed client: %r" % (client,))
+ self._logClientStats()
+ self._processPending()
+
+ def _processPending(self):
+ if len(self._pendingRequests) > 0:
+ d, request, args, kwargs = self._pendingRequests.pop(0)
+
self.log_debug("Performing Queued Request: %s, %r, %r" % (
request, args, kwargs))
self._logClientStats()
- _ign_d = self.submitRequest(request, *args, **kwargs)
+ _ign_d = self._submitRequest(request, *args, **kwargs)
- _ign_d.addCallback(d.callback)
+ _ign_d.addCallbacks(d.callback, d.errback)
- self.log_debug("Freed client: %r" % (client,))
- self._logClientStats()
-
def suggestMaxClients(self, maxClients):
"""
Suggest the maximum number of concurrently connected clients.
@@ -327,7 +367,8 @@
def installPool(name, url, maxClients=5, reactor=None):
parsedURL = urlparse.urlparse(url)
- pool = ReverseProxyPool(
+ pool = HTTPClientPool(
+ name,
parsedURL.scheme,
IPv4Address(
"TCP",
@@ -339,5 +380,5 @@
)
_clientPools[name] = pool
-def getReverseProxyPool(name):
+def getHTTPClientPool(name):
return _clientPools[name]
Modified: CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/client/reverseproxy.py
===================================================================
--- CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/client/reverseproxy.py 2009-11-24 16:30:24 UTC (rev 4803)
+++ CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/client/reverseproxy.py 2009-11-24 20:53:14 UTC (rev 4804)
@@ -23,7 +23,7 @@
from twisted.web2.client.http import ClientRequest
from twisted.web2.resource import LeafResource
-from twistedcaldav.client.pool import getReverseProxyPool
+from twistedcaldav.client.pool import getHTTPClientPool
from twistedcaldav.log import LoggingMixIn
import urllib
@@ -64,7 +64,7 @@
"""
self.logger.info("%s %s %s" % (request.method, urllib.unquote(request.uri), "HTTP/%s.%s" % request.clientproto))
- clientPool = getReverseProxyPool(self.poolID)
+ clientPool = getHTTPClientPool(self.poolID)
proxyRequest = ClientRequest(request.method, request.uri, request.headers, request.stream)
response = (yield clientPool.submitRequest(proxyRequest))
returnValue(response)
Modified: CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/directory/directory.py
===================================================================
--- CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/directory/directory.py 2009-11-24 16:30:24 UTC (rev 4803)
+++ CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/directory/directory.py 2009-11-24 20:53:14 UTC (rev 4804)
@@ -356,7 +356,8 @@
self.calendarUserAddresses = set(augment.calendarUserAddresses)
if self.enabledForCalendaring and self.recordType == self.service.recordType_groups:
- raise AssertionError("Groups may not be enabled for calendaring")
+ self.log_error("Group '%s(%s)' cannot be enabled for calendaring" % (self.guid, self.shortName,))
+ self.enabledForCalendaring = False
if self.enabledForCalendaring:
for email in self.emailAddresses:
Modified: CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/scheduling/ischedule.py
===================================================================
--- CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/scheduling/ischedule.py 2009-11-24 16:30:24 UTC (rev 4803)
+++ CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/scheduling/ischedule.py 2009-11-24 20:53:14 UTC (rev 4804)
@@ -172,7 +172,9 @@
def _generateHeaders(self):
self.headers = Headers()
self.headers.setHeader('Host', utf8String(self.server.host + ":%s" % (self.server.port,)))
- self.headers.addRawHeader('Originator', utf8String(self.scheduler.originator.cuaddr))
+
+ # The Originator must be the ORGANIZER (for a request) or ATTENDEE (for a reply)
+ self.headers.addRawHeader('Originator', utf8String(self.scheduler.organizer.cuaddr if self.scheduler.isiTIPRequest else self.scheduler.attendee))
self._doAuthentication()
for recipient in self.recipients:
self.headers.addRawHeader('Recipient', utf8String(recipient.cuaddr))
Modified: CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/scheduling/scheduler.py
===================================================================
--- CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/scheduling/scheduler.py 2009-11-24 16:30:24 UTC (rev 4803)
+++ CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/scheduling/scheduler.py 2009-11-24 20:53:14 UTC (rev 4804)
@@ -72,6 +72,8 @@
self.recipients = None
self.calendar = None
self.organizer = None
+ self.attendee = None
+ self.isiTIPRequest = None
self.timeRange = None
self.excludeUID = None
self.fakeTheResult = False
@@ -281,6 +283,27 @@
log.err("X-CALENDARSERVER-ACCESS not allowed in a calendar component %s request: %s" % (self.method, self.calendar,))
raise HTTPError(ErrorResponse(responsecode.FORBIDDEN, (calendarserver_namespace, "no-access-restrictions")))
+ # Determine iTIP method mode
+ if self.calendar.propertyValue("METHOD") in ("PUBLISH", "REQUEST", "ADD", "CANCEL", "DECLINECOUNTER"):
+ self.isiTIPRequest = True
+
+ elif self.calendar.propertyValue("METHOD") in ("REPLY", "COUNTER", "REFRESH"):
+ self.isiTIPRequest = False
+
+ # Verify that there is a single ATTENDEE property
+ attendees = self.calendar.getAttendees()
+
+ # Must have only one
+ if len(attendees) != 1:
+ log.err("Wrong number of ATTENDEEs in calendar data: %s" % (self.calendardata,))
+ raise HTTPError(ErrorResponse(responsecode.FORBIDDEN, (caldav_namespace, "attendee-allowed")))
+ self.attendee = attendees[0]
+
+ else:
+ msg = "Unknown iTIP METHOD: %s" % (self.calendar.propertyValue("METHOD"),)
+ log.err(msg)
+ raise HTTPError(ErrorResponse(responsecode.FORBIDDEN, (caldav_namespace, "valid-calendar-data"), description=msg))
+
def checkForFreeBusy(self):
if not hasattr(self, "isfreebusy"):
if (self.calendar.propertyValue("METHOD") == "REQUEST") and (self.calendar.mainType() == "VFREEBUSY"):
@@ -555,17 +578,8 @@
Only local attendees are allowed for message originating from this server.
"""
- # Verify that there is a single ATTENDEE property
- attendees = self.calendar.getAttendees()
-
- # Must have only one
- if len(attendees) != 1:
- log.err("Wrong number of ATTENDEEs in calendar data: %s" % (self.calendar,))
- raise HTTPError(ErrorResponse(responsecode.FORBIDDEN, (caldav_namespace, "attendee-allowed")))
- attendee = attendees[0]
-
# Attendee's Outbox MUST be the request URI
- attendeePrincipal = self.resource.principalForCalendarUserAddress(attendee)
+ attendeePrincipal = self.resource.principalForCalendarUserAddress(self.attendee)
if attendeePrincipal:
if self.doingPOST and attendeePrincipal.scheduleOutboxURL() != self.request.uri:
log.err("ATTENDEE in calendar data does not match owner of Outbox: %s" % (self.calendar,))
@@ -580,16 +594,12 @@
"""
# Prevent spoofing of ORGANIZER with specific METHODs when local
- if self.calendar.propertyValue("METHOD") in ("PUBLISH", "REQUEST", "ADD", "CANCEL", "DECLINECOUNTER"):
+ if self.isiTIPRequest:
self.checkOrganizerAsOriginator()
# Prevent spoofing when doing reply-like METHODs
- elif self.calendar.propertyValue("METHOD") in ("REPLY", "COUNTER", "REFRESH"):
+ else:
self.checkAttendeeAsOriginator()
-
- else:
- log.err("Unknown iTIP METHOD for security checks: %s" % (self.calendar.propertyValue("METHOD"),))
- raise HTTPError(ErrorResponse(responsecode.FORBIDDEN, (caldav_namespace, "valid-calendar-data"), description="Unknown iTIP METHOD for security checks"))
def finalChecks(self):
"""
@@ -803,17 +813,8 @@
Only local attendees are allowed for message originating from this server.
"""
- # Verify that there is a single ATTENDEE property
- attendees = self.calendar.getAttendees()
-
- # Must have only one
- if len(attendees) != 1:
- log.err("Wrong number of ATTENDEEs in calendar data: %s" % (self.calendar,))
- raise HTTPError(ErrorResponse(responsecode.FORBIDDEN, (caldav_namespace, "attendee-allowed")))
- attendee = attendees[0]
-
# Attendee cannot be local.
- attendeePrincipal = self.resource.principalForCalendarUserAddress(attendee)
+ attendeePrincipal = self.resource.principalForCalendarUserAddress(self.attendee)
if attendeePrincipal:
if attendeePrincipal.locallyHosted():
log.err("Invalid ATTENDEE in calendar data: %s" % (self.calendar,))
@@ -821,7 +822,7 @@
else:
self._validPartitionServer(attendeePrincipal)
else:
- localUser = (yield addressmapping.mapper.isCalendarUserInMyDomain(attendee))
+ localUser = (yield addressmapping.mapper.isCalendarUserInMyDomain(self.attendee))
if localUser:
log.err("Unknown ATTENDEE in calendar data: %s" % (self.calendar,))
raise HTTPError(ErrorResponse(responsecode.FORBIDDEN, (caldav_namespace, "attendee-allowed")))
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20091124/604941b1/attachment-0001.html>
More information about the calendarserver-changes
mailing list