[CalendarServer-changes] [10977] CalendarServer/trunk

source_changes at macosforge.org source_changes at macosforge.org
Tue Apr 2 18:26:02 PDT 2013


Revision: 10977
          http://trac.calendarserver.org//changeset/10977
Author:   sagen at apple.com
Date:     2013-04-02 18:26:02 -0700 (Tue, 02 Apr 2013)
Log Message:
-----------
Eliminate group cacher sidecar.  Master schedules initial mail poll so each worker process doesn't need to.

Modified Paths:
--------------
    CalendarServer/trunk/calendarserver/tap/caldav.py
    CalendarServer/trunk/twisted/plugins/caldav.py
    CalendarServer/trunk/twistedcaldav/directory/directory.py
    CalendarServer/trunk/twistedcaldav/directory/test/test_directory.py
    CalendarServer/trunk/twistedcaldav/scheduling/imip/inbound.py
    CalendarServer/trunk/twistedcaldav/stdconfig.py
    CalendarServer/trunk/twistedcaldav/upgrade.py
    CalendarServer/trunk/txdav/common/datastore/sql_schema/current.sql

Modified: CalendarServer/trunk/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/caldav.py	2013-04-02 20:31:37 UTC (rev 10976)
+++ CalendarServer/trunk/calendarserver/tap/caldav.py	2013-04-03 01:26:02 UTC (rev 10977)
@@ -70,6 +70,8 @@
 
 from twistedcaldav.config import ConfigurationError
 from twistedcaldav.config import config
+from twistedcaldav.directory import calendaruserproxy
+from twistedcaldav.directory.directory import GroupMembershipCacheUpdater
 from twistedcaldav.localization import processLocalizationFiles
 from twistedcaldav import memcachepool
 from twistedcaldav.stdconfig import DEFAULT_CONFIG, DEFAULT_CONFIG_FILE
@@ -542,29 +544,8 @@
             )
             self.monitor.addProcessObject(process, PARENT_ENVIRONMENT)
 
-        if config.GroupCaching.Enabled and config.GroupCaching.EnableUpdater:
-            self.maker.log_info("Adding group caching service")
 
-            groupMembershipCacherArgv = [
-                sys.executable,
-                sys.argv[0],
-            ]
-            if config.UserName:
-                groupMembershipCacherArgv.extend(("-u", config.UserName))
-            if config.GroupName:
-                groupMembershipCacherArgv.extend(("-g", config.GroupName))
-            groupMembershipCacherArgv.extend((
-                "--reactor=%s" % (config.Twisted.reactor,),
-                "-n", self.maker.groupMembershipCacherTapName,
-                "-f", self.configPath,
-                "-o", "PIDFile=groupcacher.pid",
-            ))
 
-            self.monitor.addProcess("groupcacher", groupMembershipCacherArgv,
-                               env=PARENT_ENVIRONMENT)
-
-
-
 class ReExecService(MultiService, LoggingMixIn):
     """
     A MultiService which catches SIGHUP and re-exec's the process.
@@ -621,12 +602,7 @@
     description = "Calendar and Contacts Server"
     options = CalDAVOptions
 
-    #
-    # Default tap names
-    #
-    groupMembershipCacherTapName = "caldav_groupcacher"
 
-
     def makeService(self, options):
         """
         Create the top-level service.
@@ -774,10 +750,24 @@
         else:
             mailRetriever = None
 
+        # Optionally set up group cacher
+        if config.GroupCaching.Enabled:
+            groupCacher = GroupMembershipCacheUpdater(
+                calendaruserproxy.ProxyDBService,
+                directory,
+                config.GroupCaching.UpdateSeconds,
+                config.GroupCaching.ExpireSeconds,
+                namespace=config.GroupCaching.MemcachedPool,
+                useExternalProxies=config.GroupCaching.UseExternalProxies
+                )
+        else:
+            groupCacher = None
+
         def decorateTransaction(txn):
             txn._pushDistributor = pushDistributor
             txn._rootResource = result.rootResource
             txn._mailRetriever = mailRetriever
