[CalendarServer-changes] [4825] CalendarServer/branches/users/wsanchez/deployment/twistedcaldav
source_changes at macosforge.org
source_changes at macosforge.org
Fri Dec 4 14:31:14 PST 2009
Revision: 4825
http://trac.macosforge.org/projects/calendarserver/changeset/4825
Author: sagen at apple.com
Date: 2009-12-04 14:31:14 -0800 (Fri, 04 Dec 2009)
Log Message:
-----------
The master process now sends SIGUSR1 to each slave process to tell it to reread the directory cache files. These signals are sent with a (configurable) delay in between so as not to use up too much CPU at once.
Modified Paths:
--------------
CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/cluster.py
CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/config.py
CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/directory/appleopendirectory.py
CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/directory/directory.py
CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/directory/test/test_opendirectoryrecords.py
CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/tap.py
Modified: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/cluster.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/cluster.py 2009-12-04 20:20:34 UTC (rev 4824)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/cluster.py 2009-12-04 22:31:14 UTC (rev 4825)
@@ -19,10 +19,12 @@
import tempfile
import socket
import time
+import signal
from twisted.runner import procmon
from twisted.application import internet, service
from twisted.internet import reactor, process
+from twisted.internet.threads import deferToThread
from twisted.python.reflect import namedClass
from twistedcaldav.accesslog import AMPLoggingFactory, RotatingFileAccessLoggingObserver
@@ -180,7 +182,7 @@
self.consistency = reactor.callLater(self.consistencyDelay,
self._checkConsistency)
- def signalAll(self, signal, startswithname=None):
+ def signalAll(self, signal, startswithname=None, seconds=0):
"""
Send a signal to all child processes.
@@ -189,9 +191,11 @@
@param startswithname: is set only signal those processes whose name starts with this string
@type signal: C{str}
"""
+ delay = 0
for name in self.processes.keys():
if startswithname is None or name.startswith(startswithname):
- self.signalProcess(signal, name)
+ reactor.callLater(delay, self.signalProcess, signal, name)
+ delay += seconds
def signalProcess(self, signal, name):
"""
@@ -238,14 +242,23 @@
# Refresh directory information on behalf of the child processes
directoryClass = namedClass(config.DirectoryService["type"])
directory = directoryClass(dosetup=True, doreload=False, **config.DirectoryService["params"])
- directory.refresh()
+ directory.refresh(master=True)
+ # Register USR1 handler
+ def sigusr1_handler(num, frame):
+ log.warn("SIGUSR1 received in master, triggering directory refresh")
+ deferToThread(directory.refresh, loop=False, master=True)
+ return
+
+ signal.signal(signal.SIGUSR1, sigusr1_handler)
+
s = service.MultiService()
monitor = DelayedStartupProcessMonitor()
monitor.setServiceParent(s)
- s.processMonitor = monitor
+ directory.processMonitor = s.processMonitor = monitor
+
parentEnv = {
'PATH': os.environ.get('PATH', ''),
'PYTHONPATH': os.environ.get('PYTHONPATH', ''),
Modified: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/config.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/config.py 2009-12-04 20:20:34 UTC (rev 4824)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/config.py 2009-12-04 22:31:14 UTC (rev 4825)
@@ -73,6 +73,7 @@
"node": "/Search",
"requireComputerRecord": True,
"cacheTimeout": 30,
+ "signalIntervalSeconds": 10,
},
}
Modified: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/directory/appleopendirectory.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/directory/appleopendirectory.py 2009-12-04 20:20:34 UTC (rev 4824)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/directory/appleopendirectory.py 2009-12-04 22:31:14 UTC (rev 4825)
@@ -25,6 +25,7 @@
import sys
import os
+import signal
from random import random
from uuid import UUID
@@ -68,7 +69,7 @@
def __repr__(self):
return "<%s %r: %r>" % (self.__class__.__name__, self.realmName, self.node)
- def __init__(self, node="/Search", requireComputerRecord=True, dosetup=True, doreload=True, cacheTimeout=30, **kwds):
+ def __init__(self, node="/Search", requireComputerRecord=True, dosetup=True, doreload=True, cacheTimeout=30, signalIntervalSeconds=10, **kwds):
"""
@param node: an OpenDirectory node name to bind to.
@param requireComputerRecord: C{True} if the directory schema is to be used to determine
@@ -90,8 +91,10 @@
self.computerRecords = {}
self.servicetags = set()
self.cacheTimeout = cacheTimeout
+ self.signalIntervalSeconds = signalIntervalSeconds
self._records = {}
self._delayedCalls = set()
+ self._refreshing = False
self.isWorkgroupServer = False
@@ -115,7 +118,7 @@
for recordType in self.recordTypes():
self.recordsForType(recordType)
- def refresh(self, loop=True):
+ def refresh(self, loop=True, master=False):
"""
This service works by having the master process call this method
which queries OD for all records, storing the pickled results into
@@ -126,57 +129,82 @@
these files.
"""
+ if self._refreshing:
+ self.log_warn("Already refreshing directory cache")
+ return
+
+ self._refreshing = True
+
def _refresh(self):
- dataRoot = FilePath(config.DataRoot)
- cacheDir = dataRoot.child("DirectoryCache")
- if not cacheDir.exists():
- cacheDir.createDirectory()
+ try:
+ dataRoot = FilePath(config.DataRoot)
+ cacheDir = dataRoot.child("DirectoryCache")
+ if not cacheDir.exists():
+ cacheDir.createDirectory()
- for recordType in self.recordTypes():
- self.log_debug("Master fetching %s from directory" % (recordType,))
- cacheFile = cacheDir.child(recordType)
- try:
- results = self._queryDirectory(recordType)
- except Exception, e:
- self.log_error("Master query for %s failed: %s" % (recordType, e))
- continue
+ dataWritten = False
+ for recordType in self.recordTypes():
+ self.log_warn("Master fetching %s for directory cache" % (recordType,))
+ cacheFile = cacheDir.child(recordType)
+ try:
+ results = self._queryDirectory(recordType)
+ except Exception, e:
+ self.log_error("Master query for %s failed: %s" % (recordType, e))
+ continue
- results.sort()
- numNewResults = len(results)
- pickled = pickle.dumps(results)
- needsWrite = True
- if cacheFile.exists():
- prevPickled = cacheFile.getContent()
- if prevPickled == pickled:
- needsWrite = False
- else:
- prevResults = pickle.loads(prevPickled)
- numPrevResults = len(prevResults)
- if numPrevResults == 0:
- needsWrite = True
+ results.sort()
+ numNewResults = len(results)
+ pickled = pickle.dumps(results)
+ needsWrite = True
+ if cacheFile.exists():
+ prevPickled = cacheFile.getContent()
+ if prevPickled == pickled:
+ needsWrite = False
else:
- if float(numNewResults) / numPrevResults < 0.5:
- # New results is less than half of what it used
- # to be -- this indicates we might not have
- # gotten back enough records from OD. Don't
- # write out the file, but log an error.
- self.log_error("OD results for %s substantially less than last time: was %d, now %d." % (recordType, numPrevResults, numNewResults))
- needsWrite = False
- continue
+ prevResults = pickle.loads(prevPickled)
+ numPrevResults = len(prevResults)
+ if numPrevResults == 0:
+ needsWrite = True
+ else:
+ if float(numNewResults) / numPrevResults < 0.5:
+ # New results is less than half of what it used
+ # to be -- this indicates we might not have
+ # gotten back enough records from OD. Don't
+ # write out the file, but log an error.
+ self.log_error("OD results for %s substantially less than last time: was %d, now %d." % (recordType, numPrevResults, numNewResults))
+ needsWrite = False
+ continue
- if needsWrite:
- self.log_info("Saving cache file for %s (%d items)" % (recordType, numNewResults))
- cacheFile.setContent(pickled)
- else:
- self.log_debug("%s info hasn't changed" % (recordType,))
+ if needsWrite:
+ self.log_warn("Saving directory cache file for %s (%d items)" % (recordType, numNewResults))
+ cacheFile.setContent(pickled)
+ dataWritten = True
+ else:
+ self.log_warn("%s info hasn't changed" % (recordType,))
+ if dataWritten and hasattr(self, 'processMonitor'):
+ self.processMonitor.signalAll(signal.SIGUSR1, "caldav", seconds=self.signalIntervalSeconds)
+ finally:
+ self._refreshing = False
+
def _refreshInThread(self):
return deferToThread(_refresh, self)
- _refresh(self)
+ if master:
+ _refresh(self)
- if loop:
- LoopingCall(_refreshInThread, self).start(self.cacheTimeout * 60)
+ if loop:
+ LoopingCall(_refreshInThread, self).start(self.cacheTimeout * 60)
+ else:
+ def _reloadCaches():
+ try:
+ self.log_warn("Reading directory cache files")
+ for recordType in self.recordTypes():
+ self.reloadCache(recordType)
+ self.log_warn("Done reading directory cache files")
+ finally:
+ self._refreshing = False
+ deferToThread(_reloadCaches)
@@ -483,20 +511,7 @@
except KeyError:
self.reloadCache(recordType)
storage = self._records[recordType]
- else:
- if storage["status"] == "stale":
- storage["status"] = "loading"
- def onError(f):
- storage["status"] = "stale" # Keep trying
- self.log_error(
- "Unable to load records of type %s from OpenDirectory due to unexpected error: %s"
- % (recordType, f)
- )
-
- d = deferToThread(self.reloadCache, recordType)
- d.addErrback(onError)
-
return storage
def recordsForType(self, recordType):
@@ -638,17 +653,6 @@
def reloadCache(self, recordType, forceUpdate=False):
- def rot():
- storage["status"] = "stale"
- removals = set()
- for call in self._delayedCalls:
- if not call.active():
- removals.add(call)
- for item in removals:
- self._delayedCalls.remove(item)
-
- cacheTimeout = 60 # child processes always check once per minute
-
dataRoot = FilePath(config.DataRoot)
cacheDir = dataRoot.child("DirectoryCache")
if not cacheDir.exists():
@@ -664,11 +668,14 @@
lastModified = cacheFile.getModificationTime()
try:
storage = self._records[recordType]
+ if storage["status"] == "loading":
+ self.log_warn("Directory cache file for %s already being reloaded" % (recordType,))
+ return
if not forceUpdate and (lastModified <= storage["last modified"]):
self.log_debug("Directory cache file for %s unchanged" % (recordType,))
storage["status"] = "new" # mark this as not stale
- self._delayedCalls.add(callLater(cacheTimeout, rot))
return
+ storage["status"] = "loading"
except KeyError:
# Haven't read the file before
pass
@@ -677,7 +684,6 @@
pickled = cacheFile.getContent()
results = pickle.loads(pickled)
- # results = self._queryDirectory(recordType)
records = {}
guids = {}
@@ -692,31 +698,49 @@
proxiesForGUID = {}
readOnlyProxiesForGUID = {}
+ def allowForACLs():
+ return recordType in (
+ DirectoryService.recordType_users,
+ DirectoryService.recordType_groups,
+ )
+
+ def disableForCalendaring(recordShortName):
+ self.log_debug(
+ "Record (%s) %s is not enabled for calendaring but may be used in ACLs"
+ % (recordType, recordShortName)
+ )
+
+ def invalidRecord(recordShortName):
+ self.log_error(
+ "Directory (incorrectly) returned a record with no applicable "
+ "ServicesLocator attribute: (%s) %s"
+ % (recordType, recordShortName)
+ )
+
+ def disableRecord(record):
+ self.log_warn("Record disabled due to conflict (record name and GUID must match): %s" % (record,))
+
+ shortName = record.shortName
+ guid = record.guid
+ cuaddrset = record.calendarUserAddresses
+
+ disabledNames.add(shortName)
+ disabledGUIDs.add(guid)
+
+ if shortName in records:
+ del records[shortName]
+ if guid in guids:
+ del guids[guid]
+ for cuaddr in cuaddrset:
+ if cuaddr in cuaddrs:
+ del cuaddrs[cuaddr]
+
for (recordShortName, value) in results:
enabledForCalendaring = True
if self.requireComputerRecord:
servicesLocators = value.get(dsattributes.kDSNAttrServicesLocator)
- def allowForACLs():
- return recordType in (
- DirectoryService.recordType_users,
- DirectoryService.recordType_groups,
- )
-
- def disableForCalendaring():
- self.log_debug(
- "Record (%s) %s is not enabled for calendaring but may be used in ACLs"
- % (recordType, recordShortName)
- )
-
- def invalidRecord():
- self.log_error(
- "Directory (incorrectly) returned a record with no applicable "
- "ServicesLocator attribute: (%s) %s"
- % (recordType, recordShortName)
- )
-
if servicesLocators:
if type(servicesLocators) is str:
servicesLocators = (servicesLocators,)
@@ -726,17 +750,17 @@
break
else:
if allowForACLs():
- disableForCalendaring()
+ disableForCalendaring(recordShortName)
enabledForCalendaring = False
else:
- invalidRecord()
+ invalidRecord(recordShortName)
continue
else:
if allowForACLs():
- disableForCalendaring()
+ disableForCalendaring(recordShortName)
enabledForCalendaring = False
else:
- invalidRecord()
+ invalidRecord(recordShortName)
continue
# Now get useful record info.
@@ -801,24 +825,6 @@
readOnlyProxyGUIDs = readOnlyProxyGUIDs,
)
- def disableRecord(record):
- self.log_warn("Record disabled due to conflict (record name and GUID must match): %s" % (record,))
-
- shortName = record.shortName
- guid = record.guid
- cuaddrset = record.calendarUserAddresses
-
- disabledNames.add(shortName)
- disabledGUIDs.add(guid)
-
- if shortName in records:
- del records[shortName]
- if guid in guids:
- del guids[guid]
- for cuaddr in cuaddrset:
- if cuaddr in cuaddrs:
- del cuaddrs[cuaddr]
-
# Check for disabled items
if record.shortName in disabledNames or record.guid in disabledGUIDs:
disableRecord(record)
@@ -873,13 +879,11 @@
storage["proxiesForGUID"] = proxiesForGUID
storage["readOnlyProxiesForGUID"] = readOnlyProxiesForGUID
- self._delayedCalls.add(callLater(cacheTimeout, rot))
-
self._records[recordType] = storage
self.log_info(
- "Added %d records to %s OD record cache; expires in %d seconds"
- % (len(self._records[recordType]["guids"]), recordType, cacheTimeout)
+ "Added %d records to %s OD record cache"
+ % (len(self._records[recordType]["guids"]), recordType)
)
def _queryDirectory(self, recordType):
Modified: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/directory/directory.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/directory/directory.py 2009-12-04 20:20:34 UTC (rev 4824)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/directory/directory.py 2009-12-04 22:31:14 UTC (rev 4825)
@@ -139,13 +139,14 @@
for record in self.listRecords(recordType):
yield record
- def refresh(self):
+ def refresh(self, master=False):
"""
This gets called in the master process to give the directory service
a chance to refresh a cache of directory information
"""
pass
+
class DirectoryRecord(LoggingMixIn):
implements(IDirectoryRecord)
Modified: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/directory/test/test_opendirectoryrecords.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/directory/test/test_opendirectoryrecords.py 2009-12-04 20:20:34 UTC (rev 4824)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/directory/test/test_opendirectoryrecords.py 2009-12-04 22:31:14 UTC (rev 4825)
@@ -102,7 +102,7 @@
fakeODRecord("Location 02"),
],
}
- self._service.refresh(loop=False)
+ self._service.refresh(loop=False, master=True)
self._service.reloadCache(DirectoryService.recordType_users)
self._service.reloadCache(DirectoryService.recordType_groups)
@@ -148,7 +148,7 @@
fakeODRecord("Location 04", addLocator=False),
],
}
- self._service.refresh(loop=False)
+ self._service.refresh(loop=False, master=True)
self._service.reloadCache(DirectoryService.recordType_users)
self._service.reloadCache(DirectoryService.recordType_groups)
@@ -173,7 +173,7 @@
fakeODRecord("User 01"),
],
}
- self._service.refresh(loop=False)
+ self._service.refresh(loop=False, master=True)
self._service.reloadCache(DirectoryService.recordType_users)
@@ -187,7 +187,7 @@
fakeODRecord("User 03", guid="D10F3EE0-5014-41D3-8488-3819D3EF3B2A"),
],
}
- self._service.refresh(loop=False)
+ self._service.refresh(loop=False, master=True)
self._service.reloadCache(DirectoryService.recordType_users, forceUpdate=True)
@@ -202,7 +202,7 @@
fakeODRecord("User 02"),
],
}
- self._service.refresh(loop=False)
+ self._service.refresh(loop=False, master=True)
self._service.reloadCache(DirectoryService.recordType_users)
@@ -219,7 +219,7 @@
fakeODRecord("User 02", guid="30CA2BB9-C935-4A5D-80E2-79266BCB0255"),
],
}
- self._service.refresh(loop=False)
+ self._service.refresh(loop=False, master=True)
self._service.reloadCache(DirectoryService.recordType_users)
@@ -238,7 +238,7 @@
fakeODRecord("User 03", guid="113D7F74-F84A-4F17-8C96-CE8F10D68EF8"),
],
}
- self._service.refresh(loop=False)
+ self._service.refresh(loop=False, master=True)
self._service.reloadCache(DirectoryService.recordType_users)
@@ -258,7 +258,7 @@
fakeODRecord("User 02", guid="136E369F-DB40-4135-878D-B75D38242D39"),
],
}
- self._service.refresh(loop=False)
+ self._service.refresh(loop=False, master=True)
self._service.reloadCache(DirectoryService.recordType_users)
@@ -277,7 +277,7 @@
fakeODRecord("User 03", guid="D10F3EE0-5014-41D3-8488-3819D3EF3B2A"),
],
}
- self._service.refresh(loop=False)
+ self._service.refresh(loop=False, master=True)
self._service.reloadCache(DirectoryService.recordType_users)
@@ -292,7 +292,7 @@
fakeODRecord("User 03", guid="62368DDF-0C62-4C97-9A58-DE9FD46131A0", shortName="user05"),
],
}
- self._service.refresh(loop=False)
+ self._service.refresh(loop=False, master=True)
self._service.reloadCache(DirectoryService.recordType_users, forceUpdate=True)
@@ -328,7 +328,7 @@
fakeODRecord("Location 02"),
],
}
- self._service.refresh(loop=False)
+ self._service.refresh(loop=False, master=True)
self._service.reloadCache(DirectoryService.recordType_users)
self._service.reloadCache(DirectoryService.recordType_groups)
@@ -358,7 +358,7 @@
guidForShortName("user02"),
]),
]
- self._service.refresh(loop=False)
+ self._service.refresh(loop=False, master=True)
self._service.reloadCache(DirectoryService.recordType_groups, forceUpdate=True)
group1 = self._service.recordWithShortName(DirectoryService.recordType_groups, "group01")
@@ -388,7 +388,7 @@
guidForShortName("user02"),
]),
]
- self._service.refresh(loop=False)
+ self._service.refresh(loop=False, master=True)
self._service.reloadCache(DirectoryService.recordType_groups, forceUpdate=True)
group1 = self._service.recordWithShortName(DirectoryService.recordType_groups, "group01")
@@ -418,7 +418,7 @@
DirectoryService.recordType_resources: [],
DirectoryService.recordType_locations: [],
}
- self._service.refresh(loop=False)
+ self._service.refresh(loop=False, master=True)
self._service.reloadCache(DirectoryService.recordType_users)
@@ -438,7 +438,7 @@
DirectoryService.recordType_resources: [],
DirectoryService.recordType_locations: [],
}
- self._service.refresh(loop=False)
+ self._service.refresh(loop=False, master=True)
self._service.reloadCache(DirectoryService.recordType_users, forceUpdate=True)
user1 = self._service.recordWithCalendarUserAddress("mailto:user01 at example.com")
@@ -456,7 +456,7 @@
DirectoryService.recordType_resources: [],
DirectoryService.recordType_locations: [],
}
- self._service.refresh(loop=False)
+ self._service.refresh(loop=False, master=True)
self._service.reloadCache(DirectoryService.recordType_users, forceUpdate=True)
user1 = self._service.recordWithCalendarUserAddress("mailto:user01 at example.com")
@@ -486,7 +486,7 @@
DirectoryService.recordType_resources: [],
DirectoryService.recordType_locations: [],
}
- self._service.refresh(loop=False)
+ self._service.refresh(loop=False, master=True)
self._service.reloadCache(DirectoryService.recordType_users, forceUpdate=True)
user1 = self._service.recordWithCalendarUserAddress("mailto:user01 at example.com")
self.assertTrue(user1 is not None)
@@ -496,7 +496,7 @@
fakeODRecord("User 01"),
fakeODRecord("User 02"),
]
- self._service.refresh(loop=False)
+ self._service.refresh(loop=False, master=True)
self._service.reloadCache(DirectoryService.recordType_users, forceUpdate=True)
user3 = self._service.recordWithCalendarUserAddress("mailto:user03 at example.com")
self.assertTrue(user3 is not None)
@@ -529,7 +529,7 @@
fakeODRecord("User 01"),
],
}
- self._service.refresh(loop=False)
+ self._service.refresh(loop=False, master=True)
self._service.reloadCache(DirectoryService.recordType_users)
user1 = self._service.recordWithCalendarUserAddress("mailto:user01 at example.com")
Modified: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/tap.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/tap.py 2009-12-04 20:20:34 UTC (rev 4824)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/tap.py 2009-12-04 22:31:14 UTC (rev 4825)
@@ -17,6 +17,7 @@
import os
import stat
import socket
+import signal
from zope.interface import implements
@@ -827,6 +828,15 @@
# Change log level back to what it was before
setLogLevelForNamespace(None, oldLogLevel)
+
+ # Register USR1 handler
+ def sigusr1_handler(num, frame):
+ log.debug("SIGUSR1 received, triggering directory refresh")
+ baseDirectory.refresh()
+ return
+
+ signal.signal(signal.SIGUSR1, sigusr1_handler)
+
return service
makeService_Combined = makeService_Combined
@@ -862,7 +872,6 @@
else:
return "%s: %s" % (frame.f_code.co_name, frame.f_lineno)
- import signal
def sighup_handler(num, frame):
log.info("SIGHUP received at %s" % (location(frame),))
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20091204/80831f2a/attachment-0001.html>
More information about the calendarserver-changes
mailing list