[CalendarServer-changes] [13516] CalendarServer/trunk

source_changes at macosforge.org source_changes at macosforge.org
Tue May 20 11:49:22 PDT 2014


Revision: 13516
          http://trac.calendarserver.org//changeset/13516
Author:   cdaboo at apple.com
Date:     2014-05-20 11:49:22 -0700 (Tue, 20 May 2014)
Log Message:
-----------
Add singleton and regenerating WorkItem types.

Modified Paths:
--------------
    CalendarServer/trunk/calendarserver/tap/caldav.py
    CalendarServer/trunk/calendarserver/tools/principals.py
    CalendarServer/trunk/calendarserver/tools/purge.py
    CalendarServer/trunk/calendarserver/tools/util.py
    CalendarServer/trunk/requirements-stable.txt
    CalendarServer/trunk/txdav/caldav/datastore/scheduling/imip/inbound.py
    CalendarServer/trunk/txdav/common/datastore/sql.py
    CalendarServer/trunk/txdav/common/datastore/work/inbox_cleanup.py
    CalendarServer/trunk/txdav/common/datastore/work/revision_cleanup.py
    CalendarServer/trunk/txdav/common/datastore/work/test/test_inbox_cleanup.py
    CalendarServer/trunk/txdav/common/datastore/work/test/test_revision_cleanup.py
    CalendarServer/trunk/txdav/who/groups.py

Modified: CalendarServer/trunk/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/caldav.py	2014-05-20 18:38:40 UTC (rev 13515)
+++ CalendarServer/trunk/calendarserver/tap/caldav.py	2014-05-20 18:49:22 UTC (rev 13516)
@@ -69,7 +69,7 @@
 from twext.enterprise.jobqueue import PeerConnectionPool
 from twext.enterprise.jobqueue import WorkerFactory as QueueWorkerFactory
 from twext.application.service import ReExecService
-from txdav.who.groups import scheduleNextGroupCachingUpdate
+from txdav.who.groups import GroupCacherPollingWork
 
 from txweb2.channel.http import (
     LimitingHTTPFactory, SSLRedirectRequest, HTTPChannel
@@ -89,10 +89,8 @@
     UpgradeAcquireLockStep, UpgradeReleaseLockStep,
     UpgradeDatabaseNotificationDataStep
 )
-from txdav.common.datastore.work.inbox_cleanup import scheduleFirstInboxCleanup
-from txdav.common.datastore.work.revision_cleanup import (
-    scheduleFirstFindMinRevision
-)
+from txdav.common.datastore.work.inbox_cleanup import InboxCleanupWork
+from txdav.common.datastore.work.revision_cleanup import FindMinValidRevisionWork
 from txdav.who.util import directoryFromConfig
 from txdav.dps.client import DirectoryService as DirectoryProxyClientService
 from txdav.who.groups import GroupCacher
@@ -593,22 +591,22 @@
                 int(config.LogID) if config.LogID else 5
             )
         if self.doGroupCaching:
