[CalendarServer-changes] [4804] CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav

source_changes at macosforge.org source_changes at macosforge.org
Tue Nov 24 12:54:20 PST 2009


Revision: 4804
          http://trac.macosforge.org/projects/calendarserver/changeset/4804
Author:   cdaboo at apple.com
Date:     2009-11-24 12:53:14 -0800 (Tue, 24 Nov 2009)
Log Message:
-----------
Catching up to changes made on deployment branch.

Modified Paths:
--------------
    CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/client/pool.py
    CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/client/reverseproxy.py
    CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/directory/directory.py
    CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/scheduling/ischedule.py
    CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/scheduling/scheduler.py

Modified: CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/client/pool.py
===================================================================
--- CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/client/pool.py	2009-11-24 16:30:24 UTC (rev 4803)
+++ CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/client/pool.py	2009-11-24 20:53:14 UTC (rev 4804)
@@ -17,13 +17,14 @@
 __all__ = [
     "installPools",
     "installPool",
-    "getReverseProxyPool",
+    "getHTTPClientPool",
 ]
 
 from twext.internet.ssl import ChainingOpenSSLContextFactory
 from twisted.internet.address import IPv4Address
-from twisted.internet.defer import Deferred
-from twisted.internet.protocol import ReconnectingClientFactory
+from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
+from twisted.internet.error import ConnectionLost, ConnectionDone, ConnectError
+from twisted.internet.protocol import ClientFactory
 from twisted.web2 import responsecode
 from twisted.web2.client.http import HTTPClientProtocol
 from twisted.web2.http import StatusResponse, HTTPError
@@ -32,57 +33,55 @@
 import OpenSSL
 import urlparse
 
-class ReverseProxyClientFactory(ReconnectingClientFactory, LoggingMixIn):
+class PooledHTTPClientFactory(ClientFactory, LoggingMixIn):
     """
-    A client factory for HTTPClient that reconnects and notifies a pool of it's
-    state.
+    A client factory for HTTPClient that notifies a pool of its state. If the connection
+    fails in the middle of a request, it will retry the request.
 
+    @ivar protocol: The current instance of our protocol that we pass
+        to our connectionPool.
     @ivar connectionPool: A managing connection pool that we notify of events.
-    @ivar deferred: A L{Deferred} that represents the initial connection.
-    @ivar _protocolInstance: The current instance of our protocol that we pass
-        to our connectionPool.
     """
     protocol = HTTPClientProtocol
     connectionPool = None
-    maxRetries = 2
 
     def __init__(self, reactor):
         self.reactor = reactor
         self.instance = None
-        self.deferred = Deferred()
+        self.onConnect = Deferred()
+        self.afterConnect = Deferred()
 
     def clientConnectionLost(self, connector, reason):
         """
         Notify the connectionPool that we've lost our connection.
         """
 
+        if hasattr(self, "afterConnect"):
+            self.reactor.callLater(0, self.afterConnect.errback, reason)
+            del self.afterConnect
+
         if self.connectionPool.shutdown_requested:
             # The reactor is stopping; don't reconnect
             return
 
-        self.log_error("ReverseProxy connection lost: %s" % (reason,))
-        if self.instance is not None:
-            self.connectionPool.clientGone(self.instance)
-
     def clientConnectionFailed(self, connector, reason):
         """
         Notify the connectionPool that we're unable to connect
         """
-        self.log_error("ReverseProxy connection failed: %s" % (reason,))
-        if hasattr(self, "deferred"):
-            self.reactor.callLater(0, self.deferred.errback, reason)
-            del self.deferred
+        if hasattr(self, "onConnect"):
+            self.reactor.callLater(0, self.onConnect.errback, reason)
+            del self.onConnect
+        elif hasattr(self, "afterConnect"):
+            self.reactor.callLater(0, self.afterConnect.errback, reason)
+            del self.afterConnect
 
     def buildProtocol(self, addr):
-        if self.instance is not None:
-            self.connectionPool.clientGone(self.instance)
-
         self.instance = self.protocol()
