[CalendarServer-changes] [6551] CalendarServer/trunk
source_changes at macosforge.org
source_changes at macosforge.org
Mon Nov 1 15:38:50 PDT 2010
Revision: 6551
http://trac.macosforge.org/projects/calendarserver/changeset/6551
Author: glyph at apple.com
Date: 2010-11-01 15:38:48 -0700 (Mon, 01 Nov 2010)
Log Message:
-----------
Add a connection pool.
Modified Paths:
--------------
CalendarServer/trunk/calendarserver/sidecar/task.py
CalendarServer/trunk/calendarserver/tap/caldav.py
CalendarServer/trunk/calendarserver/tap/test/test_caldav.py
CalendarServer/trunk/calendarserver/tap/util.py
CalendarServer/trunk/calendarserver/tools/purge.py
CalendarServer/trunk/calendarserver/tools/test/test_principals.py
CalendarServer/trunk/calendarserver/tools/test/test_purge.py
CalendarServer/trunk/twistedcaldav/directory/util.py
CalendarServer/trunk/twistedcaldav/localization.py
CalendarServer/trunk/twistedcaldav/mail.py
CalendarServer/trunk/twistedcaldav/stdconfig.py
CalendarServer/trunk/twistedcaldav/storebridge.py
CalendarServer/trunk/twistedcaldav/test/test_mail.py
CalendarServer/trunk/twistedcaldav/test/test_wrapping.py
CalendarServer/trunk/twistedcaldav/timezoneservice.py
CalendarServer/trunk/txdav/base/datastore/subpostgres.py
CalendarServer/trunk/txdav/base/propertystore/test/test_sql.py
CalendarServer/trunk/txdav/caldav/datastore/sql.py
CalendarServer/trunk/txdav/caldav/datastore/test/common.py
CalendarServer/trunk/txdav/caldav/datastore/test/test_sql.py
CalendarServer/trunk/txdav/carddav/datastore/sql.py
CalendarServer/trunk/txdav/carddav/datastore/test/common.py
CalendarServer/trunk/txdav/carddav/datastore/test/test_sql.py
CalendarServer/trunk/txdav/common/datastore/sql.py
CalendarServer/trunk/txdav/common/datastore/test/util.py
CalendarServer/trunk/txdav/idav.py
Added Paths:
-----------
CalendarServer/trunk/txdav/base/datastore/asyncsqlpool.py
CalendarServer/trunk/txdav/base/datastore/threadutils.py
Property Changed:
----------------
CalendarServer/trunk/
CalendarServer/trunk/txdav/caldav/datastore/index_file.py
CalendarServer/trunk/txdav/caldav/datastore/test/test_index_file.py
CalendarServer/trunk/txdav/carddav/datastore/index_file.py
CalendarServer/trunk/txdav/carddav/datastore/test/test_index_file.py
Property changes on: CalendarServer/trunk
___________________________________________________________________
Modified: svn:mergeinfo
- /CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/wsanchez/transations:5515-5593
+ /CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/wsanchez/transations:5515-5593
Modified: CalendarServer/trunk/calendarserver/sidecar/task.py
===================================================================
--- CalendarServer/trunk/calendarserver/sidecar/task.py 2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/calendarserver/sidecar/task.py 2010-11-01 22:38:48 UTC (rev 6551)
@@ -27,7 +27,7 @@
from zope.interface import implements
-from twisted.application.service import Service, IServiceMaker
+from twisted.application.service import MultiService, Service, IServiceMaker
from twisted.internet.defer import DeferredList, inlineCallbacks, returnValue
from twisted.internet.reactor import callLater
from twisted.plugin import IPlugin
@@ -318,6 +318,7 @@
def makeService(self, options):
+ svc = MultiService()
#
# The task sidecar doesn't care about system SACLs
#
@@ -330,11 +331,12 @@
oldLogLevel = logLevelForNamespace(None)
setLogLevelForNamespace(None, "info")
- rootResource = getRootResource(config)
+ rootResource = getRootResource(config, svc)
- service = CalDAVTaskService(rootResource)
+ CalDAVTaskService(rootResource).setServiceParent(svc)
# Change log level back to what it was before
setLogLevelForNamespace(None, oldLogLevel)
- return service
+ return svc
+
Modified: CalendarServer/trunk/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/caldav.py 2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/calendarserver/tap/caldav.py 2010-11-01 22:38:48 UTC (rev 6551)
@@ -29,7 +29,6 @@
from subprocess import Popen, PIPE
from pwd import getpwuid, getpwnam
from grp import getgrnam
-from inspect import getargspec
import OpenSSL
from OpenSSL.SSL import Error as SSLError
@@ -41,14 +40,14 @@
from twisted.python.reflect import namedClass
from twisted.plugin import IPlugin
from twisted.internet.defer import gatherResults
-from twisted.internet import reactor
+from twisted.internet import reactor as _reactor
from twisted.internet.reactor import addSystemEventTrigger
from twisted.internet.process import ProcessExitedAlready
from twisted.internet.protocol import Protocol, Factory
+from twisted.internet.protocol import ProcessProtocol
from twisted.application.internet import TCPServer, UNIXServer
from twisted.application.service import MultiService, IServiceMaker
-from twisted.scripts.mktap import getid
-from twisted.runner import procmon
+from twisted.application.service import Service
import twext
from twext.web2.server import Site
@@ -60,8 +59,6 @@
from twext.web2.channel.http import LimitingHTTPFactory, SSLRedirectRequest
from twext.web2.metafd import ConnectionLimiter, ReportingHTTPService
-from txdav.common.datastore.sql import v1_schema
-from txdav.base.datastore.subpostgres import PostgresService
from txdav.common.datastore.util import UpgradeToDatabaseService
from twistedcaldav.config import ConfigurationError
@@ -72,7 +69,13 @@
from twistedcaldav.mail import IMIPReplyInboxResource
from twistedcaldav.stdconfig import DEFAULT_CONFIG, DEFAULT_CONFIG_FILE
from twistedcaldav.upgrade import upgradeData
+from txdav.base.datastore.subpostgres import PostgresService
+from calendarserver.tap.util import pgServiceFromConfig
+from txdav.base.datastore.asyncsqlpool import ConnectionPool
+
+from txdav.base.datastore.asyncsqlpool import ConnectionPoolConnection
+
try:
from twistedcaldav.authkerb import NegotiateCredentialFactory
NegotiateCredentialFactory # pacify pyflakes
@@ -83,10 +86,12 @@
from calendarserver.accesslog import AMPLoggingFactory
from calendarserver.accesslog import RotatingFileAccessLoggingObserver
from calendarserver.tap.util import getRootResource, computeProcessCount
+from calendarserver.tap.util import ConnectionWithPeer
from calendarserver.tools.util import checkDirectory
try:
from calendarserver.version import version
+ version
except ImportError:
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "support"))
from version import version as getVersion
@@ -97,22 +102,30 @@
log = Logger()
+from twisted.python.util import uidFromString, gidFromString
-class CalDAVStatisticsProtocol (Protocol):
+def getid(uid, gid):
+ if uid is not None:
+ uid = uidFromString(uid)
+ if gid is not None:
+ gid = gidFromString(gid)
+ return (uid, gid)
- def connectionMade(self):
- stats = self.factory.logger.observer.getGlobalHits()
- self.transport.write("%s\r\n" % (stats,))
- self.transport.loseConnection()
+class CalDAVStatisticsProtocol (Protocol):
-class CalDAVStatisticsServer (Factory):
+ def connectionMade(self):
+ stats = self.factory.logger.observer.getGlobalHits()
+ self.transport.write("%s\r\n" % (stats,))
+ self.transport.loseConnection()
- protocol = CalDAVStatisticsProtocol
+class CalDAVStatisticsServer (Factory):
- def __init__(self, logObserver):
- self.logger = logObserver
+ protocol = CalDAVStatisticsProtocol
+ def __init__(self, logObserver):
+ self.logger = logObserver
+
class ErrorLoggingMultiService(MultiService):
""" Registers a rotating file logger for error logging, iff
config.ErrorLogEnabled is True. """
@@ -233,7 +246,7 @@
def postOptions(self):
self.loadConfiguration()
self.checkConfiguration()
-
+
def loadConfiguration(self):
if not os.path.exists(self["config"]):
print "Config file %s not found. Exiting." % (self["config"],)
@@ -248,7 +261,7 @@
sys.exit(1)
config.updateDefaults(self.overrides)
-
+
def checkDirectory(self, dirpath, description, access=None, create=None):
checkDirectory(dirpath, description, access=access, create=create)
@@ -282,7 +295,7 @@
# Require write access because one might not allow editing on /
access=os.W_OK,
)
-
+
#
# Verify that other root paths are OK
#
@@ -325,7 +338,7 @@
access=os.W_OK,
create=(0750, config.UserName, config.GroupName),
)
-
+
#
# Nuke the file log observer's time format.
#
@@ -497,7 +510,6 @@
additional = []
if config.Scheduling.iMIP.Enabled:
additional.append(("inbox", IMIPReplyInboxResource, [], "digest"))
- rootResource = getRootResource(config, additional)
#
# Configure the service
@@ -528,6 +540,8 @@
service = CalDAVService(logObserver)
+ rootResource = getRootResource(config, service, additional)
+
underlyingSite = Site(rootResource)
requestFactory = underlyingSite
@@ -670,16 +684,22 @@
return service
+ def scheduleOnDiskUpgrade(self):
+ """
+ Schedule any on disk upgrades we might need. Note that this will only
+ do the filesystem-format upgrades; migration to the database needs to
+ be done when the connection and possibly server is already up and
+ running.
+ """
+ addSystemEventTrigger("before", "startup", upgradeData, config)
+
+
def makeService_Single(self, options):
"""
Create a service to be used in a single-process, stand-alone
configuration.
"""
- # Schedule any on disk upgrades we might need. Note that this
- # will only do the filesystem-format upgrades; migration to the
- # database needs to be done when the connection and possibly
- # server is already up and running. -glyph
- addSystemEventTrigger("before", "startup", upgradeData, config)
+ self.scheduleOnDiskUpgrade()
return self.storageService(self.makeService_Slave(options))
@@ -721,7 +741,13 @@
attachmentsRoot = dbRoot.child("attachments")
return UpgradeToDatabaseService.wrapService(
CachingFilePath(config.DocumentRoot), mainService,
- connectionFactory, attachmentsRoot,
+ # FIXME: somehow, this should be a connection pool too, not
+ # unpooled connections; this only runs in the master
+ # process, so this would be a good point to bootstrap that
+ # whole process. However, it's somewhat tricky to do that
+ # right. The upgrade needs to run in the master, before
+ # any other things have run.
+ pgserv.produceLocalTransaction, attachmentsRoot,
uid=postgresUID, gid=postgresGID
)
if os.getuid() == 0: # Only override if root
@@ -730,16 +756,8 @@
else:
postgresUID = None
postgresGID = None
- pgserv = PostgresService(
- dbRoot, subServiceFactory, v1_schema,
- databaseName=config.Postgres.DatabaseName,
- logFile=config.Postgres.LogFile,
- socketDir=config.RunRoot,
- listenAddresses=config.Postgres.ListenAddresses,
- sharedBuffers=config.Postgres.SharedBuffers,
- maxConnections=config.Postgres.MaxConnections,
- options=config.Postgres.Options,
- uid=postgresUID, gid=postgresGID
+ pgserv = pgServiceFromConfig(
+ config, subServiceFactory, postgresUID, postgresGID
)
return pgserv
else:
@@ -753,11 +771,7 @@
"""
s = ErrorLoggingMultiService()
- # Schedule any on disk upgrades we might need. Note that this
- # will only do the filesystem-format upgrades; migration to the
- # database needs to be done when the connection and possibly
- # server is already up and running. -glyph
- addSystemEventTrigger("before", "startup", upgradeData, config)
+ self.scheduleOnDiskUpgrade()
# Make sure no old socket files are lying around.
self.deleteStaleSocketFiles()
@@ -799,8 +813,18 @@
monitor = DelayedStartupProcessMonitor()
s.processMonitor = monitor
- self.storageService(monitor, uid, gid).setServiceParent(s)
+ ssvc = self.storageService(monitor, uid, gid)
+ ssvc.setServiceParent(s)
+ if isinstance(ssvc, PostgresService):
+ # TODO: better way of doing this conditional. Look at the config
+ # again, possibly?
+ pool = ConnectionPool(ssvc.produceConnection)
+ pool.setServiceParent(s)
+ dispenser = ConnectionDispenser(pool)
+ else:
+ dispenser = None
+
parentEnv = {
"PATH": os.environ.get("PATH", ""),
"PYTHONPATH": os.environ.get("PYTHONPATH", ""),
@@ -889,6 +913,8 @@
else:
extraArgs = dict(inheritFDs=inheritFDs,
inheritSSLFDs=inheritSSLFDs)
+ if dispenser is not None:
+ extraArgs.update(ampSQLDispenser=dispenser)
process = TwistdSlaveProcess(
sys.argv[0],
self.tapname,
@@ -902,21 +928,21 @@
for name, pool in config.Memcached.Pools.items():
if pool.ServerEnabled:
self.log_info("Adding memcached service for pool: %s" % (name,))
-
+
memcachedArgv = [
config.Memcached.memcached,
"-p", str(pool.Port),
"-l", pool.BindAddress,
"-U", "0",
]
-
+
if config.Memcached.MaxMemory is not 0:
memcachedArgv.extend(["-m", str(config.Memcached.MaxMemory)])
if config.UserName:
memcachedArgv.extend(["-u", config.UserName])
-
+
memcachedArgv.extend(config.Memcached.Options)
-
+
monitor.addProcess('memcached-%s' % (name,), memcachedArgv, env=parentEnv)
if (
@@ -981,7 +1007,7 @@
monitor.addProcess("caldav_task", taskArgv, env=parentEnv)
- stats = CalDAVStatisticsServer(logger)
+ stats = CalDAVStatisticsServer(logger)
statsService = GroupOwnedUNIXServer(
gid, config.GlobalStatsSocket, stats, mode=0440
)
@@ -992,10 +1018,10 @@
def deleteStaleSocketFiles(self):
-
+
# Check all socket files we use.
for checkSocket in [config.ControlSocket, config.GlobalStatsSocket] :
-
+
# See if the file exists.
if (os.path.exists(checkSocket)):
# See if the file represents a socket. If not, delete it.
@@ -1021,6 +1047,28 @@
+class ConnectionDispenser(object):
+
+ def __init__(self, connectionPool):
+ self.pool = connectionPool
+
+
+ def dispense(self):
+ """
+ Dispense a file descriptor, already connected to a server, for a
+ client.
+ """
+ # FIXME: these sockets need to be re-dispensed when the process is
+ # respawned, and they currently won't be.
+ c, s = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
+ protocol = ConnectionPoolConnection(self.pool)
+ transport = ConnectionWithPeer(s, protocol)
+ protocol.makeConnection(transport)
+ transport.startReading()
+ return c
+
+
+
class TwistdSlaveProcess(object):
"""
A L{TwistdSlaveProcess} is information about how to start a slave process
@@ -1053,11 +1101,16 @@
subprocess and used to accept incoming connections.
@type metaSocket: L{socket.socket}
+
+ @ivar ampSQLDispenser: a factory for AF_UNIX/SOCK_STREAM sockets that are
+ to be inherited by subprocesses and used for sending AMP SQL commands
+ back to its parent.
"""
prefix = "caldav"
def __init__(self, twistd, tapname, configFile, id, interfaces,
- inheritFDs=None, inheritSSLFDs=None, metaSocket=None):
+ inheritFDs=None, inheritSSLFDs=None, metaSocket=None,
+ ampSQLDispenser=None):
self.twistd = twistd
@@ -1075,7 +1128,10 @@
self.inheritSSLFDs = emptyIfNone(inheritSSLFDs)
self.metaSocket = metaSocket
self.interfaces = interfaces
+ self.ampSQLDispenser = ampSQLDispenser
+ self.ampDBSocket = None
+
def getName(self):
return '%s-%s' % (self.prefix, self.id)
@@ -1086,10 +1142,13 @@
process to file descriptor numbers in the current (master) process.
"""
fds = {}
- maybeMetaFD = []
+ extraFDs = []
if self.metaSocket is not None:
- maybeMetaFD.append(self.metaSocket.fileno())
- for fd in self.inheritSSLFDs + self.inheritFDs + maybeMetaFD:
+ extraFDs.append(self.metaSocket.fileno())
+ if self.ampSQLDispenser is not None:
+ self.ampDBSocket = self.ampSQLDispenser.dispense()
+ extraFDs.append(self.ampDBSocket.fileno())
+ for fd in self.inheritSSLFDs + self.inheritFDs + extraFDs:
fds[fd] = fd
return fds
@@ -1146,7 +1205,10 @@
args.extend([
"-o", "MetaFD=%s" % (self.metaSocket.fileno(),)
])
-
+ if self.ampDBSocket is not None:
+ args.extend([
+ "-o", "DBAMPFD=%s" % (self.ampDBSocket.fileno(),)
+ ])
return args
@@ -1162,7 +1224,7 @@
-class DelayedStartupProcessMonitor(procmon.ProcessMonitor):
+class DelayedStartupProcessMonitor(Service, object):
"""
A L{DelayedStartupProcessMonitor} is a L{procmon.ProcessMonitor} that
defers building its command lines until the service is actually ready to
@@ -1174,15 +1236,6 @@
L{Deferred} which fires only when all processes have shut down, to allow
for a clean service shutdown.
- @ivar processObjects: a C{list} of L{TwistdSlaveProcess} to add using
- C{self.addProcess} when this service starts up.
-
- @ivar _extraFDs: a mapping from process names to extra file-descriptor
- maps. (By default, all processes will have the standard stdio mapping,
- so all file descriptors here should be >2.) This is updated during
- L{DelayedStartupProcessMonitor.startService}, by inspecting the result
- of L{TwistdSlaveProcess.getFileDescriptors}.
-
@ivar reactor: an L{IReactorProcess} for spawning processes, defaulting to
the global reactor.
@@ -1193,22 +1246,20 @@
Deferreds that track service shutdown.
"""
- _shouldPassReactor = (
- len(getargspec(procmon.ProcessMonitor.__init__)[0]) > 1
- )
+ threshold = 1
+ killTime = 5
+ minRestartDelay = 1
+ maxRestartDelay = 3600
- def __init__(self, *args, **kwargs):
- reactorToUse = kwargs.get("reactor", reactor)
- if not self._shouldPassReactor:
- # Try to do this the right way if we can, otherwise, let the tests
- # monkeypatch. (Our superclass does not accept a 'reactor'
- # argument in Twisted 10.0.0, but does in Twisted 10.1.0 and
- # later.)
- kwargs.pop('reactor', None)
- procmon.ProcessMonitor.__init__(self, *args, **kwargs)
- self.processObjects = []
- self._extraFDs = {}
- self.reactor = reactorToUse
+ def __init__(self, reactor=_reactor):
+ super(DelayedStartupProcessMonitor, self).__init__()
+ self._reactor = reactor
+ self.processes = {}
+ self.protocols = {}
+ self.delay = {}
+ self.timeStarted = {}
+ self.murder = {}
+ self.restart = {}
self.stopping = False
if config.MultiProcess.StaggeredStartup.Enabled:
self.delayInterval = config.MultiProcess.StaggeredStartup.Interval
@@ -1216,8 +1267,43 @@
self.delayInterval = 0
- def addProcessObject(self, process, env):
+ def addProcess(self, name, args, uid=None, gid=None, env={}):
"""
+ Add a new monitored process and start it immediately if the
+ L{DelayedStartupProcessMonitor} service is running.
+
+ Note that args are passed to the system call, not to the shell. If
+ running the shell is desired, the common idiom is to use
+ C{ProcessMonitor.addProcess("name", ['/bin/sh', '-c', shell_script])}
+
+ @param name: A name for this process. This value must be
+ unique across all processes added to this monitor.
+ @type name: C{str}
+ @param args: The argv sequence for the process to launch.
+ @param uid: The user ID to use to run the process. If C{None},
+ the current UID is used.
+ @type uid: C{int}
+ @param gid: The group ID to use to run the process. If C{None},
+ the current GID is used.
+ @type uid: C{int}
+ @param env: The environment to give to the launched process. See
+ L{IReactorProcess.spawnProcess}'s C{env} parameter.
+ @type env: C{dict}
+ @raises: C{KeyError} if a process with the given name already
+ exists
+ """
+ class SimpleProcessObject(object):
+ def getName(self):
+ return name
+ def getCommandLine(self):
+ return args
+ def getFileDescriptors(self):
+ return []
+ self.addProcessObject(SimpleProcessObject(), env, uid, gid)
+
+
+ def addProcessObject(self, process, env, uid=None, gid=None):
+ """
Add a process object to be run when this service is started.
@param env: a dictionary of environment variables.
@@ -1225,19 +1311,19 @@
@param process: a L{TwistdSlaveProcesses} object to be started upon
service startup.
"""
- self.processObjects.append((process, env))
+ name = process.getName()
+ self.processes[name] = (process, env, uid, gid)
+ self.delay[name] = self.minRestartDelay
+ if self.running:
+ self.startProcess(name)
def startService(self):
# Now we're ready to build the command lines and actualy add the
# processes to procmon.
- for processObject, env in self.processObjects:
- name = processObject.getName()
- cmdline = processObject.getCommandLine()
- filedes = processObject.getFileDescriptors()
- self._extraFDs[name] = filedes
- self.addProcess(name, cmdline, env=env)
- procmon.ProcessMonitor.startService(self)
+ super(DelayedStartupProcessMonitor, self).startService()
+ for name in self.processes:
+ self.startProcess(name)
def stopService(self):
@@ -1246,21 +1332,96 @@
"""
self.stopping = True
self.deferreds = {}
- procmon.ProcessMonitor.stopService(self)
+ super(DelayedStartupProcessMonitor, self).stopService()
+
+ # Cancel any outstanding restarts
+ for name, delayedCall in self.restart.items():
+ if delayedCall.active():
+ delayedCall.cancel()
+
+ for name in self.processes:
+ self.stopProcess(name)
return gatherResults(self.deferreds.values())
+ def removeProcess(self, name):
+ """
+ Stop the named process and remove it from the list of monitored
+ processes.
+
+ @type name: C{str}
+ @param name: A string that uniquely identifies the process.
+ """
+ self.stopProcess(name)
+ del self.processes[name]
+
+
+ def stopProcess(self, name):
+ """
+ @param name: The name of the process to be stopped
+ """
+ if name not in self.processes:
+ raise KeyError('Unrecognized process name: %s' % (name,))
+
+ proto = self.protocols.get(name, None)
+ if proto is not None:
+ proc = proto.transport
+ try:
+ proc.signalProcess('TERM')
+ except ProcessExitedAlready:
+ pass
+ else:
+ self.murder[name] = self._reactor.callLater(
+ self.killTime,
+ self._forceStopProcess, proc)
+
+
def processEnded(self, name):
"""
When a child process has ended it calls me so I can fire the
appropriate deferred which was created in stopService
"""
+ # Cancel the scheduled _forceStopProcess function if the process
+ # dies naturally
+ if name in self.murder:
+ if self.murder[name].active():
+ self.murder[name].cancel()
+ del self.murder[name]
+
+ del self.protocols[name]
+
+ if self._reactor.seconds() - self.timeStarted[name] < self.threshold:
+ # The process died too fast - backoff
+ nextDelay = self.delay[name]
+ self.delay[name] = min(self.delay[name] * 2, self.maxRestartDelay)
+
+ else:
+ # Process had been running for a significant amount of time
+ # restart immediately
+ nextDelay = 0
+ self.delay[name] = self.minRestartDelay
+
+ # Schedule a process restart if the service is running
+ if self.running and name in self.processes:
+ self.restart[name] = self._reactor.callLater(nextDelay,
+ self.startProcess,
+ name)
if self.stopping:
deferred = self.deferreds.get(name, None)
if deferred is not None:
deferred.callback(None)
+ def _forceStopProcess(self, proc):
+ """
+ @param proc: An L{IProcessTransport} provider
+ """
+ try:
+ proc.signalProcess('KILL')
+ except ProcessExitedAlready:
+ pass
+
+
def signalAll(self, signal, startswithname=None):
"""
Send a signal to all child processes.
@@ -1306,14 +1467,16 @@
p = self.protocols[name] = DelayedStartupLoggingProtocol()
p.service = self
p.name = name
- args, uid, gid, env = self.processes[name]
+ procObj, env, uid, gid= self.processes[name]
self.timeStarted[name] = time()
childFDs = { 0 : "w", 1 : "r", 2 : "r" }
- childFDs.update(self._extraFDs.get(name, {}))
+ childFDs.update(procObj.getFileDescriptors())
- self.reactor.spawnProcess(
+ args = procObj.getCommandLine()
+
+ self._reactor.spawnProcess(
p, args[0], args, uid=uid, gid=gid, env=env,
childFDs=childFDs
)
@@ -1333,10 +1496,38 @@
def delayedStart():
self._pendingStarts -= 1
self.reallyStartProcess(name)
- self.reactor.callLater(interval, delayedStart)
+ self._reactor.callLater(interval, delayedStart)
+ def restartAll(self):
+ """
+ Restart all processes. This is useful for third party management
+ services to allow a user to restart servers because of an outside change
+ in circumstances -- for example, a new version of a library is
+ installed.
+ """
+ for name in self.processes:
+ self.stopProcess(name)
+
+ def __repr__(self):
+ l = []
+ for name, (procObj, uid, gid, env) in self.processes.items():
+ uidgid = ''
+ if uid is not None:
+ uidgid = str(uid)
+ if gid is not None:
+ uidgid += ':'+str(gid)
+
+ if uidgid:
+ uidgid = '(' + uidgid + ')'
+ l.append('%r%s: %r' % (name, uidgid, procObj))
+ return ('<' + self.__class__.__name__ + ' '
+ + ' '.join(l)
+ + '>')
+
+
+
class DelayedStartupLineLogger(object):
"""
A line logger that can handle very long lines.
@@ -1389,28 +1580,42 @@
-class DelayedStartupLoggingProtocol(procmon.LoggingProtocol, object):
+class DelayedStartupLoggingProtocol(ProcessProtocol):
"""
Logging protocol that handles lines which are too long.
"""
+ service = None
+ name = None
+ empty = 1
+
def connectionMade(self):
"""
Replace the superclass's output monitoring logic with one that can
handle lineLengthExceeded.
"""
- super(DelayedStartupLoggingProtocol, self).connectionMade()
self.output = DelayedStartupLineLogger()
+ self.output.makeConnection(self.transport)
self.output.tag = self.name
+
+ def outReceived(self, data):
+ self.output.dataReceived(data)
+ self.empty = data[-1] == '\n'
+
+ errReceived = outReceived
+
+
def processEnded(self, reason):
"""
Let the service know that this child process has ended
"""
- procmon.LoggingProtocol.processEnded(self, reason)
+ if not self.empty:
+ self.output.dataReceived('\n')
self.service.processEnded(self.name)
+
def getSSLPassphrase(*ignored):
if not config.SSLPrivateKey:
Modified: CalendarServer/trunk/calendarserver/tap/test/test_caldav.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/test/test_caldav.py 2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/calendarserver/tap/test/test_caldav.py 2010-11-01 22:38:48 UTC (rev 6551)
@@ -359,7 +359,8 @@
"""
service = self.makeService()
- return service.services[0].args[1].protocolArgs["requestFactory"]
+ # FIXME: should at least use service name, not index
+ return service.services[2].args[1].protocolArgs["requestFactory"]
@@ -489,20 +490,23 @@
"""
service = self.makeService()
- expectedSubServices = (
+ expectedSubServices = dict((
(MaxAcceptTCPServer, self.config["HTTPPort"]),
(MaxAcceptSSLServer, self.config["SSLPort"]),
- )
+ ))
- configuredSubServices = [(s.__class__, s.args) for s in service.services]
-
+ configuredSubServices = [(s.__class__, getattr(s, 'args', None))
+ for s in service.services]
+ checked = 0
for serviceClass, serviceArgs in configuredSubServices:
- self.failUnless(serviceClass in (s[0] for s in expectedSubServices))
+ if serviceClass in expectedSubServices:
+ checked += 1
+ self.assertEquals(
+ serviceArgs[0],
+ dict(expectedSubServices)[serviceClass]
+ )
+ self.assertEquals(checked, len(expectedSubServices))
- self.assertEquals(
- serviceArgs[0],
- dict(expectedSubServices)[serviceClass]
- )
def test_SSLKeyConfiguration(self):
"""
@@ -570,7 +574,8 @@
service = self.makeService()
for s in service.services:
- self.assertEquals(s.kwargs["interface"], "127.0.0.1")
+ if isinstance(s, (internet.TCPServer, internet.SSLServer)):
+ self.assertEquals(s.kwargs["interface"], "127.0.0.1")
def test_multipleBindAddresses(self):
"""
@@ -619,7 +624,8 @@
service = self.makeService()
for s in service.services:
- self.assertEquals(s.kwargs["backlog"], 1024)
+ if isinstance(s, (internet.TCPServer, internet.SSLServer)):
+ self.assertEquals(s.kwargs["backlog"], 1024)
class ServiceHTTPFactoryTests(BaseServiceMakerTests):
@@ -888,22 +894,22 @@
class DummyProcessObject(object):
"""
- Simple stub for the Process Object API that will run a test script.
+ Simple stub for Process Object API which just has an executable and some
+ arguments.
This is a stand in for L{TwistdSlaveProcess}.
"""
def __init__(self, scriptname, *args):
self.scriptname = scriptname
- self.args = list(args)
+ self.args = args
def getCommandLine(self):
"""
- Get the command line to invoke this script.
+ Simple command line.
"""
- return [sys.executable,
- FilePath(__file__).sibling(self.scriptname).path] + self.args
+ return [self.scriptname] + list(self.args)
def getFileDescriptors(self):
@@ -920,23 +926,29 @@
return 'Dummy'
-
-class DelayedStartupProcessMonitorTests(TestCase):
+class ScriptProcessObject(DummyProcessObject):
"""
- Test cases for L{DelayedStartupProcessMonitor}.
+ Simple stub for the Process Object API that will run a test script.
"""
- def useFakeReactor(self, fakeReactor):
+ def getCommandLine(self):
"""
- Earlier versions of Twisted used a global reactor in
- L{twisted.runner.procmon}, so we need to take that into account when
- testing.
+ Get the command line to invoke this script.
"""
- if not DelayedStartupProcessMonitor._shouldPassReactor:
- import twisted.runner.procmon as inherited
- self.patch(inherited, "reactor", fakeReactor)
+ return [
+ sys.executable,
+ FilePath(__file__).sibling(self.scriptname).path
+ ] + list(self.args)
+
+
+
+class DelayedStartupProcessMonitorTests(TestCase):
+ """
+ Test cases for L{DelayedStartupProcessMonitor}.
+ """
+
def test_lineAfterLongLine(self):
"""
A "long" line of output from a monitored process (longer than
@@ -944,7 +956,7 @@
at once, to avoid resource exhaustion.
"""
dspm = DelayedStartupProcessMonitor()
- dspm.addProcessObject(DummyProcessObject(
+ dspm.addProcessObject(ScriptProcessObject(
'longlines.py', str(DelayedStartupLineLogger.MAX_LENGTH)),
os.environ)
dspm.startService()
@@ -966,7 +978,7 @@
logged.append(event)
if m == '[Dummy] z':
d.callback("done")
-
+
log.addObserver(tempObserver)
self.addCleanup(log.removeObserver, tempObserver)
d = Deferred()
@@ -990,9 +1002,8 @@
If a L{TwistdSlaveProcess} specifies some file descriptors to be
inherited, they should be inherited by the subprocess.
"""
- dspm = DelayedStartupProcessMonitor()
- imps = dspm.reactor = InMemoryProcessSpawner()
- self.useFakeReactor(imps)
+ imps = InMemoryProcessSpawner()
+ dspm = DelayedStartupProcessMonitor(imps)
# Most arguments here will be ignored, so these are bogus values.
slave = TwistdSlaveProcess(
@@ -1017,6 +1028,27 @@
19: 19, 25: 25})
+ def test_changedArgumentEachSpawn(self):
+ """
+ If the result of C{getCommandLine} changes on subsequent calls,
+ subsequent calls should result in different arguments being passed to
+ C{spawnProcess} each time.
+ """
+ imps = InMemoryProcessSpawner()
+ dspm = DelayedStartupProcessMonitor(imps)
+ slave = DummyProcessObject('scriptname', 'first')
+ dspm.addProcessObject(slave, {})
+ dspm.startService()
+ oneProcessTransport = imps.waitForOneProcess()
+ self.assertEquals(oneProcessTransport.args,
+ ['scriptname', 'first'])
+ slave.args = ['second']
+ oneProcessTransport.processProtocol.processEnded(None)
+ twoProcessTransport = imps.waitForOneProcess()
+ self.assertEquals(twoProcessTransport.args,
+ ['scriptname', 'second'])
+
+
def test_metaDescriptorInheritance(self):
"""
If a L{TwistdSlaveProcess} specifies a meta-file-descriptor to be
@@ -1024,9 +1056,8 @@
configuration argument should be passed that indicates to the
subprocess.
"""
- dspm = DelayedStartupProcessMonitor()
- imps = dspm.reactor = InMemoryProcessSpawner()
- self.useFakeReactor(imps)
+ imps = InMemoryProcessSpawner()
+ dspm = DelayedStartupProcessMonitor(imps)
# Most arguments here will be ignored, so these are bogus values.
slave = TwistdSlaveProcess(
twistd = "bleh",
@@ -1057,10 +1088,9 @@
objects that have been added to it being started once per
delayInterval.
"""
- dspm = DelayedStartupProcessMonitor()
+ imps = InMemoryProcessSpawner()
+ dspm = DelayedStartupProcessMonitor(imps)
dspm.delayInterval = 3.0
- imps = dspm.reactor = InMemoryProcessSpawner()
- self.useFakeReactor(imps)
sampleCounter = range(0, 5)
for counter in sampleCounter:
slave = TwistdSlaveProcess(
Modified: CalendarServer/trunk/calendarserver/tap/util.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/util.py 2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/calendarserver/tap/util.py 2010-11-01 22:38:48 UTC (rev 6551)
@@ -14,6 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
##
+
+"""
+Utilities for assembling the service and resource hierarchy.
+"""
+
__all__ = [
"getRootResource",
"FakeRequest",
@@ -22,6 +27,7 @@
import errno
import os
from time import sleep
+from socket import fromfd, AF_UNIX, SOCK_STREAM
from twext.python.filepath import CachingFilePath as FilePath
from twext.python.log import Logger
@@ -34,6 +40,7 @@
from twisted.cred.portal import Portal
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.reactor import addSystemEventTrigger
+from twisted.internet.tcp import Connection
from twisted.python.reflect import namedClass
from twistedcaldav import memcachepool
@@ -61,6 +68,8 @@
NegotiateCredentialFactory # pacify pyflakes
except ImportError:
NegotiateCredentialFactory = None
+from txdav.base.datastore.asyncsqlpool import ConnectionPool
+from txdav.base.datastore.asyncsqlpool import ConnectionPoolClient
from calendarserver.accesslog import DirectoryLogWrapperResource
from calendarserver.provision.root import RootResource
@@ -77,77 +86,95 @@
log = Logger()
+def pgServiceFromConfig(config, subServiceFactory, uid=None, gid=None):
+ """
+ Construct a L{PostgresService} from a given configuration and subservice.
-def storeFromConfig(config, notifierFactory=None):
+ @param config: the configuration to derive postgres configuration
+ parameters from.
+
+ @param subServiceFactory: A factory for the service to start once the
+ L{PostgresService} has been initialized.
+
+ @param uid: The user-ID to run the PostgreSQL server as.
+
+ @param gid: The group-ID to run the PostgreSQL server as.
+
+ @return: a service which can start postgres.
+
+ @rtype: L{PostgresService}
"""
+ dbRoot = CachingFilePath(config.DatabaseRoot)
+ # Construct a PostgresService exactly as the parent would, so that we
+ # can establish connection information.
+ return PostgresService(
+ dbRoot, subServiceFactory, v1_schema,
+ databaseName=config.Postgres.DatabaseName,
+ logFile=config.Postgres.LogFile,
+ socketDir=config.RunRoot,
+ listenAddresses=config.Postgres.ListenAddresses,
+ sharedBuffers=config.Postgres.SharedBuffers,
+ maxConnections=config.Postgres.MaxConnections,
+ options=config.Postgres.Options,
+ uid=uid, gid=gid
+ )
+
+
+class ConnectionWithPeer(Connection):
+
+ connected = True
+
+ def getPeer(self):
+ return "<peer: %r %r>" % (self.socket.fileno(), id(self))
+
+ def getHost(self):
+ return "<host: %r %r>" % (self.socket.fileno(), id(self))
+
+def storeFromConfig(config, serviceParent, notifierFactory=None):
+ """
Produce an L{IDataStore} from the given configuration and notifier factory.
"""
if config.UseDatabase:
- dbRoot = CachingFilePath(config.DatabaseRoot)
- postgresService = PostgresService(
- dbRoot, None, v1_schema,
- databaseName=config.Postgres.DatabaseName,
- logFile=config.Postgres.LogFile,
- socketDir=config.RunRoot,
- listenAddresses=config.Postgres.ListenAddresses,
- sharedBuffers=config.Postgres.SharedBuffers,
- maxConnections=config.Postgres.MaxConnections,
- options=config.Postgres.Options,
+ postgresService = pgServiceFromConfig(config, None)
+ if config.DBAMPFD == 0:
+ cp = ConnectionPool(postgresService.produceConnection)
+ cp.setServiceParent(serviceParent)
+ txnFactory = cp.connection
+ else:
+ # TODO: something to do with loseConnection here, maybe? I don't
+ # think it actually needs to be shut down, though.
+ skt = fromfd(int(config.DBAMPFD), AF_UNIX, SOCK_STREAM)
+ os.close(config.DBAMPFD)
+ protocol = ConnectionPoolClient()
+ transport = ConnectionWithPeer(skt, protocol)
+ protocol.makeConnection(transport)
+ transport.startReading()
+ txnFactory = protocol.newTransaction
+ dataStore = CommonSQLDataStore(
+ txnFactory, notifierFactory,
+ postgresService.dataStoreDirectory.child("attachments"),
+ config.EnableCalDAV, config.EnableCardDAV
)
- return CommonSQLDataStore(postgresService.produceConnection,
- notifierFactory, dbRoot.child("attachments"),
- config.EnableCalDAV, config.EnableCardDAV)
+ dataStore.setServiceParent(serviceParent)
+ return dataStore
else:
return CommonFileDataStore(FilePath(config.DocumentRoot),
notifierFactory, config.EnableCalDAV, config.EnableCardDAV)
-def getRootResource(config, resources=None):
+def directoryFromConfig(config):
"""
- Set up directory service and resource hierarchy based on config.
- Return root resource.
-
- Additional resources can be added to the hierarchy by passing a list of
- tuples containing: path, resource class, __init__ args list, and optional
- authentication scheme ("basic" or "digest").
+ Create an L{AggregateDirectoryService} from the given configuration.
"""
-
- # FIXME: this is only here to workaround circular imports
- doBind()
#
- # Default resource classes
- #
- rootResourceClass = RootResource
- principalResourceClass = DirectoryPrincipalProvisioningResource
- calendarResourceClass = DirectoryCalendarHomeProvisioningResource
- iScheduleResourceClass = IScheduleInboxResource
- timezoneServiceResourceClass = TimezoneServiceResource
- webCalendarResourceClass = WebCalendarResource
- webAdminResourceClass = WebAdminResource
- addressBookResourceClass = DirectoryAddressBookHomeProvisioningResource
- directoryBackedAddressBookResourceClass = DirectoryBackedAddressBookResource
-
- #
- # Setup the Augment Service
- #
- augmentClass = namedClass(config.AugmentService.type)
-
- log.info("Configuring augment service of type: %s" % (augmentClass,))
-
- try:
- augment.AugmentService = augmentClass(**config.AugmentService.params)
- except IOError:
- log.error("Could not start augment service")
- raise
-
- #
# Setup the Directory
#
directories = []
directoryClass = namedClass(config.DirectoryService.type)
+ principalResourceClass = DirectoryPrincipalProvisioningResource
log.info("Configuring directory service of type: %s"
% (config.DirectoryService.type,))
@@ -223,8 +250,54 @@
directory.setRealm(realmName)
except ImportError:
pass
+ log.info("Setting up principal collection: %r"
+ % (principalResourceClass,))
+ principalResourceClass("/principals/", directory)
+ return directory
+
+def getRootResource(config, serviceParent, resources=None):
+ """
+ Set up directory service and resource hierarchy based on config.
+ Return root resource.
+
+ Additional resources can be added to the hierarchy by passing a list of
+ tuples containing: path, resource class, __init__ args list, and optional
+ authentication scheme ("basic" or "digest").
+ """
+
+ # FIXME: this is only here to workaround circular imports
+ doBind()
+
#
+ # Default resource classes
+ #
+ rootResourceClass = RootResource
+ calendarResourceClass = DirectoryCalendarHomeProvisioningResource
+ iScheduleResourceClass = IScheduleInboxResource
+ timezoneServiceResourceClass = TimezoneServiceResource
+ webCalendarResourceClass = WebCalendarResource
+ webAdminResourceClass = WebAdminResource
+ addressBookResourceClass = DirectoryAddressBookHomeProvisioningResource
+ directoryBackedAddressBookResourceClass = DirectoryBackedAddressBookResource
+
+ #
+ # Setup the Augment Service
+ #
+ augmentClass = namedClass(config.AugmentService.type)
+
+ log.info("Configuring augment service of type: %s" % (augmentClass,))
+
+ try:
+ augment.AugmentService = augmentClass(**config.AugmentService.params)
+ except IOError:
+ log.error("Could not start augment service")
+ raise
+
+
+ directory = directoryFromConfig(config)
+
+ #
# Setup the ProxyDB Service
#
proxydbClass = namedClass(config.ProxyDBService.type)
@@ -311,10 +384,8 @@
#
log.info("Setting up document root at: %s"
% (config.DocumentRoot,))
- log.info("Setting up principal collection: %r"
- % (principalResourceClass,))
- principalCollection = principalResourceClass("/principals/", directory)
+ principalCollection = directory.principalCollection
#
# Configure NotifierFactory
@@ -327,7 +398,7 @@
else:
notifierFactory = None
- newStore = storeFromConfig(config, notifierFactory)
+ newStore = storeFromConfig(config, serviceParent, notifierFactory)
if config.EnableCalDAV:
log.info("Setting up calendar collection: %r" % (calendarResourceClass,))
Modified: CalendarServer/trunk/calendarserver/tools/purge.py
===================================================================
--- CalendarServer/trunk/calendarserver/tools/purge.py 2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/calendarserver/tools/purge.py 2010-11-01 22:38:48 UTC (rev 6551)
@@ -97,6 +97,7 @@
os.umask(config.umask)
try:
+ # TODO: getRootResource needs a parent service now.
rootResource = getRootResource(config)
directory = rootResource.getDirectory()
except DirectoryError, e:
Modified: CalendarServer/trunk/calendarserver/tools/test/test_principals.py
===================================================================
--- CalendarServer/trunk/calendarserver/tools/test/test_principals.py 2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/calendarserver/tools/test/test_principals.py 2010-11-01 22:38:48 UTC (rev 6551)
@@ -24,7 +24,7 @@
from twistedcaldav.directory.directory import DirectoryError
from twistedcaldav.test.util import TestCase, CapturingProcessProtocol
-from calendarserver.tap.util import getRootResource
+from calendarserver.tap.util import directoryFromConfig
from calendarserver.tools.principals import parseCreationArgs, matchStrings, updateRecord, principalForPrincipalID, getProxies, setProxies
@@ -226,7 +226,7 @@
@inlineCallbacks
def test_updateRecord(self):
- directory = getRootResource(config).getDirectory()
+ directory = directoryFromConfig(config)
guid = "eee28807-a8c5-46c8-a558-a08281c558a7"
(yield updateRecord(True, directory, "locations",
@@ -264,7 +264,7 @@
"""
Read and Write proxies can be set en masse
"""
- directory = getRootResource(config).getDirectory()
+ directory = directoryFromConfig(config)
principal = principalForPrincipalID("users:user01", directory=directory)
readProxies, writeProxies = (yield getProxies(principal, directory=directory))
Modified: CalendarServer/trunk/calendarserver/tools/test/test_purge.py
===================================================================
--- CalendarServer/trunk/calendarserver/tools/test/test_purge.py 2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/calendarserver/tools/test/test_purge.py 2010-11-01 22:38:48 UTC (rev 6551)
@@ -47,7 +47,11 @@
def setUp(self):
super(PurgeOldEventsTestCase, self).setUp()
- config.DirectoryService.params['xmlFile'] = os.path.join(os.path.dirname(__file__), "purge", "accounts.xml")
+ config.DirectoryService.params['xmlFile'] = os.path.join(
+ os.path.dirname(__file__), "purge", "accounts.xml"
+ )
+ # TODO: when rewriting for new-store, getRootResource needs a parent
+ # service argument now, to know when to start/stop the connection pool.
self.rootResource = getRootResource(config)
self.directory = self.rootResource.getDirectory()
@@ -381,6 +385,8 @@
copyAugmentFile = FilePath(config.DataRoot).child("augments.xml")
origAugmentFile.copyTo(copyAugmentFile)
+ # TODO: when rewriting for new-store, getRootResource needs a parent
+ # service argument now, to know when to start/stop the connection pool.
self.rootResource = getRootResource(config)
self.directory = self.rootResource.getDirectory()
Modified: CalendarServer/trunk/twistedcaldav/directory/util.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/directory/util.py 2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/twistedcaldav/directory/util.py 2010-11-01 22:38:48 UTC (rev 6551)
@@ -66,6 +66,10 @@
transaction = newStore.newTransaction(repr(request))
def abortIfUncommitted(request, response):
try:
+ # TODO: missing 'yield' here. For formal correctness as per
+ # the interface, this should be allowed to be a Deferred. (The
+ # actual implementation still raises synchronously, so there's
+ # no bug currently.)
transaction.abort()
except AlreadyFinishedError:
pass
Modified: CalendarServer/trunk/twistedcaldav/localization.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/localization.py 2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/twistedcaldav/localization.py 2010-11-01 22:38:48 UTC (rev 6551)
@@ -26,7 +26,9 @@
from twext.python.log import Logger
try:
- from Foundation import *
+ from Foundation import (
+ NSPropertyListImmutable, NSPropertyListSerialization, NSData
+ )
foundationImported = True
except ImportError:
foundationImported = False
Modified: CalendarServer/trunk/twistedcaldav/mail.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/mail.py 2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/twistedcaldav/mail.py 2010-11-01 22:38:48 UTC (rev 6551)
@@ -27,6 +27,7 @@
try:
from cStringIO import StringIO
+ StringIO
except ImportError:
from StringIO import StringIO
@@ -37,7 +38,7 @@
from zope.interface import implements
from twisted.application import internet, service
-from twisted.internet import protocol, defer, ssl, reactor
+from twisted.internet import protocol, defer, ssl, reactor as _reactor
from twisted.internet.defer import inlineCallbacks, returnValue, succeed
from twisted.mail import pop3client, imap4
from twisted.mail.smtp import messageid, rfc822date, ESMTPSenderFactory
@@ -60,7 +61,7 @@
from twistedcaldav.config import config
from twistedcaldav.directory.util import transactionFromRequest
from twistedcaldav.ical import Property
-from twistedcaldav.localization import translationTo
+from twistedcaldav.localization import translationTo, _
from twistedcaldav.resource import CalDAVResource
from twistedcaldav.schedule import deliverSchedulePrivilegeSet
from twistedcaldav.scheduling.cuaddress import normalizeCUAddr
@@ -350,7 +351,7 @@
def injectMessage(organizer, attendee, calendar, msgId, reactor=None):
if reactor is None:
- from twisted.internet import reactor
+ reactor = _reactor
headers = {
'Content-Type' : 'text/calendar',
@@ -586,16 +587,18 @@
return multiService
-#
-# ISchedule Inbox
-#
-class IScheduleService(service.Service, LoggingMixIn):
+class IScheduleService(service.MultiService, LoggingMixIn):
+ """
+ ISchedule Inbox
+ """
def __init__(self, settings, mailer):
self.settings = settings
self.mailer = mailer
- rootResource = getRootResource(config,
+ rootResource = getRootResource(
+ config,
+ self,
(
("inbox", IMIPInvitationInboxResource, (mailer,), "digest"),
)
@@ -604,16 +607,10 @@
self.factory = HTTPFactory(server.Site(rootResource))
self.server = internet.TCPServer(settings['MailGatewayPort'],
self.factory)
+ self.server.setServiceParent(self)
- def startService(self):
- self.server.startService()
- def stopService(self):
- self.server.stopService()
-
-
-
class MailHandler(LoggingMixIn):
def __init__(self, dataRoot=None):
@@ -767,7 +764,7 @@
requireTransportSecurity=settings["UseSSL"],
)
- reactor.connectTCP(settings["Server"], settings["Port"], factory)
+ _reactor.connectTCP(settings["Server"], settings["Port"], factory)
return deferred
@@ -954,7 +951,7 @@
requireAuthentication=False,
requireTransportSecurity=settings["UseSSL"])
- reactor.connectTCP(settings['Server'], settings['Port'], factory)
+ _reactor.connectTCP(settings['Server'], settings['Port'], factory)
deferred.addCallback(_success, msgId, fromAddr, toAddr)
deferred.addErrback(_failure, msgId, fromAddr, toAddr)
return deferred
Modified: CalendarServer/trunk/twistedcaldav/stdconfig.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/stdconfig.py 2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/twistedcaldav/stdconfig.py 2010-11-01 22:38:48 UTC (rev 6551)
@@ -139,17 +139,27 @@
# This configures the actual network address that the server binds to.
#
"BindAddresses": [], # List of IP addresses to bind to [empty = all]
- "BindHTTPPorts": [], # List of port numbers to bind to for HTTP [empty = same as "Port"]
- "BindSSLPorts" : [], # List of port numbers to bind to for SSL [empty = same as "SSLPort"]
- "InheritFDs" : [], # File descriptors to inherit for HTTP requests (empty = don't inherit)
- "InheritSSLFDs": [], # File descriptors to inherit for HTTPS requests (empty = don't inherit)
- "MetaFD": 0, # Inherited file descriptor to call recvmsg() on to receive sockets (none = don't inherit)
+ "BindHTTPPorts": [], # List of port numbers to bind to for HTTP
+ # [empty = same as "Port"]
+ "BindSSLPorts" : [], # List of port numbers to bind to for SSL
+ # [empty = same as "SSLPort"]
+ "InheritFDs" : [], # File descriptors to inherit for HTTP requests
+ # (empty = don't inherit)
+ "InheritSSLFDs": [], # File descriptors to inherit for HTTPS requests
+ # (empty = don't inherit)
+ "MetaFD" : 0, # Inherited file descriptor to call recvmsg() on to
+ # receive sockets (none = don't inherit)
- "UseMetaFD": True, # Use a 'meta' FD, i.e. an FD to transmit other
- # FDs to slave processes.
+ "UseMetaFD" : True, # Use a 'meta' FD, i.e. an FD to transmit other FDs
+ # to slave processes.
- "UseDatabase" : True, # True: postgres; False: files
+ "UseDatabase" : True, # True: postgres; False: files
+ "DBAMPFD" : 0, # Internally used by database to tell slave
+ # processes to inherit a file descriptor and use it
+ # as an AMP connection over a UNIX socket; see
+ # txdav.base.datastore.asyncsqlpool.
+
#
# Types of service provided
#
Modified: CalendarServer/trunk/twistedcaldav/storebridge.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/storebridge.py 2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/twistedcaldav/storebridge.py 2010-11-01 22:38:48 UTC (rev 6551)
@@ -664,9 +664,10 @@
content_type = MimeType("application", "octet-stream")
t = self._newStoreAttachment.store(content_type)
+ @inlineCallbacks
def done(ignored):
- t.loseConnection()
- return NO_CONTENT
+ yield t.loseConnection()
+ returnValue(NO_CONTENT)
return readStream(request.stream, t.write).addCallback(done)
Modified: CalendarServer/trunk/twistedcaldav/test/test_mail.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/test/test_mail.py 2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/twistedcaldav/test/test_mail.py 2010-11-01 22:38:48 UTC (rev 6551)
@@ -14,7 +14,6 @@
# limitations under the License.
##
-from twistedcaldav.mail import *
from twistedcaldav.test.util import TestCase
from twistedcaldav.ical import Component
from twistedcaldav.config import config
@@ -23,6 +22,8 @@
from twisted.internet.defer import inlineCallbacks
import email
+from twistedcaldav.mail import MailHandler
+from twistedcaldav.mail import MailGatewayTokensDatabase
import os
Modified: CalendarServer/trunk/twistedcaldav/test/test_wrapping.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/test/test_wrapping.py 2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/twistedcaldav/test/test_wrapping.py 2010-11-01 22:38:48 UTC (rev 6551)
@@ -113,7 +113,9 @@
txn = self.calendarCollection._newStore.newTransaction()
home = yield txn.calendarHomeWithUID(uid, True)
cal = yield home.calendarWithName("calendar")
- cal.createCalendarObjectWithName(objectName, VComponent.fromString(objectText))
+ yield cal.createCalendarObjectWithName(
+ objectName, VComponent.fromString(objectText)
+ )
yield txn.commit()
@@ -141,7 +143,9 @@
if adbk is None:
yield home.createAddressBookWithName("addressbook")
adbk = yield home.addressbookWithName("addressbook")
- adbk.createAddressBookObjectWithName(objectName, VCComponent.fromString(objectText))
+ yield adbk.createAddressBookObjectWithName(
+ objectName, VCComponent.fromString(objectText)
+ )
yield txn.commit()
@@ -287,14 +291,14 @@
def test_lookupNewCalendar(self):
"""
When a L{CalDAVResource} which represents a not-yet-created calendar
- collection is looked up in a L{CalendarHomeResource} representing a calendar
- home, it will initially have a new storage backend set to C{None}, but
- when the calendar is created via a protocol action, the backend will be
- initialized to match.
+ collection is looked up in a L{CalendarHomeResource} representing a
+ calendar home, it will initially have a new storage backend set to
+ C{None}, but when the calendar is created via a protocol action, the
+ backend will be initialized to match.
"""
calDavFile = yield self.getResource("calendars/users/wsanchez/frobozz")
self.assertIsInstance(calDavFile, ProtoCalendarCollectionResource)
- calDavFile.createCalendarCollection()
+ yield calDavFile.createCalendarCollection()
yield self.commit()
@@ -417,7 +421,7 @@
"""
calDavFile = yield self.getResource("addressbooks/users/wsanchez/frobozz")
self.assertIsInstance(calDavFile, ProtoAddressBookCollectionResource)
- calDavFile.createAddressBookCollection()
+ yield calDavFile.createAddressBookCollection()
yield self.commit()
self.assertEquals(calDavFile._principalCollections,
frozenset([self.principalsResource]))
Modified: CalendarServer/trunk/twistedcaldav/timezoneservice.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/timezoneservice.py 2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/twistedcaldav/timezoneservice.py 2010-11-01 22:38:48 UTC (rev 6551)
@@ -91,6 +91,9 @@
),
)
+ def contentType(self):
+ return MimeType.fromString("text/xml")
+
def resourceType(self):
return davxml.ResourceType.timezones
Copied: CalendarServer/trunk/txdav/base/datastore/asyncsqlpool.py (from rev 6550, CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/asyncsqlpool.py)
===================================================================
--- CalendarServer/trunk/txdav/base/datastore/asyncsqlpool.py (rev 0)
+++ CalendarServer/trunk/txdav/base/datastore/asyncsqlpool.py 2010-11-01 22:38:48 UTC (rev 6551)
@@ -0,0 +1,557 @@
+# -*- test-case-name: txdav.caldav.datastore -*-
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Asynchronous multi-process connection pool.
+"""
+
+import sys
+
+from cStringIO import StringIO
+from cPickle import dumps, loads
+from itertools import count
+
+from zope.interface import implements
+
+from twisted.internet.defer import inlineCallbacks
+from twisted.internet.defer import returnValue
+from txdav.idav import IAsyncTransaction
+from twisted.internet.defer import Deferred
+from twisted.protocols.amp import Boolean
+from twisted.python.failure import Failure
+from twisted.protocols.amp import Argument, String, Command, AMP, Integer
+from twisted.internet import reactor as _reactor
+from twisted.application.service import Service
+from txdav.base.datastore.threadutils import ThreadHolder
+from txdav.idav import AlreadyFinishedError
+from twisted.python import log
+from twisted.python.components import proxyForInterface
+
+
+class BaseSqlTxn(object):
+ """
+ L{IAsyncTransaction} implementation based on a L{ThreadHolder} in the
+ current process.
+ """
+ implements(IAsyncTransaction)
+
+ def __init__(self, connectionFactory, reactor=_reactor):
+ """
+ @param connectionFactory: A 0-argument callable which returns a DB-API
+ 2.0 connection.
+ """
+ self._completed = False
+ self._holder = ThreadHolder(reactor)
+ self._holder.start()
+ def initCursor():
+ # support threadlevel=1; we can't necessarily cursor() in a
+ # different thread than we do transactions in.
+
+ # TODO: Re-try connect when it fails. Specify a timeout. That
+ # should happen in this layer because we need to be able to stop
+ # the reconnect attempt if it's hanging.
+ self._connection = connectionFactory()
+ self._cursor = self._connection.cursor()
+
+ # Note: no locking necessary here; since this gets submitted first, all
+ # subsequent submitted work-units will be in line behind it and the
+ # cursor will already have been initialized.
+ self._holder.submit(initCursor)
+
+
+ def _reallyExecSQL(self, sql, args=None, raiseOnZeroRowCount=None):
+ if args is None:
+ args = []
+ self._cursor.execute(sql, args)
+ if raiseOnZeroRowCount is not None and self._cursor.rowcount == 0:
+ raise raiseOnZeroRowCount()
+ if self._cursor.description:
+ return self._cursor.fetchall()
+ else:
+ return None
+
+
+ noisy = False
+
+ def execSQL(self, *args, **kw):
+ result = self._holder.submit(
+ lambda : self._reallyExecSQL(*args, **kw)
+ )
+ if self.noisy:
+ def reportResult(results):
+ sys.stdout.write("\n".join([
+ "",
+ "SQL: %r %r" % (args, kw),
+ "Results: %r" % (results,),
+ "",
+ ]))
+ return results
+ result.addBoth(reportResult)
+ return result
+
+
+ def commit(self):
+ if not self._completed:
+ self._completed = True
+ def reallyCommit():
+ self._connection.commit()
+ result = self._holder.submit(reallyCommit)
+ return result
+ else:
+ raise AlreadyFinishedError()
+
+
+ def abort(self):
+ if not self._completed:
+ self._completed = True
+ def reallyAbort():
+ self._connection.rollback()
+ result = self._holder.submit(reallyAbort)
+ return result
+ else:
+ raise AlreadyFinishedError()
+
+
+ def __del__(self):
+ if not self._completed:
+ print 'BaseSqlTxn.__del__: OK'
+ self.abort()
+
+
+ def reset(self):
+ """
+ Call this when placing this transaction back into the pool.
+
+ @raise RuntimeError: if the transaction has not been committed or
+ aborted.
+ """
+ if not self._completed:
+ raise RuntimeError("Attempt to re-set active transaction.")
+ self._completed = False
+
+
+ def stop(self):
+ """
+ Release the thread and database connection associated with this
+ transaction.
+ """
+ self._completed = True
+ self._stopped = True
+ holder = self._holder
+ self._holder = None
+ holder.submit(self._connection.close)
+ return holder.stop()
+
+
+
+class PooledSqlTxn(proxyForInterface(iface=IAsyncTransaction,
+ originalAttribute='_baseTxn')):
+ """
+ This is a temporary throw-away wrapper for the longer-lived BaseSqlTxn, so
+ that if a badly-behaved API client accidentally hangs on to one of these
+ and, for example C{.abort()}s it multiple times once another client is
+ using that connection, it will get some harmless tracebacks.
+ """
+
+ def __init__(self, pool, baseTxn):
+ self._pool = pool
+ self._baseTxn = baseTxn
+ self._complete = False
+
+
+ def execSQL(self, *a, **kw):
+ self._checkComplete()
+ return super(PooledSqlTxn, self).execSQL(*a, **kw)
+
+
+ def commit(self):
+ self._markComplete()
+ return self._repoolAfter(super(PooledSqlTxn, self).commit())
+
+
+ def abort(self):
+ self._markComplete()
+ return self._repoolAfter(super(PooledSqlTxn, self).abort())
+
+
+ def _checkComplete(self):
+ """
+ If the transaction is complete, raise L{AlreadyFinishedError}
+ """
+ if self._complete:
+ raise AlreadyFinishedError()
+
+
+ def _markComplete(self):
+ """
+ Mark the transaction as complete, raising AlreadyFinishedError.
+ """
+ self._checkComplete()
+ self._complete = True
+
+
+ def _repoolAfter(self, d):
+ def repool(result):
+ self._pool.reclaim(self)
+ return result
+ return d.addCallback(repool)
+
+
+
+class ConnectionPool(Service, object):
+ """
+ This is a central service that has a threadpool and executes SQL statements
+ asynchronously, in a pool.
+ """
+
+ reactor = _reactor
+
+ def __init__(self, connectionFactory):
+ super(ConnectionPool, self).__init__()
+ self.free = []
+ self.busy = []
+ self.connectionFactory = connectionFactory
+
+
+ def startService(self):
+ """
+ No startup necessary.
+ """
+
+
+ @inlineCallbacks
+ def stopService(self):
+ """
+ Forcibly abort any outstanding transactions.
+ """
+ for busy in self.busy[:]:
+ try:
+ yield busy.abort()
+ except:
+ log.err()
+ # all transactions should now be in the free list, since 'abort()' will
+ # have put them there.
+ for free in self.free:
+ yield free.stop()
+
+
+ def connection(self, label="<unlabeled>"):
+ """
+ Find a transaction; either retrieve a free one from the list or
+ allocate a new one if no free ones are available.
+
+ @return: an L{IAsyncTransaction}
+ """
+ if self.free:
+ basetxn = self.free.pop(0)
+ else:
+ basetxn = BaseSqlTxn(
+ connectionFactory=self.connectionFactory,
+ reactor=self.reactor
+ )
+ txn = PooledSqlTxn(self, basetxn)
+ self.busy.append(txn)
+ return txn
+
+
+ def reclaim(self, txn):
+ """
+ Shuck the L{PooledSqlTxn} wrapper off, and put the BaseSqlTxn into the
+ free list.
+ """
+ baseTxn = txn._baseTxn
+ baseTxn.reset()
+ self.free.append(baseTxn)
+ self.busy.remove(txn)
+
+
+
+def txnarg():
+ return [('transactionID', Integer())]
+
+
+CHUNK_MAX = 0xffff
+
+class BigArgument(Argument):
+ """
+ An argument whose payload can be larger than L{CHUNK_MAX}, by splitting
+ across multiple AMP keys.
+ """
+ def fromBox(self, name, strings, objects, proto):
+ value = StringIO()
+ for counter in count():
+ chunk = strings.get("%s.%d" % (name, counter))
+ if chunk is None:
+ break
+ value.write(chunk)
+ objects[name] = self.fromString(value.getvalue())
+
+
+ def toBox(self, name, strings, objects, proto):
+ value = StringIO(self.toString(objects[name]))
+ for counter in count():
+ nextChunk = value.read(CHUNK_MAX)
+ if not nextChunk:
+ break
+ strings["%s.%d" % (name, counter)] = nextChunk
+
+
+
+class Pickle(BigArgument):
+ """
+ A pickle sent over AMP. This is to serialize the 'args' argument to
+ C{execSQL}, which is the dynamically-typed 'args' list argument to a DB-API
+ C{execute} function, as well as its dynamically-typed result ('rows').
+
+ This should be cleaned up into a nicer structure, but this is not a network
+ protocol, so we can be a little relaxed about security.
+
+ This is a L{BigArgument} rather than a regular L{Argument} because
+ individual arguments and query results need to contain entire vCard or
+ iCalendar documents, which can easily be greater than 64k.
+ """
+
+ def toString(self, inObject):
+ return dumps(inObject)
+
+ def fromString(self, inString):
+ return loads(inString)
+
+
+
+
+class StartTxn(Command):
+ """
+ Start a transaction, identified with an ID generated by the client.
+ """
+ arguments = txnarg()
+
+
+
+class ExecSQL(Command):
+ """
+ Execute an SQL statement.
+ """
+ arguments = [('sql', String()),
+ ('queryID', String()),
+ ('args', Pickle())] + txnarg()
+
+
+
+class Row(Command):
+ """
+ A row has been returned. Sent from server to client in response to
+ L{ExecSQL}.
+ """
+
+ arguments = [('queryID', String()),
+ ('row', Pickle())]
+
+
+
+class QueryComplete(Command):
+ """
+ A query issued with ExecSQL is complete.
+ """
+
+ arguments = [('queryID', String()),
+ ('norows', Boolean())]
+
+
+
+class Commit(Command):
+ arguments = txnarg()
+
+
+
+class Abort(Command):
+ arguments = txnarg()
+
+
+
+class _NoRows(Exception):
+ """
+ Placeholder exception to report zero rows.
+ """
+
+
+class ConnectionPoolConnection(AMP):
+ """
+ A L{ConnectionPoolConnection} is a single connection to a
+ L{ConnectionPool}.
+ """
+
+ def __init__(self, pool):
+ """
+ Initialize a mapping of transaction IDs to transaction objects.
+ """
+ super(ConnectionPoolConnection, self).__init__()
+ self.pool = pool
+ self._txns = {}
+
+
+ @StartTxn.responder
+ def start(self, transactionID):
+ self._txns[transactionID] = self.pool.connection()
+ return {}
+
+
+ @ExecSQL.responder
+ @inlineCallbacks
+ def receivedSQL(self, transactionID, queryID, sql, args):
+ try:
+ rows = yield self._txns[transactionID].execSQL(sql, args, _NoRows)
+ except _NoRows:
+ norows = True
+ else:
+ norows = False
+ if rows is not None:
+ for row in rows:
+ # Either this should be yielded or it should be
+ # requiresAnswer=False
+ self.callRemote(Row, queryID=queryID, row=row)
+ self.callRemote(QueryComplete, queryID=queryID, norows=norows)
+ returnValue({})
+
+
+ def _complete(self, transactionID, thunk):
+ txn = self._txns.pop(transactionID)
+ return thunk(txn).addCallback(lambda ignored: {})
+
+
+ @Commit.responder
+ def commit(self, transactionID):
+ """
+ Successfully complete the given transaction.
+ """
+ return self._complete(transactionID, lambda x: x.commit())
+
+
+ @Abort.responder
+ def abort(self, transactionID):
+ """
+ Roll back the given transaction.
+ """
+ return self._complete(transactionID, lambda x: x.abort())
+
+
+
+class ConnectionPoolClient(AMP):
+ """
+ A client which can execute SQL.
+ """
+ def __init__(self):
+ super(ConnectionPoolClient, self).__init__()
+ self._nextID = count().next
+ self._txns = {}
+ self._queries = {}
+
+
+ def newTransaction(self):
+ txnid = str(self._nextID())
+ self.callRemote(StartTxn, transactionID=txnid)
+ txn = Transaction(client=self, transactionID=txnid)
+ self._txns[txnid] = txn
+ return txn
+
+
+ @Row.responder
+ def row(self, queryID, row):
+ self._queries[queryID].row(row)
+ return {}
+
+
+ @QueryComplete.responder
+ def complete(self, queryID, norows):
+ self._queries.pop(queryID).done(norows)
+ return {}
+
+
+
+class _Query(object):
+ def __init__(self, raiseOnZeroRowCount):
+ self.results = []
+ self.deferred = Deferred()
+ self.raiseOnZeroRowCount = raiseOnZeroRowCount
+
+
+ def row(self, row):
+ """
+ A row was received.
+ """
+ self.results.append(row)
+
+
+ def done(self, norows):
+ """
+ The query is complete.
+
+ @param norows: A boolean. True if there were not any rows.
+ """
+ if norows and (self.raiseOnZeroRowCount is not None):
+ exc = self.raiseOnZeroRowCount()
+ self.deferred.errback(Failure(exc))
+ else:
+ self.deferred.callback(self.results)
+
+
+
+
+class Transaction(object):
+ """
+ Async protocol-based transaction implementation.
+ """
+
+ implements(IAsyncTransaction)
+
+ def __init__(self, client, transactionID):
+ """
+ Initialize a transaction with a L{ConnectionPoolClient} and a unique
+ transaction identifier.
+ """
+ self._client = client
+ self._transactionID = transactionID
+ self._completed = False
+
+
+ def execSQL(self, sql, args=None, raiseOnZeroRowCount=None):
+ if args is None:
+ args = []
+ queryID = str(self._client._nextID())
+ query = self._client._queries[queryID] = _Query(raiseOnZeroRowCount)
+ self._client.callRemote(ExecSQL, queryID=queryID, sql=sql, args=args,
+ transactionID=self._transactionID)
+ return query.deferred
+
+
+ def _complete(self, command):
+ if self._completed:
+ raise AlreadyFinishedError()
+ self._completed = True
+ return self._client.callRemote(
+ command, transactionID=self._transactionID
+ ).addCallback(lambda x: None)
+
+
+ def commit(self):
+ return self._complete(Commit)
+
+
+ def abort(self):
+ return self._complete(Abort)
+
+
Modified: CalendarServer/trunk/txdav/base/datastore/subpostgres.py
===================================================================
--- CalendarServer/trunk/txdav/base/datastore/subpostgres.py 2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/txdav/base/datastore/subpostgres.py 2010-11-01 22:38:48 UTC (rev 6551)
@@ -18,9 +18,9 @@
"""
Run and manage PostgreSQL as a subprocess.
"""
+
import os
import pwd
-#import thread
from hashlib import md5
@@ -36,10 +36,11 @@
from twisted.protocols.basic import LineReceiver
from twisted.internet import reactor
from twisted.internet.defer import Deferred
+from txdav.base.datastore.asyncsqlpool import BaseSqlTxn
from twisted.application.service import MultiService
-log = Logger()
+log = Logger()
# This appears in the postgres log to indicate that it is accepting
# connections.
@@ -167,6 +168,7 @@
self.completionDeferred.callback(None)
+
class ErrorOutput(Exception):
"""
The process produced some error output and exited with a non-zero exit
@@ -174,6 +176,7 @@
"""
+
class CapturingProcessProtocol(ProcessProtocol):
"""
A L{ProcessProtocol} that captures its output and error.
@@ -212,6 +215,7 @@
"""
self.output.append(data)
+
def errReceived(self, data):
"""
Some output was received on stderr.
@@ -226,6 +230,24 @@
self.deferred.callback(''.join(self.output))
+
+class UnpooledSqlTxn(BaseSqlTxn):
+ """
+ Unpooled variant (releases thread immediately on commit or abort),
+ currently exclusively for testing.
+ """
+ def commit(self):
+ result = super(UnpooledSqlTxn, self).commit()
+ self.stop()
+ return result
+
+ def abort(self):
+ result = super(UnpooledSqlTxn, self).abort()
+ self.stop()
+ return result
+
+
+
class PostgresService(MultiService):
def __init__(self, dataStoreDirectory, subServiceFactory,
@@ -290,6 +312,7 @@
self.monitor = None
self.openConnections = []
+
def activateDelayedShutdown(self):
"""
Call this when starting database initialization code to protect against
@@ -301,6 +324,7 @@
"""
self.delayedShutdown = True
+
def deactivateDelayedShutdown(self):
"""
Call this when database initialization code has completed so that the
@@ -310,6 +334,7 @@
if self.shutdownDeferred:
self.shutdownDeferred.callback(None)
+
def produceConnection(self, label="<unlabeled>", databaseName=None):
"""
Produce a DB-API 2.0 connection pointed at this database.
@@ -326,6 +351,7 @@
w = DiagnosticConnectionWrapper(connection, label)
c = w.cursor()
+
# Turn on standard conforming strings. This option is _required_ if
# you want to get correct behavior out of parameter-passing with the
# pgdb module. If it is not set then the server is potentially
@@ -340,11 +366,24 @@
# preferable to see some exceptions while we're in this state than to
# have the entire worker process hang.
c.execute("set statement_timeout=30000")
+
+ # pgdb (as per DB-API 2.0) automatically puts the connection into a
+ # 'executing a transaction' state when _any_ statement is executed on
+ # it (even these not-touching-any-data statements); make sure to commit
+ # first so that the application sees a fresh transaction, and the
+ # connection can safely be pooled without executing anything on it.
w.commit()
c.close()
return w
+ def produceLocalTransaction(self, label="<unlabeled>"):
+ """
+ Create a L{IAsyncTransaction} based on a thread in the current process.
+ """
+ return UnpooledSqlTxn(lambda : self.produceConnection(label))
+
+
def ready(self):
"""
Subprocess is ready. Time to initialize the subservice.
@@ -389,6 +428,7 @@
# Only continue startup if we've not begun shutdown
self.subServiceFactory(self.produceConnection).setServiceParent(self)
+
def pauseMonitor(self):
"""
Pause monitoring. This is a testing hook for when (if) we are
@@ -402,7 +442,7 @@
def unpauseMonitor(self):
"""
Unpause monitoring.
-
+
@see: L{pauseMonitor}
"""
# for pipe in self.monitor.transport.pipes.values():
@@ -417,9 +457,11 @@
monitor = _PostgresMonitor(self)
pg_ctl = which("pg_ctl")[0]
# check consistency of initdb and postgres?
-
+
options = []
- options.append("-c listen_addresses='%s'" % (",".join(self.listenAddresses)))
+ options.append(
+ "-c listen_addresses='%s'" % (",".join(self.listenAddresses))
+ )
if self.socketDir:
options.append("-k '%s'" % (self.socketDir.path,))
options.append("-c shared_buffers=%d" % (self.sharedBuffers,))
Copied: CalendarServer/trunk/txdav/base/datastore/threadutils.py (from rev 6550, CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/threadutils.py)
===================================================================
--- CalendarServer/trunk/txdav/base/datastore/threadutils.py (rev 0)
+++ CalendarServer/trunk/txdav/base/datastore/threadutils.py 2010-11-01 22:38:48 UTC (rev 6551)
@@ -0,0 +1,111 @@
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+import sys
+from Queue import Queue
+
+
+from twisted.python.failure import Failure
+from twisted.internet.defer import Deferred
+
+
+_DONE = object()
+
+_STATE_STOPPED = 'STOPPED'
+_STATE_RUNNING = 'RUNNING'
+_STATE_STOPPING = 'STOPPING'
+
+class ThreadHolder(object):
+ """
+ A queue which will hold a reactor threadpool thread open until all of the
+ work in that queue is done.
+ """
+
+ def __init__(self, reactor):
+ self._reactor = reactor
+ self._state = _STATE_STOPPED
+ self._stopper = None
+ self._q = None
+
+
+ def _run(self):
+ """
+ Worker function which runs in a non-reactor thread.
+ """
+ while True:
+ work = self._q.get()
+ if work is _DONE:
+ def finishStopping():
+ self._state = _STATE_STOPPED
+ self._q = None
+ s = self._stopper
+ self._stopper = None
+ s.callback(None)
+ self._reactor.callFromThread(finishStopping)
+ return
+ self._oneWorkUnit(*work)
+
+
+ def _oneWorkUnit(self, deferred, instruction):
+ try:
+ result = instruction()
+ except:
+ etype, evalue, etb = sys.exc_info()
+ def relayFailure():
+ f = Failure(evalue, etype, etb)
+ deferred.errback(f)
+ self._reactor.callFromThread(relayFailure)
+ else:
+ self._reactor.callFromThread(deferred.callback, result)
+
+
+ def submit(self, work):
+ """
+ Submit some work to be run.
+
+ @param work: a 0-argument callable, which will be run in a thread.
+
+ @return: L{Deferred} that fires with the result of L{work}
+ """
+ d = Deferred()
+ self._q.put((d, work))
+ return d
+
+
+ def start(self):
+ """
+ Start this thing, if it's stopped.
+ """
+ if self._state != _STATE_STOPPED:
+ raise RuntimeError("Not stopped.")
+ self._state = _STATE_RUNNING
+ self._q = Queue(0)
+ self._reactor.callInThread(self._run)
+
+
+ def stop(self):
+ """
+ Stop this thing and release its thread, if it's running.
+ """
+ if self._state != _STATE_RUNNING:
+ raise RuntimeError("Not running.")
+ s = self._stopper = Deferred()
+ self._state = _STATE_STOPPING
+ self._q.put(_DONE)
+ return s
+
+
+
Modified: CalendarServer/trunk/txdav/base/propertystore/test/test_sql.py
===================================================================
--- CalendarServer/trunk/txdav/base/propertystore/test/test_sql.py 2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/txdav/base/propertystore/test/test_sql.py 2010-11-01 22:38:48 UTC (rev 6551)
@@ -41,6 +41,7 @@
def setUp(self):
self.notifierFactory = StubNotifierFactory()
self.store = yield buildStore(self, self.notifierFactory)
+ self.addCleanup(self.maybeCommitLast)
self._txn = self.store.newTransaction()
self.propertyStore = self.propertyStore1 = yield PropertyStore.load(
"user01", self._txn, 1
@@ -50,7 +51,7 @@
@inlineCallbacks
- def tearDown(self):
+ def maybeCommitLast(self):
if hasattr(self, "_txn"):
result = yield self._txn.commit()
delattr(self, "_txn")
@@ -66,7 +67,7 @@
yield self._txn.commit()
delattr(self, "_txn")
self._txn = self.store.newTransaction()
-
+
store = self.propertyStore1
self.propertyStore = self.propertyStore1 = yield PropertyStore.load(
"user01", self._txn, 1
Property changes on: CalendarServer/trunk/txdav/caldav/datastore/index_file.py
___________________________________________________________________
Modified: svn:mergeinfo
- /CalendarServer/branches/config-separation/txdav/caldav/datastore/index_file.py:4379-4443
/CalendarServer/branches/egg-info-351/txdav/caldav/datastore/index_file.py:4589-4625
/CalendarServer/branches/generic-sqlstore/txdav/caldav/datastore/index_file.py:6167-6191
/CalendarServer/branches/new-store/txdav/caldav/datastore/index_file.py:5594-5934
/CalendarServer/branches/new-store-no-caldavfile/txdav/caldav/datastore/index_file.py:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2/txdav/caldav/datastore/index_file.py:5936-5981
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692/txdav/caldav/datastore/index_file.py:5693-5702
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627/txdav/caldav/datastore/index_file.py:3628-3644
/CalendarServer/branches/users/cdaboo/more-sharing-5591/txdav/caldav/datastore/index_file.py:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464/txdav/caldav/datastore/index_file.py:4465-4957
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070/txdav/caldav/datastore/index_file.py:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187/txdav/caldav/datastore/index_file.py:5188-5440
/CalendarServer/branches/users/glyph/contacts-server-merge/txdav/caldav/datastore/index_file.py:4971-5080
/CalendarServer/branches/users/glyph/more-deferreds-6/txdav/caldav/datastore/index_file.py:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7/txdav/caldav/datastore/index_file.py:6369-6445
/CalendarServer/branches/users/glyph/sendfdport/txdav/caldav/datastore/index_file.py:5388-5424
/CalendarServer/branches/users/glyph/sql-store/txdav/caldav/datastore/index_file.py:5929-6073
/CalendarServer/branches/users/glyph/use-system-twisted/txdav/caldav/datastore/index_file.py:5084-5149
/CalendarServer/branches/users/sagen/locations-resources/txdav/caldav/datastore/index_file.py:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2/txdav/caldav/datastore/index_file.py:5052-5061
/CalendarServer/branches/users/sagen/resource-delegates-4038/txdav/caldav/datastore/index_file.py:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066/txdav/caldav/datastore/index_file.py:4068-4075
/CalendarServer/branches/users/sagen/resources-2/txdav/caldav/datastore/index_file.py:5084-5093
/CalendarServer/branches/users/wsanchez/transations/txdav/caldav/datastore/index_file.py:5515-5593
/CalendarServer/trunk/twistedcaldav/index.py:6322-6394
+ /CalendarServer/branches/config-separation/txdav/caldav/datastore/index_file.py:4379-4443
/CalendarServer/branches/egg-info-351/txdav/caldav/datastore/index_file.py:4589-4625
/CalendarServer/branches/generic-sqlstore/txdav/caldav/datastore/index_file.py:6167-6191
/CalendarServer/branches/new-store/txdav/caldav/datastore/index_file.py:5594-5934
/CalendarServer/branches/new-store-no-caldavfile/txdav/caldav/datastore/index_file.py:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2/txdav/caldav/datastore/index_file.py:5936-5981
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692/txdav/caldav/datastore/index_file.py:5693-5702
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627/txdav/caldav/datastore/index_file.py:3628-3644
/CalendarServer/branches/users/cdaboo/more-sharing-5591/txdav/caldav/datastore/index_file.py:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464/txdav/caldav/datastore/index_file.py:4465-4957
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070/txdav/caldav/datastore/index_file.py:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187/txdav/caldav/datastore/index_file.py:5188-5440
/CalendarServer/branches/users/glyph/contacts-server-merge/txdav/caldav/datastore/index_file.py:4971-5080
/CalendarServer/branches/users/glyph/more-deferreds-6/txdav/caldav/datastore/index_file.py:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7/txdav/caldav/datastore/index_file.py:6369-6445
/CalendarServer/branches/users/glyph/sendfdport/txdav/caldav/datastore/index_file.py:5388-5424
/CalendarServer/branches/users/glyph/sharedpool/txdav/caldav/datastore/index_file.py:6490-6550
/CalendarServer/branches/users/glyph/sql-store/txdav/caldav/datastore/index_file.py:5929-6073
/CalendarServer/branches/users/glyph/use-system-twisted/txdav/caldav/datastore/index_file.py:5084-5149
/CalendarServer/branches/users/sagen/locations-resources/txdav/caldav/datastore/index_file.py:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2/txdav/caldav/datastore/index_file.py:5052-5061
/CalendarServer/branches/users/sagen/resource-delegates-4038/txdav/caldav/datastore/index_file.py:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066/txdav/caldav/datastore/index_file.py:4068-4075
/CalendarServer/branches/users/sagen/resources-2/txdav/caldav/datastore/index_file.py:5084-5093
/CalendarServer/branches/users/wsanchez/transations/txdav/caldav/datastore/index_file.py:5515-5593
/CalendarServer/trunk/twistedcaldav/index.py:6322-6394
Modified: CalendarServer/trunk/txdav/caldav/datastore/sql.py
===================================================================
--- CalendarServer/trunk/txdav/caldav/datastore/sql.py 2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/txdav/caldav/datastore/sql.py 2010-11-01 22:38:48 UTC (rev 6551)
@@ -589,32 +589,38 @@
@inlineCallbacks
def loseConnection(self):
-
+
old_size = self.attachment.size()
self.attachment._path.setContent(self.buf)
self.attachment._contentType = self.contentType
self.attachment._md5 = self.hash.hexdigest()
self.attachment._size = len(self.buf)
- self.attachment._created, self.attachment._modified = (yield self._txn.execSQL(
- """
- update ATTACHMENT set CONTENT_TYPE = %s, SIZE = %s, MD5 = %s,
- MODIFIED = timezone('UTC', CURRENT_TIMESTAMP)
- where PATH = %s
- returning CREATED, MODIFIED
- """,
- [
- generateContentType(self.contentType),
- self.attachment._size,
- self.attachment._md5,
- self.attachment.name()
- ]
- ))[0]
+ self.attachment._created, self.attachment._modified = map(
+ sqltime,
+ (yield self._txn.execSQL(
+ """
+ update ATTACHMENT set CONTENT_TYPE = %s, SIZE = %s, MD5 = %s,
+ MODIFIED = timezone('UTC', CURRENT_TIMESTAMP)
+ where PATH = %s
+ returning CREATED, MODIFIED
+ """,
+ [
+ generateContentType(self.contentType),
+ self.attachment._size,
+ self.attachment._md5,
+ self.attachment.name()
+ ]
+ ))[0]
+ )
# Adjust quota
yield self.attachment._calendarObject._calendar._home.adjustQuotaUsedBytes(self.attachment.size() - old_size)
+def sqltime(value):
+ return datetimeMktime(datetime.datetime.strptime(value, "%Y-%m-%d %H:%M:%S.%f"))
+
class Attachment(object):
implements(IAttachment)
@@ -648,8 +654,8 @@
self._contentType = MimeType.fromString(rows[0][0])
self._size = rows[0][1]
self._md5 = rows[0][2]
- self._created = datetimeMktime(datetime.datetime.strptime(rows[0][3], "%Y-%m-%d %H:%M:%S.%f"))
- self._modified = datetimeMktime(datetime.datetime.strptime(rows[0][4], "%Y-%m-%d %H:%M:%S.%f"))
+ self._created = sqltime(rows[0][3])
+ self._modified = sqltime(rows[0][4])
returnValue(self)
@@ -692,3 +698,5 @@
def modified(self):
return self._modified
+
+
Modified: CalendarServer/trunk/txdav/caldav/datastore/test/common.py
===================================================================
--- CalendarServer/trunk/txdav/caldav/datastore/test/common.py 2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/txdav/caldav/datastore/test/common.py 2010-11-01 22:38:48 UTC (rev 6551)
@@ -993,7 +993,7 @@
# Sanity check; make sure the test has the right idea of the subject.
self.assertNotEquals(event1_text, event1_text_withDifferentSubject)
newComponent = VComponent.fromString(event1_text_withDifferentSubject)
- obj.setComponent(newComponent)
+ yield obj.setComponent(newComponent)
# Putting everything into a separate transaction to account for any
# caching that may take place.
@@ -1067,7 +1067,7 @@
"""
objName = "with-dropbox.ics"
cal = yield self.calendarUnderTest()
- cal.createCalendarObjectWithName(
+ yield cal.createCalendarObjectWithName(
objName, VComponent.fromString(
self.eventWithDropbox
)
@@ -1091,7 +1091,7 @@
)
t.write("new attachment")
t.write(" text")
- t.loseConnection()
+ yield t.loseConnection()
obj = yield refresh(obj)
class CaptureProtocol(Protocol):
buf = ''
@@ -1181,7 +1181,7 @@
"new.attachment", MimeType("text", "plain")
)
t.write("new attachment text")
- t.loseConnection()
+ yield t.loseConnection()
yield self.commit()
home = (yield self.homeUnderTest())
calendars = (yield home.calendars())
Property changes on: CalendarServer/trunk/txdav/caldav/datastore/test/test_index_file.py
___________________________________________________________________
Modified: svn:mergeinfo
- /CalendarServer/branches/config-separation/txdav/caldav/datastore/test/test_index_file.py:4379-4443
/CalendarServer/branches/egg-info-351/txdav/caldav/datastore/test/test_index_file.py:4589-4625
/CalendarServer/branches/generic-sqlstore/txdav/caldav/datastore/test/test_index_file.py:6167-6191
/CalendarServer/branches/new-store/txdav/caldav/datastore/test/test_index_file.py:5594-5934
/CalendarServer/branches/new-store-no-caldavfile/txdav/caldav/datastore/test/test_index_file.py:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2/txdav/caldav/datastore/test/test_index_file.py:5936-5981
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692/txdav/caldav/datastore/test/test_index_file.py:5693-5702
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627/txdav/caldav/datastore/test/test_index_file.py:3628-3644
/CalendarServer/branches/users/cdaboo/more-sharing-5591/txdav/caldav/datastore/test/test_index_file.py:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464/txdav/caldav/datastore/test/test_index_file.py:4465-4957
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070/txdav/caldav/datastore/test/test_index_file.py:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187/txdav/caldav/datastore/test/test_index_file.py:5188-5440
/CalendarServer/branches/users/glyph/contacts-server-merge/txdav/caldav/datastore/test/test_index_file.py:4971-5080
/CalendarServer/branches/users/glyph/more-deferreds-6/txdav/caldav/datastore/test/test_index_file.py:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7/txdav/caldav/datastore/test/test_index_file.py:6369-6445
/CalendarServer/branches/users/glyph/sendfdport/txdav/caldav/datastore/test/test_index_file.py:5388-5424
/CalendarServer/branches/users/glyph/sql-store/txdav/caldav/datastore/test/test_index_file.py:5929-6073
/CalendarServer/branches/users/glyph/use-system-twisted/txdav/caldav/datastore/test/test_index_file.py:5084-5149
/CalendarServer/branches/users/sagen/locations-resources/txdav/caldav/datastore/test/test_index_file.py:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2/txdav/caldav/datastore/test/test_index_file.py:5052-5061
/CalendarServer/branches/users/sagen/resource-delegates-4038/txdav/caldav/datastore/test/test_index_file.py:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066/txdav/caldav/datastore/test/test_index_file.py:4068-4075
/CalendarServer/branches/users/sagen/resources-2/txdav/caldav/datastore/test/test_index_file.py:5084-5093
/CalendarServer/branches/users/wsanchez/transations/txdav/caldav/datastore/test/test_index_file.py:5515-5593
/CalendarServer/trunk/twistedcaldav/test/test_index.py:6322-6394
+ /CalendarServer/branches/config-separation/txdav/caldav/datastore/test/test_index_file.py:4379-4443
/CalendarServer/branches/egg-info-351/txdav/caldav/datastore/test/test_index_file.py:4589-4625
/CalendarServer/branches/generic-sqlstore/txdav/caldav/datastore/test/test_index_file.py:6167-6191
/CalendarServer/branches/new-store/txdav/caldav/datastore/test/test_index_file.py:5594-5934
/CalendarServer/branches/new-store-no-caldavfile/txdav/caldav/datastore/test/test_index_file.py:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2/txdav/caldav/datastore/test/test_index_file.py:5936-5981
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692/txdav/caldav/datastore/test/test_index_file.py:5693-5702
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627/txdav/caldav/datastore/test/test_index_file.py:3628-3644
/CalendarServer/branches/users/cdaboo/more-sharing-5591/txdav/caldav/datastore/test/test_index_file.py:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464/txdav/caldav/datastore/test/test_index_file.py:4465-4957
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070/txdav/caldav/datastore/test/test_index_file.py:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187/txdav/caldav/datastore/test/test_index_file.py:5188-5440
/CalendarServer/branches/users/glyph/contacts-server-merge/txdav/caldav/datastore/test/test_index_file.py:4971-5080
/CalendarServer/branches/users/glyph/more-deferreds-6/txdav/caldav/datastore/test/test_index_file.py:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7/txdav/caldav/datastore/test/test_index_file.py:6369-6445
/CalendarServer/branches/users/glyph/sendfdport/txdav/caldav/datastore/test/test_index_file.py:5388-5424
/CalendarServer/branches/users/glyph/sharedpool/txdav/caldav/datastore/test/test_index_file.py:6490-6550
/CalendarServer/branches/users/glyph/sql-store/txdav/caldav/datastore/test/test_index_file.py:5929-6073
/CalendarServer/branches/users/glyph/use-system-twisted/txdav/caldav/datastore/test/test_index_file.py:5084-5149
/CalendarServer/branches/users/sagen/locations-resources/txdav/caldav/datastore/test/test_index_file.py:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2/txdav/caldav/datastore/test/test_index_file.py:5052-5061
/CalendarServer/branches/users/sagen/resource-delegates-4038/txdav/caldav/datastore/test/test_index_file.py:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066/txdav/caldav/datastore/test/test_index_file.py:4068-4075
/CalendarServer/branches/users/sagen/resources-2/txdav/caldav/datastore/test/test_index_file.py:5084-5093
/CalendarServer/branches/users/wsanchez/transations/txdav/caldav/datastore/test/test_index_file.py:5515-5593
/CalendarServer/trunk/twistedcaldav/test/test_index.py:6322-6394
Modified: CalendarServer/trunk/txdav/caldav/datastore/test/test_sql.py
===================================================================
--- CalendarServer/trunk/txdav/caldav/datastore/test/test_sql.py 2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/txdav/caldav/datastore/test/test_sql.py 2010-11-01 22:38:48 UTC (rev 6551)
@@ -44,7 +44,7 @@
@inlineCallbacks
def setUp(self):
- super(CalendarSQLStorageTests, self).setUp()
+ yield super(CalendarSQLStorageTests, self).setUp()
self._sqlCalendarStore = yield buildStore(self, self.notifierFactory)
yield self.populate()
@@ -86,7 +86,7 @@
"""
Assert that two objects with C{properties} methods have similar
properties.
-
+
@param disregard: a list of L{PropertyName} keys to discard from both
input and output.
"""
@@ -201,7 +201,7 @@
operations, that we do not block other reads of the table.
"""
- calendarStore = yield buildStore(self, self.notifierFactory)
+ calendarStore = self._sqlCalendarStore
txn1 = calendarStore.newTransaction()
txn2 = calendarStore.newTransaction()
@@ -211,7 +211,7 @@
# reads of existing data in the table
home_uid2 = yield txn3.homeWithUID(ECALENDARTYPE, "uid2", create=True)
self.assertNotEqual(home_uid2, None)
- txn3.commit()
+ yield txn3.commit()
home_uid1_1 = yield txn1.homeWithUID(
ECALENDARTYPE, "uid1", create=True
@@ -237,7 +237,7 @@
txn4 = calendarStore.newTransaction()
home_uid2 = yield txn4.homeWithUID(ECALENDARTYPE, "uid2", create=True)
self.assertNotEqual(home_uid2, None)
- txn4.commit()
+ yield txn4.commit()
# Now do the concurrent provision attempt
yield d2
@@ -250,21 +250,20 @@
@inlineCallbacks
def test_putConcurrency(self):
"""
- Test that two concurrent attempts to PUT different address book object resources to the
- same address book home does not cause a deadlock.
+ Test that two concurrent attempts to PUT different calendar object
+ resources to the same address book home does not cause a deadlock.
"""
- calendarStore1 = yield buildStore(self, self.notifierFactory)
- calendarStore2 = yield buildStore(self, self.notifierFactory)
+ calendarStore = self._sqlCalendarStore
# Provision the home now
- txn = calendarStore1.newTransaction()
+ txn = calendarStore.newTransaction()
home = yield txn.homeWithUID(ECALENDARTYPE, "uid1", create=True)
self.assertNotEqual(home, None)
- txn.commit()
+ yield txn.commit()
- txn1 = calendarStore1.newTransaction()
- txn2 = calendarStore2.newTransaction()
+ txn1 = calendarStore.newTransaction()
+ txn2 = calendarStore.newTransaction()
home1 = yield txn1.homeWithUID(ECALENDARTYPE, "uid1", create=True)
home2 = yield txn2.homeWithUID(ECALENDARTYPE, "uid1", create=True)
@@ -364,3 +363,6 @@
yield d1
yield d2
+
+
+
Property changes on: CalendarServer/trunk/txdav/carddav/datastore/index_file.py
___________________________________________________________________
Modified: svn:mergeinfo
- /CalendarServer/branches/config-separation/txdav/carddav/datastore/index_file.py:4379-4443
/CalendarServer/branches/egg-info-351/txdav/carddav/datastore/index_file.py:4589-4625
/CalendarServer/branches/generic-sqlstore/txdav/carddav/datastore/index_file.py:6167-6191
/CalendarServer/branches/new-store/txdav/carddav/datastore/index_file.py:5594-5934
/CalendarServer/branches/new-store-no-caldavfile/txdav/carddav/datastore/index_file.py:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2/txdav/carddav/datastore/index_file.py:5936-5981
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692/txdav/carddav/datastore/index_file.py:5693-5702
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627/txdav/carddav/datastore/index_file.py:3628-3644
/CalendarServer/branches/users/cdaboo/more-sharing-5591/txdav/carddav/datastore/index_file.py:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464/txdav/carddav/datastore/index_file.py:4465-4957
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070/txdav/carddav/datastore/index_file.py:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187/txdav/carddav/datastore/index_file.py:5188-5440
/CalendarServer/branches/users/glyph/contacts-server-merge/txdav/carddav/datastore/index_file.py:4971-5080
/CalendarServer/branches/users/glyph/more-deferreds-6/txdav/carddav/datastore/index_file.py:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7/txdav/carddav/datastore/index_file.py:6369-6445
/CalendarServer/branches/users/glyph/sendfdport/txdav/carddav/datastore/index_file.py:5388-5424
/CalendarServer/branches/users/glyph/sql-store/txdav/carddav/datastore/index_file.py:5929-6073
/CalendarServer/branches/users/glyph/use-system-twisted/txdav/carddav/datastore/index_file.py:5084-5149
/CalendarServer/branches/users/sagen/locations-resources/txdav/carddav/datastore/index_file.py:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2/txdav/carddav/datastore/index_file.py:5052-5061
/CalendarServer/branches/users/sagen/resource-delegates-4038/txdav/carddav/datastore/index_file.py:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066/txdav/carddav/datastore/index_file.py:4068-4075
/CalendarServer/branches/users/sagen/resources-2/txdav/carddav/datastore/index_file.py:5084-5093
/CalendarServer/branches/users/wsanchez/transations/txdav/carddav/datastore/index_file.py:5515-5593
/CalendarServer/trunk/twistedcaldav/vcardindex.py:6322-6394
+ /CalendarServer/branches/config-separation/txdav/carddav/datastore/index_file.py:4379-4443
/CalendarServer/branches/egg-info-351/txdav/carddav/datastore/index_file.py:4589-4625
/CalendarServer/branches/generic-sqlstore/txdav/carddav/datastore/index_file.py:6167-6191
/CalendarServer/branches/new-store/txdav/carddav/datastore/index_file.py:5594-5934
/CalendarServer/branches/new-store-no-caldavfile/txdav/carddav/datastore/index_file.py:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2/txdav/carddav/datastore/index_file.py:5936-5981
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692/txdav/carddav/datastore/index_file.py:5693-5702
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627/txdav/carddav/datastore/index_file.py:3628-3644
/CalendarServer/branches/users/cdaboo/more-sharing-5591/txdav/carddav/datastore/index_file.py:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464/txdav/carddav/datastore/index_file.py:4465-4957
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070/txdav/carddav/datastore/index_file.py:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187/txdav/carddav/datastore/index_file.py:5188-5440
/CalendarServer/branches/users/glyph/contacts-server-merge/txdav/carddav/datastore/index_file.py:4971-5080
/CalendarServer/branches/users/glyph/more-deferreds-6/txdav/carddav/datastore/index_file.py:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7/txdav/carddav/datastore/index_file.py:6369-6445
/CalendarServer/branches/users/glyph/sendfdport/txdav/carddav/datastore/index_file.py:5388-5424
/CalendarServer/branches/users/glyph/sharedpool/txdav/carddav/datastore/index_file.py:6490-6550
/CalendarServer/branches/users/glyph/sql-store/txdav/carddav/datastore/index_file.py:5929-6073
/CalendarServer/branches/users/glyph/use-system-twisted/txdav/carddav/datastore/index_file.py:5084-5149
/CalendarServer/branches/users/sagen/locations-resources/txdav/carddav/datastore/index_file.py:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2/txdav/carddav/datastore/index_file.py:5052-5061
/CalendarServer/branches/users/sagen/resource-delegates-4038/txdav/carddav/datastore/index_file.py:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066/txdav/carddav/datastore/index_file.py:4068-4075
/CalendarServer/branches/users/sagen/resources-2/txdav/carddav/datastore/index_file.py:5084-5093
/CalendarServer/branches/users/wsanchez/transations/txdav/carddav/datastore/index_file.py:5515-5593
/CalendarServer/trunk/twistedcaldav/vcardindex.py:6322-6394
Modified: CalendarServer/trunk/txdav/carddav/datastore/sql.py
===================================================================
--- CalendarServer/trunk/txdav/carddav/datastore/sql.py 2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/txdav/carddav/datastore/sql.py 2010-11-01 22:38:48 UTC (rev 6551)
@@ -102,7 +102,7 @@
will eventually have on disk.
@type realName: C{str}
"""
-
+
super(AddressBook, self).__init__(home, name, resourceID, notifier)
self._index = PostgresLegacyABIndexEmulator(self)
Modified: CalendarServer/trunk/txdav/carddav/datastore/test/common.py
===================================================================
--- CalendarServer/trunk/txdav/carddav/datastore/test/common.py 2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/txdav/carddav/datastore/test/common.py 2010-11-01 22:38:48 UTC (rev 6551)
@@ -867,7 +867,7 @@
# Sanity check; make sure the test has the right idea of the subject.
self.assertNotEquals(vcard1_text, vcard1_text_withDifferentNote)
newComponent = VComponent.fromString(vcard1_text_withDifferentNote)
- obj.setComponent(newComponent)
+ yield obj.setComponent(newComponent)
# Putting everything into a separate transaction to account for any
# caching that may take place.
Property changes on: CalendarServer/trunk/txdav/carddav/datastore/test/test_index_file.py
___________________________________________________________________
Modified: svn:mergeinfo
- /CalendarServer/branches/config-separation/txdav/carddav/datastore/test/test_index_file.py:4379-4443
/CalendarServer/branches/egg-info-351/txdav/carddav/datastore/test/test_index_file.py:4589-4625
/CalendarServer/branches/generic-sqlstore/txdav/carddav/datastore/test/test_index_file.py:6167-6191
/CalendarServer/branches/new-store/txdav/carddav/datastore/test/test_index_file.py:5594-5934
/CalendarServer/branches/new-store-no-caldavfile/txdav/carddav/datastore/test/test_index_file.py:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2/txdav/carddav/datastore/test/test_index_file.py:5936-5981
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692/txdav/carddav/datastore/test/test_index_file.py:5693-5702
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627/txdav/carddav/datastore/test/test_index_file.py:3628-3644
/CalendarServer/branches/users/cdaboo/more-sharing-5591/txdav/carddav/datastore/test/test_index_file.py:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464/txdav/carddav/datastore/test/test_index_file.py:4465-4957
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070/txdav/carddav/datastore/test/test_index_file.py:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187/txdav/carddav/datastore/test/test_index_file.py:5188-5440
/CalendarServer/branches/users/glyph/contacts-server-merge/txdav/carddav/datastore/test/test_index_file.py:4971-5080
/CalendarServer/branches/users/glyph/more-deferreds-6/txdav/carddav/datastore/test/test_index_file.py:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7/txdav/carddav/datastore/test/test_index_file.py:6369-6445
/CalendarServer/branches/users/glyph/sendfdport/txdav/carddav/datastore/test/test_index_file.py:5388-5424
/CalendarServer/branches/users/glyph/sql-store/txdav/carddav/datastore/test/test_index_file.py:5929-6073
/CalendarServer/branches/users/glyph/use-system-twisted/txdav/carddav/datastore/test/test_index_file.py:5084-5149
/CalendarServer/branches/users/sagen/locations-resources/txdav/carddav/datastore/test/test_index_file.py:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2/txdav/carddav/datastore/test/test_index_file.py:5052-5061
/CalendarServer/branches/users/sagen/resource-delegates-4038/txdav/carddav/datastore/test/test_index_file.py:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066/txdav/carddav/datastore/test/test_index_file.py:4068-4075
/CalendarServer/branches/users/sagen/resources-2/txdav/carddav/datastore/test/test_index_file.py:5084-5093
/CalendarServer/branches/users/wsanchez/transations/txdav/carddav/datastore/test/test_index_file.py:5515-5593
/CalendarServer/trunk/twistedcaldav/test/test_vcardindex.py:6322-6394
+ /CalendarServer/branches/config-separation/txdav/carddav/datastore/test/test_index_file.py:4379-4443
/CalendarServer/branches/egg-info-351/txdav/carddav/datastore/test/test_index_file.py:4589-4625
/CalendarServer/branches/generic-sqlstore/txdav/carddav/datastore/test/test_index_file.py:6167-6191
/CalendarServer/branches/new-store/txdav/carddav/datastore/test/test_index_file.py:5594-5934
/CalendarServer/branches/new-store-no-caldavfile/txdav/carddav/datastore/test/test_index_file.py:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2/txdav/carddav/datastore/test/test_index_file.py:5936-5981
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692/txdav/carddav/datastore/test/test_index_file.py:5693-5702
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627/txdav/carddav/datastore/test/test_index_file.py:3628-3644
/CalendarServer/branches/users/cdaboo/more-sharing-5591/txdav/carddav/datastore/test/test_index_file.py:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464/txdav/carddav/datastore/test/test_index_file.py:4465-4957
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070/txdav/carddav/datastore/test/test_index_file.py:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187/txdav/carddav/datastore/test/test_index_file.py:5188-5440
/CalendarServer/branches/users/glyph/contacts-server-merge/txdav/carddav/datastore/test/test_index_file.py:4971-5080
/CalendarServer/branches/users/glyph/more-deferreds-6/txdav/carddav/datastore/test/test_index_file.py:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7/txdav/carddav/datastore/test/test_index_file.py:6369-6445
/CalendarServer/branches/users/glyph/sendfdport/txdav/carddav/datastore/test/test_index_file.py:5388-5424
/CalendarServer/branches/users/glyph/sharedpool/txdav/carddav/datastore/test/test_index_file.py:6490-6550
/CalendarServer/branches/users/glyph/sql-store/txdav/carddav/datastore/test/test_index_file.py:5929-6073
/CalendarServer/branches/users/glyph/use-system-twisted/txdav/carddav/datastore/test/test_index_file.py:5084-5149
/CalendarServer/branches/users/sagen/locations-resources/txdav/carddav/datastore/test/test_index_file.py:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2/txdav/carddav/datastore/test/test_index_file.py:5052-5061
/CalendarServer/branches/users/sagen/resource-delegates-4038/txdav/carddav/datastore/test/test_index_file.py:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066/txdav/carddav/datastore/test/test_index_file.py:4068-4075
/CalendarServer/branches/users/sagen/resources-2/txdav/carddav/datastore/test/test_index_file.py:5084-5093
/CalendarServer/branches/users/wsanchez/transations/txdav/carddav/datastore/test/test_index_file.py:5515-5593
/CalendarServer/trunk/twistedcaldav/test/test_vcardindex.py:6322-6394
Modified: CalendarServer/trunk/txdav/carddav/datastore/test/test_sql.py
===================================================================
--- CalendarServer/trunk/txdav/carddav/datastore/test/test_sql.py 2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/txdav/carddav/datastore/test/test_sql.py 2010-11-01 22:38:48 UTC (rev 6551)
@@ -41,7 +41,7 @@
@inlineCallbacks
def setUp(self):
- super(AddressBookSQLStorageTests, self).setUp()
+ yield super(AddressBookSQLStorageTests, self).setUp()
self._sqlStore = yield buildStore(self, self.notifierFactory)
yield self.populate()
@@ -195,18 +195,16 @@
Test that two concurrent attempts to PUT different address book object resources to the
same address book home does not cause a deadlock.
"""
+ addressbookStore = yield buildStore(self, self.notifierFactory)
- addressbookStore1 = yield buildStore(self, self.notifierFactory)
- addressbookStore2 = yield buildStore(self, self.notifierFactory)
-
# Provision the home now
- txn = addressbookStore1.newTransaction()
+ txn = addressbookStore.newTransaction()
home = yield txn.homeWithUID(EADDRESSBOOKTYPE, "uid1", create=True)
self.assertNotEqual(home, None)
yield txn.commit()
- txn1 = addressbookStore1.newTransaction()
- txn2 = addressbookStore2.newTransaction()
+ txn1 = addressbookStore.newTransaction()
+ txn2 = addressbookStore.newTransaction()
home1 = yield txn1.homeWithUID(EADDRESSBOOKTYPE, "uid1", create=True)
home2 = yield txn2.homeWithUID(EADDRESSBOOKTYPE, "uid1", create=True)
Modified: CalendarServer/trunk/txdav/common/datastore/sql.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/sql.py 2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/txdav/common/datastore/sql.py 2010-11-01 22:38:48 UTC (rev 6551)
@@ -25,26 +25,23 @@
"CommonHome",
]
-import sys
import datetime
-from Queue import Queue
-from zope.interface.declarations import implements, directlyProvides
+from zope.interface import implements, directlyProvides
+from twext.python.log import Logger, LoggingMixIn
+from twext.web2.dav.element.rfc2518 import ResourceType
+from twext.web2.http_headers import MimeType
+
from twisted.python import hashlib
from twisted.python.modules import getModule
from twisted.python.util import FancyEqMixin
-from twisted.python.failure import Failure
-from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
+from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.application.service import Service
-from twext.python.log import Logger, LoggingMixIn
from twext.internet.decorate import memoizedKey
-from twext.web2.dav.element.rfc2518 import ResourceType
-from twext.web2.http_headers import MimeType
from txdav.common.datastore.sql_legacy import PostgresLegacyNotificationsEmulator
from txdav.caldav.icalendarstore import ICalendarTransaction, ICalendarStore
@@ -61,9 +58,8 @@
from txdav.common.inotifications import INotificationCollection, \
INotificationObject
-from txdav.idav import AlreadyFinishedError
-from txdav.base.propertystore.base import PropertyName
from txdav.base.propertystore.sql import PropertyStore
+from txdav.base.propertystore.base import PropertyName
from twistedcaldav.customxml import NotificationType
from twistedcaldav.dateops import datetimeMktime
@@ -87,15 +83,17 @@
implements(ICalendarStore)
- def __init__(self, connectionFactory, notifierFactory, attachmentsPath,
- enableCalendars=True, enableAddressBooks=True):
+ def __init__(self, sqlTxnFactory, notifierFactory, attachmentsPath,
+ enableCalendars=True, enableAddressBooks=True,
+ label="unlabeled"):
assert enableCalendars or enableAddressBooks
- self.connectionFactory = connectionFactory
+ self.sqlTxnFactory = sqlTxnFactory
self.notifierFactory = notifierFactory
self.attachmentsPath = attachmentsPath
self.enableCalendars = enableCalendars
self.enableAddressBooks = enableAddressBooks
+ self.label = label
def eachCalendarHome(self):
@@ -113,9 +111,12 @@
def newTransaction(self, label="unlabeled", migrating=False):
+ """
+ @see L{IDataStore.newTransaction}
+ """
return CommonStoreTransaction(
self,
- self.connectionFactory,
+ self.sqlTxnFactory(),
self.enableCalendars,
self.enableAddressBooks,
self.notifierFactory,
@@ -124,93 +125,7 @@
)
-_DONE = object()
-_STATE_STOPPED = "STOPPED"
-_STATE_RUNNING = "RUNNING"
-_STATE_STOPPING = "STOPPING"
-
-class ThreadHolder(object):
- """
- A queue which will hold a reactor threadpool thread open until all of the
- work in that queue is done.
- """
-
- def __init__(self, reactor):
- self._reactor = reactor
- self._state = _STATE_STOPPED
- self._stopper = None
- self._q = None
-
-
- def _run(self):
- """
- Worker function which runs in a non-reactor thread.
- """
- while True:
- work = self._q.get()
- if work is _DONE:
- def finishStopping():
- self._state = _STATE_STOPPED
- self._q = None
- s = self._stopper
- self._stopper = None
- s.callback(None)
- self._reactor.callFromThread(finishStopping)
- return
- self._oneWorkUnit(*work)
-
-
- def _oneWorkUnit(self, deferred, instruction):
- try:
- result = instruction()
- except:
- etype, evalue, etb = sys.exc_info()
- def relayFailure():
- f = Failure(evalue, etype, etb)
- deferred.errback(f)
- self._reactor.callFromThread(relayFailure)
- else:
- self._reactor.callFromThread(deferred.callback, result)
-
-
- def submit(self, work):
- """
- Submit some work to be run.
-
- @param work: a 0-argument callable, which will be run in a thread.
-
- @return: L{Deferred} that fires with the result of L{work}
- """
- d = Deferred()
- self._q.put((d, work))
- return d
-
-
- def start(self):
- """
- Start this thing, if it's stopped.
- """
- if self._state != _STATE_STOPPED:
- raise RuntimeError("Not stopped.")
- self._state = _STATE_RUNNING
- self._q = Queue(0)
- self._reactor.callInThread(self._run)
-
-
- def stop(self):
- """
- Stop this thing and release its thread, if it's running.
- """
- if self._state != _STATE_RUNNING:
- raise RuntimeError("Not running.")
- s = self._stopper = Deferred()
- self._state = _STATE_STOPPING
- self._q.put(_DONE)
- return s
-
-
-
class CommonStoreTransaction(object):
"""
Transaction implementation for SQL database.
@@ -218,14 +133,12 @@
_homeClass = {}
_homeTable = {}
- noisy = False
id = 0
- def __init__(self, store, connectionFactory,
+ def __init__(self, store, sqlTxn,
enableCalendars, enableAddressBooks,
notifierFactory, label, migrating=False):
self._store = store
- self._completed = False
self._calendarHomes = {}
self._addressbookHomes = {}
self._notificationHomes = {}
@@ -233,6 +146,7 @@
self._notifierFactory = notifierFactory
self._label = label
self._migrating = migrating
+
CommonStoreTransaction.id += 1
self._txid = CommonStoreTransaction.id
@@ -249,62 +163,18 @@
CommonStoreTransaction._homeClass[EADDRESSBOOKTYPE] = AddressBookHome
CommonStoreTransaction._homeTable[ECALENDARTYPE] = CALENDAR_HOME_TABLE
CommonStoreTransaction._homeTable[EADDRESSBOOKTYPE] = ADDRESSBOOK_HOME_TABLE
- self._holder = ThreadHolder(reactor)
- self._holder.start()
- def initCursor():
- # support threadlevel=1; we can't necessarily cursor() in a
- # different thread than we do transactions in.
+ self._sqlTxn = sqlTxn
- # FIXME: may need to be pooling ThreadHolders along with
- # connections, if threadlevel=1 requires connect() be called in the
- # same thread as cursor() et. al.
- self._connection = connectionFactory()
- self._cursor = self._connection.cursor()
- self._holder.submit(initCursor)
-
def store(self):
return self._store
def __repr__(self):
- return "PG-TXN<%s>" % (self._label,)
+ return 'PG-TXN<%s>' % (self._label,)
- def _reallyExecSQL(self, sql, args=[], raiseOnZeroRowCount=None):
- self._cursor.execute(sql, args)
- if raiseOnZeroRowCount is not None and self._cursor.rowcount == 0:
- raise raiseOnZeroRowCount()
- if self._cursor.description:
- return self._cursor.fetchall()
- else:
- return None
-
-
- def execSQL(self, *args, **kw):
- result = self._holder.submit(
- lambda : self._reallyExecSQL(*args, **kw)
- )
- if self.noisy:
- def reportResult(results):
- sys.stdout.write("\n".join([
- "",
- "SQL (%d): %r %r" % (self._txid, args, kw),
- "Results (%d): %r" % (self._txid, results,),
- "",
- ]))
- return results
- result.addBoth(reportResult)
- return result
-
-
- def __del__(self):
- if not self._completed:
- print "CommonStoreTransaction.__del__: OK"
- self.abort()
-
-
- @memoizedKey("uid", "_calendarHomes")
+ @memoizedKey('uid', '_calendarHomes')
def calendarHomeWithUID(self, uid, create=False):
return self.homeWithUID(ECALENDARTYPE, uid, create=create)
@@ -409,40 +279,36 @@
returnValue(collection)
- def abort(self):
- if not self._completed:
- def reallyAbort():
- self._connection.rollback()
- self._connection.close()
- self._completed = True
- result = self._holder.submit(reallyAbort)
- self._holder.stop()
- return result
- else:
- raise AlreadyFinishedError()
+ def postCommit(self, operation):
+ """
+ Run things after C{commit}.
+ """
+ self._postCommitOperations.append(operation)
+ def execSQL(self, *a, **kw):
+ """
+ Execute some SQL (delegate to L{IAsyncTransaction}).
+ """
+ return self._sqlTxn.execSQL(*a, **kw)
+
+
def commit(self):
- if not self._completed:
- self._completed = True
- def postCommit(ignored):
- for operation in self._postCommitOperations:
- operation()
- def reallyCommit():
- self._connection.commit()
- self._connection.close()
- result = self._holder.submit(reallyCommit).addCallback(postCommit)
- self._holder.stop()
- return result
- else:
- raise AlreadyFinishedError()
+ """
+ Commit the transaction and execute any post-commit hooks.
+ """
+ def postCommit(ignored):
+ for operation in self._postCommitOperations:
+ operation()
+ return ignored
+ return self._sqlTxn.commit().addCallback(postCommit)
- def postCommit(self, operation):
+ def abort(self):
"""
- Run things after C{commit}.
+ Abort the transaction.
"""
- self._postCommitOperations.append(operation)
+ return self._sqlTxn.abort()
@@ -717,13 +583,14 @@
raise NoSuchHomeChildError()
yield child._deletedSyncToken()
- yield self._txn.execSQL(
- "delete from %(name)s where %(column_RESOURCE_ID)s = %%s" % self._childTable,
- [child._resourceID]
- )
- self._children.pop(name, None)
- if self._txn._cursor.rowcount == 0:
- raise NoSuchHomeChildError()
+ try:
+ yield self._txn.execSQL(
+ "delete from %(name)s where %(column_RESOURCE_ID)s = %%s" % self._childTable,
+ [child._resourceID],
+ raiseOnZeroRowCount=NoSuchHomeChildError
+ )
+ finally:
+ self._children.pop(name, None)
child.notifyChanged()
@@ -881,6 +748,7 @@
[self._ownerUID]
))[0][0])
+
@inlineCallbacks
def adjustQuotaUsedBytes(self, delta):
"""
@@ -888,7 +756,6 @@
is done atomically. It is import to do the 'select ... for update' because a race also
exists in the 'update ... x = x + 1' case as seen via unit tests.
"""
-
yield self._txn.execSQL("""
select * from %(name)s
where %(column_RESOURCE_ID)s = %%s
@@ -905,7 +772,6 @@
""" % self._homeTable,
[delta, self._resourceID]
))[0][0]
-
# Double check integrity
if quotaUsedBytes < 0:
log.error("Fixing quota adjusted below zero to %s by change amount %s" % (quotaUsedBytes, delta,))
@@ -916,8 +782,8 @@
""" % self._homeTable,
[self._resourceID]
)
-
+
def notifierID(self, label="default"):
if self._notifier:
return self._notifier.getID(label)
@@ -1137,7 +1003,7 @@
@inlineCallbacks
def removeObjectResourceWithName(self, name):
-
+
uid, old_size = (yield self._txn.execSQL(
"delete from %(name)s "
"where %(column_RESOURCE_NAME)s = %%s and %(column_PARENT_RESOURCE_ID)s = %%s "
Modified: CalendarServer/trunk/txdav/common/datastore/test/util.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/test/util.py 2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/txdav/common/datastore/test/util.py 2010-11-01 22:38:48 UTC (rev 6551)
@@ -28,14 +28,17 @@
from twext.python.vcomponent import VComponent
from twisted.internet import reactor
-from twisted.internet.defer import Deferred, succeed, inlineCallbacks
+from twisted.internet.defer import Deferred, inlineCallbacks
from twisted.internet.task import deferLater
from twisted.python import log
+from twisted.application.service import Service
from txdav.common.datastore.sql import CommonDataStore, v1_schema
from txdav.base.datastore.subpostgres import PostgresService,\
DiagnosticConnectionWrapper
from txdav.common.icommondatastore import NoSuchHomeChildError
+from txdav.base.datastore.asyncsqlpool import ConnectionPool
+from twisted.internet.defer import returnValue
from twistedcaldav.notify import Notifier
@@ -52,6 +55,8 @@
print connection.label, connection.state
print '--- CONNECTIONS END ---'
+
+
class SQLStoreBuilder(object):
"""
Test-fixture-builder which can construct a PostgresStore.
@@ -67,31 +72,15 @@
@return: a L{Deferred} which fires with an L{IDataStore}.
"""
- currentTestID = testCase.id()
dbRoot = CachingFilePath(self.SHARED_DB_PATH)
+ attachmentRoot = dbRoot.child("attachments")
if self.sharedService is None:
ready = Deferred()
def getReady(connectionFactory):
- attachmentRoot = dbRoot.child("attachments")
- try:
- attachmentRoot.createDirectory()
- except OSError:
- pass
- try:
- self.store = CommonDataStore(
- lambda label=None: connectionFactory(
- label or currentTestID
- ),
- notifierFactory,
- attachmentRoot
- )
- except:
- ready.errback()
- raise
- else:
- self.cleanDatabase(testCase)
- ready.callback(self.store)
- return self.store
+ self.makeAndCleanStore(
+ testCase, notifierFactory, attachmentRoot
+ ).chainDeferred(ready)
+ return Service()
self.sharedService = PostgresService(
dbRoot, getReady, v1_schema, resetSchema=True,
databaseName="caldav",
@@ -106,13 +95,10 @@
"before", "shutdown", startStopping)
result = ready
else:
- self.store.notifierFactory = notifierFactory
- self.cleanDatabase(testCase)
- result = succeed(self.store)
-
+ result = self.makeAndCleanStore(
+ testCase, notifierFactory, attachmentRoot
+ )
def cleanUp():
- # FIXME: clean up any leaked connections and report them with an
- # immediate test failure.
def stopit():
self.sharedService.pauseMonitor()
return deferLater(reactor, 0.1, stopit)
@@ -120,11 +106,42 @@
return result
- def cleanDatabase(self, testCase):
- cleanupConn = self.store.connectionFactory(
+ @inlineCallbacks
+ def makeAndCleanStore(self, testCase, notifierFactory, attachmentRoot):
+ """
+ Create a L{CommonDataStore} specific to the given L{TestCase}.
+
+ This also creates a L{ConnectionPool} that gets stopped when the test
+ finishes, to make sure that any test which fails will terminate
+ cleanly.
+
+ @return: a L{Deferred} that fires with a L{CommonDataStore}
+ """
+ try:
+ attachmentRoot.createDirectory()
+ except OSError:
+ pass
+ cp = ConnectionPool(self.sharedService.produceConnection)
+ store = CommonDataStore(
+ cp.connection, notifierFactory, attachmentRoot
+ )
+ currentTestID = testCase.id()
+ store.label = currentTestID
+ cp.startService()
+ def stopIt():
+ return cp.stopService()
+ testCase.addCleanup(stopIt)
+ yield self.cleanStore(testCase, store)
+ returnValue(store)
+
+
+ @inlineCallbacks
+ def cleanStore(self, testCase, storeToClean):
+ cleanupTxn = storeToClean.sqlTxnFactory(
"%s schema-cleanup" % (testCase.id(),)
)
- cursor = cleanupConn.cursor()
+ # TODO: should be getting these tables from a declaration of the schema
+ # somewhere.
tables = ['INVITE',
'RESOURCE_PROPERTY',
'ATTACHMENT',
@@ -143,16 +160,16 @@
'NOTIFICATION_HOME']
for table in tables:
try:
- cursor.execute("delete from "+table)
+ yield cleanupTxn.execSQL("delete from "+table, [])
except:
log.err()
- cleanupConn.commit()
- cleanupConn.close()
+ yield cleanupTxn.commit()
theStoreBuilder = SQLStoreBuilder()
buildStore = theStoreBuilder.buildStore
+
@inlineCallbacks
def populateCalendarsFrom(requirements, store):
"""
@@ -215,6 +232,7 @@
lastTransaction = None
savedStore = None
assertProvides = assertProvides
+ lastCommitSetUp = False
def transactionUnderTest(self):
"""
@@ -222,12 +240,17 @@
C[lastTransaction}. Also makes sure to use the same store, saving the
value from C{storeUnderTest}.
"""
+ if not self.lastCommitSetUp:
+ self.lastCommitSetUp = True
+ self.addCleanup(self.commitLast)
if self.lastTransaction is not None:
return self.lastTransaction
if self.savedStore is None:
self.savedStore = self.storeUnderTest()
self.counter += 1
- txn = self.lastTransaction = self.savedStore.newTransaction(self.id() + " #" + str(self.counter))
+ txn = self.lastTransaction = self.savedStore.newTransaction(
+ self.id() + " #" + str(self.counter)
+ )
return txn
@@ -250,11 +273,12 @@
self.lastTransaction = None
return result
+
def setUp(self):
self.counter = 0
self.notifierFactory = StubNotifierFactory()
- def tearDown(self):
+ def commitLast(self):
if self.lastTransaction is not None:
return self.commit()
Modified: CalendarServer/trunk/txdav/idav.py
===================================================================
--- CalendarServer/trunk/txdav/idav.py 2010-11-01 21:32:28 UTC (rev 6550)
+++ CalendarServer/trunk/txdav/idav.py 2010-11-01 22:38:48 UTC (rev 6551)
@@ -102,7 +102,7 @@
def newTransaction(label=None):
"""
Create a new transaction.
-
+
@param label: A label to assign to this transaction for diagnostic
purposes.
@type label: C{str}
@@ -172,6 +172,53 @@
+class IAsyncTransaction(Interface):
+ """
+ Asynchronous execution of SQL.
+
+ Note that there is no {begin()} method; if an L{IAsyncTransaction} exists,
+ it is assumed to have been started.
+ """
+
+ def execSQL(sql, args=(), raiseOnZeroRowCount=None):
+ """
+ Execute some SQL.
+
+ @param sql: an SQL string.
+
+ @type sql: C{str}
+
+ @param args: C{list} of arguments to interpolate into C{sql}.
+
+ @param raiseOnZeroRowCount: a 0-argument callable which returns an
+ exception to raise if the executed SQL does not affect any rows.
+
+ @return: L{Deferred} which fires C{list} of C{tuple}
+
+ @raise: C{raiseOnZeroRowCount} if it was specified and no rows were
+ affected.
+ """
+
+
+ def commit():
+ """
+ Commit changes caused by this transaction.
+
+ @return: L{Deferred} which fires with C{None} upon successful
+ completion of this transaction.
+ """
+
+
+ def abort():
+ """
+ Roll back changes caused by this transaction.
+
+ @return: L{Deferred} which fires with C{None} upon successful
+ rollback of this transaction.
+ """
+
+
+
class ITransaction(Interface):
"""
Transaction that can be aborted and either succeeds or fails in
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20101101/c0fd0ca8/attachment-0001.html>
More information about the calendarserver-changes
mailing list