-            yield scheduleNextGroupCachingUpdate(
+            yield GroupCacherPollingWork.initialSchedule(
                 self.store,
                 int(config.LogID) if config.LogID else 5
             )
-        yield scheduleFirstFindMinRevision(
+        yield FindMinValidRevisionWork.initialSchedule(
             self.store,
             int(config.LogID) if config.LogID else 5
         )
-        yield scheduleFirstInboxCleanup(
+        yield InboxCleanupWork.initialSchedule(
             self.store,
             int(config.LogID) if config.LogID else 5
         )
 
         # FIXME: uncomment this when purge is working
         # from calendarserver.tools.purge import scheduleNextPrincipalPurgeUpdate
-        # yield scheduleNextPrincipalPurgeUpdate(
+        # yield PrincipalPurgePollingWork.initialSchedule(
         #     self.store,
         #     int(config.LogID) if config.LogID else 5
         # )

Modified: CalendarServer/trunk/calendarserver/tools/principals.py
===================================================================
--- CalendarServer/trunk/calendarserver/tools/principals.py	2014-05-20 18:38:40 UTC (rev 13515)
+++ CalendarServer/trunk/calendarserver/tools/principals.py	2014-05-20 18:49:22 UTC (rev 13516)
@@ -595,12 +595,8 @@
         )
         yield proxyGroup.setMembers(proxyRecords)
 
-    # if store is not None:
-    #     # Schedule work the PeerConnectionPool will pick up as overdue
-    #     yield schedulePolledGroupCachingUpdate(store)
 
 
-
 @inlineCallbacks
 def getProxies(record):
     """

Modified: CalendarServer/trunk/calendarserver/tools/purge.py
===================================================================
--- CalendarServer/trunk/calendarserver/tools/purge.py	2014-05-20 18:38:40 UTC (rev 13515)
+++ CalendarServer/trunk/calendarserver/tools/purge.py	2014-05-20 18:49:22 UTC (rev 13516)
@@ -30,10 +30,10 @@
 
 from twext.enterprise.dal.record import fromTable
 from twext.enterprise.dal.syntax import Delete, Select, Union
-from twext.enterprise.jobqueue import WorkItem
+from twext.enterprise.jobqueue import WorkItem, RegeneratingWorkItem
 from twext.python.log import Logger
 
-from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.defer import inlineCallbacks, returnValue, succeed
 
 from twistedcaldav import caldavxml
 from twistedcaldav.config import config
@@ -50,7 +50,7 @@
 
 
 class PrincipalPurgePollingWork(
-    WorkItem,
+    RegeneratingWorkItem,
     fromTable(schema.PRINCIPAL_PURGE_POLLING_WORK)
 ):
     """
@@ -61,32 +61,31 @@
 
     group = "principal_purge_polling"
 
+    @classmethod
+    def initialSchedule(cls, store, seconds):
+        def _enqueue(txn):
+            return PrincipalPurgePollingWork.reschedule(txn, seconds)
+
+        if config.AutomaticPurging.Enabled:
+            return store.inTransaction("PrincipalPurgePollingWork.initialSchedule", _enqueue)
+        else:
+            return succeed(None)
+
+
+    def regenerateInterval(self):
+        """
+        Return the interval in seconds between regenerating instances.
+        """
+        return config.AutomaticPurging.PollingIntervalSeconds if config.AutomaticPurging.Enabled else None
+
+
     @inlineCallbacks
     def doWork(self):
 
-        # Delete all other work items
-        yield Delete(From=self.table, Where=None).on(self.transaction)
-
         # If not enabled, punt here
         if not config.AutomaticPurging.Enabled:
             returnValue(None)
 
-        # Schedule next update, 7 days out (default)
-        # Special - for testing it is handy to have this work item not regenerate, so
-        # we use an interval of -1 to signify a one-shot operation
-        if config.AutomaticPurging.PollingIntervalSeconds != -1:
-            notBefore = (
-                datetime.datetime.utcnow() +
-                datetime.timedelta(seconds=config.AutomaticPurging.PollingIntervalSeconds)
-            )
-            log.info(
-                "Scheduling next principal purge scan update: {when}", when=notBefore
-            )
-            yield self.transaction.enqueue(
-                PrincipalPurgePollingWork,
-                notBefore=notBefore
-            )
-
         # Do the scan
         allUIDs = set()
         for home in (schema.CALENDAR_HOME, schema.ADDRESSBOOK_HOME):
@@ -276,26 +275,6 @@
 
 
 
- at inlineCallbacks
-def scheduleNextPrincipalPurgeUpdate(store, seconds):
-
-    notBefore = (
-        datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds)
-    )
-
-    log.debug(
-        "Scheduling next principal purge update: {when}", when=notBefore
-    )
-
-    def _enqueue(txn):
-        return txn.enqueue(PrincipalPurgePollingWork, notBefore=notBefore)
-
-    wp = yield store.inTransaction("scheduleNextPrincipalPurgeUpdate", _enqueue)
-
-    returnValue(wp)
-
-
-
 class PurgeOldEventsService(WorkerService):
 
     cutoff = None

Modified: CalendarServer/trunk/calendarserver/tools/util.py
===================================================================
--- CalendarServer/trunk/calendarserver/tools/util.py	2014-05-20 18:38:40 UTC (rev 13515)
+++ CalendarServer/trunk/calendarserver/tools/util.py	2014-05-20 18:49:22 UTC (rev 13516)
@@ -42,7 +42,7 @@
 
 
 from twistedcaldav import memcachepool
-from txdav.who.groups import schedulePolledGroupCachingUpdate
+from txdav.who.groups import GroupCacherPollingWork
 
 
 log = Logger()
@@ -511,7 +511,9 @@
         principal, proxyPrincipal, proxyTypes=proxyTypes))
 
     # Schedule work the PeerConnectionPool will pick up as overdue
