[CalendarServer-changes] [13516] CalendarServer/trunk
source_changes at macosforge.org
source_changes at macosforge.org
Tue May 20 11:49:22 PDT 2014
Revision: 13516
http://trac.calendarserver.org//changeset/13516
Author: cdaboo at apple.com
Date: 2014-05-20 11:49:22 -0700 (Tue, 20 May 2014)
Log Message:
-----------
Add singleton and regenerating WorkItem types.
Modified Paths:
--------------
CalendarServer/trunk/calendarserver/tap/caldav.py
CalendarServer/trunk/calendarserver/tools/principals.py
CalendarServer/trunk/calendarserver/tools/purge.py
CalendarServer/trunk/calendarserver/tools/util.py
CalendarServer/trunk/requirements-stable.txt
CalendarServer/trunk/txdav/caldav/datastore/scheduling/imip/inbound.py
CalendarServer/trunk/txdav/common/datastore/sql.py
CalendarServer/trunk/txdav/common/datastore/work/inbox_cleanup.py
CalendarServer/trunk/txdav/common/datastore/work/revision_cleanup.py
CalendarServer/trunk/txdav/common/datastore/work/test/test_inbox_cleanup.py
CalendarServer/trunk/txdav/common/datastore/work/test/test_revision_cleanup.py
CalendarServer/trunk/txdav/who/groups.py
Modified: CalendarServer/trunk/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/caldav.py 2014-05-20 18:38:40 UTC (rev 13515)
+++ CalendarServer/trunk/calendarserver/tap/caldav.py 2014-05-20 18:49:22 UTC (rev 13516)
@@ -69,7 +69,7 @@
from twext.enterprise.jobqueue import PeerConnectionPool
from twext.enterprise.jobqueue import WorkerFactory as QueueWorkerFactory
from twext.application.service import ReExecService
-from txdav.who.groups import scheduleNextGroupCachingUpdate
+from txdav.who.groups import GroupCacherPollingWork
from txweb2.channel.http import (
LimitingHTTPFactory, SSLRedirectRequest, HTTPChannel
@@ -89,10 +89,8 @@
UpgradeAcquireLockStep, UpgradeReleaseLockStep,
UpgradeDatabaseNotificationDataStep
)
-from txdav.common.datastore.work.inbox_cleanup import scheduleFirstInboxCleanup
-from txdav.common.datastore.work.revision_cleanup import (
- scheduleFirstFindMinRevision
-)
+from txdav.common.datastore.work.inbox_cleanup import InboxCleanupWork
+from txdav.common.datastore.work.revision_cleanup import FindMinValidRevisionWork
from txdav.who.util import directoryFromConfig
from txdav.dps.client import DirectoryService as DirectoryProxyClientService
from txdav.who.groups import GroupCacher
@@ -593,22 +591,22 @@
int(config.LogID) if config.LogID else 5
)
if self.doGroupCaching:
- yield scheduleNextGroupCachingUpdate(
+ yield GroupCacherPollingWork.initialSchedule(
self.store,
int(config.LogID) if config.LogID else 5
)
- yield scheduleFirstFindMinRevision(
+ yield FindMinValidRevisionWork.initialSchedule(
self.store,
int(config.LogID) if config.LogID else 5
)
- yield scheduleFirstInboxCleanup(
+ yield InboxCleanupWork.initialSchedule(
self.store,
int(config.LogID) if config.LogID else 5
)
# FIXME: uncomment this when purge is working
# from calendarserver.tools.purge import scheduleNextPrincipalPurgeUpdate
- # yield scheduleNextPrincipalPurgeUpdate(
+ # yield PrincipalPurgePollingWork.initialSchedule(
# self.store,
# int(config.LogID) if config.LogID else 5
# )
Modified: CalendarServer/trunk/calendarserver/tools/principals.py
===================================================================
--- CalendarServer/trunk/calendarserver/tools/principals.py 2014-05-20 18:38:40 UTC (rev 13515)
+++ CalendarServer/trunk/calendarserver/tools/principals.py 2014-05-20 18:49:22 UTC (rev 13516)
@@ -595,12 +595,8 @@
)
yield proxyGroup.setMembers(proxyRecords)
- # if store is not None:
- # # Schedule work the PeerConnectionPool will pick up as overdue
- # yield schedulePolledGroupCachingUpdate(store)
-
@inlineCallbacks
def getProxies(record):
"""
Modified: CalendarServer/trunk/calendarserver/tools/purge.py
===================================================================
--- CalendarServer/trunk/calendarserver/tools/purge.py 2014-05-20 18:38:40 UTC (rev 13515)
+++ CalendarServer/trunk/calendarserver/tools/purge.py 2014-05-20 18:49:22 UTC (rev 13516)
@@ -30,10 +30,10 @@
from twext.enterprise.dal.record import fromTable
from twext.enterprise.dal.syntax import Delete, Select, Union
-from twext.enterprise.jobqueue import WorkItem
+from twext.enterprise.jobqueue import WorkItem, RegeneratingWorkItem
from twext.python.log import Logger
-from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.defer import inlineCallbacks, returnValue, succeed
from twistedcaldav import caldavxml
from twistedcaldav.config import config
@@ -50,7 +50,7 @@
class PrincipalPurgePollingWork(
- WorkItem,
+ RegeneratingWorkItem,
fromTable(schema.PRINCIPAL_PURGE_POLLING_WORK)
):
"""
@@ -61,32 +61,31 @@
group = "principal_purge_polling"
+ @classmethod
+ def initialSchedule(cls, store, seconds):
+ def _enqueue(txn):
+ return PrincipalPurgePollingWork.reschedule(txn, seconds)
+
+ if config.AutomaticPurging.Enabled:
+ return store.inTransaction("PrincipalPurgePollingWork.initialSchedule", _enqueue)
+ else:
+ return succeed(None)
+
+
+ def regenerateInterval(self):
+ """
+ Return the interval in seconds between regenerating instances.
+ """
+ return config.AutomaticPurging.PollingIntervalSeconds if config.AutomaticPurging.Enabled else None
+
+
@inlineCallbacks
def doWork(self):
- # Delete all other work items
- yield Delete(From=self.table, Where=None).on(self.transaction)
-
# If not enabled, punt here
if not config.AutomaticPurging.Enabled:
returnValue(None)
- # Schedule next update, 7 days out (default)
- # Special - for testing it is handy to have this work item not regenerate, so
- # we use an interval of -1 to signify a one-shot operation
- if config.AutomaticPurging.PollingIntervalSeconds != -1:
- notBefore = (
- datetime.datetime.utcnow() +
- datetime.timedelta(seconds=config.AutomaticPurging.PollingIntervalSeconds)
- )
- log.info(
- "Scheduling next principal purge scan update: {when}", when=notBefore
- )
- yield self.transaction.enqueue(
- PrincipalPurgePollingWork,
- notBefore=notBefore
- )
-
# Do the scan
allUIDs = set()
for home in (schema.CALENDAR_HOME, schema.ADDRESSBOOK_HOME):
@@ -276,26 +275,6 @@
- at inlineCallbacks
-def scheduleNextPrincipalPurgeUpdate(store, seconds):
-
- notBefore = (
- datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds)
- )
-
- log.debug(
- "Scheduling next principal purge update: {when}", when=notBefore
- )
-
- def _enqueue(txn):
- return txn.enqueue(PrincipalPurgePollingWork, notBefore=notBefore)
-
- wp = yield store.inTransaction("scheduleNextPrincipalPurgeUpdate", _enqueue)
-
- returnValue(wp)
-
-
-
class PurgeOldEventsService(WorkerService):
cutoff = None
Modified: CalendarServer/trunk/calendarserver/tools/util.py
===================================================================
--- CalendarServer/trunk/calendarserver/tools/util.py 2014-05-20 18:38:40 UTC (rev 13515)
+++ CalendarServer/trunk/calendarserver/tools/util.py 2014-05-20 18:49:22 UTC (rev 13516)
@@ -42,7 +42,7 @@
from twistedcaldav import memcachepool
-from txdav.who.groups import schedulePolledGroupCachingUpdate
+from txdav.who.groups import GroupCacherPollingWork
log = Logger()
@@ -511,7 +511,9 @@
principal, proxyPrincipal, proxyTypes=proxyTypes))
# Schedule work the PeerConnectionPool will pick up as overdue
- yield schedulePolledGroupCachingUpdate(store)
+ def groupPollNow(txn):
+ return GroupCacherPollingWork.reschedule(txn, 0, force=True)
+ yield store.inTransaction(groupPollNow)
@@ -545,7 +547,9 @@
if removed:
# Schedule work the PeerConnectionPool will pick up as overdue
- yield schedulePolledGroupCachingUpdate(store)
+ def groupPollNow(txn):
+ return GroupCacherPollingWork.reschedule(txn, 0, force=True)
+ yield store.inTransaction(groupPollNow)
returnValue(removed)
Modified: CalendarServer/trunk/requirements-stable.txt
===================================================================
--- CalendarServer/trunk/requirements-stable.txt 2014-05-20 18:38:40 UTC (rev 13515)
+++ CalendarServer/trunk/requirements-stable.txt 2014-05-20 18:49:22 UTC (rev 13516)
@@ -5,7 +5,7 @@
# For CalendarServer development, don't try to get these projects from PyPI; use svn.
-e .
--e svn+http://svn.calendarserver.org/repository/calendarserver/twext/trunk@13494#egg=twextpy
+-e svn+http://svn.calendarserver.org/repository/calendarserver/twext/trunk@13515#egg=twextpy
-e svn+http://svn.calendarserver.org/repository/calendarserver/PyKerberos/trunk@13420#egg=kerberos
-e svn+http://svn.calendarserver.org/repository/calendarserver/PyCalendar/trunk@13420#egg=pycalendar
Modified: CalendarServer/trunk/txdav/caldav/datastore/scheduling/imip/inbound.py
===================================================================
--- CalendarServer/trunk/txdav/caldav/datastore/scheduling/imip/inbound.py 2014-05-20 18:38:40 UTC (rev 13515)
+++ CalendarServer/trunk/txdav/caldav/datastore/scheduling/imip/inbound.py 2014-05-20 18:49:22 UTC (rev 13516)
@@ -19,8 +19,7 @@
"""
from twext.enterprise.dal.record import fromTable
-from twext.enterprise.dal.syntax import Delete
-from twext.enterprise.jobqueue import WorkItem
+from twext.enterprise.jobqueue import WorkItem, RegeneratingWorkItem
from twext.internet.gaiendpoint import GAIEndpoint
from twext.python.log import Logger, LegacyLogger
@@ -81,25 +80,25 @@
-class IMIPPollingWork(WorkItem, fromTable(schema.IMIP_POLLING_WORK)):
+class IMIPPollingWork(RegeneratingWorkItem, fromTable(schema.IMIP_POLLING_WORK)):
# FIXME: purge all old tokens here
group = "imip_polling"
+ def regenerateInterval(self):
+ """
+ Return the interval in seconds between regenerating instances.
+ """
+ mailRetriever = self.transaction._mailRetriever
+ return mailRetriever.settings["seconds"]
+
+
@inlineCallbacks
def doWork(self):
- # Delete all other work items
- yield Delete(From=self.table, Where=None).on(self.transaction)
-
mailRetriever = self.transaction._mailRetriever
if mailRetriever is not None:
- try:
- yield mailRetriever.fetchMail()
- except Exception, e:
- log.error("Failed to fetch mail (%s)" % (e,))
- finally:
- yield mailRetriever.scheduleNextPoll()
+ yield mailRetriever.fetchMail()
@@ -140,7 +139,7 @@
def scheduleNextPoll(self, seconds=None):
if seconds is None:
seconds = self.settings["PollingSeconds"]
- yield scheduleNextMailPoll(self.store, seconds)
+ yield IMIPPollingWork.reschedule(self.store, seconds)
Modified: CalendarServer/trunk/txdav/common/datastore/sql.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/sql.py 2014-05-20 18:38:40 UTC (rev 13515)
+++ CalendarServer/trunk/txdav/common/datastore/sql.py 2014-05-20 18:49:22 UTC (rev 13516)
@@ -583,6 +583,7 @@
label = "{}#{}${}".format(tr.filename, tr.lineno, tr.function)
self._store = store
+ self._queuer = self._store.queuer
self._calendarHomes = {}
self._addressbookHomes = {}
self._notificationHomes = {}
Modified: CalendarServer/trunk/txdav/common/datastore/work/inbox_cleanup.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/work/inbox_cleanup.py 2014-05-20 18:38:40 UTC (rev 13515)
+++ CalendarServer/trunk/txdav/common/datastore/work/inbox_cleanup.py 2014-05-20 18:49:22 UTC (rev 13516)
@@ -20,10 +20,10 @@
"""
from twext.enterprise.dal.record import fromTable
-from twext.enterprise.dal.syntax import Delete, Select, Count
-from twext.enterprise.jobqueue import WorkItem
+from twext.enterprise.dal.syntax import Select, Count
+from twext.enterprise.jobqueue import WorkItem, RegeneratingWorkItem
from twext.python.log import Logger
-from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.defer import inlineCallbacks, succeed
from twistedcaldav.config import config
from txdav.common.datastore.sql_tables import schema, _HOME_STATUS_NORMAL
import datetime
@@ -31,26 +31,31 @@
log = Logger()
-class InboxCleanupWork(WorkItem,
- fromTable(schema.INBOX_CLEANUP_WORK)):
+class InboxCleanupWork(RegeneratingWorkItem, fromTable(schema.INBOX_CLEANUP_WORK)):
group = "inbox_cleanup"
@classmethod
- @inlineCallbacks
- def _schedule(cls, txn, seconds):
- notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds)
- log.debug("Scheduling clean inboxes work: {}".format(notBefore,))
- wp = yield txn.enqueue(cls, notBefore=notBefore)
- returnValue(wp)
+ def initialSchedule(cls, store, seconds):
+ def _enqueue(txn):
+ return InboxCleanupWork.reschedule(txn, seconds)
+ if config.InboxCleanup.Enabled:
+ return store.inTransaction("InboxCleanupWork.initialSchedule", _enqueue)
+ else:
+ return succeed(None)
+
+ def regenerateInterval(self):
+ """
+ Return the interval in seconds between regenerating instances.
+ """
+ return float(config.InboxCleanup.CleanupPeriodDays) * 24 * 60 * 60
+
+
@inlineCallbacks
def doWork(self):
- # Delete all other work items
- yield Delete(From=self.table, Where=None).on(self.transaction)
-
# exit if not done with last delete:
coiw = schema.CLEANUP_ONE_INBOX_WORK
queuedCleanupOneInboxWorkItems = (yield Select(
@@ -71,32 +76,16 @@
).on(self.transaction)
for homeRow in homeRows:
- yield CleanupOneInboxWork._schedule(self.transaction, homeID=homeRow[0], seconds=0)
+ yield CleanupOneInboxWork.reschedule(self.transaction, seconds=0, homeID=homeRow[0])
- # Schedule next check
- yield self._schedule(
- self.transaction,
- float(config.InboxCleanup.CleanupPeriodDays) * 24 * 60 * 60
- )
-
class CleanupOneInboxWork(WorkItem,
fromTable(schema.CLEANUP_ONE_INBOX_WORK)):
group = property(lambda self: "cleanup_inbox_in_homeid_{}".format(self.homeID))
- @classmethod
@inlineCallbacks
- def _schedule(cls, txn, homeID, seconds):
- notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds)
- log.debug("Scheduling Inbox cleanup work: {notBefore} in home id: {homeID}".format(
- notBefore=notBefore, homeID=homeID))
- wp = yield txn.enqueue(cls, notBefore=notBefore, homeID=homeID)
- returnValue(wp)
-
-
- @inlineCallbacks
def doWork(self):
# No need to delete other work items. They are unique
@@ -143,15 +132,3 @@
inbox = yield home.childWithName("inbox")
for item in (yield inbox.objectResourcesWithNames(itemNamesToDelete)):
yield item.remove()
-
-
-
- at inlineCallbacks
-def scheduleFirstInboxCleanup(store, seconds):
- if config.InboxCleanup.Enabled:
- txn = store.newTransaction(label="scheduleFirstInboxCleanup")
- wp = yield InboxCleanupWork._schedule(txn, seconds)
- yield txn.commit()
- returnValue(wp)
- else:
- log.debug("Inbox cleanup work disabled.")
Modified: CalendarServer/trunk/txdav/common/datastore/work/revision_cleanup.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/work/revision_cleanup.py 2014-05-20 18:38:40 UTC (rev 13515)
+++ CalendarServer/trunk/txdav/common/datastore/work/revision_cleanup.py 2014-05-20 18:49:22 UTC (rev 13516)
@@ -20,10 +20,10 @@
"""
from twext.enterprise.dal.record import fromTable
-from twext.enterprise.dal.syntax import Delete, Select, Max
-from twext.enterprise.jobqueue import WorkItem
+from twext.enterprise.dal.syntax import Select, Max
+from twext.enterprise.jobqueue import SingletonWorkItem, RegeneratingWorkItem
from twext.python.log import Logger
-from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.defer import inlineCallbacks, succeed
from twistedcaldav.config import config
from txdav.common.datastore.sql_tables import schema
import datetime
@@ -31,26 +31,31 @@
log = Logger()
-class FindMinValidRevisionWork(WorkItem,
- fromTable(schema.FIND_MIN_VALID_REVISION_WORK)):
+class FindMinValidRevisionWork(RegeneratingWorkItem, fromTable(schema.FIND_MIN_VALID_REVISION_WORK)):
group = "find_min_revision"
@classmethod
- @inlineCallbacks
- def _schedule(cls, txn, seconds):
- notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds)
- log.debug("Scheduling find minimum valid revision work: %s" % (notBefore,))
- wp = yield txn.enqueue(cls, notBefore=notBefore)
- returnValue(wp)
+ def initialSchedule(cls, store, seconds):
+ def _enqueue(txn):
+ return FindMinValidRevisionWork.reschedule(txn, seconds)
+ if config.RevisionCleanup.Enabled:
+ return store.inTransaction("FindMinValidRevisionWork.initialSchedule", _enqueue)
+ else:
+ return succeed(None)
+
+ def regenerateInterval(self):
+ """
+ Return the interval in seconds between regenerating instances.
+ """
+ return float(config.RevisionCleanup.CleanupPeriodDays) * 24 * 60 * 60
+
+
@inlineCallbacks
def doWork(self):
- # Delete all other work items
- yield Delete(From=self.table, Where=None).on(self.transaction)
-
# Get the minimum valid revision
minValidRevision = int((yield self.transaction.calendarserverValue("MIN-VALID-REVISION")))
@@ -82,57 +87,19 @@
yield self.transaction.updateCalendarserverValue("MIN-VALID-REVISION", maxRevOlderThanDate)
# Schedule revision cleanup
- yield RevisionCleanupWork._schedule(self.transaction, seconds=0)
+ yield RevisionCleanupWork.reschedule(self.transaction, seconds=0)
- else:
- # Schedule next check
- yield FindMinValidRevisionWork._schedule(
- self.transaction,
- float(config.RevisionCleanup.CleanupPeriodDays) * 24 * 60 * 60
- )
+class RevisionCleanupWork(SingletonWorkItem, fromTable(schema.REVISION_CLEANUP_WORK)):
-class RevisionCleanupWork(WorkItem,
- fromTable(schema.REVISION_CLEANUP_WORK)):
-
group = "group_revsion_cleanup"
- @classmethod
@inlineCallbacks
- def _schedule(cls, txn, seconds):
- notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds)
- log.debug("Scheduling revision cleanup work: %s" % (notBefore,))
- wp = yield txn.enqueue(cls, notBefore=notBefore)
- returnValue(wp)
-
-
- @inlineCallbacks
def doWork(self):
- # Delete all other work items
- yield Delete(From=self.table, Where=None).on(self.transaction)
-
# Get the minimum valid revision
minValidRevision = int((yield self.transaction.calendarserverValue("MIN-VALID-REVISION")))
# delete revisions
yield self.transaction.deleteRevisionsBefore(minValidRevision)
-
- # Schedule next update
- yield FindMinValidRevisionWork._schedule(
- self.transaction,
- float(config.RevisionCleanup.CleanupPeriodDays) * 24 * 60 * 60
- )
-
-
-
- at inlineCallbacks
-def scheduleFirstFindMinRevision(store, seconds):
- if config.RevisionCleanup.Enabled:
- txn = store.newTransaction(label="scheduleFirstFindMinRevision")
- wp = yield FindMinValidRevisionWork._schedule(txn, seconds)
- yield txn.commit()
- returnValue(wp)
- else:
- log.debug("Revision cleanup work disabled.")
Modified: CalendarServer/trunk/txdav/common/datastore/work/test/test_inbox_cleanup.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/work/test/test_inbox_cleanup.py 2014-05-20 18:38:40 UTC (rev 13515)
+++ CalendarServer/trunk/txdav/common/datastore/work/test/test_inbox_cleanup.py 2014-05-20 18:49:22 UTC (rev 13516)
@@ -126,25 +126,20 @@
"""
Verify that InboxCleanupWork queues one CleanupOneInboxBoxWork per home
"""
- class FakeInboxCleanupWork(WorkItem):
- @classmethod
- def _schedule(cls, txn, seconds):
- pass
+ self.patch(config.InboxCleanup, "CleanupPeriodDays", -1)
- self.patch(InboxCleanupWork, "_schedule", FakeInboxCleanupWork._schedule)
-
class FakeCleanupOneInboxWork(WorkItem):
scheduledHomeIDs = []
@classmethod
- def _schedule(cls, txn, homeID, seconds):
+ def reschedule(cls, txn, seconds, homeID):
cls.scheduledHomeIDs.append(homeID)
pass
- self.patch(CleanupOneInboxWork, "_schedule", FakeCleanupOneInboxWork._schedule)
+ self.patch(CleanupOneInboxWork, "reschedule", FakeCleanupOneInboxWork.reschedule)
# do cleanup
- yield self.transactionUnderTest().enqueue(InboxCleanupWork, notBefore=datetime.datetime.utcnow())
+ yield InboxCleanupWork.reschedule(self.transactionUnderTest(), 0)
yield self.commit()
yield JobItem.waitEmpty(self.storeUnderTest().newTransaction, reactor, 60)
Modified: CalendarServer/trunk/txdav/common/datastore/work/test/test_revision_cleanup.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/work/test/test_revision_cleanup.py 2014-05-20 18:38:40 UTC (rev 13515)
+++ CalendarServer/trunk/txdav/common/datastore/work/test/test_revision_cleanup.py 2014-05-20 18:49:22 UTC (rev 13516)
@@ -17,7 +17,7 @@
from twext.enterprise.dal.syntax import Select
-from twext.enterprise.jobqueue import WorkItem, JobItem
+from twext.enterprise.jobqueue import JobItem
from twext.python.clsprop import classproperty
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
@@ -43,14 +43,9 @@
yield self.buildStoreAndDirectory()
yield self.populate()
- class FakeWork(WorkItem):
- @classmethod
- def _schedule(cls, txn, seconds):
- pass
-
- self.patch(FindMinValidRevisionWork, "_schedule", FakeWork._schedule)
- self.patch(RevisionCleanupWork, "_schedule", FakeWork._schedule)
+ self.patch(config.RevisionCleanup, "Enabled", True)
self.patch(config.RevisionCleanup, "SyncTokenLifetimeDays", 0)
+ self.patch(config.RevisionCleanup, "CleanupPeriodDays", -1)
@inlineCallbacks
@@ -266,7 +261,7 @@
self.assertNotEqual(len(revisionRows), 0)
# do FindMinValidRevisionWork
- yield self.transactionUnderTest().enqueue(FindMinValidRevisionWork, notBefore=datetime.datetime.utcnow())
+ yield FindMinValidRevisionWork.reschedule(self.transactionUnderTest(), 0)
yield self.commit()
yield JobItem.waitEmpty(self.storeUnderTest().newTransaction, reactor, 60)
Modified: CalendarServer/trunk/txdav/who/groups.py
===================================================================
--- CalendarServer/trunk/txdav/who/groups.py 2014-05-20 18:38:40 UTC (rev 13515)
+++ CalendarServer/trunk/txdav/who/groups.py 2014-05-20 18:49:22 UTC (rev 13516)
@@ -21,9 +21,9 @@
from twext.enterprise.dal.record import fromTable
from twext.enterprise.dal.syntax import Delete, Select
-from twext.enterprise.jobqueue import WorkItem
+from twext.enterprise.jobqueue import WorkItem, RegeneratingWorkItem
from twext.python.log import Logger
-from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.defer import inlineCallbacks, returnValue, succeed
from twistedcaldav.config import config
from txdav.caldav.datastore.sql import CalendarStoreFeatures
from txdav.common.datastore.sql_tables import schema
@@ -34,36 +34,38 @@
class GroupCacherPollingWork(
- WorkItem,
+ RegeneratingWorkItem,
fromTable(schema.GROUP_CACHER_POLLING_WORK)
):
group = "group_cacher_polling"
- @inlineCallbacks
- def doWork(self):
+ @classmethod
+ def initialSchedule(cls, store, seconds):
+ def _enqueue(txn):
+ return GroupCacherPollingWork.reschedule(txn, seconds)
- # Delete all other work items
- yield Delete(From=self.table, Where=None).on(self.transaction)
+ if config.InboxCleanup.Enabled:
+ return store.inTransaction("GroupCacherPollingWork.initialSchedule", _enqueue)
+ else:
+ return succeed(None)
+
+ def regenerateInterval(self):
+ """
+ Return the interval in seconds between regenerating instances.
+ """
groupCacher = getattr(self.transaction, "_groupCacher", None)
- if groupCacher is not None:
+ return groupCacher.updateSeconds if groupCacher else 10
- # Schedule next update
- notBefore = (
- datetime.datetime.utcnow() +
- datetime.timedelta(seconds=groupCacher.updateSeconds)
- )
- log.debug(
- "Scheduling next group cacher update: {when}", when=notBefore
- )
- yield self.transaction.enqueue(
- GroupCacherPollingWork,
- notBefore=notBefore
- )
+ @inlineCallbacks
+ def doWork(self):
- # New implmementation
+ groupCacher = getattr(self.transaction, "_groupCacher", None)
+ if groupCacher is not None:
+
+ # New implementation
try:
yield groupCacher.update(self.transaction)
except Exception, e:
@@ -72,49 +74,8 @@
error=e
)
- else:
- notBefore = (
- datetime.datetime.utcnow() +
- datetime.timedelta(seconds=10)
- )
- log.debug(
- "Rescheduling group cacher update: {when}", when=notBefore
- )
- yield self.transaction.enqueue(
- GroupCacherPollingWork,
- notBefore=notBefore
- )
-
- at inlineCallbacks
-def scheduleNextGroupCachingUpdate(store, seconds):
-
- notBefore = (
- datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds)
- )
-
- log.debug(
- "Scheduling next group cacher update: {when}", when=notBefore
- )
-
- def _enqueue(txn):
- return txn.enqueue(GroupCacherPollingWork, notBefore=notBefore)
-
- wp = yield store.inTransaction("scheduleNextGroupCachingUpdate", _enqueue)
-
- returnValue(wp)
-
-
-
-def schedulePolledGroupCachingUpdate(store):
- """
- Schedules a group caching update work item to run immediately.
- """
- return scheduleNextGroupCachingUpdate(store, 0)
-
-
-
class GroupRefreshWork(WorkItem, fromTable(schema.GROUP_REFRESH_WORK)):
group = property(lambda self: self.groupUid)
@@ -163,26 +124,7 @@
lambda self: "{0}, {1}".format(self.groupID, self.resourceID)
)
- @classmethod
- @inlineCallbacks
- def _schedule(cls, txn, eventID, groupID, seconds):
- notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds)
- log.debug(
- "scheduling group reconciliation for "
- "({resourceID}, {groupID},): {when}",
- resourceID=eventID,
- groupID=groupID,
- when=notBefore
- )
- wp = yield txn.enqueue(
- cls,
- resourceID=eventID,
- groupID=groupID,
- notBefore=notBefore,
- )
- returnValue(wp)
-
@inlineCallbacks
def doWork(self):
@@ -432,11 +374,11 @@
wps = []
for [eventID] in rows:
- wp = yield GroupAttendeeReconciliationWork._schedule(
+ wp = yield GroupAttendeeReconciliationWork.reschedule(
txn,
- eventID=eventID,
+ seconds=float(config.GroupAttendees.ReconciliationDelaySeconds),
+ resourceID=eventID,
groupID=groupID,
- seconds=float(config.GroupAttendees.ReconciliationDelaySeconds)
)
wps.append(wp)
returnValue(tuple(wps))
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20140520/8ae91341/attachment-0001.html>
More information about the calendarserver-changes
mailing list