-        self.reactor.callLater(0, self.deferred.callback, self.instance)
-        del self.deferred
+        self.reactor.callLater(0, self.onConnect.callback, self.instance)
+        del self.onConnect
         return self.instance
 
-class ReverseProxyPool(LoggingMixIn):
+class HTTPClientPool(LoggingMixIn):
     """
     A connection pool for HTTPClientProtocol instances.
 
@@ -100,9 +99,10 @@
     @ivar _pendingConnects: A C{int} indicating how many connections are in
         progress.
     """
-    clientFactory = ReverseProxyClientFactory
+    clientFactory = PooledHTTPClientFactory
+    maxRetries = 2
 
-    def __init__(self, scheme, serverAddress, maxClients=5, reactor=None):
+    def __init__(self, name, scheme, serverAddress, maxClients=5, reactor=None):
         """
         @param serverAddress: An L{IPv4Address} indicating the server to
             connect to.
@@ -110,6 +110,8 @@
         @param reactor: An L{IReactorTCP{ provider used to initiate new
             connections.
         """
+        
+        self._name = name
         self._scheme = scheme
         self._serverAddress = serverAddress
         self._maxClients = maxClients
@@ -125,12 +127,12 @@
         self._busyClients = set([])
         self._freeClients = set([])
         self._pendingConnects = 0
-        self._commands = []
+        self._pendingRequests = []
 
     def _isIdle(self):
         return (
             len(self._busyClients) == 0 and
-            len(self._commands) == 0 and
+            len(self._pendingRequests) == 0 and
             self._pendingConnects == 0
         )
 
@@ -152,17 +154,7 @@
 
         self._pendingConnects += 1
 
-        def _connected(client):
-            self._pendingConnects -= 1
-
-            return client
-
-        def _badGateway(f):
-            raise HTTPError(StatusResponse(responsecode.BAD_GATEWAY, "Could not connect to reverse proxy host."))
-
         factory = self.clientFactory(self._reactor)
-        factory.noisy = False
-
         factory.connectionPool = self
 
         if self._scheme == "https":
@@ -173,10 +165,24 @@
         else:
             raise ValueError("URL scheme for client pool not supported")
 
-        d = factory.deferred
+        def _doneOK(client):
+            self._pendingConnects -= 1
 
-        d.addCallback(_connected)
-        d.addErrback(_badGateway)
+            def _goneClientAfterError(f, client):
+                f.trap(ConnectionLost, ConnectionDone, ConnectError)
+                self.clientGone(client)
+
+            d2 = factory.afterConnect
+            d2.addErrback(_goneClientAfterError, client)
+            return client
+
+        def _doneError(result):
+            self._pendingConnects -= 1
+            return result
+
+        d = factory.onConnect
+        d.addCallbacks(_doneOK, _doneError)
+        
         return d
 
     def _performRequestOnClient(self, client, request, *args, **kwargs):
@@ -200,11 +206,16 @@
             self.clientFree(client)
             return result
 
+        def _goneClientAfterError(result):
+            self.clientGone(client)
+            return result
+
         self.clientBusy(client)
         d = client.submitRequest(request, closeAfter=False)
-        d.addBoth(_freeClientAfterRequest)
+        d.addCallbacks(_freeClientAfterRequest, _goneClientAfterError)
         return d
 
+    @inlineCallbacks
     def submitRequest(self, request, *args, **kwargs):
         """
         Select an available client and perform the given request on it.
@@ -219,15 +230,39 @@
         @return: A L{Deferred} that fires with the result of the given command.
         """
 