-    yield schedulePolledGroupCachingUpdate(store)
+    def groupPollNow(txn):
+        return GroupCacherPollingWork.reschedule(txn, 0, force=True)
+    yield store.inTransaction(groupPollNow)
 
 
 
@@ -545,7 +547,9 @@
 
     if removed:
         # Schedule work the PeerConnectionPool will pick up as overdue
-        yield schedulePolledGroupCachingUpdate(store)
+        def groupPollNow(txn):
+            return GroupCacherPollingWork.reschedule(txn, 0, force=True)
+        yield store.inTransaction(groupPollNow)
     returnValue(removed)
 
 

Modified: CalendarServer/trunk/requirements-stable.txt
===================================================================
--- CalendarServer/trunk/requirements-stable.txt	2014-05-20 18:38:40 UTC (rev 13515)
+++ CalendarServer/trunk/requirements-stable.txt	2014-05-20 18:49:22 UTC (rev 13516)
@@ -5,7 +5,7 @@
 # For CalendarServer development, don't try to get these projects from PyPI; use svn.
 
 -e .
--e svn+http://svn.calendarserver.org/repository/calendarserver/twext/trunk@13494#egg=twextpy
+-e svn+http://svn.calendarserver.org/repository/calendarserver/twext/trunk@13515#egg=twextpy
 -e svn+http://svn.calendarserver.org/repository/calendarserver/PyKerberos/trunk@13420#egg=kerberos
 -e svn+http://svn.calendarserver.org/repository/calendarserver/PyCalendar/trunk@13420#egg=pycalendar
 

