[CalendarServer-changes] [2524] CalendarServer/branches/memcache-reconnect/twistedcaldav
source_changes at macosforge.org
source_changes at macosforge.org
Mon Jun 2 16:45:10 PDT 2008
Revision: 2524
http://trac.macosforge.org/projects/calendarserver/changeset/2524
Author: dreid at apple.com
Date: 2008-06-02 16:45:09 -0700 (Mon, 02 Jun 2008)
Log Message:
-----------
Start of MemCache connection pooling.
Added Paths:
-----------
CalendarServer/branches/memcache-reconnect/twistedcaldav/memcachepool.py
CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_memcachepool.py
Added: CalendarServer/branches/memcache-reconnect/twistedcaldav/memcachepool.py
===================================================================
--- CalendarServer/branches/memcache-reconnect/twistedcaldav/memcachepool.py (rev 0)
+++ CalendarServer/branches/memcache-reconnect/twistedcaldav/memcachepool.py 2008-06-02 23:45:09 UTC (rev 2524)
@@ -0,0 +1,195 @@
+##
+# Copyright (c) 2008 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from twisted.internet.defer import Deferred
+from twisted.internet.protocol import ReconnectingClientFactory
+
+from twistedcaldav.memcache import MemCacheProtocol
+
+
+class PooledMemCacheProtocol(MemCacheProtocol):
+ """
+ A MemCacheProtocol that will notify a connectionPool that it is ready
+ to accept requests.
+
+ @ivar factory: A L{MemCacheClientFactory} instance.
+ @ivar connectionPool: A managing connection pool that we notify of events.
+ """
+ factory = None
+ connectionPool = None
+
+ def connectionMade(self):
+ """
+ Notify our connectionPool that we're ready to accept connections.
+ """
+ MemCacheProtocol.connectionMade(self)
+ self.connectionPool.clientFree(self)
+
+ if self.factory.deferred is not None:
+ self.factory.deferred.callback(self)
+ self.factory.deferred = None
+
+
+
+class MemCacheClientFactory(ReconnectingClientFactory):
+ """
+ A client factory for MemCache that reconnects and notifies a pool of it's
+ state.
+
+ @ivar connectionPool: A managing connection pool that we notify of events.
+ @ivar deferred: A L{Deferred} that represents the initial connection.
+ @ivar _protocolInstance: The current instance of our protocol that we pass
+ to our connectionPool.
+ """
+ protocol = PooledMemCacheProtocol
+ connectionPool = None
+ _protocolInstance = None
+
+
+ def __init__(self):
+ self.deferred = Deferred()
+
+
+ def clientConnectionLost(self, connector, reason):
+ """
+ Notify the connectionPool that we've lost our connection.
+ """
+ if self._protocolInstance is not None:
+ self.connectionPool.clientBusy(self._protocolInstance)
+
+ ReconnectingClientFactory.clientConnectionLost(
+ self,
+ connector,
+ reason)
+
+
+ def clientConnectionFailed(self, connector, reason):
+ """
+ Notify the connectionPool that we're unable to connect
+ """
+ if self._protocolInstance is not None:
+ self.connectionPool.clientBusy(self._protocolInstance)
+
+ ReconnectingClientFactory.clientConnectionFailed(
+ self,
+ connector,
+ reason)
+
+
+ def buildProtocol(self, addr):
+ """
+ Attach the C{self.connectionPool} to the protocol so it can tell it,
+ when we've connected.
+ """
+ self._protocolInstance = self.protocol()
+ self._protocolInstance.connectionPool = self.connectionPool
+ self._protocolInstance.factory = self
+ return self._protocolInstance
+
+
+
+class MemCachePool(object):
+ """
+ A connection pool for MemCacheProtocol instances.
+
+ @ivar clientFactory: The L{ClientFactory} implementation that will be used
+ for each protocol.
+
+ @ivar _maxClients: A C{int} indicating the maximum number of clients.
+ @ivar _serverAddress: An L{IAddress} provider indicating the server to
+ connect to. (Only L{IPv4Address} currently supported.)
+ @ivar _reactor: The L{IReactorTCP} provider used to initiate new
+ connections.
+
+ @ivar _busyClients: A C{set} that contains all currently busy clients.
+ @ivar _freeClients: A C{set} that contains all currently free clients.
+ """
+ clientFactory = MemCacheClientFactory
+
+ def __init__(self, serverAddress, maxClients=5, reactor=None):
+ """
+ @param serverAddress: An L{IPv4Address} indicating the server to
+ connect to.
+ @param maxClients: A C{int} indicating the maximum number of clients.
+ @param reactor: An L{IReactorTCP{ provider used to initiate new
+ connections.
+ """
+ self._serverAddress = serverAddress
+ self._maxClients = maxClients
+
+ if reactor is None:
+ from twisted.internet import reactor
+
+ self._reactor = reactor
+
+ self._busyClients = set([])
+ self._freeClients = set([])
+
+
+ def _newClientConnection(self):
+ """
+ Create a new client connection.
+
+ @return: A L{Deferred} that fires with the L{IProtocol} instance.
+ """
+ factory = self.clientFactory()
+
+ factory.connectionPool = self
+
+ self._reactor.connectTCP(self._serverAddress.host,
+ self._serverAddress.port,
+ factory)
+ return factory.deferred
+
+
+ def performRequest(self, command, *args, **kwargs):
+ """
+ Select an available client and perform the given request on it.
+
+ @param command: A C{str} representing an attribute of
+ L{MemCacheProtocol}.
+ @parma args: Any positional arguments that should be passed to
+ C{command}.
+ @param kwargs: Any keyword arguments that should be passed to
+ C{command}.
+
+ @return: A L{Deferred} that fires with the result of the given command.
+ """
+ d = self._newClientConnection()
+ return d
+
+ def clientBusy(self, client):
+ """
+ Notify that the given client is being used to complete a request.
+
+ @param client: An instance of C{self.clientFactory}
+ """
+ if client in self._freeClients:
+ self._freeClients.remove(client)
+
+ self._busyClients.add(client)
+
+
+ def clientFree(self, client):
+ """
+ Notify that the given client is free to handle more requests.
+
+ @param client: An instance of C{self.clientFactory}
+ """
+ if client in self._busyClients:
+ self._busyClients.remove(client)
+
+ self._freeClients.add(client)
Added: CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_memcachepool.py
===================================================================
--- CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_memcachepool.py (rev 0)
+++ CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_memcachepool.py 2008-06-02 23:45:09 UTC (rev 2524)
@@ -0,0 +1,271 @@
+##
+# Copyright (c) 2008 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from zope.interface import implements
+
+from twisted.trial.unittest import TestCase
+
+from twisted.internet.interfaces import IConnector, IReactorTCP
+from twisted.internet.address import IPv4Address
+
+from twistedcaldav.memcachepool import PooledMemCacheProtocol
+from twistedcaldav.memcachepool import MemCacheClientFactory
+from twistedcaldav.memcachepool import MemCachePool
+
+
+MC_ADDRESS = IPv4Address('TCP', '127.0.0.1', 11211)
+
+
+class StubConnectionPool(object):
+ """
+ A stub client connection pool that records it's calls in the form of a list
+ of (status, client) tuples where status is C{'free'} or C{'busy'}
+
+ @ivar calls: A C{list} of C{tuple}s of the form C{(status, client)} where
+ status is C{'free'} or C{'busy'} and client is the protocol instance
+ that made the call.
+ """
+ def __init__(self):
+ self.calls = []
+
+
+ def clientFree(self, client):
+ """
+ Record a C{'free'} call for C{client}.
+ """
+ self.calls.append(('free', client))
+
+
+ def clientBusy(self, client):
+ """
+ Record a C{'busy'} call for C{client}.
+ """
+ self.calls.append(('busy', client))
+
+
+
+class StubConnector(object):
+ """
+ A stub L{IConnector} that can be used for testing.
+ """
+ implements(IConnector)
+
+ def connect(self):
+ """
+ A L{IConnector.connect} implementation that doesn't do anything.
+ """
+
+
+ def stopConnecting(self):
+ """
+ A L{IConnector.stopConnecting} that doesn't do anything.
+ """
+
+
+
+class StubReactor(object):
+ """
+ A stub L{IReactorTCP} that records the calls to connectTCP.
+
+ @ivar calls: A C{list} of tuples (args, kwargs) sent to connectTCP.
+ """
+ implements(IReactorTCP)
+
+ def __init__(self):
+ self.calls = []
+
+
+ def connectTCP(self, *args, **kwargs):
+ self.calls.append((args, kwargs))
+ return StubConnector()
+
+
+
+class PooledMemCacheProtocolTests(TestCase):
+ """
+ Tests for the L{PooledMemCacheProtocol}
+ """
+ def test_connectionMadeNotifiesPool(self):
+ """
+ Test that L{PooledMemCacheProtocol.connectionMade} notifies the
+ connectionPool that it is free.
+ """
+ p = PooledMemCacheProtocol()
+ p.factory = MemCacheClientFactory()
+ p.connectionPool = StubConnectionPool()
+ p.connectionMade()
+
+ self.assertEquals(p.connectionPool.calls, [('free', p)])
+
+ return p.factory.deferred
+
+
+ def test_connectionMadeFiresDeferred(self):
+ """
+ Test that L{PooledMemCacheProtocol.connectionMade} fires the factory's
+ deferred.
+ """
+ p = PooledMemCacheProtocol()
+ p.factory = MemCacheClientFactory()
+ p.connectionPool = StubConnectionPool()
+ d = p.factory.deferred
+ d.addCallback(self.assertEquals, p)
+
+ p.connectionMade()
+ return d
+
+
+class MemCacheClientFactoryTests(TestCase):
+ """
+ Tests for the L{MemCacheClientFactory}
+
+ @ivar factory: A L{MemCacheClientFactory} instance with a
+ L{StubConnectionPool}.
+ @ivar protocol: A L{PooledMemCacheProtocol} that was built by
+ L{MemCacheClientFactory.buildProtocol}.
+ @ivar pool: The L{StubConnectionPool} attached to C{self.factory} and
+ C{self.protocol}.
+ """
+ def setUp(self):
+ """
+ Create a L{MemCacheClientFactory} instance and and give it a
+ L{StubConnectionPool} instance.
+ """
+ self.pool = StubConnectionPool()
+ self.factory = MemCacheClientFactory()
+ self.factory.connectionPool = self.pool
+ self.protocol = self.factory.buildProtocol(None)
+
+
+ def test_buildProtocolGivesProtocolConnectionPool(self):
+ """
+ Test that L{MemCacheClientFactory.buildProtocol} gives it's
+ protocol instance it's connectionPool.
+ """
+ self.assertEquals(self.factory.connectionPool,
+ self.protocol.connectionPool)
+
+
+ def test_clientConnectionFailedNotifiesPool(self):
+ """
+ Test that L{MemCacheClientFactory.clientConnectionFailed} notifies
+ the it's connectionPool that it is busy.
+ """
+ self.factory.clientConnectionFailed(StubConnector(), None)
+ self.assertEquals(self.factory.connectionPool.calls,
+ [('busy', self.protocol)])
+
+
+ def test_clientConnectionLostNotifiesPool(self):
+ """
+ Test that L{MemCacheClientFactory.clientConnectionLost} notifies
+ the it's connectionPool that it is busy.
+ """
+ self.factory.clientConnectionLost(StubConnector(), None)
+ self.assertEquals(self.factory.connectionPool.calls,
+ [('busy', self.protocol)])
+
+
+ def tearDown(self):
+ """
+ Make sure the L{MemCacheClientFactory} isn't trying to reconnect
+ anymore.
+ """
+ self.factory.stopTrying()
+
+
+
+class MemCachePoolTests(TestCase):
+ """
+ Tests for L{MemCachePool}.
+
+ @ivar reactor: A L{StubReactor} instance.
+ @ivar pool: A L{MemCachePool} for testing.
+ """
+ def setUp(self):
+ """
+ Create a L{MemCachePool}.
+ """
+ self.reactor = StubReactor()
+ self.pool = MemCachePool(MC_ADDRESS,
+ maxClients=5,
+ reactor=self.reactor)
+
+
+ def test_clientFreeAddsNewClient(self):
+ """
+ Test that a client not in the busy set gets added to the free set.
+ """
+ p = MemCacheClientFactory().buildProtocol(None)
+ self.pool.clientFree(p)
+
+ self.assertEquals(self.pool._freeClients, set([p]))
+
+
+ def test_clientFreeAddsBusyClient(self):
+ """
+ Test that a client in the busy set gets moved to the free set.
+ """
+ p = MemCacheClientFactory().buildProtocol(None)
+
+ self.pool.clientBusy(p)
+ self.pool.clientFree(p)
+
+ self.assertEquals(self.pool._freeClients, set([p]))
+ self.assertEquals(self.pool._busyClients, set([]))
+
+
+ def test_clientBusyAddsNewClient(self):
+ """
+ Test that a client not in the free set gets added to the busy set.
+ """
+ p = MemCacheClientFactory().buildProtocol(None)
+ self.pool.clientBusy(p)
+
+ self.assertEquals(self.pool._busyClients, set([p]))
+
+
+ def test_clientBusyAddsFreeClient(self):
+ """
+ Test that a client in the free set gets moved to the busy set.
+ """
+ p = MemCacheClientFactory().buildProtocol(None)
+
+ self.pool.clientFree(p)
+ self.pool.clientBusy(p)
+
+ self.assertEquals(self.pool._busyClients, set([p]))
+ self.assertEquals(self.pool._freeClients, set([]))
+
+
+ def test_performRequestCreatesConnection(self):
+ """
+ Test that L{MemCachePool.performRequest} on a fresh instance causes
+ a new connection to be created.
+ """
+ d = self.pool.performRequest('get', 'foo')
+
+ args, kwargs = self.reactor.calls.pop()
+
+ self.assertEquals(args[:2], (MC_ADDRESS.host, MC_ADDRESS.port))
+ self.failUnless(isinstance(args[2], MemCacheClientFactory))
+ self.assertEquals(kwargs, {})
+
+ factory = args[2]
+ protocol = factory.buildProtocol(None)
+ protocol.connectionMade()
+
+ return d
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20080602/d2f166af/attachment-0001.htm
More information about the calendarserver-changes
mailing list