[CalendarServer-changes] [9106] CalendarServer/trunk
source_changes at macosforge.org
source_changes at macosforge.org
Fri Apr 13 11:59:38 PDT 2012
Revision: 9106
http://trac.macosforge.org/projects/calendarserver/changeset/9106
Author: glyph at apple.com
Date: 2012-04-13 11:59:38 -0700 (Fri, 13 Apr 2012)
Log Message:
-----------
IPv6 support for outbound connections.
Modified Paths:
--------------
CalendarServer/trunk/calendarserver/__init__.py
CalendarServer/trunk/calendarserver/platform/darwin/wiki.py
CalendarServer/trunk/calendarserver/push/applepush.py
CalendarServer/trunk/calendarserver/tools/notifications.py
CalendarServer/trunk/contrib/__init__.py
CalendarServer/trunk/contrib/performance/__init__.py
CalendarServer/trunk/contrib/performance/loadtest/ical.py
CalendarServer/trunk/pyflakes
CalendarServer/trunk/support/build.sh
CalendarServer/trunk/twext/patches.py
CalendarServer/trunk/twisted/plugins/caldav.py
CalendarServer/trunk/twisted/plugins/kqueuereactor.py
CalendarServer/trunk/twistedcaldav/client/pool.py
CalendarServer/trunk/twistedcaldav/mail.py
CalendarServer/trunk/twistedcaldav/memcachepool.py
CalendarServer/trunk/twistedcaldav/notify.py
CalendarServer/trunk/twistedcaldav/scheduling/imip.py
CalendarServer/trunk/twistedcaldav/scheduling/ischedule.py
CalendarServer/trunk/twistedcaldav/test/test_localization.py
CalendarServer/trunk/twistedcaldav/test/test_memcachepool.py
CalendarServer/trunk/twistedcaldav/util.py
Added Paths:
-----------
CalendarServer/trunk/twext/backport/
CalendarServer/trunk/twext/backport/__init__.py
CalendarServer/trunk/twext/backport/internet/
CalendarServer/trunk/twext/backport/internet/__init__.py
CalendarServer/trunk/twext/backport/internet/address.py
CalendarServer/trunk/twext/backport/internet/endpoints.py
CalendarServer/trunk/twext/backport/internet/tcp.py
CalendarServer/trunk/twext/internet/adaptendpoint.py
CalendarServer/trunk/twext/internet/gaiendpoint.py
CalendarServer/trunk/twext/internet/test/test_adaptendpoint.py
CalendarServer/trunk/twext/internet/test/test_gaiendpoint.py
Removed Paths:
-------------
CalendarServer/trunk/twext/backport/__init__.py
CalendarServer/trunk/twext/backport/internet/
CalendarServer/trunk/twext/backport/internet/__init__.py
CalendarServer/trunk/twext/backport/internet/address.py
CalendarServer/trunk/twext/backport/internet/endpoints.py
CalendarServer/trunk/twext/backport/internet/tcp.py
Property Changed:
----------------
CalendarServer/trunk/
Property changes on: CalendarServer/trunk
___________________________________________________________________
Modified: svn:mergeinfo
- /CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/users/cdaboo/batchupload-6699:6700-7198
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/component-set-fixes:8130-8346
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/implicituidrace:8137-8141
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/pods:7297-7377
/CalendarServer/branches/users/cdaboo/pycalendar:7085-7206
/CalendarServer/branches/users/cdaboo/pycard:7227-7237
/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes:7740-8287
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/cdaboo/timezones:7443-7699
/CalendarServer/branches/users/cdaboo/txn-debugging:8730-8743
/CalendarServer/branches/users/glyph/case-insensitive-uid:8772-8805
/CalendarServer/branches/users/glyph/conn-limit:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/dalify:6932-7023
/CalendarServer/branches/users/glyph/db-reconnect:6824-6876
/CalendarServer/branches/users/glyph/deploybuild:7563-7572
/CalendarServer/branches/users/glyph/disable-quota:7718-7727
/CalendarServer/branches/users/glyph/dont-start-postgres:6592-6614
/CalendarServer/branches/users/glyph/imip-and-admin-html:7866-7984
/CalendarServer/branches/users/glyph/linux-tests:6893-6900
/CalendarServer/branches/users/glyph/migrate-merge:8690-8713
/CalendarServer/branches/users/glyph/misc-portability-fixes:7365-7374
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/multiget-delete:8321-8330
/CalendarServer/branches/users/glyph/new-export:7444-7485
/CalendarServer/branches/users/glyph/oracle:7106-7155
/CalendarServer/branches/users/glyph/oracle-nulls:7340-7351
/CalendarServer/branches/users/glyph/other-html:8062-8091
/CalendarServer/branches/users/glyph/parallel-sim:8240-8251
/CalendarServer/branches/users/glyph/parallel-upgrade:8376-8400
/CalendarServer/branches/users/glyph/parallel-upgrade_to_1:8571-8583
/CalendarServer/branches/users/glyph/quota:7604-7637
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/shared-pool-fixes:8436-8443
/CalendarServer/branches/users/glyph/shared-pool-take2:8155-8174
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/skip-lonely-vtimezones:8524-8535
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/subtransactions:7248-7258
/CalendarServer/branches/users/glyph/table-alias:8651-8664
/CalendarServer/branches/users/glyph/uidexport:7673-7676
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/glyph/xattrs-from-files:7757-7769
/CalendarServer/branches/users/sagen/applepush:8126-8184
/CalendarServer/branches/users/sagen/inboxitems:7380-7381
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/wsanchez/transations:5515-5593
+ /CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/users/cdaboo/batchupload-6699:6700-7198
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/component-set-fixes:8130-8346
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/implicituidrace:8137-8141
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/pods:7297-7377
/CalendarServer/branches/users/cdaboo/pycalendar:7085-7206
/CalendarServer/branches/users/cdaboo/pycard:7227-7237
/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes:7740-8287
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/cdaboo/timezones:7443-7699
/CalendarServer/branches/users/cdaboo/txn-debugging:8730-8743
/CalendarServer/branches/users/glyph/case-insensitive-uid:8772-8805
/CalendarServer/branches/users/glyph/conn-limit:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/dalify:6932-7023
/CalendarServer/branches/users/glyph/db-reconnect:6824-6876
/CalendarServer/branches/users/glyph/deploybuild:7563-7572
/CalendarServer/branches/users/glyph/disable-quota:7718-7727
/CalendarServer/branches/users/glyph/dont-start-postgres:6592-6614
/CalendarServer/branches/users/glyph/imip-and-admin-html:7866-7984
/CalendarServer/branches/users/glyph/ipv6-client:9054-9105
/CalendarServer/branches/users/glyph/linux-tests:6893-6900
/CalendarServer/branches/users/glyph/migrate-merge:8690-8713
/CalendarServer/branches/users/glyph/misc-portability-fixes:7365-7374
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/multiget-delete:8321-8330
/CalendarServer/branches/users/glyph/new-export:7444-7485
/CalendarServer/branches/users/glyph/oracle:7106-7155
/CalendarServer/branches/users/glyph/oracle-nulls:7340-7351
/CalendarServer/branches/users/glyph/other-html:8062-8091
/CalendarServer/branches/users/glyph/parallel-sim:8240-8251
/CalendarServer/branches/users/glyph/parallel-upgrade:8376-8400
/CalendarServer/branches/users/glyph/parallel-upgrade_to_1:8571-8583
/CalendarServer/branches/users/glyph/quota:7604-7637
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/shared-pool-fixes:8436-8443
/CalendarServer/branches/users/glyph/shared-pool-take2:8155-8174
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/skip-lonely-vtimezones:8524-8535
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/subtransactions:7248-7258
/CalendarServer/branches/users/glyph/table-alias:8651-8664
/CalendarServer/branches/users/glyph/uidexport:7673-7676
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/glyph/xattrs-from-files:7757-7769
/CalendarServer/branches/users/sagen/applepush:8126-8184
/CalendarServer/branches/users/sagen/inboxitems:7380-7381
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/wsanchez/transations:5515-5593
Modified: CalendarServer/trunk/calendarserver/__init__.py
===================================================================
--- CalendarServer/trunk/calendarserver/__init__.py 2012-04-13 18:48:13 UTC (rev 9105)
+++ CalendarServer/trunk/calendarserver/__init__.py 2012-04-13 18:59:38 UTC (rev 9106)
@@ -19,6 +19,10 @@
CalendarServer application code.
"""
+# Make sure we have twext's required Twisted patches loaded before we do
+# anything at all.
+__import__("twext")
+
#
# setuptools is annoying
#
Modified: CalendarServer/trunk/calendarserver/platform/darwin/wiki.py
===================================================================
--- CalendarServer/trunk/calendarserver/platform/darwin/wiki.py 2012-04-13 18:48:13 UTC (rev 9105)
+++ CalendarServer/trunk/calendarserver/platform/darwin/wiki.py 2012-04-13 18:59:38 UTC (rev 9106)
@@ -14,10 +14,15 @@
# limitations under the License.
##
+
from twext.python.log import Logger
+from twext.internet.gaiendpoint import GAIEndpoint
+from twext.internet.adaptendpoint import connect
+
from twisted.web.client import HTTPPageGetter, HTTPClientFactory
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
+
import json
log = Logger()
@@ -83,7 +88,7 @@
"""
factory = HTTPClientFactory(url)
factory.protocol = HTTPPageGetter
- reactor.connectTCP(host, port, factory)
+ connect(GAIEndpoint(reactor, host, port), factory)
return factory.deferred
class WebAuthError(RuntimeError):
Modified: CalendarServer/trunk/calendarserver/push/applepush.py
===================================================================
--- CalendarServer/trunk/calendarserver/push/applepush.py 2012-04-13 18:48:13 UTC (rev 9105)
+++ CalendarServer/trunk/calendarserver/push/applepush.py 2012-04-13 18:59:38 UTC (rev 9106)
@@ -1,3 +1,4 @@
+# -*- test-case-name: calendarserver.push.test.test_applepush -*-
##
# Copyright (c) 2011-2012 Apple Inc. All rights reserved.
#
@@ -16,7 +17,7 @@
from twext.internet.ssl import ChainingOpenSSLContextFactory
from twext.python.log import Logger, LoggingMixIn
-from twext.python.log import LoggingMixIn
+
from twext.web2 import responsecode
from txdav.xml import element as davxml
from twext.web2.dav.noneprops import NonePropertyStore
@@ -25,22 +26,25 @@
from twext.web2.http_headers import MimeType
from twext.web2.server import parsePOSTData
from twisted.application import service
-from twisted.internet import reactor, protocol
+from twisted.internet import protocol
from twisted.internet.defer import inlineCallbacks, returnValue, succeed
from twisted.internet.protocol import ClientFactory, ReconnectingClientFactory
-from twistedcaldav.extensions import DAVResource, DAVResourceWithoutChildrenMixin
+from twistedcaldav.extensions import DAVResourceWithoutChildrenMixin
from twistedcaldav.resource import ReadOnlyNoCopyResourceMixIn
import OpenSSL
import struct
import time
from txdav.common.icommondatastore import InvalidSubscriptionValues
+
from calendarserver.push.util import validToken, TokenHistory, PushScheduler
+from twext.internet.adaptendpoint import connect
+from twext.internet.gaiendpoint import GAIEndpoint
-
log = Logger()
+
class ApplePushNotifierService(service.MultiService, LoggingMixIn):
"""
ApplePushNotifierService is a MultiService responsible for
@@ -394,9 +398,11 @@
passwdCallback=passwdCallback,
sslmethod=getattr(OpenSSL.SSL, self.sslMethod)
)
- reactor.connectSSL(self.host, self.port, factory, context)
+ connect(GAIEndpoint(self.reactor, self.host, self.port, context),
+ factory)
+
class APNProviderService(APNConnectionService):
def __init__(self, store, host, port, certPath, keyPath, chainPath="",
Modified: CalendarServer/trunk/calendarserver/tools/notifications.py
===================================================================
--- CalendarServer/trunk/calendarserver/tools/notifications.py 2012-04-13 18:48:13 UTC (rev 9105)
+++ CalendarServer/trunk/calendarserver/tools/notifications.py 2012-04-13 18:59:38 UTC (rev 9106)
@@ -19,7 +19,7 @@
from getopt import getopt, GetoptError
from getpass import getpass
from twisted.application.service import Service
-from twisted.internet import reactor, ssl
+from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.web import client
from twisted.words.protocols.jabber import xmlstream
@@ -27,6 +27,10 @@
from twisted.words.protocols.jabber.jid import JID
from twisted.words.protocols.jabber.xmlstream import IQ
from twisted.words.xish import domish
+
+from twext.internet.gaiendpoint import GAIEndpoint
+from twext.internet.adaptendpoint import connect
+
from twistedcaldav.config import config, ConfigurationError
from twistedcaldav.util import AuthorizedHTTPGetter
from xml.etree import ElementTree
@@ -597,7 +601,7 @@
pubsubFactory = PubSubClientFactory(jid, self.password, service, nodes,
self.verbose)
- reactor.connectTCP(host, port, pubsubFactory)
+ connect(GAIEndpoint(reactor, host, port), pubsubFactory)
def makeRequest(self, path, method, headers, body):
@@ -610,10 +614,11 @@
caldavFactory.noisy = False
caldavFactory.protocol = PropfindRequestor
if self.useSSL:
- reactor.connectSSL(self.host, self.port, caldavFactory,
- ssl.ClientContextFactory())
+ connect(GAIEndpoint(reactor, self.host, self.port,
+ self.ClientContextFactory()),
+ caldavFactory)
else:
- reactor.connectTCP(self.host, self.port, caldavFactory)
+ connect(GAIEndpoint(reactor, self.host, self.port), caldavFactory)
return caldavFactory.deferred
Modified: CalendarServer/trunk/contrib/__init__.py
===================================================================
--- CalendarServer/trunk/contrib/__init__.py 2012-04-13 18:48:13 UTC (rev 9105)
+++ CalendarServer/trunk/contrib/__init__.py 2012-04-13 18:59:38 UTC (rev 9106)
@@ -13,3 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
##
+
+__import__("twext")
Modified: CalendarServer/trunk/contrib/performance/__init__.py
===================================================================
--- CalendarServer/trunk/contrib/performance/__init__.py 2012-04-13 18:48:13 UTC (rev 9105)
+++ CalendarServer/trunk/contrib/performance/__init__.py 2012-04-13 18:59:38 UTC (rev 9106)
@@ -0,0 +1,17 @@
+##
+# Copyright (c) 2012 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+__import__("twext")
Modified: CalendarServer/trunk/contrib/performance/loadtest/ical.py
===================================================================
--- CalendarServer/trunk/contrib/performance/loadtest/ical.py 2012-04-13 18:48:13 UTC (rev 9105)
+++ CalendarServer/trunk/contrib/performance/loadtest/ical.py 2012-04-13 18:59:38 UTC (rev 9106)
@@ -20,6 +20,10 @@
from urlparse import urlparse, urlunparse
from xml.etree import ElementTree
+
+from twext.internet.gaiendpoint import GAIEndpoint
+from twext.internet.adaptendpoint import connect
+
from twistedcaldav.ical import Component, Property
from pycalendar.duration import PyCalendarDuration
from pycalendar.timezone import PyCalendarTimezone
@@ -916,7 +920,7 @@
{params.pushkey: (home, home, "Calendar home")}, False,
sigint=False)
self._pushFactories.append(factory)
- self.reactor.connectTCP(host, port, factory)
+ connect(GAIEndpoint(self.reactor, host, port), factory)
@inlineCallbacks
Modified: CalendarServer/trunk/pyflakes
===================================================================
--- CalendarServer/trunk/pyflakes 2012-04-13 18:48:13 UTC (rev 9105)
+++ CalendarServer/trunk/pyflakes 2012-04-13 18:59:38 UTC (rev 9106)
@@ -15,13 +15,11 @@
tmp="$(mktemp "/tmp/pyflakes.XXXXX")";
cd "${wd}" && "${flakes}/bin/pyflakes" "$@" | sed \
- -e "/undefined name '_'/d" \
- -e "/undefined name 'CalDAVFile'/d" \
-e "/redefinition of unused/d" \
-e "/'from .* import \\*' used; unable to detect undefined names/d" \
-e "/redefinition of function/d" \
- -e "/i[a-z]*store.py:[0-9][0-9]*: '.*' imported but unused/d" \
-e "/xmlext.py:[0-9][0-9]*: /d" \
+ -e "/^twext\\/backport/d" \
| tee "${tmp}";
if [ -s "${tmp}" ]; then error="true"; else error="false"; fi;
Modified: CalendarServer/trunk/support/build.sh
===================================================================
--- CalendarServer/trunk/support/build.sh 2012-04-13 18:48:13 UTC (rev 9105)
+++ CalendarServer/trunk/support/build.sh 2012-04-13 18:59:38 UTC (rev 9106)
@@ -717,6 +717,8 @@
"PyGreSQL" "pgdb" "${pg}" \
"${pypi}/P/PyGreSQL/${pg}.tar.gz";
+ # Maintenance note: next time the Twisted dependency gets updated, check out
+ # twext/patches.py.
py_dependency -v 12 -r HEAD \
"Twisted" "twisted" "Twisted" \
"svn://svn.twistedmatrix.com/svn/Twisted/tags/releases/twisted-12.0.0";
Deleted: CalendarServer/trunk/twext/backport/__init__.py
===================================================================
Copied: CalendarServer/trunk/twext/backport/__init__.py (from rev 9105, CalendarServer/branches/users/glyph/ipv6-client/twext/backport/__init__.py)
===================================================================
Deleted: CalendarServer/trunk/twext/backport/internet/__init__.py
===================================================================
Copied: CalendarServer/trunk/twext/backport/internet/__init__.py (from rev 9105, CalendarServer/branches/users/glyph/ipv6-client/twext/backport/internet/__init__.py)
===================================================================
Deleted: CalendarServer/trunk/twext/backport/internet/address.py
===================================================================
--- CalendarServer/branches/users/glyph/ipv6-client/twext/backport/internet/address.py 2012-04-13 18:48:13 UTC (rev 9105)
+++ CalendarServer/trunk/twext/backport/internet/address.py 2012-04-13 18:59:38 UTC (rev 9106)
@@ -1,144 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-"""
-Address objects for network connections.
-"""
-
-import warnings, os
-
-from zope.interface import implements
-
-from twisted.internet.interfaces import IAddress
-from twisted.python import util
-
-
-class _IPAddress(object, util.FancyEqMixin):
- """
- An L{_IPAddress} represents the address of an IP socket endpoint, providing
- common behavior for IPv4 and IPv6.
-
- @ivar type: A string describing the type of transport, either 'TCP' or
- 'UDP'.
-
- @ivar host: A string containing the presentation format of the IP address;
- for example, "127.0.0.1" or "::1".
- @type host: C{str}
-
- @ivar port: An integer representing the port number.
- @type port: C{int}
- """
-
- implements(IAddress)
-
- compareAttributes = ('type', 'host', 'port')
-
- def __init__(self, type, host, port):
- assert type in ('TCP', 'UDP')
- self.type = type
- self.host = host
- self.port = port
-
-
- def __repr__(self):
- return '%s(%s, %r, %d)' % (
- self.__class__.__name__, self.type, self.host, self.port)
-
-
- def __hash__(self):
- return hash((self.type, self.host, self.port))
-
-
-
-class IPv4Address(_IPAddress):
- """
- An L{IPv4Address} represents the address of an IPv4 socket endpoint.
-
- @ivar host: A string containing a dotted-quad IPv4 address; for example,
- "127.0.0.1".
- @type host: C{str}
- """
-
- def __init__(self, type, host, port, _bwHack=None):
- _IPAddress.__init__(self, type, host, port)
- if _bwHack is not None:
- warnings.warn("twisted.internet.address.IPv4Address._bwHack "
- "is deprecated since Twisted 11.0",
- DeprecationWarning, stacklevel=2)
-
-
-
-class IPv6Address(_IPAddress):
- """
- An L{IPv6Address} represents the address of an IPv6 socket endpoint.
-
- @ivar host: A string containing a colon-separated, hexadecimal formatted
- IPv6 address; for example, "::1".
- @type host: C{str}
- """
-
-
-
-class UNIXAddress(object, util.FancyEqMixin):
- """
- Object representing a UNIX socket endpoint.
-
- @ivar name: The filename associated with this socket.
- @type name: C{str}
- """
-
- implements(IAddress)
-
- compareAttributes = ('name', )
-
- def __init__(self, name, _bwHack = None):
- self.name = name
- if _bwHack is not None:
- warnings.warn("twisted.internet.address.UNIXAddress._bwHack is deprecated since Twisted 11.0",
- DeprecationWarning, stacklevel=2)
-
-
- if getattr(os.path, 'samefile', None) is not None:
- def __eq__(self, other):
- """
- overriding L{util.FancyEqMixin} to ensure the os level samefile
- check is done if the name attributes do not match.
- """
- res = super(UNIXAddress, self).__eq__(other)
- if res == False:
- try:
- return os.path.samefile(self.name, other.name)
- except OSError:
- pass
- return res
-
-
- def __repr__(self):
- return 'UNIXAddress(%r)' % (self.name,)
-
-
- def __hash__(self):
- try:
- s1 = os.stat(self.name)
- return hash((s1.st_ino, s1.st_dev))
- except OSError:
- return hash(self.name)
-
-
-
-# These are for buildFactory backwards compatability due to
-# stupidity-induced inconsistency.
-
-class _ServerFactoryIPv4Address(IPv4Address):
- """Backwards compatability hack. Just like IPv4Address in practice."""
-
- def __eq__(self, other):
- if isinstance(other, tuple):
- warnings.warn("IPv4Address.__getitem__ is deprecated. Use attributes instead.",
- category=DeprecationWarning, stacklevel=2)
- return (self.host, self.port) == other
- elif isinstance(other, IPv4Address):
- a = (self.type, self.host, self.port)
- b = (other.type, other.host, other.port)
- return a == b
- return False
Copied: CalendarServer/trunk/twext/backport/internet/address.py (from rev 9105, CalendarServer/branches/users/glyph/ipv6-client/twext/backport/internet/address.py)
===================================================================
--- CalendarServer/trunk/twext/backport/internet/address.py (rev 0)
+++ CalendarServer/trunk/twext/backport/internet/address.py 2012-04-13 18:59:38 UTC (rev 9106)
@@ -0,0 +1,144 @@
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Address objects for network connections.
+"""
+
+import warnings, os
+
+from zope.interface import implements
+
+from twisted.internet.interfaces import IAddress
+from twisted.python import util
+
+
+class _IPAddress(object, util.FancyEqMixin):
+ """
+ An L{_IPAddress} represents the address of an IP socket endpoint, providing
+ common behavior for IPv4 and IPv6.
+
+ @ivar type: A string describing the type of transport, either 'TCP' or
+ 'UDP'.
+
+ @ivar host: A string containing the presentation format of the IP address;
+ for example, "127.0.0.1" or "::1".
+ @type host: C{str}
+
+ @ivar port: An integer representing the port number.
+ @type port: C{int}
+ """
+
+ implements(IAddress)
+
+ compareAttributes = ('type', 'host', 'port')
+
+ def __init__(self, type, host, port):
+ assert type in ('TCP', 'UDP')
+ self.type = type
+ self.host = host
+ self.port = port
+
+
+ def __repr__(self):
+ return '%s(%s, %r, %d)' % (
+ self.__class__.__name__, self.type, self.host, self.port)
+
+
+ def __hash__(self):
+ return hash((self.type, self.host, self.port))
+
+
+
+class IPv4Address(_IPAddress):
+ """
+ An L{IPv4Address} represents the address of an IPv4 socket endpoint.
+
+ @ivar host: A string containing a dotted-quad IPv4 address; for example,
+ "127.0.0.1".
+ @type host: C{str}
+ """
+
+ def __init__(self, type, host, port, _bwHack=None):
+ _IPAddress.__init__(self, type, host, port)
+ if _bwHack is not None:
+ warnings.warn("twisted.internet.address.IPv4Address._bwHack "
+ "is deprecated since Twisted 11.0",
+ DeprecationWarning, stacklevel=2)
+
+
+
+class IPv6Address(_IPAddress):
+ """
+ An L{IPv6Address} represents the address of an IPv6 socket endpoint.
+
+ @ivar host: A string containing a colon-separated, hexadecimal formatted
+ IPv6 address; for example, "::1".
+ @type host: C{str}
+ """
+
+
+
+class UNIXAddress(object, util.FancyEqMixin):
+ """
+ Object representing a UNIX socket endpoint.
+
+ @ivar name: The filename associated with this socket.
+ @type name: C{str}
+ """
+
+ implements(IAddress)
+
+ compareAttributes = ('name', )
+
+ def __init__(self, name, _bwHack = None):
+ self.name = name
+ if _bwHack is not None:
+ warnings.warn("twisted.internet.address.UNIXAddress._bwHack is deprecated since Twisted 11.0",
+ DeprecationWarning, stacklevel=2)
+
+
+ if getattr(os.path, 'samefile', None) is not None:
+ def __eq__(self, other):
+ """
+ overriding L{util.FancyEqMixin} to ensure the os level samefile
+ check is done if the name attributes do not match.
+ """
+ res = super(UNIXAddress, self).__eq__(other)
+ if res == False:
+ try:
+ return os.path.samefile(self.name, other.name)
+ except OSError:
+ pass
+ return res
+
+
+ def __repr__(self):
+ return 'UNIXAddress(%r)' % (self.name,)
+
+
+ def __hash__(self):
+ try:
+ s1 = os.stat(self.name)
+ return hash((s1.st_ino, s1.st_dev))
+ except OSError:
+ return hash(self.name)
+
+
+
+# These are for buildFactory backwards compatability due to
+# stupidity-induced inconsistency.
+
+class _ServerFactoryIPv4Address(IPv4Address):
+ """Backwards compatability hack. Just like IPv4Address in practice."""
+
+ def __eq__(self, other):
+ if isinstance(other, tuple):
+ warnings.warn("IPv4Address.__getitem__ is deprecated. Use attributes instead.",
+ category=DeprecationWarning, stacklevel=2)
+ return (self.host, self.port) == other
+ elif isinstance(other, IPv4Address):
+ a = (self.type, self.host, self.port)
+ b = (other.type, other.host, other.port)
+ return a == b
+ return False
Deleted: CalendarServer/trunk/twext/backport/internet/endpoints.py
===================================================================
--- CalendarServer/branches/users/glyph/ipv6-client/twext/backport/internet/endpoints.py 2012-04-13 18:48:13 UTC (rev 9105)
+++ CalendarServer/trunk/twext/backport/internet/endpoints.py 2012-04-13 18:59:38 UTC (rev 9106)
@@ -1,1178 +0,0 @@
-# -*- test-case-name: twisted.internet.test.test_endpoints -*-
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-"""
-Implementations of L{IStreamServerEndpoint} and L{IStreamClientEndpoint} that
-wrap the L{IReactorTCP}, L{IReactorSSL}, and L{IReactorUNIX} interfaces.
-
-This also implements an extensible mini-language for describing endpoints,
-parsed by the L{clientFromString} and L{serverFromString} functions.
-
-@since: 10.1
-"""
-
-import os, socket
-
-from zope.interface import implements, directlyProvides
-import warnings
-
-from twisted.internet import interfaces, defer, error, fdesc
-from twisted.internet.protocol import ClientFactory, Protocol
-from twisted.plugin import IPlugin, getPlugins
-from twisted.internet.interfaces import IStreamServerEndpointStringParser
-from twisted.internet.interfaces import IStreamClientEndpointStringParser
-from twisted.python.filepath import FilePath
-#from twisted.python.systemd import ListenFDs
-
-
-__all__ = ["clientFromString", "serverFromString",
- "TCP4ServerEndpoint", "TCP4ClientEndpoint",
- "UNIXServerEndpoint", "UNIXClientEndpoint",
- "SSL4ServerEndpoint", "SSL4ClientEndpoint",
- "AdoptedStreamServerEndpoint"]
-
-
-class _WrappingProtocol(Protocol):
- """
- Wrap another protocol in order to notify my user when a connection has
- been made.
-
- @ivar _connectedDeferred: The L{Deferred} that will callback
- with the C{wrappedProtocol} when it is connected.
-
- @ivar _wrappedProtocol: An L{IProtocol} provider that will be
- connected.
- """
-
- def __init__(self, connectedDeferred, wrappedProtocol):
- """
- @param connectedDeferred: The L{Deferred} that will callback
- with the C{wrappedProtocol} when it is connected.
-
- @param wrappedProtocol: An L{IProtocol} provider that will be
- connected.
- """
- self._connectedDeferred = connectedDeferred
- self._wrappedProtocol = wrappedProtocol
-
- if interfaces.IHalfCloseableProtocol.providedBy(
- self._wrappedProtocol):
- directlyProvides(self, interfaces.IHalfCloseableProtocol)
-
-
- def logPrefix(self):
- """
- Transparently pass through the wrapped protocol's log prefix.
- """
- if interfaces.ILoggingContext.providedBy(self._wrappedProtocol):
- return self._wrappedProtocol.logPrefix()
- return self._wrappedProtocol.__class__.__name__
-
-
- def connectionMade(self):
- """
- Connect the C{self._wrappedProtocol} to our C{self.transport} and
- callback C{self._connectedDeferred} with the C{self._wrappedProtocol}
- """
- self._wrappedProtocol.makeConnection(self.transport)
- self._connectedDeferred.callback(self._wrappedProtocol)
-
-
- def dataReceived(self, data):
- """
- Proxy C{dataReceived} calls to our C{self._wrappedProtocol}
- """
- return self._wrappedProtocol.dataReceived(data)
-
-
- def connectionLost(self, reason):
- """
- Proxy C{connectionLost} calls to our C{self._wrappedProtocol}
- """
- return self._wrappedProtocol.connectionLost(reason)
-
-
- def readConnectionLost(self):
- """
- Proxy L{IHalfCloseableProtocol.readConnectionLost} to our
- C{self._wrappedProtocol}
- """
- self._wrappedProtocol.readConnectionLost()
-
-
- def writeConnectionLost(self):
- """
- Proxy L{IHalfCloseableProtocol.writeConnectionLost} to our
- C{self._wrappedProtocol}
- """
- self._wrappedProtocol.writeConnectionLost()
-
-
-
-class _WrappingFactory(ClientFactory):
- """
- Wrap a factory in order to wrap the protocols it builds.
-
- @ivar _wrappedFactory: A provider of I{IProtocolFactory} whose buildProtocol
- method will be called and whose resulting protocol will be wrapped.
-
- @ivar _onConnection: An L{Deferred} that fires when the protocol is
- connected
-
- @ivar _connector: A L{connector <twisted.internet.interfaces.IConnector>}
- that is managing the current or previous connection attempt.
- """
- protocol = _WrappingProtocol
-
- def __init__(self, wrappedFactory):
- """
- @param wrappedFactory: A provider of I{IProtocolFactory} whose
- buildProtocol method will be called and whose resulting protocol
- will be wrapped.
- """
- self._wrappedFactory = wrappedFactory
- self._onConnection = defer.Deferred(canceller=self._canceller)
-
-
- def startedConnecting(self, connector):
- """
- A connection attempt was started. Remember the connector which started
- said attempt, for use later.
- """
- self._connector = connector
-
-
- def _canceller(self, deferred):
- """
- The outgoing connection attempt was cancelled. Fail that L{Deferred}
- with a L{error.ConnectingCancelledError}.
-
- @param deferred: The L{Deferred <defer.Deferred>} that was cancelled;
- should be the same as C{self._onConnection}.
- @type deferred: L{Deferred <defer.Deferred>}
-
- @note: This relies on startedConnecting having been called, so it may
- seem as though there's a race condition where C{_connector} may not
- have been set. However, using public APIs, this condition is
- impossible to catch, because a connection API
- (C{connectTCP}/C{SSL}/C{UNIX}) is always invoked before a
- L{_WrappingFactory}'s L{Deferred <defer.Deferred>} is returned to
- C{connect()}'s caller.
-
- @return: C{None}
- """
- deferred.errback(
- error.ConnectingCancelledError(
- self._connector.getDestination()))
- self._connector.stopConnecting()
-
-
- def doStart(self):
- """
- Start notifications are passed straight through to the wrapped factory.
- """
- self._wrappedFactory.doStart()
-
-
- def doStop(self):
- """
- Stop notifications are passed straight through to the wrapped factory.
- """
- self._wrappedFactory.doStop()
-
-
- def buildProtocol(self, addr):
- """
- Proxy C{buildProtocol} to our C{self._wrappedFactory} or errback
- the C{self._onConnection} L{Deferred}.
-
- @return: An instance of L{_WrappingProtocol} or C{None}
- """
- try:
- proto = self._wrappedFactory.buildProtocol(addr)
- except:
- self._onConnection.errback()
- else:
- return self.protocol(self._onConnection, proto)
-
-
- def clientConnectionFailed(self, connector, reason):
- """
- Errback the C{self._onConnection} L{Deferred} when the
- client connection fails.
- """
- if not self._onConnection.called:
- self._onConnection.errback(reason)
-
-
-
-class TCP4ServerEndpoint(object):
- """
- TCP server endpoint with an IPv4 configuration
-
- @ivar _reactor: An L{IReactorTCP} provider.
-
- @type _port: int
- @ivar _port: The port number on which to listen for incoming connections.
-
- @type _backlog: int
- @ivar _backlog: size of the listen queue
-
- @type _interface: str
- @ivar _interface: the hostname to bind to, defaults to '' (all)
- """
- implements(interfaces.IStreamServerEndpoint)
-
- def __init__(self, reactor, port, backlog=50, interface=''):
- """
- @param reactor: An L{IReactorTCP} provider.
- @param port: The port number used listening
- @param backlog: size of the listen queue
- @param interface: the hostname to bind to, defaults to '' (all)
- """
- self._reactor = reactor
- self._port = port
- self._listenArgs = dict(backlog=50, interface='')
- self._backlog = backlog
- self._interface = interface
-
-
- def listen(self, protocolFactory):
- """
- Implement L{IStreamServerEndpoint.listen} to listen on a TCP socket
- """
- return defer.execute(self._reactor.listenTCP,
- self._port,
- protocolFactory,
- backlog=self._backlog,
- interface=self._interface)
-
-
-
-class TCP4ClientEndpoint(object):
- """
- TCP client endpoint with an IPv4 configuration.
-
- @ivar _reactor: An L{IReactorTCP} provider.
-
- @type _host: str
- @ivar _host: The hostname to connect to as a C{str}
-
- @type _port: int
- @ivar _port: The port to connect to as C{int}
-
- @type _timeout: int
- @ivar _timeout: number of seconds to wait before assuming the
- connection has failed.
-
- @type _bindAddress: tuple
- @type _bindAddress: a (host, port) tuple of local address to bind
- to, or None.
- """
- implements(interfaces.IStreamClientEndpoint)
-
- def __init__(self, reactor, host, port, timeout=30, bindAddress=None):
- """
- @param reactor: An L{IReactorTCP} provider
- @param host: A hostname, used when connecting
- @param port: The port number, used when connecting
- @param timeout: number of seconds to wait before assuming the
- connection has failed.
- @param bindAddress: a (host, port tuple of local address to bind to,
- or None.
- """
- self._reactor = reactor
- self._host = host
- self._port = port
- self._timeout = timeout
- self._bindAddress = bindAddress
-
-
- def connect(self, protocolFactory):
- """
- Implement L{IStreamClientEndpoint.connect} to connect via TCP.
- """
- try:
- wf = _WrappingFactory(protocolFactory)
- self._reactor.connectTCP(
- self._host, self._port, wf,
- timeout=self._timeout, bindAddress=self._bindAddress)
- return wf._onConnection
- except:
- return defer.fail()
-
-
-
-class SSL4ServerEndpoint(object):
- """
- SSL secured TCP server endpoint with an IPv4 configuration.
-
- @ivar _reactor: An L{IReactorSSL} provider.
-
- @type _host: str
- @ivar _host: The hostname to connect to as a C{str}
-
- @type _port: int
- @ivar _port: The port to connect to as C{int}
-
- @type _sslContextFactory: L{OpenSSLCertificateOptions}
- @var _sslContextFactory: SSL Configuration information as an
- L{OpenSSLCertificateOptions}
-
- @type _backlog: int
- @ivar _backlog: size of the listen queue
-
- @type _interface: str
- @ivar _interface: the hostname to bind to, defaults to '' (all)
- """
- implements(interfaces.IStreamServerEndpoint)
-
- def __init__(self, reactor, port, sslContextFactory,
- backlog=50, interface=''):
- """
- @param reactor: An L{IReactorSSL} provider.
- @param port: The port number used listening
- @param sslContextFactory: An instance of
- L{twisted.internet._sslverify.OpenSSLCertificateOptions}.
- @param timeout: number of seconds to wait before assuming the
- connection has failed.
- @param bindAddress: a (host, port tuple of local address to bind to,
- or None.
- """
- self._reactor = reactor
- self._port = port
- self._sslContextFactory = sslContextFactory
- self._backlog = backlog
- self._interface = interface
-
-
- def listen(self, protocolFactory):
- """
- Implement L{IStreamServerEndpoint.listen} to listen for SSL on a
- TCP socket.
- """
- return defer.execute(self._reactor.listenSSL, self._port,
- protocolFactory,
- contextFactory=self._sslContextFactory,
- backlog=self._backlog,
- interface=self._interface)
-
-
-
-class SSL4ClientEndpoint(object):
- """
- SSL secured TCP client endpoint with an IPv4 configuration
-
- @ivar _reactor: An L{IReactorSSL} provider.
-
- @type _host: str
- @ivar _host: The hostname to connect to as a C{str}
-
- @type _port: int
- @ivar _port: The port to connect to as C{int}
-
- @type _sslContextFactory: L{OpenSSLCertificateOptions}
- @var _sslContextFactory: SSL Configuration information as an
- L{OpenSSLCertificateOptions}
-
- @type _timeout: int
- @ivar _timeout: number of seconds to wait before assuming the
- connection has failed.
-
- @type _bindAddress: tuple
- @ivar _bindAddress: a (host, port) tuple of local address to bind
- to, or None.
- """
- implements(interfaces.IStreamClientEndpoint)
-
- def __init__(self, reactor, host, port, sslContextFactory,
- timeout=30, bindAddress=None):
- """
- @param reactor: An L{IReactorSSL} provider.
- @param host: A hostname, used when connecting
- @param port: The port number, used when connecting
- @param sslContextFactory: SSL Configuration information as An instance
- of L{OpenSSLCertificateOptions}.
- @param timeout: number of seconds to wait before assuming the
- connection has failed.
- @param bindAddress: a (host, port tuple of local address to bind to,
- or None.
- """
- self._reactor = reactor
- self._host = host
- self._port = port
- self._sslContextFactory = sslContextFactory
- self._timeout = timeout
- self._bindAddress = bindAddress
-
-
- def connect(self, protocolFactory):
- """
- Implement L{IStreamClientEndpoint.connect} to connect with SSL over
- TCP.
- """
- try:
- wf = _WrappingFactory(protocolFactory)
- self._reactor.connectSSL(
- self._host, self._port, wf, self._sslContextFactory,
- timeout=self._timeout, bindAddress=self._bindAddress)
- return wf._onConnection
- except:
- return defer.fail()
-
-
-
-class UNIXServerEndpoint(object):
- """
- UnixSocket server endpoint.
-
- @type path: str
- @ivar path: a path to a unix socket on the filesystem.
-
- @type _listenArgs: dict
- @ivar _listenArgs: A C{dict} of keyword args that will be passed
- to L{IReactorUNIX.listenUNIX}
-
- @var _reactor: An L{IReactorTCP} provider.
- """
- implements(interfaces.IStreamServerEndpoint)
-
- def __init__(self, reactor, address, backlog=50, mode=0666, wantPID=0):
- """
- @param reactor: An L{IReactorUNIX} provider.
- @param address: The path to the Unix socket file, used when listening
- @param listenArgs: An optional dict of keyword args that will be
- passed to L{IReactorUNIX.listenUNIX}
- @param backlog: number of connections to allow in backlog.
- @param mode: mode to set on the unix socket. This parameter is
- deprecated. Permissions should be set on the directory which
- contains the UNIX socket.
- @param wantPID: if True, create a pidfile for the socket.
- """
- self._reactor = reactor
- self._address = address
- self._backlog = backlog
- self._mode = mode
- self._wantPID = wantPID
-
-
- def listen(self, protocolFactory):
- """
- Implement L{IStreamServerEndpoint.listen} to listen on a UNIX socket.
- """
- return defer.execute(self._reactor.listenUNIX, self._address,
- protocolFactory,
- backlog=self._backlog,
- mode=self._mode,
- wantPID=self._wantPID)
-
-
-
-class UNIXClientEndpoint(object):
- """
- UnixSocket client endpoint.
-
- @type _path: str
- @ivar _path: a path to a unix socket on the filesystem.
-
- @type _timeout: int
- @ivar _timeout: number of seconds to wait before assuming the connection
- has failed.
-
- @type _checkPID: bool
- @ivar _checkPID: if True, check for a pid file to verify that a server
- is listening.
-
- @var _reactor: An L{IReactorUNIX} provider.
- """
- implements(interfaces.IStreamClientEndpoint)
-
- def __init__(self, reactor, path, timeout=30, checkPID=0):
- """
- @param reactor: An L{IReactorUNIX} provider.
- @param path: The path to the Unix socket file, used when connecting
- @param timeout: number of seconds to wait before assuming the
- connection has failed.
- @param checkPID: if True, check for a pid file to verify that a server
- is listening.
- """
- self._reactor = reactor
- self._path = path
- self._timeout = timeout
- self._checkPID = checkPID
-
-
- def connect(self, protocolFactory):
- """
- Implement L{IStreamClientEndpoint.connect} to connect via a
- UNIX Socket
- """
- try:
- wf = _WrappingFactory(protocolFactory)
- self._reactor.connectUNIX(
- self._path, wf,
- timeout=self._timeout,
- checkPID=self._checkPID)
- return wf._onConnection
- except:
- return defer.fail()
-
-
-
-class AdoptedStreamServerEndpoint(object):
- """
- An endpoint for listening on a file descriptor initialized outside of
- Twisted.
-
- @ivar _used: A C{bool} indicating whether this endpoint has been used to
- listen with a factory yet. C{True} if so.
- """
- _close = os.close
- _setNonBlocking = staticmethod(fdesc.setNonBlocking)
-
- def __init__(self, reactor, fileno, addressFamily):
- """
- @param reactor: An L{IReactorSocket} provider.
-
- @param fileno: An integer file descriptor corresponding to a listening
- I{SOCK_STREAM} socket.
-
- @param addressFamily: The address family of the socket given by
- C{fileno}.
- """
- self.reactor = reactor
- self.fileno = fileno
- self.addressFamily = addressFamily
- self._used = False
-
-
- def listen(self, factory):
- """
- Implement L{IStreamServerEndpoint.listen} to start listening on, and
- then close, C{self._fileno}.
- """
- if self._used:
- return defer.fail(error.AlreadyListened())
- self._used = True
-
- try:
- self._setNonBlocking(self.fileno)
- port = self.reactor.adoptStreamPort(
- self.fileno, self.addressFamily, factory)
- self._close(self.fileno)
- except:
- return defer.fail()
- return defer.succeed(port)
-
-
-
-def _parseTCP(factory, port, interface="", backlog=50):
- """
- Internal parser function for L{_parseServer} to convert the string
- arguments for a TCP(IPv4) stream endpoint into the structured arguments.
-
- @param factory: the protocol factory being parsed, or C{None}. (This was a
- leftover argument from when this code was in C{strports}, and is now
- mostly None and unused.)
-
- @type factory: L{IProtocolFactory} or C{NoneType}
-
- @param port: the integer port number to bind
- @type port: C{str}
-
- @param interface: the interface IP to listen on
- @param backlog: the length of the listen queue
- @type backlog: C{str}
-
- @return: a 2-tuple of (args, kwargs), describing the parameters to
- L{IReactorTCP.listenTCP} (or, modulo argument 2, the factory, arguments
- to L{TCP4ServerEndpoint}.
- """
- return (int(port), factory), {'interface': interface,
- 'backlog': int(backlog)}
-
-
-
-def _parseUNIX(factory, address, mode='666', backlog=50, lockfile=True):
- """
- Internal parser function for L{_parseServer} to convert the string
- arguments for a UNIX (AF_UNIX/SOCK_STREAM) stream endpoint into the
- structured arguments.
-
- @param factory: the protocol factory being parsed, or C{None}. (This was a
- leftover argument from when this code was in C{strports}, and is now
- mostly None and unused.)
-
- @type factory: L{IProtocolFactory} or C{NoneType}
-
- @param address: the pathname of the unix socket
- @type address: C{str}
-
- @param backlog: the length of the listen queue
- @type backlog: C{str}
-
- @param lockfile: A string '0' or '1', mapping to True and False
- respectively. See the C{wantPID} argument to C{listenUNIX}
-
- @return: a 2-tuple of (args, kwargs), describing the parameters to
- L{IReactorTCP.listenUNIX} (or, modulo argument 2, the factory,
- arguments to L{UNIXServerEndpoint}.
- """
- return (
- (address, factory),
- {'mode': int(mode, 8), 'backlog': int(backlog),
- 'wantPID': bool(int(lockfile))})
-
-
-
-def _parseSSL(factory, port, privateKey="server.pem", certKey=None,
- sslmethod=None, interface='', backlog=50):
- """
- Internal parser function for L{_parseServer} to convert the string
- arguments for an SSL (over TCP/IPv4) stream endpoint into the structured
- arguments.
-
- @param factory: the protocol factory being parsed, or C{None}. (This was a
- leftover argument from when this code was in C{strports}, and is now
- mostly None and unused.)
-
- @type factory: L{IProtocolFactory} or C{NoneType}
-
- @param port: the integer port number to bind
- @type port: C{str}
-
- @param interface: the interface IP to listen on
- @param backlog: the length of the listen queue
- @type backlog: C{str}
-
- @param privateKey: The file name of a PEM format private key file.
- @type privateKey: C{str}
-
- @param certKey: The file name of a PEM format certificate file.
- @type certKey: C{str}
-
- @param sslmethod: The string name of an SSL method, based on the name of a
- constant in C{OpenSSL.SSL}. Must be one of: "SSLv23_METHOD",
- "SSLv2_METHOD", "SSLv3_METHOD", "TLSv1_METHOD".
- @type sslmethod: C{str}
-
- @return: a 2-tuple of (args, kwargs), describing the parameters to
- L{IReactorSSL.listenSSL} (or, modulo argument 2, the factory, arguments
- to L{SSL4ServerEndpoint}.
- """
- from twisted.internet import ssl
- if certKey is None:
- certKey = privateKey
- kw = {}
- if sslmethod is not None:
- kw['sslmethod'] = getattr(ssl.SSL, sslmethod)
- cf = ssl.DefaultOpenSSLContextFactory(privateKey, certKey, **kw)
- return ((int(port), factory, cf),
- {'interface': interface, 'backlog': int(backlog)})
-
-
-class _SystemdParser(object):
- """
- Stream server endpoint string parser for the I{systemd} endpoint type.
-
- @ivar prefix: See L{IStreamClientEndpointStringParser.prefix}.
-
- @ivar _sddaemon: A L{ListenFDs} instance used to translate an index into an
- actual file descriptor.
- """
- implements(IPlugin, IStreamServerEndpointStringParser)
-
- #_sddaemon = ListenFDs.fromEnvironment()
-
- prefix = "systemd"
-
- def _parseServer(self, reactor, domain, index):
- """
- Internal parser function for L{_parseServer} to convert the string
- arguments for a systemd server endpoint into structured arguments for
- L{AdoptedStreamServerEndpoint}.
-
- @param reactor: An L{IReactorSocket} provider.
-
- @param domain: The domain (or address family) of the socket inherited
- from systemd. This is a string like C{"INET"} or C{"UNIX"}, ie the
- name of an address family from the L{socket} module, without the
- C{"AF_"} prefix.
- @type domain: C{str}
-
- @param index: An offset into the list of file descriptors inherited from
- systemd.
- @type index: C{str}
-
- @return: A two-tuple of parsed positional arguments and parsed keyword
- arguments (a tuple and a dictionary). These can be used to
- construct a L{AdoptedStreamServerEndpoint}.
- """
- index = int(index)
- fileno = self._sddaemon.inheritedDescriptors()[index]
- addressFamily = getattr(socket, 'AF_' + domain)
- return AdoptedStreamServerEndpoint(reactor, fileno, addressFamily)
-
-
- def parseStreamServer(self, reactor, *args, **kwargs):
- # Delegate to another function with a sane signature. This function has
- # an insane signature to trick zope.interface into believing the
- # interface is correctly implemented.
- return self._parseServer(reactor, *args, **kwargs)
-
-
-
-_serverParsers = {"tcp": _parseTCP,
- "unix": _parseUNIX,
- "ssl": _parseSSL,
- }
-
-_OP, _STRING = range(2)
-
-def _tokenize(description):
- """
- Tokenize a strports string and yield each token.
-
- @param description: a string as described by L{serverFromString} or
- L{clientFromString}.
-
- @return: an iterable of 2-tuples of (L{_OP} or L{_STRING}, string). Tuples
- starting with L{_OP} will contain a second element of either ':' (i.e.
- 'next parameter') or '=' (i.e. 'assign parameter value'). For example,
- the string 'hello:greet\=ing=world' would result in a generator
- yielding these values::
-
- _STRING, 'hello'
- _OP, ':'
- _STRING, 'greet=ing'
- _OP, '='
- _STRING, 'world'
- """
- current = ''
- ops = ':='
- nextOps = {':': ':=', '=': ':'}
- description = iter(description)
- for n in description:
- if n in ops:
- yield _STRING, current
- yield _OP, n
- current = ''
- ops = nextOps[n]
- elif n == '\\':
- current += description.next()
- else:
- current += n
- yield _STRING, current
-
-
-
-def _parse(description):
- """
- Convert a description string into a list of positional and keyword
- parameters, using logic vaguely like what Python does.
-
- @param description: a string as described by L{serverFromString} or
- L{clientFromString}.
-
- @return: a 2-tuple of C{(args, kwargs)}, where 'args' is a list of all
- ':'-separated C{str}s not containing an '=' and 'kwargs' is a map of
- all C{str}s which do contain an '='. For example, the result of
- C{_parse('a:b:d=1:c')} would be C{(['a', 'b', 'c'], {'d': '1'})}.
- """
- args, kw = [], {}
- def add(sofar):
- if len(sofar) == 1:
- args.append(sofar[0])
- else:
- kw[sofar[0]] = sofar[1]
- sofar = ()
- for (type, value) in _tokenize(description):
- if type is _STRING:
- sofar += (value,)
- elif value == ':':
- add(sofar)
- sofar = ()
- add(sofar)
- return args, kw
-
-
-# Mappings from description "names" to endpoint constructors.
-_endpointServerFactories = {
- 'TCP': TCP4ServerEndpoint,
- 'SSL': SSL4ServerEndpoint,
- 'UNIX': UNIXServerEndpoint,
- }
-
-_endpointClientFactories = {
- 'TCP': TCP4ClientEndpoint,
- 'SSL': SSL4ClientEndpoint,
- 'UNIX': UNIXClientEndpoint,
- }
-
-
-_NO_DEFAULT = object()
-
-def _parseServer(description, factory, default=None):
- """
- Parse a stports description into a 2-tuple of arguments and keyword values.
-
- @param description: A description in the format explained by
- L{serverFromString}.
- @type description: C{str}
-
- @param factory: A 'factory' argument; this is left-over from
- twisted.application.strports, it's not really used.
- @type factory: L{IProtocolFactory} or L{None}
-
- @param default: Deprecated argument, specifying the default parser mode to
- use for unqualified description strings (those which do not have a ':'
- and prefix).
- @type default: C{str} or C{NoneType}
-
- @return: a 3-tuple of (plugin or name, arguments, keyword arguments)
- """
- args, kw = _parse(description)
- if not args or (len(args) == 1 and not kw):
- deprecationMessage = (
- "Unqualified strport description passed to 'service'."
- "Use qualified endpoint descriptions; for example, 'tcp:%s'."
- % (description,))
- if default is None:
- default = 'tcp'
- warnings.warn(
- deprecationMessage, category=DeprecationWarning, stacklevel=4)
- elif default is _NO_DEFAULT:
- raise ValueError(deprecationMessage)
- # If the default has been otherwise specified, the user has already
- # been warned.
- args[0:0] = [default]
- endpointType = args[0]
- parser = _serverParsers.get(endpointType)
- if parser is None:
- for plugin in getPlugins(IStreamServerEndpointStringParser):
- if plugin.prefix == endpointType:
- return (plugin, args[1:], kw)
- raise ValueError("Unknown endpoint type: '%s'" % (endpointType,))
- return (endpointType.upper(),) + parser(factory, *args[1:], **kw)
-
-
-
-def _serverFromStringLegacy(reactor, description, default):
- """
- Underlying implementation of L{serverFromString} which avoids exposing the
- deprecated 'default' argument to anything but L{strports.service}.
- """
- nameOrPlugin, args, kw = _parseServer(description, None, default)
- if type(nameOrPlugin) is not str:
- plugin = nameOrPlugin
- return plugin.parseStreamServer(reactor, *args, **kw)
- else:
- name = nameOrPlugin
- # Chop out the factory.
- args = args[:1] + args[2:]
- return _endpointServerFactories[name](reactor, *args, **kw)
-
-
-
-def serverFromString(reactor, description):
- """
- Construct a stream server endpoint from an endpoint description string.
-
- The format for server endpoint descriptions is a simple string. It is a
- prefix naming the type of endpoint, then a colon, then the arguments for
- that endpoint.
-
- For example, you can call it like this to create an endpoint that will
- listen on TCP port 80::
-
- serverFromString(reactor, "tcp:80")
-
- Additional arguments may be specified as keywords, separated with colons.
- For example, you can specify the interface for a TCP server endpoint to
- bind to like this::
-
- serverFromString(reactor, "tcp:80:interface=127.0.0.1")
-
- SSL server endpoints may be specified with the 'ssl' prefix, and the
- private key and certificate files may be specified by the C{privateKey} and
- C{certKey} arguments::
-
- serverFromString(reactor, "ssl:443:privateKey=key.pem:certKey=crt.pem")
-
- If a private key file name (C{privateKey}) isn't provided, a "server.pem"
- file is assumed to exist which contains the private key. If the certificate
- file name (C{certKey}) isn't provided, the private key file is assumed to
- contain the certificate as well.
-
- You may escape colons in arguments with a backslash, which you will need to
- use if you want to specify a full pathname argument on Windows::
-
- serverFromString(reactor,
- "ssl:443:privateKey=C\\:/key.pem:certKey=C\\:/cert.pem")
-
- finally, the 'unix' prefix may be used to specify a filesystem UNIX socket,
- optionally with a 'mode' argument to specify the mode of the socket file
- created by C{listen}::
-
- serverFromString(reactor, "unix:/var/run/finger")
- serverFromString(reactor, "unix:/var/run/finger:mode=660")
-
- This function is also extensible; new endpoint types may be registered as
- L{IStreamServerEndpointStringParser} plugins. See that interface for more
- information.
-
- @param reactor: The server endpoint will be constructed with this reactor.
-
- @param description: The strports description to parse.
-
- @return: A new endpoint which can be used to listen with the parameters
- given by by C{description}.
-
- @rtype: L{IStreamServerEndpoint<twisted.internet.interfaces.IStreamServerEndpoint>}
-
- @raise ValueError: when the 'description' string cannot be parsed.
-
- @since: 10.2
- """
- return _serverFromStringLegacy(reactor, description, _NO_DEFAULT)
-
-
-
-def quoteStringArgument(argument):
- """
- Quote an argument to L{serverFromString} and L{clientFromString}. Since
- arguments are separated with colons and colons are escaped with
- backslashes, some care is necessary if, for example, you have a pathname,
- you may be tempted to interpolate into a string like this::
-
- serverFromString("ssl:443:privateKey=%s" % (myPathName,))
-
- This may appear to work, but will have portability issues (Windows
- pathnames, for example). Usually you should just construct the appropriate
- endpoint type rather than interpolating strings, which in this case would
- be L{SSL4ServerEndpoint}. There are some use-cases where you may need to
- generate such a string, though; for example, a tool to manipulate a
- configuration file which has strports descriptions in it. To be correct in
- those cases, do this instead::
-
- serverFromString("ssl:443:privateKey=%s" %
- (quoteStringArgument(myPathName),))
-
- @param argument: The part of the endpoint description string you want to
- pass through.
-
- @type argument: C{str}
-
- @return: The quoted argument.
-
- @rtype: C{str}
- """
- return argument.replace('\\', '\\\\').replace(':', '\\:')
-
-
-
-def _parseClientTCP(*args, **kwargs):
- """
- Perform any argument value coercion necessary for TCP client parameters.
-
- Valid positional arguments to this function are host and port.
-
- Valid keyword arguments to this function are all L{IReactorTCP.connectTCP}
- arguments.
-
- @return: The coerced values as a C{dict}.
- """
-
- if len(args) == 2:
- kwargs['port'] = int(args[1])
- kwargs['host'] = args[0]
- elif len(args) == 1:
- if 'host' in kwargs:
- kwargs['port'] = int(args[0])
- else:
- kwargs['host'] = args[0]
-
- try:
- kwargs['port'] = int(kwargs['port'])
- except KeyError:
- pass
-
- try:
- kwargs['timeout'] = int(kwargs['timeout'])
- except KeyError:
- pass
- return kwargs
-
-
-
-def _loadCAsFromDir(directoryPath):
- """
- Load certificate-authority certificate objects in a given directory.
-
- @param directoryPath: a L{FilePath} pointing at a directory to load .pem
- files from.
-
- @return: a C{list} of L{OpenSSL.crypto.X509} objects.
- """
- from twisted.internet import ssl
-
- caCerts = {}
- for child in directoryPath.children():
- if not child.basename().split('.')[-1].lower() == 'pem':
- continue
- try:
- data = child.getContent()
- except IOError:
- # Permission denied, corrupt disk, we don't care.
- continue
- try:
- theCert = ssl.Certificate.loadPEM(data)
- except ssl.SSL.Error:
- # Duplicate certificate, invalid certificate, etc. We don't care.
- pass
- else:
- caCerts[theCert.digest()] = theCert.original
- return caCerts.values()
-
-
-
-def _parseClientSSL(*args, **kwargs):
- """
- Perform any argument value coercion necessary for SSL client parameters.
-
- Valid keyword arguments to this function are all L{IReactorSSL.connectSSL}
- arguments except for C{contextFactory}. Instead, C{certKey} (the path name
- of the certificate file) C{privateKey} (the path name of the private key
- associated with the certificate) are accepted and used to construct a
- context factory.
-
- Valid positional arguments to this function are host and port.
-
- @param caCertsDir: The one parameter which is not part of
- L{IReactorSSL.connectSSL}'s signature, this is a path name used to
- construct a list of certificate authority certificates. The directory
- will be scanned for files ending in C{.pem}, all of which will be
- considered valid certificate authorities for this connection.
-
- @type caCertsDir: C{str}
-
- @return: The coerced values as a C{dict}.
- """
- from twisted.internet import ssl
- kwargs = _parseClientTCP(*args, **kwargs)
- certKey = kwargs.pop('certKey', None)
- privateKey = kwargs.pop('privateKey', None)
- caCertsDir = kwargs.pop('caCertsDir', None)
- if certKey is not None:
- certx509 = ssl.Certificate.loadPEM(
- FilePath(certKey).getContent()).original
- else:
- certx509 = None
- if privateKey is not None:
- privateKey = ssl.PrivateCertificate.loadPEM(
- FilePath(privateKey).getContent()).privateKey.original
- else:
- privateKey = None
- if caCertsDir is not None:
- verify = True
- caCerts = _loadCAsFromDir(FilePath(caCertsDir))
- else:
- verify = False
- caCerts = None
- kwargs['sslContextFactory'] = ssl.CertificateOptions(
- method=ssl.SSL.SSLv23_METHOD,
- certificate=certx509,
- privateKey=privateKey,
- verify=verify,
- caCerts=caCerts
- )
- return kwargs
-
-
-
-def _parseClientUNIX(**kwargs):
- """
- Perform any argument value coercion necessary for UNIX client parameters.
-
- Valid keyword arguments to this function are all L{IReactorUNIX.connectUNIX}
- arguments except for C{checkPID}. Instead, C{lockfile} is accepted and has
- the same meaning.
-
- @return: The coerced values as a C{dict}.
- """
- try:
- kwargs['checkPID'] = bool(int(kwargs.pop('lockfile')))
- except KeyError:
- pass
- try:
- kwargs['timeout'] = int(kwargs['timeout'])
- except KeyError:
- pass
- return kwargs
-
-_clientParsers = {
- 'TCP': _parseClientTCP,
- 'SSL': _parseClientSSL,
- 'UNIX': _parseClientUNIX,
- }
-
-
-
-def clientFromString(reactor, description):
- """
- Construct a client endpoint from a description string.
-
- Client description strings are much like server description strings,
- although they take all of their arguments as keywords, aside from host and
- port.
-
- You can create a TCP client endpoint with the 'host' and 'port' arguments,
- like so::
-
- clientFromString(reactor, "tcp:host=www.example.com:port=80")
-
- or, without specifying host and port keywords::
-
- clientFromString(reactor, "tcp:www.example.com:80")
-
- Or you can specify only one or the other, as in the following 2 examples::
-
- clientFromString(reactor, "tcp:host=www.example.com:80")
- clientFromString(reactor, "tcp:www.example.com:port=80")
-
- or an SSL client endpoint with those arguments, plus the arguments used by
- the server SSL, for a client certificate::
-
- clientFromString(reactor, "ssl:web.example.com:443:"
- "privateKey=foo.pem:certKey=foo.pem")
-
- to specify your certificate trust roots, you can identify a directory with
- PEM files in it with the C{caCertsDir} argument::
-
- clientFromString(reactor, "ssl:host=web.example.com:port=443:"
- "caCertsDir=/etc/ssl/certs")
-
- This function is also extensible; new endpoint types may be registered as
- L{IStreamClientEndpointStringParser} plugins. See that interface for more
- information.
-
- @param reactor: The client endpoint will be constructed with this reactor.
-
- @param description: The strports description to parse.
-
- @return: A new endpoint which can be used to connect with the parameters
- given by by C{description}.
- @rtype: L{IStreamClientEndpoint<twisted.internet.interfaces.IStreamClientEndpoint>}
-
- @since: 10.2
- """
- args, kwargs = _parse(description)
- aname = args.pop(0)
- name = aname.upper()
- for plugin in getPlugins(IStreamClientEndpointStringParser):
- if plugin.prefix.upper() == name:
- return plugin.parseStreamClient(*args, **kwargs)
- if name not in _clientParsers:
- raise ValueError("Unknown endpoint type: %r" % (aname,))
- kwargs = _clientParsers[name](*args, **kwargs)
- return _endpointClientFactories[name](reactor, **kwargs)
Copied: CalendarServer/trunk/twext/backport/internet/endpoints.py (from rev 9105, CalendarServer/branches/users/glyph/ipv6-client/twext/backport/internet/endpoints.py)
===================================================================
--- CalendarServer/trunk/twext/backport/internet/endpoints.py (rev 0)
+++ CalendarServer/trunk/twext/backport/internet/endpoints.py 2012-04-13 18:59:38 UTC (rev 9106)
@@ -0,0 +1,1178 @@
+# -*- test-case-name: twisted.internet.test.test_endpoints -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+"""
+Implementations of L{IStreamServerEndpoint} and L{IStreamClientEndpoint} that
+wrap the L{IReactorTCP}, L{IReactorSSL}, and L{IReactorUNIX} interfaces.
+
+This also implements an extensible mini-language for describing endpoints,
+parsed by the L{clientFromString} and L{serverFromString} functions.
+
+@since: 10.1
+"""
+
+import os, socket
+
+from zope.interface import implements, directlyProvides
+import warnings
+
+from twisted.internet import interfaces, defer, error, fdesc
+from twisted.internet.protocol import ClientFactory, Protocol
+from twisted.plugin import IPlugin, getPlugins
+from twisted.internet.interfaces import IStreamServerEndpointStringParser
+from twisted.internet.interfaces import IStreamClientEndpointStringParser
+from twisted.python.filepath import FilePath
+#from twisted.python.systemd import ListenFDs
+
+
+__all__ = ["clientFromString", "serverFromString",
+ "TCP4ServerEndpoint", "TCP4ClientEndpoint",
+ "UNIXServerEndpoint", "UNIXClientEndpoint",
+ "SSL4ServerEndpoint", "SSL4ClientEndpoint",
+ "AdoptedStreamServerEndpoint"]
+
+
+class _WrappingProtocol(Protocol):
+ """
+ Wrap another protocol in order to notify my user when a connection has
+ been made.
+
+ @ivar _connectedDeferred: The L{Deferred} that will callback
+ with the C{wrappedProtocol} when it is connected.
+
+ @ivar _wrappedProtocol: An L{IProtocol} provider that will be
+ connected.
+ """
+
+ def __init__(self, connectedDeferred, wrappedProtocol):
+ """
+ @param connectedDeferred: The L{Deferred} that will callback
+ with the C{wrappedProtocol} when it is connected.
+
+ @param wrappedProtocol: An L{IProtocol} provider that will be
+ connected.
+ """
+ self._connectedDeferred = connectedDeferred
+ self._wrappedProtocol = wrappedProtocol
+
+ if interfaces.IHalfCloseableProtocol.providedBy(
+ self._wrappedProtocol):
+ directlyProvides(self, interfaces.IHalfCloseableProtocol)
+
+
+ def logPrefix(self):
+ """
+ Transparently pass through the wrapped protocol's log prefix.
+ """
+ if interfaces.ILoggingContext.providedBy(self._wrappedProtocol):
+ return self._wrappedProtocol.logPrefix()
+ return self._wrappedProtocol.__class__.__name__
+
+
+ def connectionMade(self):
+ """
+ Connect the C{self._wrappedProtocol} to our C{self.transport} and
+ callback C{self._connectedDeferred} with the C{self._wrappedProtocol}
+ """
+ self._wrappedProtocol.makeConnection(self.transport)
+ self._connectedDeferred.callback(self._wrappedProtocol)
+
+
+ def dataReceived(self, data):
+ """
+ Proxy C{dataReceived} calls to our C{self._wrappedProtocol}
+ """
+ return self._wrappedProtocol.dataReceived(data)
+
+
+ def connectionLost(self, reason):
+ """
+ Proxy C{connectionLost} calls to our C{self._wrappedProtocol}
+ """
+ return self._wrappedProtocol.connectionLost(reason)
+
+
+ def readConnectionLost(self):
+ """
+ Proxy L{IHalfCloseableProtocol.readConnectionLost} to our
+ C{self._wrappedProtocol}
+ """
+ self._wrappedProtocol.readConnectionLost()
+
+
+ def writeConnectionLost(self):
+ """
+ Proxy L{IHalfCloseableProtocol.writeConnectionLost} to our
+ C{self._wrappedProtocol}
+ """
+ self._wrappedProtocol.writeConnectionLost()
+
+
+
+class _WrappingFactory(ClientFactory):
+ """
+ Wrap a factory in order to wrap the protocols it builds.
+
+ @ivar _wrappedFactory: A provider of I{IProtocolFactory} whose buildProtocol
+ method will be called and whose resulting protocol will be wrapped.
+
+ @ivar _onConnection: An L{Deferred} that fires when the protocol is
+ connected
+
+ @ivar _connector: A L{connector <twisted.internet.interfaces.IConnector>}
+ that is managing the current or previous connection attempt.
+ """
+ protocol = _WrappingProtocol
+
+ def __init__(self, wrappedFactory):
+ """
+ @param wrappedFactory: A provider of I{IProtocolFactory} whose
+ buildProtocol method will be called and whose resulting protocol
+ will be wrapped.
+ """
+ self._wrappedFactory = wrappedFactory
+ self._onConnection = defer.Deferred(canceller=self._canceller)
+
+
+ def startedConnecting(self, connector):
+ """
+ A connection attempt was started. Remember the connector which started
+ said attempt, for use later.
+ """
+ self._connector = connector
+
+
+ def _canceller(self, deferred):
+ """
+ The outgoing connection attempt was cancelled. Fail that L{Deferred}
+ with a L{error.ConnectingCancelledError}.
+
+ @param deferred: The L{Deferred <defer.Deferred>} that was cancelled;
+ should be the same as C{self._onConnection}.
+ @type deferred: L{Deferred <defer.Deferred>}
+
+ @note: This relies on startedConnecting having been called, so it may
+ seem as though there's a race condition where C{_connector} may not
+ have been set. However, using public APIs, this condition is
+ impossible to catch, because a connection API
+ (C{connectTCP}/C{SSL}/C{UNIX}) is always invoked before a
+ L{_WrappingFactory}'s L{Deferred <defer.Deferred>} is returned to
+ C{connect()}'s caller.
+
+ @return: C{None}
+ """
+ deferred.errback(
+ error.ConnectingCancelledError(
+ self._connector.getDestination()))
+ self._connector.stopConnecting()
+
+
+ def doStart(self):
+ """
+ Start notifications are passed straight through to the wrapped factory.
+ """
+ self._wrappedFactory.doStart()
+
+
+ def doStop(self):
+ """
+ Stop notifications are passed straight through to the wrapped factory.
+ """
+ self._wrappedFactory.doStop()
+
+
+ def buildProtocol(self, addr):
+ """
+ Proxy C{buildProtocol} to our C{self._wrappedFactory} or errback
+ the C{self._onConnection} L{Deferred}.
+
+ @return: An instance of L{_WrappingProtocol} or C{None}
+ """
+ try:
+ proto = self._wrappedFactory.buildProtocol(addr)
+ except:
+ self._onConnection.errback()
+ else:
+ return self.protocol(self._onConnection, proto)
+
+
+ def clientConnectionFailed(self, connector, reason):
+ """
+ Errback the C{self._onConnection} L{Deferred} when the
+ client connection fails.
+ """
+ if not self._onConnection.called:
+ self._onConnection.errback(reason)
+
+
+
+class TCP4ServerEndpoint(object):
+ """
+ TCP server endpoint with an IPv4 configuration
+
+ @ivar _reactor: An L{IReactorTCP} provider.
+
+ @type _port: int
+ @ivar _port: The port number on which to listen for incoming connections.
+
+ @type _backlog: int
+ @ivar _backlog: size of the listen queue
+
+ @type _interface: str
+ @ivar _interface: the hostname to bind to, defaults to '' (all)
+ """
+ implements(interfaces.IStreamServerEndpoint)
+
+ def __init__(self, reactor, port, backlog=50, interface=''):
+ """
+ @param reactor: An L{IReactorTCP} provider.
+ @param port: The port number used for listening
+ @param backlog: size of the listen queue
+ @param interface: the hostname to bind to, defaults to '' (all)
+ """
+ self._reactor = reactor
+ self._port = port
+ self._listenArgs = dict(backlog=50, interface='')
+ self._backlog = backlog
+ self._interface = interface
+
+
+ def listen(self, protocolFactory):
+ """
+ Implement L{IStreamServerEndpoint.listen} to listen on a TCP socket
+ """
+ return defer.execute(self._reactor.listenTCP,
+ self._port,
+ protocolFactory,
+ backlog=self._backlog,
+ interface=self._interface)
+
+
+
+class TCP4ClientEndpoint(object):
+ """
+ TCP client endpoint with an IPv4 configuration.
+
+ @ivar _reactor: An L{IReactorTCP} provider.
+
+ @type _host: str
+ @ivar _host: The hostname to connect to as a C{str}
+
+ @type _port: int
+ @ivar _port: The port to connect to as C{int}
+
+ @type _timeout: int
+ @ivar _timeout: number of seconds to wait before assuming the
+ connection has failed.
+
+ @type _bindAddress: tuple
+ @ivar _bindAddress: a (host, port) tuple of local address to bind
+ to, or None.
+ """
+ implements(interfaces.IStreamClientEndpoint)
+
+ def __init__(self, reactor, host, port, timeout=30, bindAddress=None):
+ """
+ @param reactor: An L{IReactorTCP} provider
+ @param host: A hostname, used when connecting
+ @param port: The port number, used when connecting
+ @param timeout: number of seconds to wait before assuming the
+ connection has failed.
+ @param bindAddress: a (host, port) tuple of local address to bind to,
+ or None.
+ """
+ self._reactor = reactor
+ self._host = host
+ self._port = port
+ self._timeout = timeout
+ self._bindAddress = bindAddress
+
+
+ def connect(self, protocolFactory):
+ """
+ Implement L{IStreamClientEndpoint.connect} to connect via TCP.
+ """
+ try:
+ wf = _WrappingFactory(protocolFactory)
+ self._reactor.connectTCP(
+ self._host, self._port, wf,
+ timeout=self._timeout, bindAddress=self._bindAddress)
+ return wf._onConnection
+ except:
+ return defer.fail()
+
+
+
+class SSL4ServerEndpoint(object):
+ """
+ SSL secured TCP server endpoint with an IPv4 configuration.
+
+ @ivar _reactor: An L{IReactorSSL} provider.
+
+ @type _host: str
+ @ivar _host: The hostname to connect to as a C{str}
+
+ @type _port: int
+ @ivar _port: The port to connect to as C{int}
+
+ @type _sslContextFactory: L{OpenSSLCertificateOptions}
+ @ivar _sslContextFactory: SSL Configuration information as an
+ L{OpenSSLCertificateOptions}
+
+ @type _backlog: int
+ @ivar _backlog: size of the listen queue
+
+ @type _interface: str
+ @ivar _interface: the hostname to bind to, defaults to '' (all)
+ """
+ implements(interfaces.IStreamServerEndpoint)
+
+ def __init__(self, reactor, port, sslContextFactory,
+ backlog=50, interface=''):
+ """
+ @param reactor: An L{IReactorSSL} provider.
+ @param port: The port number used for listening
+ @param sslContextFactory: An instance of
+ L{twisted.internet._sslverify.OpenSSLCertificateOptions}.
+ @param backlog: size of the listen queue, passed through to
+ C{listenSSL}.
+ @param interface: the hostname to bind to, defaults to '' (all), passed
+ through to C{listenSSL}.
+ """
+ self._reactor = reactor
+ self._port = port
+ self._sslContextFactory = sslContextFactory
+ self._backlog = backlog
+ self._interface = interface
+
+
+ def listen(self, protocolFactory):
+ """
+ Implement L{IStreamServerEndpoint.listen} to listen for SSL on a
+ TCP socket.
+ """
+ return defer.execute(self._reactor.listenSSL, self._port,
+ protocolFactory,
+ contextFactory=self._sslContextFactory,
+ backlog=self._backlog,
+ interface=self._interface)
+
+
+
+class SSL4ClientEndpoint(object):
+ """
+ SSL secured TCP client endpoint with an IPv4 configuration
+
+ @ivar _reactor: An L{IReactorSSL} provider.
+
+ @type _host: str
+ @ivar _host: The hostname to connect to as a C{str}
+
+ @type _port: int
+ @ivar _port: The port to connect to as C{int}
+
+ @type _sslContextFactory: L{OpenSSLCertificateOptions}
+ @ivar _sslContextFactory: SSL Configuration information as an
+ L{OpenSSLCertificateOptions}
+
+ @type _timeout: int
+ @ivar _timeout: number of seconds to wait before assuming the
+ connection has failed.
+
+ @type _bindAddress: tuple
+ @ivar _bindAddress: a (host, port) tuple of local address to bind
+ to, or None.
+ """
+ implements(interfaces.IStreamClientEndpoint)
+
+ def __init__(self, reactor, host, port, sslContextFactory,
+ timeout=30, bindAddress=None):
+ """
+ @param reactor: An L{IReactorSSL} provider.
+ @param host: A hostname, used when connecting
+ @param port: The port number, used when connecting
+ @param sslContextFactory: SSL Configuration information as an instance
+ of L{OpenSSLCertificateOptions}.
+ @param timeout: number of seconds to wait before assuming the
+ connection has failed.
+ @param bindAddress: a (host, port) tuple of local address to bind to,
+ or None.
+ """
+ self._reactor = reactor
+ self._host = host
+ self._port = port
+ self._sslContextFactory = sslContextFactory
+ self._timeout = timeout
+ self._bindAddress = bindAddress
+
+
+ def connect(self, protocolFactory):
+ """
+ Implement L{IStreamClientEndpoint.connect} to connect with SSL over
+ TCP.
+ """
+ try:
+ wf = _WrappingFactory(protocolFactory)
+ self._reactor.connectSSL(
+ self._host, self._port, wf, self._sslContextFactory,
+ timeout=self._timeout, bindAddress=self._bindAddress)
+ return wf._onConnection
+ except:
+ return defer.fail()
+
+
+
+class UNIXServerEndpoint(object):
+ """
+ UnixSocket server endpoint.
+
+ @type path: str
+ @ivar path: a path to a unix socket on the filesystem.
+
+ @type _listenArgs: dict
+ @ivar _listenArgs: A C{dict} of keyword args that will be passed
+ to L{IReactorUNIX.listenUNIX}
+
+ @ivar _reactor: An L{IReactorUNIX} provider.
+ """
+ implements(interfaces.IStreamServerEndpoint)
+
+ def __init__(self, reactor, address, backlog=50, mode=0666, wantPID=0):
+ """
+ @param reactor: An L{IReactorUNIX} provider.
+ @param address: The path to the Unix socket file, used when listening
+ @param listenArgs: An optional dict of keyword args that will be
+ passed to L{IReactorUNIX.listenUNIX}
+ @param backlog: number of connections to allow in backlog.
+ @param mode: mode to set on the unix socket. This parameter is
+ deprecated. Permissions should be set on the directory which
+ contains the UNIX socket.
+ @param wantPID: if True, create a pidfile for the socket.
+ """
+ self._reactor = reactor
+ self._address = address
+ self._backlog = backlog
+ self._mode = mode
+ self._wantPID = wantPID
+
+
+ def listen(self, protocolFactory):
+ """
+ Implement L{IStreamServerEndpoint.listen} to listen on a UNIX socket.
+ """
+ return defer.execute(self._reactor.listenUNIX, self._address,
+ protocolFactory,
+ backlog=self._backlog,
+ mode=self._mode,
+ wantPID=self._wantPID)
+
+
+
+class UNIXClientEndpoint(object):
+ """
+ UnixSocket client endpoint.
+
+ @type _path: str
+ @ivar _path: a path to a unix socket on the filesystem.
+
+ @type _timeout: int
+ @ivar _timeout: number of seconds to wait before assuming the connection
+ has failed.
+
+ @type _checkPID: bool
+ @ivar _checkPID: if True, check for a pid file to verify that a server
+ is listening.
+
+ @ivar _reactor: An L{IReactorUNIX} provider.
+ """
+ implements(interfaces.IStreamClientEndpoint)
+
+ def __init__(self, reactor, path, timeout=30, checkPID=0):
+ """
+ @param reactor: An L{IReactorUNIX} provider.
+ @param path: The path to the Unix socket file, used when connecting
+ @param timeout: number of seconds to wait before assuming the
+ connection has failed.
+ @param checkPID: if True, check for a pid file to verify that a server
+ is listening.
+ """
+ self._reactor = reactor
+ self._path = path
+ self._timeout = timeout
+ self._checkPID = checkPID
+
+
+ def connect(self, protocolFactory):
+ """
+ Implement L{IStreamClientEndpoint.connect} to connect via a
+ UNIX Socket
+ """
+ try:
+ wf = _WrappingFactory(protocolFactory)
+ self._reactor.connectUNIX(
+ self._path, wf,
+ timeout=self._timeout,
+ checkPID=self._checkPID)
+ return wf._onConnection
+ except:
+ return defer.fail()
+
+
+
+class AdoptedStreamServerEndpoint(object):
+ """
+ An endpoint for listening on a file descriptor initialized outside of
+ Twisted.
+
+ @ivar _used: A C{bool} indicating whether this endpoint has been used to
+ listen with a factory yet. C{True} if so.
+ """
+ _close = os.close
+ _setNonBlocking = staticmethod(fdesc.setNonBlocking)
+
+ def __init__(self, reactor, fileno, addressFamily):
+ """
+ @param reactor: An L{IReactorSocket} provider.
+
+ @param fileno: An integer file descriptor corresponding to a listening
+ I{SOCK_STREAM} socket.
+
+ @param addressFamily: The address family of the socket given by
+ C{fileno}.
+ """
+ self.reactor = reactor
+ self.fileno = fileno
+ self.addressFamily = addressFamily
+ self._used = False
+
+
+ def listen(self, factory):
+ """
+ Implement L{IStreamServerEndpoint.listen} to start listening on, and
+ then close, C{self.fileno}.
+ """
+ if self._used:
+ return defer.fail(error.AlreadyListened())
+ self._used = True
+
+ try:
+ self._setNonBlocking(self.fileno)
+ port = self.reactor.adoptStreamPort(
+ self.fileno, self.addressFamily, factory)
+ self._close(self.fileno)
+ except:
+ return defer.fail()
+ return defer.succeed(port)
+
+
+
+def _parseTCP(factory, port, interface="", backlog=50):
+ """
+ Internal parser function for L{_parseServer} to convert the string
+ arguments for a TCP(IPv4) stream endpoint into the structured arguments.
+
+ @param factory: the protocol factory being parsed, or C{None}. (This was a
+ leftover argument from when this code was in C{strports}, and is now
+ mostly None and unused.)
+
+ @type factory: L{IProtocolFactory} or C{NoneType}
+
+ @param port: the integer port number to bind
+ @type port: C{str}
+
+ @param interface: the interface IP to listen on
+ @param backlog: the length of the listen queue
+ @type backlog: C{str}
+
+ @return: a 2-tuple of (args, kwargs), describing the parameters to
+ L{IReactorTCP.listenTCP} (or, modulo argument 2, the factory, arguments
+ to L{TCP4ServerEndpoint}).
+ """
+ return (int(port), factory), {'interface': interface,
+ 'backlog': int(backlog)}
+
+
+
+def _parseUNIX(factory, address, mode='666', backlog=50, lockfile=True):
+ """
+ Internal parser function for L{_parseServer} to convert the string
+ arguments for a UNIX (AF_UNIX/SOCK_STREAM) stream endpoint into the
+ structured arguments.
+
+ @param factory: the protocol factory being parsed, or C{None}. (This was a
+ leftover argument from when this code was in C{strports}, and is now
+ mostly None and unused.)
+
+ @type factory: L{IProtocolFactory} or C{NoneType}
+
+ @param address: the pathname of the unix socket
+ @type address: C{str}
+
+ @param backlog: the length of the listen queue
+ @type backlog: C{str}
+
+ @param lockfile: A string '0' or '1', mapping to False and True
+ respectively. See the C{wantPID} argument to C{listenUNIX}
+
+ @return: a 2-tuple of (args, kwargs), describing the parameters to
+ L{IReactorUNIX.listenUNIX} (or, modulo argument 2, the factory,
+ arguments to L{UNIXServerEndpoint}).
+ """
+ return (
+ (address, factory),
+ {'mode': int(mode, 8), 'backlog': int(backlog),
+ 'wantPID': bool(int(lockfile))})
+
+
+
+def _parseSSL(factory, port, privateKey="server.pem", certKey=None,
+ sslmethod=None, interface='', backlog=50):
+ """
+ Internal parser function for L{_parseServer} to convert the string
+ arguments for an SSL (over TCP/IPv4) stream endpoint into the structured
+ arguments.
+
+ @param factory: the protocol factory being parsed, or C{None}. (This was a
+ leftover argument from when this code was in C{strports}, and is now
+ mostly None and unused.)
+
+ @type factory: L{IProtocolFactory} or C{NoneType}
+
+ @param port: the integer port number to bind
+ @type port: C{str}
+
+ @param interface: the interface IP to listen on
+ @param backlog: the length of the listen queue
+ @type backlog: C{str}
+
+ @param privateKey: The file name of a PEM format private key file.
+ @type privateKey: C{str}
+
+ @param certKey: The file name of a PEM format certificate file.
+ @type certKey: C{str}
+
+ @param sslmethod: The string name of an SSL method, based on the name of a
+ constant in C{OpenSSL.SSL}. Must be one of: "SSLv23_METHOD",
+ "SSLv2_METHOD", "SSLv3_METHOD", "TLSv1_METHOD".
+ @type sslmethod: C{str}
+
+ @return: a 2-tuple of (args, kwargs), describing the parameters to
+ L{IReactorSSL.listenSSL} (or, modulo argument 2, the factory, arguments
+ to L{SSL4ServerEndpoint}).
+ """
+ from twisted.internet import ssl
+ if certKey is None:
+ certKey = privateKey
+ kw = {}
+ if sslmethod is not None:
+ kw['sslmethod'] = getattr(ssl.SSL, sslmethod)
+ cf = ssl.DefaultOpenSSLContextFactory(privateKey, certKey, **kw)
+ return ((int(port), factory, cf),
+ {'interface': interface, 'backlog': int(backlog)})
+
+
+class _SystemdParser(object):
+ """
+ Stream server endpoint string parser for the I{systemd} endpoint type.
+
+ @ivar prefix: See L{IStreamServerEndpointStringParser.prefix}.
+
+ @ivar _sddaemon: A L{ListenFDs} instance used to translate an index into an
+ actual file descriptor.
+ """
+ implements(IPlugin, IStreamServerEndpointStringParser)
+
+ #_sddaemon = ListenFDs.fromEnvironment()
+
+ prefix = "systemd"
+
+ def _parseServer(self, reactor, domain, index):
+ """
+ Internal parser function for L{_parseServer} to convert the string
+ arguments for a systemd server endpoint into structured arguments for
+ L{AdoptedStreamServerEndpoint}.
+
+ @param reactor: An L{IReactorSocket} provider.
+
+ @param domain: The domain (or address family) of the socket inherited
+ from systemd. This is a string like C{"INET"} or C{"UNIX"}, ie the
+ name of an address family from the L{socket} module, without the
+ C{"AF_"} prefix.
+ @type domain: C{str}
+
+ @param index: An offset into the list of file descriptors inherited from
+ systemd.
+ @type index: C{str}
+
+ @return: An L{AdoptedStreamServerEndpoint} constructed from C{reactor},
+ the inherited file descriptor selected by C{index}, and the address
+ family named by C{domain}.
+ """
+ index = int(index)
+ fileno = self._sddaemon.inheritedDescriptors()[index]
+ addressFamily = getattr(socket, 'AF_' + domain)
+ return AdoptedStreamServerEndpoint(reactor, fileno, addressFamily)
+
+
+ def parseStreamServer(self, reactor, *args, **kwargs):
+ # Delegate to another function with a sane signature. This function has
+ # an insane signature to trick zope.interface into believing the
+ # interface is correctly implemented.
+ return self._parseServer(reactor, *args, **kwargs)
+
+
+
+_serverParsers = {"tcp": _parseTCP,
+ "unix": _parseUNIX,
+ "ssl": _parseSSL,
+ }
+
+_OP, _STRING = range(2)
+
+def _tokenize(description):
+ """
+ Tokenize a strports string and yield each token.
+
+ @param description: a string as described by L{serverFromString} or
+ L{clientFromString}.
+
+ @return: an iterable of 2-tuples of (L{_OP} or L{_STRING}, string). Tuples
+ starting with L{_OP} will contain a second element of either ':' (i.e.
+ 'next parameter') or '=' (i.e. 'assign parameter value'). For example,
+ the string 'hello:greet\=ing=world' would result in a generator
+ yielding these values::
+
+ _STRING, 'hello'
+ _OP, ':'
+ _STRING, 'greet=ing'
+ _OP, '='
+ _STRING, 'world'
+ """
+ current = ''
+ ops = ':='
+ nextOps = {':': ':=', '=': ':'}
+ description = iter(description)
+ for n in description:
+ if n in ops:
+ yield _STRING, current
+ yield _OP, n
+ current = ''
+ ops = nextOps[n]
+ elif n == '\\':
+ current += description.next()
+ else:
+ current += n
+ yield _STRING, current
+
+
+
+def _parse(description):
+ """
+ Convert a description string into a list of positional and keyword
+ parameters, using logic vaguely like what Python does.
+
+ @param description: a string as described by L{serverFromString} or
+ L{clientFromString}.
+
+ @return: a 2-tuple of C{(args, kwargs)}, where 'args' is a list of all
+ ':'-separated C{str}s not containing an '=' and 'kwargs' is a map of
+ all C{str}s which do contain an '='. For example, the result of
+ C{_parse('a:b:d=1:c')} would be C{(['a', 'b', 'c'], {'d': '1'})}.
+ """
+ args, kw = [], {}
+ def add(sofar):
+ if len(sofar) == 1:
+ args.append(sofar[0])
+ else:
+ kw[sofar[0]] = sofar[1]
+ sofar = ()
+ for (type, value) in _tokenize(description):
+ if type is _STRING:
+ sofar += (value,)
+ elif value == ':':
+ add(sofar)
+ sofar = ()
+ add(sofar)
+ return args, kw
+
+
+# Mappings from description "names" to endpoint constructors.
+_endpointServerFactories = {
+ 'TCP': TCP4ServerEndpoint,
+ 'SSL': SSL4ServerEndpoint,
+ 'UNIX': UNIXServerEndpoint,
+ }
+
+_endpointClientFactories = {
+ 'TCP': TCP4ClientEndpoint,
+ 'SSL': SSL4ClientEndpoint,
+ 'UNIX': UNIXClientEndpoint,
+ }
+
+
+_NO_DEFAULT = object()
+
+def _parseServer(description, factory, default=None):
+ """
+ Parse a strports description into a 2-tuple of arguments and keyword values.
+
+ @param description: A description in the format explained by
+ L{serverFromString}.
+ @type description: C{str}
+
+ @param factory: A 'factory' argument; this is left-over from
+ twisted.application.strports, it's not really used.
+ @type factory: L{IProtocolFactory} or L{None}
+
+ @param default: Deprecated argument, specifying the default parser mode to
+ use for unqualified description strings (those which do not have a ':'
+ and prefix).
+ @type default: C{str} or C{NoneType}
+
+ @return: a 3-tuple of (plugin or name, arguments, keyword arguments)
+ """
+ args, kw = _parse(description)
+ if not args or (len(args) == 1 and not kw):
+ deprecationMessage = (
+ "Unqualified strport description passed to 'service'."
+ "Use qualified endpoint descriptions; for example, 'tcp:%s'."
+ % (description,))
+ if default is None:
+ default = 'tcp'
+ warnings.warn(
+ deprecationMessage, category=DeprecationWarning, stacklevel=4)
+ elif default is _NO_DEFAULT:
+ raise ValueError(deprecationMessage)
+ # If the default has been otherwise specified, the user has already
+ # been warned.
+ args[0:0] = [default]
+ endpointType = args[0]
+ parser = _serverParsers.get(endpointType)
+ if parser is None:
+ for plugin in getPlugins(IStreamServerEndpointStringParser):
+ if plugin.prefix == endpointType:
+ return (plugin, args[1:], kw)
+ raise ValueError("Unknown endpoint type: '%s'" % (endpointType,))
+ return (endpointType.upper(),) + parser(factory, *args[1:], **kw)
+
+
+
+def _serverFromStringLegacy(reactor, description, default):
+ """
+ Underlying implementation of L{serverFromString} which avoids exposing the
+ deprecated 'default' argument to anything but L{strports.service}.
+ """
+ nameOrPlugin, args, kw = _parseServer(description, None, default)
+ if type(nameOrPlugin) is not str:
+ plugin = nameOrPlugin
+ return plugin.parseStreamServer(reactor, *args, **kw)
+ else:
+ name = nameOrPlugin
+ # Chop out the factory.
+ args = args[:1] + args[2:]
+ return _endpointServerFactories[name](reactor, *args, **kw)
+
+
+
+def serverFromString(reactor, description):
+ """
+ Construct a stream server endpoint from an endpoint description string.
+
+ The format for server endpoint descriptions is a simple string. It is a
+ prefix naming the type of endpoint, then a colon, then the arguments for
+ that endpoint.
+
+ For example, you can call it like this to create an endpoint that will
+ listen on TCP port 80::
+
+ serverFromString(reactor, "tcp:80")
+
+ Additional arguments may be specified as keywords, separated with colons.
+ For example, you can specify the interface for a TCP server endpoint to
+ bind to like this::
+
+ serverFromString(reactor, "tcp:80:interface=127.0.0.1")
+
+ SSL server endpoints may be specified with the 'ssl' prefix, and the
+ private key and certificate files may be specified by the C{privateKey} and
+ C{certKey} arguments::
+
+ serverFromString(reactor, "ssl:443:privateKey=key.pem:certKey=crt.pem")
+
+ If a private key file name (C{privateKey}) isn't provided, a "server.pem"
+ file is assumed to exist which contains the private key. If the certificate
+ file name (C{certKey}) isn't provided, the private key file is assumed to
+ contain the certificate as well.
+
+ You may escape colons in arguments with a backslash, which you will need to
+ use if you want to specify a full pathname argument on Windows::
+
+ serverFromString(reactor,
+ "ssl:443:privateKey=C\\:/key.pem:certKey=C\\:/cert.pem")
+
+ finally, the 'unix' prefix may be used to specify a filesystem UNIX socket,
+ optionally with a 'mode' argument to specify the mode of the socket file
+ created by C{listen}::
+
+ serverFromString(reactor, "unix:/var/run/finger")
+ serverFromString(reactor, "unix:/var/run/finger:mode=660")
+
+ This function is also extensible; new endpoint types may be registered as
+ L{IStreamServerEndpointStringParser} plugins. See that interface for more
+ information.
+
+ @param reactor: The server endpoint will be constructed with this reactor.
+
+ @param description: The strports description to parse.
+
+ @return: A new endpoint which can be used to listen with the parameters
+ given by C{description}.
+
+ @rtype: L{IStreamServerEndpoint<twisted.internet.interfaces.IStreamServerEndpoint>}
+
+ @raise ValueError: when the 'description' string cannot be parsed.
+
+ @since: 10.2
+ """
+ return _serverFromStringLegacy(reactor, description, _NO_DEFAULT)
+
+
+
+def quoteStringArgument(argument):
+ """
+ Quote an argument to L{serverFromString} and L{clientFromString}. Since
+ arguments are separated with colons and colons are escaped with
+ backslashes, some care is necessary if, for example, you have a pathname,
+ you may be tempted to interpolate into a string like this::
+
+ serverFromString("ssl:443:privateKey=%s" % (myPathName,))
+
+ This may appear to work, but will have portability issues (Windows
+ pathnames, for example). Usually you should just construct the appropriate
+ endpoint type rather than interpolating strings, which in this case would
+ be L{SSL4ServerEndpoint}. There are some use-cases where you may need to
+ generate such a string, though; for example, a tool to manipulate a
+ configuration file which has strports descriptions in it. To be correct in
+ those cases, do this instead::
+
+ serverFromString("ssl:443:privateKey=%s" %
+ (quoteStringArgument(myPathName),))
+
+ @param argument: The part of the endpoint description string you want to
+ pass through.
+
+ @type argument: C{str}
+
+ @return: The quoted argument.
+
+ @rtype: C{str}
+ """
+ return argument.replace('\\', '\\\\').replace(':', '\\:')
+
+
+
+def _parseClientTCP(*args, **kwargs):
+ """
+ Perform any argument value coercion necessary for TCP client parameters.
+
+ Valid positional arguments to this function are host and port.
+
+ Valid keyword arguments to this function are all L{IReactorTCP.connectTCP}
+ arguments.
+
+ @return: The coerced values as a C{dict}.
+ """
+
+ if len(args) == 2:
+ kwargs['port'] = int(args[1])
+ kwargs['host'] = args[0]
+ elif len(args) == 1:
+ if 'host' in kwargs:
+ kwargs['port'] = int(args[0])
+ else:
+ kwargs['host'] = args[0]
+
+ try:
+ kwargs['port'] = int(kwargs['port'])
+ except KeyError:
+ pass
+
+ try:
+ kwargs['timeout'] = int(kwargs['timeout'])
+ except KeyError:
+ pass
+ return kwargs
+
+
+
+def _loadCAsFromDir(directoryPath):
+ """
+ Load certificate-authority certificate objects in a given directory.
+
+ @param directoryPath: a L{FilePath} pointing at a directory to load .pem
+ files from.
+
+ @return: a C{list} of L{OpenSSL.crypto.X509} objects.
+ """
+ from twisted.internet import ssl
+
+ caCerts = {}
+ for child in directoryPath.children():
+ if not child.basename().split('.')[-1].lower() == 'pem':
+ continue
+ try:
+ data = child.getContent()
+ except IOError:
+ # Permission denied, corrupt disk, we don't care.
+ continue
+ try:
+ theCert = ssl.Certificate.loadPEM(data)
+ except ssl.SSL.Error:
+ # Duplicate certificate, invalid certificate, etc. We don't care.
+ pass
+ else:
+ caCerts[theCert.digest()] = theCert.original
+ return caCerts.values()
+
+
+
+def _parseClientSSL(*args, **kwargs):
+ """
+ Perform any argument value coercion necessary for SSL client parameters.
+
+ Valid keyword arguments to this function are all L{IReactorSSL.connectSSL}
+ arguments except for C{contextFactory}. Instead, C{certKey} (the path name
+ of the certificate file) and C{privateKey} (the path name of the private key
+ associated with the certificate) are accepted and used to construct a
+ context factory.
+
+ Valid positional arguments to this function are host and port.
+
+ @param caCertsDir: The one parameter which is not part of
+ L{IReactorSSL.connectSSL}'s signature, this is a path name used to
+ construct a list of certificate authority certificates. The directory
+ will be scanned for files ending in C{.pem}, all of which will be
+ considered valid certificate authorities for this connection.
+
+ @type caCertsDir: C{str}
+
+ @return: The coerced values as a C{dict}.
+ """
+ from twisted.internet import ssl
+ kwargs = _parseClientTCP(*args, **kwargs)
+ certKey = kwargs.pop('certKey', None)
+ privateKey = kwargs.pop('privateKey', None)
+ caCertsDir = kwargs.pop('caCertsDir', None)
+ if certKey is not None:
+ certx509 = ssl.Certificate.loadPEM(
+ FilePath(certKey).getContent()).original
+ else:
+ certx509 = None
+ if privateKey is not None:
+ privateKey = ssl.PrivateCertificate.loadPEM(
+ FilePath(privateKey).getContent()).privateKey.original
+ else:
+ privateKey = None
+ if caCertsDir is not None:
+ verify = True
+ caCerts = _loadCAsFromDir(FilePath(caCertsDir))
+ else:
+ verify = False
+ caCerts = None
+ kwargs['sslContextFactory'] = ssl.CertificateOptions(
+ method=ssl.SSL.SSLv23_METHOD,
+ certificate=certx509,
+ privateKey=privateKey,
+ verify=verify,
+ caCerts=caCerts
+ )
+ return kwargs
+
+
+
+def _parseClientUNIX(**kwargs):
+ """
+ Perform any argument value coercion necessary for UNIX client parameters.
+
+ Valid keyword arguments to this function are all L{IReactorUNIX.connectUNIX}
+ arguments except for C{checkPID}. Instead, C{lockfile} is accepted and has
+ the same meaning.
+
+ @return: The coerced values as a C{dict}.
+ """
+ try:
+ kwargs['checkPID'] = bool(int(kwargs.pop('lockfile')))
+ except KeyError:
+ pass
+ try:
+ kwargs['timeout'] = int(kwargs['timeout'])
+ except KeyError:
+ pass
+ return kwargs
+
+_clientParsers = {
+ 'TCP': _parseClientTCP,
+ 'SSL': _parseClientSSL,
+ 'UNIX': _parseClientUNIX,
+ }
+
+
+
+def clientFromString(reactor, description):
+ """
+ Construct a client endpoint from a description string.
+
+ Client description strings are much like server description strings,
+ although they take all of their arguments as keywords, aside from host and
+ port.
+
+ You can create a TCP client endpoint with the 'host' and 'port' arguments,
+ like so::
+
+ clientFromString(reactor, "tcp:host=www.example.com:port=80")
+
+ or, without specifying host and port keywords::
+
+ clientFromString(reactor, "tcp:www.example.com:80")
+
+ Or you can specify only one or the other, as in the following 2 examples::
+
+ clientFromString(reactor, "tcp:host=www.example.com:80")
+ clientFromString(reactor, "tcp:www.example.com:port=80")
+
+ or an SSL client endpoint with those arguments, plus the arguments used by
+ the server SSL, for a client certificate::
+
+ clientFromString(reactor, "ssl:web.example.com:443:"
+ "privateKey=foo.pem:certKey=foo.pem")
+
+ to specify your certificate trust roots, you can identify a directory with
+ PEM files in it with the C{caCertsDir} argument::
+
+ clientFromString(reactor, "ssl:host=web.example.com:port=443:"
+ "caCertsDir=/etc/ssl/certs")
+
+ This function is also extensible; new endpoint types may be registered as
+ L{IStreamClientEndpointStringParser} plugins. See that interface for more
+ information.
+
+ @param reactor: The client endpoint will be constructed with this reactor.
+
+ @param description: The strports description to parse.
+
+ @return: A new endpoint which can be used to connect with the parameters
+ given by C{description}.
+ @rtype: L{IStreamClientEndpoint<twisted.internet.interfaces.IStreamClientEndpoint>}
+
+ @since: 10.2
+ """
+ args, kwargs = _parse(description)
+ aname = args.pop(0)
+ name = aname.upper()
+ for plugin in getPlugins(IStreamClientEndpointStringParser):
+ if plugin.prefix.upper() == name:
+ return plugin.parseStreamClient(*args, **kwargs)
+ if name not in _clientParsers:
+ raise ValueError("Unknown endpoint type: %r" % (aname,))
+ kwargs = _clientParsers[name](*args, **kwargs)
+ return _endpointClientFactories[name](reactor, **kwargs)
Deleted: CalendarServer/trunk/twext/backport/internet/tcp.py
===================================================================
--- CalendarServer/branches/users/glyph/ipv6-client/twext/backport/internet/tcp.py 2012-04-13 18:48:13 UTC (rev 9105)
+++ CalendarServer/trunk/twext/backport/internet/tcp.py 2012-04-13 18:59:38 UTC (rev 9106)
@@ -1,1122 +0,0 @@
-# -*- test-case-name: twisted.test.test_tcp -*-
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-"""
-Various asynchronous TCP/IP classes.
-
-End users shouldn't use this module directly - use the reactor APIs instead.
-"""
-
-
-# System Imports
-import types
-import socket
-import sys
-import operator
-import struct
-
-from zope.interface import implements
-
-from twisted.python.runtime import platformType
-from twisted.python import versions, deprecate
-
-try:
- # Try to get the memory BIO based startTLS implementation, available since
- # pyOpenSSL 0.10
- from twisted.internet._newtls import (
- ConnectionMixin as _TLSConnectionMixin,
- ClientMixin as _TLSClientMixin,
- ServerMixin as _TLSServerMixin)
-except ImportError:
- try:
- # Try to get the socket BIO based startTLS implementation, available in
- # all pyOpenSSL versions
- from twisted.internet._oldtls import (
- ConnectionMixin as _TLSConnectionMixin,
- ClientMixin as _TLSClientMixin,
- ServerMixin as _TLSServerMixin)
- except ImportError:
- # There is no version of startTLS available
- class _TLSConnectionMixin(object):
- TLS = False
- class _TLSClientMixin(object):
- pass
- class _TLSServerMixin(object):
- pass
-
-if platformType == 'win32':
- # no such thing as WSAEPERM or error code 10001 according to winsock.h or MSDN
- EPERM = object()
- from errno import WSAEINVAL as EINVAL
- from errno import WSAEWOULDBLOCK as EWOULDBLOCK
- from errno import WSAEINPROGRESS as EINPROGRESS
- from errno import WSAEALREADY as EALREADY
- from errno import WSAECONNRESET as ECONNRESET
- from errno import WSAEISCONN as EISCONN
- from errno import WSAENOTCONN as ENOTCONN
- from errno import WSAEINTR as EINTR
- from errno import WSAENOBUFS as ENOBUFS
- from errno import WSAEMFILE as EMFILE
- # No such thing as WSAENFILE, either.
- ENFILE = object()
- # Nor ENOMEM
- ENOMEM = object()
- EAGAIN = EWOULDBLOCK
- from errno import WSAECONNRESET as ECONNABORTED
-
- from twisted.python.win32 import formatError as strerror
-else:
- from errno import EPERM
- from errno import EINVAL
- from errno import EWOULDBLOCK
- from errno import EINPROGRESS
- from errno import EALREADY
- from errno import ECONNRESET
- from errno import EISCONN
- from errno import ENOTCONN
- from errno import EINTR
- from errno import ENOBUFS
- from errno import EMFILE
- from errno import ENFILE
- from errno import ENOMEM
- from errno import EAGAIN
- from errno import ECONNABORTED
-
- from os import strerror
-
-
-from errno import errorcode
-
-# Twisted Imports
-from twisted.internet import base, address, fdesc
-from twisted.internet.task import deferLater
-from twisted.python import log, failure, reflect
-from twisted.python.util import unsignedID
-from twisted.internet.error import CannotListenError
-from twisted.internet import abstract, main, interfaces, error
-
-# Not all platforms have, or support, this flag.
-_AI_NUMERICSERV = getattr(socket, "AI_NUMERICSERV", 0)
-
-
-
-class _SocketCloser(object):
- _socketShutdownMethod = 'shutdown'
-
- def _closeSocket(self, orderly):
- # The call to shutdown() before close() isn't really necessary, because
- # we set FD_CLOEXEC now, which will ensure this is the only process
- # holding the FD, thus ensuring close() really will shutdown the TCP
- # socket. However, do it anyways, just to be safe.
- skt = self.socket
- try:
- if orderly:
- getattr(skt, self._socketShutdownMethod)(2)
- else:
- # Set SO_LINGER to 1,0 which, by convention, causes a
- # connection reset to be sent when close is called,
- # instead of the standard FIN shutdown sequence.
- self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
- struct.pack("ii", 1, 0))
-
- except socket.error:
- pass
- try:
- skt.close()
- except socket.error:
- pass
-
-
-
-class _AbortingMixin(object):
- """
- Common implementation of C{abortConnection}.
-
- @ivar _aborting: Set to C{True} when C{abortConnection} is called.
- @type _aborting: C{bool}
- """
- _aborting = False
-
- def abortConnection(self):
- """
- Aborts the connection immediately, dropping any buffered data.
-
- @since: 11.1
- """
- if self.disconnected or self._aborting:
- return
- self._aborting = True
- self.stopReading()
- self.stopWriting()
- self.doRead = lambda *args, **kwargs: None
- self.doWrite = lambda *args, **kwargs: None
- self.reactor.callLater(0, self.connectionLost,
- failure.Failure(error.ConnectionAborted()))
-
-
-
-class Connection(_TLSConnectionMixin, abstract.FileDescriptor, _SocketCloser,
- _AbortingMixin):
- """
- Superclass of all socket-based FileDescriptors.
-
- This is an abstract superclass of all objects which represent a TCP/IP
- connection based socket.
-
- @ivar logstr: prefix used when logging events related to this connection.
- @type logstr: C{str}
- """
- implements(interfaces.ITCPTransport, interfaces.ISystemHandle)
-
-
- def __init__(self, skt, protocol, reactor=None):
- abstract.FileDescriptor.__init__(self, reactor=reactor)
- self.socket = skt
- self.socket.setblocking(0)
- self.fileno = skt.fileno
- self.protocol = protocol
-
-
- def getHandle(self):
- """Return the socket for this connection."""
- return self.socket
-
-
- def doRead(self):
- """Calls self.protocol.dataReceived with all available data.
-
- This reads up to self.bufferSize bytes of data from its socket, then
- calls self.dataReceived(data) to process it. If the connection is not
- lost through an error in the physical recv(), this function will return
- the result of the dataReceived call.
- """
- try:
- data = self.socket.recv(self.bufferSize)
- except socket.error, se:
- if se.args[0] == EWOULDBLOCK:
- return
- else:
- return main.CONNECTION_LOST
- if not data:
- return main.CONNECTION_DONE
- rval = self.protocol.dataReceived(data)
- if rval is not None:
- offender = self.protocol.dataReceived
- warningFormat = (
- 'Returning a value other than None from %(fqpn)s is '
- 'deprecated since %(version)s.')
- warningString = deprecate.getDeprecationWarningString(
- offender, versions.Version('Twisted', 11, 0, 0),
- format=warningFormat)
- deprecate.warnAboutFunction(offender, warningString)
- return rval
-
-
- def writeSomeData(self, data):
- """
- Write as much as possible of the given data to this TCP connection.
-
- This sends up to C{self.SEND_LIMIT} bytes from C{data}. If the
- connection is lost, an exception is returned. Otherwise, the number
- of bytes successfully written is returned.
- """
- try:
- # Limit length of buffer to try to send, because some OSes are too
- # stupid to do so themselves (ahem windows)
- return self.socket.send(buffer(data, 0, self.SEND_LIMIT))
- except socket.error, se:
- if se.args[0] == EINTR:
- return self.writeSomeData(data)
- elif se.args[0] in (EWOULDBLOCK, ENOBUFS):
- return 0
- else:
- return main.CONNECTION_LOST
-
-
- def _closeWriteConnection(self):
- try:
- getattr(self.socket, self._socketShutdownMethod)(1)
- except socket.error:
- pass
- p = interfaces.IHalfCloseableProtocol(self.protocol, None)
- if p:
- try:
- p.writeConnectionLost()
- except:
- f = failure.Failure()
- log.err()
- self.connectionLost(f)
-
-
- def readConnectionLost(self, reason):
- p = interfaces.IHalfCloseableProtocol(self.protocol, None)
- if p:
- try:
- p.readConnectionLost()
- except:
- log.err()
- self.connectionLost(failure.Failure())
- else:
- self.connectionLost(reason)
-
-
-
- def connectionLost(self, reason):
- """See abstract.FileDescriptor.connectionLost().
- """
- # Make sure we're not called twice, which can happen e.g. if
- # abortConnection() is called from protocol's dataReceived and then
- # code immediately after throws an exception that reaches the
- # reactor. We can't rely on "disconnected" attribute for this check
- # since twisted.internet._oldtls does evil things to it:
- if not hasattr(self, "socket"):
- return
- abstract.FileDescriptor.connectionLost(self, reason)
- self._closeSocket(not reason.check(error.ConnectionAborted))
- protocol = self.protocol
- del self.protocol
- del self.socket
- del self.fileno
- protocol.connectionLost(reason)
-
-
- logstr = "Uninitialized"
-
- def logPrefix(self):
- """Return the prefix to log with when I own the logging thread.
- """
- return self.logstr
-
- def getTcpNoDelay(self):
- return operator.truth(self.socket.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY))
-
- def setTcpNoDelay(self, enabled):
- self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, enabled)
-
- def getTcpKeepAlive(self):
- return operator.truth(self.socket.getsockopt(socket.SOL_SOCKET,
- socket.SO_KEEPALIVE))
-
- def setTcpKeepAlive(self, enabled):
- self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, enabled)
-
-
-
-
-class _BaseBaseClient(object):
- """
- Code shared with other (non-POSIX) reactors for management of general
- outgoing connections.
-
- Requirements upon subclasses are documented as instance variables rather
- than abstract methods, in order to avoid MRO confusion, since this base is
- mixed in to unfortunately weird and distinctive multiple-inheritance
- hierarchies and many of these attributes are provided by peer classes
- rather than descendant classes in those hierarchies.
-
- @ivar addressFamily: The address family constant (C{socket.AF_INET},
- C{socket.AF_INET6}, C{socket.AF_UNIX}) of the underlying socket of this
- client connection.
- @type addressFamily: C{int}
-
- @ivar socketType: The socket type constant (C{socket.SOCK_STREAM} or
- C{socket.SOCK_DGRAM}) of the underlying socket.
- @type socketType: C{int}
-
- @ivar _requiresResolution: A flag indicating whether the address of this
- client will require name resolution. C{True} if the hostname of said
- address indicates a name that must be resolved by hostname lookup,
- C{False} if it indicates an IP address literal.
- @type _requiresResolution: C{bool}
-
- @cvar _commonConnection: Subclasses must provide this attribute, which
- indicates the L{Connection}-alike class to invoke C{__init__} and
- C{connectionLost} on.
- @type _commonConnection: C{type}
-
- @ivar _stopReadingAndWriting: Subclasses must implement in order to remove
- this transport from its reactor's notifications in response to a
- terminated connection attempt.
- @type _stopReadingAndWriting: 0-argument callable returning C{None}
-
- @ivar _closeSocket: Subclasses must implement in order to close the socket
- in response to a terminated connection attempt.
- @type _closeSocket: 1-argument callable; see L{_SocketCloser._closeSocket}
-
- @ivar _collectSocketDetails: Clean up references to the attached socket in
- its underlying OS resource (such as a file descriptor or file handle),
- as part of post connection-failure cleanup.
- @type _collectSocketDetails: 0-argument callable returning C{None}.
-
- @ivar reactor: The class pointed to by C{_commonConnection} should set this
- attribute in its constructor.
- @type reactor: L{twisted.internet.interfaces.IReactorTime},
- L{twisted.internet.interfaces.IReactorCore},
- L{twisted.internet.interfaces.IReactorFDSet}
- """
-
- addressFamily = socket.AF_INET
- socketType = socket.SOCK_STREAM
-
- def _finishInit(self, whenDone, skt, error, reactor):
- """
- Called by subclasses to continue to the stage of initialization where
- the socket connect attempt is made.
-
- @param whenDone: A 0-argument callable to invoke once the connection is
- set up. This is C{None} if the connection could not be prepared
- due to a previous error.
-
- @param skt: The socket object to use to perform the connection.
- @type skt: C{socket._socketobject}
-
- @param error: The error to fail the connection with.
-
- @param reactor: The reactor to use for this client.
- @type reactor: L{twisted.internet.interfaces.IReactorTime}
- """
- if whenDone:
- self._commonConnection.__init__(self, skt, None, reactor)
- reactor.callLater(0, whenDone)
- else:
- reactor.callLater(0, self.failIfNotConnected, error)
-
-
- def resolveAddress(self):
- """
- Resolve the name that was passed to this L{_BaseBaseClient}, if
- necessary, and then move on to attempting the connection once an
- address has been determined. (The connection will be attempted
- immediately within this function if either name resolution can be
- synchronous or the address was an IP address literal.)
-
- @note: You don't want to call this method from outside, as it won't do
- anything useful; it's just part of the connection bootstrapping
- process. Also, although this method is on L{_BaseBaseClient} for
- historical reasons, it's not used anywhere except for L{Client}
- itself.
-
- @return: C{None}
- """
- if self._requiresResolution:
- d = self.reactor.resolve(self.addr[0])
- d.addCallback(lambda n: (n,) + self.addr[1:])
- d.addCallbacks(self._setRealAddress, self.failIfNotConnected)
- else:
- self._setRealAddress(self.addr)
-
-
- def _setRealAddress(self, address):
- """
- Set the resolved address of this L{_BaseBaseClient} and initiate the
- connection attempt.
-
- @param address: Depending on whether this is an IPv4 or IPv6 connection
- attempt, a 2-tuple of C{(host, port)} or a 4-tuple of C{(host,
- port, flow, scope)}. At this point it is a fully resolved address,
- and the 'host' portion will always be an IP address, not a DNS
- name.
- """
- self.realAddress = address
- self.doConnect()
-
-
- def failIfNotConnected(self, err):
- """
- Generic method called when an attempt to connect has failed. It basically
- cleans up everything it can: calls connectionFailed, stops reading and
- writing, and deletes socket-related members.
- """
- if (self.connected or self.disconnected or
- not hasattr(self, "connector")):
- return
-
- self._stopReadingAndWriting()
- try:
- self._closeSocket(True)
- except AttributeError:
- pass
- else:
- self._collectSocketDetails()
- self.connector.connectionFailed(failure.Failure(err))
- del self.connector
-
-
- def stopConnecting(self):
- """
- If a connection attempt is still outstanding (i.e. no connection is
- yet established), immediately stop attempting to connect.
- """
- self.failIfNotConnected(error.UserError())
-
-
- def connectionLost(self, reason):
- """
- Invoked by lower-level logic when it's time to clean the socket up.
- Depending on the state of the connection, either inform the attached
- L{Connector} that the connection attempt has failed, or inform the
- connected L{IProtocol} that the established connection has been lost.
-
- @param reason: the reason that the connection was terminated
- @type reason: L{Failure}
- """
- if not self.connected:
- self.failIfNotConnected(error.ConnectError(string=reason))
- else:
- self._commonConnection.connectionLost(self, reason)
- self.connector.connectionLost(reason)
-
-
-
-class BaseClient(_BaseBaseClient, _TLSClientMixin, Connection):
- """
- A base class for client TCP (and similar) sockets.
-
- @ivar realAddress: The address object that will be used for socket.connect;
- this address is an address tuple (the number of elements dependent upon
- the address family) which does not contain any names which need to be
- resolved.
- @type realAddress: C{tuple}
-
- @ivar _base: L{Connection}, which is the base class of this class which has
- all of the useful file descriptor methods. This is used by
- L{_TLSServerMixin} to call the right methods to directly manipulate the
- transport, as is necessary for writing TLS-encrypted bytes (whereas
- those methods on L{Server} will go through another layer of TLS if it
- has been enabled).
- """
-
- _base = Connection
- _commonConnection = Connection
-
- def _stopReadingAndWriting(self):
- """
- Implement the POSIX-ish (i.e.
- L{twisted.internet.interfaces.IReactorFDSet}) method of detaching this
- socket from the reactor for L{_BaseBaseClient}.
- """
- if hasattr(self, "reactor"):
- # this doesn't happen if we failed in __init__
- self.stopReading()
- self.stopWriting()
-
-
- def _collectSocketDetails(self):
- """
- Clean up references to the socket and its file descriptor.
-
- @see: L{_BaseBaseClient}
- """
- del self.socket, self.fileno
-
-
- def createInternetSocket(self):
- """(internal) Create a non-blocking socket using
- self.addressFamily, self.socketType.
- """
- s = socket.socket(self.addressFamily, self.socketType)
- s.setblocking(0)
- fdesc._setCloseOnExec(s.fileno())
- return s
-
-
- def doConnect(self):
- """
- Initiate the outgoing connection attempt.
-
- @note: Applications do not need to call this method; it will be invoked
- internally as part of L{IReactorTCP.connectTCP}.
- """
- self.doWrite = self.doConnect
- self.doRead = self.doConnect
- if not hasattr(self, "connector"):
- # this happens when connection failed but doConnect
- # was scheduled via a callLater in self._finishInit
- return
-
- err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
- if err:
- self.failIfNotConnected(error.getConnectError((err, strerror(err))))
- return
-
- # doConnect gets called twice. The first time we actually need to
- # start the connection attempt. The second time we don't really
- # want to (SO_ERROR above will have taken care of any errors, and if
- # it reported none, the mere fact that doConnect was called again is
- # sufficient to indicate that the connection has succeeded), but it
- # is not /particularly/ detrimental to do so. This should get
- # cleaned up some day, though.
- try:
- connectResult = self.socket.connect_ex(self.realAddress)
- except socket.error, se:
- connectResult = se.args[0]
- if connectResult:
- if connectResult == EISCONN:
- pass
- # on Windows EINVAL means sometimes that we should keep trying:
- # http://msdn.microsoft.com/library/default.asp?url=/library/en-us/winsock/winsock/connect_2.asp
- elif ((connectResult in (EWOULDBLOCK, EINPROGRESS, EALREADY)) or
- (connectResult == EINVAL and platformType == "win32")):
- self.startReading()
- self.startWriting()
- return
- else:
- self.failIfNotConnected(error.getConnectError((connectResult, strerror(connectResult))))
- return
-
- # If I have reached this point without raising or returning, that means
- # that the socket is connected.
- del self.doWrite
- del self.doRead
- # we first stop and then start, to reset any references to the old doRead
- self.stopReading()
- self.stopWriting()
- self._connectDone()
-
-
- def _connectDone(self):
- """
- This is a hook for when a connection attempt has succeeded.
-
- Here, we build the protocol from the
- L{twisted.internet.protocol.ClientFactory} that was passed in, compute
- a log string, begin reading so as to send traffic to the newly built
- protocol, and finally hook up the protocol itself.
-
- This hook is overridden by L{ssl.Client} to initiate the TLS protocol.
- """
- self.protocol = self.connector.buildProtocol(self.getPeer())
- self.connected = 1
- logPrefix = self._getLogPrefix(self.protocol)
- self.logstr = "%s,client" % logPrefix
- self.startReading()
- self.protocol.makeConnection(self)
-
-
-
-_NUMERIC_ONLY = socket.AI_NUMERICHOST | _AI_NUMERICSERV
-
-def _resolveIPv6(ip, port):
- """
- Resolve an IPv6 literal into an IPv6 address.
-
- This is necessary to resolve any embedded scope identifiers to the relevant
- C{sin6_scope_id} for use with C{socket.connect()}, C{socket.listen()}, or
- C{socket.bind()}; see U{RFC 3493 <https://tools.ietf.org/html/rfc3493>} for
- more information.
-
- @param ip: An IPv6 address literal.
- @type ip: C{str}
-
- @param port: A port number.
- @type port: C{int}
-
- @return: a 4-tuple of C{(host, port, flow, scope)}, suitable for use as an
- IPv6 address.
-
- @raise socket.gaierror: if either the IP or port is not numeric as it
- should be.
- """
- return socket.getaddrinfo(ip, port, 0, 0, 0, _NUMERIC_ONLY)[0][4]
-
-
-
-class _BaseTCPClient(object):
- """
- Code shared with other (non-POSIX) reactors for management of outgoing TCP
- connections (both TCPv4 and TCPv6).
-
- @note: In order to be functional, this class must be mixed into the same
- hierarchy as L{_BaseBaseClient}. It would subclass L{_BaseBaseClient}
- directly, but the class hierarchy here is divided in strange ways out
- of the need to share code along multiple axes; specifically, with the
- IOCP reactor and also with UNIX clients in other reactors.
-
- @ivar _addressType: The Twisted _IPAddress implementation for this client
- @type _addressType: L{IPv4Address} or L{IPv6Address}
-
- @ivar connector: The L{Connector} which is driving this L{_BaseTCPClient}'s
- connection attempt.
-
- @ivar addr: The address that this socket will be connecting to.
- @type addr: If IPv4, a 2-C{tuple} of C{(str host, int port)}. If IPv6, a
- 4-C{tuple} of (C{str host, int port, int ignored, int scope}).
-
- @ivar createInternetSocket: Subclasses must implement this as a method to
- create a python socket object of the appropriate address family and
- socket type.
- @type createInternetSocket: 0-argument callable returning
- C{socket._socketobject}.
- """
-
- _addressType = address.IPv4Address
-
- def __init__(self, host, port, bindAddress, connector, reactor=None):
- # BaseClient.__init__ is invoked later
- self.connector = connector
- self.addr = (host, port)
-
- whenDone = self.resolveAddress
- err = None
- skt = None
-
- if abstract.isIPAddress(host):
- self._requiresResolution = False
- elif abstract.isIPv6Address(host):
- self._requiresResolution = False
- self.addr = _resolveIPv6(host, port)
- self.addressFamily = socket.AF_INET6
- self._addressType = address.IPv6Address
- else:
- self._requiresResolution = True
- try:
- skt = self.createInternetSocket()
- except socket.error, se:
- err = error.ConnectBindError(se.args[0], se.args[1])
- whenDone = None
- if whenDone and bindAddress is not None:
- try:
- if abstract.isIPv6Address(bindAddress[0]):
- bindinfo = _resolveIPv6(*bindAddress)
- else:
- bindinfo = bindAddress
- skt.bind(bindinfo)
- except socket.error, se:
- err = error.ConnectBindError(se.args[0], se.args[1])
- whenDone = None
- self._finishInit(whenDone, skt, err, reactor)
-
-
- def getHost(self):
- """
- Returns an L{IPv4Address} or L{IPv6Address}.
-
- This indicates the address from which I am connecting.
- """
- return self._addressType('TCP', *self.socket.getsockname()[:2])
-
-
- def getPeer(self):
- """
- Returns an L{IPv4Address} or L{IPv6Address}.
-
- This indicates the address that I am connected to.
- """
- # an ipv6 realAddress has more than two elements, but the IPv6Address
- # constructor still only takes two.
- return self._addressType('TCP', *self.realAddress[:2])
-
-
- def __repr__(self):
- s = '<%s to %s at %x>' % (self.__class__, self.addr, unsignedID(self))
- return s
-
-
-
-class Client(_BaseTCPClient, BaseClient):
- """
- A transport for a TCP protocol; either TCPv4 or TCPv6.
-
- Do not create these directly; use L{IReactorTCP.connectTCP}.
- """
-
-
-
-class Server(_TLSServerMixin, Connection):
- """
- Serverside socket-stream connection class.
-
- This is a serverside network connection transport; a socket which came from
- an accept() on a server.
-
- @ivar _base: L{Connection}, which is the base class of this class which has
- all of the useful file descriptor methods. This is used by
- L{_TLSServerMixin} to call the right methods to directly manipulate the
- transport, as is necessary for writing TLS-encrypted bytes (whereas
- those methods on L{Server} will go through another layer of TLS if it
- has been enabled).
- """
- _base = Connection
-
- _addressType = address.IPv4Address
-
- def __init__(self, sock, protocol, client, server, sessionno, reactor):
- """
- Server(sock, protocol, client, server, sessionno)
-
- Initialize it with a socket, a protocol, a descriptor for my peer (a
- tuple of host, port describing the other end of the connection), an
- instance of Port, and a session number.
- """
- Connection.__init__(self, sock, protocol, reactor)
- if len(client) != 2:
- self._addressType = address.IPv6Address
- self.server = server
- self.client = client
- self.sessionno = sessionno
- self.hostname = client[0]
-
- logPrefix = self._getLogPrefix(self.protocol)
- self.logstr = "%s,%s,%s" % (logPrefix,
- sessionno,
- self.hostname)
- self.repstr = "<%s #%s on %s>" % (self.protocol.__class__.__name__,
- self.sessionno,
- self.server._realPortNumber)
- self.startReading()
- self.connected = 1
-
- def __repr__(self):
- """A string representation of this connection.
- """
- return self.repstr
-
-
- def getHost(self):
- """
- Returns an L{IPv4Address} or L{IPv6Address}.
-
- This indicates the server's address.
- """
- host, port = self.socket.getsockname()[:2]
- return self._addressType('TCP', host, port)
-
-
- def getPeer(self):
- """
- Returns an L{IPv4Address} or L{IPv6Address}.
-
- This indicates the client's address.
- """
- return self._addressType('TCP', *self.client[:2])
-
-
-
-class Port(base.BasePort, _SocketCloser):
- """
- A TCP server port, listening for connections.
-
- When a connection is accepted, this will call a factory's buildProtocol
- with the incoming address as an argument, according to the specification
- described in L{twisted.internet.interfaces.IProtocolFactory}.
-
- If you wish to change the sort of transport that will be used, the
- C{transport} attribute will be called with the signature expected for
- C{Server.__init__}, so it can be replaced.
-
- @ivar deferred: a deferred created when L{stopListening} is called, and
- that will fire when connection is lost. This is not to be used
- directly: prefer the deferred returned by L{stopListening} instead.
- @type deferred: L{defer.Deferred}
-
- @ivar disconnecting: flag indicating that the L{stopListening} method has
- been called and that no connections should be accepted anymore.
- @type disconnecting: C{bool}
-
- @ivar connected: flag set once the listen has successfully been called on
- the socket.
- @type connected: C{bool}
-
- @ivar _type: A string describing the connections which will be created by
- this port. Normally this is C{"TCP"}, since this is a TCP port, but
- when the TLS implementation re-uses this class it overrides the value
- with C{"TLS"}. Only used for logging.
-
- @ivar _preexistingSocket: If not C{None}, a L{socket.socket} instance which
- was created and initialized outside of the reactor and will be used to
- listen for connections (instead of a new socket being created by this
- L{Port}).
- """
-
- implements(interfaces.IListeningPort)
-
- socketType = socket.SOCK_STREAM
-
- transport = Server
- sessionno = 0
- interface = ''
- backlog = 50
-
- _type = 'TCP'
-
- # Actual port number being listened on, only set to a non-None
- # value when we are actually listening.
- _realPortNumber = None
-
- # An externally initialized socket that we will use, rather than creating
- # our own.
- _preexistingSocket = None
-
- addressFamily = socket.AF_INET
- _addressType = address.IPv4Address
-
- def __init__(self, port, factory, backlog=50, interface='', reactor=None):
- """Initialize with a numeric port to listen on.
- """
- base.BasePort.__init__(self, reactor=reactor)
- self.port = port
- self.factory = factory
- self.backlog = backlog
- if abstract.isIPv6Address(interface):
- self.addressFamily = socket.AF_INET6
- self._addressType = address.IPv6Address
- self.interface = interface
-
-
- @classmethod
- def _fromListeningDescriptor(cls, reactor, fd, addressFamily, factory):
- """
- Create a new L{Port} based on an existing listening I{SOCK_STREAM}
- I{AF_INET} socket.
-
- Arguments are the same as to L{Port.__init__}, except where noted.
-
- @param fd: An integer file descriptor associated with a listening
- socket. The socket must be in non-blocking mode. Any additional
- attributes desired, such as I{FD_CLOEXEC}, must also be set already.
-
- @param addressFamily: The address family (sometimes called I{domain}) of
- the existing socket. For example, L{socket.AF_INET}.
-
- @return: A new instance of C{cls} wrapping the socket given by C{fd}.
- """
- port = socket.fromfd(fd, addressFamily, cls.socketType)
- interface = port.getsockname()[0]
- self = cls(None, factory, None, interface, reactor)
- self._preexistingSocket = port
- return self
-
-
- def __repr__(self):
- if self._realPortNumber is not None:
- return "<%s of %s on %s>" % (self.__class__,
- self.factory.__class__, self._realPortNumber)
- else:
- return "<%s of %s (not listening)>" % (self.__class__, self.factory.__class__)
-
- def createInternetSocket(self):
- s = base.BasePort.createInternetSocket(self)
- if platformType == "posix" and sys.platform != "cygwin":
- s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- return s
-
-
- def startListening(self):
- """Create and bind my socket, and begin listening on it.
-
- This is called on unserialization, and must be called after creating a
- server to begin listening on the specified port.
- """
- if self._preexistingSocket is None:
- # Create a new socket and make it listen
- try:
- skt = self.createInternetSocket()
- if self.addressFamily == socket.AF_INET6:
- addr = _resolveIPv6(self.interface, self.port)
- else:
- addr = (self.interface, self.port)
- skt.bind(addr)
- except socket.error, le:
- raise CannotListenError, (self.interface, self.port, le)
- skt.listen(self.backlog)
- else:
- # Re-use the externally specified socket
- skt = self._preexistingSocket
- self._preexistingSocket = None
-
- # Make sure that if we listened on port 0, we update that to
- # reflect what the OS actually assigned us.
- self._realPortNumber = skt.getsockname()[1]
-
- log.msg("%s starting on %s" % (
- self._getLogPrefix(self.factory), self._realPortNumber))
-
- # The order of the next 5 lines is kind of bizarre. If no one
- # can explain it, perhaps we should re-arrange them.
- self.factory.doStart()
- self.connected = True
- self.socket = skt
- self.fileno = self.socket.fileno
- self.numberAccepts = 100
-
- self.startReading()
-
-
- def _buildAddr(self, address):
- host, port = address[:2]
- return self._addressType('TCP', host, port)
-
-
- def doRead(self):
- """Called when my socket is ready for reading.
-
- This accepts a connection and calls self.protocol() to handle the
- wire-level protocol.
- """
- try:
- if platformType == "posix":
- numAccepts = self.numberAccepts
- else:
- # win32 event loop breaks if we do more than one accept()
- # in an iteration of the event loop.
- numAccepts = 1
- for i in range(numAccepts):
- # we need this so we can deal with a factory's buildProtocol
- # calling our loseConnection
- if self.disconnecting:
- return
- try:
- skt, addr = self.socket.accept()
- except socket.error, e:
- if e.args[0] in (EWOULDBLOCK, EAGAIN):
- self.numberAccepts = i
- break
- elif e.args[0] == EPERM:
- # Netfilter on Linux may have rejected the
- # connection, but we get told to try to accept()
- # anyway.
- continue
- elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED):
-
- # Linux gives EMFILE when a process is not allowed
- # to allocate any more file descriptors. *BSD and
- # Win32 give (WSA)ENOBUFS. Linux can also give
- # ENFILE if the system is out of inodes, or ENOMEM
- # if there is insufficient memory to allocate a new
- # dentry. ECONNABORTED is documented as possible on
- # both Linux and Windows, but it is not clear
- # whether there are actually any circumstances under
- # which it can happen (one might expect it to be
- # possible if a client sends a FIN or RST after the
- # server sends a SYN|ACK but before application code
- # calls accept(2), however at least on Linux this
- # _seems_ to be short-circuited by syncookies.
-
- log.msg("Could not accept new connection (%s)" % (
- errorcode[e.args[0]],))
- break
- raise
-
- fdesc._setCloseOnExec(skt.fileno())
- protocol = self.factory.buildProtocol(self._buildAddr(addr))
- if protocol is None:
- skt.close()
- continue
- s = self.sessionno
- self.sessionno = s+1
- transport = self.transport(skt, protocol, addr, self, s, self.reactor)
- protocol.makeConnection(transport)
- else:
- self.numberAccepts = self.numberAccepts+20
- except:
- # Note that in TLS mode, this will possibly catch SSL.Errors
- # raised by self.socket.accept()
- #
- # There is no "except SSL.Error:" above because SSL may be
- # None if there is no SSL support. In any case, all the
- # "except SSL.Error:" suite would probably do is log.deferr()
- # and return, so handling it here works just as well.
- log.deferr()
-
- def loseConnection(self, connDone=failure.Failure(main.CONNECTION_DONE)):
- """
- Stop accepting connections on this port.
-
- This will shut down the socket and call self.connectionLost(). It
- returns a deferred which will fire successfully when the port is
- actually closed, or with a failure if an error occurs shutting down.
- """
- self.disconnecting = True
- self.stopReading()
- if self.connected:
- self.deferred = deferLater(
- self.reactor, 0, self.connectionLost, connDone)
- return self.deferred
-
- stopListening = loseConnection
-
- def _logConnectionLostMsg(self):
- """
- Log message for closing port
- """
- log.msg('(%s Port %s Closed)' % (self._type, self._realPortNumber))
-
-
- def connectionLost(self, reason):
- """
- Cleans up the socket.
- """
- self._logConnectionLostMsg()
- self._realPortNumber = None
-
- base.BasePort.connectionLost(self, reason)
- self.connected = False
- self._closeSocket(True)
- del self.socket
- del self.fileno
-
- try:
- self.factory.doStop()
- finally:
- self.disconnecting = False
-
-
- def logPrefix(self):
- """Returns the name of my class, to prefix log entries with.
- """
- return reflect.qual(self.factory.__class__)
-
-
- def getHost(self):
- """
- Return an L{IPv4Address} or L{IPv6Address} indicating the listening
- address of this port.
- """
- host, port = self.socket.getsockname()[:2]
- return self._addressType('TCP', host, port)
-
-
-
-class Connector(base.BaseConnector):
- """
- A L{Connector} provides L{twisted.internet.interfaces.IConnector} for
- all POSIX-style reactors.
-
- @ivar _addressType: the type returned by L{Connector.getDestination}.
- Either L{IPv4Address} or L{IPv6Address}, depending on the type of
- address.
- @type _addressType: C{type}
- """
- _addressType = address.IPv4Address
-
- def __init__(self, host, port, factory, timeout, bindAddress, reactor=None):
- if isinstance(port, types.StringTypes):
- try:
- port = socket.getservbyname(port, 'tcp')
- except socket.error, e:
- raise error.ServiceNameUnknownError(string="%s (%r)" % (e, port))
- self.host, self.port = host, port
- if abstract.isIPv6Address(host):
- self._addressType = address.IPv6Address
- self.bindAddress = bindAddress
- base.BaseConnector.__init__(self, factory, timeout, reactor)
-
-
- def _makeTransport(self):
- """
- Create a L{Client} bound to this L{Connector}.
-
- @return: a new L{Client}
- @rtype: L{Client}
- """
- return Client(self.host, self.port, self.bindAddress, self, self.reactor)
-
-
- def getDestination(self):
- """
- @see: L{twisted.internet.interfaces.IConnector.getDestination}.
- """
- return self._addressType('TCP', self.host, self.port)
-
-
Copied: CalendarServer/trunk/twext/backport/internet/tcp.py (from rev 9105, CalendarServer/branches/users/glyph/ipv6-client/twext/backport/internet/tcp.py)
===================================================================
--- CalendarServer/trunk/twext/backport/internet/tcp.py (rev 0)
+++ CalendarServer/trunk/twext/backport/internet/tcp.py 2012-04-13 18:59:38 UTC (rev 9106)
@@ -0,0 +1,1122 @@
+# -*- test-case-name: twisted.test.test_tcp -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Various asynchronous TCP/IP classes.
+
+End users shouldn't use this module directly - use the reactor APIs instead.
+"""
+
+
+# System Imports
+import types
+import socket
+import sys
+import operator
+import struct
+
+from zope.interface import implements
+
+from twisted.python.runtime import platformType
+from twisted.python import versions, deprecate
+
+try:
+ # Try to get the memory BIO based startTLS implementation, available since
+ # pyOpenSSL 0.10
+ from twisted.internet._newtls import (
+ ConnectionMixin as _TLSConnectionMixin,
+ ClientMixin as _TLSClientMixin,
+ ServerMixin as _TLSServerMixin)
+except ImportError:
+ try:
+ # Try to get the socket BIO based startTLS implementation, available in
+ # all pyOpenSSL versions
+ from twisted.internet._oldtls import (
+ ConnectionMixin as _TLSConnectionMixin,
+ ClientMixin as _TLSClientMixin,
+ ServerMixin as _TLSServerMixin)
+ except ImportError:
+ # There is no version of startTLS available
+ class _TLSConnectionMixin(object):
+ TLS = False
+ class _TLSClientMixin(object):
+ pass
+ class _TLSServerMixin(object):
+ pass
+
+if platformType == 'win32':
+ # no such thing as WSAEPERM or error code 10001 according to winsock.h or MSDN
+ EPERM = object()
+ from errno import WSAEINVAL as EINVAL
+ from errno import WSAEWOULDBLOCK as EWOULDBLOCK
+ from errno import WSAEINPROGRESS as EINPROGRESS
+ from errno import WSAEALREADY as EALREADY
+ from errno import WSAECONNRESET as ECONNRESET
+ from errno import WSAEISCONN as EISCONN
+ from errno import WSAENOTCONN as ENOTCONN
+ from errno import WSAEINTR as EINTR
+ from errno import WSAENOBUFS as ENOBUFS
+ from errno import WSAEMFILE as EMFILE
+ # No such thing as WSAENFILE, either.
+ ENFILE = object()
+ # Nor ENOMEM
+ ENOMEM = object()
+ EAGAIN = EWOULDBLOCK
+ from errno import WSAECONNRESET as ECONNABORTED
+
+ from twisted.python.win32 import formatError as strerror
+else:
+ from errno import EPERM
+ from errno import EINVAL
+ from errno import EWOULDBLOCK
+ from errno import EINPROGRESS
+ from errno import EALREADY
+ from errno import ECONNRESET
+ from errno import EISCONN
+ from errno import ENOTCONN
+ from errno import EINTR
+ from errno import ENOBUFS
+ from errno import EMFILE
+ from errno import ENFILE
+ from errno import ENOMEM
+ from errno import EAGAIN
+ from errno import ECONNABORTED
+
+ from os import strerror
+
+
+from errno import errorcode
+
+# Twisted Imports
+from twisted.internet import base, address, fdesc
+from twisted.internet.task import deferLater
+from twisted.python import log, failure, reflect
+from twisted.python.util import unsignedID
+from twisted.internet.error import CannotListenError
+from twisted.internet import abstract, main, interfaces, error
+
+# Not all platforms have, or support, this flag.
+_AI_NUMERICSERV = getattr(socket, "AI_NUMERICSERV", 0)
+
+
+
+class _SocketCloser(object):
+ _socketShutdownMethod = 'shutdown'
+
+ def _closeSocket(self, orderly):
+ # The call to shutdown() before close() isn't really necessary, because
+ # we set FD_CLOEXEC now, which will ensure this is the only process
+ # holding the FD, thus ensuring close() really will shutdown the TCP
+ # socket. However, do it anyways, just to be safe.
+ skt = self.socket
+ try:
+ if orderly:
+ getattr(skt, self._socketShutdownMethod)(2)
+ else:
+ # Set SO_LINGER to 1,0 which, by convention, causes a
+ # connection reset to be sent when close is called,
+ # instead of the standard FIN shutdown sequence.
+ self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
+ struct.pack("ii", 1, 0))
+
+ except socket.error:
+ pass
+ try:
+ skt.close()
+ except socket.error:
+ pass
+
+
+
+class _AbortingMixin(object):
+ """
+ Common implementation of C{abortConnection}.
+
+ @ivar _aborting: Set to C{True} when C{abortConnection} is called.
+ @type _aborting: C{bool}
+ """
+ _aborting = False
+
+ def abortConnection(self):
+ """
+ Aborts the connection immediately, dropping any buffered data.
+
+ @since: 11.1
+ """
+ if self.disconnected or self._aborting:
+ return
+ self._aborting = True
+ self.stopReading()
+ self.stopWriting()
+ self.doRead = lambda *args, **kwargs: None
+ self.doWrite = lambda *args, **kwargs: None
+ self.reactor.callLater(0, self.connectionLost,
+ failure.Failure(error.ConnectionAborted()))
+
+
+
+class Connection(_TLSConnectionMixin, abstract.FileDescriptor, _SocketCloser,
+ _AbortingMixin):
+ """
+ Superclass of all socket-based FileDescriptors.
+
+ This is an abstract superclass of all objects which represent a TCP/IP
+ connection based socket.
+
+ @ivar logstr: prefix used when logging events related to this connection.
+ @type logstr: C{str}
+ """
+ implements(interfaces.ITCPTransport, interfaces.ISystemHandle)
+
+
+ def __init__(self, skt, protocol, reactor=None):
+ abstract.FileDescriptor.__init__(self, reactor=reactor)
+ self.socket = skt
+ self.socket.setblocking(0)
+ self.fileno = skt.fileno
+ self.protocol = protocol
+
+
+ def getHandle(self):
+ """Return the socket for this connection."""
+ return self.socket
+
+
+ def doRead(self):
+ """Calls self.protocol.dataReceived with all available data.
+
+ This reads up to self.bufferSize bytes of data from its socket, then
+ calls self.protocol.dataReceived(data) to process it. If the connection is not
+ lost through an error in the physical recv(), this function will return
+ the result of the dataReceived call.
+ """
+ try:
+ data = self.socket.recv(self.bufferSize)
+ except socket.error, se:
+ if se.args[0] == EWOULDBLOCK:
+ return
+ else:
+ return main.CONNECTION_LOST
+ if not data:
+ return main.CONNECTION_DONE
+ rval = self.protocol.dataReceived(data)
+ if rval is not None:
+ offender = self.protocol.dataReceived
+ warningFormat = (
+ 'Returning a value other than None from %(fqpn)s is '
+ 'deprecated since %(version)s.')
+ warningString = deprecate.getDeprecationWarningString(
+ offender, versions.Version('Twisted', 11, 0, 0),
+ format=warningFormat)
+ deprecate.warnAboutFunction(offender, warningString)
+ return rval
+
+
+ def writeSomeData(self, data):
+ """
+ Write as much as possible of the given data to this TCP connection.
+
+ This sends up to C{self.SEND_LIMIT} bytes from C{data}. If the
+ connection is lost, an exception is returned. Otherwise, the number
+ of bytes successfully written is returned.
+ """
+ try:
+ # Limit length of buffer to try to send, because some OSes are too
+ # stupid to do so themselves (ahem windows)
+ return self.socket.send(buffer(data, 0, self.SEND_LIMIT))
+ except socket.error, se:
+ if se.args[0] == EINTR:
+ return self.writeSomeData(data)
+ elif se.args[0] in (EWOULDBLOCK, ENOBUFS):
+ return 0
+ else:
+ return main.CONNECTION_LOST
+
+
+ def _closeWriteConnection(self):
+ try:
+ getattr(self.socket, self._socketShutdownMethod)(1)
+ except socket.error:
+ pass
+ p = interfaces.IHalfCloseableProtocol(self.protocol, None)
+ if p:
+ try:
+ p.writeConnectionLost()
+ except:
+ f = failure.Failure()
+ log.err()
+ self.connectionLost(f)
+
+
+ def readConnectionLost(self, reason):
+ p = interfaces.IHalfCloseableProtocol(self.protocol, None)
+ if p:
+ try:
+ p.readConnectionLost()
+ except:
+ log.err()
+ self.connectionLost(failure.Failure())
+ else:
+ self.connectionLost(reason)
+
+
+
+ def connectionLost(self, reason):
+ """See abstract.FileDescriptor.connectionLost().
+ """
+ # Make sure we're not called twice, which can happen e.g. if
+ # abortConnection() is called from protocol's dataReceived and then
+ # code immediately after throws an exception that reaches the
+ # reactor. We can't rely on "disconnected" attribute for this check
+ # since twisted.internet._oldtls does evil things to it:
+ if not hasattr(self, "socket"):
+ return
+ abstract.FileDescriptor.connectionLost(self, reason)
+ self._closeSocket(not reason.check(error.ConnectionAborted))
+ protocol = self.protocol
+ del self.protocol
+ del self.socket
+ del self.fileno
+ protocol.connectionLost(reason)
+
+
+ logstr = "Uninitialized"
+
+ def logPrefix(self):
+ """Return the prefix to log with when I own the logging thread.
+ """
+ return self.logstr
+
+ def getTcpNoDelay(self):
+ return operator.truth(self.socket.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY))
+
+ def setTcpNoDelay(self, enabled):
+ self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, enabled)
+
+ def getTcpKeepAlive(self):
+ return operator.truth(self.socket.getsockopt(socket.SOL_SOCKET,
+ socket.SO_KEEPALIVE))
+
+ def setTcpKeepAlive(self, enabled):
+ self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, enabled)
+
+
+
+
+class _BaseBaseClient(object):
+ """
+ Code shared with other (non-POSIX) reactors for management of general
+ outgoing connections.
+
+ Requirements upon subclasses are documented as instance variables rather
+ than abstract methods, in order to avoid MRO confusion, since this base is
+ mixed in to unfortunately weird and distinctive multiple-inheritance
+ hierarchies and many of these attributes are provided by peer classes
+ rather than descendant classes in those hierarchies.
+
+ @ivar addressFamily: The address family constant (C{socket.AF_INET},
+ C{socket.AF_INET6}, C{socket.AF_UNIX}) of the underlying socket of this
+ client connection.
+ @type addressFamily: C{int}
+
+ @ivar socketType: The socket type constant (C{socket.SOCK_STREAM} or
+ C{socket.SOCK_DGRAM}) of the underlying socket.
+ @type socketType: C{int}
+
+ @ivar _requiresResolution: A flag indicating whether the address of this
+ client will require name resolution. C{True} if the hostname of said
+ address indicates a name that must be resolved by hostname lookup,
+ C{False} if it indicates an IP address literal.
+ @type _requiresResolution: C{bool}
+
+ @cvar _commonConnection: Subclasses must provide this attribute, which
+ indicates the L{Connection}-alike class to invoke C{__init__} and
+ C{connectionLost} on.
+ @type _commonConnection: C{type}
+
+ @ivar _stopReadingAndWriting: Subclasses must implement in order to remove
+ this transport from its reactor's notifications in response to a
+ terminated connection attempt.
+ @type _stopReadingAndWriting: 0-argument callable returning C{None}
+
+ @ivar _closeSocket: Subclasses must implement in order to close the socket
+ in response to a terminated connection attempt.
+ @type _closeSocket: 1-argument callable; see L{_SocketCloser._closeSocket}
+
+ @ivar _collectSocketDetails: Clean up references to the attached socket in
+ its underlying OS resource (such as a file descriptor or file handle),
+ as part of post connection-failure cleanup.
+ @type _collectSocketDetails: 0-argument callable returning C{None}.
+
+ @ivar reactor: The class pointed to by C{_commonConnection} should set this
+ attribute in its constructor.
+ @type reactor: L{twisted.internet.interfaces.IReactorTime},
+ L{twisted.internet.interfaces.IReactorCore},
+ L{twisted.internet.interfaces.IReactorFDSet}
+ """
+
+ addressFamily = socket.AF_INET
+ socketType = socket.SOCK_STREAM
+
+ def _finishInit(self, whenDone, skt, error, reactor):
+ """
+ Called by subclasses to continue to the stage of initialization where
+ the socket connect attempt is made.
+
+ @param whenDone: A 0-argument callable to invoke once the connection is
+ set up. This is C{None} if the connection could not be prepared
+ due to a previous error.
+
+ @param skt: The socket object to use to perform the connection.
+ @type skt: C{socket._socketobject}
+
+ @param error: The error to fail the connection with.
+
+ @param reactor: The reactor to use for this client.
+ @type reactor: L{twisted.internet.interfaces.IReactorTime}
+ """
+ if whenDone:
+ self._commonConnection.__init__(self, skt, None, reactor)
+ reactor.callLater(0, whenDone)
+ else:
+ reactor.callLater(0, self.failIfNotConnected, error)
+
+
+ def resolveAddress(self):
+ """
+ Resolve the name that was passed to this L{_BaseBaseClient}, if
+ necessary, and then move on to attempting the connection once an
+ address has been determined. (The connection will be attempted
+ immediately within this function if either name resolution can be
+ synchronous or the address was an IP address literal.)
+
+ @note: You don't want to call this method from outside, as it won't do
+ anything useful; it's just part of the connection bootstrapping
+ process. Also, although this method is on L{_BaseBaseClient} for
+ historical reasons, it's not used anywhere except for L{Client}
+ itself.
+
+ @return: C{None}
+ """
+ if self._requiresResolution:
+ d = self.reactor.resolve(self.addr[0])
+ d.addCallback(lambda n: (n,) + self.addr[1:])
+ d.addCallbacks(self._setRealAddress, self.failIfNotConnected)
+ else:
+ self._setRealAddress(self.addr)
+
+
+ def _setRealAddress(self, address):
+ """
+ Set the resolved address of this L{_BaseBaseClient} and initiate the
+ connection attempt.
+
+ @param address: Depending on whether this is an IPv4 or IPv6 connection
+ attempt, a 2-tuple of C{(host, port)} or a 4-tuple of C{(host,
+ port, flow, scope)}. At this point it is a fully resolved address,
+ and the 'host' portion will always be an IP address, not a DNS
+ name.
+ """
+ self.realAddress = address
+ self.doConnect()
+
+
+ def failIfNotConnected(self, err):
+ """
+ Generic method called when the attempt to connect has failed. It cleans
+ up everything it can: stops reading and writing, closes the socket, drops
+ socket-related members, and calls connectionFailed on the connector.
+ """
+ if (self.connected or self.disconnected or
+ not hasattr(self, "connector")):
+ return
+
+ self._stopReadingAndWriting()
+ try:
+ self._closeSocket(True)
+ except AttributeError:
+ pass
+ else:
+ self._collectSocketDetails()
+ self.connector.connectionFailed(failure.Failure(err))
+ del self.connector
+
+
+ def stopConnecting(self):
+ """
+ If a connection attempt is still outstanding (i.e. no connection is
+ yet established), immediately stop attempting to connect.
+ """
+ self.failIfNotConnected(error.UserError())
+
+
+ def connectionLost(self, reason):
+ """
+ Invoked by lower-level logic when it's time to clean the socket up.
+ Depending on the state of the connection, either inform the attached
+ L{Connector} that the connection attempt has failed, or inform the
+ connected L{IProtocol} that the established connection has been lost.
+
+ @param reason: the reason that the connection was terminated
+ @type reason: L{Failure}
+ """
+ if not self.connected:
+ self.failIfNotConnected(error.ConnectError(string=reason))
+ else:
+ self._commonConnection.connectionLost(self, reason)
+ self.connector.connectionLost(reason)
+
+
+
+class BaseClient(_BaseBaseClient, _TLSClientMixin, Connection):
+ """
+ A base class for client TCP (and similar) sockets.
+
+ @ivar realAddress: The address object that will be used for socket.connect;
+ this address is an address tuple (the number of elements dependent upon
+ the address family) which does not contain any names which need to be
+ resolved.
+ @type realAddress: C{tuple}
+
+ @ivar _base: L{Connection}, which is the base class of this class which has
+ all of the useful file descriptor methods. This is used by
+ L{_TLSClientMixin} to call the right methods to directly manipulate the
+ transport, as is necessary for writing TLS-encrypted bytes (whereas
+ those methods on L{BaseClient} will go through another layer of TLS if it
+ has been enabled).
+ """
+
+ _base = Connection
+ _commonConnection = Connection
+
+ def _stopReadingAndWriting(self):
+ """
+ Implement the POSIX-ish (i.e.
+ L{twisted.internet.interfaces.IReactorFDSet}) method of detaching this
+ socket from the reactor for L{_BaseBaseClient}.
+ """
+ if hasattr(self, "reactor"):
+ # this doesn't happen if we failed in __init__
+ self.stopReading()
+ self.stopWriting()
+
+
+ def _collectSocketDetails(self):
+ """
+ Clean up references to the socket and its file descriptor.
+
+ @see: L{_BaseBaseClient}
+ """
+ del self.socket, self.fileno
+
+
+ def createInternetSocket(self):
+ """(internal) Create a non-blocking socket using
+ self.addressFamily, self.socketType.
+ """
+ s = socket.socket(self.addressFamily, self.socketType)
+ s.setblocking(0)
+ fdesc._setCloseOnExec(s.fileno())
+ return s
+
+
+ def doConnect(self):
+ """
+ Initiate the outgoing connection attempt.
+
+ @note: Applications do not need to call this method; it will be invoked
+ internally as part of L{IReactorTCP.connectTCP}.
+ """
+ self.doWrite = self.doConnect
+ self.doRead = self.doConnect
+ if not hasattr(self, "connector"):
+ # this happens when connection failed but doConnect
+ # was scheduled via a callLater in self._finishInit
+ return
+
+ err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
+ if err:
+ self.failIfNotConnected(error.getConnectError((err, strerror(err))))
+ return
+
+ # doConnect gets called twice. The first time we actually need to
+ # start the connection attempt. The second time we don't really
+ # want to (SO_ERROR above will have taken care of any errors, and if
+ # it reported none, the mere fact that doConnect was called again is
+ # sufficient to indicate that the connection has succeeded), but it
+ # is not /particularly/ detrimental to do so. This should get
+ # cleaned up some day, though.
+ try:
+ connectResult = self.socket.connect_ex(self.realAddress)
+ except socket.error, se:
+ connectResult = se.args[0]
+ if connectResult:
+ if connectResult == EISCONN:
+ pass
+ # on Windows EINVAL means sometimes that we should keep trying:
+ # http://msdn.microsoft.com/library/default.asp?url=/library/en-us/winsock/winsock/connect_2.asp
+ elif ((connectResult in (EWOULDBLOCK, EINPROGRESS, EALREADY)) or
+ (connectResult == EINVAL and platformType == "win32")):
+ self.startReading()
+ self.startWriting()
+ return
+ else:
+ self.failIfNotConnected(error.getConnectError((connectResult, strerror(connectResult))))
+ return
+
+ # If I have reached this point without raising or returning, that means
+ # that the socket is connected.
+ del self.doWrite
+ del self.doRead
+ # we first stop and then start, to reset any references to the old doRead
+ self.stopReading()
+ self.stopWriting()
+ self._connectDone()
+
+
+ def _connectDone(self):
+ """
+ This is a hook for when a connection attempt has succeeded.
+
+ Here, we build the protocol from the
+ L{twisted.internet.protocol.ClientFactory} that was passed in, compute
+ a log string, begin reading so as to send traffic to the newly built
+ protocol, and finally hook up the protocol itself.
+
+ This hook is overridden by L{ssl.Client} to initiate the TLS protocol.
+ """
+ self.protocol = self.connector.buildProtocol(self.getPeer())
+ self.connected = 1
+ logPrefix = self._getLogPrefix(self.protocol)
+ self.logstr = "%s,client" % logPrefix
+ self.startReading()
+ self.protocol.makeConnection(self)
+
+
+
+_NUMERIC_ONLY = socket.AI_NUMERICHOST | _AI_NUMERICSERV
+
+def _resolveIPv6(ip, port):
+ """
+ Resolve an IPv6 literal into an IPv6 address.
+
+ This is necessary to resolve any embedded scope identifiers to the relevant
+ C{sin6_scope_id} for use with C{socket.connect()}, C{socket.listen()}, or
+ C{socket.bind()}; see U{RFC 3493 <https://tools.ietf.org/html/rfc3493>} for
+ more information.
+
+ @param ip: An IPv6 address literal.
+ @type ip: C{str}
+
+ @param port: A port number.
+ @type port: C{int}
+
+ @return: a 4-tuple of C{(host, port, flow, scope)}, suitable for use as an
+ IPv6 address.
+
+ @raise socket.gaierror: if either the IP or port is not numeric as it
+ should be.
+ """
+ return socket.getaddrinfo(ip, port, 0, 0, 0, _NUMERIC_ONLY)[0][4]
+
+
+
+class _BaseTCPClient(object):
+ """
+ Code shared with other (non-POSIX) reactors for management of outgoing TCP
+ connections (both TCPv4 and TCPv6).
+
+ @note: In order to be functional, this class must be mixed into the same
+ hierarchy as L{_BaseBaseClient}. It would subclass L{_BaseBaseClient}
+ directly, but the class hierarchy here is divided in strange ways out
+ of the need to share code along multiple axes; specifically, with the
+ IOCP reactor and also with UNIX clients in other reactors.
+
+ @ivar _addressType: The Twisted _IPAddress implementation for this client
+ @type _addressType: L{IPv4Address} or L{IPv6Address}
+
+ @ivar connector: The L{Connector} which is driving this L{_BaseTCPClient}'s
+ connection attempt.
+
+ @ivar addr: The address that this socket will be connecting to.
+ @type addr: If IPv4, a 2-C{tuple} of C{(str host, int port)}. If IPv6, a
+ 4-C{tuple} of (C{str host, int port, int ignored, int scope}).
+
+ @ivar createInternetSocket: Subclasses must implement this as a method to
+ create a python socket object of the appropriate address family and
+ socket type.
+ @type createInternetSocket: 0-argument callable returning
+ C{socket._socketobject}.
+ """
+
+ _addressType = address.IPv4Address
+
+ def __init__(self, host, port, bindAddress, connector, reactor=None):
+ # BaseClient.__init__ is invoked later
+ self.connector = connector
+ self.addr = (host, port)
+
+ whenDone = self.resolveAddress
+ err = None
+ skt = None
+
+ if abstract.isIPAddress(host):
+ self._requiresResolution = False
+ elif abstract.isIPv6Address(host):
+ self._requiresResolution = False
+ self.addr = _resolveIPv6(host, port)
+ self.addressFamily = socket.AF_INET6
+ self._addressType = address.IPv6Address
+ else:
+ self._requiresResolution = True
+ try:
+ skt = self.createInternetSocket()
+ except socket.error, se:
+ err = error.ConnectBindError(se.args[0], se.args[1])
+ whenDone = None
+ if whenDone and bindAddress is not None:
+ try:
+ if abstract.isIPv6Address(bindAddress[0]):
+ bindinfo = _resolveIPv6(*bindAddress)
+ else:
+ bindinfo = bindAddress
+ skt.bind(bindinfo)
+ except socket.error, se:
+ err = error.ConnectBindError(se.args[0], se.args[1])
+ whenDone = None
+ self._finishInit(whenDone, skt, err, reactor)
+
+
+ def getHost(self):
+ """
+ Returns an L{IPv4Address} or L{IPv6Address}.
+
+ This indicates the address from which I am connecting.
+ """
+ return self._addressType('TCP', *self.socket.getsockname()[:2])
+
+
+ def getPeer(self):
+ """
+ Returns an L{IPv4Address} or L{IPv6Address}.
+
+ This indicates the address that I am connected to.
+ """
+ # an ipv6 realAddress has more than two elements, but the IPv6Address
+ # constructor still only takes two.
+ return self._addressType('TCP', *self.realAddress[:2])
+
+
+ def __repr__(self):
+ s = '<%s to %s at %x>' % (self.__class__, self.addr, unsignedID(self))
+ return s
+
+
+
+class Client(_BaseTCPClient, BaseClient):
+ """
+ A transport for a TCP protocol; either TCPv4 or TCPv6.
+
+ Do not create these directly; use L{IReactorTCP.connectTCP}.
+ """
+
+
+
+class Server(_TLSServerMixin, Connection):
+ """
+ Serverside socket-stream connection class.
+
+ This is a serverside network connection transport; a socket which came from
+ an accept() on a server.
+
+ @ivar _base: L{Connection}, which is the base class of this class which has
+ all of the useful file descriptor methods. This is used by
+ L{_TLSServerMixin} to call the right methods to directly manipulate the
+ transport, as is necessary for writing TLS-encrypted bytes (whereas
+ those methods on L{Server} will go through another layer of TLS if it
+ has been enabled).
+ """
+ _base = Connection
+
+ _addressType = address.IPv4Address
+
+ def __init__(self, sock, protocol, client, server, sessionno, reactor):
+ """
+ Server(sock, protocol, client, server, sessionno, reactor)
+
+ Initialize it with a socket, a protocol, a descriptor for my peer (a
+ tuple of host, port describing the other end of the connection), an
+ instance of Port, a session number, and a reactor.
+ """
+ Connection.__init__(self, sock, protocol, reactor)
+ if len(client) != 2:
+ self._addressType = address.IPv6Address
+ self.server = server
+ self.client = client
+ self.sessionno = sessionno
+ self.hostname = client[0]
+
+ logPrefix = self._getLogPrefix(self.protocol)
+ self.logstr = "%s,%s,%s" % (logPrefix,
+ sessionno,
+ self.hostname)
+ self.repstr = "<%s #%s on %s>" % (self.protocol.__class__.__name__,
+ self.sessionno,
+ self.server._realPortNumber)
+ self.startReading()
+ self.connected = 1
+
+ def __repr__(self):
+ """A string representation of this connection.
+ """
+ return self.repstr
+
+
+ def getHost(self):
+ """
+ Returns an L{IPv4Address} or L{IPv6Address}.
+
+ This indicates the server's address.
+ """
+ host, port = self.socket.getsockname()[:2]
+ return self._addressType('TCP', host, port)
+
+
+ def getPeer(self):
+ """
+ Returns an L{IPv4Address} or L{IPv6Address}.
+
+ This indicates the client's address.
+ """
+ return self._addressType('TCP', *self.client[:2])
+
+
+
+class Port(base.BasePort, _SocketCloser):
+ """
+ A TCP server port, listening for connections.
+
+ When a connection is accepted, this will call a factory's buildProtocol
+ with the incoming address as an argument, according to the specification
+ described in L{twisted.internet.interfaces.IProtocolFactory}.
+
+ If you wish to change the sort of transport that will be used, the
+ C{transport} attribute will be called with the signature expected for
+ C{Server.__init__}, so it can be replaced.
+
+ @ivar deferred: a deferred created when L{stopListening} is called, and
+ that will fire when the connection is lost. Do not use this attribute
+ directly: prefer the deferred returned by L{stopListening} instead.
+ @type deferred: L{defer.Deferred}
+
+ @ivar disconnecting: flag indicating that the L{stopListening} method has
+ been called and that no connections should be accepted anymore.
+ @type disconnecting: C{bool}
+
+ @ivar connected: flag set once the listen has successfully been called on
+ the socket.
+ @type connected: C{bool}
+
+ @ivar _type: A string describing the connections which will be created by
+ this port. Normally this is C{"TCP"}, since this is a TCP port, but
+ when the TLS implementation re-uses this class it overrides the value
+ with C{"TLS"}. Only used for logging.
+
+ @ivar _preexistingSocket: If not C{None}, a L{socket.socket} instance which
+ was created and initialized outside of the reactor and will be used to
+ listen for connections (instead of a new socket being created by this
+ L{Port}).
+ """
+
+ implements(interfaces.IListeningPort)
+
+ socketType = socket.SOCK_STREAM
+
+ transport = Server
+ sessionno = 0
+ interface = ''
+ backlog = 50
+
+ _type = 'TCP'
+
+ # Actual port number being listened on, only set to a non-None
+ # value when we are actually listening.
+ _realPortNumber = None
+
+ # An externally initialized socket that we will use, rather than creating
+ # our own.
+ _preexistingSocket = None
+
+ addressFamily = socket.AF_INET
+ _addressType = address.IPv4Address
+
+ def __init__(self, port, factory, backlog=50, interface='', reactor=None):
+ """Initialize with a numeric port to listen on.
+ """
+ base.BasePort.__init__(self, reactor=reactor)
+ self.port = port
+ self.factory = factory
+ self.backlog = backlog
+ if abstract.isIPv6Address(interface):
+ self.addressFamily = socket.AF_INET6
+ self._addressType = address.IPv6Address
+ self.interface = interface
+
+
+ @classmethod
+ def _fromListeningDescriptor(cls, reactor, fd, addressFamily, factory):
+ """
+ Create a new L{Port} based on an existing listening I{SOCK_STREAM}
+ I{AF_INET} socket.
+
+ Arguments are the same as to L{Port.__init__}, except where noted.
+
+ @param fd: An integer file descriptor associated with a listening
+ socket. The socket must be in non-blocking mode. Any additional
+ attributes desired, such as I{FD_CLOEXEC}, must also be set already.
+
+ @param addressFamily: The address family (sometimes called I{domain}) of
+ the existing socket. For example, L{socket.AF_INET}.
+
+ @return: A new instance of C{cls} wrapping the socket given by C{fd}.
+ """
+ port = socket.fromfd(fd, addressFamily, cls.socketType)
+ interface = port.getsockname()[0]
+ self = cls(None, factory, None, interface, reactor)
+ self._preexistingSocket = port
+ return self
+
+
+ def __repr__(self):
+ if self._realPortNumber is not None:
+ return "<%s of %s on %s>" % (self.__class__,
+ self.factory.__class__, self._realPortNumber)
+ else:
+ return "<%s of %s (not listening)>" % (self.__class__, self.factory.__class__)
+
+ def createInternetSocket(self):
+ s = base.BasePort.createInternetSocket(self)
+ if platformType == "posix" and sys.platform != "cygwin":
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ return s
+
+
+ def startListening(self):
+ """Create and bind my socket, and begin listening on it.
+
+ This is called on unserialization, and must be called after creating a
+ server to begin listening on the specified port.
+ """
+ if self._preexistingSocket is None:
+ # Create a new socket and make it listen
+ try:
+ skt = self.createInternetSocket()
+ if self.addressFamily == socket.AF_INET6:
+ addr = _resolveIPv6(self.interface, self.port)
+ else:
+ addr = (self.interface, self.port)
+ skt.bind(addr)
+ except socket.error, le:
+ raise CannotListenError, (self.interface, self.port, le)
+ skt.listen(self.backlog)
+ else:
+ # Re-use the externally specified socket
+ skt = self._preexistingSocket
+ self._preexistingSocket = None
+
+ # Make sure that if we listened on port 0, we update that to
+ # reflect what the OS actually assigned us.
+ self._realPortNumber = skt.getsockname()[1]
+
+ log.msg("%s starting on %s" % (
+ self._getLogPrefix(self.factory), self._realPortNumber))
+
+ # The order of the next 5 lines is kind of bizarre. If no one
+ # can explain it, perhaps we should re-arrange them.
+ self.factory.doStart()
+ self.connected = True
+ self.socket = skt
+ self.fileno = self.socket.fileno
+ self.numberAccepts = 100
+
+ self.startReading()
+
+
+ def _buildAddr(self, address):
+ host, port = address[:2]
+ return self._addressType('TCP', host, port)
+
+
+ def doRead(self):
+ """Called when my socket is ready for reading.
+
+ This accepts pending connections and calls self.factory.buildProtocol()
+ to build the protocol that handles each one.
+ """
+ try:
+ if platformType == "posix":
+ numAccepts = self.numberAccepts
+ else:
+ # win32 event loop breaks if we do more than one accept()
+ # in an iteration of the event loop.
+ numAccepts = 1
+ for i in range(numAccepts):
+ # we need this so we can deal with a factory's buildProtocol
+ # calling our loseConnection
+ if self.disconnecting:
+ return
+ try:
+ skt, addr = self.socket.accept()
+ except socket.error, e:
+ if e.args[0] in (EWOULDBLOCK, EAGAIN):
+ self.numberAccepts = i
+ break
+ elif e.args[0] == EPERM:
+ # Netfilter on Linux may have rejected the
+ # connection, but we get told to try to accept()
+ # anyway.
+ continue
+ elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED):
+
+ # Linux gives EMFILE when a process is not allowed
+ # to allocate any more file descriptors. *BSD and
+ # Win32 give (WSA)ENOBUFS. Linux can also give
+ # ENFILE if the system is out of inodes, or ENOMEM
+ # if there is insufficient memory to allocate a new
+ # dentry. ECONNABORTED is documented as possible on
+ # both Linux and Windows, but it is not clear
+ # whether there are actually any circumstances under
+ # which it can happen (one might expect it to be
+ # possible if a client sends a FIN or RST after the
+ # server sends a SYN|ACK but before application code
+ # calls accept(2), however at least on Linux this
+ # _seems_ to be short-circuited by syncookies.
+
+ log.msg("Could not accept new connection (%s)" % (
+ errorcode[e.args[0]],))
+ break
+ raise
+
+ fdesc._setCloseOnExec(skt.fileno())
+ protocol = self.factory.buildProtocol(self._buildAddr(addr))
+ if protocol is None:
+ skt.close()
+ continue
+ s = self.sessionno
+ self.sessionno = s+1
+ transport = self.transport(skt, protocol, addr, self, s, self.reactor)
+ protocol.makeConnection(transport)
+ else:
+ self.numberAccepts = self.numberAccepts+20
+ except:
+ # Note that in TLS mode, this will possibly catch SSL.Errors
+ # raised by self.socket.accept()
+ #
+ # There is no "except SSL.Error:" above because SSL may be
+ # None if there is no SSL support. In any case, all the
+ # "except SSL.Error:" suite would probably do is log.deferr()
+ # and return, so handling it here works just as well.
+ log.deferr()
+
+ def loseConnection(self, connDone=failure.Failure(main.CONNECTION_DONE)):
+ """
+ Stop accepting connections on this port.
+
+ This will shut down the socket and call self.connectionLost(). It
+ returns a deferred which will fire successfully when the port is
+ actually closed, or with a failure if an error occurs shutting down.
+ """
+ self.disconnecting = True
+ self.stopReading()
+ if self.connected:
+ self.deferred = deferLater(
+ self.reactor, 0, self.connectionLost, connDone)
+ return self.deferred
+
+ stopListening = loseConnection
+
+ def _logConnectionLostMsg(self):
+ """
+ Log message for closing port
+ """
+ log.msg('(%s Port %s Closed)' % (self._type, self._realPortNumber))
+
+
+ def connectionLost(self, reason):
+ """
+ Cleans up the socket.
+ """
+ self._logConnectionLostMsg()
+ self._realPortNumber = None
+
+ base.BasePort.connectionLost(self, reason)
+ self.connected = False
+ self._closeSocket(True)
+ del self.socket
+ del self.fileno
+
+ try:
+ self.factory.doStop()
+ finally:
+ self.disconnecting = False
+
+
+ def logPrefix(self):
+ """Returns the name of my class, to prefix log entries with.
+ """
+ return reflect.qual(self.factory.__class__)
+
+
+ def getHost(self):
+ """
+ Return an L{IPv4Address} or L{IPv6Address} indicating the listening
+ address of this port.
+ """
+ host, port = self.socket.getsockname()[:2]
+ return self._addressType('TCP', host, port)
+
+
+
+class Connector(base.BaseConnector):
+ """
+ A L{Connector} is a provider of L{twisted.internet.interfaces.IConnector}
+ for all POSIX-style reactors.
+
+ @ivar _addressType: the type returned by L{Connector.getDestination}.
+ Either L{IPv4Address} or L{IPv6Address}, depending on the type of
+ address.
+ @type _addressType: C{type}
+ """
+ _addressType = address.IPv4Address
+
+ def __init__(self, host, port, factory, timeout, bindAddress, reactor=None):
+ if isinstance(port, types.StringTypes):
+ try:
+ port = socket.getservbyname(port, 'tcp')
+ except socket.error, e:
+ raise error.ServiceNameUnknownError(string="%s (%r)" % (e, port))
+ self.host, self.port = host, port
+ if abstract.isIPv6Address(host):
+ self._addressType = address.IPv6Address
+ self.bindAddress = bindAddress
+ base.BaseConnector.__init__(self, factory, timeout, reactor)
+
+
+ def _makeTransport(self):
+ """
+ Create a L{Client} bound to this L{Connector}.
+
+ @return: a new L{Client}
+ @rtype: L{Client}
+ """
+ return Client(self.host, self.port, self.bindAddress, self, self.reactor)
+
+
+ def getDestination(self):
+ """
+ @see: L{twisted.internet.interfaces.IConnector.getDestination}.
+ """
+ return self._addressType('TCP', self.host, self.port)
+
+
Copied: CalendarServer/trunk/twext/internet/adaptendpoint.py (from rev 9105, CalendarServer/branches/users/glyph/ipv6-client/twext/internet/adaptendpoint.py)
===================================================================
--- CalendarServer/trunk/twext/internet/adaptendpoint.py (rev 0)
+++ CalendarServer/trunk/twext/internet/adaptendpoint.py 2012-04-13 18:59:38 UTC (rev 9106)
@@ -0,0 +1,165 @@
+# -*- test-case-name: twext.internet.test.test_adaptendpoint -*-
+##
+# Copyright (c) 2012 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Adapter for old-style connectTCP/connectSSL code to use endpoints and be happy;
+specifically, to receive the additional duplicate notifications that it wants to
+receive, L{clientConnectionLost} and L{clientConnectionFailed} on the factory.
+"""
+
+from zope.interface import implements
+
+from twisted.internet.interfaces import IConnector
+
+from twisted.internet.protocol import Factory
+from twisted.python import log
+
+__all__ = [
+ "connect"
+]
+
+
+
+class _WrappedProtocol(object):
+ """
+ A protocol providing a thin wrapper that relays the connectionLost
+ notification.
+ """
+
+ def __init__(self, wrapped, wrapper):
+ """
+ @param wrapped: the wrapped L{IProtocol} provider, to which all methods
+ will be relayed.
+
+ @param wrapper: The L{LegacyClientFactoryWrapper} that holds the
+ relevant L{ClientFactory}.
+ """
+ self._wrapped = wrapped
+ self._wrapper = wrapper
+
+
+ def __getattr__(self, attr):
+ """
+ Relay all undefined methods to the wrapped protocol.
+ """
+ return getattr(self._wrapped, attr)
+
+
+ def connectionLost(self, reason):
+ """
+ Relay the loss to the wrapped protocol, then notify the legacy factory.
+ """
+ try:
+ self._wrapped.connectionLost(reason)
+ except:
+ log.err()
+ self._wrapper.legacyFactory.clientConnectionLost(self._wrapper, reason)
+
+
+
+class LegacyClientFactoryWrapper(Factory):
+ implements(IConnector)
+
+ def __init__(self, legacyFactory, endpoint):
+ self.currentlyConnecting = False
+ self.legacyFactory = legacyFactory
+ self.endpoint = endpoint
+ self._connectedProtocol = None
+ self._outstandingAttempt = None
+
+
+ def getDestination(self):
+ """
+ Implement L{IConnector.getDestination}.
+
+ @return: the endpoint being connected to as the destination.
+ """
+ return self.endpoint
+
+
+ def buildProtocol(self, addr):
+ """
+ Implement L{Factory.buildProtocol} to return a wrapper protocol that
+ will capture C{connectionLost} notifications.
+
+ @return: a L{Protocol}.
+ """
+ return _WrappedProtocol(self.legacyFactory.buildProtocol(addr), self)
+
+
+ def connect(self):
+ """
+ Implement L{IConnector.connect} to connect the endpoint.
+ """
+ if self._outstandingAttempt is not None:
+ raise RuntimeError("connection already in progress")
+ self.legacyFactory.startedConnecting(self)
+ d = self._outstandingAttempt = self.endpoint.connect(self)
+ @d.addBoth
+ def attemptDone(result):
+ self._outstandingAttempt = None
+ return result
+ def rememberProto(proto):
+ self._connectedProtocol = proto
+ return proto
+ def callClientConnectionFailed(reason):
+ self.legacyFactory.clientConnectionFailed(self, reason)
+ d.addCallbacks(rememberProto, callClientConnectionFailed)
+
+
+ def disconnect(self):
+ """
+ Implement L{IConnector.disconnect}.
+ """
+ if self._connectedProtocol is not None:
+ self._connectedProtocol.transport.loseConnection()
+ elif self._outstandingAttempt is not None:
+ self._outstandingAttempt.cancel()
+
+
+ def stopConnecting(self):
+ """
+ Implement L{IConnector.stopConnecting}.
+ """
+ if self._outstandingAttempt is None:
+ raise RuntimeError("no connection attempt in progress")
+ self.disconnect()
+
+
+
+def connect(endpoint, clientFactory):
+ """
+ Connect a L{twisted.internet.protocol.ClientFactory} to a remote host using
+ the given L{twisted.internet.interfaces.IStreamClientEndpoint}. This relays
+ C{clientConnectionFailed} and C{clientConnectionLost} notifications as
+ legacy code using the L{ClientFactory} interface, such as
+ L{ReconnectingClientFactory}, would expect.
+
+ @param endpoint: The endpoint to connect to.
+ @type endpoint: L{twisted.internet.interfaces.IStreamClientEndpoint}
+
+ @param clientFactory: The client factory doing the connecting.
+ @type clientFactory: L{twisted.internet.protocol.ClientFactory}
+
+ @return: A connector object representing the connection attempt just
+ initiated.
+ @rtype: L{IConnector}
+ """
+ wrap = LegacyClientFactoryWrapper(clientFactory, endpoint)
+ wrap.connect()
+ return wrap
+
Copied: CalendarServer/trunk/twext/internet/gaiendpoint.py (from rev 9105, CalendarServer/branches/users/glyph/ipv6-client/twext/internet/gaiendpoint.py)
===================================================================
--- CalendarServer/trunk/twext/internet/gaiendpoint.py (rev 0)
+++ CalendarServer/trunk/twext/internet/gaiendpoint.py 2012-04-13 18:59:38 UTC (rev 9106)
@@ -0,0 +1,181 @@
+# -*- test-case-name: twext.internet.test.test_gaiendpoint -*-
+##
+# Copyright (c) 2012 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+L{getaddrinfo}()-based endpoint
+"""
+
+from socket import getaddrinfo, AF_UNSPEC, AF_INET, AF_INET6, SOCK_STREAM
+from twisted.internet.endpoints import TCP4ClientEndpoint, SSL4ClientEndpoint
+from twisted.internet.defer import Deferred
+from twisted.internet.threads import deferToThread
+from twisted.internet.task import LoopingCall
+
+class MultiFailure(Exception):
+
+ def __init__(self, failures):
+ super(MultiFailure, self).__init__("Failure with multiple causes.")
+ self.failures = failures
+
+
+
+class GAIEndpoint(object):
+ """
+ Client endpoint that will call L{getaddrinfo} in a thread and then attempt
+ to connect to each endpoint (almost) in parallel.
+
+ @ivar reactor: The reactor to attempt the connection with.
+ @type reactor: provider of L{IReactorTCP} and L{IReactorTime}
+
+ @ivar host: The host to resolve.
+ @type host: L{str}
+
+ @ivar port: The port number to resolve.
+ @type port: L{int}
+
+ @ivar deferToThread: A function like L{deferToThread}, used to invoke
+ getaddrinfo. (Replaceable mainly for testing purposes.)
+ """
+
+ deferToThread = staticmethod(deferToThread)
+
+ def subEndpoint(self, reactor, host, port, contextFactory):
+ """
+ Create an endpoint to connect to based on a single address result from
+ L{getaddrinfo}.
+
+ @param reactor: the reactor to connect with
+ @type reactor: L{IReactorTCP}
+
+ @param host: The IP address of the host to connect to, in presentation
+ format.
+ @type host: L{str}
+
+ @param port: The numeric port number to connect to.
+ @type port: L{int}
+
+ @param contextFactory: If not L{None}, the OpenSSL context factory to
+ use to produce client connections.
+
+ @return: a stream client endpoint that will connect to the given host
+ and port via the given reactor.
+ @rtype: L{IStreamClientEndpoint}
+ """
+ if contextFactory is None:
+ return TCP4ClientEndpoint(reactor, host, port)
+ else:
+ return SSL4ClientEndpoint(reactor, host, port, contextFactory)
+
+
+ def __init__(self, reactor, host, port, contextFactory=None):
+ self.reactor = reactor
+ self.host = host
+ self.port = port
+ self.contextFactory = contextFactory
+
+
+ def connect(self, factory):
+ dgai = self.deferToThread(getaddrinfo, self.host, self.port,
+ AF_UNSPEC, SOCK_STREAM)
+ @dgai.addCallback
+ def gaiToEndpoints(gairesult):
+ for family, socktype, proto, canonname, sockaddr in gairesult:
+ if family in [AF_INET6, AF_INET]:
+ yield self.subEndpoint(self.reactor, sockaddr[0],
+ sockaddr[1], self.contextFactory)
+
+ @gaiToEndpoints.addCallback
+ def connectTheEndpoints(endpoints):
+ doneTrying = []
+ outstanding = []
+ errors = []
+ succeeded = []
+ actuallyDidIt = Deferred()
+ def removeMe(result, attempt):
+ outstanding.remove(attempt)
+ return result
+ def connectingDone(result):
+ if lc.running:
+ lc.stop()
+ succeeded.append(True)
+ for o in outstanding[::]:
+ o.cancel()
+ actuallyDidIt.callback(result)
+ return None
+ def lastChance():
+ if doneTrying and not outstanding and not succeeded:
+ # We've issued our last attempts. There are no remaining
+ # outstanding attempts; they've all failed. We haven't
+ # succeeded. Time... to die.
+ actuallyDidIt.errback(MultiFailure(errors))
+ def connectingFailed(why):
+ errors.append(why)
+ lastChance()
+ return None
+ def nextOne():
+ try:
+ endpoint = endpoints.next()
+ except StopIteration:
+ # Out of endpoints to try! Now it's time to wait for all of
+ # the outstanding attempts to complete, and, if none of them
+ # have been successful, then to give up with a relevant
+ # error. They'll all be dealt with by connectingDone or
+ # connectingFailed.
+ doneTrying.append(True)
+ lc.stop()
+ lastChance()
+ else:
+ attempt = endpoint.connect(factory)
+ attempt.addBoth(removeMe, attempt)
+ attempt.addCallbacks(connectingDone, connectingFailed)
+ outstanding.append(attempt)
+ lc = LoopingCall(nextOne)
+ lc.clock = self.reactor
+ lc.start(0.0)
+ return actuallyDidIt
+ return dgai
+
+
+
+if __name__ == '__main__':
+ from twisted.internet import reactor
+ import sys
+ if sys.argv[1:]:
+ host = sys.argv[1]
+ port = int(sys.argv[2])
+ else:
+ host = "localhost"
+ port = 22
+ gaie = GAIEndpoint(reactor, host, port)
+ from twisted.internet.protocol import Factory, Protocol
+ class HelloGoobye(Protocol, object):
+ def connectionMade(self):
+ print 'Hello!'
+ self.transport.loseConnection()
+
+ def connectionLost(self, reason):
+ print 'Goodbye'
+
+ class MyFactory(Factory, object):
+ def buildProtocol(self, addr):
+ print 'Building protocol for:', addr
+ return HelloGoobye()
+ def bye(what):
+ print 'bye', what
+ reactor.stop()
+ gaie.connect(MyFactory()).addBoth(bye)
+ reactor.run()
Copied: CalendarServer/trunk/twext/internet/test/test_adaptendpoint.py (from rev 9105, CalendarServer/branches/users/glyph/ipv6-client/twext/internet/test/test_adaptendpoint.py)
===================================================================
--- CalendarServer/trunk/twext/internet/test/test_adaptendpoint.py (rev 0)
+++ CalendarServer/trunk/twext/internet/test/test_adaptendpoint.py 2012-04-13 18:59:38 UTC (rev 9106)
@@ -0,0 +1,261 @@
+##
+# Copyright (c) 2012 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Tests for L{twext.internet.adaptendpoint}.
+"""
+
+from zope.interface.verify import verifyObject
+
+from twext.internet.adaptendpoint import connect
+from twisted.internet.defer import Deferred, CancelledError
+from twisted.python.failure import Failure
+
+from twisted.internet.protocol import ClientFactory, Protocol
+from twisted.internet.interfaces import IConnector
+from twisted.trial.unittest import TestCase
+
+class names(object):
+ def __init__(self, **kw):
+ self.__dict__.update(kw)
+
+
+class RecordingProtocol(Protocol, object):
+ def __init__(self):
+ super(RecordingProtocol, self).__init__()
+ self.made = []
+ self.data = []
+ self.lost = []
+
+
+ def connectionMade(self):
+ self.made.append(self.transport)
+
+
+ def dataReceived(self, data):
+ self.data.append(data)
+
+
+ def connectionLost(self, why):
+ self.lost.append(why)
+
+
+
+class RecordingClientFactory(ClientFactory):
+ """
+ L{ClientFactory} subclass that records the things that happen to it.
+ """
+
+ def __init__(self):
+ """
+ Create some records of things that are about to happen.
+ """
+ self.starts = []
+ self.built = []
+ self.fails = []
+ self.lost = []
+
+
+ def startedConnecting(self, ctr):
+ self.starts.append(ctr)
+
+
+ def clientConnectionFailed(self, ctr, reason):
+ self.fails.append(names(connector=ctr, reason=reason))
+
+
+ def clientConnectionLost(self, ctr, reason):
+ self.lost.append(names(connector=ctr, reason=reason))
+
+
+ def buildProtocol(self, addr):
+ b = RecordingProtocol()
+ self.built.append(names(protocol=b, addr=addr))
+ return b
+
+
+
+class RecordingEndpoint(object):
+
+ def __init__(self):
+ self.attempts = []
+
+
+ def connect(self, factory):
+ d = Deferred()
+ self.attempts.append(names(deferred=d, factory=factory))
+ return d
+
+
+class RecordingTransport(object):
+
+ def __init__(self):
+ self.lose = []
+
+
+ def loseConnection(self):
+ self.lose.append(self)
+
+
+
+class AdaptEndpointTests(TestCase):
+ """
+ Tests for L{connect} and the objects that it coordinates.
+ """
+
+ def setUp(self):
+ self.factory = RecordingClientFactory()
+ self.endpoint = RecordingEndpoint()
+ self.connector = connect(self.endpoint, self.factory)
+
+
+ def connectionSucceeds(self, addr=object()):
+ """
+ The most recent connection attempt succeeds, returning the L{ITransport}
+ provider produced by its success.
+ """
+ transport = RecordingTransport()
+ attempt = self.endpoint.attempts[-1]
+ proto = attempt.factory.buildProtocol(addr)
+ proto.makeConnection(transport)
+ transport.protocol = proto
+ attempt.deferred.callback(proto)
+ return transport
+
+
+ def connectionFails(self, reason):
+ """
+ The most recent in-progress connection fails.
+ """
+ self.endpoint.attempts[-1].deferred.errback(reason)
+
+
+ def test_connectStartsConnection(self):
+ """
+ When used with a successful endpoint, L{connect} will simulate all
+ aspects of the connection process; C{buildProtocol}, C{connectionMade},
+ C{dataReceived}.
+ """
+ self.assertIdentical(self.connector.getDestination(), self.endpoint)
+ verifyObject(IConnector, self.connector)
+ self.assertEqual(self.factory.starts, [self.connector])
+ self.assertEqual(len(self.endpoint.attempts), 1)
+ self.assertEqual(len(self.factory.built), 0)
+ transport = self.connectionSucceeds()
+ self.assertEqual(len(self.factory.built), 1)
+ made = transport.protocol.made
+ self.assertEqual(len(made), 1)
+ self.assertIdentical(made[0], transport)
+
+
+ def test_connectionLost(self):
+ """
+ When the connection is lost, both the protocol and the factory will be
+ notified via C{connectionLost} and C{clientConnectionLost}.
+ """
+ why = Failure(RuntimeError())
+ proto = self.connectionSucceeds().protocol
+ proto.connectionLost(why)
+ self.assertEquals(len(self.factory.built), 1)
+ self.assertEquals(self.factory.built[0].protocol.lost, [why])
+ self.assertEquals(len(self.factory.lost), 1)
+ self.assertIdentical(self.factory.lost[0].reason, why)
+
+
+ def test_connectionFailed(self):
+ """
+ When the L{Deferred} from the endpoint fails, the L{ClientFactory} gets
+ notified via C{clientConnectionFailed}.
+ """
+ why = Failure(RuntimeError())
+ self.connectionFails(why)
+ self.assertEquals(len(self.factory.fails), 1)
+ self.assertIdentical(self.factory.fails[0].reason, why)
+
+
+ def test_disconnectWhileConnecting(self):
+ """
+ When the L{IConnector} is told to C{disconnect} before an in-progress
+ L{Deferred} from C{connect} has fired, it will cancel that L{Deferred}.
+ """
+ self.connector.disconnect()
+ self.assertEqual(len(self.factory.fails), 1)
+ self.assertTrue(self.factory.fails[0].reason.check(CancelledError))
+
+
+ def test_disconnectWhileConnected(self):
+ """
+ When the L{IConnector} is told to C{disconnect} while an existing
+ connection is established, that connection will be dropped via
+ C{loseConnection}.
+ """
+ transport = self.connectionSucceeds()
+ self.factory.starts[0].disconnect()
+ self.assertEqual(transport.lose, [transport])
+
+
+ def test_connectAfterFailure(self):
+ """
+ When the L{IConnector} is told to C{connect} after a connection attempt
+ has failed, a new connection attempt is started.
+ """
+ why = Failure(ZeroDivisionError())
+ self.connectionFails(why)
+ self.connector.connect()
+ self.assertEqual(len(self.factory.starts), 2)
+ self.assertEqual(len(self.endpoint.attempts), 2)
+ self.connectionSucceeds()
+
+
+ def test_reConnectTooSoon(self):
+ """
+ When the L{IConnector} is told to C{connect} while another attempt is
+ still in flight, it synchronously raises L{RuntimeError}.
+ """
+ self.assertRaises(RuntimeError, self.connector.connect)
+ self.assertEqual(len(self.factory.starts), 1)
+ self.assertEqual(len(self.endpoint.attempts), 1)
+
+
+ def test_stopConnectingWhileConnecting(self):
+ """
+ When the L{IConnector} is told to C{stopConnecting} while another
+ attempt is still in flight, it cancels that connection.
+ """
+ self.connector.stopConnecting()
+ self.assertEqual(len(self.factory.fails), 1)
+ self.assertTrue(self.factory.fails[0].reason.check(CancelledError))
+
+
+ def test_stopConnectingWhileConnected(self):
+ """
+ When the L{IConnector} is told to C{stopConnecting} while already
+ connected, it raises a L{RuntimeError}.
+ """
+ self.connectionSucceeds()
+ self.assertRaises(RuntimeError, self.connector.stopConnecting)
+
+
+ def test_stopConnectingWhileNotConnected(self):
+ """
+ When the L{IConnector} is told to C{stopConnecting} while it is not
+ connected or connecting, it raises L{RuntimeError}.
+ """
+ self.connectionFails(Failure(ZeroDivisionError()))
+ self.assertRaises(RuntimeError, self.connector.stopConnecting)
+
+
+
Copied: CalendarServer/trunk/twext/internet/test/test_gaiendpoint.py (from rev 9105, CalendarServer/branches/users/glyph/ipv6-client/twext/internet/test/test_gaiendpoint.py)
===================================================================
--- CalendarServer/trunk/twext/internet/test/test_gaiendpoint.py (rev 0)
+++ CalendarServer/trunk/twext/internet/test/test_gaiendpoint.py 2012-04-13 18:59:38 UTC (rev 9106)
@@ -0,0 +1,112 @@
+##
+# Copyright (c) 2012 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Test cases for L{twext.internet.gaiendpoint}
+"""
+
+from socket import getaddrinfo, AF_INET, SOCK_STREAM
+
+from twext.internet.gaiendpoint import GAIEndpoint
+from twisted.trial.unittest import TestCase
+from twisted.internet.defer import Deferred
+from twisted.internet.protocol import Factory, Protocol
+from twisted.internet.task import Clock
+
+
+class FakeTCPEndpoint(object):
+ def __init__(self, reactor, host, port, contextFactory):
+ self._reactor = reactor
+ self._host = host
+ self._port = port
+ self._attempt = None
+ self._contextFactory = contextFactory
+
+
+ def connect(self, factory):
+ self._attempt = Deferred()
+ self._factory = factory
+ return self._attempt
+
+
+
+class GAIEndpointTestCase(TestCase):
+ """
+ Test cases for L{GAIEndpoint}.
+ """
+
+ def makeEndpoint(self, host="abcd.example.com", port=4321):
+ gaie = GAIEndpoint(self.clock, host, port)
+ gaie.subEndpoint = self.subEndpoint
+ gaie.deferToThread = self.deferToSomething
+ return gaie
+
+
+ def subEndpoint(self, reactor, host, port, contextFactory):
+ ftcpe = FakeTCPEndpoint(reactor, host, port, contextFactory)
+ self.fakeRealEndpoints.append(ftcpe)
+ return ftcpe
+
+
+ def deferToSomething(self, func, *a, **k):
+ """
+ Test replacement for L{deferToThread}, which can only call
+ L{getaddrinfo}.
+ """
+ d = Deferred()
+ if func is not getaddrinfo:
+ self.fail("Only getaddrinfo should be invoked in a thread.")
+ self.inThreads.append((d, func, a, k))
+ return d
+
+
+ def gaiResult(self, family, socktype, proto, canonname, sockaddr):
+ """
+ A call to L{getaddrinfo} has succeeded; invoke the L{Deferred} waiting
+ on it.
+ """
+ d, f, a, k = self.inThreads.pop(0)
+ d.callback([(family, socktype, proto, canonname, sockaddr)])
+
+
+ def setUp(self):
+ """
+ Set up!
+ """
+ self.inThreads = []
+ self.clock = Clock()
+ self.fakeRealEndpoints = []
+ self.makeEndpoint()
+
+
+ def test_simpleSuccess(self):
+ """
+ If C{getaddrinfo} gives one result, L{GAIEndpoint.connect} connects to it.
+ """
+ gaiendpoint = self.makeEndpoint()
+ protos = []
+ f = Factory()
+ f.protocol = Protocol
+ gaiendpoint.connect(f).addCallback(protos.append)
+ WHO_CARES = 0
+ WHAT_EVER = ""
+ self.gaiResult(AF_INET, SOCK_STREAM, WHO_CARES, WHAT_EVER,
+ ("1.2.3.4", 4321))
+ self.clock.advance(1.0)
+ attempt = self.fakeRealEndpoints[0]._attempt
+ attempt.callback(self.fakeRealEndpoints[0]._factory.buildProtocol(None))
+ self.assertEqual(len(protos), 1)
+
Modified: CalendarServer/trunk/twext/patches.py
===================================================================
--- CalendarServer/trunk/twext/patches.py 2012-04-13 18:48:13 UTC (rev 9105)
+++ CalendarServer/trunk/twext/patches.py 2012-04-13 18:59:38 UTC (rev 9106)
@@ -1,5 +1,5 @@
##
-# Copyright (c) 2005-2010 Apple Inc. All rights reserved.
+# Copyright (c) 2005-2012 Apple Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -20,6 +20,65 @@
__all__ = []
+import sys
+
+from twisted import version
+from twisted.python.versions import Version
+from twisted.python.modules import getModule
+
+def _hasIPv6ClientSupport():
+ """
+ Does the loaded version of Twisted have IPv6 client support?
+ """
+ lastVersionWithoutIPv6Clients = Version("twisted", 12, 0, 0)
+ if version > lastVersionWithoutIPv6Clients:
+ return True
+ elif version == lastVersionWithoutIPv6Clients:
+ # It could be a snapshot of trunk or a branch with this bug fixed. Don't
+ # load the module, though, as that would be a bunch of unnecessary work.
+ return "_resolveIPv6" in (getModule("twisted.internet.tcp")
+ .filePath.getContent())
+ else:
+ return False
+
+
+
+def _addBackports():
+ """
+ We currently require 2 backported bugfixes from a future release of Twisted,
+ for IPv6 support:
+
+ - U{IPv6 client support <http://tm.tl/5085>}
+
+ - U{TCP endpoint cancellation <http://tm.tl/4710>}
+
+ This function will activate those backports. (Note it must be run before
+ any of the modules in question are imported or it will raise an exception.)
+
+ This function, L{_hasIPv6ClientSupport}, and all the associated backports
+ (i.e., all of C{twext/backport}) should be removed upon upgrading our
+ minimum required Twisted version.
+ """
+ from twext.backport import internet as bpinternet
+ from twisted import internet
+ internet.__path__[:] = bpinternet.__path__ + internet.__path__
+
+ # Make sure none of the backports are loaded yet.
+ backports = getModule("twext.backport.internet")
+ for submod in backports.iterModules():
+ subname = submod.name.split(".")[-1]
+ tiname = 'twisted.internet.' + subname
+ if tiname in sys.modules:
+ raise RuntimeError(
+ tiname + "already loaded, cannot load required backport")
+
+
+
+if not _hasIPv6ClientSupport():
+ _addBackports()
+
+
+
from twisted.mail.imap4 import Command
Command._1_RESPONSES += tuple(['BYE'])
Modified: CalendarServer/trunk/twisted/plugins/caldav.py
===================================================================
--- CalendarServer/trunk/twisted/plugins/caldav.py 2012-04-13 18:48:13 UTC (rev 9105)
+++ CalendarServer/trunk/twisted/plugins/caldav.py 2012-04-13 18:59:38 UTC (rev 9106)
@@ -14,6 +14,8 @@
# limitations under the License.
##
+__import__("twext") # install patches before doing anything
+
from zope.interface import implements
from twisted.plugin import IPlugin
from twisted.application.service import IServiceMaker
Modified: CalendarServer/trunk/twisted/plugins/kqueuereactor.py
===================================================================
--- CalendarServer/trunk/twisted/plugins/kqueuereactor.py 2012-04-13 18:48:13 UTC (rev 9105)
+++ CalendarServer/trunk/twisted/plugins/kqueuereactor.py 2012-04-13 18:59:38 UTC (rev 9106)
@@ -1,3 +1,21 @@
+##
+# Copyright (c) 2012 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+__import__("twext")
+
from twisted.application.reactors import Reactor
caldav_kqueue = Reactor(
Modified: CalendarServer/trunk/twistedcaldav/client/pool.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/client/pool.py 2012-04-13 18:48:13 UTC (rev 9105)
+++ CalendarServer/trunk/twistedcaldav/client/pool.py 2012-04-13 18:59:38 UTC (rev 9106)
@@ -24,20 +24,21 @@
import urlparse
from twext.python.log import LoggingMixIn
+from twext.internet.gaiendpoint import GAIEndpoint
+from twext.internet.adaptendpoint import connect
+
from twext.internet.ssl import ChainingOpenSSLContextFactory
-from twisted.internet.address import IPv4Address
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
from twisted.internet.error import ConnectionLost, ConnectionDone, ConnectError
from twisted.internet.protocol import ClientFactory
+
from twext.web2 import responsecode
from twext.web2.client.http import HTTPClientProtocol
from twext.web2.http import StatusResponse, HTTPError
from twext.web2.dav.util import allDataFromStream
from twext.web2.stream import MemoryStream
-from twistedcaldav.config import config
-
class PooledHTTPClientFactory(ClientFactory, LoggingMixIn):
"""
A client factory for HTTPClient that notifies a pool of it's state. It the connection
@@ -86,6 +87,8 @@
del self.onConnect
return self.instance
+
+
class HTTPClientPool(LoggingMixIn):
"""
A connection pool for HTTPClientProtocol instances.
@@ -94,31 +97,39 @@
for each protocol.
@ivar _maxClients: A C{int} indicating the maximum number of clients.
- @ivar _serverAddress: An L{IAddress} provider indicating the server to
- connect to. (Only L{IPv4Address} currently supported.)
+
+ @ivar _endpoint: An L{IStreamClientEndpoint} provider indicating the server
+ to connect to.
+
@ivar _reactor: The L{IReactorTCP} provider used to initiate new
connections.
@ivar _busyClients: A C{set} that contains all currently busy clients.
+
@ivar _freeClients: A C{set} that contains all currently free clients.
+
@ivar _pendingConnects: A C{int} indicating how many connections are in
progress.
"""
clientFactory = PooledHTTPClientFactory
maxRetries = 2
- def __init__(self, name, scheme, serverAddress, maxClients=5, reactor=None):
+ def __init__(self, name, scheme, endpoint, secureEndpoint,
+ maxClients=5, reactor=None):
"""
- @param serverAddress: An L{IPv4Address} indicating the server to
+ @param endpoint: An L{IStreamClientEndpoint} indicating the server to
connect to.
+
@param maxClients: A C{int} indicating the maximum number of clients.
- @param reactor: An L{IReactorTCP{ provider used to initiate new
+
+ @param reactor: An L{IReactorTCP} provider used to initiate new
connections.
"""
self._name = name
self._scheme = scheme
- self._serverAddress = serverAddress
+ self._endpoint = endpoint
+ self._secureEndpoint = secureEndpoint
self._maxClients = maxClients
if reactor is None:
@@ -156,17 +167,17 @@
"""
self._pendingConnects += 1
- self.log_debug("Initiating new client connection to: %s" % (self._serverAddress,))
+ self.log_debug("Initating new client connection to: %r" % (
+ self._endpoint,))
self._logClientStats()
factory = self.clientFactory(self._reactor)
factory.connectionPool = self
if self._scheme == "https":
- context = ChainingOpenSSLContextFactory(config.SSLPrivateKey, config.SSLCertificate, certificateChainFile=config.SSLAuthorityChain, sslmethod=getattr(OpenSSL.SSL, config.SSLMethod))
- self._reactor.connectSSL(self._serverAddress.host, self._serverAddress.port, factory, context)
+ connect(self._secureEndpoint, factory)
elif self._scheme == "http":
- self._reactor.connectTCP(self._serverAddress.host, self._serverAddress.port, factory)
+ connect(self._endpoint, factory)
else:
raise ValueError("URL scheme for client pool not supported")
@@ -384,21 +395,38 @@
reactor,
)
+
+
+def _configuredClientContextFactory():
+ """
+ Get a client context factory from the configuration.
+ """
+ from twistedcaldav.config import config
+ return ChainingOpenSSLContextFactory(
+ config.SSLPrivateKey, config.SSLCertificate,
+ certificateChainFile=config.SSLAuthorityChain,
+ sslmethod=getattr(OpenSSL.SSL, config.SSLMethod)
+ )
+
+
+
def installPool(name, url, maxClients=5, reactor=None):
+ if reactor is None:
+ from twisted.internet import reactor
parsedURL = urlparse.urlparse(url)
+ ctxf = _configuredClientContextFactory()
pool = HTTPClientPool(
name,
parsedURL.scheme,
- IPv4Address(
- "TCP",
- parsedURL.hostname,
- parsedURL.port,
- ),
+ GAIEndpoint(reactor, parsedURL.hostname, parsedURL.port),
+ GAIEndpoint(reactor, parsedURL.hostname, parsedURL.port, ctxf),
maxClients,
reactor,
)
_clientPools[name] = pool
+
+
def getHTTPClientPool(name):
return _clientPools[name]
Modified: CalendarServer/trunk/twistedcaldav/mail.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/mail.py 2012-04-13 18:48:13 UTC (rev 9105)
+++ CalendarServer/trunk/twistedcaldav/mail.py 2012-04-13 18:59:38 UTC (rev 9106)
@@ -53,6 +53,9 @@
from twisted.web.microdom import parseString
from twisted.web.microdom import Text as DOMText, Element as DOMElement
+from twext.internet.gaiendpoint import GAIEndpoint
+from twext.internet.adaptendpoint import connect
+
from twext.web2 import server, responsecode
from twext.web2.channel.http import HTTPFactory
from txdav.xml import element as davxml
@@ -77,7 +80,6 @@
from twistedcaldav.util import AuthorizedHTTPGetter
from twistedcaldav.stdconfig import DEFAULT_CONFIG, DEFAULT_CONFIG_FILE
-
from calendarserver.tap.util import getRootResource, directoryFromConfig
@@ -612,10 +614,11 @@
factory.protocol = AuthorizedHTTPGetter
if parsed.scheme == "https":
- reactor.connectSSL(parsed.hostname, parsed.port, factory,
- ssl.ClientContextFactory())
+ connect(GAIEndpoint(reactor, parsed.hostname, parsed.port,
+ ssl.ClientContextFactory()),
+ factory)
else:
- reactor.connectTCP(parsed.hostname, parsed.port, factory)
+ connect(GAIEndpoint(reactor, parsed.hostname, parsed.port), factory)
def _success(result, msgId):
log.info("Mail gateway successfully injected message %s" % (msgId,))
@@ -645,11 +648,11 @@
log.warn("Can't find server for %s" % (organizer,))
raise ServerNotFound()
- server = record.server() # None means hosted locally
- if server is None:
+ srvr = record.server() # None means hosted locally
+ if srvr is None:
return None
else:
- return server.uri
+ return srvr.uri
class ServerNotFound(Exception):
@@ -1106,7 +1109,8 @@
)
self.log_warn("Mail gateway forwarding reply back to organizer")
- _reactor.connectTCP(settings["Server"], settings["Port"], factory)
+ connect(GAIEndpoint(_reactor, settings["Server"], settings["Port"]),
+ factory)
return deferred
# Process the imip attachment; inject to calendar server
@@ -1371,7 +1375,8 @@
requireAuthentication=False,
requireTransportSecurity=settings["UseSSL"])
- _reactor.connectTCP(settings['Server'], settings['Port'], factory)
+ connect(GAIEndpoint(_reactor, settings["Server"], settings["Port"]),
+ factory)
deferred.addCallback(_success, msgId, fromAddr, toAddr)
deferred.addErrback(_failure, msgId, fromAddr, toAddr)
return deferred
@@ -1561,7 +1566,7 @@
@return: a 2-tuple of (should add icon (C{bool}), html text (C{str},
representing utf-8 encoded bytes)). The first element indicates
- whether the MIME generator needs to add a L{cid:} icon image part to
+ whether the MIME generator needs to add a C{cid:} icon image part to
satisfy the HTML links.
"""
orgCN, orgEmail = organizer
@@ -1634,20 +1639,16 @@
"""
Create a dictionary mapping slot names - specifically: summary,
description, location, dateInfo, timeInfo, durationInfo, recurrenceInfo,
- url
- - with localized string values that should be placed into the HTML and
- plain-text templates.
+ url - with localized string values that should be placed into the HTML
+ and plain-text templates.
@param calendar: a L{Component} upon which to base the language.
-
@type calendar: L{Component}
@param language: a 2-letter language code.
-
@type language: C{str}
@return: a mapping from template slot name to localized text.
-
@rtype: a C{dict} mapping C{bytes} to C{unicode}.
"""
Modified: CalendarServer/trunk/twistedcaldav/memcachepool.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/memcachepool.py 2012-04-13 18:48:13 UTC (rev 9105)
+++ CalendarServer/trunk/twistedcaldav/memcachepool.py 2012-04-13 18:59:38 UTC (rev 9106)
@@ -15,10 +15,13 @@
##
from twisted.python.failure import Failure
-from twisted.internet.address import IPv4Address
+
from twisted.internet.defer import Deferred, fail
from twisted.internet.protocol import ReconnectingClientFactory
+from twext.internet.gaiendpoint import GAIEndpoint
+from twext.internet.adaptendpoint import connect
+
from twext.python.log import LoggingMixIn
from twext.protocols.memcache import MemCacheProtocol, NoSuchCommand
@@ -116,13 +119,17 @@
for each protocol.
@ivar _maxClients: A C{int} indicating the maximum number of clients.
- @ivar _serverAddress: An L{IAddress} provider indicating the server to
- connect to. (Only L{IPv4Address} currently supported.)
+
+ @ivar _endpoint: An L{IStreamClientEndpoint} provider indicating the server
+ to connect to.
+
@ivar _reactor: The L{IReactorTCP} provider used to initiate new
connections.
@ivar _busyClients: A C{set} that contains all currently busy clients.
+
@ivar _freeClients: A C{set} that contains all currently free clients.
+
@ivar _pendingConnects: A C{int} indicating how many connections are in
progress.
"""
@@ -130,15 +137,17 @@
REQUEST_LOGGING_SIZE = 1024
- def __init__(self, serverAddress, maxClients=5, reactor=None):
+ def __init__(self, endpoint, maxClients=5, reactor=None):
"""
- @param serverAddress: An L{IPv4Address} indicating the server to
+ @param endpoint: An L{IStreamClientEndpoint} indicating the server to
connect to.
+
@param maxClients: A C{int} indicating the maximum number of clients.
- @param reactor: An L{IReactorTCP{ provider used to initiate new
+
+ @param reactor: An L{IReactorTCP} provider used to initiate new
connections.
"""
- self._serverAddress = serverAddress
+ self._endpoint = endpoint
self._maxClients = maxClients
if reactor is None:
@@ -175,7 +184,7 @@
@return: A L{Deferred} that fires with the L{IProtocol} instance.
"""
self.log_debug("Initating new client connection to: %r" % (
- self._serverAddress,))
+ self._endpoint,))
self._logClientStats()
self._pendingConnects += 1
@@ -190,9 +199,7 @@
factory.connectionPool = self
- self._reactor.connectTCP(self._serverAddress.host,
- self._serverAddress.port,
- factory)
+ connect(self._endpoint, factory)
d = factory.deferred
d.addCallback(_connected)
@@ -404,31 +411,29 @@
_memCachePoolHandler = {} # Maps a handler id to a named pool
def installPools(pools, maxClients=5, reactor=None):
-
+ if reactor is None:
+ from twisted.internet import reactor
for name, pool in pools.items():
if pool["ClientEnabled"]:
_installPool(
name,
pool["HandleCacheTypes"],
- IPv4Address(
- "TCP",
- pool["BindAddress"],
- pool["Port"],
- ),
+ GAIEndpoint(reactor, pool["BindAddress"], pool["Port"]),
maxClients,
reactor,
)
-def _installPool(name, handleTypes, serverAddress, maxClients=5, reactor=None):
- pool = MemCachePool(serverAddress,
- maxClients=maxClients,
- reactor=None)
+
+def _installPool(name, handleTypes, serverEndpoint, maxClients=5, reactor=None):
+ pool = MemCachePool(serverEndpoint, maxClients=maxClients, reactor=None)
_memCachePools[name] = pool
for handle in handleTypes:
_memCachePoolHandler[handle] = pool
+
+
def defaultCachePool(name):
if name not in _memCachePoolHandler:
name = "Default"
Modified: CalendarServer/trunk/twistedcaldav/notify.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/notify.py 2012-04-13 18:48:13 UTC (rev 9105)
+++ CalendarServer/trunk/twistedcaldav/notify.py 2012-04-13 18:59:38 UTC (rev 9106)
@@ -1,3 +1,4 @@
+# -*- test-case-name: twistedcaldav.test.test_notify -*-
##
# Copyright (c) 2005-2012 Apple Inc. All rights reserved.
#
@@ -54,6 +55,8 @@
from twistedcaldav.memcacher import Memcacher
from twistedcaldav.stdconfig import DEFAULT_CONFIG, DEFAULT_CONFIG_FILE
from twistedcaldav import memcachepool
+from twext.internet.gaiendpoint import GAIEndpoint
+from twext.internet.adaptendpoint import connect
log = Logger()
@@ -243,7 +246,8 @@
def send(self, op, id):
if self.factory is None:
self.factory = NotificationClientFactory(self)
- self.reactor.connectTCP(self.gatewayHost, self.gatewayPort,
+ connect(
+ GAIEndpoint(self.reactor, self.gatewayHost, self.gatewayPort),
self.factory)
self.log_debug("Creating factory")
Modified: CalendarServer/trunk/twistedcaldav/scheduling/imip.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/scheduling/imip.py 2012-04-13 18:48:13 UTC (rev 9105)
+++ CalendarServer/trunk/twistedcaldav/scheduling/imip.py 2012-04-13 18:59:38 UTC (rev 9106)
@@ -1,3 +1,4 @@
+# -*- test-case-name: twistedcaldav.scheduling.test.test_imip -*-
##
# Copyright (c) 2005-2012 Apple Inc. All rights reserved.
#
@@ -14,6 +15,10 @@
# limitations under the License.
##
+"""
+Handles the sending of scheduling messages via iMIP (mail gateway).
+"""
+
from twisted.python.failure import Failure
from twisted.internet.defer import inlineCallbacks, returnValue
@@ -29,10 +34,9 @@
from twistedcaldav.util import AuthorizedHTTPGetter
from twistedcaldav.scheduling.delivery import DeliveryService
from twistedcaldav.scheduling.itip import iTIPRequestStatus
+from twext.internet.gaiendpoint import GAIEndpoint
+from twext.internet.adaptendpoint import connect
-"""
-Handles the sending of scheduling messages via iMIP (mail gateway).
-"""
__all__ = [
"ScheduleViaIMip",
@@ -145,6 +149,7 @@
factory.noisy = False
factory.protocol = AuthorizedHTTPGetter
- reactor.connectTCP(mailGatewayServer, mailGatewayPort, factory)
+ connect(GAIEndpoint(reactor, mailGatewayServer, mailGatewayPort),
+ factory)
return factory.deferred
Modified: CalendarServer/trunk/twistedcaldav/scheduling/ischedule.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/scheduling/ischedule.py 2012-04-13 18:48:13 UTC (rev 9105)
+++ CalendarServer/trunk/twistedcaldav/scheduling/ischedule.py 2012-04-13 18:59:38 UTC (rev 9106)
@@ -17,7 +17,7 @@
from StringIO import StringIO
from twisted.internet.defer import inlineCallbacks, DeferredList, succeed
-from twisted.internet.protocol import ClientCreator
+from twisted.internet.protocol import Factory
from twisted.python.failure import Failure
@@ -31,12 +31,12 @@
from twext.web2.stream import MemoryStream
from twext.python.log import Logger, logLevels
-from twext.internet.ssl import ChainingOpenSSLContextFactory
from twext.web2.dav.http import ErrorResponse
+from twistedcaldav.client.pool import _configuredClientContextFactory
+
from twistedcaldav import caldavxml
from twistedcaldav.caldavxml import caldav_namespace
-from twistedcaldav.config import config
from twistedcaldav.scheduling.delivery import DeliveryService
from twistedcaldav.scheduling.ischeduleservers import IScheduleServers
from twistedcaldav.scheduling.ischeduleservers import IScheduleServerRecord
@@ -44,9 +44,8 @@
from twistedcaldav.util import utf8String, normalizationLookup
from twistedcaldav.scheduling.cuaddress import PartitionedCalendarUser, RemoteCalendarUser,\
OtherServerCalendarUser
+from twext.internet.gaiendpoint import GAIEndpoint
-import OpenSSL
-
"""
Handles the sending of iSchedule scheduling messages. Used for both cross-domain scheduling,
as well as internal partitioning or podding.
@@ -175,11 +174,14 @@
# Generate an HTTP client request
try:
from twisted.internet import reactor
+ f = Factory()
+ f.protocol = HTTPClientProtocol
if self.server.ssl:
- context = ChainingOpenSSLContextFactory(config.SSLPrivateKey, config.SSLCertificate, certificateChainFile=config.SSLAuthorityChain, sslmethod=getattr(OpenSSL.SSL, config.SSLMethod))
- proto = (yield ClientCreator(reactor, HTTPClientProtocol).connectSSL(self.server.host, self.server.port, context))
+ ep = GAIEndpoint(reactor, self.server.host, self.server.port,
+ _configuredClientContextFactory())
else:
- proto = (yield ClientCreator(reactor, HTTPClientProtocol).connectTCP(self.server.host, self.server.port))
+ ep = GAIEndpoint(reactor, self.server.host, self.server.port)
+ proto = (yield ep.connect(f))
request = ClientRequest("POST", self.server.path, self.headers, self.data)
yield self.logRequest("debug", "Sending server-to-server POST request:", request)
Modified: CalendarServer/trunk/twistedcaldav/test/test_localization.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/test/test_localization.py 2012-04-13 18:48:13 UTC (rev 9105)
+++ CalendarServer/trunk/twistedcaldav/test/test_localization.py 2012-04-13 18:59:38 UTC (rev 9106)
@@ -16,7 +16,7 @@
from __future__ import with_statement
-from twistedcaldav.localization import translationTo
+from twistedcaldav.localization import translationTo, _
from twistedcaldav.ical import Component
from twistedcaldav.test.util import TestCase
from pycalendar.datetime import PyCalendarDateTime
Modified: CalendarServer/trunk/twistedcaldav/test/test_memcachepool.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/test/test_memcachepool.py 2012-04-13 18:48:13 UTC (rev 9105)
+++ CalendarServer/trunk/twistedcaldav/test/test_memcachepool.py 2012-04-13 18:59:38 UTC (rev 9106)
@@ -17,6 +17,7 @@
from zope.interface import implements
from twisted.internet.interfaces import IConnector, IReactorTCP
+from twisted.internet.endpoints import TCP4ClientEndpoint
from twisted.internet.address import IPv4Address
from twistedcaldav.test.util import InMemoryMemcacheProtocol
@@ -203,10 +204,19 @@
"""
TestCase.setUp(self)
self.reactor = StubReactor()
- self.pool = MemCachePool(MC_ADDRESS,
- maxClients=5,
- reactor=self.reactor)
+ self.pool = MemCachePool(
+ TCP4ClientEndpoint(self.reactor, MC_ADDRESS.host, MC_ADDRESS.port),
+ maxClients=5, reactor=self.reactor
+ )
+ realClientFactory = self.pool.clientFactory
+ self.clientFactories = []
+ def capturingClientFactory(*a, **k):
+ cf = realClientFactory(*a, **k)
+ self.clientFactories.append(cf)
+ return cf
+ self.pool.clientFactory = capturingClientFactory
+
def test_clientFreeAddsNewClient(self):
"""
Test that a client not in the busy set gets added to the free set.
@@ -286,27 +296,21 @@
Test that L{MemCachePool.performRequest} on a fresh instance causes
a new connection to be created.
"""
- def _checkResult(result):
- self.assertEquals(result, (0, 'bar'))
-
-
+ results = []
p = InMemoryMemcacheProtocol()
p.set('foo', 'bar')
d = self.pool.performRequest('get', 'foo')
- d.addCallback(_checkResult)
+ d.addCallback(results.append)
args, kwargs = self.reactor.calls.pop()
self.assertEquals(args[:2], (MC_ADDRESS.host, MC_ADDRESS.port))
- self.failUnless(isinstance(args[2], MemCacheClientFactory))
- self.assertEquals(kwargs, {})
- args[2].deferred.callback(p)
+ self.clientFactories[-1].deferred.callback(p)
+ self.assertEquals(results, [(0, 'bar')])
- return d
-
def test_performRequestUsesFreeConnection(self):
"""
Test that L{MemCachePool.performRequest} doesn't create a new connection
@@ -323,7 +327,6 @@
d = self.pool.performRequest('get', 'foo')
d.addCallback(_checkResult)
-
return d
@@ -374,19 +377,13 @@
self.pool.clientBusy(p)
- d = self.pool.performRequest('get', 'foo')
+ self.pool.performRequest('get', 'foo')
args, kwargs = self.reactor.calls.pop()
self.assertEquals(args[:2], (MC_ADDRESS.host, MC_ADDRESS.port))
- self.failUnless(isinstance(args[2], MemCacheClientFactory))
- self.assertEquals(kwargs, {})
- args[2].deferred.callback(p1)
- return d
-
-
def test_pendingConnectionsCountAgainstMaxClients(self):
"""
Test that L{MemCachePool.performRequest} will not initiate a new
@@ -395,17 +392,11 @@
"""
self.pool.suggestMaxClients(1)
- d = self.pool.performRequest('get', 'foo')
+ self.pool.performRequest('get', 'foo')
args, kwargs = self.reactor.calls.pop()
self.assertEquals(args[:2], (MC_ADDRESS.host, MC_ADDRESS.port))
- self.failUnless(isinstance(args[2], MemCacheClientFactory))
- self.assertEquals(kwargs, {})
self.pool.performRequest('get', 'bar')
self.assertEquals(self.reactor.calls, [])
-
- args[2].deferred.callback(InMemoryMemcacheProtocol())
-
- return d
Modified: CalendarServer/trunk/twistedcaldav/util.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/util.py 2012-04-13 18:48:13 UTC (rev 9105)
+++ CalendarServer/trunk/twistedcaldav/util.py 2012-04-13 18:59:38 UTC (rev 9106)
@@ -28,6 +28,7 @@
from twext.python.log import LoggingMixIn, Logger
log = Logger()
+from twext.internet.gaiendpoint import GAIEndpoint
##
# System Resources (Memory size and processor count)
@@ -399,10 +400,13 @@
self.factory.headers['Authorization'] = response
if self.factory.scheme == 'https':
- reactor.connectSSL(self.factory.host, self.factory.port,
- self.factory, ssl.ClientContextFactory())
+ connect(
+ GAIEndpoint(reactor, self.factory.host, self.factory.port,
+ ssl.ClientContextFactory()),
+ self.factory)
else:
- reactor.connectTCP(self.factory.host, self.factory.port,
+ connect(
+ GAIEndpoint(reactor, self.factory.host, self.factory.port),
self.factory)
# self.log_debug("Retrying with digest after 401")
@@ -416,10 +420,13 @@
self.factory.headers['Authorization'] = basicauth
if self.factory.scheme == 'https':
- reactor.connectSSL(self.factory.host, self.factory.port,
- self.factory, ssl.ClientContextFactory())
+ connect(
+ GAIEndpoint(reactor, self.factory.host, self.factory.port,
+ ssl.ClientContextFactory()),
+ self.factory)
else:
- reactor.connectTCP(self.factory.host, self.factory.port,
+ connect(
+ GAIEndpoint(reactor, self.factory.host, self.factory.port),
self.factory)
# self.log_debug("Retrying with basic after 401")
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120413/a7caa603/attachment-0001.html>
More information about the calendarserver-changes mailing list