Modified: CalendarServer/trunk/txdav/caldav/datastore/scheduling/imip/inbound.py
===================================================================
--- CalendarServer/trunk/txdav/caldav/datastore/scheduling/imip/inbound.py	2014-05-20 18:38:40 UTC (rev 13515)
+++ CalendarServer/trunk/txdav/caldav/datastore/scheduling/imip/inbound.py	2014-05-20 18:49:22 UTC (rev 13516)
@@ -19,8 +19,7 @@
 """
 
 from twext.enterprise.dal.record import fromTable
-from twext.enterprise.dal.syntax import Delete
-from twext.enterprise.jobqueue import WorkItem
+from twext.enterprise.jobqueue import WorkItem, RegeneratingWorkItem
 from twext.internet.gaiendpoint import GAIEndpoint
 from twext.python.log import Logger, LegacyLogger
 
@@ -81,25 +80,25 @@
 
 
 
-class IMIPPollingWork(WorkItem, fromTable(schema.IMIP_POLLING_WORK)):
+class IMIPPollingWork(RegeneratingWorkItem, fromTable(schema.IMIP_POLLING_WORK)):
 
     # FIXME: purge all old tokens here
     group = "imip_polling"
 
+    def regenerateInterval(self):
+        """
+        Return the interval in seconds between regenerating instances.
+        """
+        mailRetriever = self.transaction._mailRetriever
+        return mailRetriever.settings["seconds"]
+
+
     @inlineCallbacks
     def doWork(self):
 
-        # Delete all other work items
-        yield Delete(From=self.table, Where=None).on(self.transaction)
-
         mailRetriever = self.transaction._mailRetriever
         if mailRetriever is not None:
-            try:
-                yield mailRetriever.fetchMail()
-            except Exception, e:
-                log.error("Failed to fetch mail (%s)" % (e,))
-            finally:
-                yield mailRetriever.scheduleNextPoll()
+            yield mailRetriever.fetchMail()
 
 
 
@@ -140,7 +139,7 @@
     def scheduleNextPoll(self, seconds=None):
         if seconds is None:
             seconds = self.settings["PollingSeconds"]
-        yield scheduleNextMailPoll(self.store, seconds)
+        yield IMIPPollingWork.reschedule(self.store, seconds)
 
 
 

Modified: CalendarServer/trunk/txdav/common/datastore/sql.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/sql.py	2014-05-20 18:38:40 UTC (rev 13515)
+++ CalendarServer/trunk/txdav/common/datastore/sql.py	2014-05-20 18:49:22 UTC (rev 13516)
@@ -583,6 +583,7 @@
             label = "{}#{}${}".format(tr.filename, tr.lineno, tr.function)
 
         self._store = store
+        self._queuer = self._store.queuer
         self._calendarHomes = {}
         self._addressbookHomes = {}
         self._notificationHomes = {}

Modified: CalendarServer/trunk/txdav/common/datastore/work/inbox_cleanup.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/work/inbox_cleanup.py	2014-05-20 18:38:40 UTC (rev 13515)
+++ CalendarServer/trunk/txdav/common/datastore/work/inbox_cleanup.py	2014-05-20 18:49:22 UTC (rev 13516)
@@ -20,10 +20,10 @@
 """
 
 from twext.enterprise.dal.record import fromTable
-from twext.enterprise.dal.syntax import Delete, Select, Count
-from twext.enterprise.jobqueue import WorkItem
+from twext.enterprise.dal.syntax import Select, Count
+from twext.enterprise.jobqueue import WorkItem, RegeneratingWorkItem
 from twext.python.log import Logger
-from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.defer import inlineCallbacks, succeed
 from twistedcaldav.config import config
 from txdav.common.datastore.sql_tables import schema, _HOME_STATUS_NORMAL
 import datetime
@@ -31,26 +31,31 @@
 log = Logger()
 
 
-class InboxCleanupWork(WorkItem,
-    fromTable(schema.INBOX_CLEANUP_WORK)):
+class InboxCleanupWork(RegeneratingWorkItem, fromTable(schema.INBOX_CLEANUP_WORK)):
 
     group = "inbox_cleanup"
 
     @classmethod
-    @inlineCallbacks
-    def _schedule(cls, txn, seconds):
-        notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds)
-        log.debug("Scheduling clean inboxes work: {}".format(notBefore,))
-        wp = yield txn.enqueue(cls, notBefore=notBefore)
-        returnValue(wp)
+    def initialSchedule(cls, store, seconds):
+        def _enqueue(txn):
+            return InboxCleanupWork.reschedule(txn, seconds)
 
+        if config.InboxCleanup.Enabled:
+            return store.inTransaction("InboxCleanupWork.initialSchedule", _enqueue)
+        else:
+            return succeed(None)
 
+
+    def regenerateInterval(self):
+        """
+        Return the interval in seconds between regenerating instances.
+        """
+        return float(config.InboxCleanup.CleanupPeriodDays) * 24 * 60 * 60
+
+
     @inlineCallbacks
     def doWork(self):
 
