[CalendarServer-changes] [11566] CalendarServer/trunk

source_changes at macosforge.org source_changes at macosforge.org
Tue Jul 30 11:35:39 PDT 2013


Revision: 11566
          http://trac.calendarserver.org//changeset/11566
Author:   sagen at apple.com
Date:     2013-07-30 11:35:38 -0700 (Tue, 30 Jul 2013)
Log Message:
-----------
Agent shuts down after a period of inactivity, and immediately if DataRoot becomes unavailable.

Modified Paths:
--------------
    CalendarServer/trunk/calendarserver/tap/caldav.py
    CalendarServer/trunk/calendarserver/tap/test/test_caldav.py
    CalendarServer/trunk/calendarserver/tools/agent.py
    CalendarServer/trunk/calendarserver/tools/test/test_agent.py
    CalendarServer/trunk/calendarserver/tools/test/test_gateway.py
    CalendarServer/trunk/calendarserver/tools/test/test_principals.py
    CalendarServer/trunk/contrib/launchd/calendarserver.plist
    CalendarServer/trunk/test
    CalendarServer/trunk/txdav/base/datastore/subpostgres.py
    CalendarServer/trunk/txdav/base/datastore/test/test_subpostgres.py
    CalendarServer/trunk/txdav/common/datastore/test/util.py

Added Paths:
-----------
    CalendarServer/trunk/twext/internet/fswatch.py
    CalendarServer/trunk/twext/internet/test/test_fswatch.py

Modified: CalendarServer/trunk/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/caldav.py	2013-07-30 03:15:54 UTC (rev 11565)
+++ CalendarServer/trunk/calendarserver/tap/caldav.py	2013-07-30 18:35:38 UTC (rev 11566)
@@ -57,6 +57,7 @@
 from twext.python.filepath import CachingFilePath
 from twext.internet.ssl import ChainingOpenSSLContextFactory
 from twext.internet.tcp import MaxAcceptTCPServer, MaxAcceptSSLServer
+from twext.internet.fswatch import DirectoryChangeListener, IDirectoryChangeListenee
 from twext.web2.channel.http import LimitingHTTPFactory, SSLRedirectRequest
 from twext.web2.metafd import ConnectionLimiter, ReportingHTTPService
 from twext.enterprise.ienterprise import POSTGRES_DIALECT
@@ -588,18 +589,21 @@
     """
 
     def __init__(self, serviceCreator, connectionPool, store, logObserver,
-        reactor=None):
+        storageService, reactor=None):
         """
         @param serviceCreator: callable which will be passed the connection
             pool, store, and log observer, and should return a Service
         @param connectionPool: connection pool to pass to serviceCreator
         @param store: the store object being processed
         @param logObserver: log observer to pass to serviceCreator
+        @param storageService: the service responsible for starting/stopping
+            the data store
         """
         self.serviceCreator = serviceCreator
         self.connectionPool = connectionPool
         self.store = store
         self.logObserver = logObserver
+        self.storageService = storageService
         self.stepper = Stepper()
 
         if reactor is None:
@@ -613,7 +617,7 @@
         we create the main service and pass in the store.
         """
         service = self.serviceCreator(self.connectionPool, self.store,
-            self.logObserver)
+            self.logObserver, self.storageService)
         if self.parent is not None:
             self.reactor.callLater(0, service.setServiceParent, self.parent)
         return succeed(None)
@@ -626,7 +630,7 @@
         """
         try:
             service = self.serviceCreator(self.connectionPool, None,
-                self.logObserver)
+                self.logObserver, self.storageService)
             if self.parent is not None:
                 self.reactor.callLater(0, service.setServiceParent, self.parent)
         except StoreNotAvailable:
@@ -1129,7 +1133,7 @@
         Create a service to be used in a single-process, stand-alone
         configuration.  Memcached will be spawned automatically.
         """
-        def slaveSvcCreator(pool, store, logObserver):
+        def slaveSvcCreator(pool, store, logObserver, storageService):
 
             if store is None:
                 raise StoreNotAvailable()