+            txn._groupCacher = groupCacher
 
         store.callWithNewTransactions(decorateTransaction)
 
@@ -1016,11 +1006,35 @@
     def makeService_Single(self, options):
         """
         Create a service to be used in a single-process, stand-alone
-        configuration.
+        configuration.  Memcached will be spawned automatically.
         """
         def slaveSvcCreator(pool, store, logObserver):
             result = self.requestProcessingService(options, store, logObserver)
 
+            # Optionally launch memcached.  Note, this is not going through a
+            # ProcessMonitor because there is code elsewhere that needs to
+            # access memcached before startService() gets called, so we're just
+            # directly using Popen to spawn memcached.
+            for name, pool in config.Memcached.Pools.items():
+                if pool.ServerEnabled:
+                    self.log_info(
+                        "Adding memcached service for pool: %s" % (name,)
+                    )
+                    memcachedArgv = [
+                        config.Memcached.memcached,
+                        "-p", str(pool.Port),
+                        "-l", pool.BindAddress,
+                        "-U", "0",
+                    ]
+                    if config.Memcached.MaxMemory is not 0:
+                        memcachedArgv.extend(
+                            ["-m", str(config.Memcached.MaxMemory)]
+                        )
+                    if config.UserName:
+                        memcachedArgv.extend(["-u", config.UserName])
+                    memcachedArgv.extend(config.Memcached.Options)
+                    Popen(memcachedArgv)
+
             # Optionally set up push notifications
             pushDistributor = None
             if config.Notifications.Enabled:
@@ -1049,10 +1063,24 @@
             else:
                 mailRetriever = None
 
+            # Optionally set up group cacher
+            if config.GroupCaching.Enabled:
+                groupCacher = GroupMembershipCacheUpdater(
+                    calendaruserproxy.ProxyDBService,
+                    directory,
+                    config.GroupCaching.UpdateSeconds,
+                    config.GroupCaching.ExpireSeconds,
+                    namespace=config.GroupCaching.MemcachedPool,
+                    useExternalProxies=config.GroupCaching.UseExternalProxies
+                    )
+            else:
+                groupCacher = None
+
             def decorateTransaction(txn):
                 txn._pushDistributor = pushDistributor
                 txn._rootResource = result.rootResource
                 txn._mailRetriever = mailRetriever
+                txn._groupCacher = groupCacher
 
             store.callWithNewTransactions(decorateTransaction)
 

Modified: CalendarServer/trunk/twisted/plugins/caldav.py
===================================================================
--- CalendarServer/trunk/twisted/plugins/caldav.py	2013-04-02 20:31:37 UTC (rev 10976)
+++ CalendarServer/trunk/twisted/plugins/caldav.py	2013-04-03 01:26:02 UTC (rev 10977)
@@ -53,4 +53,3 @@
 
 
 TwistedCalDAV = TAP("calendarserver.tap.caldav.CalDAVServiceMaker")
-CalDAVGroupCacher = TAP("twistedcaldav.directory.directory.GroupMembershipCacherServiceMaker")

Modified: CalendarServer/trunk/twistedcaldav/directory/directory.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/directory/directory.py	2013-04-02 20:31:37 UTC (rev 10976)
+++ CalendarServer/trunk/twistedcaldav/directory/directory.py	2013-04-03 01:26:02 UTC (rev 10977)
@@ -26,6 +26,7 @@
     "DirectoryError",
     "DirectoryConfigurationError",
     "UnknownRecordTypeError",
+    "GroupMembershipCacheUpdater",
 ]
 
 import cPickle as pickle
@@ -34,7 +35,6 @@
 import itertools
 import os
 import pwd
-import signal
 import sys
 import types
 
@@ -46,24 +46,27 @@
 from twext.web2.dav.auth import IPrincipalCredentials
 from twisted.internet.defer import succeed, inlineCallbacks, returnValue
 