-        # Delete all other work items
-        yield Delete(From=self.table, Where=None).on(self.transaction)
-
         # exit if not done with last delete:
         coiw = schema.CLEANUP_ONE_INBOX_WORK
         queuedCleanupOneInboxWorkItems = (yield Select(
@@ -71,32 +76,16 @@
             ).on(self.transaction)
 
             for homeRow in homeRows:
-                yield CleanupOneInboxWork._schedule(self.transaction, homeID=homeRow[0], seconds=0)
+                yield CleanupOneInboxWork.reschedule(self.transaction, seconds=0, homeID=homeRow[0])
 
-        # Schedule next check
-        yield self._schedule(
-            self.transaction,
-            float(config.InboxCleanup.CleanupPeriodDays) * 24 * 60 * 60
-        )
 
 
-
 class CleanupOneInboxWork(WorkItem,
     fromTable(schema.CLEANUP_ONE_INBOX_WORK)):
 
     group = property(lambda self: "cleanup_inbox_in_homeid_{}".format(self.homeID))
 
-    @classmethod
     @inlineCallbacks
-    def _schedule(cls, txn, homeID, seconds):
-        notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds)
-        log.debug("Scheduling Inbox cleanup work: {notBefore} in home id: {homeID}".format(
-            notBefore=notBefore, homeID=homeID))
-        wp = yield txn.enqueue(cls, notBefore=notBefore, homeID=homeID)
-        returnValue(wp)
-
-
-    @inlineCallbacks
     def doWork(self):
 
         # No need to delete other work items.  They are unique
@@ -143,15 +132,3 @@
             inbox = yield home.childWithName("inbox")
             for item in (yield inbox.objectResourcesWithNames(itemNamesToDelete)):
                 yield item.remove()
-
-
-
- at inlineCallbacks
-def scheduleFirstInboxCleanup(store, seconds):
-    if config.InboxCleanup.Enabled:
-        txn = store.newTransaction(label="scheduleFirstInboxCleanup")
-        wp = yield InboxCleanupWork._schedule(txn, seconds)
-        yield txn.commit()
-        returnValue(wp)
-    else:
-        log.debug("Inbox cleanup work disabled.")