-        client = None
+        # Try this maxRetries times
+        for ctr in xrange(self.maxRetries + 1):
+            try:
+                response = (yield self._submitRequest(request, args, kwargs))
+            except (ConnectionLost, ConnectionDone, ConnectError), e:
+                self.log_error("HTTP pooled client connection error (attempt: %d) - retrying: %s" % (ctr+1, e,))
+                continue
+            else:
+                returnValue(response)
+        else:
+            self.log_error("HTTP pooled client connection error - exhausted retry attempts.")
+            raise HTTPError(StatusResponse(responsecode.BAD_GATEWAY, "Could not connect to HTTP pooled client host."))
+
+    def _submitRequest(self, request, *args, **kwargs):
+        """
+        Select an available client and perform the given request on it.
+
+        @param command: A C{str} representing an attribute of
+            L{MemCacheProtocol}.
+        @param args: Any positional arguments that should be passed to
+            C{command}.
+        @param kwargs: Any keyword arguments that should be passed to
+            C{command}.
+
+        @return: A L{Deferred} that fires with the result of the given command.
+        """
+
         if len(self._freeClients) > 0:
-            client = self._freeClients.pop()
+            d = self._performRequestOnClient(self._freeClients.pop(), request, *args, **kwargs)
 
-            d = self._performRequestOnClient(client, request, *args, **kwargs)
-
         elif len(self._busyClients) + self._pendingConnects >= self._maxClients:
             d = Deferred()
-            self._commands.append((d, request, args, kwargs))
+            self._pendingRequests.append((d, request, args, kwargs))
             self.log_debug("Request queued: %s, %r, %r" % (request, args, kwargs))
             self._logClientStats()
 
@@ -243,7 +278,7 @@
                 len(self._freeClients),
                 len(self._busyClients),
                 self._pendingConnects,
-                len(self._commands)))
+                len(self._pendingRequests)))
 
     def clientGone(self, client):
         """
@@ -260,6 +295,8 @@
         self.log_debug("Removed client: %r" % (client,))
         self._logClientStats()
 
+        self._processPending()
+
     def clientBusy(self, client):
         """
         Notify that the given client is being used to complete a request.
@@ -289,20 +326,23 @@
         if self.shutdown_deferred and self._isIdle():
             self.shutdown_deferred.callback(None)
 
-        if len(self._commands) > 0:
-            d, request, args, kwargs = self._commands.pop(0)
+        self.log_debug("Freed client: %r" % (client,))
+        self._logClientStats()
 
+        self._processPending()
+
+    def _processPending(self):
+        if len(self._pendingRequests) > 0:
+            d, request, args, kwargs = self._pendingRequests.pop(0)
+
             self.log_debug("Performing Queued Request: %s, %r, %r" % (
                     request, args, kwargs))
             self._logClientStats()
 
-            _ign_d = self.submitRequest(request, *args, **kwargs)
+            _ign_d = self._submitRequest(request, *args, **kwargs)
 
-            _ign_d.addCallback(d.callback)
+            _ign_d.addCallbacks(d.callback, d.errback)
 
-        self.log_debug("Freed client: %r" % (client,))
-        self._logClientStats()
-
     def suggestMaxClients(self, maxClients):
         """
         Suggest the maximum number of concurrently connected clients.
@@ -327,7 +367,8 @@
 def installPool(name, url, maxClients=5, reactor=None):
 
     parsedURL = urlparse.urlparse(url)
-    pool = ReverseProxyPool(
+    pool = HTTPClientPool(
+        name,
         parsedURL.scheme,
         IPv4Address(
             "TCP",
@@ -339,5 +380,5 @@
     )
     _clientPools[name] = pool
 
-def getReverseProxyPool(name):
+def getHTTPClientPool(name):
     return _clientPools[name]

Modified: CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/client/reverseproxy.py
===================================================================
--- CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/client/reverseproxy.py	2009-11-24 16:30:24 UTC (rev 4803)
+++ CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/client/reverseproxy.py	2009-11-24 20:53:14 UTC (rev 4804)
@@ -23,7 +23,7 @@
 from twisted.web2.client.http import ClientRequest
 from twisted.web2.resource import LeafResource
 
-from twistedcaldav.client.pool import getReverseProxyPool
+from twistedcaldav.client.pool import getHTTPClientPool
 from twistedcaldav.log import LoggingMixIn
 
 import urllib
@@ -64,7 +64,7 @@
         """
             
         self.logger.info("%s %s %s" % (request.method, urllib.unquote(request.uri), "HTTP/%s.%s" % request.clientproto))
