[CalendarServer-changes] [7741] CalendarServer/trunk

source_changes at macosforge.org source_changes at macosforge.org
Thu Jul 7 18:51:16 PDT 2011


Revision: 7741
          http://trac.macosforge.org/projects/calendarserver/changeset/7741
Author:   sagen at apple.com
Date:     2011-07-07 18:51:16 -0700 (Thu, 07 Jul 2011)
Log Message:
-----------
Adds memcached data expiration as a safety net to handle the case where the
proxy cache updater is bounced but memcached isn't.  Also adds a config option
to not spawn a proxy sidecar, to facilitate having multiple machines share
the same proxy updater.

Modified Paths:
--------------
    CalendarServer/trunk/calendarserver/tap/caldav.py
    CalendarServer/trunk/conf/caldavd-test.plist
    CalendarServer/trunk/twistedcaldav/directory/calendaruserproxy.py
    CalendarServer/trunk/twistedcaldav/directory/test/test_proxyprincipalmembers.py
    CalendarServer/trunk/twistedcaldav/memcacher.py
    CalendarServer/trunk/twistedcaldav/stdconfig.py
    CalendarServer/trunk/twistedcaldav/test/test_memcacher.py

Modified: CalendarServer/trunk/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/caldav.py	2011-07-07 21:06:24 UTC (rev 7740)
+++ CalendarServer/trunk/calendarserver/tap/caldav.py	2011-07-08 01:51:16 UTC (rev 7741)
@@ -506,7 +506,7 @@
             self.monitor.addProcess("mailgateway", mailGatewayArgv,
                                env=PARENT_ENVIRONMENT)
 