Modified: CalendarServer/trunk/txdav/common/datastore/work/revision_cleanup.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/work/revision_cleanup.py	2014-05-20 18:38:40 UTC (rev 13515)
+++ CalendarServer/trunk/txdav/common/datastore/work/revision_cleanup.py	2014-05-20 18:49:22 UTC (rev 13516)
@@ -20,10 +20,10 @@
 """
 
 from twext.enterprise.dal.record import fromTable
-from twext.enterprise.dal.syntax import Delete, Select, Max
-from twext.enterprise.jobqueue import WorkItem
+from twext.enterprise.dal.syntax import Select, Max
+from twext.enterprise.jobqueue import SingletonWorkItem, RegeneratingWorkItem
 from twext.python.log import Logger
-from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.defer import inlineCallbacks, succeed
 from twistedcaldav.config import config
 from txdav.common.datastore.sql_tables import schema
 import datetime
@@ -31,26 +31,31 @@
 log = Logger()
 
 
-class FindMinValidRevisionWork(WorkItem,
-    fromTable(schema.FIND_MIN_VALID_REVISION_WORK)):
+class FindMinValidRevisionWork(RegeneratingWorkItem, fromTable(schema.FIND_MIN_VALID_REVISION_WORK)):
 
     group = "find_min_revision"
 
     @classmethod
-    @inlineCallbacks
-    def _schedule(cls, txn, seconds):
-        notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds)
-        log.debug("Scheduling find minimum valid revision work: %s" % (notBefore,))
-        wp = yield txn.enqueue(cls, notBefore=notBefore)
-        returnValue(wp)
+    def initialSchedule(cls, store, seconds):
+        def _enqueue(txn):
+            return FindMinValidRevisionWork.reschedule(txn, seconds)
 
+        if config.RevisionCleanup.Enabled:
+            return store.inTransaction("FindMinValidRevisionWork.initialSchedule", _enqueue)
+        else:
+            return succeed(None)
 
+
+    def regenerateInterval(self):
+        """
+        Return the interval in seconds between regenerating instances.
+        """
+        return float(config.RevisionCleanup.CleanupPeriodDays) * 24 * 60 * 60
+
+
     @inlineCallbacks
     def doWork(self):
 
-        # Delete all other work items
-        yield Delete(From=self.table, Where=None).on(self.transaction)
-
         # Get the minimum valid revision
         minValidRevision = int((yield self.transaction.calendarserverValue("MIN-VALID-REVISION")))
 
@@ -82,57 +87,19 @@
             yield self.transaction.updateCalendarserverValue("MIN-VALID-REVISION", maxRevOlderThanDate)
 
             # Schedule revision cleanup
-            yield RevisionCleanupWork._schedule(self.transaction, seconds=0)
+            yield RevisionCleanupWork.reschedule(self.transaction, seconds=0)
 
-        else:
-            # Schedule next check
-            yield FindMinValidRevisionWork._schedule(
-                self.transaction,
-                float(config.RevisionCleanup.CleanupPeriodDays) * 24 * 60 * 60
-            )
 
 
+class RevisionCleanupWork(SingletonWorkItem, fromTable(schema.REVISION_CLEANUP_WORK)):
 
-class RevisionCleanupWork(WorkItem,
-    fromTable(schema.REVISION_CLEANUP_WORK)):
-
     group = "group_revsion_cleanup"
 
-    @classmethod
     @inlineCallbacks
-    def _schedule(cls, txn, seconds):
-        notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds)
-        log.debug("Scheduling revision cleanup work: %s" % (notBefore,))
-        wp = yield txn.enqueue(cls, notBefore=notBefore)
-        returnValue(wp)
-
-
-    @inlineCallbacks
     def doWork(self):
 
-        # Delete all other work items
-        yield Delete(From=self.table, Where=None).on(self.transaction)
-
         # Get the minimum valid revision
         minValidRevision = int((yield self.transaction.calendarserverValue("MIN-VALID-REVISION")))
 
         # delete revisions
         yield self.transaction.deleteRevisionsBefore(minValidRevision)
-
-        # Schedule next update
-        yield FindMinValidRevisionWork._schedule(
-            self.transaction,
-            float(config.RevisionCleanup.CleanupPeriodDays) * 24 * 60 * 60
-        )
-
-
-
- at inlineCallbacks
-def scheduleFirstFindMinRevision(store, seconds):
-    if config.RevisionCleanup.Enabled:
-        txn = store.newTransaction(label="scheduleFirstFindMinRevision")
-        wp = yield FindMinValidRevisionWork._schedule(txn, seconds)
-        yield txn.commit()
-        returnValue(wp)
-    else:
-        log.debug("Revision cleanup work disabled.")

Modified: CalendarServer/trunk/txdav/common/datastore/work/test/test_inbox_cleanup.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/work/test/test_inbox_cleanup.py	2014-05-20 18:38:40 UTC (rev 13515)
+++ CalendarServer/trunk/txdav/common/datastore/work/test/test_inbox_cleanup.py	2014-05-20 18:49:22 UTC (rev 13516)
@@ -126,25 +126,20 @@
         """
         Verify that InboxCleanupWork queues one CleanupOneInboxBoxWork per home
         """
-        class FakeInboxCleanupWork(WorkItem):
-            @classmethod
-            def _schedule(cls, txn, seconds):
-                pass
+        self.patch(config.InboxCleanup, "CleanupPeriodDays", -1)
 
-        self.patch(InboxCleanupWork, "_schedule", FakeInboxCleanupWork._schedule)
-
         class FakeCleanupOneInboxWork(WorkItem):
             scheduledHomeIDs = []
 
             @classmethod
-            def _schedule(cls, txn, homeID, seconds):
+            def reschedule(cls, txn, seconds, homeID):
                 cls.scheduledHomeIDs.append(homeID)
                 pass
 
-        self.patch(CleanupOneInboxWork, "_schedule", FakeCleanupOneInboxWork._schedule)
+        self.patch(CleanupOneInboxWork, "reschedule", FakeCleanupOneInboxWork.reschedule)
 
         # do cleanup
-        yield self.transactionUnderTest().enqueue(InboxCleanupWork, notBefore=datetime.datetime.utcnow())
+        yield InboxCleanupWork.reschedule(self.transactionUnderTest(), 0)
         yield self.commit()
         yield JobItem.waitEmpty(self.storeUnderTest().newTransaction, reactor, 60)
 

Modified: CalendarServer/trunk/txdav/common/datastore/work/test/test_revision_cleanup.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/work/test/test_revision_cleanup.py	2014-05-20 18:38:40 UTC (rev 13515)
+++ CalendarServer/trunk/txdav/common/datastore/work/test/test_revision_cleanup.py	2014-05-20 18:49:22 UTC (rev 13516)
@@ -17,7 +17,7 @@
 
 
 from twext.enterprise.dal.syntax import Select
-from twext.enterprise.jobqueue import WorkItem, JobItem
+from twext.enterprise.jobqueue import JobItem
 from twext.python.clsprop import classproperty
 from twisted.internet import reactor
 from twisted.internet.defer import inlineCallbacks, returnValue
@@ -43,14 +43,9 @@
         yield self.buildStoreAndDirectory()
         yield self.populate()
 
-        class FakeWork(WorkItem):
-            @classmethod
-            def _schedule(cls, txn, seconds):
-                pass
-
-        self.patch(FindMinValidRevisionWork, "_schedule", FakeWork._schedule)
-        self.patch(RevisionCleanupWork, "_schedule", FakeWork._schedule)
+        self.patch(config.RevisionCleanup, "Enabled", True)
         self.patch(config.RevisionCleanup, "SyncTokenLifetimeDays", 0)
+        self.patch(config.RevisionCleanup, "CleanupPeriodDays", -1)
 
 
     @inlineCallbacks
@@ -266,7 +261,7 @@
         self.assertNotEqual(len(revisionRows), 0)
 
         # do FindMinValidRevisionWork
-        yield self.transactionUnderTest().enqueue(FindMinValidRevisionWork, notBefore=datetime.datetime.utcnow())
+        yield FindMinValidRevisionWork.reschedule(self.transactionUnderTest(), 0)
         yield self.commit()
         yield JobItem.waitEmpty(self.storeUnderTest().newTransaction, reactor, 60)
 

Modified: CalendarServer/trunk/txdav/who/groups.py
===================================================================
--- CalendarServer/trunk/txdav/who/groups.py	2014-05-20 18:38:40 UTC (rev 13515)
+++ CalendarServer/trunk/txdav/who/groups.py	2014-05-20 18:49:22 UTC (rev 13516)
@@ -21,9 +21,9 @@
 
 from twext.enterprise.dal.record import fromTable
 from twext.enterprise.dal.syntax import Delete, Select
-from twext.enterprise.jobqueue import WorkItem
+from twext.enterprise.jobqueue import WorkItem, RegeneratingWorkItem
 from twext.python.log import Logger
-from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.defer import inlineCallbacks, returnValue, succeed
 from twistedcaldav.config import config
 from txdav.caldav.datastore.sql import CalendarStoreFeatures
 from txdav.common.datastore.sql_tables import schema
@@ -34,36 +34,38 @@
 
 
 class GroupCacherPollingWork(
-    WorkItem,
+    RegeneratingWorkItem,
     fromTable(schema.GROUP_CACHER_POLLING_WORK)
 ):
 
     group = "group_cacher_polling"
 
-    @inlineCallbacks
-    def doWork(self):
+    @classmethod
+    def initialSchedule(cls, store, seconds):
+        def _enqueue(txn):
+            return GroupCacherPollingWork.reschedule(txn, seconds)
 
-        # Delete all other work items
-        yield Delete(From=self.table, Where=None).on(self.transaction)
+        if config.InboxCleanup.Enabled:
+            return store.inTransaction("GroupCacherPollingWork.initialSchedule", _enqueue)
+        else:
+            return succeed(None)
 
+
+    def regenerateInterval(self):
+        """
+        Return the interval in seconds between regenerating instances.
+        """
         groupCacher = getattr(self.transaction, "_groupCacher", None)
-        if groupCacher is not None:
+        return groupCacher.updateSeconds if groupCacher else 10
 
-            # Schedule next update
 
-            notBefore = (
-                datetime.datetime.utcnow() +
-                datetime.timedelta(seconds=groupCacher.updateSeconds)
-            )
-            log.debug(
-                "Scheduling next group cacher update: {when}", when=notBefore
-            )
-            yield self.transaction.enqueue(
-                GroupCacherPollingWork,
-                notBefore=notBefore
-            )
+    @inlineCallbacks
+    def doWork(self):
 
-            # New implmementation
+        groupCacher = getattr(self.transaction, "_groupCacher", None)
+        if groupCacher is not None:
+
+            # New implementation
             try:
                 yield groupCacher.update(self.transaction)
             except Exception, e:
@@ -72,49 +74,8 @@
                     error=e
                 )
 
-        else:
-            notBefore = (
-                datetime.datetime.utcnow() +
-                datetime.timedelta(seconds=10)
-            )
-            log.debug(
-                "Rescheduling group cacher update: {when}", when=notBefore
-            )
-            yield self.transaction.enqueue(
-                GroupCacherPollingWork,
-                notBefore=notBefore
-            )
 
 
-
- at inlineCallbacks
-def scheduleNextGroupCachingUpdate(store, seconds):
-
-    notBefore = (
-        datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds)
-    )
-
-    log.debug(
-        "Scheduling next group cacher update: {when}", when=notBefore
-    )
-
-    def _enqueue(txn):
-        return txn.enqueue(GroupCacherPollingWork, notBefore=notBefore)
-
-    wp = yield store.inTransaction("scheduleNextGroupCachingUpdate", _enqueue)
-
-    returnValue(wp)
-
-
-
-def schedulePolledGroupCachingUpdate(store):
-    """
-    Schedules a group caching update work item to run immediately.
-    """
-    return scheduleNextGroupCachingUpdate(store, 0)
-
-
-
 class GroupRefreshWork(WorkItem, fromTable(schema.GROUP_REFRESH_WORK)):
 
     group = property(lambda self: self.groupUid)
@@ -163,26 +124,7 @@
         lambda self: "{0}, {1}".format(self.groupID, self.resourceID)
     )
 
-    @classmethod
-    @inlineCallbacks
-    def _schedule(cls, txn, eventID, groupID, seconds):
-        notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds)
-        log.debug(
-            "scheduling group reconciliation for "
-            "({resourceID}, {groupID},): {when}",
-            resourceID=eventID,
-            groupID=groupID,
-            when=notBefore
-        )
-        wp = yield txn.enqueue(
-            cls,
-            resourceID=eventID,
-            groupID=groupID,
-            notBefore=notBefore,
-        )
-        returnValue(wp)
 
-
     @inlineCallbacks
     def doWork(self):
 
@@ -432,11 +374,11 @@
 
         wps = []
         for [eventID] in rows:
-            wp = yield GroupAttendeeReconciliationWork._schedule(
+            wp = yield GroupAttendeeReconciliationWork.reschedule(
                 txn,
-                eventID=eventID,
+                seconds=float(config.GroupAttendees.ReconciliationDelaySeconds),
+                resourceID=eventID,
                 groupID=groupID,
-                seconds=float(config.GroupAttendees.ReconciliationDelaySeconds)
             )
             wps.append(wp)
         returnValue(tuple(wps))
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20140520/8ae91341/attachment-0001.html>


More information about the calendarserver-changes mailing list