-        clientPool = getReverseProxyPool(self.poolID)
+        clientPool = getHTTPClientPool(self.poolID)
         proxyRequest = ClientRequest(request.method, request.uri, request.headers, request.stream)
         response = (yield clientPool.submitRequest(proxyRequest))
         returnValue(response)

Modified: CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/directory/directory.py
===================================================================
--- CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/directory/directory.py	2009-11-24 16:30:24 UTC (rev 4803)
+++ CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/directory/directory.py	2009-11-24 20:53:14 UTC (rev 4804)
@@ -356,7 +356,8 @@
             self.calendarUserAddresses = set(augment.calendarUserAddresses)
 
             if self.enabledForCalendaring and self.recordType == self.service.recordType_groups:
-                raise AssertionError("Groups may not be enabled for calendaring")
+                self.log_error("Group '%s(%s)' cannot be enabled for calendaring" % (self.guid, self.shortName,))
+                self.enabledForCalendaring = False
     
             if self.enabledForCalendaring:
                 for email in self.emailAddresses:

Modified: CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/scheduling/ischedule.py
===================================================================
--- CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/scheduling/ischedule.py	2009-11-24 16:30:24 UTC (rev 4803)
+++ CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/scheduling/ischedule.py	2009-11-24 20:53:14 UTC (rev 4804)
@@ -172,7 +172,9 @@
     def _generateHeaders(self):
         self.headers = Headers()
         self.headers.setHeader('Host', utf8String(self.server.host + ":%s" % (self.server.port,)))
-        self.headers.addRawHeader('Originator', utf8String(self.scheduler.originator.cuaddr))
+        
+        # The Originator must be the ORGANIZER (for a request) or ATTENDEE (for a reply)
+        self.headers.addRawHeader('Originator', utf8String(self.scheduler.organizer.cuaddr if self.scheduler.isiTIPRequest else self.scheduler.attendee))
         self._doAuthentication()
         for recipient in self.recipients:
             self.headers.addRawHeader('Recipient', utf8String(recipient.cuaddr))

Modified: CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/scheduling/scheduler.py
===================================================================
--- CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/scheduling/scheduler.py	2009-11-24 16:30:24 UTC (rev 4803)
+++ CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/scheduling/scheduler.py	2009-11-24 20:53:14 UTC (rev 4804)
@@ -72,6 +72,8 @@
         self.recipients = None
         self.calendar = None
         self.organizer = None
+        self.attendee = None
+        self.isiTIPRequest = None
         self.timeRange = None
         self.excludeUID = None
         self.fakeTheResult = False
@@ -281,6 +283,27 @@
             log.err("X-CALENDARSERVER-ACCESS not allowed in a calendar component %s request: %s" % (self.method, self.calendar,))
             raise HTTPError(ErrorResponse(responsecode.FORBIDDEN, (calendarserver_namespace, "no-access-restrictions")))
     
+        # Determine iTIP method mode
+        if self.calendar.propertyValue("METHOD") in ("PUBLISH", "REQUEST", "ADD", "CANCEL", "DECLINECOUNTER"):
+            self.isiTIPRequest = True
+
+        elif self.calendar.propertyValue("METHOD") in ("REPLY", "COUNTER", "REFRESH"):
+            self.isiTIPRequest = False
+
+            # Verify that there is a single ATTENDEE property
+            attendees = self.calendar.getAttendees()
+        
+            # Must have only one
+            if len(attendees) != 1:
+                log.err("Wrong number of ATTENDEEs in calendar data: %s" % (self.calendardata,))
+                raise HTTPError(ErrorResponse(responsecode.FORBIDDEN, (caldav_namespace, "attendee-allowed")))
+            self.attendee = attendees[0]
+
+        else:
+            msg = "Unknown iTIP METHOD: %s" % (self.calendar.propertyValue("METHOD"),)
+            log.err(msg)
+            raise HTTPError(ErrorResponse(responsecode.FORBIDDEN, (caldav_namespace, "valid-calendar-data"), description=msg))
+
     def checkForFreeBusy(self):
         if not hasattr(self, "isfreebusy"):
             if (self.calendar.propertyValue("METHOD") == "REQUEST") and (self.calendar.mainType() == "VFREEBUSY"):