-from twext.python.log import LoggingMixIn
+from twext.python.log import Logger, LoggingMixIn
 
 from twistedcaldav.config import config
+from twistedcaldav.stdconfig import DEFAULT_CONFIG_FILE
+
 from twistedcaldav.directory.idirectory import IDirectoryService, IDirectoryRecord
 from twistedcaldav.directory.util import uuidFromName, normalizeUUID
 from twistedcaldav.scheduling.cuaddress import normalizeCUAddr
 from twistedcaldav.scheduling.ischedule.localservers import Servers
 from twistedcaldav.memcacher import Memcacher
-from twistedcaldav import memcachepool
 from twisted.python.filepath import FilePath
-from twisted.python.reflect import namedClass
-from twisted.python.usage import Options, UsageError
-from twistedcaldav.stdconfig import DEFAULT_CONFIG, DEFAULT_CONFIG_FILE
-from twisted.application import service
-from twisted.plugin import IPlugin
 from xml.parsers.expat import ExpatError
 from plistlib import readPlistFromString
+from twext.enterprise.dal.record import fromTable
+from twext.enterprise.queue import WorkItem
+from txdav.common.datastore.sql_tables import schema
+from twext.enterprise.dal.syntax import Delete
 
+log = Logger()
+
+
 class DirectoryService(LoggingMixIn):
     implements(IDirectoryService, ICredentialsChecker)
 
@@ -564,20 +567,16 @@
 
     "group-cacher-populated" : contains a datestamp indicating the most recent
     population.
-
-    "group-cacher-lock" : used to prevent multiple updates, it has a value of "1"
-
     """
 
     def __init__(self, namespace, pickle=True, no_invalidation=False,
-        key_normalization=True, expireSeconds=0, lockSeconds=60):
+        key_normalization=True, expireSeconds=0):
 
         super(GroupMembershipCache, self).__init__(namespace, pickle=pickle,
             no_invalidation=no_invalidation,
             key_normalization=key_normalization)
 
         self.expireSeconds = expireSeconds
-        self.lockSeconds = lockSeconds
 
 
     def setGroupsFor(self, guid, memberships):
@@ -616,17 +615,7 @@
         returnValue(value is not None)
 
 
-    def acquireLock(self):
-        self.log_debug("add group-cacher-lock")
-        return self.add("group-cacher-lock", "1", expireTime=self.lockSeconds)
 
-
-    def releaseLock(self):
-        self.log_debug("delete group-cacher-lock")
-        return self.delete("group-cacher-lock")
-
-
-
 class GroupMembershipCacheUpdater(LoggingMixIn):
     """
     Responsible for updating memcached with group memberships.  This will run
