[CalendarServer-changes] [11492] CalendarServer/trunk

source_changes at macosforge.org
Fri Jul 5 17:56:45 PDT 2013


Revision: 11492
          http://trac.calendarserver.org//changeset/11492
Author:   glyph at apple.com
Date:     2013-07-05 17:56:45 -0700 (Fri, 05 Jul 2013)
Log Message:
-----------
Merge /branches/users/glyph/hang-fix.

This addresses a potential hang when the server experiences a certain class of
error, as well as a problem recovering from hangs during a rolling restart.
It also simplifies and better documents how the master process keeps track of
what the workers are doing.

Modified Paths:
--------------
    CalendarServer/trunk/calendarserver/tap/caldav.py
    CalendarServer/trunk/twext/internet/sendfdport.py
    CalendarServer/trunk/twext/internet/test/test_sendfdport.py
    CalendarServer/trunk/twext/web2/metafd.py
    CalendarServer/trunk/twext/web2/server.py
    CalendarServer/trunk/twext/web2/stream.py
    CalendarServer/trunk/twext/web2/test/test_metafd.py
    CalendarServer/trunk/twext/web2/test/test_server.py

Property Changed:
----------------
    CalendarServer/trunk/


Property changes on: CalendarServer/trunk
___________________________________________________________________
Modified: svn:mergeinfo
   - /CalDAVTester/trunk:11193-11198
/CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/release/CalendarServer-4.3-dev:10180-10190,10192
/CalendarServer/branches/users/cdaboo/batchupload-6699:6700-7198
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/component-set-fixes:8130-8346
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/implicituidrace:8137-8141
/CalendarServer/branches/users/cdaboo/ischedule-dkim:9747-9979
/CalendarServer/branches/users/cdaboo/managed-attachments:9985-10145
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/pods:7297-7377
/CalendarServer/branches/users/cdaboo/pycalendar:7085-7206
/CalendarServer/branches/users/cdaboo/pycard:7227-7237
/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes:7740-8287
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/cdaboo/store-scheduling:10876-11129
/CalendarServer/branches/users/cdaboo/timezones:7443-7699
/CalendarServer/branches/users/cdaboo/txn-debugging:8730-8743
/CalendarServer/branches/users/gaya/sharedgroups-3:11088-11204
/CalendarServer/branches/users/glyph/always-abort-txn-on-error:9958-9969
/CalendarServer/branches/users/glyph/case-insensitive-uid:8772-8805
/CalendarServer/branches/users/glyph/conn-limit:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/dalify:6932-7023
/CalendarServer/branches/users/glyph/db-reconnect:6824-6876
/CalendarServer/branches/users/glyph/deploybuild:7563-7572
/CalendarServer/branches/users/glyph/digest-auth-redux:10624-10635
/CalendarServer/branches/users/glyph/disable-quota:7718-7727
/CalendarServer/branches/users/glyph/dont-start-postgres:6592-6614
/CalendarServer/branches/users/glyph/imip-and-admin-html:7866-7984
/CalendarServer/branches/users/glyph/ipv6-client:9054-9105
/CalendarServer/branches/users/glyph/launchd-wrapper-bis:11413-11436
/CalendarServer/branches/users/glyph/linux-tests:6893-6900
/CalendarServer/branches/users/glyph/migrate-merge:8690-8713
/CalendarServer/branches/users/glyph/misc-portability-fixes:7365-7374
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/multiget-delete:8321-8330
/CalendarServer/branches/users/glyph/new-export:7444-7485
/CalendarServer/branches/users/glyph/one-home-list-api:10048-10073
/CalendarServer/branches/users/glyph/oracle:7106-7155
/CalendarServer/branches/users/glyph/oracle-nulls:7340-7351
/CalendarServer/branches/users/glyph/other-html:8062-8091
/CalendarServer/branches/users/glyph/parallel-sim:8240-8251
/CalendarServer/branches/users/glyph/parallel-upgrade:8376-8400
/CalendarServer/branches/users/glyph/parallel-upgrade_to_1:8571-8583
/CalendarServer/branches/users/glyph/q:9560-9688
/CalendarServer/branches/users/glyph/queue-locking-and-timing:10204-10289
/CalendarServer/branches/users/glyph/quota:7604-7637
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/shared-pool-fixes:8436-8443
/CalendarServer/branches/users/glyph/shared-pool-take2:8155-8174
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/sharing-api:9192-9205
/CalendarServer/branches/users/glyph/skip-lonely-vtimezones:8524-8535
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/start-service-start-loop:11060-11065
/CalendarServer/branches/users/glyph/subtransactions:7248-7258
/CalendarServer/branches/users/glyph/table-alias:8651-8664
/CalendarServer/branches/users/glyph/uidexport:7673-7676
/CalendarServer/branches/users/glyph/unshare-when-access-revoked:10562-10595
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/glyph/uuid-normalize:9268-9296
/CalendarServer/branches/users/glyph/warning-cleanups:11347-11357
/CalendarServer/branches/users/glyph/xattrs-from-files:7757-7769
/CalendarServer/branches/users/sagen/applepush:8126-8184
/CalendarServer/branches/users/sagen/inboxitems:7380-7381
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/sagen/testing:10827-10851,10853-10855
/CalendarServer/branches/users/wsanchez/transations:5515-5593
   + /CalDAVTester/trunk:11193-11198
/CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/release/CalendarServer-4.3-dev:10180-10190,10192
/CalendarServer/branches/users/cdaboo/batchupload-6699:6700-7198
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/component-set-fixes:8130-8346
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/implicituidrace:8137-8141
/CalendarServer/branches/users/cdaboo/ischedule-dkim:9747-9979
/CalendarServer/branches/users/cdaboo/managed-attachments:9985-10145
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/pods:7297-7377
/CalendarServer/branches/users/cdaboo/pycalendar:7085-7206
/CalendarServer/branches/users/cdaboo/pycard:7227-7237
/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes:7740-8287
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/cdaboo/store-scheduling:10876-11129
/CalendarServer/branches/users/cdaboo/timezones:7443-7699
/CalendarServer/branches/users/cdaboo/txn-debugging:8730-8743
/CalendarServer/branches/users/gaya/sharedgroups-3:11088-11204
/CalendarServer/branches/users/glyph/always-abort-txn-on-error:9958-9969
/CalendarServer/branches/users/glyph/case-insensitive-uid:8772-8805
/CalendarServer/branches/users/glyph/conn-limit:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/dalify:6932-7023
/CalendarServer/branches/users/glyph/db-reconnect:6824-6876
/CalendarServer/branches/users/glyph/deploybuild:7563-7572
/CalendarServer/branches/users/glyph/digest-auth-redux:10624-10635
/CalendarServer/branches/users/glyph/disable-quota:7718-7727
/CalendarServer/branches/users/glyph/dont-start-postgres:6592-6614
/CalendarServer/branches/users/glyph/hang-fix:11465-11491
/CalendarServer/branches/users/glyph/imip-and-admin-html:7866-7984
/CalendarServer/branches/users/glyph/ipv6-client:9054-9105
/CalendarServer/branches/users/glyph/launchd-wrapper-bis:11413-11436
/CalendarServer/branches/users/glyph/linux-tests:6893-6900
/CalendarServer/branches/users/glyph/migrate-merge:8690-8713
/CalendarServer/branches/users/glyph/misc-portability-fixes:7365-7374
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/multiget-delete:8321-8330
/CalendarServer/branches/users/glyph/new-export:7444-7485
/CalendarServer/branches/users/glyph/one-home-list-api:10048-10073
/CalendarServer/branches/users/glyph/oracle:7106-7155
/CalendarServer/branches/users/glyph/oracle-nulls:7340-7351
/CalendarServer/branches/users/glyph/other-html:8062-8091
/CalendarServer/branches/users/glyph/parallel-sim:8240-8251
/CalendarServer/branches/users/glyph/parallel-upgrade:8376-8400
/CalendarServer/branches/users/glyph/parallel-upgrade_to_1:8571-8583
/CalendarServer/branches/users/glyph/q:9560-9688
/CalendarServer/branches/users/glyph/queue-locking-and-timing:10204-10289
/CalendarServer/branches/users/glyph/quota:7604-7637
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/shared-pool-fixes:8436-8443
/CalendarServer/branches/users/glyph/shared-pool-take2:8155-8174
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/sharing-api:9192-9205
/CalendarServer/branches/users/glyph/skip-lonely-vtimezones:8524-8535
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/start-service-start-loop:11060-11065
/CalendarServer/branches/users/glyph/subtransactions:7248-7258
/CalendarServer/branches/users/glyph/table-alias:8651-8664
/CalendarServer/branches/users/glyph/uidexport:7673-7676
/CalendarServer/branches/users/glyph/unshare-when-access-revoked:10562-10595
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/glyph/uuid-normalize:9268-9296
/CalendarServer/branches/users/glyph/warning-cleanups:11347-11357
/CalendarServer/branches/users/glyph/xattrs-from-files:7757-7769
/CalendarServer/branches/users/sagen/applepush:8126-8184
/CalendarServer/branches/users/sagen/inboxitems:7380-7381
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/sagen/testing:10827-10851,10853-10855
/CalendarServer/branches/users/wsanchez/transations:5515-5593

Modified: CalendarServer/trunk/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/caldav.py	2013-07-06 00:43:38 UTC (rev 11491)
+++ CalendarServer/trunk/calendarserver/tap/caldav.py	2013-07-06 00:56:45 UTC (rev 11492)
@@ -1758,6 +1758,22 @@
 
     def getFileDescriptors(self):
         """
+        Get the file descriptors that will be passed to the subprocess, as a
+        mapping that will be used with L{IReactorProcess.spawnProcess}.
+
+        If this server is configured to use a meta FD, pass the client end of
+        the meta FD.  If this server is configured to use an AMP database
+        connection pool, pass a pre-connected AMP socket.
+
+        Note that, contrary to the documentation for
+        L{twext.internet.sendfdport.InheritedSocketDispatcher.addSocket}, this
+        does I{not} close the added child socket; this method
+        (C{getFileDescriptors}) is called repeatedly to start a new process
+        with the same C{LogID} if the previous one exits.  Therefore we
+        consistently re-use the same file descriptor and leave it open in the
+        master, relying upon process-exit notification rather than noticing the
+        meta-FD socket was closed in the subprocess.
+
         @return: a mapping of file descriptor numbers for the new (child)
             process to file descriptor numbers in the current (master) process.
         """

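For context, the mapping described in this docstring has the shape expected by
Twisted's L{IReactorProcess.spawnProcess} C{childFDs} argument.  A minimal,
hypothetical sketch of building and using such a mapping (the fd numbers and
the socket pair below are illustrative, not the ones CalendarServer actually
assigns):

    import socket

    from twisted.internet import reactor
    from twisted.internet.protocol import ProcessProtocol

    # The master keeps masterSkt open for sendmsg() and re-uses it across
    # restarts of the same LogID, per the docstring above; the child inherits
    # the other end of the pair as its fd 3.
    masterSkt, childSkt = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)

    childFDs = {
        0: "w",                # child stdin: a pipe the master writes to
        1: "r",                # child stdout: a pipe the master reads from
        2: "r",                # child stderr: likewise read by the master
        3: childSkt.fileno(),  # inherited meta-FD, left open in the master
    }

    reactor.spawnProcess(
        ProcessProtocol(), "/usr/bin/python",
        args=["python", "-c", "pass"],
        childFDs=childFDs,
    )
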
Modified: CalendarServer/trunk/twext/internet/sendfdport.py
===================================================================
--- CalendarServer/trunk/twext/internet/sendfdport.py	2013-07-06 00:43:38 UTC (rev 11491)
+++ CalendarServer/trunk/twext/internet/sendfdport.py	2013-07-06 00:56:45 UTC (rev 11492)
@@ -25,6 +25,8 @@
 from socket import (socketpair, fromfd, error as SocketError, AF_UNIX,
                     SOCK_STREAM, SOCK_DGRAM)
 
+from zope.interface import Interface
+
 from twisted.internet.abstract import FileDescriptor
 from twisted.internet.protocol import Protocol, Factory
 
@@ -58,9 +60,13 @@
 
 class InheritingProtocolFactory(Factory, object):
     """
-    In the 'master' process, make one of these and hook it up to the sockets
-    where you want to hear stuff.
+    An L{InheritingProtocolFactory} is a protocol factory which listens for
+    incoming connections in a I{master process}, then sends those connections
+    off to be inherited by a I{worker process} via an
+    L{InheritedSocketDispatcher}.
 
+    L{InheritingProtocolFactory} is instantiated in the master process.
+
     @ivar dispatcher: an L{InheritedSocketDispatcher} to use to dispatch
         incoming connections to an appropriate subprocess.
 
@@ -91,21 +97,22 @@
     @ivar skt: the UNIX socket used as the sendmsg() transport.
 
     @ivar outgoingSocketQueue: an outgoing queue of sockets to send to the
-        subprocess.
+        subprocess, along with their descriptions (strings describing their
+        protocol so that the subprocess knows how to handle them; as of this
+        writing, either C{"TCP"} or C{"SSL"})
+    @type outgoingSocketQueue: a C{list} of 2-tuples of C{(socket-object,
+        bytes)}
 
-    @ivar outgoingSocketQueue: a C{list} of 2-tuples of C{(socket-object, str)}
-
     @ivar status: a record of the last status message received (via recvmsg)
         from the subprocess: this is an application-specific indication of how
         ready this subprocess is to receive more connections.  A typical usage
-        would be to count the open connections: this is what is passed to 
-
+        would be to count the open connections: this is what is passed to
     @type status: C{str}
     """
 
-    def __init__(self, dispatcher, skt):
+    def __init__(self, dispatcher, skt, status):
         FileDescriptor.__init__(self, dispatcher.reactor)
-        self.status = None
+        self.status = status
         self.dispatcher = dispatcher
         self.skt = skt          # XXX needs to be set non-blocking by somebody
         self.fileno = skt.fileno
@@ -151,11 +158,81 @@
 
 
 
+class IStatusWatcher(Interface):
+    """
+    A provider of L{IStatusWatcher} tracks the I{status messages} reported by
+    the worker processes over their control sockets, and computes internal
+    I{status values} for those messages.  The I{messages} are individual
+    octets, representing one of three operations.  C{0} meaning "a new worker
+    process has started, with zero connections being processed", C{+} meaning
+    "I have received and am processing your request; I am confirming that my
+    requests-being-processed count has gone up by one", and C{-} meaning "I
+    have completed processing a request, my requests-being-processed count has
+    gone down by one".  The I{status value} tracked by
+    L{_SubprocessSocket.status} is an integer, indicating the current
+    requests-being-processed value.  (FIXME: the intended design here is
+    actually just that all I{this} object knows about is that
+    L{_SubprocessSocket.status} is an orderable value, and that this
+    C{statusWatcher} will compute appropriate values so the status that I{sorts
+    the least} is the socket to which new connections should be directed; also,
+    the format of the status messages is only known / understood by the
+    C{statusWatcher}, not the L{InheritedSocketDispatcher}.  It's hard to
+    explain it in that manner though.)
+
+    @note: the intention of this interface is to eventually provide a broader
+        notion of what might constitute 'status', so the above explanation just
+        explains the current implementation, for expediency's sake, rather
+        than the somewhat more abstract language that would be accurate.
+    """
+
+    def initialStatus():
+        """
+        A new socket was created and added to the dispatcher.  Compute an
+        initial value for its status.
+
+        @return: the new status.
+        """
+
+
+    def newConnectionStatus(previousStatus):
+        """
+        A new connection was sent to a given socket.  Compute its status based
+        on the previous status of that socket.
+
+        @param previousStatus: A status value for the socket being sent work,
+            previously returned by one of the methods on this interface.
+
+        @return: the socket's status after incrementing its outstanding work.
+        """
+
+
+    def statusFromMessage(previousStatus, message):
+        """
+        A status message was received from a worker.  Convert the previous
+        status value (returned from L{newConnectionStatus}, L{initialStatus},
+        or L{statusFromMessage}) into a new status value that takes the
+        message into account.
+
+        @param previousStatus: A status value for the socket being sent work,
+            previously returned by one of the methods on this interface.
+
+        @return: the socket's status after taking the reported message into
+            account.
+        """
+
+
+
 class InheritedSocketDispatcher(object):
     """
     Used by one or more L{InheritingProtocolFactory}s, this keeps track of a
-    list of available sockets in subprocesses and sends inbound connections
-    towards them.
+    list of available sockets that connect to I{worker process}es and sends
+    inbound connections to be inherited over those sockets, by those processes.
+
+    L{InheritedSocketDispatcher} is therefore instantiated in the I{master
+    process}.
+
+    @ivar statusWatcher: The object which will handle status messages and
+        convert them into current statuses, as well as react when the set of
+        statuses changes (via C{statusesChanged}).
+    @type statusWatcher: L{IStatusWatcher}
     """
 
     def __init__(self, statusWatcher):
@@ -183,17 +260,22 @@
         The status of a connection has changed; update all registered status
         change listeners.
         """
-        subsocket.status = self.statusWatcher.statusFromMessage(subsocket.status, message)
+        subsocket.status = self.statusWatcher.statusFromMessage(
+            subsocket.status, message
+        )
+        self.statusWatcher.statusesChanged(self.statuses)
 
 
     def sendFileDescriptor(self, skt, description):
         """
         A connection has been received.  Dispatch it.
 
-        @param skt: a socket object
+        @param skt: the I{connection socket} (i.e.: not the listening socket)
+        @type skt: L{socket.socket}
 
         @param description: some text to identify to the subprocess's
             L{InheritedPort} what type of transport to create for this socket.
+        @type description: C{bytes}
         """
         # We want None to sort after 0 and before 1, so coerce to 0.5 - this
         # allows the master to first schedule all child process that are up but
@@ -208,7 +290,10 @@
         selectedSocket.sendSocketToPeer(skt, description)
         # XXX Maybe want to send along 'description' or 'skt' or some
         # properties thereof? -glyph
-        selectedSocket.status = self.statusWatcher.newConnectionStatus(selectedSocket.status)
+        selectedSocket.status = self.statusWatcher.newConnectionStatus(
+           selectedSocket.status
+        )
+        self.statusWatcher.statusesChanged(self.statuses)
 
 
     def startDispatching(self):
@@ -232,7 +317,7 @@
         i, o = socketpair(AF_UNIX, SOCK_DGRAM)
         i.setblocking(False)
         o.setblocking(False)
-        a = _SubprocessSocket(self, o)
+        a = _SubprocessSocket(self, o, self.statusWatcher.initialStatus())
         self._subprocessSockets.append(a)
         if self._isDispatching:
             a.startReading()
@@ -242,13 +327,16 @@
 
 class InheritedPort(FileDescriptor, object):
     """
-    Create this in the 'slave' process to handle incoming connections
-    dispatched via C{sendmsg}.
+    An L{InheritedPort} is an L{IReadDescriptor}/L{IWriteDescriptor} created in
+    the I{worker process} to handle incoming connections dispatched via
+    C{sendmsg}.
     """
 
     def __init__(self, fd, transportFactory, protocolFactory):
         """
-        @param fd: a file descriptor
+        @param fd: the file descriptor representing a UNIX socket connected to
+            a I{master process}.  We will call C{recvmsg} on this socket to
+            receive file descriptors.
         @type fd: C{int}
 
         @param transportFactory: a 4-argument function that takes the socket

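The single-octet status protocol spelled out in the L{IStatusWatcher}
docstring above (C{0} for a freshly started worker, C{+} for an acknowledged
dispatch, C{-} for a completed request) can be illustrated with a very small
watcher.  This is only a sketch of a plausible provider that counts requests
in flight; the production watcher is the C{ConnectionLimiter} in
twext.web2.metafd:

    from zope.interface import implementer

    from twext.internet.sendfdport import (
        IStatusWatcher, InheritedSocketDispatcher)


    @implementer(IStatusWatcher)
    class CountingWatcher(object):
        """
        Track each worker socket's status as a plain integer: the number of
        requests believed to be in flight on that worker.
        """
        def initialStatus(self):
            return 0                 # a freshly added worker has no work

        def newConnectionStatus(self, previous):
            return previous + 1      # one more connection dispatched to it

        def statusFromMessage(self, previous, message):
            if message == '-':       # worker finished a request
                return previous - 1
            elif message == '0':     # worker (re)started with no work
                return 0
            else:                    # '+' just acknowledges the dispatch
                return previous

        def statusesChanged(self, statuses):
            # A real watcher would pause or resume listening ports here;
            # this sketch just keeps the latest snapshot around.
            self.latest = list(statuses)


    dispatcher = InheritedSocketDispatcher(CountingWatcher())
    # dispatcher.addSocket() then hands back the worker's end of a UNIX
    # socket pair, and dispatcher.sendFileDescriptor(skt, "TCP") routes an
    # accepted connection to the worker whose status sorts lowest.
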
Modified: CalendarServer/trunk/twext/internet/test/test_sendfdport.py
===================================================================
--- CalendarServer/trunk/twext/internet/test/test_sendfdport.py	2013-07-06 00:43:38 UTC (rev 11491)
+++ CalendarServer/trunk/twext/internet/test/test_sendfdport.py	2013-07-06 00:56:45 UTC (rev 11492)
@@ -1,3 +1,4 @@
+from twext.internet.sendfdport import IStatusWatcher
 # -*- test-case-name: twext.internet.test.test_sendfdport -*-
 ##
 # Copyright (c) 2010-2013 Apple Inc. All rights reserved.
@@ -22,28 +23,34 @@
 import os
 import fcntl
 
-from twext.internet.sendfdport import InheritedSocketDispatcher,\
-    _SubprocessSocket
+from twext.internet.sendfdport import InheritedSocketDispatcher
+
 from twext.web2.metafd import ConnectionLimiter
 from twisted.internet.interfaces import IReactorFDSet
 from twisted.trial.unittest import TestCase
-from zope.interface.declarations import implements
+from zope.interface import implementer
 
-
+@implementer(IReactorFDSet)
 class ReaderAdder(object):
-    implements(IReactorFDSet)
 
     def __init__(self):
         self.readers = []
+        self.writers = []
 
+
     def addReader(self, reader):
         self.readers.append(reader)
 
+
     def getReaders(self):
         return self.readers[:]
 
 
+    def addWriter(self, writer):
+        self.writers.append(writer)
 
+
+
 def isNonBlocking(skt):
     """
     Determine if the given socket is blocking or not.
@@ -59,11 +66,50 @@
 
 
 
+from zope.interface.verify import verifyClass
+from zope.interface import implementer
+
+def verifiedImplementer(interface):
+    def _(cls):
+        result = implementer(interface)(cls)
+        verifyClass(interface, result)
+        return result
+    return _
+
+
+
+@verifiedImplementer(IStatusWatcher)
+class Watcher(object):
+    def __init__(self, q):
+        self.q = q
+
+
+    def newConnectionStatus(self, previous):
+        return previous + 1
+
+
+    def statusFromMessage(self, previous, message):
+        return previous - 1
+
+
+    def statusesChanged(self, statuses):
+        self.q.append(list(statuses))
+
+
+    def initialStatus(self):
+        return 0
+
+
+
 class InheritedSocketDispatcherTests(TestCase):
     """
     Inherited socket dispatcher tests.
     """
+    def setUp(self):
+        self.dispatcher = InheritedSocketDispatcher(ConnectionLimiter(2, 20))
+        self.dispatcher.reactor = ReaderAdder()
 
+
     def test_nonBlocking(self):
         """
         Creating a L{_SubprocessSocket} via
@@ -71,12 +117,10 @@
         L{socket.socket} object being assigned to its C{skt} attribute, as well
         as a non-blocking L{socket.socket} object being returned.
         """
-        dispatcher = InheritedSocketDispatcher(None)
+        dispatcher = self.dispatcher
         dispatcher.startDispatching()
-        reactor = ReaderAdder()
-        dispatcher.reactor = reactor
         inputSocket = dispatcher.addSocket()
-        outputSocket = reactor.readers[-1]
+        outputSocket = self.dispatcher.reactor.readers[-1]
         self.assertTrue(isNonBlocking(inputSocket), "Input is blocking.")
         self.assertTrue(isNonBlocking(outputSocket), "Output is blocking.")
 
@@ -86,60 +130,41 @@
         Adding a socket to an L{InheritedSocketDispatcher} after it has already
         been started results in it immediately starting reading.
         """
-        reactor = ReaderAdder()
-        dispatcher = InheritedSocketDispatcher(None)
-        dispatcher.reactor = reactor
+        dispatcher = self.dispatcher
         dispatcher.startDispatching()
         dispatcher.addSocket()
-        self.assertEquals(reactor.getReaders(), dispatcher._subprocessSockets)
+        self.assertEquals(dispatcher.reactor.getReaders(),
+                          dispatcher._subprocessSockets)
 
 
-    def test_sendFileDescriptorSorting(self):
+    def test_statusesChangedOnNewConnection(self):
         """
-        Make sure InheritedSocketDispatcher.sendFileDescriptor sorts sockets with status None
-        higher than those with int status values.
+        L{InheritedSocketDispatcher.sendFileDescriptor} will update its
+        C{statusWatcher} via C{statusesChanged}.
         """
-
-        self.patch(_SubprocessSocket, 'sendSocketToPeer', lambda x, y, z:None)
-        dispatcher = InheritedSocketDispatcher(ConnectionLimiter(2, 20))
+        q = []
+        dispatcher = self.dispatcher
+        dispatcher.statusWatcher = Watcher(q)
+        description = "whatever"
+        # Need to have a socket that will accept the descriptors.
         dispatcher.addSocket()
-        dispatcher.addSocket()
-        dispatcher.addSocket()
+        dispatcher.sendFileDescriptor(object(), description)
+        dispatcher.sendFileDescriptor(object(), description)
+        self.assertEquals(q, [[1], [2]])
 
-        sockets = dispatcher._subprocessSockets[:]
 
-        # Check that 0 is preferred over None
-        sockets[0].status = 0
-        sockets[1].status = 1
-        sockets[2].status = None
-
-        dispatcher.sendFileDescriptor(None, "")
-
-        self.assertEqual(sockets[0].status, 1)
-        self.assertEqual(sockets[1].status, 1)
-        self.assertEqual(sockets[2].status, None)
-
-        dispatcher.sendFileDescriptor(None, "")
-
-        self.assertEqual(sockets[0].status, 1)
-        self.assertEqual(sockets[1].status, 1)
-        self.assertEqual(sockets[2].status, 1)
-
-        # Check that after going to 1 and back to 0 that is still preferred over None
-        sockets[0].status = 0
-        sockets[1].status = 1
-        sockets[2].status = None
-
-        dispatcher.sendFileDescriptor(None, "")
-
-        self.assertEqual(sockets[0].status, 1)
-        self.assertEqual(sockets[1].status, 1)
-        self.assertEqual(sockets[2].status, None)
-
-        sockets[1].status = 0
-
-        dispatcher.sendFileDescriptor(None, "")
-
-        self.assertEqual(sockets[0].status, 1)
-        self.assertEqual(sockets[1].status, 1)
-        self.assertEqual(sockets[2].status, None)
+    def test_statusesChangedOnStatusMessage(self):
+        """
+        L{InheritedSocketDispatcher.statusMessage} will update its
+        C{statusWatcher} via C{statusesChanged}.
+        """
+        q = []
+        dispatcher = self.dispatcher
+        dispatcher.statusWatcher = Watcher(q)
+        message = "whatever"
+        # Need to have a socket that will accept the descriptors.
+        dispatcher.addSocket()
+        dispatcher.statusMessage(dispatcher._subprocessSockets[0], message)
+        dispatcher.statusMessage(dispatcher._subprocessSockets[0], message)
+        self.assertEquals(q, [[-1], [-2]])

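The C{verifiedImplementer} helper introduced in this test module combines
zope.interface's C{implementer} declaration with C{verifyClass}, so an
incomplete implementation fails as soon as the class body is evaluated rather
than deep inside a test run.  A small, self-contained illustration (the
C{IGreeter} interface is invented for the example):

    from zope.interface import Interface, implementer
    from zope.interface.verify import verifyClass


    def verifiedImplementer(interface):
        # Same idea as the helper above: declare and verify in one step.
        def _(cls):
            result = implementer(interface)(cls)
            verifyClass(interface, result)
            return result
        return _


    class IGreeter(Interface):
        def greet(name):
            "Return a greeting for C{name}."


    @verifiedImplementer(IGreeter)
    class Greeter(object):
        def greet(self, name):
            return "hello, " + name

    # Omitting greet() from Greeter would raise
    # zope.interface.exceptions.BrokenImplementation at class-definition time.
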
Modified: CalendarServer/trunk/twext/web2/metafd.py
===================================================================
--- CalendarServer/trunk/twext/web2/metafd.py	2013-07-06 00:43:38 UTC (rev 11491)
+++ CalendarServer/trunk/twext/web2/metafd.py	2013-07-06 00:56:45 UTC (rev 11492)
@@ -15,6 +15,14 @@
 # limitations under the License.
 ##
 
+"""
+Implementation of dispatching HTTP connections to child processes using
+L{twext.internet.sendfdport.InheritedSocketDispatcher}.
+"""
+from __future__ import print_function
+
+from functools import total_ordering
+
 from twext.internet.sendfdport import (
     InheritedPort, InheritedSocketDispatcher, InheritingProtocolFactory)
 from twext.internet.tcp import MaxAcceptTCPServer
@@ -42,6 +50,8 @@
     Service which starts up an HTTP server that can report back to its parent
     process via L{InheritedPort}.
 
+    This is instantiated in the I{worker process}.
+
     @ivar site: a twext.web2 'site' object, i.e. a request factory
 
     @ivar fd: the file descriptor of a UNIX socket being used to receive
@@ -81,7 +91,9 @@
     def stopService(self):
         """
         Stop reading on the inherited port.
-        @return: a Deferred which fires after the last outstanding request is complete.
+
+        @return: a Deferred which fires after the last outstanding request is
+            complete.
         """
         Service.stopService(self)
         # XXX stopping should really be destructive, because otherwise we will
@@ -109,8 +121,12 @@
 class ReportingHTTPFactory(HTTPFactory):
     """
     An L{HTTPFactory} which reports its status to a
-    L{twext.internet.sendfdport.InheritedPort}.
+    L{InheritedPort<twext.internet.sendfdport.InheritedPort>}.
 
+    Since this is processing application-level bytes, it is of course
+    instantiated in the I{worker process}, as is
+    L{InheritedPort<twext.internet.sendfdport.InheritedPort>}.
+
     @ivar inheritedPort: an L{InheritedPort} to report status (the current
         number of outstanding connections) to.  Since this - the
         L{ReportingHTTPFactory} - needs to be instantiated to be passed to
@@ -144,6 +160,73 @@
 
 
 
+@total_ordering
+class WorkerStatus(object):
+    """
+    The status of a worker process.
+    """
+
+    def __init__(self, acknowledged=0, unacknowledged=0, started=0):
+        """
+        Create a L{ConnectionStatus} with a number of sent connections and a
+        number of un-acknowledged connections.
+
+        @param acknowledged: the number of connections which we know the
+            subprocess to be presently processing; i.e. those which have been
+            transmitted to the subprocess.
+
+        @param unacknowledged: The number of connections which we have sent to
+            the subprocess which have never received a status response (a
+            "C{+}" status message).
+
+        @param started: The number of times this worker has been started.
+        """
+        self.acknowledged = acknowledged
+        self.unacknowledged = unacknowledged
+        self.started = started
+
+
+    def restarted(self):
+        """
+        The L{WorkerStatus} derived from the current status of a process and
+        the fact that it just restarted.
+        """
+        return self.__class__(0, self.unacknowledged, self.started + 1)
+
+
+    def _tuplify(self):
+        return (self.acknowledged, self.unacknowledged, self.started)
+
+
+    def __lt__(self, other):
+        if not isinstance(other, WorkerStatus):
+            return NotImplemented
+        return self._tuplify() < other._tuplify()
+
+
+    def __eq__(self, other):
+        if not isinstance(other, WorkerStatus):
+            return NotImplemented
+        return self._tuplify() == other._tuplify()
+
+
+    def __add__(self, other):
+        if not isinstance(other, WorkerStatus):
+            return NotImplemented
+        return self.__class__(self.acknowledged + other.acknowledged,
+                              self.unacknowledged + other.unacknowledged,
+                              self.started + other.started)
+
+
+    def __sub__(self, other):
+        if not isinstance(other, WorkerStatus):
+            return NotImplemented
+        return self + self.__class__(-other.acknowledged,
+                                     -other.unacknowledged,
+                                     -other.started)
+
+
+
 class ConnectionLimiter(MultiService, object):
     """
     Connection limiter for use with L{InheritedSocketDispatcher}.
@@ -173,57 +256,51 @@
         self.dispatcher.startDispatching()
 
 
-    def addPortService(self, description, port, interface, backlog):
+    def addPortService(self, description, port, interface, backlog,
+                       serverServiceMaker=MaxAcceptTCPServer):
         """
         Add a L{MaxAcceptTCPServer} to bind a TCP port to a socket description.
         """
         lipf = LimitingInheritingProtocolFactory(self, description)
         self.factories.append(lipf)
-        MaxAcceptTCPServer(
+        serverServiceMaker(
             port, lipf,
             interface=interface,
             backlog=backlog
         ).setServiceParent(self)
 
 
-    # implementation of implicit statusWatcher interface required by
-    # InheritedSocketDispatcher
+    # IStatusWatcher
 
+    def initialStatus(self):
+        """
+        The status of a new worker added to the pool.
+        """
+        return WorkerStatus()
+
+
     def statusFromMessage(self, previousStatus, message):
         """
         Determine a subprocess socket's status from its previous status and a
         status message.
         """
-        if message in ('-', '0'):
-            if message == '-':
-                # A connection has gone away in a subprocess; we should start
-                # accepting connections again if we paused (see
-                # newConnectionStatus)
-                result = self.intWithNoneAsZero(previousStatus) - 1
-                if result < 0:
-                    log.error("metafd: trying to decrement status below zero")
-                    result = 0
-            else:
-                # A new process just started accepting new connections; zero
-                # out its expected load, but only if previous status is still None
-                result = 0 if previousStatus is None else previousStatus
-                if previousStatus is None:
-                    result = 0
-                else:
-                    log.error("metafd: trying to zero status that is not None")
-                    result = previousStatus
-
-            # If load has indeed decreased (i.e. in any case except 'a new,
-            # idle process replaced an old, idle process'), then start
-            # listening again.
-            if result < previousStatus and self.running:
-                for f in self.factories:
-                    f.myServer.myPort.startReading()
+        if message == '-':
+            # A connection has gone away in a subprocess; we should start
+            # accepting connections again if we paused (see
+            # newConnectionStatus)
+            return previousStatus - WorkerStatus(acknowledged=1)
+        elif message == '0':
+            # A new process just started accepting new connections.  It might
+            # still have some unacknowledged connections, but any connections
+            # that it acknowledged working on are now completed.  (We have no
+            # way of knowing whether the acknowledged connections were acted
+            # upon or dropped, so we have to treat that number with a healthy
+            # amount of skepticism.)
+            return previousStatus.restarted()
         else:
-            # '+' is just an acknowledgement of newConnectionStatus, so we can
-            # ignore it.
-            result = self.intWithNoneAsZero(previousStatus)
-        return result
+            # '+' acknowledges that the subprocess has taken on the work.
+            return previousStatus + WorkerStatus(acknowledged=1,
+                                                 unacknowledged=-1)
 
 
     def newConnectionStatus(self, previousStatus):
@@ -231,33 +308,35 @@
         Determine the effect of a new connection being sent on a subprocess
         socket.
         """
-        current = self.outstandingRequests + 1
+        return previousStatus + WorkerStatus(unacknowledged=1)
+
+
+    def statusesChanged(self, statuses):
+        """
+        The L{InheritedSocketDispatcher} is reporting that the list of
+        connection-statuses has changed.
+
+        (The argument to this function is currently duplicated by the
+        C{self.dispatcher.statuses} attribute, which is what
+        C{self.outstandingRequests} uses to compute it.)
+        """
+        current = sum(status.acknowledged
+                      for status in self.dispatcher.statuses)
+        self._outstandingRequests = current # preserve for or= field in log
         maximum = self.maxRequests
         overloaded = (current >= maximum)
         if overloaded:
             for f in self.factories:
                 f.myServer.myPort.stopReading()
-
-        result = self.intWithNoneAsZero(previousStatus) + 1
-        return result
-
-
-    def intWithNoneAsZero(self, x):
-        """
-        Convert 'x' to an C{int}, unless x is C{None}, in which case return 0.
-        """
-        if x is None:
-            return 0
         else:
-            return int(x)
+            for f in self.factories:
+                f.myServer.myPort.startReading()
 
 
-    @property
+    _outstandingRequests = 0
+    @property # make read-only
     def outstandingRequests(self):
-        outstanding = 0
-        for status in self.dispatcher.statuses:
-            outstanding += self.intWithNoneAsZero(status)
-        return outstanding
+        return self._outstandingRequests
 
 
 
@@ -266,6 +345,9 @@
     An L{InheritingProtocolFactory} that supports the implicit factory contract
     required by L{MaxAcceptTCPServer}/L{MaxAcceptTCPPort}.
 
+    Like L{InheritingProtocolFactory}, L{LimitingInheritingProtocolFactory} is
+    instantiated in the I{master process}.
+
     @ivar outstandingRequests: a read-only property for the number of currently
         active connections.
 
@@ -273,8 +355,8 @@
         single reactor loop iteration.
 
     @ivar maxRequests: The maximum number of concurrent connections to accept
-        at once - note that this is for the I{entire server}, whereas the
-        value in the configuration file is for only a single process.
+        at once - note that this is for the I{entire server}, whereas the value
+        in the configuration file is for only a single process.
     """
 
     def __init__(self, limiter, description):

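The C{WorkerStatus} class added above replaces the old integer-or-None
bookkeeping with an orderable, addable record, which lets
C{ConnectionLimiter.statusFromMessage} express the '+', '-' and '0'
transitions as plain arithmetic.  A worked example using the class as
committed here:

    from twext.web2.metafd import WorkerStatus

    status = WorkerStatus()                       # fresh worker: (0, 0, 0)

    # A connection is dispatched to the worker (newConnectionStatus):
    status = status + WorkerStatus(unacknowledged=1)

    # The worker acknowledges it with '+': unacknowledged becomes acknowledged.
    status = status + WorkerStatus(acknowledged=1, unacknowledged=-1)
    assert (status.acknowledged, status.unacknowledged, status.started) == (1, 0, 0)

    # The worker finishes the request and reports '-':
    status = status - WorkerStatus(acknowledged=1)

    # The worker exits and its replacement reports '0':
    status = status.restarted()
    assert (status.acknowledged, status.unacknowledged, status.started) == (0, 0, 1)
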
Modified: CalendarServer/trunk/twext/web2/server.py
===================================================================
--- CalendarServer/trunk/twext/web2/server.py	2013-07-06 00:43:38 UTC (rev 11491)
+++ CalendarServer/trunk/twext/web2/server.py	2013-07-06 00:56:45 UTC (rev 11492)
@@ -22,12 +22,12 @@
 # SOFTWARE.
 #
 ##
-from __future__ import print_function
 
 """
 This is a web-server which integrates with the twisted.internet
 infrastructure.
 """
+from __future__ import print_function
 
 import cgi, time, urlparse
 from urllib import quote, unquote
@@ -628,18 +628,39 @@
         d.addErrback(self._processingReallyFailed, reason)
         return d
 
+
     def _processingReallyFailed(self, reason, origReason):
+        """
+        An error occurred when attempting to report an error to the HTTP
+        client.
+        """
         log.failure("Exception rendering error page", reason)
         log.failure("Original exception", origReason)
 
-        body = ("<html><head><title>Internal Server Error</title></head>"
-                "<body><h1>Internal Server Error</h1>An error occurred rendering the requested page. Additionally, an error occurred rendering the error page.</body></html>")
+        try:
+            body = (
+                "<html><head><title>Internal Server Error</title></head>"
+                "<body><h1>Internal Server Error</h1>"
+                "An error occurred rendering the requested page. "
+                "Additionally, an error occurred rendering the error page."
+                "</body></html>"
+            )
+            response = http.Response(
+                responsecode.INTERNAL_SERVER_ERROR,
+                {'content-type': http_headers.MimeType('text','html')},
+                body
+            )
+            self.writeResponse(response)
+        except:
+            log.failure(
+                "An error occurred.  We tried to report that error.  "
+                "Reporting that error caused an error.  "
+                "In the process of reporting the error-reporting error to "
+                "the client, there was *yet another* error.  Here it is.  "
+                "I give up."
+            )
+            self.chanRequest.abortConnection()
 
-        response = http.Response(
-            responsecode.INTERNAL_SERVER_ERROR,
-            {'content-type': http_headers.MimeType('text','html')},
-            body)
-        self.writeResponse(response)
 
     def _cbFinishRender(self, result):
         def filterit(response, f):

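The rewritten C{_processingReallyFailed} above adds a final rung to the
error-reporting ladder: if even the canned "Internal Server Error" page cannot
be written, the connection is aborted instead of being left hanging.  The
general shape of that pattern, as a standalone sketch in which
C{writeResponse}, C{logFailure} and C{abortConnection} are hypothetical
stand-ins for the twext equivalents:

    def reportLastResortError(writeResponse, logFailure, abortConnection):
        """
        Try to tell the client that rendering failed; if that reporting itself
        fails, log it and drop the connection rather than stalling it.
        """
        body = ("<html><head><title>Internal Server Error</title></head>"
                "<body><h1>Internal Server Error</h1>"
                "An error occurred rendering the requested page. Additionally, "
                "an error occurred rendering the error page.</body></html>")
        try:
            writeResponse(500, {"content-type": "text/html"}, body)
        except Exception:
            logFailure("error while reporting an error; giving up")
            abortConnection()
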
Modified: CalendarServer/trunk/twext/web2/stream.py
===================================================================
--- CalendarServer/trunk/twext/web2/stream.py	2013-07-06 00:43:38 UTC (rev 11491)
+++ CalendarServer/trunk/twext/web2/stream.py	2013-07-06 00:56:45 UTC (rev 11492)
@@ -61,7 +61,7 @@
 from twisted.internet import interfaces as ti_interfaces, defer, reactor, protocol, error as ti_error
 from twisted.python import components
 from twisted.python.failure import Failure
-from twisted.python.hashlib import md5
+from hashlib import md5
 
 from twext.python.log import Logger
 

Modified: CalendarServer/trunk/twext/web2/test/test_metafd.py
===================================================================
--- CalendarServer/trunk/twext/web2/test/test_metafd.py	2013-07-06 00:43:38 UTC (rev 11491)
+++ CalendarServer/trunk/twext/web2/test/test_metafd.py	2013-07-06 00:56:45 UTC (rev 11492)
@@ -27,6 +27,9 @@
 from twext.web2.channel.http import HTTPChannel
 from twext.web2.metafd import ReportingHTTPService, ConnectionLimiter
 from twisted.internet.tcp import Server
+from twisted.application.service import Service
+
+from twext.internet.test.test_sendfdport import ReaderAdder
 from twisted.trial.unittest import TestCase
 
 
@@ -149,31 +152,92 @@
     """
     Tests for L{ConnectionLimiter}
     """
-    
-    
-    def test_statusFromMessage(self):
+
+    def test_loadReducedStartsReadingAgain(self):
         """
-        Test ConnectionLimiter.statusFromMessage to make sure count cannot go below zero and that
-        zeroing out does not wipe out an existing count.
+        L{ConnectionLimiter.statusesChanged} determines whether the current
+        "load" of all subprocesses - that is, the total outstanding request
+        count - is high enough that the listening ports attached to it should
+        be suspended.
         """
-        
-        cl = ConnectionLimiter(2, 20)
-        
-        # "0" Zeroing out does not overwrite legitimate count
-        self.assertEqual(cl.statusFromMessage(None, "0"), 0)
-        self.assertEqual(cl.statusFromMessage(0, "0"), 0)
-        self.assertEqual(cl.statusFromMessage(1, "0"), 1)
-        self.assertEqual(cl.statusFromMessage(2, "0"), 2)
-        
-        # "-" No negative counts
-        self.assertEqual(cl.statusFromMessage(None, "-"), 0)
-        self.assertEqual(cl.statusFromMessage(0, "-"), 0)
-        self.assertEqual(cl.statusFromMessage(1, "-"), 0)
-        self.assertEqual(cl.statusFromMessage(2, "-"), 1)
-        
-        # "+" No change
-        self.assertEqual(cl.statusFromMessage(None, "+"), 0)
-        self.assertEqual(cl.statusFromMessage(0, "+"), 0)
-        self.assertEqual(cl.statusFromMessage(1, "+"), 1)
-        self.assertEqual(cl.statusFromMessage(2, "+"), 2)
+        builder = LimiterBuilder(self)
+        builder.fillUp()
+        self.assertEquals(builder.port.reading, False) # sanity check
+        builder.loadDown()
+        self.assertEquals(builder.port.reading, True)
 
+
+    def test_processRestartedStartsReadingAgain(self):
+        """
+        L{ConnectionLimiter.statusesChanged} determines whether the current
+        number of outstanding requests is above the limit, and either stops or
+        resumes reading on the listening port.
+        """
+        builder = LimiterBuilder(self)
+        builder.fillUp()
+        self.assertEquals(builder.port.reading, False)
+        builder.processRestart()
+        self.assertEquals(builder.port.reading, True)
+
+
+
+class LimiterBuilder(object):
+    """
+    A L{LimiterBuilder} can build a L{ConnectionLimiter} and associated objects
+    for a given unit test.
+    """
+
+    def __init__(self, test, maxReq=3):
+        self.limiter = ConnectionLimiter(2, maxRequests=maxReq)
+        self.dispatcher = self.limiter.dispatcher
+        self.dispatcher.reactor = ReaderAdder()
+        self.service = Service()
+        self.limiter.addPortService("TCP", 4321, "127.0.0.1", 5,
+                                    self.serverServiceMakerMaker(self.service))
+        self.dispatcher.addSocket()
+        # Has to be running in order to add stuff.
+        self.limiter.startService()
+        self.port = self.service.myPort
+
+
+    def serverServiceMakerMaker(self, s):
+        """
+        Make a serverServiceMaker for use with
+        L{ConnectionLimiter.addPortService}.
+        """
+        class NotAPort(object):
+            def startReading(self):
+                self.reading = True
+            def stopReading(self):
+                self.reading = False
+
+        def serverServiceMaker(port, factory, *a, **k):
+            s.factory = factory
+            s.myPort = NotAPort()
+            s.myPort.startReading() # TODO: technically, should wait for startService
+            factory.myServer = s
+            return s
+        return serverServiceMaker
+
+
+    def fillUp(self):
+        """
+        Fill up all the slots on the connection limiter.
+        """
+        for x in range(self.limiter.maxRequests):
+            self.dispatcher.sendFileDescriptor(None, "SSL")
+            self.dispatcher.statusMessage(
+                self.dispatcher._subprocessSockets[0], "+"
+            )
+
+
+    def processRestart(self):
+        self.dispatcher.statusMessage(
+            self.dispatcher._subprocessSockets[0], "0"
+        )
+
+
+    def loadDown(self):
+        self.dispatcher.statusMessage(
+            self.dispatcher._subprocessSockets[0], "-"
+        )

Modified: CalendarServer/trunk/twext/web2/test/test_server.py
===================================================================
--- CalendarServer/trunk/twext/web2/test/test_server.py	2013-07-06 00:43:38 UTC (rev 11491)
+++ CalendarServer/trunk/twext/web2/test/test_server.py	2013-07-06 00:56:45 UTC (rev 11492)
@@ -5,7 +5,7 @@
 A test harness for the twext.web2 server.
 """
 
-from zope.interface import implements
+from zope.interface import implementer
 
 from twisted.python import components
 from twext.web2 import http, http_headers, iweb, server
@@ -27,7 +27,7 @@
     """
 
 
-
+@implementer(iweb.IResource)
 class ResourceAdapter(object):
     """
     Adapter to IResource.
@@ -36,7 +36,6 @@
     L{AdaptionTestCase.test_registered} can test that such an adapter will
     be used.
     """
-    implements(iweb.IResource)
 
     def __init__(self, original):
         pass
@@ -57,7 +56,7 @@
     """
 
 
-
+@implementer(iweb.IOldNevowResource)
 class OldResourceAdapter(object):
     """
     Adapter to IOldNevowResource.
@@ -66,7 +65,6 @@
     that L{AdaptionTestCase.test_transitive} can test that such an adapter
     will be used to allow the initial input to be adapted to IResource.
     """
-    implements(iweb.IOldNevowResource)
 
     def __init__(self, original):
         pass
@@ -100,8 +98,8 @@
         Test that the adaption to IResource of an object which provides
         IResource returns the same object.
         """
-        class Resource(object):
-            implements(iweb.IResource)
+        @implementer(iweb.IResource)
+        class Resource(object): ""
         resource = Resource()
         self.assertIdentical(iweb.IResource(resource), resource)
 
@@ -116,15 +114,17 @@
 
 
 
+@implementer(iweb.IChanRequest)
 class TestChanRequest:
-    implements(iweb.IChanRequest)
 
     hostInfo = address.IPv4Address('TCP', 'host', 80), False
     remoteHost = address.IPv4Address('TCP', 'remotehost', 34567)
+    finished = False
 
 
     def __init__(self, site, method, prepath, uri, length=None,
                  headers=None, version=(1,1), content=None):
+        self.producer = None
         self.site = site
         self.method = method
         self.prepath = prepath
@@ -171,10 +171,12 @@
         self.finish(failed=True)
 
     def registerProducer(self, producer, streaming):
-        pass
+        if self.producer is not None:
+            raise ValueError("Producer still set: " + repr(self.producer))
+        self.producer = producer
 
     def unregisterProducer(self):
-        pass
+        self.producer = None
 
     def getHostInfo(self):
         return self.hostInfo
@@ -205,7 +207,25 @@
         return stream.MemoryStream(self.responseText)
 
 
+class MyRenderError(Exception):
+    ""
 
+
+class ErrorWithProducerResource(BaseTestResource):
+
+    addSlash = True
+
+    def render(self, req):
+        req.chanRequest.registerProducer(object(), None)
+        return defer.fail(MyRenderError())
+
+
+    def child_(self, request):
+        return self
+
+
+
+
 _unset = object()
 class BaseCase(unittest.TestCase):
     """
@@ -266,7 +286,35 @@
         self.assertEquals(failed, expectedfailure)
 
 
+class ErrorHandlingTest(BaseCase):
+    """
+    Tests for error handling.
+    """
 
+    def test_processingReallyReallyReallyFailed(self):
+        """
+        The HTTP connection will be shut down if there's really no way to relay
+        any useful information about the error to the HTTP client.
+        """
+        root = ErrorWithProducerResource()
+        site = server.Site(root)
+        tcr = TestChanRequest(site, "GET", "/", "http://localhost/")
+        request = server.Request(tcr, "GET", "/", (1, 1),
+                                 0, http_headers.Headers(
+                                        {"host": "localhost"}),
+                                        site=site)
+        proc = request.process()
+        done = []
+        proc.addBoth(done.append)
+        self.assertEquals(done, [None])
+        errs = self.flushLoggedErrors(ValueError)
+        self.assertIn('producer', str(errs[0]).lower())
+        errs = self.flushLoggedErrors(MyRenderError)
+        self.assertEquals(bool(errs), True)
+        self.assertEquals(tcr.finished, True)
+
+
+
 class SampleWebTest(BaseCase):
     class SampleTestResource(BaseTestResource):
         addSlash = True