[CalendarServer-changes] [11566] CalendarServer/trunk
source_changes at macosforge.org
source_changes at macosforge.org
Tue Jul 30 11:35:39 PDT 2013
Revision: 11566
http://trac.calendarserver.org//changeset/11566
Author: sagen at apple.com
Date: 2013-07-30 11:35:38 -0700 (Tue, 30 Jul 2013)
Log Message:
-----------
Agent shuts down after a period of inactivity, and immediately if DataRoot becomes unavailable.
Modified Paths:
--------------
CalendarServer/trunk/calendarserver/tap/caldav.py
CalendarServer/trunk/calendarserver/tap/test/test_caldav.py
CalendarServer/trunk/calendarserver/tools/agent.py
CalendarServer/trunk/calendarserver/tools/test/test_agent.py
CalendarServer/trunk/calendarserver/tools/test/test_gateway.py
CalendarServer/trunk/calendarserver/tools/test/test_principals.py
CalendarServer/trunk/contrib/launchd/calendarserver.plist
CalendarServer/trunk/test
CalendarServer/trunk/txdav/base/datastore/subpostgres.py
CalendarServer/trunk/txdav/base/datastore/test/test_subpostgres.py
CalendarServer/trunk/txdav/common/datastore/test/util.py
Added Paths:
-----------
CalendarServer/trunk/twext/internet/fswatch.py
CalendarServer/trunk/twext/internet/test/test_fswatch.py
Modified: CalendarServer/trunk/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/caldav.py 2013-07-30 03:15:54 UTC (rev 11565)
+++ CalendarServer/trunk/calendarserver/tap/caldav.py 2013-07-30 18:35:38 UTC (rev 11566)
@@ -57,6 +57,7 @@
from twext.python.filepath import CachingFilePath
from twext.internet.ssl import ChainingOpenSSLContextFactory
from twext.internet.tcp import MaxAcceptTCPServer, MaxAcceptSSLServer
+from twext.internet.fswatch import DirectoryChangeListener, IDirectoryChangeListenee
from twext.web2.channel.http import LimitingHTTPFactory, SSLRedirectRequest
from twext.web2.metafd import ConnectionLimiter, ReportingHTTPService
from twext.enterprise.ienterprise import POSTGRES_DIALECT
@@ -588,18 +589,21 @@
"""
def __init__(self, serviceCreator, connectionPool, store, logObserver,
- reactor=None):
+ storageService, reactor=None):
"""
@param serviceCreator: callable which will be passed the connection
pool, store, and log observer, and should return a Service
@param connectionPool: connection pool to pass to serviceCreator
@param store: the store object being processed
@param logObserver: log observer to pass to serviceCreator
+ @param storageService: the service responsible for starting/stopping
+ the data store
"""
self.serviceCreator = serviceCreator
self.connectionPool = connectionPool
self.store = store
self.logObserver = logObserver
+ self.storageService = storageService
self.stepper = Stepper()
if reactor is None:
@@ -613,7 +617,7 @@
we create the main service and pass in the store.
"""
service = self.serviceCreator(self.connectionPool, self.store,
- self.logObserver)
+ self.logObserver, self.storageService)
if self.parent is not None:
self.reactor.callLater(0, service.setServiceParent, self.parent)
return succeed(None)
@@ -626,7 +630,7 @@
"""
try:
service = self.serviceCreator(self.connectionPool, None,
- self.logObserver)
+ self.logObserver, self.storageService)
if self.parent is not None:
self.reactor.callLater(0, service.setServiceParent, self.parent)
except StoreNotAvailable:
@@ -1129,7 +1133,7 @@
Create a service to be used in a single-process, stand-alone
configuration. Memcached will be spawned automatically.
"""
- def slaveSvcCreator(pool, store, logObserver):
+ def slaveSvcCreator(pool, store, logObserver, storageService):
if store is None:
raise StoreNotAvailable()
@@ -1232,7 +1236,7 @@
When created, that service will have access to the storage facilities.
"""
- def toolServiceCreator(pool, store, ignored):
+ def toolServiceCreator(pool, store, ignored, storageService):
return config.UtilityServiceClass(store)
uid, gid = getSystemIDs(config.UserName, config.GroupName)
@@ -1255,7 +1259,13 @@
# These we need to set in order to open the store
config.EnableCalDAV = config.EnableCardDAV = True
- def agentServiceCreator(pool, store, ignored):
+ def agentServiceCreator(pool, store, ignored, storageService):
+ if storageService is not None:
+ # Shut down if DataRoot becomes unavailable
+ from twisted.internet import reactor
+ dataStoreWatcher = DirectoryChangeListener(reactor,
+ config.DataRoot, DataStoreMonitor(reactor, storageService))
+ dataStoreWatcher.startListening()
return makeAgentService(store)
uid, gid = getSystemIDs(config.UserName, config.GroupName)
@@ -1294,7 +1304,7 @@
"""
def createSubServiceFactory(dialect=POSTGRES_DIALECT,
paramstyle='pyformat'):
- def subServiceFactory(connectionFactory):
+ def subServiceFactory(connectionFactory, storageService):
ms = MultiService()
cp = ConnectionPool(connectionFactory, dialect=dialect,
paramstyle=paramstyle,
@@ -1303,7 +1313,7 @@
store = storeFromConfig(config, cp.connection)
pps = PreProcessingService(createMainService, cp, store,
- logObserver)
+ logObserver, storageService)
# The following "steps" will run sequentially when the service
# hierarchy is started. If any of the steps raise an exception
@@ -1410,7 +1420,7 @@
raise UsageError("Unknown database type %r" (config.DBType,))
else:
store = storeFromConfig(config, None)
- return createMainService(None, store, logObserver)
+ return createMainService(None, store, logObserver, None)
def makeService_Combined(self, options):
@@ -1593,7 +1603,7 @@
# to), and second, the service which does an upgrade from the
# filesystem to the database (if that's necessary, and there is
# filesystem data in need of upgrading).
- def spawnerSvcCreator(pool, store, ignored):
+ def spawnerSvcCreator(pool, store, ignored, storageService):
if store is None:
raise StoreNotAvailable()
@@ -2375,3 +2385,31 @@
gid = getgid()
return uid, gid
+
+
+class DataStoreMonitor(object):
+ implements(IDirectoryChangeListenee)
+
+ def __init__(self, reactor, storageService):
+ """
+ @param storageService: the service making use of the DataStore
+ directory; we send it a hardStop() to shut it down
+ """
+ self._reactor = reactor
+ self._storageService = storageService
+
+ def disconnected(self):
+ self._storageService.hardStop()
+ self._reactor.stop()
+
+ def deleted(self):
+ self._storageService.hardStop()
+ self._reactor.stop()
+
+ def renamed(self):
+ self._storageService.hardStop()
+ self._reactor.stop()
+
+ def connectionLost(self, reason):
+ pass
+
Modified: CalendarServer/trunk/calendarserver/tap/test/test_caldav.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/test/test_caldav.py 2013-07-30 03:15:54 UTC (rev 11565)
+++ CalendarServer/trunk/calendarserver/tap/test/test_caldav.py 2013-07-30 18:35:38 UTC (rev 11566)
@@ -61,7 +61,7 @@
CalDAVOptions, CalDAVServiceMaker, CalDAVService, GroupOwnedUNIXServer,
DelayedStartupProcessMonitor, DelayedStartupLineLogger, TwistdSlaveProcess,
_CONTROL_SERVICE_NAME, getSystemIDs, PreProcessingService,
- QuitAfterUpgradeStep
+ QuitAfterUpgradeStep, DataStoreMonitor
)
from calendarserver.provision.root import RootResource
from twext.enterprise.queue import PeerConnectionPool, LocalQueuer
@@ -555,7 +555,9 @@
uid=None, gid=None):
pool = None
logObserver = None
- svc = createMainService(pool, store, logObserver)
+ storageService = None
+ svc = createMainService(pool, store, logObserver,
+ storageService)
multi = MultiService()
svc.setServiceParent(multi)
return multi
@@ -1238,12 +1240,19 @@
and stopService to be called again by counting the number of times
START and STOP appear in the process output.
"""
+ # Inherit the reactor used to run trial
+ reactorArg = ""
+ for arg in sys.argv:
+ if arg.startswith("--reactor"):
+ reactorArg = arg
+ break
+
tacFilePath = os.path.join(os.path.dirname(__file__), "reexec.tac")
twistd = which("twistd")[0]
deferred = Deferred()
proc = reactor.spawnProcess(
CapturingProcessProtocol(deferred, None), twistd,
- [twistd, '-n', '-y', tacFilePath],
+ [twistd, reactorArg, '-n', '-y', tacFilePath],
env=os.environ
)
reactor.callLater(3, proc.signalProcess, "HUP")
@@ -1381,15 +1390,15 @@
class PreProcessingServiceTestCase(TestCase):
- def fakeServiceCreator(self, cp, store, lo):
- self.history.append(("serviceCreator", store))
+ def fakeServiceCreator(self, cp, store, lo, storageService):
+ self.history.append(("serviceCreator", store, storageService))
def setUp(self):
self.history = []
self.clock = Clock()
self.pps = PreProcessingService(self.fakeServiceCreator, None, "store",
- None, reactor=self.clock)
+ None, "storageService", reactor=self.clock)
def _record(self, value, failure):
@@ -1409,7 +1418,7 @@
self.pps.startService()
self.assertEquals(self.history,
['one success', 'two success', 'three success', 'four success',
- ('serviceCreator', 'store')])
+ ('serviceCreator', 'store', 'storageService')])
def test_allFailure(self):
@@ -1425,7 +1434,7 @@
self.pps.startService()
self.assertEquals(self.history,
['one success', 'two failure', 'three failure', 'four failure',
- ('serviceCreator', None)])
+ ('serviceCreator', None, 'storageService')])
def test_partialFailure(self):
@@ -1441,7 +1450,7 @@
self.pps.startService()
self.assertEquals(self.history,
['one success', 'two failure', 'three success', 'four failure',
- ('serviceCreator', 'store')])
+ ('serviceCreator', 'store', 'storageService')])
def test_quitAfterUpgradeStep(self):
@@ -1460,5 +1469,47 @@
self.pps.startService()
self.assertEquals(self.history,
['one success', 'two success', 'four failure',
- ('serviceCreator', None)])
+ ('serviceCreator', None, 'storageService')])
self.assertFalse(triggerFile.exists())
+
+
+class StubStorageService(object):
+
+ def __init__(self):
+ self.hardStopCalled = False
+
+ def hardStop(self):
+ self.hardStopCalled = True
+
+
+class StubReactor(object):
+
+ def __init__(self):
+ self.stopCalled = False
+
+ def stop(self):
+ self.stopCalled = True
+
+
+class DataStoreMonitorTestCase(TestCase):
+
+ def test_monitor(self):
+ storageService = StubStorageService()
+ stubReactor = StubReactor()
+ monitor = DataStoreMonitor(stubReactor, storageService)
+
+ monitor.disconnected()
+ self.assertTrue(storageService.hardStopCalled)
+ self.assertTrue(stubReactor.stopCalled)
+
+ storageService.hardStopCalled = False
+ stubReactor.stopCalled = False
+ monitor.deleted()
+ self.assertTrue(storageService.hardStopCalled)
+ self.assertTrue(stubReactor.stopCalled)
+
+ storageService.hardStopCalled = False
+ stubReactor.stopCalled = False
+ monitor.renamed()
+ self.assertTrue(storageService.hardStopCalled)
+ self.assertTrue(stubReactor.stopCalled)
Modified: CalendarServer/trunk/calendarserver/tools/agent.py
===================================================================
--- CalendarServer/trunk/calendarserver/tools/agent.py 2013-07-30 03:15:54 UTC (rev 11565)
+++ CalendarServer/trunk/calendarserver/tools/agent.py 2013-07-30 18:35:38 UTC (rev 11566)
@@ -161,17 +161,20 @@
"""
isLeaf = True
- def __init__(self, store, davRootResource, directory):
+ def __init__(self, store, davRootResource, directory, inactivityDetector):
"""
@param store: an already opened store
@param davRootResource: the root resource, required for principal
operations
@param directory: a directory service
+ @param inactivityDetector: the InactivityDetector to tell when requests
+ come in
"""
Resource.__init__(self)
self.store = store
self.davRootResource = davRootResource
self.directory = directory
+ self.inactivityDetector = inactivityDetector
def render_POST(self, request):
"""
@@ -179,6 +182,8 @@
return the result as the response body.
"""
+ self.inactivityDetector.activity()
+
def onSuccess(result, output):
txt = output.getvalue()
output.close()
@@ -234,9 +239,14 @@
davRootResource = getRootResource(config, store)
directory = davRootResource.getDirectory()
+ def becameInactive():
+ log.warn("Agent inactive; shutting down")
+ reactor.stop()
+
+ inactivityDetector = InactivityDetector(reactor, 60 * 10, becameInactive)
root = Resource()
root.putChild("gateway", AgentGatewayResource(store,
- davRootResource, directory))
+ davRootResource, directory, inactivityDetector))
realmName = "/Local/Default"
portal = Portal(AgentRealm(root, ["com.apple.calendarserver"]),
@@ -250,7 +260,57 @@
+class InactivityDetector(object):
+ """
+ If no 'activity' takes place for a specified amount of time, a method
+ will get called. Activity causes the inactivity time threshold to be
+ reset.
+ """
+ def __init__(self, reactor, timeoutSeconds, becameInactive):
+ """
+ @param reactor: the reactor
+ @param timeoutSeconds: the number of seconds considered to mean inactive
+ @param becameInactive: the method to call (with no arguments) when
+ inactivity is reached
+ """
+ self._reactor = reactor
+ self._timeoutSeconds = timeoutSeconds
+ self._becameInactive = becameInactive
+
+ self._delayedCall = self._reactor.callLater(self._timeoutSeconds,
+ self._inactivityThresholdReached)
+
+
+ def _inactivityThresholdReached(self):
+ """
+ The delayed call has fired. We're inactive. Call the becameInactive
+ method.
+ """
+ self._becameInactive()
+
+
+ def activity(self):
+ """
+ Call this to let the InactivityDetector know that there has been activity.
+ It will reset the timeout.
+ """
+ if self._delayedCall.active():
+ self._delayedCall.reset(self._timeoutSeconds)
+ else:
+ self._delayedCall = self._reactor.callLater(self._timeoutSeconds,
+ self._inactivityThresholdReached)
+
+
+ def stop(self):
+ """
+ Cancels the delayed call
+ """
+ if self._delayedCall.active():
+ self._delayedCall.cancel()
+
+
+
#
# Alternate implementation using AMP instead of HTTP
#
Modified: CalendarServer/trunk/calendarserver/tools/test/test_agent.py
===================================================================
--- CalendarServer/trunk/calendarserver/tools/test/test_agent.py 2013-07-30 03:15:54 UTC (rev 11565)
+++ CalendarServer/trunk/calendarserver/tools/test/test_agent.py 2013-07-30 18:35:38 UTC (rev 11566)
@@ -17,8 +17,10 @@
from calendarserver.tools.agent import AgentRealm
from calendarserver.tools.agent import CustomDigestCredentialFactory
from calendarserver.tools.agent import DirectoryServiceChecker
+from calendarserver.tools.agent import InactivityDetector
from twistedcaldav.test.util import TestCase
from twisted.internet.defer import inlineCallbacks
+from twisted.internet.task import Clock
from twisted.cred.error import UnauthorizedLogin
from twisted.web.resource import IResource
from twisted.web.resource import ForbiddenResource
@@ -112,7 +114,34 @@
+class InactivityDectectorTestCase(TestCase):
+ def test_inactivity(self):
+ clock = Clock()
+
+ self.inactivityReached = False
+ def becameInactive():
+ self.inactivityReached = True
+
+ id = InactivityDetector(clock, 5, becameInactive)
+
+ # After 3 seconds, not inactive
+ clock.advance(3)
+ self.assertFalse(self.inactivityReached)
+
+ # Activity happens, pushing out the inactivity threshold
+ id.activity()
+ clock.advance(3)
+ self.assertFalse(self.inactivityReached)
+
+ # Time passes without activity
+ clock.advance(3)
+ self.assertTrue(self.inactivityReached)
+
+ id.stop()
+
+
+
class FakeRequest(object):
def getClientIP(self):
Modified: CalendarServer/trunk/calendarserver/tools/test/test_gateway.py
===================================================================
--- CalendarServer/trunk/calendarserver/tools/test/test_gateway.py 2013-07-30 03:15:54 UTC (rev 11565)
+++ CalendarServer/trunk/calendarserver/tools/test/test_gateway.py 2013-07-30 18:35:38 UTC (rev 11566)
@@ -27,7 +27,6 @@
from twistedcaldav.config import config
from twistedcaldav.test.util import TestCase, CapturingProcessProtocol
from calendarserver.tools.util import getDirectory
-from txdav.common.datastore.test.util import SQLStoreBuilder
import plistlib
@@ -42,8 +41,7 @@
template = templateFile.read()
templateFile.close()
- # Use the same DatabaseRoot as the SQLStoreBuilder
- databaseRoot = os.path.abspath(SQLStoreBuilder.SHARED_DB_PATH)
+ databaseRoot = os.path.abspath("_spawned_scripts_db" + str(os.getpid()))
newConfig = template % {
"ServerRoot" : os.path.abspath(config.ServerRoot),
"DatabaseRoot" : databaseRoot,
Modified: CalendarServer/trunk/calendarserver/tools/test/test_principals.py
===================================================================
--- CalendarServer/trunk/calendarserver/tools/test/test_principals.py 2013-07-30 03:15:54 UTC (rev 11565)
+++ CalendarServer/trunk/calendarserver/tools/test/test_principals.py 2013-07-30 18:35:38 UTC (rev 11566)
@@ -31,7 +31,6 @@
from calendarserver.tap.util import directoryFromConfig
from calendarserver.tools.principals import (parseCreationArgs, matchStrings,
updateRecord, principalForPrincipalID, getProxies, setProxies)
-from txdav.common.datastore.test.util import SQLStoreBuilder
class ManagePrincipalsTestCase(TestCase):
@@ -50,9 +49,7 @@
template = templateFile.read()
templateFile.close()
- # Use the same DatabaseRoot as the SQLStoreBuilder
- databaseRoot = os.path.abspath(SQLStoreBuilder.SHARED_DB_PATH)
-
+ databaseRoot = os.path.abspath("_spawned_scripts_db" + str(os.getpid()))
newConfig = template % {
"ServerRoot" : os.path.abspath(config.ServerRoot),
"DataRoot" : os.path.abspath(config.DataRoot),
Modified: CalendarServer/trunk/contrib/launchd/calendarserver.plist
===================================================================
--- CalendarServer/trunk/contrib/launchd/calendarserver.plist 2013-07-30 03:15:54 UTC (rev 11565)
+++ CalendarServer/trunk/contrib/launchd/calendarserver.plist 2013-07-30 18:35:38 UTC (rev 11566)
@@ -31,7 +31,7 @@
<string>/Applications/Server.app/Contents/ServerRoot/usr/sbin/caldavd</string>
<string>-X</string>
<string>-R</string>
- <string>caldav_kqueue</string>
+ <string>kqueue</string>
<string>-o</string>
<string>FailIfUpgradeNeeded=False</string>
</array>
Modified: CalendarServer/trunk/test
===================================================================
--- CalendarServer/trunk/test 2013-07-30 03:15:54 UTC (rev 11565)
+++ CalendarServer/trunk/test 2013-07-30 18:35:38 UTC (rev 11566)
@@ -83,7 +83,7 @@
find "${wd}" -name \*.pyc -print0 | xargs -0 rm;
mkdir -p "${wd}/data";
-cd "${wd}" && "${python}" "${trial}" --temp-directory="${wd}/data/trial" --rterrors ${random} ${until_fail} ${no_colour} ${coverage} ${numjobs} ${test_modules};
+cd "${wd}" && "${python}" "${trial}" --reactor=kqueue --temp-directory="${wd}/data/trial" --rterrors ${random} ${until_fail} ${no_colour} ${coverage} ${numjobs} ${test_modules};
if ${flaky}; then
echo "";
Added: CalendarServer/trunk/twext/internet/fswatch.py
===================================================================
--- CalendarServer/trunk/twext/internet/fswatch.py (rev 0)
+++ CalendarServer/trunk/twext/internet/fswatch.py 2013-07-30 18:35:38 UTC (rev 11566)
@@ -0,0 +1,166 @@
+##
+# Copyright (c) 2013 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Watch the availability of a file system directory
+"""
+
+import os
+from zope.interface import Interface
+from twisted.internet import reactor
+from twisted.python.log import Logger
+
+try:
+ from select import (kevent, KQ_FILTER_VNODE, KQ_EV_ADD, KQ_EV_ENABLE,
+ KQ_EV_CLEAR, KQ_NOTE_DELETE, KQ_NOTE_RENAME, KQ_EV_EOF)
+ kqueueSupported = True
+except ImportError:
+ # kqueue not supported on this platform
+ kqueueSupported = False
+
+
+class IDirectoryChangeListenee(Interface):
+ """
+ A delegate of DirectoryChangeListener
+ """
+
+ def disconnected(): #@NoSelf
+ """
+ The directory has been unmounted
+ """
+
+ def deleted(): #@NoSelf
+ """
+ The directory has been deleted
+ """
+
+ def renamed(): #@NoSelf
+ """
+ The directory has been renamed
+ """
+
+ def connectionLost(reason): #@NoSelf
+ """
+ The file descriptor has been closed
+ """
+
+
+#TODO: better way to tell if reactor is kqueue or not
+if kqueueSupported and hasattr(reactor, "_doWriteOrRead"):
+
+
+ # Wrap _doWriteOrRead to support KQ_FILTER_VNODE
+ origDoWriteOrRead = reactor._doWriteOrRead
+ def _doWriteOrReadOrVNodeEvent(selectable, fd, event):
+ origDoWriteOrRead(selectable, fd, event)
+ if event.filter == KQ_FILTER_VNODE:
+ selectable.vnodeEventHappened(event)
+ reactor._doWriteOrRead = _doWriteOrReadOrVNodeEvent
+
+
+
+ class DirectoryChangeListener(Logger, object):
+ """
+ Listens for the removal, renaming, or general unavailability of a
+ given directory, and lets a delegate listenee know about them.
+ """
+
+ def __init__(self, reactor, dirname, listenee):
+ """
+ @param reactor: the reactor
+ @param dirname: the full path to the directory to watch; it must
+ already exist
+ @param listenee: the delegate to call
+ @type listenee: IDirectoryChangeListenee
+ """
+ self._reactor = reactor
+ self._fd = os.open(dirname, os.O_RDONLY)
+ self._dirname = dirname
+ self._listenee = listenee
+
+
+ def logPrefix(self):
+ return repr(self._dirname)
+
+
+ def fileno(self):
+ return self._fd
+
+
+ def vnodeEventHappened(self, evt):
+ if evt.flags & KQ_EV_EOF:
+ self._listenee.disconnected()
+ if evt.fflags & KQ_NOTE_DELETE:
+ self._listenee.deleted()
+ if evt.fflags & KQ_NOTE_RENAME:
+ self._listenee.renamed()
+
+
+ def startListening(self):
+ ke = kevent(self._fd, filter=KQ_FILTER_VNODE,
+ flags=(KQ_EV_ADD | KQ_EV_ENABLE | KQ_EV_CLEAR),
+ fflags=KQ_NOTE_DELETE | KQ_NOTE_RENAME)
+ self._reactor._kq.control([ke], 0, None)
+ self._reactor._selectables[self._fd] = self
+
+
+ def connectionLost(self, reason):
+ os.close(self._fd)
+ self._listenee.connectionLost(reason)
+
+
+else:
+
+ # TODO: implement this for systems without kqueue support:
+
+ class DirectoryChangeListener(Logger, object):
+ """
+ Listens for the removal, renaming, or general unavailability of a
+ given directory, and lets a delegate listenee know about them.
+ """
+
+ def __init__(self, reactor, dirname, listenee):
+ """
+ @param reactor: the reactor
+ @param dirname: the full path to the directory to watch
+ @param listenee: the delegate to call
+ """
+ self._reactor = reactor
+ self._fd = os.open(dirname, os.O_RDONLY)
+ self._dirname = dirname
+ self._listenee = listenee
+
+
+ def logPrefix(self):
+ return repr(self._dirname)
+
+
+ def fileno(self):
+ return self._fd
+
+
+ def vnodeEventHappened(self, evt):
+ pass
+
+
+ def startListening(self):
+ pass
+
+
+ def connectionLost(self, reason):
+ os.close(self._fd)
+ self._listenee.connectionLost(reason)
+
Added: CalendarServer/trunk/twext/internet/test/test_fswatch.py
===================================================================
--- CalendarServer/trunk/twext/internet/test/test_fswatch.py (rev 0)
+++ CalendarServer/trunk/twext/internet/test/test_fswatch.py 2013-07-30 18:35:38 UTC (rev 11566)
@@ -0,0 +1,27 @@
+##
+# Copyright (c) 2013 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Tests for L{twext.internet.fswatch}.
+"""
+
+# from twext.internet.fswatch import DirectoryChangeListener
+# from twisted.trial.unittest import TestCase
+
+# TODO: tests
+
+
+
Modified: CalendarServer/trunk/txdav/base/datastore/subpostgres.py
===================================================================
--- CalendarServer/trunk/txdav/base/datastore/subpostgres.py 2013-07-30 03:15:54 UTC (rev 11565)
+++ CalendarServer/trunk/txdav/base/datastore/subpostgres.py 2013-07-30 18:35:38 UTC (rev 11566)
@@ -21,6 +21,8 @@
import os
import pwd
+import re
+import signal
from hashlib import md5
@@ -250,6 +252,7 @@
self._pgCtl = pgCtl
self._initdb = initDB
self._reactor = reactor
+ self._postgresPid = None
@property
@@ -363,7 +366,7 @@
if self.shutdownDeferred is None:
# Only continue startup if we've not begun shutdown
- self.subServiceFactory(self.produceConnection).setServiceParent(self)
+ self.subServiceFactory(self.produceConnection, self).setServiceParent(self)
def pauseMonitor(self):
@@ -436,15 +439,37 @@
uid=self.uid, gid=self.gid,
)
self.monitor = monitor
+
+ def gotStatus(result):
+ """
+ Grab the postgres pid from the pgCtl status call in case we need
+ to kill it directly later on in hardStop(). Useful in conjunction
+ with the DataStoreMonitor so we can shut down if DataRoot has been
+ removed/renamed/unmounted.
+ """
+ reResult = re.search("PID: (\d+)\D", result)
+ if reResult != None:
+ self._postgresPid = int(reResult.group(1))
+ self.ready(*createConnection())
+ self.deactivateDelayedShutdown()
+
def gotReady(result):
log.warn("{cmd} exited", cmd=pgCtl)
self.shouldStopDatabase = True
- self.ready(*createConnection())
- self.deactivateDelayedShutdown()
+ d = Deferred()
+ statusMonitor = CapturingProcessProtocol(d, None)
+ self.reactor.spawnProcess(
+ statusMonitor, pgCtl, [pgCtl, "status"],
+ env=self.env, path=self.workingDir.path,
+ uid=self.uid, gid=self.gid,
+ )
+ d.addCallback(gotStatus)
+
def reportit(f):
log.failure("starting postgres", f)
self.deactivateDelayedShutdown()
self.reactor.stop()
+
self.monitor.completionDeferred.addCallback(
gotReady).addErrback(reportit)
@@ -523,3 +548,13 @@
# return result
# d.addCallback(maybeStopSubprocess)
# return d
+
+ def hardStop(self):
+ """
+ Stop postgres quickly by sending it SIGQUIT
+ """
+ if self._postgresPid is not None:
+ try:
+ os.kill(self._postgresPid, signal.SIGQUIT)
+ except OSError:
+ pass
Modified: CalendarServer/trunk/txdav/base/datastore/test/test_subpostgres.py
===================================================================
--- CalendarServer/trunk/txdav/base/datastore/test/test_subpostgres.py 2013-07-30 03:15:54 UTC (rev 11565)
+++ CalendarServer/trunk/txdav/base/datastore/test/test_subpostgres.py 2013-07-30 18:35:38 UTC (rev 11566)
@@ -22,17 +22,13 @@
# NOTE: This import will fail eventually when this functionality is added to
# MemoryReactor:
-from twisted.runner.test.test_procmon import DummyProcessReactor
-from twisted.python.filepath import FilePath
from twext.python.filepath import CachingFilePath
from txdav.base.datastore.subpostgres import PostgresService
from twisted.internet.defer import inlineCallbacks, Deferred
from twisted.application.service import Service
-import pgdb
-
class SubprocessStartup(TestCase):
"""
Tests for starting and stopping the subprocess.
@@ -53,7 +49,7 @@
instances = []
ready = Deferred()
- def __init__(self, connectionFactory):
+ def __init__(self, connectionFactory, storageService):
self.connection = connectionFactory()
test.addCleanup(self.connection.close)
self.instances.append(self)
@@ -104,7 +100,7 @@
instances = []
ready = Deferred()
- def __init__(self, connectionFactory):
+ def __init__(self, connectionFactory, storageService):
self.connection = connectionFactory()
test.addCleanup(self.connection.close)
self.instances.append(self)
@@ -156,7 +152,7 @@
instances = []
ready = Deferred()
- def __init__(self, connectionFactory):
+ def __init__(self, connectionFactory, storageService):
self.connection = connectionFactory()
test.addCleanup(self.connection.close)
self.instances.append(self)
@@ -195,73 +191,3 @@
self.assertEquals(values, [["value1"], ["value2"]])
- def test_startDatabaseRunning(self):
- """ Ensure that if we can connect to postgres we don't spawn pg_ctl """
-
- self.cursorHistory = []
-
- class DummyCursor(object):
- def __init__(self, historyHolder):
- self.historyHolder = historyHolder
-
- def execute(self, *args):
- self.historyHolder.cursorHistory.append(args)
-
- def close(self):
- pass
-
- class DummyConnection(object):
- def __init__(self, historyHolder):
- self.historyHolder = historyHolder
-
- def cursor(self):
- return DummyCursor(self.historyHolder)
-
- def commit(self):
- pass
-
- def close(self):
- pass
-
- def produceConnection(*args):
- return DummyConnection(self)
-
- dummyReactor = DummyProcessReactor()
- svc = PostgresService(
- FilePath("postgres_4.pgdb"),
- lambda x : Service(),
- "",
- reactor=dummyReactor,
- )
- svc.produceConnection = produceConnection
- svc.env = {}
- svc.startDatabase()
- self.assertEquals(
- self.cursorHistory,
- [
- ('commit',),
- ("create database subpostgres with encoding 'UTF8'",),
- ('',)
- ]
- )
- self.assertEquals(dummyReactor.spawnedProcesses, [])
-
-
- def test_startDatabaseNotRunning(self):
- """ Ensure that if we can't connect to postgres we spawn pg_ctl """
-
- def produceConnection(*args):
- raise pgdb.DatabaseError
-
- dummyReactor = DummyProcessReactor()
- svc = PostgresService(
- FilePath("postgres_4.pgdb"),
- lambda x : Service(),
- "",
- reactor=dummyReactor,
- )
- svc.produceConnection = produceConnection
- svc.env = {}
- svc.startDatabase()
- self.assertEquals(len(dummyReactor.spawnedProcesses), 1)
- self.assertTrue(dummyReactor.spawnedProcesses[0]._executable.endswith("pg_ctl"))
Modified: CalendarServer/trunk/txdav/common/datastore/test/util.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/test/util.py 2013-07-30 03:15:54 UTC (rev 11565)
+++ CalendarServer/trunk/txdav/common/datastore/test/util.py 2013-07-30 18:35:38 UTC (rev 11566)
@@ -199,7 +199,7 @@
directoryService = TestStoreDirectoryService()
if self.sharedService is None:
ready = Deferred()
- def getReady(connectionFactory):
+ def getReady(connectionFactory, storageService):
self.makeAndCleanStore(
testCase, notifierFactory, directoryService, attachmentRoot
).chainDeferred(ready)
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130730/4f98d753/attachment-0001.html>
More information about the calendarserver-changes
mailing list