[CalendarServer-changes] [9689] CalendarServer/trunk
source_changes at macosforge.org
source_changes at macosforge.org
Sat Aug 11 02:01:06 PDT 2012
Revision: 9689
http://trac.macosforge.org/projects/calendarserver/changeset/9689
Author: glyph at apple.com
Date: 2012-08-11 02:01:06 -0700 (Sat, 11 Aug 2012)
Log Message:
-----------
Persistent, deferred, queued task execution.
Modified Paths:
--------------
CalendarServer/trunk/calendarserver/accesslog.py
CalendarServer/trunk/calendarserver/tap/caldav.py
CalendarServer/trunk/calendarserver/tap/test/test_caldav.py
CalendarServer/trunk/twext/enterprise/adbapi2.py
CalendarServer/trunk/twext/enterprise/dal/syntax.py
CalendarServer/trunk/twext/enterprise/dal/test/test_sqlsyntax.py
CalendarServer/trunk/twext/enterprise/ienterprise.py
CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py
CalendarServer/trunk/twext/internet/threadutils.py
CalendarServer/trunk/txdav/base/datastore/file.py
CalendarServer/trunk/txdav/base/datastore/util.py
CalendarServer/trunk/txdav/common/datastore/sql.py
CalendarServer/trunk/txdav/common/datastore/sql_schema/current.sql
CalendarServer/trunk/txdav/idav.py
Added Paths:
-----------
CalendarServer/trunk/calendarserver/controlsocket.py
CalendarServer/trunk/twext/enterprise/dal/record.py
CalendarServer/trunk/twext/enterprise/dal/test/test_record.py
CalendarServer/trunk/twext/enterprise/queue.py
CalendarServer/trunk/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_11_to_12.sql
CalendarServer/trunk/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_11_to_12.sql
Property Changed:
----------------
CalendarServer/trunk/
Property changes on: CalendarServer/trunk
___________________________________________________________________
Modified: svn:mergeinfo
- /CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/users/cdaboo/batchupload-6699:6700-7198
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/component-set-fixes:8130-8346
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/implicituidrace:8137-8141
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/pods:7297-7377
/CalendarServer/branches/users/cdaboo/pycalendar:7085-7206
/CalendarServer/branches/users/cdaboo/pycard:7227-7237
/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes:7740-8287
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/cdaboo/timezones:7443-7699
/CalendarServer/branches/users/cdaboo/txn-debugging:8730-8743
/CalendarServer/branches/users/glyph/case-insensitive-uid:8772-8805
/CalendarServer/branches/users/glyph/conn-limit:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/dalify:6932-7023
/CalendarServer/branches/users/glyph/db-reconnect:6824-6876
/CalendarServer/branches/users/glyph/deploybuild:7563-7572
/CalendarServer/branches/users/glyph/disable-quota:7718-7727
/CalendarServer/branches/users/glyph/dont-start-postgres:6592-6614
/CalendarServer/branches/users/glyph/imip-and-admin-html:7866-7984
/CalendarServer/branches/users/glyph/ipv6-client:9054-9105
/CalendarServer/branches/users/glyph/linux-tests:6893-6900
/CalendarServer/branches/users/glyph/migrate-merge:8690-8713
/CalendarServer/branches/users/glyph/misc-portability-fixes:7365-7374
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/multiget-delete:8321-8330
/CalendarServer/branches/users/glyph/new-export:7444-7485
/CalendarServer/branches/users/glyph/oracle:7106-7155
/CalendarServer/branches/users/glyph/oracle-nulls:7340-7351
/CalendarServer/branches/users/glyph/other-html:8062-8091
/CalendarServer/branches/users/glyph/parallel-sim:8240-8251
/CalendarServer/branches/users/glyph/parallel-upgrade:8376-8400
/CalendarServer/branches/users/glyph/parallel-upgrade_to_1:8571-8583
/CalendarServer/branches/users/glyph/quota:7604-7637
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/shared-pool-fixes:8436-8443
/CalendarServer/branches/users/glyph/shared-pool-take2:8155-8174
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/sharing-api:9192-9205
/CalendarServer/branches/users/glyph/skip-lonely-vtimezones:8524-8535
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/subtransactions:7248-7258
/CalendarServer/branches/users/glyph/table-alias:8651-8664
/CalendarServer/branches/users/glyph/uidexport:7673-7676
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/glyph/uuid-normalize:9268-9296
/CalendarServer/branches/users/glyph/xattrs-from-files:7757-7769
/CalendarServer/branches/users/sagen/applepush:8126-8184
/CalendarServer/branches/users/sagen/inboxitems:7380-7381
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/wsanchez/transations:5515-5593
+ /CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/users/cdaboo/batchupload-6699:6700-7198
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/component-set-fixes:8130-8346
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/implicituidrace:8137-8141
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/pods:7297-7377
/CalendarServer/branches/users/cdaboo/pycalendar:7085-7206
/CalendarServer/branches/users/cdaboo/pycard:7227-7237
/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes:7740-8287
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/cdaboo/timezones:7443-7699
/CalendarServer/branches/users/cdaboo/txn-debugging:8730-8743
/CalendarServer/branches/users/glyph/case-insensitive-uid:8772-8805
/CalendarServer/branches/users/glyph/conn-limit:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/dalify:6932-7023
/CalendarServer/branches/users/glyph/db-reconnect:6824-6876
/CalendarServer/branches/users/glyph/deploybuild:7563-7572
/CalendarServer/branches/users/glyph/disable-quota:7718-7727
/CalendarServer/branches/users/glyph/dont-start-postgres:6592-6614
/CalendarServer/branches/users/glyph/imip-and-admin-html:7866-7984
/CalendarServer/branches/users/glyph/ipv6-client:9054-9105
/CalendarServer/branches/users/glyph/linux-tests:6893-6900
/CalendarServer/branches/users/glyph/migrate-merge:8690-8713
/CalendarServer/branches/users/glyph/misc-portability-fixes:7365-7374
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/multiget-delete:8321-8330
/CalendarServer/branches/users/glyph/new-export:7444-7485
/CalendarServer/branches/users/glyph/oracle:7106-7155
/CalendarServer/branches/users/glyph/oracle-nulls:7340-7351
/CalendarServer/branches/users/glyph/other-html:8062-8091
/CalendarServer/branches/users/glyph/parallel-sim:8240-8251
/CalendarServer/branches/users/glyph/parallel-upgrade:8376-8400
/CalendarServer/branches/users/glyph/parallel-upgrade_to_1:8571-8583
/CalendarServer/branches/users/glyph/q:9560-9688
/CalendarServer/branches/users/glyph/quota:7604-7637
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/shared-pool-fixes:8436-8443
/CalendarServer/branches/users/glyph/shared-pool-take2:8155-8174
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/sharing-api:9192-9205
/CalendarServer/branches/users/glyph/skip-lonely-vtimezones:8524-8535
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/subtransactions:7248-7258
/CalendarServer/branches/users/glyph/table-alias:8651-8664
/CalendarServer/branches/users/glyph/uidexport:7673-7676
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/glyph/uuid-normalize:9268-9296
/CalendarServer/branches/users/glyph/xattrs-from-files:7757-7769
/CalendarServer/branches/users/sagen/applepush:8126-8184
/CalendarServer/branches/users/sagen/inboxitems:7380-7381
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/wsanchez/transations:5515-5593
Modified: CalendarServer/trunk/calendarserver/accesslog.py
===================================================================
--- CalendarServer/trunk/calendarserver/accesslog.py 2012-08-11 08:55:55 UTC (rev 9688)
+++ CalendarServer/trunk/calendarserver/accesslog.py 2012-08-11 09:01:06 UTC (rev 9689)
@@ -378,34 +378,25 @@
arguments = []
class AMPCommonAccessLoggingObserver(CommonAccessLoggingObserverExtensions):
- def __init__(self, mode, id):
- self.mode = mode
- self.id = id
+ def __init__(self):
self.protocol = None
self._buffer = []
+
def flushBuffer(self):
if self._buffer:
for msg in self._buffer:
self.logMessage(msg)
- def start(self):
- super(AMPCommonAccessLoggingObserver, self).start()
- from twisted.internet import reactor
+ def addClient(self, connectedClient):
+ """
+ An AMP client connected; hook it up to this observer.
+ """
+ self.protocol = connectedClient
+ self.flushBuffer()
- def _gotProtocol(proto):
- self.protocol = proto
- self.flushBuffer()
- self.client = protocol.ClientCreator(reactor, amp.AMP)
- if self.mode == "AF_UNIX":
- d = self.client.connectUNIX(self.id)
- else:
- d = self.client.connectTCP("localhost", self.id)
- d.addCallback(_gotProtocol)
-
-
def logMessage(self, message):
"""
Log a message to the remote AMP Protocol
@@ -420,6 +411,7 @@
else:
self._buffer.append(message)
+
def logGlobalHit(self):
"""
Log a server hit via the remote AMP Protocol
@@ -429,8 +421,10 @@
d = self.protocol.callRemote(LogGlobalHit)
d.addErrback(log.err)
else:
- log.msg("logGlobalHit() only works with an AMP Protocol")
+ log.msg("logGlobalHit() only works with an AMP Protocol")
+
+
class AMPLoggingProtocol(amp.AMP):
"""
A server side protocol for logging to the given observer.
@@ -453,15 +447,23 @@
LogGlobalHit.responder(logGlobalHit)
+
+
class AMPLoggingFactory(protocol.ServerFactory):
def __init__(self, observer):
self.observer = observer
+
def doStart(self):
self.observer.start()
+
def doStop(self):
self.observer.stop()
+
def buildProtocol(self, addr):
return AMPLoggingProtocol(self.observer)
+
+
+
Copied: CalendarServer/trunk/calendarserver/controlsocket.py (from rev 9688, CalendarServer/branches/users/glyph/q/calendarserver/controlsocket.py)
===================================================================
--- CalendarServer/trunk/calendarserver/controlsocket.py (rev 0)
+++ CalendarServer/trunk/calendarserver/controlsocket.py 2012-08-11 09:01:06 UTC (rev 9689)
@@ -0,0 +1,128 @@
+##
+# Copyright (c) 2012 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Multiplexing control socket. Currently used for messages related to queueing
+and logging, but extensible to more.
+"""
+
+from zope.interface import implements
+
+from twisted.internet.protocol import Factory
+from twisted.protocols.amp import BinaryBoxProtocol, IBoxReceiver, IBoxSender
+from twisted.application.service import Service
+
+class DispatchingSender(object):
+ implements(IBoxSender)
+
+ def __init__(self, sender, route):
+ self.sender = sender
+ self.route = route
+
+
+ def sendBox(self, box):
+ box['_route'] = self.route
+ self.sender.sendBox(box)
+
+
+ def unhandledError(self, failure):
+ self.sender.unhandledError(failure)
+
+
+
+class DispatchingBoxReceiver(object):
+ implements(IBoxReceiver)
+
+ def __init__(self, receiverMap):
+ self.receiverMap = receiverMap
+
+
+ def startReceivingBoxes(self, boxSender):
+ for key, receiver in self.receiverMap.items():
+ receiver.startReceivingBoxes(DispatchingSender(boxSender, key))
+
+
+ def ampBoxReceived(self, box):
+ self.receiverMap[box['_route']].ampBoxReceived(box)
+
+
+ def stopReceivingBoxes(self, reason):
+ for receiver in self.receiverMap.values():
+ receiver.stopReceivingBoxes(reason)
+
+
+
+class ControlSocket(Factory, object):
+ """
+ An AMP control socket that aggregates other AMP factories. This is the
+ service that listens in the master process.
+ """
+
+ def __init__(self):
+ """
+ Initialize this L{ControlSocket}.
+ """
+ self._factoryMap = {}
+
+
+ def addFactory(self, key, otherFactory):
+ """
+ Add another L{Factory} - one that returns L{AMP} instances - to this
+ socket.
+ """
+ self._factoryMap[key] = otherFactory
+
+
+ def buildProtocol(self, addr):
+ """
+ Build a thing that will multiplex AMP to all the relevant sockets.
+ """
+ receiverMap = {}
+ for k, f in self._factoryMap.items():
+ receiverMap[k] = f.buildProtocol(addr)
+ return BinaryBoxProtocol(DispatchingBoxReceiver(receiverMap))
+
+
+ def doStart(self):
+ """
+ Relay start notification to all added factories.
+ """
+ for f in self._factoryMap.values():
+ f.doStart()
+
+
+ def doStop(self):
+ """
+ Relay stop notification to all added factories.
+ """
+ for f in self._factoryMap.values():
+ f.doStop()
+
+
+
+class ControlSocketConnectingService(Service, object):
+
+ def __init__(self, endpointFactory, controlSocket):
+ super(ControlSocketConnectingService, self).__init__()
+ self.endpointFactory = endpointFactory
+ self.controlSocket = controlSocket
+
+
+ def privilegedStartService(self):
+ from twisted.internet import reactor
+ endpoint = self.endpointFactory(reactor)
+ endpoint.connect(self.controlSocket)
+
Modified: CalendarServer/trunk/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/caldav.py 2012-08-11 08:55:55 UTC (rev 9688)
+++ CalendarServer/trunk/calendarserver/tap/caldav.py 2012-08-11 09:01:06 UTC (rev 9689)
@@ -61,6 +61,7 @@
from twext.web2.channel.http import LimitingHTTPFactory, SSLRedirectRequest
from twext.web2.metafd import ConnectionLimiter, ReportingHTTPService
+from txdav.common.datastore.sql_tables import schema
from txdav.common.datastore.upgrade.sql.upgrade import (
UpgradeDatabaseSchemaService, UpgradeDatabaseDataService,
)
@@ -88,6 +89,13 @@
from calendarserver.tap.util import ConnectionDispenser
+from calendarserver.controlsocket import ControlSocket
+from twisted.internet.endpoints import UNIXClientEndpoint, TCP4ClientEndpoint
+
+from calendarserver.controlsocket import ControlSocketConnectingService
+from twisted.protocols.amp import AMP
+from twext.enterprise.queue import WorkerFactory as QueueWorkerFactory
+from twext.enterprise.queue import PeerConnectionPool
from calendarserver.accesslog import AMPCommonAccessLoggingObserver
from calendarserver.accesslog import AMPLoggingFactory
from calendarserver.accesslog import RotatingFileAccessLoggingObserver
@@ -117,6 +125,14 @@
from twisted.python.util import uidFromString, gidFromString
+
+# Control socket message-routing constants.
+_LOG_ROUTE = "log"
+_QUEUE_ROUTE = "queue"
+
+_CONTROL_SERVICE_NAME = "control"
+
+
def getid(uid, gid):
if uid is not None:
uid = uidFromString(uid)
@@ -762,22 +778,43 @@
#
self.log_info("Setting up service")
+ bonusServices = []
+
if config.ProcessType == "Slave":
+ logObserver = AMPCommonAccessLoggingObserver()
+
if config.ControlSocket:
- mode = "AF_UNIX"
id = config.ControlSocket
- self.log_info("Logging via AF_UNIX: %s" % (id,))
+ self.log_info("Control via AF_UNIX: %s" % (id,))
+ endpointFactory = lambda reactor: UNIXClientEndpoint(
+ reactor, id)
else:
- mode = "AF_INET"
id = int(config.ControlPort)
- self.log_info("Logging via AF_INET: %d" % (id,))
-
- logObserver = AMPCommonAccessLoggingObserver(mode, id)
-
+ self.log_info("Control via AF_INET: %d" % (id,))
+ endpointFactory = lambda reactor: TCP4ClientEndpoint(
+ reactor, "127.0.0.1", id)
+ controlSocketClient = ControlSocket()
+ class LogClient(AMP):
+ def startReceivingBoxes(self, sender):
+ super(LogClient, self).startReceivingBoxes(sender)
+ logObserver.addClient(self)
+ f = Factory()
+ f.protocol = LogClient
+ controlSocketClient.addFactory(_LOG_ROUTE, f)
+ from txdav.common.datastore.sql import CommonDataStore as SQLStore
+ if isinstance(store, SQLStore):
+ def queueMasterAvailable(connectionFromMaster):
+ store.queuer = connectionFromMaster
+ queueFactory = QueueWorkerFactory(store.newTransaction, schema,
+ queueMasterAvailable)
+ controlSocketClient.addFactory(_QUEUE_ROUTE, queueFactory)
+ controlClient = ControlSocketConnectingService(
+ endpointFactory, controlSocketClient
+ )
+ bonusServices.append(controlClient)
elif config.ProcessType == "Single":
# Make sure no old socket files are lying around.
self.deleteStaleSocketFiles()
-
logObserver = RotatingFileAccessLoggingObserver(
config.AccessLogFile,
)
@@ -785,18 +822,20 @@
self.log_info("Configuring access log observer: %s" % (logObserver,))
service = CalDAVService(logObserver)
+ for bonus in bonusServices:
+ bonus.setServiceParent(service)
rootResource = getRootResource(config, store, additional)
service.rootResource = rootResource
underlyingSite = Site(rootResource)
-
- # Need to cache SSL port info here so we can access it in a Request to deal with the
- # possibility of being behind an SSL decoder
+
+ # Need to cache SSL port info here so we can access it in a Request to
+ # deal with the possibility of being behind an SSL decoder
underlyingSite.EnableSSL = config.EnableSSL
underlyingSite.SSLPort = config.SSLPort
underlyingSite.BindSSLPorts = config.BindSSLPorts
-
+
requestFactory = underlyingSite
if config.RedirectHTTPToHTTPS:
@@ -1102,7 +1141,8 @@
try:
gid = getgrnam(config.GroupName).gr_gid
except KeyError:
- raise ConfigurationError("Invalid group name: %s" % (config.GroupName,))
+ raise ConfigurationError("Invalid group name: %s" %
+ (config.GroupName,))
else:
gid = os.getgid()
@@ -1110,20 +1150,24 @@
try:
uid = getpwnam(config.UserName).pw_uid
except KeyError:
- raise ConfigurationError("Invalid user name: %s" % (config.UserName,))
+ raise ConfigurationError("Invalid user name: %s" %
+ (config.UserName,))
else:
uid = os.getuid()
+
+ controlSocket = ControlSocket()
+ controlSocket.addFactory(_LOG_ROUTE, logger)
if config.ControlSocket:
- loggingService = GroupOwnedUNIXServer(
- gid, config.ControlSocket, logger, mode=0660
+ controlSocketService = GroupOwnedUNIXServer(
+ gid, config.ControlSocket, controlSocket, mode=0660
)
else:
- loggingService = ControlPortTCPServer(
- config.ControlPort, logger, interface="127.0.0.1"
+ controlSocketService = ControlPortTCPServer(
+ config.ControlPort, controlSocket, interface="127.0.0.1"
)
- loggingService.setName("logging")
- loggingService.setServiceParent(s)
+ controlSocketService.setName(_CONTROL_SERVICE_NAME)
+ controlSocketService.setServiceParent(s)
monitor = DelayedStartupProcessMonitor()
s.processMonitor = monitor
@@ -1244,12 +1288,22 @@
# filesystem to the database (if that's necessary, and there is
# filesystem data in need of upgrading).
def spawnerSvcCreator(pool, store):
+ from twisted.internet import reactor
+ pool = PeerConnectionPool(reactor, store.newTransaction,
+ 7654, schema)
+ controlSocket.addFactory(_QUEUE_ROUTE,
+ pool.workerListenerFactory())
+ # TODO: now that we have the shared control socket, we should get
+ # rid of the connection dispenser and make a shared / async
+ # connection pool implementation that can dispense transactions
+ # synchronously as the interface requires.
if pool is not None and config.SharedConnectionPool:
self.log_warn("Using Shared Connection Pool")
dispenser = ConnectionDispenser(pool)
else:
dispenser = None
multi = MultiService()
+ pool.setServiceParent(multi)
spawner = SlaveSpawnerService(
self, monitor, dispenser, dispatcher, options["config"],
inheritFDs=inheritFDs, inheritSSLFDs=inheritSSLFDs
Modified: CalendarServer/trunk/calendarserver/tap/test/test_caldav.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/test/test_caldav.py 2012-08-11 08:55:55 UTC (rev 9688)
+++ CalendarServer/trunk/calendarserver/tap/test/test_caldav.py 2012-08-11 09:01:06 UTC (rev 9689)
@@ -57,7 +57,8 @@
from calendarserver.tap.caldav import (
CalDAVOptions, CalDAVServiceMaker, CalDAVService, GroupOwnedUNIXServer,
- DelayedStartupProcessMonitor, DelayedStartupLineLogger, TwistdSlaveProcess
+ DelayedStartupProcessMonitor, DelayedStartupLineLogger, TwistdSlaveProcess,
+ _CONTROL_SERVICE_NAME
)
from calendarserver.provision.root import RootResource
from StringIO import StringIO
@@ -460,7 +461,7 @@
self.config["ProcessType"] = "Combined"
self.writeConfig()
svc = self.makeService()
- for serviceName in ["logging"]:
+ for serviceName in [_CONTROL_SERVICE_NAME]:
socketService = svc.getServiceNamed(serviceName)
self.assertIsInstance(socketService, GroupOwnedUNIXServer)
m = socketService.kwargs.get("mode", 0666)
Modified: CalendarServer/trunk/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/adbapi2.py 2012-08-11 08:55:55 UTC (rev 9688)
+++ CalendarServer/trunk/twext/enterprise/adbapi2.py 2012-08-11 09:01:06 UTC (rev 9689)
@@ -484,7 +484,53 @@
-class _SingleTxn(proxyForInterface(iface=IAsyncTransaction,
+class _HookableOperation(object):
+
+ def __init__(self):
+ self._hooks = []
+
+
+ @inlineCallbacks
+ def runHooks(self, ignored):
+ """
+ Callback for C{commit} and C{abort} Deferreds.
+ """
+ for operation in self._hooks:
+ yield operation()
+ returnValue(ignored)
+
+
+ def addHook(self, operation):
+ """
+ Register a hook to run after this operation; backs both C{postCommit} and C{postAbort}.
+ """
+ self._hooks.append(operation)
+
+
+
+class _CommitAndAbortHooks(object):
+ """
+ Shared implementation of post-commit and post-abort hooks.
+ """
+ # FIXME: this functionality needs direct tests, although it's pretty well-
+ # covered by txdav's test suite.
+
+ def __init__(self):
+ self._commit = _HookableOperation()
+ self._abort = _HookableOperation()
+
+
+ def postCommit(self, operation):
+ return self._commit.addHook(operation)
+
+
+ def postAbort(self, operation):
+ return self._abort.addHook(operation)
+
+
+
+class _SingleTxn(_CommitAndAbortHooks,
+ proxyForInterface(iface=IAsyncTransaction,
originalAttribute='_baseTxn')):
"""
A L{_SingleTxn} is a single-use wrapper for the longer-lived
@@ -505,6 +551,7 @@
"""
def __init__(self, pool, baseTxn):
+ super(_SingleTxn, self).__init__()
self._pool = pool
self._baseTxn = baseTxn
self._completed = False
@@ -601,9 +648,9 @@
# We're in the process of executing a block of commands. Wait until
# they're done. (Commit will be repeated in _checkNextBlock.)
return self._blockedQueue.commit()
-
self._markComplete()
- return super(_SingleTxn, self).commit()
+ return (super(_SingleTxn, self).commit()
+ .addCallback(self._commit.runHooks))
def abort(self):
@@ -611,6 +658,7 @@
result = super(_SingleTxn, self).abort()
if self in self._pool._waiting:
self._stopWaiting()
+ result.addCallback(self._abort.runHooks)
return result
@@ -750,15 +798,40 @@
class _ConnectingPseudoTxn(object):
+ """
+ This is a pseudo-Transaction for bookkeeping purposes.
+ When a connection has asked to connect, but has not yet completed
+ connecting, the L{ConnectionPool} still needs a way to shut it down. This
+ object provides that tracking handle, and will be present in the pool's
+ C{busy} list while it is populating the list.
+ """
+
_retry = None
def __init__(self, pool, holder):
- self._pool = pool
- self._holder = holder
+ """
+ Initialize the L{_ConnectingPseudoTxn}; get ready to connect.
+ @param pool: The pool that this connection attempt is participating in.
+ @type pool: L{ConnectionPool}
+ @param holder: the L{ThreadHolder} allocated to this connection attempt
+ and subsequent SQL executions for this connection.
+ @type holder: L{ThreadHolder}
+ """
+ self._pool = pool
+ self._holder = holder
+ self._aborted = False
+
+
def abort(self):
+ """
+ Ignore the result of attempting to connect to this database, and
+ instead simply close the connection and free the L{ThreadHolder}
+ allocated for it.
+ """
+ self._aborted = True
if self._retry is not None:
self._retry.cancel()
d = self._holder.stop()
@@ -951,6 +1024,8 @@
cursor = connection.cursor()
return (connection, cursor)
def finishInit((connection, cursor)):
+ if txn._aborted:
+ return
baseTxn = _ConnectedTxn(
pool=self,
threadHolder=holder,
@@ -1187,8 +1262,8 @@
Initialize a mapping of transaction IDs to transaction objects.
"""
super(ConnectionPoolConnection, self).__init__()
- self.pool = pool
- self._txns = {}
+ self.pool = pool
+ self._txns = {}
self._blocks = {}
@@ -1405,7 +1480,7 @@
-class _NetTransaction(object):
+class _NetTransaction(_CommitAndAbortHooks):
"""
A L{_NetTransaction} is an L{AMP}-protocol-based provider of the
L{IAsyncTransaction} interface. It sends SQL statements, query results, and
@@ -1419,6 +1494,7 @@
Initialize a transaction with a L{ConnectionPoolClient} and a unique
transaction identifier.
"""
+ super(_NetTransaction, self).__init__()
self._client = client
self._transactionID = transactionID
self._completed = False
@@ -1476,11 +1552,12 @@
def done(whatever):
self._committed = True
return whatever
- return self._complete(Commit).addBoth(done)
+ return (self._complete(Commit).addBoth(done)
+ .addCallback(self._commit.runHooks))
def abort(self):
- return self._complete(Abort)
+ return self._complete(Abort).addCallback(self._abort.runHooks)
def commandBlock(self):
@@ -1523,7 +1600,6 @@
return self._transaction.dialect
-
def execSQL(self, sql, args=None, raiseOnZeroRowCount=None):
"""
Execute some SQL on this command block.
Copied: CalendarServer/trunk/twext/enterprise/dal/record.py (from rev 9688, CalendarServer/branches/users/glyph/q/twext/enterprise/dal/record.py)
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/record.py (rev 0)
+++ CalendarServer/trunk/twext/enterprise/dal/record.py 2012-08-11 09:01:06 UTC (rev 9689)
@@ -0,0 +1,328 @@
+# -*- test-case-name: twext.enterprise.dal.test.test_record -*-
+##
+# Copyright (c) 2012 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+RECORD: Relational Entity Creation from Objects Representing Data.
+
+This is an asynchronous object-relational mapper based on
+L{twext.enterprise.dal.syntax}.
+"""
+
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twext.enterprise.dal.syntax import (
+ Select, Tuple, Constant, ColumnSyntax, Insert, Update, Delete
+)
+# from twext.enterprise.dal.syntax import ExpressionSyntax
+
+class ReadOnly(AttributeError):
+ """
+ A caller attempted to set an attribute on a database-backed record, rather
+ than updating it through L{Record.update}.
+ """
+
+ def __init__(self, className, attributeName):
+ self.className = className
+ self.attributeName = attributeName
+ super(ReadOnly, self).__init__("SQL-backed attribute '{0}.{1}' is "
+ "read-only. Use '.update(...)' to "
+ "modify attributes."
+ .format(className, attributeName))
+
+
+
+class NoSuchRecord(Exception):
+ """
+ No matching record could be found.
+ """
+
+
+class _RecordMeta(type):
+ """
+ Metaclass for associating a L{fromTable} with a L{Record} at inheritance
+ time.
+ """
+
+ def __new__(cls, name, bases, ns):
+ """
+ Create a new instance of this meta-type.
+ """
+ newbases = []
+ table = None
+ namer = None
+ for base in bases:
+ if isinstance(base, fromTable):
+ if table is not None:
+ raise RuntimeError(
+ "Can't define a class from two or more tables at once."
+ )
+ table = base.table
+ elif getattr(base, "table", None) is not None:
+ raise RuntimeError(
+ "Can't define a record class by inheriting one already "
+ "mapped to a table."
+ # TODO: more info
+ )
+ else:
+ if namer is None:
+ if isinstance(base, _RecordMeta):
+ namer = base
+ newbases.append(base)
+ if table is not None:
+ attrmap = {}
+ colmap = {}
+ allColumns = list(table)
+ for column in allColumns:
+ attrname = namer.namingConvention(column.model.name)
+ attrmap[attrname] = column
+ colmap[column] = attrname
+ ns.update(table=table, __attrmap__=attrmap, __colmap__=colmap)
+ ns.update(attrmap)
+ return super(_RecordMeta, cls).__new__(cls, name, tuple(newbases), ns)
+
+
+
+class fromTable(object):
+ """
+ Inherit from this after L{Record} to specify which table your L{Record}
+ subclass is mapped to.
+ """
+
+ def __init__(self, aTable):
+ """
+ @param aTable: The table to map to.
+ @type aTable: L{twext.enterprise.dal.syntax.TableSyntax}
+ """
+ self.table = aTable
+
+
+
+class Record(object):
+ """
+ Superclass for all database-backed record classes. (i.e. an object mapped
+ from a database record).
+
+ @cvar table: the table that represents this L{Record} in the database.
+ @type table: L{TableSyntax}
+
+ @ivar transaction: The L{IAsyncTransaction} where this record is being
+ loaded. This may be C{None} if this L{Record} is not participating in
+ a transaction, which may be true if it was instantiated but never
+ saved.
+
+ @cvar __colmap__: map of L{ColumnSyntax} objects to attribute names.
+ @type __colmap__: L{dict}
+
+ @cvar __attrmap__: map of attribute names to L{ColumnSyntax} objects.
+ @type __attrmap__: L{dict}
+ """
+
+ __metaclass__ = _RecordMeta
+
+ transaction = None
+ def __setattr__(self, name, value):
+ """
+ Once the transaction is initialized, this object is immutable. If you
+ want to change it, use L{Record.update}.
+ """
+ if self.transaction is not None:
+ raise ReadOnly(self.__class__.__name__, name)
+ return super(Record, self).__setattr__(name, value)
+
+
+ @staticmethod
+ def namingConvention(columnName):
+ """
+ Implement the convention for naming-conversion between column names
+ (typically, upper-case database names map to lower-case attribute
+ names).
+ """
+ words = columnName.lower().split("_")
+ def cap(word):
+ if word.lower() == 'id':
+ return word.upper()
+ else:
+ return word.capitalize()
+ return words[0] + "".join(map(cap, words[1:]))
+
+
+ @classmethod
+ def _primaryKeyExpression(cls):
+ return Tuple([ColumnSyntax(c) for c in cls.table.model.primaryKey])
+
+
+ def _primaryKeyValue(self):
+ val = []
+ for col in self._primaryKeyExpression().columns:
+ val.append(getattr(self, self.__class__.__colmap__[col]))
+ return val
+
+
+ @classmethod
+ def _primaryKeyComparison(cls, primaryKey):
+ return (cls._primaryKeyExpression() ==
+ Tuple(map(Constant, primaryKey)))
+
+
+ @classmethod
+ @inlineCallbacks
+ def load(cls, transaction, *primaryKey):
+ self = (yield cls.query(transaction,
+ cls._primaryKeyComparison(primaryKey)))[0]
+ returnValue(self)
+
+
+ @classmethod
+ @inlineCallbacks
+ def create(cls, transaction, **k):
+ """
+ Create a row.
+
+ Used like this::
+
+ MyRecord.create(transaction, column1=1, column2=u'two')
+ """
+ self = cls()
+ colmap = {}
+ attrtocol = cls.__attrmap__
+ for attr in k:
+ setattr(self, attr, k[attr])
+ # FIXME: better error reporting
+ colmap[attrtocol[attr]] = k[attr]
+ yield Insert(colmap).on(transaction)
+ self.transaction = transaction
+ returnValue(self)
+
+
+ def delete(self):
+ """
+ Delete this row from the database.
+
+ @return: a L{Deferred} which fires when the underlying row has been
+ deleted.
+ """
+ return Delete(From=self.table,
+ Where=self._primaryKeyComparison(self._primaryKeyValue())
+ ).on(self.transaction)
+
+
+ @inlineCallbacks
+ def update(self, **kw):
+ """
+ Modify the given attributes in the database.
+
+ @return: a L{Deferred} that fires when the updates have been sent to
+ the database.
+ """
+ colmap = {}
+ for k, v in kw.iteritems():
+ colmap[self.__attrmap__[k]] = v
+ yield (Update(colmap,
+ Where=self._primaryKeyComparison(self._primaryKeyValue()))
+ .on(self.transaction))
+ self.__dict__.update(kw)
+
+
+ @classmethod
+ def pop(cls, transaction, *primaryKey):
+ """
+ Atomically retrieve and remove a row from this L{Record}'s table
+ with a primary key value of C{primaryKey}.
+
+ @return: a L{Deferred} that fires with an instance of C{cls}, or fails
+ with L{NoSuchRecord} if there were no records in the database.
+ @rtype: L{Deferred}
+ """
+ return cls._rowsFromQuery(
+ transaction, Delete(Where=cls._primaryKeyComparison(primaryKey),
+ From=cls.table, Return=list(cls.table)),
+ lambda : NoSuchRecord()
+ ).addCallback(lambda x: x[0])
+
+
+ @classmethod
+ def query(cls, transaction, expr, order=None, ascending=True):
+ """
+ Query the table that corresponds to C{cls}, and return instances of
+ C{cls} corresponding to the rows that are returned from that table.
+
+ @param expr: An L{ExpressionSyntax} that constrains the results of the
+ query. This is most easily produced by accessing attributes on the
+ class; for example, C{MyRecordType.query((MyRecordType.col1 >
+ MyRecordType.col2).And(MyRecordType.col3 == 7))}
+
+ @param order: A L{ColumnSyntax} to order the resulting record objects
+ by.
+
+ @param ascending: A boolean; if C{order} is not C{None}, whether to
+ sort in ascending or descending order.
+ """
+ kw = {}
+ if order is not None:
+ kw.update(OrderBy=order, Ascending=ascending)
+ return cls._rowsFromQuery(transaction, Select(list(cls.table),
+ From=cls.table,
+ Where=expr, **kw), None)
+
+
+ @classmethod
+ def all(cls, transaction):
+ """
+ Load all rows from the table that corresponds to C{cls} and return
+ instances of C{cls} corresponding to all.
+ """
+ return cls._rowsFromQuery(transaction,
+ Select(list(cls.table),
+ From=cls.table,
+ OrderBy=cls._primaryKeyExpression()),
+ None)
+
+
+ @classmethod
+ @inlineCallbacks
+ def _rowsFromQuery(cls, transaction, qry, rozrc):
+ """
+ Execute the given query, and transform its results into rows.
+
+ @param transaction: an L{IAsyncTransaction} to execute the query on.
+
+ @param qry: a L{_DMLStatement} (XXX: maybe _DMLStatement or some
+ interface that defines 'on' should be public?) whose results are
+ the list of columns in C{self.table}.
+
+ @param rozrc: The C{raiseOnZeroRowCount} argument.
+
+ @return: a L{Deferred} that succeeds with a C{list} or fails with an
+ exception produced by C{rozrc}.
+ """
+ rows = yield qry.on(transaction, raiseOnZeroRowCount=rozrc)
+ selves = []
+ for row in rows:
+ self = cls()
+ for (column, value) in zip(list(cls.table), row):
+ name = cls.__colmap__[column]
+ setattr(self, name, value)
+ self.transaction = transaction
+ selves.append(self)
+ returnValue(selves)
+
+
+
+__all__ = [
+ "ReadOnly",
+ "fromTable",
+ "NoSuchRecord",
+]
Modified: CalendarServer/trunk/twext/enterprise/dal/syntax.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/syntax.py 2012-08-11 08:55:55 UTC (rev 9688)
+++ CalendarServer/trunk/twext/enterprise/dal/syntax.py 2012-08-11 09:01:06 UTC (rev 9689)
@@ -27,11 +27,14 @@
from twisted.internet.defer import succeed
-from twext.enterprise.dal.model import Schema, Table, Column, Sequence
-from twext.enterprise.ienterprise import POSTGRES_DIALECT, ORACLE_DIALECT
-from twext.enterprise.ienterprise import IDerivedParameter
+from twext.enterprise.dal.model import Schema, Table, Column, Sequence, SQLType
+from twext.enterprise.ienterprise import (
+ POSTGRES_DIALECT, ORACLE_DIALECT, SQLITE_DIALECT, IDerivedParameter
+)
from twext.enterprise.util import mapOracleOutputType
+from twisted.internet.defer import inlineCallbacks, returnValue
+
try:
import cx_Oracle
cx_Oracle
@@ -192,14 +195,13 @@
@param txn: the L{IAsyncTransaction} to execute this on.
- @param raiseOnZeroRowCount: the exception to raise if no data was
- affected or returned by this query.
+ @param raiseOnZeroRowCount: a 0-argument callable which returns an
+ exception to raise if the executed SQL does not affect any rows.
@param kw: keyword arguments, mapping names of L{Parameter} objects
located somewhere in C{self}
@return: results from the database.
-
@rtype: a L{Deferred} firing a C{list} of records (C{tuple}s or
C{list}s)
"""
@@ -445,6 +447,7 @@
Len = Function("character_length", "length")
Upper = Function("upper")
Lower = Function("lower")
+_sqliteLastInsertRowID = Function("last_insert_rowid")
# Use a specific value here for "the convention for case-insensitive values in
# the database" so we don't need to keep remembering whether it's upper or
@@ -602,7 +605,7 @@
def __contains__(self, columnSyntax):
if isinstance(columnSyntax, FunctionInvocation):
columnSyntax = columnSyntax.arg
- return (columnSyntax.model in self.model.columns)
+ return (columnSyntax.model.table is self.model)
@@ -945,19 +948,39 @@
-def _columnsMatchTables(columns, tables):
+def _checkColumnsMatchTables(columns, tables):
+ """
+ Verify that the given C{columns} match the given C{tables}; that is, that
+ every L{TableSyntax} referenced by every L{ColumnSyntax} referenced by
+ every L{ExpressionSyntax} in the given C{columns} list is present in the
+ given C{tables} list.
+
+ @param columns: a L{list} of L{ExpressionSyntax}, each of which references
+ some set of L{ColumnSyntax}es via its C{allColumns} method.
+
+ @param tables: a L{list} of L{TableSyntax}
+
+ @return: L{None}
+ @rtype: L{NoneType}
+
+ @raise TableMismatch: if any table referenced by a column is I{not} found
+ in C{tables}
+ """
for expression in columns:
for column in expression.allColumns():
for table in tables:
if column in table:
break
else:
- return False
- return True
+ raise TableMismatch("{} not found in {}".format(
+ column, tables
+ ))
+ return None
-class Tuple(object):
+class Tuple(ExpressionSyntax):
+
def __init__(self, columns):
self.columns = columns
@@ -1064,8 +1087,7 @@
if columns is None:
columns = ALL_COLUMNS
else:
- if not _columnsMatchTables(columns, From.tables()):
- raise TableMismatch()
+ _checkColumnsMatchTables(columns, From.tables())
columns = _SomeColumns(columns)
self.columns = columns
@@ -1263,12 +1285,11 @@
def _returningClause(self, queryGenerator, stmt, allTables):
"""
- Add a dialect-appropriate 'returning' clause to the end of the given SQL
- statement.
+ Add a dialect-appropriate 'returning' clause to the end of the given
+ SQL statement.
- @param queryGenerator: describes the database we are generating the statement
- for.
-
+ @param queryGenerator: describes the database we are generating the
+ statement for.
@type queryGenerator: L{QueryGenerator}
@param stmt: the SQL fragment generated without the 'returning' clause
@@ -1280,9 +1301,14 @@
@return: the C{stmt} parameter.
"""
retclause = self.Return
+ if retclause is None:
+ return stmt
if isinstance(retclause, (tuple, list)):
retclause = _CommaList(retclause)
- if retclause is not None:
+ if queryGenerator.dialect == SQLITE_DIALECT:
+ # sqlite does this another way.
+ return stmt
+ elif retclause is not None:
stmt.text += ' returning '
stmt.append(retclause.subSQL(queryGenerator, allTables))
if queryGenerator.dialect == ORACLE_DIALECT:
@@ -1410,7 +1436,27 @@
return self._returningClause(queryGenerator, stmt, allTables)
+ def on(self, txn, *a, **kw):
+ """
+ Override to provide extra logic for L{Insert}s that return values on
+ databases that don't provide return values as part of their C{INSERT}
+ behavior.
+ """
+ result = super(_DMLStatement, self).on(txn, *a, **kw)
+ if self.Return is not None and txn.dialect == SQLITE_DIALECT:
+ table = self._returnAsList()[0].model.table
+ return Select(self._returnAsList(),
+ # TODO: error reporting when 'return' includes columns
+ # foreign to the primary table.
+ From=TableSyntax(table),
+ Where=ColumnSyntax(Column(table, "rowid",
+ SQLType("integer", None))) ==
+ _sqliteLastInsertRowID()
+ ).on(txn, *a, **kw)
+ return result
+
+
def _convert(x):
"""
Convert a value to an appropriate SQL AST node. (Currently a simple
@@ -1426,6 +1472,12 @@
class Update(_DMLStatement):
"""
'update' statement
+
+ @ivar columnMap: A L{dict} mapping L{ColumnSyntax} objects to values to
+ change; values may be simple database values (such as L{str},
+ L{unicode}, L{datetime.datetime}, L{float}, L{int} etc) or L{Parameter}
+ instances.
+ @type columnMap: L{dict}
"""
def __init__(self, columnMap, Where, Return=None):
@@ -1436,6 +1488,37 @@
self.Return = Return
+ @inlineCallbacks
+ def on(self, txn, *a, **kw):
+ """
+ Override to provide extra logic for L{Update}s that return values on
+ databases that don't provide return values as part of their C{UPDATE}
+ behavior.
+ """
+ doExtra = self.Return is not None and txn.dialect == SQLITE_DIALECT
+ upcall = lambda: super(_DMLStatement, self).on(txn, *a, **kw)
+
+ if doExtra:
+ table = self._returnAsList()[0].model.table
+ rowidcol = ColumnSyntax(Column(table, "rowid",
+ SQLType("integer", None)))
+ prequery = Select([rowidcol], From=TableSyntax(table),
+ Where=self.Where)
+ preresult = prequery.on(txn, *a, **kw)
+ before = yield preresult
+ yield upcall()
+ result = (yield Select(self._returnAsList(),
+ # TODO: error reporting when 'return' includes
+ # columns foreign to the primary table.
+ From=TableSyntax(table),
+ Where=reduce(lambda left, right: left.Or(right),
+ ((rowidcol == x) for [x] in before))
+ ).on(txn, *a, **kw))
+ returnValue(result)
+ else:
+ returnValue((yield upcall()))
+
+
def _toSQL(self, queryGenerator):
"""
@return: a 'insert' statement with placeholders and arguments
@@ -1459,7 +1542,7 @@
for (c, v) in sortedColumns]
)
)
- result.append(SQLFragment( ' where '))
+ result.append(SQLFragment(' where '))
result.append(self.Where.subSQL(queryGenerator, allTables))
return self._returningClause(queryGenerator, result, allTables)
@@ -1490,7 +1573,19 @@
return self._returningClause(queryGenerator, result, allTables)
+ @inlineCallbacks
+ def on(self, txn, *a, **kw):
+ upcall = lambda: super(Delete, self).on(txn, *a, **kw)
+ if txn.dialect == SQLITE_DIALECT and self.Return is not None:
+ result = yield Select(self._returnAsList(), From=self.From,
+ Where=self.Where).on(txn, *a, **kw)
+ yield upcall()
+ else:
+ result = yield upcall()
+ returnValue(result)
+
+
class _LockingStatement(_Statement):
"""
A statement related to lock management, which implicitly has no results.
Copied: CalendarServer/trunk/twext/enterprise/dal/test/test_record.py (from rev 9688, CalendarServer/branches/users/glyph/q/twext/enterprise/dal/test/test_record.py)
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/test/test_record.py (rev 0)
+++ CalendarServer/trunk/twext/enterprise/dal/test/test_record.py 2012-08-11 09:01:06 UTC (rev 9689)
@@ -0,0 +1,247 @@
+##
+# Copyright (c) 2012 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Test cases for L{twext.enterprise.dal.record}.
+"""
+
+import sqlite3
+
+from twisted.internet.defer import inlineCallbacks
+
+from twisted.trial.unittest import TestCase
+
+from twext.enterprise.dal.record import (
+ Record, fromTable, ReadOnly, NoSuchRecord
+)
+from twext.enterprise.dal.syntax import SQLITE_DIALECT
+
+from twext.enterprise.dal.test.test_parseschema import SchemaTestHelper
+from twext.enterprise.adbapi2 import ConnectionPool
+from twext.enterprise.dal.syntax import SchemaSyntax
+from twisted.internet.defer import gatherResults
+
+# from twext.enterprise.dal.syntax import
+
+
+sth = SchemaTestHelper()
+sth.id = lambda : __name__
+schemaString = """
+create table ALPHA (BETA integer primary key, GAMMA text);
+"""
+testSchema = SchemaSyntax(sth.schemaFromString(schemaString))
+
+
+
+class TestRecord(Record, fromTable(testSchema.ALPHA)):
+ """
+ A sample test record.
+ """
+
+
+
+class TestCRUD(TestCase):
+ """
+ Tests for creation, mutation, and deletion operations.
+ """
+
+ def setUp(self):
+ sqlitename = self.mktemp()
+ def connectionFactory(label=self.id()):
+ return sqlite3.connect(sqlitename)
+ con = connectionFactory()
+ con.execute(schemaString)
+ con.commit()
+ self.pool = ConnectionPool(connectionFactory, paramstyle='numeric',
+ dialect=SQLITE_DIALECT)
+ self.pool.startService()
+ self.addCleanup(self.pool.stopService)
+
+
+ @inlineCallbacks
+ def test_simpleLoad(self):
+ """
+ Loading an existing row from the database by its primary key will
+ populate its attributes from columns of the corresponding row in the
+ database.
+ """
+ txn = self.pool.connection()
+ yield txn.execSQL("insert into ALPHA values (:1, :2)", [234, "one"])
+ yield txn.execSQL("insert into ALPHA values (:1, :2)", [456, "two"])
+ rec = yield TestRecord.load(txn, 456)
+ self.assertIsInstance(rec, TestRecord)
+ self.assertEquals(rec.beta, 456)
+ self.assertEquals(rec.gamma, "two")
+ rec2 = yield TestRecord.load(txn, 234)
+ self.assertIsInstance(rec2, TestRecord)
+ self.assertEqual(rec2.beta, 234)
+ self.assertEqual(rec2.gamma, "one")
+
+
+ @inlineCallbacks
+ def test_simpleCreate(self):
+ """
+ When a record object is created, a row with matching column values will
+ be created in the database.
+ """
+ txn = self.pool.connection()
+ rec = yield TestRecord.create(txn, beta=3, gamma=u'epsilon')
+ self.assertEquals(rec.beta, 3)
+ self.assertEqual(rec.gamma, u'epsilon')
+ rows = yield txn.execSQL("select BETA, GAMMA from ALPHA")
+ self.assertEqual(rows, [tuple([3, u'epsilon'])])
+
+
+ @inlineCallbacks
+ def test_simpleDelete(self):
+ """
+ When a record object is deleted, a row with a matching primary key will
+ be deleted from the database.
+ """
+ txn = self.pool.connection()
+ def mkrow(beta, gamma):
+ return txn.execSQL("insert into ALPHA values (:1, :2)",
+ [beta, gamma])
+ yield gatherResults([mkrow(123, u"one"), mkrow(234, u"two"),
+ mkrow(345, u"three")])
+ tr = yield TestRecord.load(txn, 234)
+ yield tr.delete()
+ rows = yield txn.execSQL("select BETA, GAMMA from ALPHA order by BETA")
+ self.assertEqual(rows, [(123, u"one"), (345, u"three")])
+
+
+ @inlineCallbacks
+ def test_attributesArentMutableYet(self):
+ """
+ Changing attributes on a database object is not supported yet, because
+ it's not entirely clear when to flush the SQL to the database.
+ Instead, for the time being, use C{.update}. When you attempt to set
+ an attribute, an error will be raised informing you of this fact, so
+ that the error is clear.
+ """
+ txn = self.pool.connection()
+ rec = yield TestRecord.create(txn, beta=7, gamma=u'what')
+ def setit():
+ rec.beta = 12
+ ro = self.assertRaises(ReadOnly, setit)
+ self.assertEqual(rec.beta, 7)
+ self.assertIn("SQL-backed attribute 'TestRecord.beta' is read-only. "
+ "Use '.update(...)' to modify attributes.", str(ro))
+
+
+ @inlineCallbacks
+ def test_simpleUpdate(self):
+ """
+ L{Record.update} will change the values on the record and in the
+ database.
+ """
+ txn = self.pool.connection()
+ rec = yield TestRecord.create(txn, beta=3, gamma=u'epsilon')
+ yield rec.update(gamma=u'otherwise')
+ self.assertEqual(rec.gamma, u'otherwise')
+ yield txn.commit()
+ # Make sure that it persists.
+ txn = self.pool.connection()
+ rec = yield TestRecord.load(txn, 3)
+ self.assertEqual(rec.gamma, u'otherwise')
+
+
+ @inlineCallbacks
+ def test_simpleQuery(self):
+ """
+ L{Record.query} will allow you to query for a record by its class
+ attributes as columns.
+ """
+ txn = self.pool.connection()
+ for beta, gamma in [(123, u"one"), (234, u"two"), (345, u"three"),
+ (356, u"three"), (456, u"four")]:
+ yield txn.execSQL("insert into ALPHA values (:1, :2)",
+ [beta, gamma])
+ records = yield TestRecord.query(txn, TestRecord.gamma == u"three")
+ self.assertEqual(len(records), 2)
+ records.sort(key=lambda x: x.beta)
+ self.assertEqual(records[0].beta, 345)
+ self.assertEqual(records[1].beta, 356)
+
+
+ @inlineCallbacks
+ def test_all(self):
+ """
+ L{Record.all} will return all instances of the record, sorted by
+ primary key.
+ """
+ txn = self.pool.connection()
+ data = [(123, u"one"), (456, u"four"), (345, u"three"),
+ (234, u"two"), (356, u"three")]
+ for beta, gamma in data:
+ yield txn.execSQL("insert into ALPHA values (:1, :2)",
+ [beta, gamma])
+ self.assertEqual(
+ [(x.beta, x.gamma) for x in (yield TestRecord.all(txn))],
+ sorted(data)
+ )
+
+
+ @inlineCallbacks
+ def test_orderedQuery(self):
+ """
+ L{Record.query} takes an 'order' argument which will allow the objects
+ returned to be ordered.
+ """
+ txn = self.pool.connection()
+ for beta, gamma in [(123, u"one"), (234, u"two"), (345, u"three"),
+ (356, u"three"), (456, u"four")]:
+ yield txn.execSQL("insert into ALPHA values (:1, :2)",
+ [beta, gamma])
+
+ records = yield TestRecord.query(txn, TestRecord.gamma == u"three",
+ TestRecord.beta)
+ self.assertEqual([record.beta for record in records], [345, 356])
+ records = yield TestRecord.query(txn, TestRecord.gamma == u"three",
+ TestRecord.beta, ascending=False)
+ self.assertEqual([record.beta for record in records], [356, 345])
+
+
+ @inlineCallbacks
+ def test_pop(self):
+ """
+ A L{Record} may be loaded and deleted atomically, with L{Record.pop}.
+ """
+ txn = self.pool.connection()
+ for beta, gamma in [(123, u"one"), (234, u"two"), (345, u"three"),
+ (356, u"three"), (456, u"four")]:
+ yield txn.execSQL("insert into ALPHA values (:1, :2)",
+ [beta, gamma])
+ rec = yield TestRecord.pop(txn, 234)
+ self.assertEqual(rec.gamma, u'two')
+ self.assertEqual((yield txn.execSQL("select count(*) from ALPHA "
+ "where BETA = :1", [234])),
+ [tuple([0])])
+ yield self.failUnlessFailure(TestRecord.pop(txn, 234), NoSuchRecord)
+
+
+ def test_columnNamingConvention(self):
+ """
+ The naming convention maps columns C{LIKE_THIS} to be attributes
+ C{likeThis}.
+ """
+ self.assertEqual(Record.namingConvention(u"like_this"), "likeThis")
+ self.assertEqual(Record.namingConvention(u"LIKE_THIS"), "likeThis")
+ self.assertEqual(Record.namingConvention(u"LIKE_THIS_ID"), "likeThisID")
+
+
+
+
Modified: CalendarServer/trunk/twext/enterprise/dal/test/test_sqlsyntax.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/test/test_sqlsyntax.py 2012-08-11 08:55:55 UTC (rev 9688)
+++ CalendarServer/trunk/twext/enterprise/dal/test/test_sqlsyntax.py 2012-08-11 09:01:06 UTC (rev 9689)
@@ -30,14 +30,16 @@
from twext.enterprise.dal.syntax import Function
from twext.enterprise.dal.syntax import SchemaSyntax
from twext.enterprise.dal.test.test_parseschema import SchemaTestHelper
-from twext.enterprise.ienterprise import POSTGRES_DIALECT, ORACLE_DIALECT
+from twext.enterprise.ienterprise import (POSTGRES_DIALECT, ORACLE_DIALECT,
+ SQLITE_DIALECT)
from twext.enterprise.test.test_adbapi2 import ConnectionPoolHelper
from twext.enterprise.test.test_adbapi2 import NetworkedPoolHelper
-from twext.enterprise.test.test_adbapi2 import resultOf
+from twext.enterprise.test.test_adbapi2 import resultOf, AssertResultHelper
from twisted.internet.defer import succeed
from twisted.trial.unittest import TestCase
+
class _FakeTransaction(object):
"""
An L{IAsyncTransaction} that provides the relevant metadata for SQL
@@ -56,7 +58,42 @@
TIMESTAMP = 'for timestamps!'
+class CatchSQL(object):
+ """
+ L{IAsyncTransaction} emulator that records the SQL executed on it.
+ """
+ counter = 0
+ def __init__(self, dialect=SQLITE_DIALECT, paramstyle='numeric'):
+ self.execed = []
+ self.pendingResults = []
+ self.dialect = SQLITE_DIALECT
+ self.paramstyle = 'numeric'
+
+
+ def nextResult(self, result):
+ """
+ Make it so that the next result from L{execSQL} will be the argument.
+ """
+ self.pendingResults.append(result)
+
+
+ def execSQL(self, sql, args, rozrc):
+ """
+ Implement L{IAsyncTransaction} by recording C{sql} and C{args} in
+ C{self.execed}, and return a L{Deferred} firing either an integer or a
+ value pre-supplied by L{CatchSQL.nextResult}.
+ """
+ self.execed.append([sql, args])
+ self.counter += 1
+ if self.pendingResults:
+ result = self.pendingResults.pop(0)
+ else:
+ result = self.counter
+ return succeed(result)
+
+
+
class NullTestingOracleTxn(object):
"""
Fake transaction for testing oracle NULL behavior.
@@ -92,7 +129,7 @@
-class GenerationTests(ExampleSchemaHelper, TestCase):
+class GenerationTests(ExampleSchemaHelper, TestCase, AssertResultHelper):
"""
Tests for syntactic helpers to generate SQL queries.
"""
@@ -623,7 +660,6 @@
"""
L{SetExpression} in a From sub-select.
"""
-
# Simple UNION
self.assertEquals(
Select(
@@ -943,6 +979,137 @@
)
+ def test_insertMultiReturnSQLite(self):
+ """
+ In SQLite's SQL dialect, there is no 'returning' clause, but given that
+ SQLite serializes all SQL transactions, you can rely upon 'select'
+ after a write operation to reliably give you exactly what was just
+ modified. Therefore, although 'toSQL' won't include any indication of
+ the return value, the 'on' method will execute a 'select' statement
+ following the insert to retrieve the value.
+ """
+ insertStatement = Insert({self.schema.FOO.BAR: 39,
+ self.schema.FOO.BAZ: 82},
+ Return=(self.schema.FOO.BAR, self.schema.FOO.BAZ)
+ )
+ qg = lambda : QueryGenerator(SQLITE_DIALECT, NumericPlaceholder())
+ self.assertEquals(insertStatement.toSQL(qg()),
+ SQLFragment("insert into FOO (BAR, BAZ) values (:1, :2)",
+ [39, 82])
+ )
+ result = []
+ csql = CatchSQL()
+ insertStatement.on(csql).addCallback(result.append)
+ self.assertEqual(result, [2])
+ self.assertEqual(
+ csql.execed,
+ [["insert into FOO (BAR, BAZ) values (:1, :2)", [39, 82]],
+ ["select BAR, BAZ from FOO where rowid = last_insert_rowid()", []]]
+ )
+
+
+ def test_insertNoReturnSQLite(self):
+ """
+ Inserting a row I{without} a C{Return=} parameter should also work as
+ normal in sqlite.
+ """
+ statement = Insert({self.schema.FOO.BAR: 12,
+ self.schema.FOO.BAZ: 48})
+ csql = CatchSQL()
+ statement.on(csql)
+ self.assertEqual(
+ csql.execed,
+ [["insert into FOO (BAR, BAZ) values (:1, :2)", [12, 48]]]
+ )
+
+
+ def test_updateReturningSQLite(self):
+ """
+ Since SQLite does not support the SQL 'returning' syntax extension, in
+ order to preserve the rows that will be modified during an UPDATE
+ statement, we must first find the rows that will be affected, then
+ update them, then return the rows that were affected. Since we might
+ be changing even part of the primary key, we use the internal 'rowid'
+ column to uniquely and reliably identify rows in the sqlite database
+ that have been modified.
+ """
+ csql = CatchSQL()
+ stmt = Update({self.schema.FOO.BAR: 4321},
+ Where=self.schema.FOO.BAZ == 1234,
+ Return=self.schema.FOO.BAR)
+ csql.nextResult([["sample row id"]])
+ result = resultOf(stmt.on(csql))
+ # Three statements were executed; make sure that the result returned was
+ # the result of executing the 3rd (and final) one.
+ self.assertResultList(result, 3)
+ # Check that they were the right statements.
+ self.assertEqual(len(csql.execed), 3)
+ self.assertEqual(
+ csql.execed[0],
+ ["select rowid from FOO where BAZ = :1", [1234]]
+ )
+ self.assertEqual(
+ csql.execed[1],
+ ["update FOO set BAR = :1 where BAZ = :2", [4321, 1234]]
+ )
+ self.assertEqual(
+ csql.execed[2],
+ ["select BAR from FOO where rowid = :1", ["sample row id"]]
+ )
+
+
+ def test_updateReturningMultipleValuesSQLite(self):
+ """
+ When SQLite updates multiple values, it must embed the row ID of each
+ subsequent value into its second 'where' clause, as there is no way to
+ pass a list of values to a single statement.
+ """
+ csql = CatchSQL()
+ stmt = Update({self.schema.FOO.BAR: 4321},
+ Where=self.schema.FOO.BAZ == 1234,
+ Return=self.schema.FOO.BAR)
+ csql.nextResult([["one row id"], ["and another"], ["and one more"]])
+ result = resultOf(stmt.on(csql))
+ # Three statements were executed; make sure that the result returned was
+ # the result of executing the 3rd (and final) one.
+ self.assertResultList(result, 3)
+ # Check that they were the right statements.
+ self.assertEqual(len(csql.execed), 3)
+ self.assertEqual(
+ csql.execed[0],
+ ["select rowid from FOO where BAZ = :1", [1234]]
+ )
+ self.assertEqual(
+ csql.execed[1],
+ ["update FOO set BAR = :1 where BAZ = :2", [4321, 1234]]
+ )
+ self.assertEqual(
+ csql.execed[2],
+ ["select BAR from FOO where rowid = :1 or rowid = :2 or rowid = :3",
+ ["one row id", "and another", "and one more"]]
+ )
+
+
+ def test_deleteReturningSQLite(self):
+ """
+ When SQLite deletes with C{Return=}, it selects the values, then deletes.
+ """
+ csql = CatchSQL()
+ stmt = Delete(From=self.schema.FOO, Where=self.schema.FOO.BAZ == 1234,
+ Return=self.schema.FOO.BAR)
+ result = resultOf(stmt.on(csql))
+ self.assertResultList(result, 1)
+ self.assertEqual(len(csql.execed), 2)
+ self.assertEqual(
+ csql.execed[0],
+ ["select BAR from FOO where BAZ = :1", [1234]]
+ )
+ self.assertEqual(
+ csql.execed[1],
+ ["delete from FOO where BAZ = :1", [1234]]
+ )
+
+
def test_insertMismatch(self):
"""
L{Insert} raises L{TableMismatch} if the columns specified aren't all
Modified: CalendarServer/trunk/twext/enterprise/ienterprise.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/ienterprise.py 2012-08-11 08:55:55 UTC (rev 9688)
+++ CalendarServer/trunk/twext/enterprise/ienterprise.py 2012-08-11 09:01:06 UTC (rev 9689)
@@ -20,6 +20,16 @@
__all__ = [
"IAsyncTransaction",
+ "ISQLExecutor",
+ "ICommandBlock",
+ "IQueuer",
+ "IDerivedParameter",
+ "AlreadyFinishedError",
+ "ConnectionError",
+ "POSTGRES_DIALECT",
+ "SQLITE_DIALECT",
+ "ORACLE_DIALECT",
+ "ORACLE_TABLE_NAME_MAX",
]
from zope.interface import Interface, Attribute
@@ -42,6 +52,7 @@
POSTGRES_DIALECT = 'postgres-dialect'
ORACLE_DIALECT = 'oracle-dialect'
+SQLITE_DIALECT = 'sqlite-dialect'
ORACLE_TABLE_NAME_MAX = 30
@@ -102,6 +113,18 @@
"""
+ def postCommit(operation):
+ """
+ Perform the given operation only after this L{IAsyncTransaction}
+ commits. These will be invoked before the L{Deferred} returned by
+ L{IAsyncTransaction.commit} fires.
+
+ @param operation: a 0-argument callable that may return a L{Deferred}.
+ If it does, then the subsequent operations added by L{postCommit}
+ will not fire until that L{Deferred} fires.
+ """
+
+
def abort():
"""
Roll back changes caused by this transaction.
@@ -111,6 +134,17 @@
"""
+ def postAbort(operation):
+ """
+ Invoke a callback after abort.
+
+ @see: L{IAsyncTransaction.postCommit}
+
+ @param operation: 0-argument callable, potentially returning a
+ L{Deferred}.
+ """
+
+
def commandBlock():
"""
Create an object which will cause the commands executed on it to be
@@ -218,3 +252,32 @@
@return: C{None}
"""
+
+
+
+class IQueuer(Interface):
+ """
+ An L{IQueuer} can enqueue work for later execution.
+ """
+
+ def enqueueWork(self, transaction, workItemType, **kw):
+ """
+ Perform some work, eventually.
+
+ @param transaction: an L{IAsyncTransaction} within which to I{commit}
+ to doing the work. Note that this work will likely be done later
+ (but depending on various factors, may actually be done within this
+ transaction as well).
+
+ @param workItemType: the type of work item to create.
+ @type workItemType: L{type}, specifically, a subtype of L{WorkItem
+ <twext.enterprise.queue.WorkItem>}
+
+ @param kw: The keyword parameters are relayed to C{workItemType.create}
+ to create an appropriately initialized item.
+
+ @return: a work proposal that allows tracking of the various phases of
+ completion of the work item.
+ @rtype: L{twext.enterprise.queue.WorkProposal}
+ """
+
Copied: CalendarServer/trunk/twext/enterprise/queue.py (from rev 9688, CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py)
===================================================================
--- CalendarServer/trunk/twext/enterprise/queue.py (rev 0)
+++ CalendarServer/trunk/twext/enterprise/queue.py 2012-08-11 09:01:06 UTC (rev 9689)
@@ -0,0 +1,1230 @@
+# -*- test-case-name: twext.enterprise.test.test_queue -*-
+##
+# Copyright (c) 2012 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+L{twext.enterprise.queue} is a task-queueing system for use by applications
+with multiple front-end servers talking to a single database instance, that
+want to defer and parallelize work that involves storing the results of
+computation.
+
+By enqueuing with L{twext.enterprise.queue}, you may guarantee that the work
+will I{eventually} be done, and reliably commit to doing it in the future, but
+defer it if it does not need to be done I{now}.
+
+To pick a hypothetical example, let's say that you have a store which wants to
+issue a promotional coupon based on a customer loyalty program, in response to
+an administrator clicking on a button. Determining the list of customers to
+send the coupon to is quick: a simple query will get you all their names.
+However, analyzing each user's historical purchase data is (A) time consuming
+and (B) relatively isolated, so it would be good to do that in parallel, and it
+would also be acceptable to have that happen at a later time, outside the
+critical path.
+
+Such an application might be implemented with this queueing system like so::
+
+ from twext.enterprise.queue import WorkItem, queueFromTransaction
+ from twext.enterprise.dal.parseschema import addSQLToSchema
+ from twext.enterprise.dal.syntax import SchemaSyntax
+
+ schemaModel = Schema()
+ addSQLToSchema('''
+ create table CUSTOMER (NAME varchar(255), ID integer primary key);
+ create table PRODUCT (NAME varchar(255), ID integer primary key);
+ create table PURCHASE (NAME varchar(255), WHEN timestamp,
+ CUSTOMER_ID integer references CUSTOMER,
+ PRODUCT_ID integer references PRODUCT);
+ create table COUPON_WORK (WORK_ID integer primary key,
+ CUSTOMER_ID integer references CUSTOMER);
+ create table COUPON (ID integer primary key,
+ CUSTOMER_ID integer references customer,
+ AMOUNT integer);
+ ''')
+ schema = SchemaSyntax(schemaModel)
+
+ class Coupon(Record, fromTable(schema.COUPON)):
+ pass
+
+ class CouponWork(WorkItem, fromTable(schema.COUPON_WORK)):
+ @inlineCallbacks
+ def doWork(self):
+ purchases = yield Select(schema.PURCHASE,
+ Where=schema.PURCHASE.CUSTOMER_ID
+ == self.customerID).on(self.transaction)
+ couponAmount = yield doSomeMathThatTakesAWhile(purchases)
+ yield Coupon.create(customerID=self.customerID,
+ amount=couponAmount)
+
+ @inlineCallbacks
+ def makeSomeCoupons(txn):
+ # Note, txn was started before, will be committed later...
+ for customerID in (yield Select([schema.CUSTOMER.ID],
+ From=schema.CUSTOMER).on(txn)):
+ # queuer is a provider of IQueuer, of which there are several
+ # implementations in this module.
+ queuer.enqueueWork(txn, CouponWork, customerID=customerID)
+"""
+
+from socket import getfqdn
+from functools import wraps
+from os import getpid
+from datetime import datetime
+
+from zope.interface import implements
+
+from twisted.application.service import Service
+from twisted.internet.protocol import Factory
+from twisted.internet.defer import (
+ inlineCallbacks, returnValue, Deferred, succeed
+)
+from twisted.internet.endpoints import TCP4ClientEndpoint
+from twisted.protocols.amp import AMP, Command, Integer, Argument, String
+from twisted.python.reflect import qual
+from twisted.python import log
+
+from twext.enterprise.dal.syntax import TableSyntax, SchemaSyntax
+from twext.enterprise.dal.model import ProcedureCall
+from twext.enterprise.dal.syntax import NamedValue
+from twext.enterprise.dal.record import Record, fromTable
+from twisted.python.failure import Failure
+from twisted.internet.defer import passthru
+from twext.enterprise.dal.model import Table, Schema, SQLType, Constraint
+from twisted.internet.endpoints import TCP4ServerEndpoint
+from twext.enterprise.dal.syntax import Lock
+from twext.enterprise.ienterprise import IQueuer
+
+def makeNodeSchema(inSchema):
+ """
+ Create a self-contained schema for L{NodeInfo} to use.
+
+ @return: a schema with just the one table.
+ """
+ # Initializing this duplicate schema avoids a circular dependency, but this
+ # should really be accomplished with independent schema objects that the
+ # transaction is made aware of somehow.
+ NodeTable = Table(inSchema, 'NODE_INFO')
+ NodeTable.addColumn("HOSTNAME", SQLType("varchar", 255))
+ NodeTable.addColumn("PID", SQLType("integer", None))
+ NodeTable.addColumn("PORT", SQLType("integer", None))
+ NodeTable.addColumn("TIME", SQLType("timestamp", None)).setDefaultValue(
+ # Note: in the real data structure, this is actually a not-cleaned-up
+ # sqlparse internal data structure, but it *should* look closer to this.
+ ProcedureCall("timezone", ["UTC", NamedValue('CURRENT_TIMESTAMP')])
+ )
+ for column in NodeTable.columns:
+ NodeTable.tableConstraint(Constraint.NOT_NULL, [column.name])
+ NodeTable.primaryKey = [NodeTable.columnNamed("HOSTNAME"),
+ NodeTable.columnNamed("PORT")]
+ return inSchema
+
+NodeInfoSchema = SchemaSyntax(makeNodeSchema(Schema(__file__)))
+
+
+@inlineCallbacks
+def inTransaction(transactionCreator, operation):
+ """
+ Perform the given operation in a transaction, committing or aborting as
+ required.
+
+ @param transactionCreator: a 0-arg callable that returns an
+ L{IAsyncTransaction}
+
+ @param operation: a 1-arg callable that takes an L{IAsyncTransaction} and
+ returns a value.
+
+ @return: a L{Deferred} that fires with C{operation}'s result or fails with
+ its error, unless there is an error creating, aborting or committing
+ the transaction.
+ """
+ txn = transactionCreator()
+ try:
+ result = yield operation(txn)
+ except:
+ f = Failure()
+ yield txn.abort()
+ returnValue(f)
+ else:
+ yield txn.commit()
+ returnValue(result)
+
+
+
+class TableSyntaxByName(Argument):
+ """
+ Serialize and deserialize L{TableSyntax} objects for an AMP protocol with
+ an attached schema.
+ """
+
+ def fromStringProto(self, inString, proto):
+ """
+ Convert the name of the table into a table, given a C{proto} with an
+ attached C{schema}.
+
+ @param inString: the name of a table, as utf-8 encoded bytes
+ @type inString: L{bytes}
+
+ @param proto: an L{SchemaAMP}
+ """
+ return TableSyntax(proto.schema.tableNamed(inString.decode("UTF-8")))
+
+
+ def toString(self, inObject):
+ """
+ Convert a L{TableSyntax} object into just its name for wire transport.
+
+ @param inObject: a table.
+ @type inObject: L{TableSyntax}
+
+ @return: the name of that table
+ @rtype: L{bytes}
+ """
+ return inObject.model.name.encode("UTF-8")
+
+
+
+class NodeInfo(Record, fromTable(NodeInfoSchema.NODE_INFO)):
+ """
+ A L{NodeInfo} is information about a currently-active Node process.
+ """
+
+ def endpoint(self, reactor):
+ """
+ Create an L{IStreamClientEndpoint} that will talk to the node process
+ that is described by this L{NodeInfo}.
+
+ @return: an endpoint that will connect to this host.
+ @rtype: L{IStreamClientEndpoint}
+ """
+ return TCP4ClientEndpoint(reactor, self.hostname, self.port)
+
+
+
+def abstract(thunk):
+ """
+ The decorated function is abstract.
+
+ @note: only methods are currently supported.
+ """
+ @classmethod
+ @wraps(thunk)
+ def inner(cls, *a, **k):
+ raise NotImplementedError(qual(cls) + " does not implement " +
+ thunk.func_name)
+ return inner
+
+
+
+class WorkItem(Record):
+ """
+ An item of work.
+
+ @ivar workID: the unique identifier (primary key) for items of this type.
+ There must be a corresponding column in the database.
+ @type workID: L{int}
+
+ @cvar created: the timestamp that a given item was created, or the column
+ describing its creation time, on the class.
+ @type created: L{datetime.datetime}
+ """
+
+ @abstract
+ def doWork(self):
+ """
+ Subclasses must implement this to actually perform the queued work.
+
+ This method will be invoked in a worker process.
+
+ This method does I{not} need to delete the row referencing it; that
+ will be taken care of by the job queueing machinery.
+ """
+
+
+ @classmethod
+ def forTable(cls, table):
+ """
+ Look up a work-item class given a particular L{TableSyntax}. Factoring
+ this correctly may place it into L{twext.enterprise.dal.record.Record}
+ instead; it is probably generally useful to be able to look up a mapped
+ class from a table.
+
+ @param table: the table to look up
+ @type table: L{TableSyntax}
+
+ @return: the relevant subclass
+ @rtype: L{type}
+ """
+ for subcls in cls.__subclasses__():
+ if table == getattr(subcls, "table", None):
+ return subcls
+ raise KeyError("No mapped {0} class for {1}.".format(
+ cls, table
+ ))
+
+
+
+class PerformWork(Command):
+ """
+ Notify another process that it must do some work that has been persisted to
+ the database, by informing it of the table and the ID where said work has
+ been persisted.
+ """
+
+ arguments = [
+ ("table", TableSyntaxByName()),
+ ("workID", Integer()),
+ ]
+ response = []
+
+
+
+class ReportLoad(Command):
+ """
+ Notify another node of the total, current load for this whole node (all of
+ its workers).
+ """
+ arguments = [
+ ("load", Integer())
+ ]
+ response = []
+
+
+class IdentifyNode(Command):
+ """
+ Identify this node to its peer. The connector knows which hostname it's
+ looking for, and which hostname it considers itself to be, so only the
+ initiator (not the listener) issues this command. This command is
+ necessary because if reverse DNS isn't set up perfectly, the listener may
+ not be able to identify its peer.
+ """
+
+ arguments = [
+ ("host", String()),
+ ("port", Integer()),
+ ]
+
+
+
+class SchemaAMP(AMP):
+ """
+ An AMP instance which also has a L{Schema} attached to it.
+
+ @ivar schema: The schema to look up L{TableSyntaxByName} arguments in.
+ @type schema: L{Schema}
+ """
+
+ def __init__(self, schema, boxReceiver=None, locator=None):
+ self.schema = schema
+ super(SchemaAMP, self).__init__(boxReceiver, locator)
+
+
+
+class ConnectionFromPeerNode(SchemaAMP):
+ """
+ A connection to a peer node. Symmetric; since the 'client' and the
+ 'server' both serve the same role, the logic is the same in every node.
+ """
+
+ def __init__(self, peerPool, boxReceiver=None, locator=None):
+ """
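+ Initialize this L{ConnectionFromPeerNode} with a reference to the pool of
+ peer connections, which also provides the pool of local workers.
+
+ @param peerPool: the pool of peer connections that this connection
+ belongs to; its C{workerPool} is the pool of local worker processes
+ that can process queue work.
+ @type peerPool: L{PeerConnectionPool}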
+
+ @see: L{AMP.__init__}
+ """
+ self.peerPool = peerPool
+ self.localWorkerPool = peerPool.workerPool
+ self._bonusLoad = 0
+ self._reportedLoad = 0
+ super(ConnectionFromPeerNode, self).__init__(peerPool.schema,
+ boxReceiver, locator)
+
+
+ def reportCurrentLoad(self):
+ """
+ Report the current load for the local worker pool to this peer.
+ """
+ return self.callRemote(ReportLoad,
+ load=self.localWorkerPool.totalLoad())
+
+
+ @ReportLoad.responder
+ def repotedLoad(self, load):
+ """
+ The peer reports its load.
+ """
+ self._reportedLoad = (load - self._bonusLoad)
+ return {}
+
+
+ def startReceivingBoxes(self, sender):
+ """
+ Connection is up and running; add this to the list of active peers.
+ """
+ r = super(ConnectionFromPeerNode, self).startReceivingBoxes(sender)
+ self.peerPool.addPeerConnection(self)
+ return r
+
+
+ def stopReceivingBoxes(self, reason):
+ """
+ The connection has shut down; remove this from the list of active
+ peers.
+ """
+ self.peerPool.removePeerConnection(self)
+ r = super(ConnectionFromPeerNode, self).stopReceivingBoxes(reason)
+ return r
+
+
+ def currentLoadEstimate(self):
+ """
+ What is the current load estimate for this peer?
+
+ @return: The number of full "slots", i.e. currently-being-processed
+ queue items (and other items which may contribute to this process's
+ load, such as currently-being-processed client requests).
+ @rtype: L{int}
+ """
+ return self._reportedLoad + self._bonusLoad
+
+
+ def performWork(self, table, workID):
+ """
+ A L{local worker connection <ConnectionFromWorker>} is asking this
+ specific peer node-controller process to perform some work, having
+ already determined that it's appropriate.
+
+ @param table: The table where work is waiting.
+ @type table: L{TableSyntax}
+
+ @param workID: The primary key identifier of the given work.
+ @type workID: L{int}
+
+ @return: a L{Deferred} firing with an empty dictionary when the work is
+ complete.
+ @rtype: L{Deferred} firing L{dict}
+ """
+ d = self.callRemote(PerformWork, table=table, workID=workID)
+ self._bonusLoad += 1
+ @d.addBoth
+ def performed(result):
+ self._bonusLoad -= 1
+ return result
+ return d
+
+
+ @PerformWork.responder
+ def dispatchToWorker(self, table, workID):
+ """
+ A remote peer node has asked this node to do some work; dispatch it to
+ a local worker on this node.
+
+ @param table: the table to work on.
+ @type table: L{TableSyntax}
+
+ @param workID: the identifier within the table.
+ @type workID: L{int}
+
+ @return: a L{Deferred} that fires when the work has been completed.
+ """
+ return self.localWorkerPool.performWork(table, workID)
+
+
+ @IdentifyNode.responder
+ def identifyPeer(self, host, port):
+ self.peerPool.mapPeer(host, port, self)
+
+
+
+class WorkerConnectionPool(object):
+ """
+ A pool of L{ConnectionFromWorker}s.
+
+ L{WorkerConnectionPool} also implements the same implicit protocol as a
+ L{ConnectionFromPeerNode}, but one that dispenses work to the local worker
+ processes rather than to a remote connection pool.
+ """
+
+ def __init__(self, maximumLoadPerWorker=0):
+ self.workers = []
+ self.maximumLoadPerWorker = maximumLoadPerWorker
+
+
+ def addWorker(self, worker):
+ """
+ Add a L{ConnectionFromWorker} to this L{WorkerConnectionPool} so that
+ it can be selected.
+ """
+ self.workers.append(worker)
+
+
+ def removeWorker(self, worker):
+ """
+ Remove a L{ConnectionFromWorker} from this L{WorkerConnectionPool} that
+ was previously added.
+ """
+ self.workers.remove(worker)
+
+
+ def hasAvailableCapacity(self):
+ """
+ Does this worker connection pool have any local workers who have spare
+ capacity to process another queue item?
+ """
+ for worker in self.workers:
+ if worker.currentLoad() < self.maximumLoadPerWorker:
+ return True
+ return False
+
+
+ def totalLoad(self):
+ """
+ The total load of all currently connected workers.
+ """
+ return sum(worker.currentLoad() for worker in self.workers)
+
+
+ def _selectLowestLoadWorker(self):
+ """
+ Select the local connection with the lowest current load. No check is
+ made against C{maximumLoadPerWorker}, so the worker that is returned may
+ already be fully loaded.
+
+ @return: a worker connection with the lowest current load.
+ @rtype: L{ConnectionFromWorker}
+ """
+ return sorted(self.workers[:], key=lambda w: w.currentLoad())[0]
+
+
+ def performWork(self, table, workID):
+ """
+ Select a local worker that is idle enough to perform the given work,
+ then ask them to perform it.
+
+ @param table: The table where work is waiting.
+ @type table: L{TableSyntax}
+
+ @param workID: The primary key identifier of the given work.
+ @type workID: L{int}
+
+ @return: a L{Deferred} firing with an empty dictionary when the work is
+ complete.
+ @rtype: L{Deferred} firing L{dict}
+ """
+ return self._selectLowestLoadWorker().performWork(table, workID)
+
+
+
+class ConnectionFromWorker(SchemaAMP):
+ """
+ An individual connection from a worker, as seen from the master's
+ perspective. L{ConnectionFromWorker}s go into a L{WorkerConnectionPool}.
+
+ @ivar workerPool: The connection pool that this individual connection is
+ participating in.
+ @type workerPool: L{WorkerConnectionPool}
+ """
+
+ def __init__(self, schema, workerPool, boxReceiver=None, locator=None):
+ self.workerPool = workerPool
+ super(ConnectionFromWorker, self).__init__(schema, boxReceiver, locator)
+ self._load = 0
+
+
+ @property
+ def currentLoad(self):
+ """
+ What is the current load of this worker?
+ """
+ return self._load
+
+
+ def startReceivingBoxes(self, sender):
+ """
+ Start receiving AMP boxes from the peer. Initialize all necessary
+ state.
+ """
+ result = super(ConnectionFromWorker, self).startReceivingBoxes(sender)
+ self.workerPool.addWorker(self)
+ return result
+
+
+ def stopReceivingBoxes(self, reason):
+ """
+ AMP boxes will no longer be received.
+ """
+ result = super(ConnectionFromWorker, self).stopReceivingBoxes(reason)
+ self.workerPool.removeWorker(self)
+ return result
+
+
+ def performWork(self, table, workID):
+ """
+ Dispatch work to this worker.
+
+ @see: The responder for this should always be
+ L{ConnectionFromController.actuallyReallyExecuteWorkHere}.
+ """
+ d = self.callRemote(PerformWork, table=table, workID=workID)
+ self._load += 1
+ @d.addBoth
+ def f(result):
+ self._load -= 1
+ return result
+ return d
+
+
+
+class ConnectionFromController(SchemaAMP):
+ """
+ A L{ConnectionFromController} is the connection to a node-controller
+ process, in a worker process. It processes requests from its own
+ controller to do work. It is the opposite end of the connection from
+ L{ConnectionFromWorker}.
+ """
+ implements(IQueuer)
+
+ def __init__(self, transactionFactory, schema, whenConnected,
+ boxReceiver=None, locator=None):
+ super(ConnectionFromController, self).__init__(schema,
+ boxReceiver, locator)
+ self.transactionFactory = transactionFactory
+ self.whenConnected = whenConnected
+
+
+ def startReceivingBoxes(self, sender):
+ super(ConnectionFromController, self).startReceivingBoxes(sender)
+ self.whenConnected(self)
+
+
+ def choosePerformer(self):
+ """
+ To conform with L{WorkProposal}'s expectations, which may run in either
+ a controller (against a L{PeerConnectionPool}) or in a worker (against
+ a L{ConnectionFromController}), this is implemented to always return
+ C{self}, since C{self} is also an object that has a C{performWork}
+ method.
+ """
+ return succeed(self)
+
+
+ def performWork(self, table, workID):
+ """
+ Ask the controller to perform some work on our behalf.
+ """
+ return self.callRemote(PerformWork, table=table, workID=workID)
+
+
+ def enqueueWork(self, txn, workItemType, **kw):
+ """
+ There is some work to do. Do it, someplace else, ideally in parallel.
+ Later, let the caller know that the work has been completed by firing a
+ L{Deferred}.
+
+ @param workItemType: The type of work item to be enqueued.
+ @type workItemType: A subtype of L{WorkItem}
+
+ @param kw: The parameters to construct a work item.
+ @type kw: keyword parameters to C{workItemType.create}, i.e.
+ C{workItemType.__init__}
+
+ @return: an object that can track the enqueuing and remote execution of
+ this work.
+ @rtype: L{WorkProposal}
+ """
+ wp = WorkProposal(self, txn, workItemType, kw)
+ wp._start()
+ return wp
+
+
+ @PerformWork.responder
+ def actuallyReallyExecuteWorkHere(self, table, workID):
+ """
+ This is where it's time to actually do the work. The controller
+ process has instructed this worker to do it; so, look up the data in
+ the row, and do it.
+ """
+ @inlineCallbacks
+ def work(txn):
+ workItemClass = WorkItem.forTable(table)
+ workItem = yield workItemClass.load(txn, workID)
+ # TODO: what if we fail? error-handling should be recorded
+ # someplace, the row should probably be marked, re-tries should be
+ # triggerable administratively.
+ yield workItem.delete()
+ # TODO: verify that workID is the primary key someplace.
+ yield workItem.doWork()
+ returnValue({})
+ return inTransaction(self.transactionFactory, work)
+
+
+
+class WorkerFactory(Factory, object):
+ """
+ Factory, to be used as the client to connect from the worker to the
+ controller.
+ """
+
+ def __init__(self, transactionFactory, schema, whenConnected):
+ """
+ Create a L{WorkerFactory} with a transaction factory and a schema.
+ """
+ self.transactionFactory = transactionFactory
+ self.schema = schema
+ self.whenConnected = whenConnected
+
+
+ def buildProtocol(self, addr):
+ """
+ Create a L{ConnectionFromController} configured with this factory's
+ transaction factory, schema and C{whenConnected} callback.
+ """
+ return ConnectionFromController(self.transactionFactory, self.schema,
+ self.whenConnected)
+
+
+
+class TransactionFailed(Exception):
+ """
+ A transaction failed.
+ """
+
+
+
+def _cloneDeferred(d):
+ """
+ Make a new Deferred, adding callbacks to C{d}.
+
+ @return: another L{Deferred} that fires with C{d's} result when C{d} fires.
+ @rtype: L{Deferred}
+ """
+ d2 = Deferred()
+ d.chainDeferred(d2)
+ return d2
+
+
+
+class WorkProposal(object):
+ """
+ A L{WorkProposal} is a proposal for work that will be executed, perhaps on
+ another node, perhaps in the future.
+
+ @ivar pool: the object which this L{WorkProposal} will use to choose a
+ performer and submit its work.
+ @type pool: L{PeerConnectionPool} or L{ConnectionFromController}
+
+ @ivar txn: The transaction where the work will be enqueued.
+ @type txn: L{IAsyncTransaction}
+
+ @ivar workItemType: The type of work to be enqueued by this L{WorkProposal}
+ @type workItemType: L{WorkItem} subclass
+
+ @ivar kw: The keyword arguments to pass to C{self.workItemType.create} to
+ construct it.
+ @type kw: L{dict}
+ """
+
+ def __init__(self, pool, txn, workItemType, kw):
+ self.pool = pool
+ self.txn = txn
+ self.workItemType = workItemType
+ self.kw = kw
+ self._whenProposed = Deferred()
+ self._whenExecuted = Deferred()
+ self._whenCommitted = Deferred()
+
+
+ def _start(self):
+ """
+ Execute this L{WorkProposal} by creating the work item in the database,
+ waiting for the transaction where that addition was completed to
+ commit, and asking the local node controller process to do the work.
+ """
+ @passthru(self.workItemType.create(self.txn, **self.kw).addCallback)
+ def created(item):
+ self._whenProposed.callback(None)
+ @self.txn.postCommit
+ def whenDone():
+ self._whenCommitted.callback(None)
+ @passthru(self.pool.choosePerformer().addCallback)
+ def performerChosen(performer):
+ @passthru(performer.performWork(item.table, item.workID))
+ def performed(result):
+ self._whenExecuted.callback(None)
+ @performed.addErrback
+ def notPerformed(why):
+ self._whenExecuted.errback(why)
+ @performerChosen.addErrback
+ def notChosen(whyNot):
+ self._whenExecuted.errback(whyNot)
+ @self.txn.postAbort
+ def whenFailed():
+ self._whenCommitted.errback(TransactionFailed)
+
+
+ def whenExecuted(self):
+ """
+ Let the caller know when the proposed work has been fully executed.
+
+ @note: The L{Deferred} returned by C{whenExecuted} should be used with
+ extreme caution. If an application decides to do any
+ database-persistent work as a result of this L{Deferred} firing,
+ that work I{may be lost} as a result of a service being normally
+ shut down between the time that the work is scheduled and the time
+ that it is executed. So, the only things that should be added as
+ callbacks to this L{Deferred} are those which are ephemeral, in
+ memory, and reflect only presentation state associated with the
+ user's perception of the completion of work, not logical chains of
+ work which need to be completed in sequence; those should all be
+ completed within the transaction of the L{WorkItem.doWork} that
+ gets executed.
+
+ @return: a L{Deferred} that fires with C{None} when the work has been
+ completed remotely.
+ """
+ return _cloneDeferred(self._whenExecuted)
+
+
+ def whenProposed(self):
+ """
+ Let the caller know when the work has been proposed; i.e. when the work
+ is first transmitted to the database.
+
+ @return: a L{Deferred} that fires with C{None} when the relevant
+ commands have been sent to the database to create the L{WorkItem},
+ and fails if those commands do not succeed for some reason.
+ """
+ return _cloneDeferred(self._whenProposed)
+
+
+ def whenCommitted(self):
+ """
+ Let the caller know when the work has been committed to; i.e. when the
+ transaction where the work was proposed has been committed to the
+ database.
+
+ @return: a L{Deferred} that fires with C{None} when the relevant
+ transaction has been committed, or fails if the transaction is not
+ committed for any reason.
+ """
+ return _cloneDeferred(self._whenCommitted)
+
+
+
+class PeerConnectionPool(Service, object):
+ """
+ Each node has a L{PeerConnectionPool} connecting it to all the other nodes
+ currently active on the same database.
+
+ @ivar hostname: The hostname where this node process is running, as
+ reported by the local host's configuration. Possibly this should be
+ obtained via C{config.ServerHostName} instead of C{socket.getfqdn()};
+ although hosts within a cluster may be configured with the same
+ C{ServerHostName}; TODO need to confirm.
+ @type hostname: L{bytes}
+
+ @ivar thisProcess: a L{NodeInfo} representing this process, which is
+ initialized when this L{PeerConnectionPool} service is started via
+ C{startService}. May be C{None} if this service is not fully started
+ up or if it is shutting down.
+ @type thisProcess: L{NodeInfo}
+
+ @ivar queueProcessTimeout: The maximum amount of time allowed for a queue
+ item to be processed. By default, 10 minutes.
+ @type queueProcessTimeout: L{float} (in seconds)
+
+ @ivar queueDelayedProcessInterval: The amount of time between database
+ pings, i.e. checks for over-due queue items that might have been
+ orphaned by a controller process that died mid-transaction. This is
+ how often the shared database should be pinged by I{all} nodes (i.e.,
+ all controller processes, or each instance of L{PeerConnectionPool});
+ each individual node will ping commensurately less often as more nodes
+ join the database.
+ @type queueDelayedProcessInterval: L{float} (in seconds)
+
+ @ivar reactor: The reactor used for scheduling timed events.
+ @type reactor: L{IReactorTime} provider.
+
+ @ivar peers: The list of currently connected peers.
+ @type peers: L{list} of L{ConnectionFromPeerNode}
+ """
+ implements(IQueuer)
+
+ getfqdn = staticmethod(getfqdn)
+ getpid = staticmethod(getpid)
+
+ queueProcessTimeout = (10.0 * 60.0)
+ queueDelayedProcessInterval = (60.0)
+
+ def __init__(self, reactor, transactionFactory, ampPort, schema):
+ """
+ Initialize a L{PeerConnectionPool}.
+
+ @param ampPort: The AMP TCP port number to listen on for inter-host
+ communication. This must be an integer (and not, say, an endpoint,
+ or an endpoint description) because we need to communicate it to
+ the other peers in the cluster in a way that will be meaningful to
+ them as clients.
+ @type ampPort: L{int}
+
+ @param transactionFactory: a 0- or 1-argument callable that produces an
+ L{IAsyncTransaction}
+
+ @param schema: The schema which contains all the tables associated with
+ the L{WorkItem}s that this L{PeerConnectionPool} will process.
+ @type schema: L{Schema}
+ """
+ self.reactor = reactor
+ self.transactionFactory = transactionFactory
+ self.hostname = self.getfqdn()
+ self.pid = self.getpid()
+ self.ampPort = ampPort
+ self.thisProcess = None
+ self.workerPool = WorkerConnectionPool()
+ self.peers = []
+ self.mappedPeers = {}
+ self.schema = schema
+ self._startingUp = None
+ self._listeningPortObject = None
+ self._lastSeenTotalNodes = 1
+ self._lastSeenNodeIndex = 1
+
+
+ def addPeerConnection(self, peer):
+ """
+ Add a L{ConnectionFromPeerNode} to the active list of peers.
+ """
+ self.peers.append(peer)
+
+
+ def workerListenerFactory(self):
+ """
+ Factory that listens for connections from workers.
+ """
+ f = Factory()
+ f.buildProtocol = lambda addr: ConnectionFromWorker(self.schema,
+ self.workerPool)
+ return f
+
+
+ def removePeerConnection(self, peer):
+ """
+ Remove a L{ConnectionFromPeerNode} from the active list of peers.
+ """
+ self.peers.remove(peer)
+
+
+ def choosePerformer(self):
+ """
+ Choose a peer to distribute work to based on the current known slot
+ occupancy of the other nodes. Note that this will prefer distributing
+ work to local workers until the current node is full, because that
+ should be lower-latency. Also, if no peers are available, work will be
+ submitted locally even if the worker pool is already over-subscribed.
+
+ @return: a L{Deferred <twisted.internet.defer.Deferred>} which fires
+ with the chosen 'peer', i.e. object with a C{performWork} method,
+ as soon as one is available. Normally this will be synchronous,
+ but we need to account for the possibility that we may need to
+ connect to other hosts.
+ @rtype: L{Deferred <twisted.internet.defer.Deferred>} firing
+ L{ConnectionFromPeerNode} or L{WorkerConnectionPool}
+ """
+ if not self.workerPool.hasAvailableCapacity() and self.peers:
+ return sorted(self.peers, lambda p: p.currentLoadEstimate())[0]
+ else:
+ return succeed(self.workerPool)
+
+
+ def enqueueWork(self, txn, workItemType, **kw):
+ """
+ There is some work to do. Do it, someplace else, ideally in parallel.
+ Later, let the caller know that the work has been completed by firing a
+ L{Deferred}.
+
+ @param workItemType: The type of work item to be enqueued.
+ @type workItemType: A subtype of L{WorkItem}
+
+ @param kw: The parameters to construct a work item.
+ @type kw: keyword parameters to C{workItemType.create}, i.e.
+ C{workItemType.__init__}
+
+ @return: an object that can track the enqueuing and remote execution of
+ this work.
+ @rtype: L{WorkProposal}
+ """
+ wp = WorkProposal(self, txn, workItemType, kw)
+ wp._start()
+ return wp
+
+
+ def allWorkItemTypes(self):
+ """
+ Load all the L{WorkItem} types that this node can process and return
+ them.
+
+ @return: L{list} of L{type}
+ """
+ # TODO: For completeness, this may need to involve a plugin query to
+ # make sure that all WorkItem subclasses are imported first.
+ # Note that __subclasses__() only reports direct subclasses of WorkItem
+ # whose defining modules have already been imported.
+ return WorkItem.__subclasses__()
+
+
+ def totalNumberOfNodes(self):
+ """
+ How many nodes are there, total?
+
+ @return: the maximum number of other L{PeerConnectionPool} instances
+ that may be connected to the database described by
+ C{self.transactionFactory}. Note that this is not the current count
+ by connectivity, but the count according to the database.
+ @rtype: L{int}
+ """
+ # TODO: this only reports the count cached by the most recent
+ # _periodicLostWorkCheck run; nothing is queried here.
+ return self._lastSeenTotalNodes
+
+
+ def nodeIndex(self):
+ """
+ What ordinal does this node, i.e. this instance of
+ L{PeerConnectionPool}, occupy within the ordered set of all nodes
+ connected to the database described by C{self.transactionFactory}?
+
+ @return: the index of this node within the total collection. For
+ example, if this L{PeerConnectionPool} is 6 out of 30, this method
+ will return C{6}.
+ @rtype: L{int}
+ """
+ # TODO: this only reports the value cached by the most recent
+ # _periodicLostWorkCheck run, which is a zero-based position in the
+ # sorted list of (hostname, port) pairs.
+ return self._lastSeenNodeIndex
+
+
+ def _periodicLostWorkCheck(self):
+ """
+ Periodically, every node controller has to check to make sure that work
+ hasn't been dropped on the floor by someone. In order to do that it
+ queries each work-item table.
+
+ As a side effect, this refreshes C{self._lastSeenTotalNodes} and
+ C{self._lastSeenNodeIndex} from the node list read in the same
+ transaction.
+
+ @return: whatever C{inTransaction} returns for the check (presumably a
+ L{Deferred}; L{_lostWorkCheckLoop} adds callbacks to it).
+ """
+ @inlineCallbacks
+ def workCheck(txn):
+
+ # Sort the (hostname, port) pairs so this node's position in the list
+ # can serve as its index.
+ nodes = [(node.hostname, node.port) for node in
+ (yield self.activeNodes(txn))]
+ nodes.sort()
+ self._lastSeenTotalNodes = len(nodes)
+ # list.index raises ValueError if this process is not in the list.
+ self._lastSeenNodeIndex = nodes.index((self.thisProcess.hostname,
+ self.thisProcess.port))
+ for itemType in self.allWorkItemTypes():
+ # NOTE(review): this compares the C{created} column directly against
+ # C{queueProcessTimeout}; confirm that is the intended "overdue" test.
+ for overdueItem in (
+ yield itemType.query(
+ txn, itemType.created > self.queueProcessTimeout
+ )):
+ # Each overdue item is re-submitted one at a time, waiting for the
+ # chosen performer to finish before moving on to the next item.
+ peer = yield self.choosePerformer()
+ yield peer.performWork(overdueItem.table,
+ overdueItem.workID)
+ return inTransaction(self.transactionFactory, workCheck)
+
+
+ _currentWorkDeferred = None
+ _lostWorkCheckCall = None
+
+ def _lostWorkCheckLoop(self):
+ """
+ While the service is running, keep checking for any overdue / lost work
+ items and re-submit them to the cluster for processing. Space out
+ those checks in time based on the size of the cluster.
+
+ Each invocation runs one check and, once it has finished, schedules
+ the next invocation with C{self.reactor.callLater}.
+ """
+ # No delayed call is pending while a check is in progress.
+ self._lostWorkCheckCall = None
+ # Errors from the check are logged first, so scheduleNext is added as a
+ # callback after the errback that logs them.
+ @passthru(self._periodicLostWorkCheck().addErrback(log.err)
+ .addCallback)
+ def scheduleNext(result):
+ self._currentWorkDeferred = None
+ # Stop rescheduling once the service is no longer running.
+ if not self.running:
+ return
+ index = self.nodeIndex()
+ now = self.reactor.seconds()
+
+ interval = self.queueDelayedProcessInterval
+ count = self.totalNumberOfNodes()
+ # Round "now" down to a multiple of the interval, then push the next
+ # check out by (count + index) intervals from that boundary.
+ when = (now - (now % interval)) + (interval * (count + index))
+ delay = when - now
+ self._lostWorkCheckCall = self.reactor.callLater(
+ delay, self._lostWorkCheckLoop
+ )
+ # NOTE(review): stopService yields on this attribute, so this relies on
+ # passthru leaving the name bound to something waitable (presumably the
+ # Deferred) -- confirm against passthru's definition.
+ self._currentWorkDeferred = scheduleNext
+
+
+    def startService(self):
+        """
+        Register ourselves with the database and establish all outgoing
+        connections to other servers in the cluster.
+
+        The port opened for incoming peer connections is remembered in
+        C{self._listeningPortObject} so that L{stopService} can close it.
+        While startup is in progress, C{self._startingUp} holds the pending
+        result of the startup transaction; it is reset to C{None} afterwards.
+        """
+        @inlineCallbacks
+        def startup(txn):
+            endpoint = TCP4ServerEndpoint(self.reactor, self.ampPort)
+            f = Factory()
+            f.buildProtocol = self.createPeerConnection
+            # If this fails, the failure mode is going to be ugly, just like
+            # all conflicted-port failures.  But, at least it won't proceed.
+            # Keep the listening port: stopService needs it to stop listening.
+            self._listeningPortObject = yield endpoint.listen(f)
+            # Serialize node registration across the cluster.
+            yield Lock.exclusive(NodeInfo.table).on(txn)
+            nodes = yield self.activeNodes(txn)
+            selves = [node for node in nodes
+                      if ((node.hostname == self.hostname) and
+                          (node.port == self.ampPort))]
+            if selves:
+                # A row for this host/port already exists; refresh it rather
+                # than creating a duplicate, and don't connect to ourselves.
+                self.thisProcess = selves[0]
+                nodes.remove(self.thisProcess)
+                yield self.thisProcess.update(pid=self.pid,
+                                              time=datetime.now())
+            else:
+                self.thisProcess = yield NodeInfo.create(
+                    txn, hostname=self.hostname, port=self.ampPort,
+                    pid=self.pid, time=datetime.now()
+                )
+            for node in nodes:
+                self._startConnectingTo(node)
+
+        # NOTE(review): nothing visible here calls the base class startService
+        # or kicks off _lostWorkCheckLoop -- confirm that happens elsewhere.
+        self._startingUp = inTransaction(self.transactionFactory, startup)
+        @self._startingUp.addBoth
+        def done(result):
+            self._startingUp = None
+            return result
+
+
+ @inlineCallbacks
+ def stopService(self):
+ """
+ Stop this service, terminating any incoming or outgoing connections.
+
+ @return: a L{Deferred} which fires once startup (if still in progress)
+ has finished, the listening port has been asked to stop, any pending
+ lost-work check has been cancelled or waited for, and every peer has
+ been told to disconnect.
+ """
+ yield super(PeerConnectionPool, self).stopService()
+ # Let an in-progress startup finish before tearing things down.
+ if self._startingUp is not None:
+ yield self._startingUp
+ if self._listeningPortObject is not None:
+ yield self._listeningPortObject.stopListening()
+ # Cancel the next scheduled lost-work check, if one is pending...
+ if self._lostWorkCheckCall is not None:
+ self._lostWorkCheckCall.cancel()
+ # ...and wait for a check that is currently running.
+ if self._currentWorkDeferred is not None:
+ yield self._currentWorkDeferred
+ for peer in self.peers:
+ peer.transport.loseConnection()
+
+
+ def activeNodes(self, txn):
+ """
+ Load information about all nodes recorded in the database. The result
+ can include the row for this node as well; L{startService} looks for
+ its own entry in it.
+
+ @param txn: the transaction to run the query in.
+
+ @return: the result of C{NodeInfo.all(txn)}.
+ """
+ return NodeInfo.all(txn)
+
+
+ def mapPeer(self, host, port, peer):
+ """
+ A peer has been identified as belonging to the given host/port
+ combination. Disconnect any other peer that claims to be connected for
+ the same host/port combination.
+
+ The disconnection is currently disabled (see the commented-out code
+ below); for now the new peer simply replaces any previous mapping.
+
+ @param host: the host name the peer identified itself with.
+ @param port: the port the peer identified itself with.
+ @param peer: the connection to record under C{(host, port)}.
+ """
+ # if (host, port) in self.mappedPeers:
+ # TODO: think about this for race conditions
+ # self.mappedPeers.pop((host, port)).transport.loseConnection()
+ self.mappedPeers[(host, port)] = peer
+
+
+    def _startConnectingTo(self, node):
+        """
+        Start an outgoing connection to another master process.  Once the
+        connection is established, it is recorded under the node's host and
+        port and this process identifies itself to the remote side.
+
+        @param node: a description of the master to connect to.
+        @type node: L{NodeInfo}
+        """
+        f = Factory()
+        f.buildProtocol = self.createPeerConnection
+        @passthru(node.endpoint(self.reactor).connect(f).addCallback)
+        def connected(proto):
+            # mapPeer takes (host, port, peer), so unpack the node record
+            # rather than passing it whole.
+            self.mapPeer(node.hostname, node.port, proto)
+            proto.callRemote(IdentifyNode, self.thisProcess)
+
+
+ def createPeerConnection(self, addr):
+ """
+ Build the protocol for a connection with another node; this is used as
+ C{buildProtocol} by both the listening and the connecting factories.
+
+ @param addr: the address of the connection (unused).
+
+ @return: a new L{ConnectionFromPeerNode} constructed with this pool.
+ """
+ return ConnectionFromPeerNode(self)
+
+
+
+class ImmediateWorkProposal(object):
+ """
+ Like L{WorkProposal}, but for items that must be executed immediately
+ because no real queue is set up yet.
+
+ @see: L{WorkProposal}, L{NullQueuer.enqueueWork}
+ """
+ def __init__(self, proposed, done):
+ """
+ @param proposed: a L{Deferred} that fires when the work has been
+ proposed.
+ @param done: a L{Deferred} that fires when the work is complete.
+ """
+ self.proposed = proposed
+ self.done = done
+
+
+ def whenExecuted(self):
+ """
+ @return: a clone (via C{_cloneDeferred}) of the C{done} Deferred.
+ """
+ return _cloneDeferred(self.done)
+
+
+ def whenProposed(self):
+ """
+ @return: a clone (via C{_cloneDeferred}) of the C{proposed} Deferred.
+ """
+ return _cloneDeferred(self.proposed)
+
+
+ def whenCommitted(self):
+ """
+ Execution and commit are not distinguished here, so this reports the
+ same event as L{whenExecuted}.
+
+ @return: a clone (via C{_cloneDeferred}) of the C{done} Deferred.
+ """
+ return _cloneDeferred(self.done)
+
+
+
+class NullQueuer(object):
+    """
+    When work is enqueued with this queuer, it is just executed immediately,
+    within the same transaction.  While this is technically correct, it is not
+    very efficient.
+    """
+    implements(IQueuer)
+
+    def enqueueWork(self, txn, workItemType, **kw):
+        """
+        Do this work immediately.
+
+        @see: L{PeerConnectionPool.enqueueWork}
+
+        @param txn: the transaction to create and perform the work item in.
+
+        @param workItemType: the type of work item to create and run.
+
+        @param kw: keyword parameters passed to C{workItemType.create}.
+
+        @return: a pseudo work proposal, since everything completes at the same
+            time.
+        @rtype: L{ImmediateWorkProposal}
+        """
+        proposed = Deferred()
+        done = Deferred()
+
+        @inlineCallbacks
+        def doit():
+            # Use this call's arguments: a NullQueuer has no txn,
+            # workItemType or kw attributes of its own.
+            item = yield workItemType.create(txn, **kw)
+            proposed.callback(True)
+            yield item.delete()
+            yield item.doWork()
+
+        @txn.postCommit
+        def committed():
+            done.callback(True)
+
+        @txn.postAbort
+        def aborted():
+            tf = TransactionFailed()
+            done.errback(tf)
+            if not proposed.called:
+                proposed.errback(tf)
+
+        def failed(failure):
+            # Report a failure to whoever is waiting for the proposal; if the
+            # proposal already succeeded, the best we can do is log it.
+            if proposed.called:
+                log.err(failure)
+            else:
+                proposed.errback(failure)
+
+        # Actually perform the work now; merely defining doit() does nothing.
+        doit().addErrback(failed)
+        return ImmediateWorkProposal(proposed, done)
+
+
+
Modified: CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py 2012-08-11 08:55:55 UTC (rev 9688)
+++ CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py 2012-08-11 09:01:06 UTC (rev 9689)
@@ -19,19 +19,19 @@
"""
from itertools import count
+from Queue import Empty
from zope.interface.verify import verifyClass, verifyObject
from zope.interface.declarations import implements
from twisted.python.threadpool import ThreadPool
+from twisted.python.failure import Failure
from twisted.trial.unittest import TestCase
-from twisted.internet.defer import execute
from twisted.internet.task import Clock
from twisted.internet.interfaces import IReactorThreads
-from twisted.internet.defer import Deferred
from twisted.test.proto_helpers import StringTransport
@@ -45,6 +45,7 @@
from twext.enterprise.adbapi2 import FailsafeException
from twext.enterprise.adbapi2 import DEFAULT_PARAM_STYLE
from twext.enterprise.adbapi2 import ConnectionPool
+from twext.internet.threadutils import ThreadHolder
def resultOf(deferred, propagate=False):
@@ -65,6 +66,22 @@
+class AssertResultHelper(object):
+ """
+ Mixin for asserting about synchronous Deferred results.
+ """
+
+ def assertResultList(self, resultList, expected):
+ if not resultList:
+ self.fail("No result; Deferred didn't fire yet.")
+ else:
+ if isinstance(resultList[0], Failure):
+ resultList[0].raiseException()
+ else:
+ self.assertEqual(resultList, [expected])
+
+
+
class Child(object):
"""
An object with a L{Parent}, in its list of C{children}.
@@ -326,59 +343,75 @@
-class FakeThreadHolder(object):
+class FakeThreadHolder(ThreadHolder):
"""
- Run things submitted to this ThreadHolder on the main thread, so that
+ Run things submitted to this ThreadHolder on the main thread, so that
execution is easier to control.
"""
def __init__(self, test):
+ super(FakeThreadHolder, self).__init__(self)
+ self.test = test
self.started = False
self.stopped = False
- self.test = test
- self.queue = []
+ self._workerIsRunning = False
def start(self):
- """
- Mark this L{FakeThreadHolder} as not started.
- """
self.started = True
+ return super(FakeThreadHolder, self).start()
def stop(self):
- """
- Mark this L{FakeThreadHolder} as stopped.
- """
- def stopped(nothing):
- self.stopped = True
- return self.submit(lambda : None).addCallback(stopped)
+ result = super(FakeThreadHolder, self).stop()
+ self.stopped = True
+ return result
- def submit(self, work):
+ @property
+ def _q(self):
+ return self._q_
+
+
+ @_q.setter
+ def _q(self, newq):
+ if newq is not None:
+ oget = newq.get
+ newq.get = lambda: oget(timeout=0)
+ oput = newq.put
+ def putit(x):
+ p = oput(x)
+ if not self.test.paused:
+ self.flush()
+ return p
+ newq.put = putit
+ self._q_ = newq
+
+
+ def callFromThread(self, f, *a, **k):
+ result = f(*a, **k)
+ return result
+
+
+ def callInThread(self, f, *a, **k):
"""
- Call the function (or queue it)
+ This should be called only once, to start the worker function that
+ dedicates a thread to this L{ThreadHolder}.
"""
- if self.test.paused:
- d = Deferred()
- self.queue.append((d, work))
- return d
- else:
- return execute(work)
+ self._workerIsRunning = True
def flush(self):
"""
Fire all deferreds previously returned from submit.
"""
- self.queue, queue = [], self.queue
- for (d, work) in queue:
- try:
- result = work()
- except:
- d.errback()
+ try:
+ while self._workerIsRunning and self._qpull():
+ pass
else:
- d.callback(result)
+ self._workerIsRunning = False
+ except Empty:
+ pass
@@ -525,7 +558,7 @@
-class ConnectionPoolTests(ConnectionPoolHelper, TestCase):
+class ConnectionPoolTests(ConnectionPoolHelper, TestCase, AssertResultHelper):
"""
Tests for L{ConnectionPool}.
"""
@@ -634,7 +667,7 @@
self.assertEquals(len(errors), 1)
stopd = []
self.pool.stopService().addBoth(stopd.append)
- self.assertEquals([None], stopd)
+ self.assertResultList(stopd, None)
self.assertEquals(self.clock.calls, [])
[holder] = self.holders
self.assertEquals(holder.started, True)
@@ -643,8 +676,8 @@
def test_shutdownDuringAttemptSuccess(self):
"""
- If L{ConnectionPool.stopService} is called while a connection attempt is
- outstanding, the resulting L{Deferred} won't be fired until the
+ If L{ConnectionPool.stopService} is called while a connection attempt
+ is outstanding, the resulting L{Deferred} won't be fired until the
connection attempt has finished; in this case, succeeded.
"""
self.pauseHolders()
@@ -653,7 +686,7 @@
self.pool.stopService().addBoth(stopd.append)
self.assertEquals(stopd, [])
self.flushHolders()
- self.assertEquals(stopd, [None])
+ self.assertResultList(stopd, None)
[holder] = self.holders
self.assertEquals(holder.started, True)
self.assertEquals(holder.stopped, True)
@@ -661,8 +694,8 @@
def test_shutdownDuringAttemptFailed(self):
"""
- If L{ConnectionPool.stopService} is called while a connection attempt is
- outstanding, the resulting L{Deferred} won't be fired until the
+ If L{ConnectionPool.stopService} is called while a connection attempt
+ is outstanding, the resulting L{Deferred} won't be fired until the
connection attempt has finished; in this case, failed.
"""
self.factory.defaultFail()
@@ -674,7 +707,7 @@
self.flushHolders()
errors = self.flushLoggedErrors(FakeConnectionError)
self.assertEquals(len(errors), 1)
- self.assertEquals(stopd, [None])
+ self.assertResultList(stopd, None)
[holder] = self.holders
self.assertEquals(holder.started, True)
self.assertEquals(holder.stopped, True)
@@ -699,7 +732,7 @@
self.assertEquals(stopResult, [])
self.flushHolders()
#self.assertEquals(abortResult, [None])
- self.assertEquals(stopResult, [None])
+ self.assertResultList(stopResult, None)
def test_stopServiceWithSpooled(self):
@@ -845,10 +878,10 @@
abortResult = self.resultOf(it.abort())
# steal it from the queue so we can do it out of order
- d, work = self.holders[0].queue.pop()
+ d, work = self.holders[0]._q.get()
# that should be the only work unit so don't continue if something else
# got in there
- self.assertEquals(self.holders[0].queue, [])
+ self.assertEquals(list(self.holders[0]._q.queue), [])
self.assertEquals(len(self.holders), 1)
self.flushHolders()
stopResult = self.resultOf(self.pool.stopService())
Modified: CalendarServer/trunk/twext/internet/threadutils.py
===================================================================
--- CalendarServer/trunk/twext/internet/threadutils.py 2012-08-11 08:55:55 UTC (rev 9688)
+++ CalendarServer/trunk/twext/internet/threadutils.py 2012-08-11 09:01:06 UTC (rev 9689)
@@ -45,22 +45,32 @@
"""
Worker function which runs in a non-reactor thread.
"""
- while True:
- work = self._q.get()
- if work is _DONE:
- def finishStopping():
- self._state = _STATE_STOPPED
- self._q = None
- s = self._stopper
- self._stopper = None
- s.callback(None)
- self._reactor.callFromThread(finishStopping)
- return
- self._oneWorkUnit(*work)
+ while self._qpull():
+ pass
+ def _qpull(self):
+ """
+ Pull one item off the queue and react appropriately.
+
+ Return whether or not to keep going.
+ """
+ work = self._q.get()
+ if work is _DONE:
+ def finishStopping():
+ self._state = _STATE_STOPPED
+ self._q = None
+ s = self._stopper
+ self._stopper = None
+ s.callback(None)
+ self._reactor.callFromThread(finishStopping)
+ return False
+ self._oneWorkUnit(*work)
+ return True
+
+
def _oneWorkUnit(self, deferred, instruction):
- try:
+ try:
result = instruction()
except:
etype, evalue, etb = sys.exc_info()
@@ -80,6 +90,8 @@
@return: L{Deferred} that fires with the result of L{work}
"""
+ if self._state != _STATE_RUNNING:
+ raise RuntimeError("not running")
d = Deferred()
self._q.put((d, work))
return d
Modified: CalendarServer/trunk/txdav/base/datastore/file.py
===================================================================
--- CalendarServer/trunk/txdav/base/datastore/file.py 2012-08-11 08:55:55 UTC (rev 9688)
+++ CalendarServer/trunk/txdav/base/datastore/file.py 2012-08-11 09:01:06 UTC (rev 9689)
@@ -187,15 +187,19 @@
self.log_error("Cannot undo DataStoreTransaction")
raise
- for (operation, ignored) in self._postCommitOperations:
+ for operation in self._postCommitOperations:
operation()
- def postCommit(self, operation, immediately=False):
- self._postCommitOperations.append((operation, immediately))
+ def postCommit(self, operation):
+ self._postCommitOperations.append(operation)
+
+
def postAbort(self, operation):
self._postAbortOperations.append(operation)
+
+
class FileMetaDataMixin(object):
implements(IDataStoreObject)
Modified: CalendarServer/trunk/txdav/base/datastore/util.py
===================================================================
--- CalendarServer/trunk/txdav/base/datastore/util.py 2012-08-11 08:55:55 UTC (rev 9688)
+++ CalendarServer/trunk/txdav/base/datastore/util.py 2012-08-11 09:01:06 UTC (rev 9689)
@@ -70,13 +70,19 @@
def setAfterCommit(self, transaction, key, value):
- transaction.postCommit(lambda: self.set(key, value), immediately=True)
+ def setit():
+ # Don't return Deferred; let the postCommit chain continue.
+ self.set(key, value)
+ transaction.postCommit(setit)
def invalidateAfterCommit(self, transaction, key):
# Invalidate now (so that operations within this transaction see it)
# and *also* post-commit (because there could be a scheduled setAfterCommit
# for this key)
- transaction.postCommit(lambda: self.delete(key), immediately=True)
+ def delit():
+ # Don't return Deferred; let the postCommit chain continue.
+ self.delete(key)
+ transaction.postCommit(delit)
return self.delete(key)
# Home child objects by name
Modified: CalendarServer/trunk/txdav/common/datastore/sql.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/sql.py 2012-08-11 08:55:55 UTC (rev 9688)
+++ CalendarServer/trunk/txdav/common/datastore/sql.py 2012-08-11 09:01:06 UTC (rev 9689)
@@ -85,6 +85,7 @@
from txdav.xml.rfc2518 import DisplayName
from txdav.base.datastore.util import normalizeUUIDOrNot
+from twext.enterprise.queue import NullQueuer
from pycalendar.datetime import PyCalendarDateTime
@@ -132,8 +133,13 @@
@ivar quota: the amount of space granted to each calendar home (in bytes)
for storing attachments, or C{None} if quota should not be enforced.
+ @type quota: C{int} or C{NoneType}
- @type quota: C{int} or C{NoneType}
+ @ivar queuer: An object with an C{enqueueWork} method, from
+ L{twext.enterprise.queue}. Initially, this is a L{NullQueuer}, so it
+ is always usable, but in a properly configured environment it will be
+ upgraded to a more capable object that can distribute work throughout a
+ cluster.
"""
implements(ICalendarStore)
@@ -160,6 +166,7 @@
self.logSQL = logSQL
self.logTransactionWaits = logTransactionWaits
self.timeoutTransactions = timeoutTransactions
+ self.queuer = NullQueuer()
self._migrating = False
self._enableNotifications = True
@@ -335,6 +342,8 @@
if self.timeoutSeconds:
self.delayedTimeout = self.callLater(self.timeoutSeconds, _forceAbort)
+
+
class CommonStoreTransaction(object):
"""
Transaction implementation for SQL database.
@@ -350,8 +359,6 @@
self._calendarHomes = {}
self._addressbookHomes = {}
self._notificationHomes = {}
- self._postCommitOperations = []
- self._postAbortOperations = []
self._notifierFactory = notifierFactory
self._notifiedAlready = set()
self._bumpedAlready = set()
@@ -382,7 +389,10 @@
self.paramstyle = sqlTxn.paramstyle
self.dialect = sqlTxn.dialect
- self._stats = TransactionStatsCollector(self._label, self._store.logStatsLogFile) if self._store.logStats else None
+ self._stats = (
+ TransactionStatsCollector(self._label, self._store.logStatsLogFile)
+ if self._store.logStats else None
+ )
self.statementCount = 0
self.iudCount = 0
self.currentStatement = None
@@ -402,6 +412,22 @@
__import__("txdav.carddav.datastore.sql")
+ def enqueue(self, workItem, **kw):
+ """
+ Enqueue a L{twext.enterprise.queue.WorkItem} for later execution.
+
+ For example::
+
+ yield (txn.enqueue(MyWorkItem, workDescription="some work to do")
+ .whenProposed())
+
+ @return: a work proposal describing various events in the work's
+ life-cycle.
+ @rtype: L{twext.enterprise.queue.WorkProposal}
+ """
+ return self._store.queuer.enqueueWork(self, workItem, **kw)
+
+
def store(self):
return self._store
@@ -606,18 +632,18 @@
return self._apnSubscriptionsBySubscriberQuery.on(self, subscriberGUID=guid)
- def postCommit(self, operation, immediately=False):
+ def postCommit(self, operation):
"""
Run things after C{commit}.
"""
- self._postCommitOperations.append((operation, immediately))
+ return self._sqlTxn.postCommit(operation)
def postAbort(self, operation):
"""
Run things after C{abort}.
"""
- self._postAbortOperations.append(operation)
+ return self._sqlTxn.postAbort(operation)
def isNotifiedAlready(self, obj):
@@ -774,30 +800,16 @@
"""
Commit the transaction and execute any post-commit hooks.
"""
- @inlineCallbacks
- def postCommit(ignored):
- for operation, immediately in self._postCommitOperations:
- if immediately:
- yield operation()
- else:
- operation()
- returnValue(ignored)
-
if self._stats:
self._stats.printReport()
+ return self._sqlTxn.commit()
- return self._sqlTxn.commit().addCallback(postCommit)
-
def abort(self):
"""
Abort the transaction.
"""
- def postAbort(ignored):
- for operation in self._postAbortOperations:
- operation()
- return ignored
- return self._sqlTxn.abort().addCallback(postAbort)
+ return self._sqlTxn.abort()
def _oldEventsBase(limited): #@NoSelf
Modified: CalendarServer/trunk/txdav/common/datastore/sql_schema/current.sql
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/sql_schema/current.sql 2012-08-11 08:55:55 UTC (rev 9688)
+++ CalendarServer/trunk/txdav/common/datastore/sql_schema/current.sql 2012-08-11 09:01:06 UTC (rev 9689)
@@ -22,7 +22,23 @@
create sequence RESOURCE_ID_SEQ;
+-------------------------
+-- Cluster Bookkeeping --
+-------------------------
+-- Information about a process connected to this database.
+
+-- Note that this must match the node info schema in twext.enterprise.queue.
+create table NODE_INFO (
+ HOSTNAME varchar(255) not null,
+ PID integer not null,
+ PORT integer not null,
+ TIME timestamp not null default timezone('UTC', CURRENT_TIMESTAMP),
+
+ primary key(HOSTNAME, PORT)
+);
+
+
-------------------
-- Calendar Home --
-------------------
@@ -497,6 +513,6 @@
VALUE varchar(255)
);
-insert into CALENDARSERVER values ('VERSION', '11');
+insert into CALENDARSERVER values ('VERSION', '12');
insert into CALENDARSERVER values ('CALENDAR-DATAVERSION', '3');
insert into CALENDARSERVER values ('ADDRESSBOOK-DATAVERSION', '1');
Copied: CalendarServer/trunk/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_11_to_12.sql (from rev 9688, CalendarServer/branches/users/glyph/q/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_11_to_12.sql)
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_11_to_12.sql (rev 0)
+++ CalendarServer/trunk/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_11_to_12.sql 2012-08-11 09:01:06 UTC (rev 9689)
@@ -0,0 +1,31 @@
+----
+-- Copyright (c) 2012 Apple Inc. All rights reserved.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+----
+
+---------------------------------------------------
+-- Upgrade database schema from VERSION 11 to 12 --
+---------------------------------------------------
+
+create table NODE_INFO (
+ "HOSTNAME" nvarchar2(255),
+ "PID" integer not null,
+ "PORT" integer not null,
+ "TIME" timestamp default CURRENT_TIMESTAMP at time zone 'UTC' not null,
+ primary key("HOSTNAME", "PORT")
+);
+
+-- Now update the version
+-- No data upgrades
+update CALENDARSERVER set VALUE = '12' where NAME = 'VERSION';
Copied: CalendarServer/trunk/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_11_to_12.sql (from rev 9688, CalendarServer/branches/users/glyph/q/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_11_to_12.sql)
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_11_to_12.sql (rev 0)
+++ CalendarServer/trunk/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_11_to_12.sql 2012-08-11 09:01:06 UTC (rev 9689)
@@ -0,0 +1,31 @@
+----
+-- Copyright (c) 2012 Apple Inc. All rights reserved.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+----
+
+---------------------------------------------------
+-- Upgrade database schema from VERSION 11 to 12 --
+---------------------------------------------------
+
+create table NODE_INFO (
+ HOSTNAME varchar(255) not null,
+ PID integer not null,
+ PORT integer not null,
+ TIME timestamp not null default timezone('UTC', CURRENT_TIMESTAMP),
+ primary key(HOSTNAME, PORT)
+);
+
+-- Now update the version
+-- No data upgrades
+update CALENDARSERVER set VALUE = '12' where NAME = 'VERSION';
Modified: CalendarServer/trunk/txdav/idav.py
===================================================================
--- CalendarServer/trunk/txdav/idav.py 2012-08-11 08:55:55 UTC (rev 9688)
+++ CalendarServer/trunk/txdav/idav.py 2012-08-11 09:01:06 UTC (rev 9689)
@@ -207,29 +207,15 @@
"""
- def postCommit(operation, immediately=False):
+ def postCommit(operation):
"""
- Registers an operation to be executed after the transaction is
- committed.
-
- postCommit can be called multiple times, and operations are executed
- in the order which they were registered.
-
- @param operation: a callable.
- @param immediately: a boolean; True means finish this operation *before* the
- commit( ) call completes, defaults to False.
+ @see: L{IAsyncTransaction.postCommit}
"""
def postAbort(operation):
"""
- Registers an operation to be executed after the transaction is
- aborted.
-
- postAbort can be called multiple times, and operations are executed
- in the order which they were registered.
-
- @param operation: a callable.
+ @see: L{IAsyncTransaction.postAbort}
"""
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120811/e75bd55f/attachment-0001.html>
More information about the calendarserver-changes
mailing list