[CalendarServer-changes] [9078] CalendarServer/branches/users/glyph/ipv6-client/twext/internet
source_changes at macosforge.org
source_changes at macosforge.org
Fri Apr 13 11:45:56 PDT 2012
Revision: 9078
http://trac.macosforge.org/projects/calendarserver/changeset/9078
Author: glyph at apple.com
Date: 2012-04-13 11:45:56 -0700 (Fri, 13 Apr 2012)
Log Message:
-----------
GAI endpoint with manual testing and basic unit test
Added Paths:
-----------
CalendarServer/branches/users/glyph/ipv6-client/twext/internet/gaiendpoint.py
CalendarServer/branches/users/glyph/ipv6-client/twext/internet/test/test_gaiendpoint.py
Added: CalendarServer/branches/users/glyph/ipv6-client/twext/internet/gaiendpoint.py
===================================================================
--- CalendarServer/branches/users/glyph/ipv6-client/twext/internet/gaiendpoint.py (rev 0)
+++ CalendarServer/branches/users/glyph/ipv6-client/twext/internet/gaiendpoint.py 2012-04-13 18:45:56 UTC (rev 9078)
@@ -0,0 +1,157 @@
+##
+# Copyright (c) 2012 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+L{getaddrinfo}()-based endpoint
+"""
+
+from socket import getaddrinfo, AF_UNSPEC, AF_INET, AF_INET6, SOCK_STREAM
+from twisted.internet.endpoints import TCP4ClientEndpoint # misnamed :-(
+from twisted.internet.defer import Deferred
+from twisted.internet.threads import deferToThread
+from twisted.internet.task import LoopingCall
+
+class MultiFailure(Exception):
+    """
+    A single exception standing in for several underlying causes.
+
+    @ivar failures: The object handed to the initializer, stored unchanged.
+        L{GAIEndpoint.connect} passes the list of failures it collected from
+        its individual connection attempts.
+    """
+
+    def __init__(self, failures):
+        """
+        @param failures: The underlying causes to remember on this exception.
+        """
+        self.failures = failures
+        Exception.__init__(self, "Failure with multiple causes.")
+
+
+
+class GAIEndpoint(object):
+    """
+    Client endpoint that will call L{getaddrinfo} in a thread and then attempt
+    to connect to each endpoint (almost) in parallel.
+
+    @ivar reactor: The reactor to attempt the connection with.
+    @type reactor: provider of L{IReactorTCP} and L{IReactorTime}
+
+    @ivar host: The host to resolve.
+    @type host: L{str}
+
+    @ivar port: The port number to resolve.
+    @type port: L{int}
+
+    @ivar deferToThread: A function like L{deferToThread}, used to invoke
+        getaddrinfo.  (Replaceable mainly for testing purposes.)
+
+    @ivar subEndpoint: A 3-argument callable taking C{(reactor (IReactorTCP),
+        host (str), port (int))} where 'host' is an IP address literal, either
+        IPv4 or IPv6.
+    """
+
+    deferToThread = staticmethod(deferToThread)
+
+    subEndpoint = TCP4ClientEndpoint
+
+    def __init__(self, reactor, host, port):
+        """
+        @param reactor: see the C{reactor} instance variable.
+        @param host: see the C{host} instance variable.
+        @param port: see the C{port} instance variable.
+        """
+        self.reactor = reactor
+        self.host = host
+        self.port = port
+
+
+    def connect(self, factory):
+        """
+        Resolve C{self.host} / C{self.port} with L{getaddrinfo} (via
+        C{self.deferToThread}), then try a C{subEndpoint} connection to each
+        C{AF_INET} / C{AF_INET6} result, starting one attempt per iteration
+        of a zero-interval L{LoopingCall}.
+
+        @param factory: passed unchanged to each sub-endpoint's C{connect}.
+
+        @return: the L{Deferred} returned by C{self.deferToThread}, with
+            callbacks added so that it fires with the result of the first
+            attempt to succeed (the remaining attempts are cancelled), or
+            fails with L{MultiFailure} carrying every attempt's failure once
+            all attempts have failed.
+        """
+        dgai = self.deferToThread(getaddrinfo, self.host, self.port,
+                                  AF_UNSPEC, SOCK_STREAM)
+        # Used as a decorator, addCallback rebinds each function name to
+        # whatever addCallback returns (for a Twisted Deferred, the Deferred
+        # itself), which is why the second decorator chains onto the first.
+        @dgai.addCallback
+        def gaiToEndpoints(gairesult):
+            for family, socktype, proto, canonname, sockaddr in gairesult:
+                if family in (AF_INET6, AF_INET):
+                    yield self.subEndpoint(self.reactor, sockaddr[0],
+                                           sockaddr[1])
+
+        @gaiToEndpoints.addCallback
+        def connectTheEndpoints(endpoints):
+            # Lists are used as mutable flags / accumulators shared by the
+            # closures below (no 'nonlocal' in Python 2).
+            doneTrying = []
+            outstanding = []
+            errors = []
+            succeeded = []
+            actuallyDidIt = Deferred()
+            def removeMe(result, attempt):
+                outstanding.remove(attempt)
+                return result
+            def connectingDone(result):
+                if lc.running:
+                    lc.stop()
+                # Mark success before cancelling, so the failures produced by
+                # the cancellations below do not trigger lastChance().
+                succeeded.append(True)
+                # Iterate over a copy: each cancel() runs removeMe, which
+                # mutates 'outstanding'.
+                for o in outstanding[:]:
+                    o.cancel()
+                actuallyDidIt.callback(result)
+                return None
+            def lastChance():
+                if doneTrying and not outstanding and not succeeded:
+                    # We've issued our last attempts.  There are no remaining
+                    # outstanding attempts; they've all failed.  We haven't
+                    # succeeded.  Time... to die.
+                    actuallyDidIt.errback(MultiFailure(errors))
+            def connectingFailed(why):
+                errors.append(why)
+                lastChance()
+                return None
+            def nextOne():
+                try:
+                    endpoint = next(endpoints)
+                except StopIteration:
+                    # Out of endpoints to try!  Now it's time to wait for all
+                    # of the outstanding attempts to complete, and, if none of
+                    # them have been successful, then to give up with a
+                    # relevant error.  They'll all be dealt with by
+                    # connectingDone or connectingFailed.
+                    doneTrying.append(True)
+                    lc.stop()
+                    lastChance()
+                else:
+                    attempt = endpoint.connect(factory)
+                    # Register the attempt *before* attaching callbacks: if
+                    # the Deferred has already fired, removeMe runs
+                    # immediately and must find the attempt in 'outstanding'.
+                    # Otherwise it raises ValueError and the attempt is left
+                    # in the list forever, so the result never fires.
+                    outstanding.append(attempt)
+                    attempt.addBoth(removeMe, attempt)
+                    attempt.addCallbacks(connectingDone, connectingFailed)
+            lc = LoopingCall(nextOne)
+            lc.clock = self.reactor
+            lc.start(0.0)
+            return actuallyDidIt
+        return dgai
+
+
+
+if __name__ == '__main__':
+    # Manual smoke test.  Usage: gaiendpoint.py [host [port]]
+    # Connects, prints a greeting, disconnects, then stops the reactor.
+    from twisted.internet import reactor
+    import sys
+    args = sys.argv[1:]
+    # Each argument is optional on its own; previously, giving a host
+    # without a port raised IndexError.
+    host = args[0] if args else "localhost"
+    port = int(args[1]) if len(args) > 1 else 22
+    gaie = GAIEndpoint(reactor, host, port)
+    from twisted.internet.protocol import Factory, Protocol
+    class HelloGoodbye(Protocol, object):
+        def connectionMade(self):
+            print 'Hello!'
+            self.transport.loseConnection()
+
+        def connectionLost(self, reason):
+            print 'Goodbye'
+
+    class MyFactory(Factory, object):
+        def buildProtocol(self, addr):
+            print 'Building protocol for:', addr
+            return HelloGoodbye()
+    def bye(what):
+        # Runs on success and on failure alike (added with addBoth).
+        print 'bye', what
+        reactor.stop()
+    gaie.connect(MyFactory()).addBoth(bye)
+    reactor.run()
Added: CalendarServer/branches/users/glyph/ipv6-client/twext/internet/test/test_gaiendpoint.py
===================================================================
--- CalendarServer/branches/users/glyph/ipv6-client/twext/internet/test/test_gaiendpoint.py (rev 0)
+++ CalendarServer/branches/users/glyph/ipv6-client/twext/internet/test/test_gaiendpoint.py 2012-04-13 18:45:56 UTC (rev 9078)
@@ -0,0 +1,111 @@
+##
+# Copyright (c) 2012 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Test cases for L{twext.internet.gaiendpoint}
+"""
+
+from socket import getaddrinfo, AF_INET, SOCK_STREAM
+
+from twext.internet.gaiendpoint import GAIEndpoint
+from twisted.trial.unittest import TestCase
+from twisted.internet.defer import Deferred
+from twisted.internet.protocol import Factory, Protocol
+from twisted.internet.task import Clock
+
+
+class FakeTCPEndpoint(object):
+    """
+    Stand-in for a real sub-endpoint: it records its constructor arguments
+    and, on C{connect}, hands back a fresh L{Deferred} that the test fires
+    by hand.
+
+    @ivar _attempt: the L{Deferred} returned by the most recent C{connect},
+        or C{None} if C{connect} has not been called.
+
+    @ivar _factory: the factory passed to the most recent C{connect}, or
+        C{None} if C{connect} has not been called.
+    """
+
+    def __init__(self, reactor, host, port):
+        self._reactor = reactor
+        self._host = host
+        self._port = port
+        self._attempt = None
+        # Initialised here so the attribute exists before connect() is
+        # called, matching _attempt.
+        self._factory = None
+
+
+    def connect(self, factory):
+        """
+        Remember C{factory} and return a new, unfired L{Deferred}.
+        """
+        self._attempt = Deferred()
+        self._factory = factory
+        return self._attempt
+
+
+
+class GAIEndpointTestCase(TestCase):
+    """
+    Test cases for L{GAIEndpoint}.
+    """
+
+    def makeEndpoint(self, host="abcd.example.com", port=4321):
+        """
+        Create a L{GAIEndpoint} driven by the test clock, with its
+        C{subEndpoint} and C{deferToThread} hooks replaced by this test
+        case's fakes.
+        """
+        gaie = GAIEndpoint(self.clock, host, port)
+        gaie.subEndpoint = self.subEndpoint
+        gaie.deferToThread = self.deferToSomething
+        return gaie
+
+
+    def subEndpoint(self, reactor, host, port):
+        """
+        Test replacement for L{GAIEndpoint.subEndpoint}: build a
+        L{FakeTCPEndpoint} and record it in C{self.fakeRealEndpoints}.
+        """
+        ftcpe = FakeTCPEndpoint(reactor, host, port)
+        self.fakeRealEndpoints.append(ftcpe)
+        return ftcpe
+
+
+    def deferToSomething(self, func, *a, **k):
+        """
+        Test replacement for L{deferToThread}, which can only call
+        L{getaddrinfo}.  Nothing is actually run; the call is recorded in
+        C{self.inThreads} along with the L{Deferred} that is returned.
+        """
+        d = Deferred()
+        if func is not getaddrinfo:
+            self.fail("Only getaddrinfo should be invoked in a thread.")
+        self.inThreads.append((d, func, a, k))
+        return d
+
+
+    def gaiResult(self, family, socktype, proto, canonname, sockaddr):
+        """
+        A call to L{getaddrinfo} has succeeded; invoke the L{Deferred} waiting
+        on it (the oldest recorded call) with a one-element result list.
+        """
+        d = self.inThreads.pop(0)[0]
+        d.callback([(family, socktype, proto, canonname, sockaddr)])
+
+
+    def setUp(self):
+        """
+        Create the fake clock and the lists that record deferred
+        L{getaddrinfo} calls and created sub-endpoints.
+        """
+        self.inThreads = []
+        self.clock = Clock()
+        self.fakeRealEndpoints = []
+
+
+    def test_simpleSuccess(self):
+        """
+        If C{getaddrinfo} gives one result, L{GAIEndpoint.connect} attempts a
+        connection to that address, and the L{Deferred} it returned fires
+        once that attempt succeeds.
+        """
+        gaiendpoint = self.makeEndpoint()
+        protos = []
+        f = Factory()
+        f.protocol = Protocol
+        gaiendpoint.connect(f).addCallback(protos.append)
+        WHO_CARES = 0
+        WHAT_EVER = ""
+        self.gaiResult(AF_INET, SOCK_STREAM, WHO_CARES, WHAT_EVER,
+                       ("1.2.3.4", 4321))
+        self.clock.advance(1.0)
+        # Exactly one sub-endpoint should exist, aimed at the resolved
+        # address.
+        self.assertEqual(len(self.fakeRealEndpoints), 1)
+        fake = self.fakeRealEndpoints[0]
+        self.assertEqual((fake._host, fake._port), ("1.2.3.4", 4321))
+        fake._attempt.callback(fake._factory.buildProtocol(None))
+        self.assertEqual(len(protos), 1)
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120413/471050f5/attachment.html>
More information about the calendarserver-changes
mailing list