-        if config.ProxyCaching.Enabled:
+        if config.ProxyCaching.Enabled and config.ProxyCaching.EnableUpdater:
             self.maker.log_info("Adding proxy caching service")
 
             proxyCacherArgv = [

Modified: CalendarServer/trunk/conf/caldavd-test.plist
===================================================================
--- CalendarServer/trunk/conf/caldavd-test.plist	2011-07-07 21:06:24 UTC (rev 7740)
+++ CalendarServer/trunk/conf/caldavd-test.plist	2011-07-08 01:51:16 UTC (rev 7741)
@@ -903,10 +903,14 @@
     <dict>
       <key>Enabled</key>
       <false/>
+      <key>EnableUpdater</key>
+      <true/>
       <key>MemcachedPool</key>
       <string>ProxyDB</string>
       <key>UpdateSeconds</key>
       <integer>300</integer>
+      <key>ExpireSeconds</key>
+      <integer>3600</integer>
     </dict>
 
     <!--

Modified: CalendarServer/trunk/twistedcaldav/directory/calendaruserproxy.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/directory/calendaruserproxy.py	2011-07-07 21:06:24 UTC (rev 7740)
+++ CalendarServer/trunk/twistedcaldav/directory/calendaruserproxy.py	2011-07-08 01:51:16 UTC (rev 7741)
@@ -886,9 +886,10 @@
     """
 
 
-    def setMembers(self, guid, members):
+    def setMembers(self, guid, members, expireSeconds=0):
         self.log_debug("set proxy-members %s : %s" % (guid, members))
-        return self.set("proxy-members:%s" % (str(guid),), str(",".join(members)))
+        return self.set("proxy-members:%s" % (str(guid),), str(",".join(members)), expire_time=expireSeconds)
+
     def getMembers(self, guid):
         self.log_debug("get proxy-members %s" % (guid,))
         def _value(value):
@@ -905,11 +906,12 @@
     def deleteMembers(self, guid):
         return self.delete("proxy-members:%s" % (str(guid),))
 
-    def setProxyFor(self, guid, proxyType, memberships):
+    def setProxyFor(self, guid, proxyType, memberships, expireSeconds=0):
         self.log_debug("set %s-proxy-for %s : %s" %
             (proxyType, guid, memberships))
         return self.set("%s-proxy-for:%s" %
-            (proxyType, str(guid)), str(",".join(memberships)))
+            (proxyType, str(guid)), str(",".join(memberships)),
+            expire_time=expireSeconds)
 
     def getProxyFor(self, guid, proxyType):
         self.log_debug("get %s-proxy-for %s" % (proxyType, guid))
@@ -927,8 +929,9 @@
     def deleteProxyFor(self, guid, proxyType):
         return self.delete("%s-proxy-for:%s" % (proxyType, str(guid),))
 
-    def createMarker(self):
-        return self.set("proxy-cache-populated", "true")
+    def createMarker(self, expireSeconds=0):
+        return self.set("proxy-cache-populated", "true",
+            expire_time=expireSeconds)
 
     def checkMarker(self):
         def _value(value):
@@ -937,6 +940,7 @@
         d.addCallback(_value)
         return d
 
+
 class ProxyMemberCacheUpdater(LoggingMixIn):
     """
     Responsible for updating memcached with proxy assignments.  This will run
@@ -946,7 +950,8 @@
     TODO: Implement location/resource
     """
 
-    def __init__(self, proxyDB, directory, cache=None, namespace=None):
+    def __init__(self, proxyDB, directory, expireSeconds, cache=None,
+        namespace=None):
         self.proxyDB = proxyDB
         self.directory = directory
         if cache is None:
@@ -958,6 +963,7 @@
             "read" : { },
             "write" : { },
         }
+        self.expireSeconds = expireSeconds
 
     @inlineCallbacks
     def updateCache(self):
@@ -1001,7 +1007,8 @@
                         guids = set([r.guid for r in members])
                         combinedGUIDs.update(guids)
 
-            self.cache.setMembers(proxyGroup, combinedGUIDs)
+            self.cache.setMembers(proxyGroup, combinedGUIDs,
+                expireSeconds=self.expireSeconds)
             numProxyGroupsUpdated += 1
 
             if proxyGroup in self.previousProxyGroups:
@@ -1019,7 +1026,8 @@
 
         for proxyType in ("read", "write"):
             for guid, memberships in currentProxyFor[proxyType].iteritems():
-                self.cache.setProxyFor(guid, proxyType, memberships)
+                self.cache.setProxyFor(guid, proxyType, memberships,
+                    expireSeconds=self.expireSeconds)
                 numProxiesUpdated[proxyType] += 1
                 if self.previousProxyFor[proxyType].has_key(guid):
                     # whatever remains in previousProxyFor needs to be deleted
@@ -1028,15 +1036,18 @@
 
         self.log_debug("%d proxyGroups updated" % (numProxyGroupsUpdated,))
         for proxyType in ("read", "write"):
-            self.log_debug("%d %s proxies updated" % (numProxiesUpdated[proxyType], proxyType))
+            self.log_debug("%d %s proxies updated" %
+                (numProxiesUpdated[proxyType], proxyType))
 
         # Delete obsolete memcached keys
         for proxyGroup in self.previousProxyGroups:
-            self.log_debug("Deleting proxyGroup members for %s" % (proxyGroup,))
+            self.log_debug("Deleting proxyGroup members for %s" %
+                (proxyGroup,))
             self.cache.deleteMembers(proxyGroup)
         for proxyType in ("read", "write"):
             for guid in self.previousProxyFor[proxyType].iterkeys():
-                self.log_debug("Deleting %s proxyFor members for %s" % (proxyType, guid,))
+                self.log_debug("Deleting %s proxyFor members for %s" %
+                    (proxyType, guid,))
                 self.cache.deleteProxyFor(guid, proxyType)
 
         self.previousProxyGroups = currentProxyGroups
@@ -1044,7 +1055,7 @@
 
         # Put a special key into memcached to let workers know proxyCache is
         # populated
-        self.cache.createMarker()
+        self.cache.createMarker(expireSeconds=self.expireSeconds)
 
 
 class ProxyCacherOptions(Options):
@@ -1139,28 +1150,36 @@
     Service to update the proxy cache at a configured interval
     """
 
-    def __init__(self, proxyDB, directory, namespace, seconds, reactor=None):
+    def __init__(self, proxyDB, directory, namespace, updateSeconds,
+        expireSeconds, reactor=None, updateMethod=None):
+
         self.updater = ProxyMemberCacheUpdater(proxyDB, directory,
-            namespace=namespace)
+            expireSeconds, namespace=namespace)
+
         if reactor is None:
             from twisted.internet import reactor
         self.reactor = reactor