@@ -1232,7 +1236,7 @@
         When created, that service will have access to the storage facilities.
         """
 
-        def toolServiceCreator(pool, store, ignored):
+        def toolServiceCreator(pool, store, ignored, storageService):
             return config.UtilityServiceClass(store)
 
         uid, gid = getSystemIDs(config.UserName, config.GroupName)
@@ -1255,7 +1259,13 @@
         # These we need to set in order to open the store
         config.EnableCalDAV = config.EnableCardDAV = True
 
-        def agentServiceCreator(pool, store, ignored):
+        def agentServiceCreator(pool, store, ignored, storageService):
+            if storageService is not None:
+                # Shut down if DataRoot becomes unavailable
+                from twisted.internet import reactor
+                dataStoreWatcher = DirectoryChangeListener(reactor,
+                    config.DataRoot, DataStoreMonitor(reactor, storageService))
+                dataStoreWatcher.startListening()
             return makeAgentService(store)
 
         uid, gid = getSystemIDs(config.UserName, config.GroupName)
@@ -1294,7 +1304,7 @@
         """
         def createSubServiceFactory(dialect=POSTGRES_DIALECT,
                                     paramstyle='pyformat'):
-            def subServiceFactory(connectionFactory):
+            def subServiceFactory(connectionFactory, storageService):
                 ms = MultiService()
                 cp = ConnectionPool(connectionFactory, dialect=dialect,
                                     paramstyle=paramstyle,
@@ -1303,7 +1313,7 @@
                 store = storeFromConfig(config, cp.connection)
 
                 pps = PreProcessingService(createMainService, cp, store,
-                    logObserver)
+                    logObserver, storageService)
 
                 # The following "steps" will run sequentially when the service
                 # hierarchy is started.  If any of the steps raise an exception
@@ -1410,7 +1420,7 @@
                 raise UsageError("Unknown database type %r" (config.DBType,))
         else:
             store = storeFromConfig(config, None)
-            return createMainService(None, store, logObserver)
+            return createMainService(None, store, logObserver, None)
 
 
     def makeService_Combined(self, options):
@@ -1593,7 +1603,7 @@
         # to), and second, the service which does an upgrade from the
         # filesystem to the database (if that's necessary, and there is
         # filesystem data in need of upgrading).
-        def spawnerSvcCreator(pool, store, ignored):
+        def spawnerSvcCreator(pool, store, ignored, storageService):
             if store is None:
                 raise StoreNotAvailable()
 
@@ -2375,3 +2385,31 @@
         gid = getgid()
 
     return uid, gid
+
+
+class DataStoreMonitor(object):
+    implements(IDirectoryChangeListenee)
+
+    def __init__(self, reactor, storageService):
+        """
+        @param storageService: the service making use of the DataStore
+            directory; we send it a hardStop() to shut it down
+        """
+        self._reactor = reactor
+        self._storageService = storageService
+
+    def disconnected(self):
+        self._storageService.hardStop()
+        self._reactor.stop()
+
+    def deleted(self):
+        self._storageService.hardStop()
+        self._reactor.stop()
+
+    def renamed(self):
+        self._storageService.hardStop()
+        self._reactor.stop()
+
+    def connectionLost(self, reason):
+        pass
+

Modified: CalendarServer/trunk/calendarserver/tap/test/test_caldav.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/test/test_caldav.py	2013-07-30 03:15:54 UTC (rev 11565)
+++ CalendarServer/trunk/calendarserver/tap/test/test_caldav.py	2013-07-30 18:35:38 UTC (rev 11566)
@@ -61,7 +61,7 @@
     CalDAVOptions, CalDAVServiceMaker, CalDAVService, GroupOwnedUNIXServer,
     DelayedStartupProcessMonitor, DelayedStartupLineLogger, TwistdSlaveProcess,
     _CONTROL_SERVICE_NAME, getSystemIDs, PreProcessingService,
-    QuitAfterUpgradeStep
+    QuitAfterUpgradeStep, DataStoreMonitor
 )
 from calendarserver.provision.root import RootResource
 from twext.enterprise.queue import PeerConnectionPool, LocalQueuer
@@ -555,7 +555,9 @@
                                       uid=None, gid=None):
                 pool = None
                 logObserver = None
-                svc = createMainService(pool, store, logObserver)
+                storageService = None
+                svc = createMainService(pool, store, logObserver,
+                    storageService)
                 multi = MultiService()
                 svc.setServiceParent(multi)
                 return multi