@@ -634,11 +623,12 @@
     proxy database, and the location/resource info in the directory system.
     """
 
-    def __init__(self, proxyDB, directory, expireSeconds, lockSeconds,
+    def __init__(self, proxyDB, directory, updateSeconds, expireSeconds,
         cache=None, namespace=None, useExternalProxies=False,
         externalProxiesSource=None):
         self.proxyDB = proxyDB
         self.directory = directory
+        self.updateSeconds = updateSeconds
         self.useExternalProxies = useExternalProxies
         if useExternalProxies and externalProxiesSource is None:
             externalProxiesSource = self.directory.getExternalProxyAssignments
@@ -646,8 +636,7 @@
 
         if cache is None:
             assert namespace is not None, "namespace must be specified if GroupMembershipCache is not provided"
-            cache = GroupMembershipCache(namespace, expireSeconds=expireSeconds,
-                lockSeconds=lockSeconds)
+            cache = GroupMembershipCache(namespace, expireSeconds=expireSeconds)
         self.cache = cache
 
 
@@ -738,8 +727,6 @@
         # See if anyone has completely populated the group membership cache
         isPopulated = (yield self.cache.isPopulated())
 
-        useLock = True
-
         if fast:
             # We're in quick-start mode.  Check first to see if someone has
             # populated the membership cache, and if so, return immediately
@@ -747,9 +734,6 @@
                 self.log_info("Group membership cache is already populated")
                 returnValue((fast, 0))
 
-            # We don't care what others are doing right now, we need to update
-            useLock = False
-
         self.log_info("Updating group membership cache")
 
         dataRoot = FilePath(config.DataRoot)
@@ -767,14 +751,6 @@
             previousMembers = pickle.loads(membershipsCacheFile.getContent())
             callGroupsChanged = True
 
-        if useLock:
-            self.log_info("Attempting to acquire group membership cache lock")
-            acquiredLock = (yield self.cache.acquireLock())
-            if not acquiredLock:
-                self.log_info("Group membership cache lock held by another process")
-                returnValue((fast, 0))
-            self.log_info("Acquired lock")
-
         if not fast and self.useExternalProxies:
 
             # Load in cached copy of external proxies so we can diff against them
@@ -944,257 +920,44 @@
 
         yield self.cache.setPopulatedMarker()
 
-        if useLock:
-            self.log_info("Releasing lock")
-            yield self.cache.releaseLock()
-
         self.log_info("Group memberships cache updated")
 
         returnValue((fast, len(members), len(changedMembers)))
 
 
+class GroupCacherPollingWork(WorkItem, fromTable(schema.GROUP_CACHER_POLLING_WORK)):
 
-class GroupMembershipCacherOptions(Options):
-    optParameters = [[
-        "config", "f", DEFAULT_CONFIG_FILE, "Path to configuration file."
-    ]]
+    group = "group_cacher_polling"
 
-    def __init__(self, *args, **kwargs):
-        super(GroupMembershipCacherOptions, self).__init__(*args, **kwargs)
-
-        self.overrides = {}
-
-
-    def _coerceOption(self, configDict, key, value):
-        """
-        Coerce the given C{val} to type of C{configDict[key]}
-        """
-        if key in configDict:
-            if isinstance(configDict[key], bool):
-                value = value == "True"
-
-            elif isinstance(configDict[key], (int, float, long)):
-                value = type(configDict[key])(value)
-
-            elif isinstance(configDict[key], (list, tuple)):
-                value = value.split(',')
-
-            elif isinstance(configDict[key], dict):
-                raise UsageError(
-                    "Dict options not supported on the command line"
-                )
-
-            elif value == 'None':
-                value = None
-
-        return value
-
-
-    def _setOverride(self, configDict, path, value, overrideDict):
-        """
-        Set the value at path in configDict
-        """
-        key = path[0]
-
-        if len(path) == 1:
-            overrideDict[key] = self._coerceOption(configDict, key, value)
-            return
-
-        if key in configDict:
-            if not isinstance(configDict[key], dict):
-                raise UsageError(
-                    "Found intermediate path element that is not a dictionary"
-                )
-
-            if key not in overrideDict:
-                overrideDict[key] = {}
-
-            self._setOverride(
-                configDict[key], path[1:],
-                value, overrideDict[key]
-            )
-
-
-    def opt_option(self, option):
-        """
-        Set an option to override a value in the config file. True, False, int,
-        and float options are supported, as well as comma seperated lists. Only
-        one option may be given for each --option flag, however multiple
-        --option flags may be specified.
-        """
-
-        if "=" in option:
-            path, value = option.split('=')
-            self._setOverride(
-                DEFAULT_CONFIG,
-                path.split('/'),
-                value,
-                self.overrides
-            )
-        else:
-            self.opt_option('%s=True' % (option,))
-
-    opt_o = opt_option
-
-    def postOptions(self):
-        config.load(self['config'])
-        config.updateDefaults(self.overrides)
-        self.parent['pidfile'] = config.PIDFile
-
-
-
-class GroupMembershipCacherService(service.Service, LoggingMixIn):
-    """
-    Service to update the group membership cache at a configured interval
-    """
-
-    def __init__(self, proxyDB, directory, namespace, updateSeconds,
-        expireSeconds, lockSeconds, reactor=None, updateMethod=None,
-        useExternalProxies=False):
-
-        if updateSeconds >= expireSeconds:
-            expireSeconds = updateSeconds * 2
-            self.log_warn("Configuration warning: GroupCaching.ExpireSeconds needs to be longer than UpdateSeconds; setting to %d seconds" % (expireSeconds,))
-
-        self.updater = GroupMembershipCacheUpdater(proxyDB, directory,
-            expireSeconds, lockSeconds, namespace=namespace,
-            useExternalProxies=useExternalProxies)
-
-        if reactor is None:
-            from twisted.internet import reactor
-        self.reactor = reactor
-
-        self.updateSeconds = updateSeconds
-        self.nextUpdate = None
-        self.updateInProgress = False
-        self.updateAwaiting = False
-
-        if updateMethod:
-            self.updateMethod = updateMethod
-        else:
-            self.updateMethod = self.updater.updateCache
-
-
-    def startService(self):
-        self.previousHandler = signal.signal(signal.SIGHUP, self.sighupHandler)
-        self.log_warn("Starting group membership cacher service")
-        service.Service.startService(self)
-        return self.update()
-
-
-    def sighupHandler(self, num, frame):
-        self.reactor.callFromThread(self.update)
-
-
-    def stopService(self):
-        signal.signal(signal.SIGHUP, self.previousHandler)
-        self.log_warn("Stopping group membership cacher service")
-        service.Service.stopService(self)
-        if self.nextUpdate is not None:
-            self.nextUpdate.cancel()
-            self.nextUpdate = None
-
-
     @inlineCallbacks
-    def update(self):
-        """
-        A wrapper around updateCache, this method manages the scheduling of the
-        subsequent update, as well as prevents multiple updates from running
-        simultaneously, which could otherwise happen because SIGHUP now triggers
-        an update on demand.  If update is called while an update is in progress,
-        as soon as the first update is finished a new one is started.  Otherwise,
-        when an update finishes and there is not another one waiting, the next
-        update is scheduled for updateSeconds in the future.
+    def doWork(self):
 