@@ -555,17 +578,8 @@
         Only local attendees are allowed for message originating from this server.
         """
         
-        # Verify that there is a single ATTENDEE property
-        attendees = self.calendar.getAttendees()
-    
-        # Must have only one
-        if len(attendees) != 1:
-            log.err("Wrong number of ATTENDEEs in calendar data: %s" % (self.calendar,))
-            raise HTTPError(ErrorResponse(responsecode.FORBIDDEN, (caldav_namespace, "attendee-allowed")))
-        attendee = attendees[0]
-    
         # Attendee's Outbox MUST be the request URI
-        attendeePrincipal = self.resource.principalForCalendarUserAddress(attendee)
+        attendeePrincipal = self.resource.principalForCalendarUserAddress(self.attendee)
         if attendeePrincipal:
             if self.doingPOST and attendeePrincipal.scheduleOutboxURL() != self.request.uri:
                 log.err("ATTENDEE in calendar data does not match owner of Outbox: %s" % (self.calendar,))
@@ -580,16 +594,12 @@
         """
     
         # Prevent spoofing of ORGANIZER with specific METHODs when local
-        if self.calendar.propertyValue("METHOD") in ("PUBLISH", "REQUEST", "ADD", "CANCEL", "DECLINECOUNTER"):
+        if self.isiTIPRequest:
             self.checkOrganizerAsOriginator()
     
         # Prevent spoofing when doing reply-like METHODs
-        elif self.calendar.propertyValue("METHOD") in ("REPLY", "COUNTER", "REFRESH"):
+        else:
             self.checkAttendeeAsOriginator()
-            
-        else:
-            log.err("Unknown iTIP METHOD for security checks: %s" % (self.calendar.propertyValue("METHOD"),))
-            raise HTTPError(ErrorResponse(responsecode.FORBIDDEN, (caldav_namespace, "valid-calendar-data"), description="Unknown iTIP METHOD for security checks"))
 
     def finalChecks(self):
         """
@@ -803,17 +813,8 @@
         Only local attendees are allowed for message originating from this server.
         """
         
-        # Verify that there is a single ATTENDEE property
-        attendees = self.calendar.getAttendees()
-    
-        # Must have only one
-        if len(attendees) != 1:
-            log.err("Wrong number of ATTENDEEs in calendar data: %s" % (self.calendar,))
-            raise HTTPError(ErrorResponse(responsecode.FORBIDDEN, (caldav_namespace, "attendee-allowed")))
-        attendee = attendees[0]
-    
         # Attendee cannot be local.
-        attendeePrincipal = self.resource.principalForCalendarUserAddress(attendee)
+        attendeePrincipal = self.resource.principalForCalendarUserAddress(self.attendee)
         if attendeePrincipal:
             if attendeePrincipal.locallyHosted():
                 log.err("Invalid ATTENDEE in calendar data: %s" % (self.calendar,))
@@ -821,7 +822,7 @@
             else:
                 self._validPartitionServer(attendeePrincipal)                
         else:
-            localUser = (yield addressmapping.mapper.isCalendarUserInMyDomain(attendee))
+            localUser = (yield addressmapping.mapper.isCalendarUserInMyDomain(self.attendee))
             if localUser:
                 log.err("Unknown ATTENDEE in calendar data: %s" % (self.calendar,))
                 raise HTTPError(ErrorResponse(responsecode.FORBIDDEN, (caldav_namespace, "attendee-allowed")))
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20091124/604941b1/attachment-0001.html>


More information about the calendarserver-changes mailing list