@@ -1238,12 +1240,19 @@
         and stopService to be called again by counting the number of times
         START and STOP appear in the process output.
         """
+        # Inherit the reactor used to run trial
+        reactorArg = ""
+        for arg in sys.argv:
+            if arg.startswith("--reactor"):
+                reactorArg = arg
+                break
+
         tacFilePath = os.path.join(os.path.dirname(__file__), "reexec.tac")
         twistd = which("twistd")[0]
         deferred = Deferred()
         proc = reactor.spawnProcess(
             CapturingProcessProtocol(deferred, None), twistd,
-                [twistd, '-n', '-y', tacFilePath],
+                [twistd, reactorArg, '-n', '-y', tacFilePath],
                 env=os.environ
         )
         reactor.callLater(3, proc.signalProcess, "HUP")
@@ -1381,15 +1390,15 @@
 
 class PreProcessingServiceTestCase(TestCase):
 
-    def fakeServiceCreator(self, cp, store, lo):
-        self.history.append(("serviceCreator", store))
+    def fakeServiceCreator(self, cp, store, lo, storageService):
+        self.history.append(("serviceCreator", store, storageService))
 
 
     def setUp(self):
         self.history = []
         self.clock = Clock()
         self.pps = PreProcessingService(self.fakeServiceCreator, None, "store",
-            None, reactor=self.clock)
+            None, "storageService", reactor=self.clock)
 
 
     def _record(self, value, failure):
@@ -1409,7 +1418,7 @@
         self.pps.startService()
         self.assertEquals(self.history,
             ['one success', 'two success', 'three success', 'four success',
-            ('serviceCreator', 'store')])
+            ('serviceCreator', 'store', 'storageService')])
 
 
     def test_allFailure(self):
@@ -1425,7 +1434,7 @@
         self.pps.startService()
         self.assertEquals(self.history,
             ['one success', 'two failure', 'three failure', 'four failure',
-            ('serviceCreator', None)])
+            ('serviceCreator', None, 'storageService')])
 
 
     def test_partialFailure(self):
@@ -1441,7 +1450,7 @@
         self.pps.startService()
         self.assertEquals(self.history,
             ['one success', 'two failure', 'three success', 'four failure',
-            ('serviceCreator', 'store')])
+            ('serviceCreator', 'store', 'storageService')])
 
 
     def test_quitAfterUpgradeStep(self):
@@ -1460,5 +1469,47 @@
         self.pps.startService()
         self.assertEquals(self.history,
             ['one success', 'two success', 'four failure',
-            ('serviceCreator', None)])
+            ('serviceCreator', None, 'storageService')])
         self.assertFalse(triggerFile.exists())
+
+
+class StubStorageService(object):
+
+    def __init__(self):
+        self.hardStopCalled = False
+
+    def hardStop(self):
+        self.hardStopCalled = True
+
+
+class StubReactor(object):
+
+    def __init__(self):
+        self.stopCalled = False
+
+    def stop(self):
+        self.stopCalled = True
+
+
+class DataStoreMonitorTestCase(TestCase):
+
+    def test_monitor(self):
+        storageService = StubStorageService()
+        stubReactor = StubReactor()
+        monitor = DataStoreMonitor(stubReactor, storageService)
+
+        monitor.disconnected()
+        self.assertTrue(storageService.hardStopCalled)
+        self.assertTrue(stubReactor.stopCalled)
+
+        storageService.hardStopCalled = False
+        stubReactor.stopCalled = False
+        monitor.deleted()
+        self.assertTrue(storageService.hardStopCalled)
+        self.assertTrue(stubReactor.stopCalled)
+
+        storageService.hardStopCalled = False
+        stubReactor.stopCalled = False
+        monitor.renamed()
+        self.assertTrue(storageService.hardStopCalled)
+        self.assertTrue(stubReactor.stopCalled)

Modified: CalendarServer/trunk/calendarserver/tools/agent.py
===================================================================
--- CalendarServer/trunk/calendarserver/tools/agent.py	2013-07-30 03:15:54 UTC (rev 11565)
+++ CalendarServer/trunk/calendarserver/tools/agent.py	2013-07-30 18:35:38 UTC (rev 11566)
@@ -161,17 +161,20 @@
     """
     isLeaf = True
 