-        @return: True if an update was already in progress, False otherwise
-        @rtype: C{bool}
-        """
+        # Delete all other work items
+        yield Delete(From=self.table, Where=None).on(self.transaction)
 
-        self.log_debug("Group membership update called")
+        groupCacher = self.transaction._groupCacher
+        if groupCacher is not None:
+            try:
+                yield groupCacher.updateCache()
+            except Exception, e:
+                log.error("Failed to update group membership cache (%s)" % (e,))
+            finally:
+                notBefore = (datetime.datetime.utcnow() +
+                    datetime.timedelta(seconds=groupCacher.updateSeconds))
+                log.debug("Scheduling next group cacher update: %s" % (notBefore,))
+                yield self.transaction.enqueue(GroupCacherPollingWork,
+                    notBefore=notBefore)
 
-        # A call to update while an update is in progress sets the updateAwaiting flag
-        # so that an update happens again right after the current one is complete.
-        if self.updateInProgress:
-            self.updateAwaiting = True
-            returnValue(True)
 
-        self.nextUpdate = None
-        self.updateInProgress = True
-        self.updateAwaiting = False
-        try:
-            yield self.updateMethod()
-        finally:
-            self.updateInProgress = False
-            if self.updateAwaiting:
-                self.log_info("Performing group membership update")
-                yield self.update()
-            else:
-                self.log_info("Scheduling next group membership update")
-                self.nextUpdate = self.reactor.callLater(self.updateSeconds,
-                    self.update)
-        returnValue(False)
+@inlineCallbacks
+def scheduleNextGroupCachingUpdate(store, seconds):
+    txn = store.newTransaction()
+    notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds)
+    log.debug("Scheduling next group cacher update: %s" % (notBefore,))
+    yield txn.enqueue(GroupCacherPollingWork, notBefore=notBefore)
+    yield txn.commit()
 
 
-
-class GroupMembershipCacherServiceMaker(LoggingMixIn):
-    """
-    Configures and returns a GroupMembershipCacherService
-    """
-    implements(IPlugin, service.IServiceMaker)
-
-    tapname = "caldav_groupcacher"
-    description = "Group Membership Cacher"
-    options = GroupMembershipCacherOptions
-
-    def makeService(self, options):
-        try:
-            from setproctitle import setproctitle
-        except ImportError:
-            pass
-        else:
-            setproctitle("CalendarServer [Group Cacher]")
-
-        # Setup the directory
-        from calendarserver.tap.util import directoryFromConfig
-        directory = directoryFromConfig(config)
-
-        # We have to set cacheNotifierFactory otherwise group cacher can't
-        # invalidate the cache tokens for principals whose membership has
-        # changed
-        if config.EnableResponseCache and config.Memcached.Pools.Default.ClientEnabled:
-            from twistedcaldav.directory.principal import DirectoryPrincipalResource
-            from twistedcaldav.cache import MemcacheChangeNotifier
-            DirectoryPrincipalResource.cacheNotifierFactory = MemcacheChangeNotifier
-
-        # Setup the ProxyDB Service
-        proxydbClass = namedClass(config.ProxyDBService.type)
-
-        self.log_warn("Configuring proxydb service of type: %s" % (proxydbClass,))
-
-        try:
-            proxyDB = proxydbClass(**config.ProxyDBService.params)
-        except IOError:
-            self.log_error("Could not start proxydb service")
-            raise
-
-        # Setup memcached pools
-        memcachepool.installPools(
-            config.Memcached.Pools,
-            config.Memcached.MaxClients,
-        )
-
-        cacherService = GroupMembershipCacherService(proxyDB, directory,
-            config.GroupCaching.MemcachedPool,
-            config.GroupCaching.UpdateSeconds,
-            config.GroupCaching.ExpireSeconds,
-            config.GroupCaching.LockSeconds,
-            useExternalProxies=config.GroupCaching.UseExternalProxies
-            )
-
-        return cacherService
-
-
-
 def diffAssignments(old, new):
     """
     Compare two proxy assignment lists and return their differences in the form of