-        self.seconds = seconds
+        self.updateSeconds = updateSeconds
         self.nextUpdate = None
+        if updateMethod:
+            self.updateMethod = updateMethod
+        else:
+            self.updateMethod = self.updater.updateCache
 
     def startService(self):
         self.log_warn("Starting proxy cacher service")
         service.Service.startService(self)
-        self.update()
+        return self.update()
 
     @inlineCallbacks
     def update(self):
         self.nextUpdate = None
         try:
-            yield self.updater.updateCache()
+            yield self.updateMethod()
         finally:
             self.log_debug("Scheduling next proxy cacher update")
-            self.nextUpdate = self.reactor.callLater(self.seconds, self.update)
+            self.nextUpdate = self.reactor.callLater(self.updateSeconds,
+                self.update)
 
     def stopService(self):
         self.log_warn("Stopping proxy cacher service")
@@ -1204,7 +1223,9 @@
 
         proxyCacherService = ProxyCacherService(proxyDB, directory,
             config.ProxyCaching.MemcachedPool,
-            config.ProxyCaching.UpdateSeconds)
+            config.ProxyCaching.UpdateSeconds,
+            config.ProxyCaching.ExpireSeconds
+            )
 
         return proxyCacherService
 

Modified: CalendarServer/trunk/twistedcaldav/directory/test/test_proxyprincipalmembers.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/directory/test/test_proxyprincipalmembers.py	2011-07-07 21:06:24 UTC (rev 7740)
+++ CalendarServer/trunk/twistedcaldav/directory/test/test_proxyprincipalmembers.py	2011-07-08 01:51:16 UTC (rev 7741)
@@ -29,6 +29,7 @@
 from twistedcaldav.config import config
 from twistedcaldav.directory import augment, calendaruserproxy
 from twistedcaldav.directory.calendaruserproxyloader import XMLCalendarUserProxyLoader
+from twisted.internet.task import Clock
 
 
 class ProxyPrincipals (twistedcaldav.test.util.TestCase):
@@ -630,8 +631,9 @@
         self.directoryService.proxyCache = cache
 
         proxyGroup = delegator.getChild("calendar-proxy-write")
-        yield cache.setMembers(proxyGroup.uid, [cdaboo, lecroy])
-        yield cache.createMarker()
+        yield cache.setMembers(proxyGroup.uid, [cdaboo, lecroy],
+            expireSeconds=10)
+        yield cache.createMarker(expireSeconds=20)
 
         members = (yield proxyGroup.expandedGroupMembers())
         self.assertEquals(
@@ -639,7 +641,14 @@
             set([cdaboo, lecroy])
         )
 
+        # Now go 10 seconds in the future and the key will be expired
+        # Note the marker is set to expire later, otherwise the
+        # expandedGroupMembers( ) call would get a 503
+        cache._memcacheProtocol.advanceClock(10)
+        members = (yield proxyGroup.expandedGroupMembers())
+        self.assertEquals(members, set())
 
+
     @inlineCallbacks
     def test_proxyMemberCacheUpdater(self):
         """
@@ -649,7 +658,7 @@
         """
         cache = calendaruserproxy.ProxyMemberCache("ProxyDB")
         updater = calendaruserproxy.ProxyMemberCacheUpdater(
-            calendaruserproxy.ProxyDBService, self.directoryService,
+            calendaruserproxy.ProxyDBService, self.directoryService, 30,
             cache=cache)
         yield updater.updateCache()
 
@@ -858,3 +867,37 @@
             pass
         else:
             self.fail("HTTPError was unexpectedly not raised")