-    def __init__(self, store, davRootResource, directory):
+    def __init__(self, store, davRootResource, directory, inactivityDetector):
         """
         @param store: an already opened store
         @param davRootResource: the root resource, required for principal
             operations
         @param directory: a directory service
+        @param inactivityDetector: the InactivityDetector to tell when requests
+            come in
         """
         Resource.__init__(self)
         self.store = store
         self.davRootResource = davRootResource
         self.directory = directory
+        self.inactivityDetector = inactivityDetector
 
     def render_POST(self, request):
         """
@@ -179,6 +182,8 @@
         return the result as the response body.
         """
 
+        self.inactivityDetector.activity()
+
         def onSuccess(result, output):
             txt = output.getvalue()
             output.close()
@@ -234,9 +239,14 @@
     davRootResource = getRootResource(config, store)
     directory = davRootResource.getDirectory()
 
+    def becameInactive():
+        log.warn("Agent inactive; shutting down")
+        reactor.stop()
+
+    inactivityDetector = InactivityDetector(reactor, 60 * 10, becameInactive)
     root = Resource()
     root.putChild("gateway", AgentGatewayResource(store,
-        davRootResource, directory))
+        davRootResource, directory, inactivityDetector))
 
     realmName = "/Local/Default"
     portal = Portal(AgentRealm(root, ["com.apple.calendarserver"]),
@@ -250,7 +260,57 @@
 
 
 
+class InactivityDetector(object):
+    """
+    If no 'activity' takes place for a specified amount of time, a method
+    will get called.  Activity causes the inactivity time threshold to be
+    reset.
+    """
 
+    def __init__(self, reactor, timeoutSeconds, becameInactive):
+        """
+        @param reactor: the reactor
+        @param timeoutSeconds: the number of seconds considered to mean inactive
+        @param becameInactive: the method to call (with no arguments) when
+            inactivity is reached
+        """
+        self._reactor = reactor
+        self._timeoutSeconds = timeoutSeconds
+        self._becameInactive = becameInactive
+
+        self._delayedCall = self._reactor.callLater(self._timeoutSeconds,
+            self._inactivityThresholdReached)
+
+
+    def _inactivityThresholdReached(self):
+        """
+        The delayed call has fired.  We're inactive.  Call the becameInactive
+            method.
+        """
+        self._becameInactive()
+
+
+    def activity(self):
+        """
+        Call this to let the InactivityDetector know there has been activity.
+        It will reset the timeout.
+        """
+        if self._delayedCall.active():
+            self._delayedCall.reset(self._timeoutSeconds)
+        else:
+            self._delayedCall = self._reactor.callLater(self._timeoutSeconds,
+                self._inactivityThresholdReached)
+
+
+    def stop(self):
+        """
+        Cancels the delayed call
+        """
+        if self._delayedCall.active():
+            self._delayedCall.cancel()
+
+
+
 #
 # Alternate implementation using AMP instead of HTTP
 #

Modified: CalendarServer/trunk/calendarserver/tools/test/test_agent.py
===================================================================
--- CalendarServer/trunk/calendarserver/tools/test/test_agent.py	2013-07-30 03:15:54 UTC (rev 11565)
+++ CalendarServer/trunk/calendarserver/tools/test/test_agent.py	2013-07-30 18:35:38 UTC (rev 11566)
@@ -17,8 +17,10 @@
 from calendarserver.tools.agent import AgentRealm
 from calendarserver.tools.agent import CustomDigestCredentialFactory
 from calendarserver.tools.agent import DirectoryServiceChecker
+from calendarserver.tools.agent import InactivityDetector
 from twistedcaldav.test.util import TestCase
 from twisted.internet.defer import inlineCallbacks
+from twisted.internet.task import Clock
 from twisted.cred.error import UnauthorizedLogin 
 from twisted.web.resource import IResource
 from twisted.web.resource import ForbiddenResource
@@ -112,7 +114,34 @@
 
 
 
+class InactivityDectectorTestCase(TestCase):
 
+    def test_inactivity(self):
+        clock = Clock()
+
+        self.inactivityReached = False
+        def becameInactive():
+            self.inactivityReached = True
+
+        id = InactivityDetector(clock, 5, becameInactive)
+
+        # After 3 seconds, not inactive
+        clock.advance(3)
+        self.assertFalse(self.inactivityReached)
+
+        # Activity happens, pushing out the inactivity threshold
+        id.activity()
+        clock.advance(3)
+        self.assertFalse(self.inactivityReached)
+
+        # Time passes without activity
+        clock.advance(3)
+        self.assertTrue(self.inactivityReached)
+
+        id.stop()
+
+
+
 class FakeRequest(object):
 
     def getClientIP(self):

Modified: CalendarServer/trunk/calendarserver/tools/test/test_gateway.py
===================================================================
--- CalendarServer/trunk/calendarserver/tools/test/test_gateway.py	2013-07-30 03:15:54 UTC (rev 11565)
+++ CalendarServer/trunk/calendarserver/tools/test/test_gateway.py	2013-07-30 18:35:38 UTC (rev 11566)
@@ -27,7 +27,6 @@
 from twistedcaldav.config import config
 from twistedcaldav.test.util import TestCase, CapturingProcessProtocol
 from calendarserver.tools.util import getDirectory
-from txdav.common.datastore.test.util import SQLStoreBuilder
 import plistlib
 
 
@@ -42,8 +41,7 @@
         template = templateFile.read()
         templateFile.close()
 
-        # Use the same DatabaseRoot as the SQLStoreBuilder
-        databaseRoot = os.path.abspath(SQLStoreBuilder.SHARED_DB_PATH)
+        databaseRoot = os.path.abspath("_spawned_scripts_db" + str(os.getpid()))
         newConfig = template % {
             "ServerRoot" : os.path.abspath(config.ServerRoot),
             "DatabaseRoot" : databaseRoot,

Modified: CalendarServer/trunk/calendarserver/tools/test/test_principals.py
===================================================================
--- CalendarServer/trunk/calendarserver/tools/test/test_principals.py	2013-07-30 03:15:54 UTC (rev 11565)
+++ CalendarServer/trunk/calendarserver/tools/test/test_principals.py	2013-07-30 18:35:38 UTC (rev 11566)
@@ -31,7 +31,6 @@
 from calendarserver.tap.util import directoryFromConfig
 from calendarserver.tools.principals import (parseCreationArgs, matchStrings,
     updateRecord, principalForPrincipalID, getProxies, setProxies)
-from txdav.common.datastore.test.util import SQLStoreBuilder
 
 
 class ManagePrincipalsTestCase(TestCase):
@@ -50,9 +49,7 @@
         template = templateFile.read()
         templateFile.close()
 
-        # Use the same DatabaseRoot as the SQLStoreBuilder
-        databaseRoot = os.path.abspath(SQLStoreBuilder.SHARED_DB_PATH)
-
+        databaseRoot = os.path.abspath("_spawned_scripts_db" + str(os.getpid()))
         newConfig = template % {
             "ServerRoot" : os.path.abspath(config.ServerRoot),
             "DataRoot" : os.path.abspath(config.DataRoot),

Modified: CalendarServer/trunk/contrib/launchd/calendarserver.plist
===================================================================
--- CalendarServer/trunk/contrib/launchd/calendarserver.plist	2013-07-30 03:15:54 UTC (rev 11565)
+++ CalendarServer/trunk/contrib/launchd/calendarserver.plist	2013-07-30 18:35:38 UTC (rev 11566)
@@ -31,7 +31,7 @@
     <string>/Applications/Server.app/Contents/ServerRoot/usr/sbin/caldavd</string>
     <string>-X</string>
     <string>-R</string>
-    <string>caldav_kqueue</string>
+    <string>kqueue</string>
     <string>-o</string>
     <string>FailIfUpgradeNeeded=False</string>
   </array>

Modified: CalendarServer/trunk/test
===================================================================
--- CalendarServer/trunk/test	2013-07-30 03:15:54 UTC (rev 11565)
+++ CalendarServer/trunk/test	2013-07-30 18:35:38 UTC (rev 11566)
@@ -83,7 +83,7 @@
 find "${wd}" -name \*.pyc -print0 | xargs -0 rm;
 
 mkdir -p "${wd}/data";
-cd "${wd}" && "${python}" "${trial}" --temp-directory="${wd}/data/trial" --rterrors ${random} ${until_fail} ${no_colour} ${coverage} ${numjobs} ${test_modules};
+cd "${wd}" && "${python}" "${trial}" --reactor=kqueue --temp-directory="${wd}/data/trial" --rterrors ${random} ${until_fail} ${no_colour} ${coverage} ${numjobs} ${test_modules};
 
 if ${flaky}; then
   echo "";

Added: CalendarServer/trunk/twext/internet/fswatch.py
===================================================================
--- CalendarServer/trunk/twext/internet/fswatch.py	                        (rev 0)
+++ CalendarServer/trunk/twext/internet/fswatch.py	2013-07-30 18:35:38 UTC (rev 11566)
@@ -0,0 +1,166 @@
+##
+# Copyright (c) 2013 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Watch the availability of a file system directory
+"""
+
+import os
+from zope.interface import Interface
+from twisted.internet import reactor
+from twisted.python.log import Logger
+
+try:
+    from select import (kevent, KQ_FILTER_VNODE, KQ_EV_ADD, KQ_EV_ENABLE,
+                        KQ_EV_CLEAR, KQ_NOTE_DELETE, KQ_NOTE_RENAME, KQ_EV_EOF)
+    kqueueSupported = True
+except ImportError:
+    # kqueue not supported on this platform
+    kqueueSupported = False
+
+
+class IDirectoryChangeListenee(Interface):
+    """
+    A delegate of DirectoryChangeListener
+    """
+
+    def disconnected(): #@NoSelf
+        """
+        The directory has been unmounted
+        """
+
+    def deleted(): #@NoSelf
+        """
+        The directory has been deleted
+        """
+
+    def renamed(): #@NoSelf
+        """
+        The directory has been renamed
+        """
+
+    def connectionLost(reason): #@NoSelf
+        """
+        The file descriptor has been closed
+        """
+
+
+#TODO: better way to tell if reactor is kqueue or not
+if kqueueSupported and hasattr(reactor, "_doWriteOrRead"):
+
+
+    # Wrap _doWriteOrRead to support KQ_FILTER_VNODE
+    origDoWriteOrRead = reactor._doWriteOrRead
+    def _doWriteOrReadOrVNodeEvent(selectable, fd, event):
+        origDoWriteOrRead(selectable, fd, event)
+        if event.filter == KQ_FILTER_VNODE:
+            selectable.vnodeEventHappened(event)
+    reactor._doWriteOrRead = _doWriteOrReadOrVNodeEvent
+
+
+
+    class DirectoryChangeListener(Logger, object):
+        """
+        Listens for the removal, renaming, or general unavailability of a
+        given directory, and lets a delegate listenee know about them.
+        """
+
+        def __init__(self, reactor, dirname, listenee):
+            """
+            @param reactor: the reactor
+            @param dirname: the full path to the directory to watch; it must
+                already exist
+            @param listenee: the delegate to call
+            @type listenee: IDirectoryChangeListenee
+            """
+            self._reactor = reactor
+            self._fd = os.open(dirname, os.O_RDONLY)
+            self._dirname = dirname
+            self._listenee = listenee
+
+
+        def logPrefix(self):
+            return repr(self._dirname)
+
+
+        def fileno(self):
+            return self._fd
+
+
+        def vnodeEventHappened(self, evt):
+            if evt.flags & KQ_EV_EOF:
+                self._listenee.disconnected()
+            if evt.fflags & KQ_NOTE_DELETE:
+                self._listenee.deleted()
+            if evt.fflags & KQ_NOTE_RENAME:
+                self._listenee.renamed()
+
+
+        def startListening(self):
+            ke = kevent(self._fd, filter=KQ_FILTER_VNODE,
+                        flags=(KQ_EV_ADD | KQ_EV_ENABLE | KQ_EV_CLEAR),
+                        fflags=KQ_NOTE_DELETE | KQ_NOTE_RENAME)
+            self._reactor._kq.control([ke], 0, None)
+            self._reactor._selectables[self._fd] = self
+
+
+        def connectionLost(self, reason):
+            os.close(self._fd)
+            self._listenee.connectionLost(reason)
+
+
+else:
+
+    # TODO: implement this for systems without kqueue support:
+
+    class DirectoryChangeListener(Logger, object):
+        """
+        Listens for the removal, renaming, or general unavailability of a
+        given directory, and lets a delegate listenee know about them.
+        """
+
+        def __init__(self, reactor, dirname, listenee):
+            """
+            @param reactor: the reactor
+            @param dirname: the full path to the directory to watch
+            @param listenee: the delegate to call
+            """
+            self._reactor = reactor
+            self._fd = os.open(dirname, os.O_RDONLY)
+            self._dirname = dirname
+            self._listenee = listenee
+
+
+        def logPrefix(self):
+            return repr(self._dirname)
+
+
+        def fileno(self):
+            return self._fd
+
+
+        def vnodeEventHappened(self, evt):
+            pass
+
+
+        def startListening(self):
+            pass
+
+
+        def connectionLost(self, reason):
+            os.close(self._fd)
+            self._listenee.connectionLost(reason)
+

Added: CalendarServer/trunk/twext/internet/test/test_fswatch.py
===================================================================
--- CalendarServer/trunk/twext/internet/test/test_fswatch.py	                        (rev 0)
+++ CalendarServer/trunk/twext/internet/test/test_fswatch.py	2013-07-30 18:35:38 UTC (rev 11566)
@@ -0,0 +1,27 @@
+##
+# Copyright (c) 2013 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Tests for L{twext.internet.fswatch}.
+"""
+
+# from twext.internet.fswatch import DirectoryChangeListener
+# from twisted.trial.unittest import TestCase
+
+# TODO: tests
+
+
+

Modified: CalendarServer/trunk/txdav/base/datastore/subpostgres.py
===================================================================
--- CalendarServer/trunk/txdav/base/datastore/subpostgres.py	2013-07-30 03:15:54 UTC (rev 11565)
+++ CalendarServer/trunk/txdav/base/datastore/subpostgres.py	2013-07-30 18:35:38 UTC (rev 11566)
@@ -21,6 +21,8 @@
 
 import os
 import pwd
+import re
+import signal
 
 from hashlib import md5
 
@@ -250,6 +252,7 @@
         self._pgCtl = pgCtl
         self._initdb = initDB
         self._reactor = reactor
+        self._postgresPid = None
 
 
     @property
@@ -363,7 +366,7 @@
 
         if self.shutdownDeferred is None:
             # Only continue startup if we've not begun shutdown
-            self.subServiceFactory(self.produceConnection).setServiceParent(self)
+            self.subServiceFactory(self.produceConnection, self).setServiceParent(self)
 
 
     def pauseMonitor(self):
@@ -436,15 +439,37 @@
             uid=self.uid, gid=self.gid,
         )
         self.monitor = monitor
+
+        def gotStatus(result):
+            """
+            Grab the postgres pid from the pgCtl status call in case we need
+            to kill it directly later on in hardStop().  Useful in conjunction
+            with the DataStoreMonitor so we can shut down if DataRoot has been
+            removed/renamed/unmounted.
+            """
+            reResult = re.search("PID: (\d+)\D", result)
+            if reResult != None:
+                self._postgresPid = int(reResult.group(1))
+            self.ready(*createConnection())
+            self.deactivateDelayedShutdown()
+
         def gotReady(result):
             log.warn("{cmd} exited", cmd=pgCtl)
             self.shouldStopDatabase = True
-            self.ready(*createConnection())
-            self.deactivateDelayedShutdown()
+            d = Deferred()
+            statusMonitor = CapturingProcessProtocol(d, None)
+            self.reactor.spawnProcess(
+                statusMonitor, pgCtl, [pgCtl, "status"],
+                env=self.env, path=self.workingDir.path,
+                uid=self.uid, gid=self.gid,
+            )
+            d.addCallback(gotStatus)
+
         def reportit(f):
             log.failure("starting postgres", f)
             self.deactivateDelayedShutdown()
             self.reactor.stop()
+            
         self.monitor.completionDeferred.addCallback(
             gotReady).addErrback(reportit)
 
@@ -523,3 +548,13 @@
 #            return result
 #        d.addCallback(maybeStopSubprocess)
 #        return d
+
+    def hardStop(self):
+        """
+        Stop postgres quickly by sending it SIGQUIT
+        """
+        if self._postgresPid is not None:
+            try:
+                os.kill(self._postgresPid, signal.SIGQUIT)
+            except OSError: 
+                pass

Modified: CalendarServer/trunk/txdav/base/datastore/test/test_subpostgres.py
===================================================================
--- CalendarServer/trunk/txdav/base/datastore/test/test_subpostgres.py	2013-07-30 03:15:54 UTC (rev 11565)
+++ CalendarServer/trunk/txdav/base/datastore/test/test_subpostgres.py	2013-07-30 18:35:38 UTC (rev 11566)
@@ -22,17 +22,13 @@
 
 # NOTE: This import will fail eventually when this functionality is added to
 # MemoryReactor:
-from twisted.runner.test.test_procmon import DummyProcessReactor
 
-from twisted.python.filepath import FilePath
 from twext.python.filepath import CachingFilePath
 
 from txdav.base.datastore.subpostgres import PostgresService
 from twisted.internet.defer import inlineCallbacks, Deferred
 from twisted.application.service import Service
 
-import pgdb
-
 class SubprocessStartup(TestCase):
     """
     Tests for starting and stopping the subprocess.
@@ -53,7 +49,7 @@
             instances = []
             ready = Deferred()
 
-            def __init__(self, connectionFactory):
+            def __init__(self, connectionFactory, storageService):
                 self.connection = connectionFactory()
                 test.addCleanup(self.connection.close)
                 self.instances.append(self)
@@ -104,7 +100,7 @@
             instances = []
             ready = Deferred()
 
-            def __init__(self, connectionFactory):
+            def __init__(self, connectionFactory, storageService):
                 self.connection = connectionFactory()
                 test.addCleanup(self.connection.close)
                 self.instances.append(self)
@@ -156,7 +152,7 @@
             instances = []
             ready = Deferred()
 
-            def __init__(self, connectionFactory):
+            def __init__(self, connectionFactory, storageService):
                 self.connection = connectionFactory()
                 test.addCleanup(self.connection.close)
                 self.instances.append(self)
@@ -195,73 +191,3 @@
         self.assertEquals(values, [["value1"], ["value2"]])
 
 
-    def test_startDatabaseRunning(self):
-        """ Ensure that if we can connect to postgres we don't spawn pg_ctl """
-
-        self.cursorHistory = []
-
-        class DummyCursor(object):
-            def __init__(self, historyHolder):
-                self.historyHolder = historyHolder
-
-            def execute(self, *args):
-                self.historyHolder.cursorHistory.append(args)
-
-            def close(self):
-                pass
-
-        class DummyConnection(object):
-            def __init__(self, historyHolder):
-                self.historyHolder = historyHolder
-
-            def cursor(self):
-                return DummyCursor(self.historyHolder)
-
-            def commit(self):
-                pass
-
-            def close(self):
-                pass
-
-        def produceConnection(*args):
-            return DummyConnection(self)
-
-        dummyReactor = DummyProcessReactor()
-        svc = PostgresService(
-            FilePath("postgres_4.pgdb"),
-            lambda x : Service(),
-            "",
-             reactor=dummyReactor,
-        )
-        svc.produceConnection = produceConnection
-        svc.env = {}
-        svc.startDatabase()
-        self.assertEquals(
-            self.cursorHistory,
-            [
-                ('commit',),
-                ("create database subpostgres with encoding 'UTF8'",),
-                ('',)
-            ]
-        )
-        self.assertEquals(dummyReactor.spawnedProcesses, [])
-
-
-    def test_startDatabaseNotRunning(self):
-        """ Ensure that if we can't connect to postgres we spawn pg_ctl """
-
-        def produceConnection(*args):
-            raise pgdb.DatabaseError
-
-        dummyReactor = DummyProcessReactor()
-        svc = PostgresService(
-            FilePath("postgres_4.pgdb"),
-            lambda x : Service(),
-            "",
-             reactor=dummyReactor,
-        )
-        svc.produceConnection = produceConnection
-        svc.env = {}
-        svc.startDatabase()
-        self.assertEquals(len(dummyReactor.spawnedProcesses), 1)
-        self.assertTrue(dummyReactor.spawnedProcesses[0]._executable.endswith("pg_ctl"))

Modified: CalendarServer/trunk/txdav/common/datastore/test/util.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/test/util.py	2013-07-30 03:15:54 UTC (rev 11565)
+++ CalendarServer/trunk/txdav/common/datastore/test/util.py	2013-07-30 18:35:38 UTC (rev 11566)
@@ -199,7 +199,7 @@
             directoryService = TestStoreDirectoryService()
         if self.sharedService is None:
             ready = Deferred()
-            def getReady(connectionFactory):
+            def getReady(connectionFactory, storageService):
                 self.makeAndCleanStore(
                     testCase, notifierFactory, directoryService, attachmentRoot
                 ).chainDeferred(ready)
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130730/4f98d753/attachment-0001.html>


More information about the calendarserver-changes mailing list