Modified: CalendarServer/trunk/twistedcaldav/directory/test/test_directory.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/directory/test/test_directory.py	2013-04-02 20:31:37 UTC (rev 10976)
+++ CalendarServer/trunk/twistedcaldav/directory/test/test_directory.py	2013-04-03 01:26:02 UTC (rev 10977)
@@ -15,13 +15,12 @@
 ##
 
 from twisted.internet.defer import inlineCallbacks
-from twisted.internet.task import Clock
 from twisted.python.filepath import FilePath
 
 from twistedcaldav.test.util import TestCase
 from twistedcaldav.test.util import xmlFile, augmentsFile, proxiesFile, dirTest
 from twistedcaldav.config import config
-from twistedcaldav.directory.directory import DirectoryService, DirectoryRecord, GroupMembershipCacherService, GroupMembershipCache, GroupMembershipCacheUpdater, diffAssignments
+from twistedcaldav.directory.directory import DirectoryService, DirectoryRecord, GroupMembershipCache, GroupMembershipCacheUpdater, diffAssignments
 from twistedcaldav.directory.xmlfile import XMLDirectoryService
 from twistedcaldav.directory.calendaruserproxyloader import XMLCalendarUserProxyLoader
 from twistedcaldav.directory import augment, calendaruserproxy
@@ -121,46 +120,7 @@
         self.count += 1
 
 
-    @inlineCallbacks
-    def test_groupMembershipCacherService(self):
-        """
-        Instantiate a GroupMembershipCacherService and make sure its update
-        method fires at the right interval, in this case 30 seconds.  The
-        updateMethod keyword arg is purely for testing purposes, so we can
-        directly detect it getting called in this test.
-        """
-        clock = Clock()
-        self.count = 0
 
