[CalendarServer-changes] [9078] CalendarServer/branches/users/glyph/ipv6-client/twext/internet

source_changes at macosforge.org source_changes at macosforge.org
Fri Apr 13 11:45:56 PDT 2012


Revision: 9078
          http://trac.macosforge.org/projects/calendarserver/changeset/9078
Author:   glyph at apple.com
Date:     2012-04-13 11:45:56 -0700 (Fri, 13 Apr 2012)
Log Message:
-----------
GAI endpoint with manual testing and basic unit test

Added Paths:
-----------
    CalendarServer/branches/users/glyph/ipv6-client/twext/internet/gaiendpoint.py
    CalendarServer/branches/users/glyph/ipv6-client/twext/internet/test/test_gaiendpoint.py

Added: CalendarServer/branches/users/glyph/ipv6-client/twext/internet/gaiendpoint.py
===================================================================
--- CalendarServer/branches/users/glyph/ipv6-client/twext/internet/gaiendpoint.py	                        (rev 0)
+++ CalendarServer/branches/users/glyph/ipv6-client/twext/internet/gaiendpoint.py	2012-04-13 18:45:56 UTC (rev 9078)
@@ -0,0 +1,157 @@
+##
+# Copyright (c) 2012 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+L{getaddrinfo}()-based endpoint
+"""
+
+from socket import getaddrinfo, AF_UNSPEC, AF_INET, AF_INET6, SOCK_STREAM
+from twisted.internet.endpoints import TCP4ClientEndpoint # misnamed :-(
+from twisted.internet.defer import Deferred
+from twisted.internet.threads import deferToThread
+from twisted.internet.task import LoopingCall
+
+class MultiFailure(Exception):
+
+    def __init__(self, failures):
+        super(MultiFailure, self).__init__("Failure with multiple causes.")
+        self.failures = failures
+
+
+
+class GAIEndpoint(object):
+    """
+    Client endpoint that will call L{getaddrinfo} in a thread and then attempt
+    to connect to each endpoint (almost) in parallel.
+
+    @ivar reactor: The reactor to attempt the connection with.
+    @type reactor: provider of L{IReactorTCP} and L{IReactorTime}
+
+    @ivar host: The host to resolve.
+    @type host: L{str}
+
+    @ivar port: The port number to resolve.
+    @type port: L{int}
+
+    @ivar deferToThread: A function like L{deferToThread}, used to invoke
+        getaddrinfo.  (Replaceable mainly for testing purposes.)
+
+    @ivar subEndpoint: A 3-argument callable taking C{(reactor (IReactorTCP),
+        host (str), port (int))} where 'host' is an IP address literal, either
+        IPv6 or IPv6.
+    """
+
+    deferToThread = staticmethod(deferToThread)
+
+    subEndpoint = TCP4ClientEndpoint
+
+    def __init__(self, reactor, host, port):
+        self.reactor = reactor
+        self.host = host
+        self.port = port
+
+
+    def connect(self, factory):
+        dgai = self.deferToThread(getaddrinfo, self.host, self.port,
+                                  AF_UNSPEC, SOCK_STREAM)
+        @dgai.addCallback
+        def gaiToEndpoints(gairesult):
+            for family, socktype, proto, canonname, sockaddr in gairesult:
+                if family in [AF_INET6, AF_INET]:
+                    yield self.subEndpoint(self.reactor, sockaddr[0],
+                                           sockaddr[1])
+
+        @gaiToEndpoints.addCallback
+        def connectTheEndpoints(endpoints):
+            doneTrying = []
+            outstanding = []
+            errors = []
+            succeeded = []
+            actuallyDidIt = Deferred()
+            def removeMe(result, attempt):
+                outstanding.remove(attempt)
+                return result
+            def connectingDone(result):
+                if lc.running:
+                    lc.stop()
+                succeeded.append(True)
+                for o in outstanding[::]:
+                    o.cancel()
+                actuallyDidIt.callback(result)
+                return None
+            def lastChance():
+                if doneTrying and not outstanding and not succeeded:
+                    # We've issued our last attempts. There are no remaining
+                    # outstanding attempts; they've all failed. We haven't
+                    # succeeded.  Time... to die.
+                    actuallyDidIt.errback(MultiFailure(errors))
+            def connectingFailed(why):
+                errors.append(why)
+                lastChance()
+                return None
+            def nextOne():
+                try:
+                    endpoint = endpoints.next()
+                except StopIteration:
+                    # Out of endpoints to try!  Now it's time to wait for all of
+                    # the outstanding attempts to complete, and, if none of them
+                    # have been successful, then to give up with a relevant
+                    # error.  They'll all be dealt with by connectingDone or
+                    # connectingFailed.
+                    doneTrying.append(True)
+                    lc.stop()
+                    lastChance()
+                else:
+                    attempt = endpoint.connect(factory)
+                    attempt.addBoth(removeMe, attempt)
+                    attempt.addCallbacks(connectingDone, connectingFailed)
+                    outstanding.append(attempt)
+            lc = LoopingCall(nextOne)
+            lc.clock = self.reactor
+            lc.start(0.0)
+            return actuallyDidIt
+        return dgai
+
+
+
+if __name__ == '__main__':
+    from twisted.internet import reactor
+    import sys
+    if sys.argv[1:]:
+        host = sys.argv[1]
+        port = int(sys.argv[2])
+    else:
+        host = "localhost"
+        port = 22
+    gaie = GAIEndpoint(reactor, host, port)
+    from twisted.internet.protocol import Factory, Protocol
+    class HelloGoobye(Protocol, object):
+        def connectionMade(self):
+            print 'Hello!'
+            self.transport.loseConnection()
+
+        def connectionLost(self, reason):
+            print 'Goodbye'
+
+    class MyFactory(Factory, object):
+        def buildProtocol(self, addr):
+            print 'Building protocol for:', addr
+            return HelloGoobye()
+    def bye(what):
+        print 'bye', what
+        reactor.stop()
+    gaie.connect(MyFactory()).addBoth(bye)
+    reactor.run()

Added: CalendarServer/branches/users/glyph/ipv6-client/twext/internet/test/test_gaiendpoint.py
===================================================================
--- CalendarServer/branches/users/glyph/ipv6-client/twext/internet/test/test_gaiendpoint.py	                        (rev 0)
+++ CalendarServer/branches/users/glyph/ipv6-client/twext/internet/test/test_gaiendpoint.py	2012-04-13 18:45:56 UTC (rev 9078)
@@ -0,0 +1,111 @@
+##
+# Copyright (c) 2012 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Test cases for L{twext.internet.gaiendpoint}
+"""
+
+from socket import getaddrinfo, AF_INET, SOCK_STREAM
+
+from twext.internet.gaiendpoint import GAIEndpoint
+from twisted.trial.unittest import TestCase
+from twisted.internet.defer import Deferred
+from twisted.internet.protocol import Factory, Protocol
+from twisted.internet.task import Clock
+
+
+class FakeTCPEndpoint(object):
+    def __init__(self, reactor, host, port):
+        self._reactor = reactor
+        self._host = host
+        self._port = port
+        self._attempt = None
+
+
+    def connect(self, factory):
+        self._attempt = Deferred()
+        self._factory = factory
+        return self._attempt
+
+
+
+class GAIEndpointTestCase(TestCase):
+    """
+    Test cases for L{GAIEndpoint}.
+    """
+
+    def makeEndpoint(self, host="abcd.example.com", port=4321):
+        gaie = GAIEndpoint(self.clock, host, port)
+        gaie.subEndpoint = self.subEndpoint
+        gaie.deferToThread = self.deferToSomething
+        return gaie
+
+
+    def subEndpoint(self, reactor, host, port):
+        ftcpe = FakeTCPEndpoint(reactor, host, port)
+        self.fakeRealEndpoints.append(ftcpe)
+        return ftcpe
+
+
+    def deferToSomething(self, func, *a, **k):
+        """
+        Test replacement for L{deferToThread}, which can only call
+        L{getaddrinfo}.
+        """
+        d = Deferred()
+        if func is not getaddrinfo:
+            self.fail("Only getaddrinfo should be invoked in a thread.")
+        self.inThreads.append((d, func, a, k))
+        return d
+
+
+    def gaiResult(self, family, socktype, proto, canonname, sockaddr):
+        """
+        A call to L{getaddrinfo} has succeeded; invoke the L{Deferred} waiting
+        on it.
+        """
+        d, f, a, k = self.inThreads.pop(0)
+        d.callback([(family, socktype, proto, canonname, sockaddr)])
+
+
+    def setUp(self):
+        """
+        Set up!
+        """
+        self.inThreads = []
+        self.clock = Clock()
+        self.fakeRealEndpoints = []
+        self.makeEndpoint()
+
+
+    def test_simpleSuccess(self):
+        """
+        If C{getaddrinfo} gives one L{GAIEndpoint.connect}.
+        """
+        gaiendpoint = self.makeEndpoint()
+        protos = []
+        f = Factory()
+        f.protocol = Protocol
+        gaiendpoint.connect(f).addCallback(protos.append)
+        WHO_CARES = 0
+        WHAT_EVER = ""
+        self.gaiResult(AF_INET, SOCK_STREAM, WHO_CARES, WHAT_EVER,
+                       ("1.2.3.4", 4321))
+        self.clock.advance(1.0)
+        attempt = self.fakeRealEndpoints[0]._attempt
+        attempt.callback(self.fakeRealEndpoints[0]._factory.buildProtocol(None))
+        self.assertEqual(len(protos), 1)
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120413/471050f5/attachment.html>


More information about the calendarserver-changes mailing list