+
+
+    def _updateMethod(self):
+        """
+        Update a counter in the following test
+        """
+        self.count += 1
+
+    @inlineCallbacks
+    def test_proxyCacherService(self):
+        """
+        Instantiate a ProxyCacherService and make sure its update method
+        fires at the right interval, in this case 30 seconds.  The updateMethod
+        keyword arg is purely for testing purposes, so we can directly detect
+        it getting called in this test.
+        """
+        clock = Clock()
+        self.count = 0
+        service = calendaruserproxy.ProxyCacherService(
+            calendaruserproxy.ProxyDBService,
+            self.directoryService, "Testing", 30, 60, reactor=clock,
+            updateMethod=self._updateMethod)
+
+        yield service.startService()
+
+        self.assertEquals(self.count, 1)
+        clock.advance(29)
+        self.assertEquals(self.count, 1)
+        clock.advance(1)
+        self.assertEquals(self.count, 2)
+
+        service.stopService()
+
+

Modified: CalendarServer/trunk/twistedcaldav/memcacher.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/memcacher.py	2011-07-07 21:06:24 UTC (rev 7740)
+++ CalendarServer/trunk/twistedcaldav/memcacher.py	2011-07-08 01:51:16 UTC (rev 7741)
@@ -47,21 +47,29 @@
 
         def __init__(self):
             self._cache = {}
+            self._clock = 0
 
-
         def add(self, key, value, expireTime=0):
             if key not in self._cache:
-                self._cache[key] = value
+                if not expireTime:
+                    expireTime = 99999
+                self._cache[key] = (value, self._clock + expireTime)
                 return succeed(True)
             else:
                 return succeed(False)
 
         def set(self, key, value, expireTime=0):
-            self._cache[key] = value
+            if not expireTime:
+                expireTime = 99999
+            self._cache[key] = (value, self._clock + expireTime)
             return succeed(True)
 
         def get(self, key):
-            return succeed((0, self._cache.get(key, None),))
+            value, expires = self._cache.get(key, (None, 0))
+            if self._clock >= expires:
+                return succeed((0, None,))
+            else:
+                return succeed((0, value,))
 
         def delete(self, key):
             try:
@@ -73,6 +81,9 @@
         def flush_all(self):
             self._cache = {}
             return succeed(True)
+
+        def advanceClock(self, seconds):
+            self._clock += seconds
             
     #TODO: an sqlite based cacher that can be used for multiple instance servers
     # in the absence of memcached. This is not ideal and we may want to not implement

Modified: CalendarServer/trunk/twistedcaldav/stdconfig.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/stdconfig.py	2011-07-07 21:06:24 UTC (rev 7740)
+++ CalendarServer/trunk/twistedcaldav/stdconfig.py	2011-07-08 01:51:16 UTC (rev 7741)
@@ -740,6 +740,8 @@
         "Enabled": False,
         "MemcachedPool" : "ProxyDB",
         "UpdateSeconds" : 300,
+        "ExpireSeconds" : 3600,
+        "EnableUpdater" : True,
     },
 
     "EnableKeepAlive": True,

Modified: CalendarServer/trunk/twistedcaldav/test/test_memcacher.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/test/test_memcacher.py	2011-07-07 21:06:24 UTC (rev 7740)
+++ CalendarServer/trunk/twistedcaldav/test/test_memcacher.py	2011-07-08 01:51:16 UTC (rev 7741)
@@ -125,3 +125,26 @@
             self.assertTrue("\n" not in key)
             self.assertTrue("\t" not in key)
             self.assertTrue("\r" not in key)
+
+    @inlineCallbacks
+    def test_expiration(self):
+
+        config.ProcessType = "Single"
+        cacher = Memcacher("testing")
+
+        # Expire this key in 10 seconds
+        result = yield cacher.set("akey", "avalue", 10)
+        self.assertTrue(result)
+
+        result = yield cacher.get("akey")
+        self.assertEquals("avalue", result)
+
+        # Advance time 9 seconds, key still there
+        cacher._memcacheProtocol.advanceClock(9)
+        result = yield cacher.get("akey")
+        self.assertEquals("avalue", result)
+
+        # Advance time 1 more second, key expired
+        cacher._memcacheProtocol.advanceClock(1)
+        result = yield cacher.get("akey")
+        self.assertEquals(None, result)
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20110707/1e5e9355/attachment-0001.html>


More information about the calendarserver-changes mailing list