-        # Deliberately set the expireSeconds lower than updateSeconds to verify
-        # expireSeconds gets set to 2 * updateSeconds in that scenario
-
-        service = GroupMembershipCacherService(
-            None, None, "Testing", 30, 20, 30, reactor=clock,
-            updateMethod=self._updateMethod)
-
-        # expireSeconds = 2 * 30 updateSeconds
-        self.assertEquals(service.updater.cache.expireSeconds, 60)
-
-        yield service.startService()
-
-        self.assertEquals(self.count, 1)
-        clock.advance(29)
-        self.assertEquals(self.count, 1)
-        clock.advance(1)
-        self.assertEquals(self.count, 2)
-
-        service.stopService()
-
-        service.updateInProgress = True
-        self.assertTrue((yield service.update()))
-        self.assertTrue(service.updateAwaiting)
-
-        service.updateInProgress = False
-        self.assertFalse((yield service.update()))
-        self.assertFalse(service.updateAwaiting)
-
-
     def test_expandedMembers(self):
         """
         Make sure expandedMembers( ) returns a complete, flattened set of
@@ -279,17 +239,6 @@
             )
         )
 
-        # Prevent an update by locking the cache
-        acquiredLock = (yield cache.acquireLock())
-        self.assertTrue(acquiredLock)
-        self.assertEquals((False, 0), (yield updater.updateCache()))
-
-        # You can't lock when already locked:
-        acquiredLockAgain = (yield cache.acquireLock())
-        self.assertFalse(acquiredLockAgain)
-
-        # Allow an update by unlocking the cache
-        yield cache.releaseLock()
         self.assertEquals((False, 9, 9), (yield updater.updateCache()))
 
         # Verify cache is populated:
@@ -686,10 +635,6 @@
         # time), but since the snapshot doesn't exist we fault in from the
         # directory (fast now is False), and snapshot will get created
 
-        # Note that because fast=True and isPopulated() is False, locking is
-        # ignored:
-        yield cache.acquireLock()
-
         self.assertFalse((yield cache.isPopulated()))
         fast, numMembers, numChanged = (yield updater.updateCache(fast=True))
         self.assertEquals(fast, False)
@@ -698,8 +643,6 @@
         self.assertTrue(snapshotFile.exists())
         self.assertTrue((yield cache.isPopulated()))
 
-        yield cache.releaseLock()
-
         # Try another fast update where the snapshot already exists (as in a
         # server-restart scenario), which will only read from the snapshot
         # as indicated by the return value for "fast".  Note that the cache

Modified: CalendarServer/trunk/twistedcaldav/scheduling/imip/inbound.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/scheduling/imip/inbound.py	2013-04-02 20:31:37 UTC (rev 10976)
+++ CalendarServer/trunk/twistedcaldav/scheduling/imip/inbound.py	2013-04-03 01:26:02 UTC (rev 10977)
@@ -120,22 +120,25 @@
         self.point = GAIEndpoint(self.reactor, settings.Server,
             settings.Port, contextFactory=contextFactory)
 
-    def startService(self):
-        return self.scheduleNextPoll(seconds=0)
 
-
     def fetchMail(self):
         return self.point.connect(self.factory(self.settings, self.mailReceiver))
 
+
     @inlineCallbacks
     def scheduleNextPoll(self, seconds=None):
         if seconds is None:
             seconds = self.settings["PollingSeconds"]
-        txn = self.store.newTransaction()
-        notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds)
-        yield txn.enqueue(IMIPPollingWork, notBefore=notBefore)
-        yield txn.commit()
+        yield scheduleNextMailPoll(self.store, seconds)
+        
 
+@inlineCallbacks
+def scheduleNextMailPoll(store, seconds):
+    txn = store.newTransaction()
+    notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds)
+    log.debug("Scheduling next mail poll: %s" % (notBefore,))
+    yield txn.enqueue(IMIPPollingWork, notBefore=notBefore)
+    yield txn.commit()
 
 
 class MailReceiver(object):

Modified: CalendarServer/trunk/twistedcaldav/stdconfig.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/stdconfig.py	2013-04-02 20:31:37 UTC (rev 10976)
+++ CalendarServer/trunk/twistedcaldav/stdconfig.py	2013-04-03 01:26:02 UTC (rev 10977)
@@ -931,7 +931,6 @@
         "MemcachedPool" : "Default",
         "UpdateSeconds" : 300,
         "ExpireSeconds" : 3600,
-        "LockSeconds" : 300,
         "EnableUpdater" : True,
         "UseExternalProxies" : False,
     },

Modified: CalendarServer/trunk/twistedcaldav/upgrade.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/upgrade.py	2013-04-02 20:31:37 UTC (rev 10976)
+++ CalendarServer/trunk/twistedcaldav/upgrade.py	2013-04-03 01:26:02 UTC (rev 10977)
@@ -39,7 +39,9 @@
 from twistedcaldav.directory import calendaruserproxy
 from twistedcaldav.directory.appleopendirectory import OpenDirectoryService
 from twistedcaldav.directory.calendaruserproxyloader import XMLCalendarUserProxyLoader
-from twistedcaldav.directory.directory import DirectoryService, GroupMembershipCacheUpdater
+from twistedcaldav.directory.directory import DirectoryService
+from twistedcaldav.directory.directory import GroupMembershipCacheUpdater
+from twistedcaldav.directory.directory import scheduleNextGroupCachingUpdate
 from twistedcaldav.directory.principal import DirectoryCalendarPrincipalResource
 from twistedcaldav.directory.resourceinfo import ResourceInfoDatabase
 from twistedcaldav.directory.xmlfile import XMLDirectoryService
@@ -67,6 +69,7 @@
 
 from twext.python.parallel import Parallelizer
 from twistedcaldav.scheduling.imip.mailgateway import migrateTokensToStore
+from twistedcaldav.scheduling.imip.inbound import scheduleNextMailPoll
 
 
 deadPropertyXattrPrefix = namedAny(
@@ -1069,11 +1072,14 @@
                 proxydb = proxydbClass(**self.config.ProxyDBService.params)
 
             updater = GroupMembershipCacheUpdater(proxydb,
-                directory, self.config.GroupCaching.ExpireSeconds,
-                self.config.GroupCaching.LockSeconds,
+                directory,
+                self.config.GroupCaching.UpdateSeconds,
+                self.config.GroupCaching.ExpireSeconds,
                 namespace=self.config.GroupCaching.MemcachedPool,
                 useExternalProxies=self.config.GroupCaching.UseExternalProxies)
             yield updater.updateCache(fast=True)
+            # Set in motion the work queue based updates:
+            yield scheduleNextGroupCachingUpdate(self.store, 0)
 
             uid, gid = getCalendarServerIDs(self.config)
             dbPath = os.path.join(self.config.DataRoot, "proxies.sqlite")
@@ -1087,6 +1093,9 @@
 
         # Migrate mail tokens from sqlite to store
         yield migrateTokensToStore(self.config.DataRoot, self.store)
+        # Set mail polling in motion
+        if self.config.Scheduling.iMIP.Enabled:
+            yield scheduleNextMailPoll(self.store, 0)
 
 
 

Modified: CalendarServer/trunk/txdav/common/datastore/sql_schema/current.sql
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/sql_schema/current.sql	2013-04-02 20:31:37 UTC (rev 10976)
+++ CalendarServer/trunk/txdav/common/datastore/sql_schema/current.sql	2013-04-03 01:26:02 UTC (rev 10977)
@@ -557,7 +557,16 @@
   PUSH_ID                       varchar(255) not null
 );
 
+-----------------
+-- GroupCacher --
+-----------------
 
+create table GROUP_CACHER_POLLING_WORK (
+  WORK_ID                       integer primary key default nextval('WORKITEM_SEQ') not null,
+  NOT_BEFORE                    timestamp    default timezone('UTC', CURRENT_TIMESTAMP)
+);
+
+
 --------------------
 -- Schema Version --
 --------------------
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130402/8d3e2582/attachment-0001.html>


More information about the calendarserver-changes mailing list