[CalendarServer-changes] [4430] CalendarServer/trunk

source_changes at macosforge.org source_changes at macosforge.org
Mon Jul 6 14:32:08 PDT 2009


Revision: 4430
          http://trac.macosforge.org/projects/calendarserver/changeset/4430
Author:   sagen at apple.com
Date:     2009-07-06 14:32:06 -0700 (Mon, 06 Jul 2009)
Log Message:
-----------
Any inbox items left over from before an upgrade to an implicit-scheduling-capable server will now get processed.

Modified Paths:
--------------
    CalendarServer/trunk/calendarserver/tap/caldav.py
    CalendarServer/trunk/twisted/plugins/caldav.py
    CalendarServer/trunk/twistedcaldav/scheduling/scheduler.py
    CalendarServer/trunk/twistedcaldav/test/test_upgrade.py
    CalendarServer/trunk/twistedcaldav/upgrade.py

Added Paths:
-----------
    CalendarServer/trunk/calendarserver/sidecar/
    CalendarServer/trunk/calendarserver/sidecar/__init__.py
    CalendarServer/trunk/calendarserver/sidecar/task.py

Added: CalendarServer/trunk/calendarserver/sidecar/__init__.py
===================================================================
--- CalendarServer/trunk/calendarserver/sidecar/__init__.py	                        (rev 0)
+++ CalendarServer/trunk/calendarserver/sidecar/__init__.py	2009-07-06 21:32:06 UTC (rev 4430)
@@ -0,0 +1,19 @@
+##
+# Copyright (c) 2005-2009 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+CalendarServer "sidecar" processes
+"""

Added: CalendarServer/trunk/calendarserver/sidecar/task.py
===================================================================
--- CalendarServer/trunk/calendarserver/sidecar/task.py	                        (rev 0)
+++ CalendarServer/trunk/calendarserver/sidecar/task.py	2009-07-06 21:32:06 UTC (rev 4430)
@@ -0,0 +1,435 @@
+# Copyright (c) 2005-2009 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+__all__ = [
+    "CalDAVService",
+    "CalDAVOptions",
+    "CalDAVServiceMaker",
+]
+
+from calendarserver.provision.root import RootResource
+from time import sleep
+from twisted.application.service import Service, IServiceMaker
+from twisted.internet.address import IPv4Address
+from twisted.internet.defer import DeferredList, succeed, inlineCallbacks, returnValue
+from twisted.internet.reactor import callLater
+from twisted.plugin import IPlugin
+from twisted.python.reflect import namedClass
+from twisted.python.usage import Options, UsageError
+from twisted.web2.http_headers import Headers
+from twistedcaldav import memcachepool
+from twistedcaldav.config import config, defaultConfig, defaultConfigFile
+from twistedcaldav.directory.principal import DirectoryPrincipalProvisioningResource
+from twistedcaldav.ical import Component
+from twistedcaldav.log import Logger, LoggingMixIn
+from twistedcaldav.log import logLevelForNamespace, setLogLevelForNamespace
+from twistedcaldav.notify import installNotificationClient
+from twistedcaldav.scheduling.cuaddress import LocalCalendarUser
+from twistedcaldav.scheduling.scheduler import DirectScheduler
+from twistedcaldav.static import CalendarHomeProvisioningFile
+from zope.interface import implements
+import os
+
+log = Logger()
+
+class FakeRequest(object):
+
+    def __init__(self, rootResource, method):
+        self.rootResource = rootResource
+        self.method = method
+        self._resourcesByURL = {}
+        self._urlsByResource = {}
+        self.headers = Headers()
+
+    @inlineCallbacks
+    def _getChild(self, resource, segments):
+        if not segments:
+            returnValue(resource)
+
+        child, remaining = (yield resource.locateChild(self, segments))
+        returnValue((yield self._getChild(child, remaining)))
+
+    @inlineCallbacks
+    def locateResource(self, url):
+        url = url.strip("/")
+        segments = url.split("/")
+        resource = (yield self._getChild(self.rootResource, segments))
+        if resource:
+            self._rememberResource(resource, url)
+        returnValue(resource)
+
+    def _rememberResource(self, resource, url):
+        self._resourcesByURL[url] = resource
+        self._urlsByResource[resource] = url
+        return resource
+
+    def urlForResource(self, resource):
+        url = self._urlsByResource.get(resource, None)
+        if url is None:
+            raise NoURLForResourceError(resource)
+        return url
+
+    def addResponseFilter(*args, **kwds):
+        pass
+
+@inlineCallbacks
+def processInboxItem(rootResource, directory, inboxFile, inboxItemFile, uuid):
+    log.debug("Processing inbox item %s" % (inboxItemFile,))
+
+    principals = rootResource.getChild("principals")
+    ownerPrincipal = principals.principalForUID(uuid)
+    cua = "urn:uuid:%s" % (uuid,)
+    owner = LocalCalendarUser(cua, ownerPrincipal,
+        inboxFile, ownerPrincipal.scheduleInboxURL())
+
+    data = inboxItemFile.iCalendarText()
+    calendar = Component.fromString(data)
+    try:
+        method = calendar.propertyValue("METHOD")
+    except ValueError:
+        returnValue(None)
+
+    if method == "REPLY":
+        # originator is attendee sending reply
+        originator = calendar.getAttendees()[0]
+    else:
+        # originator is the organizer
+        originator = calendar.getOrganizer()
+
+    originatorPrincipal = principals.principalForCalendarUserAddress(originator)
+    originator = LocalCalendarUser(originator, originatorPrincipal)
+    recipients = (owner,)
+    scheduler = DirectScheduler(FakeRequest(rootResource, "PUT"), inboxItemFile)
+    result = (yield scheduler.doSchedulingViaPUT(originator, recipients,
+        calendar, internal_request=False))
+
+    if os.path.exists(inboxItemFile.fp.path):
+        os.remove(inboxItemFile.fp.path)
+
+
+
+class Task(object):
+
+    def __init__(self, service, fileName):
+        self.service = service
+        self.taskName = fileName.split(".")[0]
+        self.taskFile = os.path.join(self.service.processingDir, fileName)
+
+    @inlineCallbacks
+    def run(self):
+        methodName = "task_%s" % (self.taskName,)
+        method = getattr(self, methodName, None)
+        if method:
+            try:
+                log.warn("Running task '%s'" % (self.taskName))
+                yield method()
+                log.warn("Completed task '%s'" % (self.taskName))
+            except Exception, e:
+                log.error("Failed task '%s' (%s)" % (self.taskName, e))
+                os.remove(self.taskFile)
+                raise
+        else:
+            log.error("Unknown task requested: '%s'" % (self.taskName))
+            os.remove(self.taskFile)
+            returnValue(None)
+
+    @inlineCallbacks
+    def task_scheduleinboxes(self):
+
+        calendars = self.service.root.getChild("calendars")
+        uidDir = calendars.getChild("__uids__")
+
+        inboxItems = set()
+        with open(self.taskFile) as input:
+            for inboxItem in input:
+                inboxItem = inboxItem.strip()
+                inboxItems.add(inboxItem)
+
+        for inboxItem in list(inboxItems):
+            log.info("Processing inbox item: %s" % (inboxItem,))
+            ignore, uuid, ignore, fileName = inboxItem.rsplit("/", 3)
+
+            homeFile = uidDir.getChild(uuid)
+            if not homeFile:
+                continue
+
+            inboxFile = homeFile.getChild("inbox")
+            if not inboxFile:
+                continue
+
+            inboxItemFile = inboxFile.getChild(fileName)
+
+            yield processInboxItem(
+                self.service.root,
+                self.service.directory,
+                inboxFile,
+                inboxItemFile,
+                uuid
+            )
+            inboxItems.remove(inboxItem)
+
+            # Rewrite the task file in case we exit before we're done
+            with open(self.taskFile + ".tmp", "w") as output:
+                for inboxItem in inboxItems:
+                    output.write("%s\n" % (inboxItem,))
+            os.rename(self.taskFile + ".tmp", self.taskFile)
+
+        os.remove(self.taskFile)
+
+
+
+class CalDAVTaskService(Service):
+
+    def __init__(self, root, directory):
+        self.root = root
+        self.directory = directory
+        self.seconds = 30 # How often to check for new tasks in incomingDir
+        self.taskDir = os.path.join(config.DataRoot, "tasks")
+        # New task files are placed into "incoming"
+        self.incomingDir = os.path.join(self.taskDir, "incoming")
+        # Task files get moved into "processing" and then removed when complete
+        self.processingDir = os.path.join(self.taskDir, "processing")
+
+    def startService(self):
+        log.info("Starting task service")
+
+        if not os.path.exists(self.taskDir):
+            os.mkdir(self.taskDir)
+        if not os.path.exists(self.incomingDir):
+            os.mkdir(self.incomingDir)
+        if not os.path.exists(self.processingDir):
+            os.mkdir(self.processingDir)
+
+        callLater(self.seconds, self.periodic, first=True)
+
+
+    def periodic(self, first=False):
+        log.debug("Checking for tasks")
+
+        deferreds = []
+
+        try:
+            if first:
+                # check the processing directory to see if there are any tasks
+                # that didn't complete during the last server run; start those
+                for fileName in os.listdir(self.processingDir):
+                    if fileName.endswith(".task"):
+                        log.debug("Restarting old task: %s" % (fileName,))
+                        deferreds.append(Task(self, fileName).run())
+
+            for fileName in os.listdir(self.incomingDir):
+                if fileName.endswith(".task"):
+                    log.debug("Found new task: %s" % (fileName,))
+                    os.rename(os.path.join(self.incomingDir, fileName),
+                        os.path.join(self.processingDir, fileName))
+                    deferreds.append(Task(self, fileName).run())
+
+        finally:
+            callLater(self.seconds, self.periodic)
+
+        return DeferredList(deferreds)
+
+
+
+class CalDAVTaskOptions(Options):
+    optParameters = [[
+        "config", "f", defaultConfigFile, "Path to configuration file."
+    ]]
+
+    def __init__(self, *args, **kwargs):
+        super(CalDAVTaskOptions, self).__init__(*args, **kwargs)
+
+        self.overrides = {}
+
+    def _coerceOption(self, configDict, key, value):
+        """
+        Coerce the given C{val} to type of C{configDict[key]}
+        """
+        if key in configDict:
+            if isinstance(configDict[key], bool):
+                value = value == "True"
+
+            elif isinstance(configDict[key], (int, float, long)):
+                value = type(configDict[key])(value)
+
+            elif isinstance(configDict[key], (list, tuple)):
+                value = value.split(',')
+
+            elif isinstance(configDict[key], dict):
+                raise UsageError(
+                    "Dict options not supported on the command line"
+                )
+
+            elif value == 'None':
+                value = None
+
+        return value
+
+    def _setOverride(self, configDict, path, value, overrideDict):
+        """
+        Set the value at path in configDict
+        """
+        key = path[0]
+
+        if len(path) == 1:
+            overrideDict[key] = self._coerceOption(configDict, key, value)
+            return
+
+        if key in configDict:
+            if not isinstance(configDict[key], dict):
+                raise UsageError(
+                    "Found intermediate path element that is not a dictionary"
+                )
+
+            if key not in overrideDict:
+                overrideDict[key] = {}
+
+            self._setOverride(
+                configDict[key], path[1:],
+                value, overrideDict[key]
+            )
+
+
+    def opt_option(self, option):
+        """
+        Set an option to override a value in the config file. True, False, int,
+        and float options are supported, as well as comma seperated lists. Only
+        one option may be given for each --option flag, however multiple
+        --option flags may be specified.
+        """
+
+        if "=" in option:
+            path, value = option.split('=')
+            self._setOverride(
+                defaultConfig,
+                path.split('/'),
+                value,
+                self.overrides
+            )
+        else:
+            self.opt_option('%s=True' % (option,))
+
+    opt_o = opt_option
+
+    def postOptions(self):
+        config.loadConfig(self['config'])
+        config.updateDefaults(self.overrides)
+        self.parent['pidfile'] = None
+
+
+class CalDAVTaskServiceMaker (LoggingMixIn):
+    implements(IPlugin, IServiceMaker)
+
+    tapname = "caldav_task"
+    description = "Calendar Server Task Process"
+    options = CalDAVTaskOptions
+
+    #
+    # Default resource classes
+    #
+    rootResourceClass            = RootResource
+    principalResourceClass       = DirectoryPrincipalProvisioningResource
+    calendarResourceClass        = CalendarHomeProvisioningFile
+
+    def makeService(self, options):
+
+        #
+        # The task sidecar doesn't care about system SACLs
+        #
+        config.EnableSACLs = False
+
+        #
+        # Change default log level to "info" as its useful to have
+        # that during startup
+        #
+        oldLogLevel = logLevelForNamespace(None)
+        setLogLevelForNamespace(None, "info")
+
+        #
+        # Setup the Directory
+        #
+        directories = []
+
+        directoryClass = namedClass(config.DirectoryService.type)
+
+        self.log_info("Configuring directory service of type: %s"
+                      % (config.DirectoryService.type,))
+
+        directory = directoryClass(config.DirectoryService.params)
+
+        # Wait for the directory to become available
+        while not directory.isAvailable():
+            sleep(5)
+
+        #
+        # Configure Memcached Client Pool
+        #
+        if config.Memcached.ClientEnabled:
+            memcachepool.installPool(
+                IPv4Address(
+                    "TCP",
+                    config.Memcached.BindAddress,
+                    config.Memcached.Port,
+                ),
+                config.Memcached.MaxClients,
+            )
+
+        #
+        # Configure NotificationClient
+        #
+        if config.Notifications.Enabled:
+            installNotificationClient(
+                config.Notifications.InternalNotificationHost,
+                config.Notifications.InternalNotificationPort,
+            )
+
+        #
+        # Setup Resource hierarchy
+        #
+        self.log_info("Setting up document root at: %s"
+                      % (config.DocumentRoot,))
+        self.log_info("Setting up principal collection: %r"
+                      % (self.principalResourceClass,))
+
+        principalCollection = self.principalResourceClass(
+            "/principals/",
+            directory,
+        )
+
+        self.log_info("Setting up calendar collection: %r"
+                      % (self.calendarResourceClass,))
+
+        calendarCollection = self.calendarResourceClass(
+            os.path.join(config.DocumentRoot, "calendars"),
+            directory, "/calendars/",
+        )
+
+        self.log_info("Setting up root resource: %r"
+                      % (self.rootResourceClass,))
+
+        root = self.rootResourceClass(
+            config.DocumentRoot,
+            principalCollections=(principalCollection,),
+        )
+
+        root.putChild("principals", principalCollection)
+        root.putChild("calendars", calendarCollection)
+
+        service = CalDAVTaskService(root, directory)
+
+        # Change log level back to what it was before
+        setLogLevelForNamespace(None, oldLogLevel)
+
+        return service

Modified: CalendarServer/trunk/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/caldav.py	2009-07-06 20:53:09 UTC (rev 4429)
+++ CalendarServer/trunk/calendarserver/tap/caldav.py	2009-07-06 21:32:06 UTC (rev 4430)
@@ -39,6 +39,7 @@
 from twisted.python.usage import Options, UsageError
 from twisted.python.reflect import namedClass
 from twisted.plugin import IPlugin
+from twisted.internet.defer import DeferredList, succeed, inlineCallbacks, returnValue
 from twisted.internet.reactor import callLater
 from twisted.internet.process import ProcessExitedAlready
 from twisted.internet.protocol import Protocol, Factory
@@ -1111,6 +1112,24 @@
 
             monitor.addProcess("mailgateway", mailGatewayArgv, env=parentEnv)
 
+        self.log_info("Adding task service")
+        taskArgv = [
+            sys.executable,
+            config.Twisted.twistd,
+        ]
+        if config.UserName:
+            taskArgv.extend(("-u", config.UserName))
+        if config.GroupName:
+            taskArgv.extend(("-g", config.GroupName))
+        taskArgv.extend((
+            "--reactor=%s" % (config.Twisted.reactor,),
+            "-n", "caldav_task",
+            "-f", options["config"],
+        ))
+
+        monitor.addProcess("caldav_task", taskArgv, env=parentEnv)
+
+
         stats = CalDAVStatisticsServer(logger) 
         statsService = UNIXServer(config.GlobalStatsSocket, stats) 
         statsService.setServiceParent(s)
@@ -1165,6 +1184,8 @@
                         self.log_warn("Deleting stale socket file (not accepting connections): %s" % checkSocket)
                         os.remove(checkSocket)
 
+
+
 class TwistdSlaveProcess(object):
     prefix = "caldav"
 

Modified: CalendarServer/trunk/twisted/plugins/caldav.py
===================================================================
--- CalendarServer/trunk/twisted/plugins/caldav.py	2009-07-06 20:53:09 UTC (rev 4429)
+++ CalendarServer/trunk/twisted/plugins/caldav.py	2009-07-06 21:32:06 UTC (rev 4430)
@@ -34,5 +34,6 @@
 
 
 TwistedCalDAV     = TAP("calendarserver.tap.caldav.CalDAVServiceMaker")
+CalDAVTask        = TAP("calendarserver.sidecar.task.CalDAVTaskServiceMaker")
 CalDAVNotifier    = TAP("twistedcaldav.notify.NotificationServiceMaker")
 CalDAVMailGateway = TAP("twistedcaldav.mail.MailGatewayServiceMaker")

Modified: CalendarServer/trunk/twistedcaldav/scheduling/scheduler.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/scheduling/scheduler.py	2009-07-06 20:53:09 UTC (rev 4429)
+++ CalendarServer/trunk/twistedcaldav/scheduling/scheduler.py	2009-07-06 21:32:06 UTC (rev 4430)
@@ -55,6 +55,7 @@
     "CalDAVScheduler",
     "IScheduleScheduler",
     "IMIPScheduler",
+    "DirectScheduler",
 ]
 
 
@@ -386,7 +387,7 @@
             # Now process iMIP recipients
             if imip_recipients:
                 yield self.generateIMIPSchedulingResponses(imip_recipients, responses, freebusy)
-    
+
         # Return with final response if we are done
         returnValue(responses)
     
@@ -789,6 +790,33 @@
             log.err(msg)
             raise HTTPError(ErrorResponse(responsecode.FORBIDDEN, (caldav_namespace, "valid-calendar-data"), description=msg))
 
+
+class DirectScheduler(Scheduler):
+    """ An implicit scheduler meant for use by local processes which don't
+        need to go through all these checks. """
+
+    def checkAuthorization(self):
+        pass
+
+    def checkOrganizer(self):
+        pass
+
+    def checkOrganizerAsOriginator(self):
+        pass
+
+    def checkAttendeeAsOriginator(self):
+        pass
+
+    def securityChecks(self):
+        pass
+
+    def checkOriginator(self):
+        pass
+
+    def checkRecipients(self):
+        pass
+
+
 class IMIPScheduler(Scheduler):
 
     def checkAuthorization(self):

Modified: CalendarServer/trunk/twistedcaldav/test/test_upgrade.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/test/test_upgrade.py	2009-07-06 20:53:09 UTC (rev 4429)
+++ CalendarServer/trunk/twistedcaldav/test/test_upgrade.py	2009-07-06 21:32:06 UTC (rev 4430)
@@ -282,6 +282,12 @@
         }
 
         after = {
+            "tasks" :
+            {
+                "incoming" :
+                {
+                },
+            },
             ".calendarserver_version" :
             {
                 "@contents" : "1",
@@ -406,6 +412,12 @@
         }
 
         after = {
+            "tasks" :
+            {
+                "incoming" :
+                {
+                },
+            },
             ".calendarserver_version" :
             {
                 "@contents" : "1",
@@ -525,6 +537,12 @@
         }
 
         after = {
+            "tasks" :
+            {
+                "incoming" :
+                {
+                },
+            },
             ".calendarserver_version" :
             {
                 "@contents" : "1",
@@ -650,6 +668,12 @@
         }
 
         after = {
+            "tasks" :
+            {
+                "incoming" :
+                {
+                },
+            },
             ".calendarserver_version" :
             {
                 "@contents" : "1",
@@ -761,6 +785,12 @@
 
 
         after = {
+            "tasks" :
+            {
+                "incoming" :
+                {
+                },
+            },
             "calendars" :
             {
                 "__uids__" :

Modified: CalendarServer/trunk/twistedcaldav/upgrade.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/upgrade.py	2009-07-06 20:53:09 UTC (rev 4429)
+++ CalendarServer/trunk/twistedcaldav/upgrade.py	2009-07-06 21:32:06 UTC (rev 4430)
@@ -286,7 +286,22 @@
         if os.path.exists(dbPath):
             os.chown(dbPath, uid, gid)
 
+    def createTaskServiceDirectory(config, uid, gid):
 
+        taskDir = os.path.join(config.DataRoot, "tasks")
+        if not os.path.exists(taskDir):
+            os.mkdir(taskDir)
+        os.chown(taskDir, uid, gid)
+
+        incomingDir = os.path.join(taskDir, "incoming")
+        if not os.path.exists(incomingDir):
+            os.mkdir(incomingDir)
+        os.chown(incomingDir, uid, gid)
+
+        return incomingDir
+
+
+
     directory = getDirectory()
 
     docRoot = config.DocumentRoot
@@ -354,8 +369,10 @@
                     os.rmdir(dirPath)
 
 
-            # Count how many calendar homes we'll be processing
+            # Count how many calendar homes we'll be processing, and build
+            # list of pending inbox items
             total = 0
+            inboxItems = set()
             for first in os.listdir(uidHomes):
                 if len(first) == 2:
                     firstPath = os.path.join(uidHomes, first)
@@ -364,7 +381,21 @@
                             secondPath = os.path.join(firstPath, second)
                             for home in os.listdir(secondPath):
                                 total += 1
+                                homePath = os.path.join(secondPath, home)
+                                inboxPath = os.path.join(homePath, "inbox")
+                                if os.path.exists(inboxPath):
+                                    for inboxItem in os.listdir(inboxPath):
+                                        if not inboxItem.startswith("."):
+                                            inboxItems.add(os.path.join(inboxPath, inboxItem))
 
+            incomingDir = createTaskServiceDirectory(config, uid, gid)
+            if inboxItems:
+                taskFile = os.path.join(incomingDir, "scheduleinboxes.task")
+                with open(taskFile, "w") as out:
+                    for item in inboxItems:
+                        out.write("%s\n" % (item))
+                os.chown(taskFile, uid, gid)
+
             if total:
                 log.warn("Processing %d calendar homes in %s" % (total, uidHomes))
 
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20090706/02adc286/attachment-0001.html>


More information about the calendarserver-changes mailing list