Revision: 6492 http://trac.macosforge.org/projects/calendarserver/changeset/6492 Author: glyph@apple.com Date: 2010-11-01 14:11:58 -0700 (Mon, 01 Nov 2010) Log Message: ----------- refactor data store to take an async SQL transaction instead of a synchronous one Modified Paths: -------------- CalendarServer/branches/users/glyph/sharedpool/calendarserver/tap/util.py CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/subpostgres.py CalendarServer/branches/users/glyph/sharedpool/txdav/common/datastore/sql.py CalendarServer/branches/users/glyph/sharedpool/txdav/common/datastore/test/util.py Modified: CalendarServer/branches/users/glyph/sharedpool/calendarserver/tap/util.py =================================================================== --- CalendarServer/branches/users/glyph/sharedpool/calendarserver/tap/util.py 2010-11-01 21:11:10 UTC (rev 6491) +++ CalendarServer/branches/users/glyph/sharedpool/calendarserver/tap/util.py 2010-11-01 21:11:58 UTC (rev 6492) @@ -94,7 +94,7 @@ maxConnections=config.Postgres.MaxConnections, options=config.Postgres.Options, ) - return CommonSQLDataStore(postgresService.produceConnection, + return CommonSQLDataStore(postgresService.produceLocalTransaction, notifierFactory, dbRoot.child("attachments"), config.EnableCalDAV, config.EnableCardDAV) else: Modified: CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/subpostgres.py =================================================================== --- CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/subpostgres.py 2010-11-01 21:11:10 UTC (rev 6491) +++ CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/subpostgres.py 2010-11-01 21:11:58 UTC (rev 6492) @@ -20,10 +20,15 @@ """ import os import pwd +import sys #import thread +from Queue import Queue + from hashlib import md5 +from zope.interface import implements + from twisted.python.procutils import which from twisted.internet.protocol import ProcessProtocol @@ -35,6 +40,9 @@ from twisted.protocols.basic import LineReceiver from twisted.internet import reactor +from twisted.python.failure import Failure +from txdav.idav import IAsyncTransaction +from txdav.idav import AlreadyFinishedError from twisted.internet.defer import Deferred from twisted.application.service import MultiService @@ -167,6 +175,7 @@ self.completionDeferred.callback(None) + class ErrorOutput(Exception): """ The process produced some error output and exited with a non-zero exit @@ -174,6 +183,7 @@ """ + class CapturingProcessProtocol(ProcessProtocol): """ A L{ProcessProtocol} that captures its output and error. @@ -226,6 +236,183 @@ self.deferred.callback(''.join(self.output)) + +_DONE = object() + +_STATE_STOPPED = 'STOPPED' +_STATE_RUNNING = 'RUNNING' +_STATE_STOPPING = 'STOPPING' + +class ThreadHolder(object): + """ + A queue which will hold a reactor threadpool thread open until all of the + work in that queue is done. + """ + + def __init__(self, reactor): + self._reactor = reactor + self._state = _STATE_STOPPED + self._stopper = None + self._q = None + + + def _run(self): + """ + Worker function which runs in a non-reactor thread. 
+ """ + while True: + work = self._q.get() + if work is _DONE: + def finishStopping(): + self._state = _STATE_STOPPED + self._q = None + s = self._stopper + self._stopper = None + s.callback(None) + self._reactor.callFromThread(finishStopping) + return + self._oneWorkUnit(*work) + + + def _oneWorkUnit(self, deferred, instruction): + try: + result = instruction() + except: + etype, evalue, etb = sys.exc_info() + def relayFailure(): + f = Failure(evalue, etype, etb) + deferred.errback(f) + self._reactor.callFromThread(relayFailure) + else: + self._reactor.callFromThread(deferred.callback, result) + + + def submit(self, work): + """ + Submit some work to be run. + + @param work: a 0-argument callable, which will be run in a thread. + + @return: L{Deferred} that fires with the result of L{work} + """ + d = Deferred() + self._q.put((d, work)) + return d + + + def start(self): + """ + Start this thing, if it's stopped. + """ + if self._state != _STATE_STOPPED: + raise RuntimeError("Not stopped.") + self._state = _STATE_RUNNING + self._q = Queue(0) + self._reactor.callInThread(self._run) + + + def stop(self): + """ + Stop this thing and release its thread, if it's running. + """ + if self._state != _STATE_RUNNING: + raise RuntimeError("Not running.") + s = self._stopper = Deferred() + self._state = _STATE_STOPPING + self._q.put(_DONE) + return s + + + +class ThisProcessSqlTxn(object): + """ + L{IAsyncTransaction} implementation based on a L{ThreadHolder} in the + current process. + """ + implements(IAsyncTransaction) + + def __init__(self, connectionFactory): + """ + @param connectionFactory: A 0-argument callable which returns a DB-API + 2.0 connection. + """ + self._completed = False + self._holder = ThreadHolder(reactor) + self._holder.start() + def initCursor(): + # support threadlevel=1; we can't necessarily cursor() in a + # different thread than we do transactions in. + + # FIXME: may need to be pooling ThreadHolders along with + # connections, if threadlevel=1 requires connect() be called in the + # same thread as cursor() et. al. 
+ self._connection = connectionFactory() + self._cursor = self._connection.cursor() + self._holder.submit(initCursor) + + + def _reallyExecSQL(self, sql, args=[], raiseOnZeroRowCount=None): + self._cursor.execute(sql, args) + if raiseOnZeroRowCount is not None and self._cursor.rowcount == 0: + raise raiseOnZeroRowCount() + if self._cursor.description: + return self._cursor.fetchall() + else: + return None + + + noisy = False + + def execSQL(self, *args, **kw): + result = self._holder.submit( + lambda : self._reallyExecSQL(*args, **kw) + ) + if self.noisy: + def reportResult(results): + sys.stdout.write("\n".join([ + "", + "SQL: %r %r" % (args, kw), + "Results: %r" % (results,), + "", + ])) + return results + result.addBoth(reportResult) + return result + + + def commit(self): + if not self._completed: + self._completed = True + def reallyCommit(): + self._connection.commit() + self._connection.close() + result = self._holder.submit(reallyCommit) + self._holder.stop() + return result + else: + raise AlreadyFinishedError() + + + def abort(self): + if not self._completed: + def reallyAbort(): + self._connection.rollback() + self._connection.close() + self._completed = True + result = self._holder.submit(reallyAbort) + self._holder.stop() + return result + else: + raise AlreadyFinishedError() + + + def __del__(self): + if not self._completed: + print 'CommonStoreTransaction.__del__: OK' + self.abort() + + + class PostgresService(MultiService): def __init__(self, dataStoreDirectory, subServiceFactory, @@ -290,6 +477,7 @@ self.monitor = None self.openConnections = [] + def activateDelayedShutdown(self): """ Call this when starting database initialization code to protect against @@ -301,6 +489,7 @@ """ self.delayedShutdown = True + def deactivateDelayedShutdown(self): """ Call this when database initialization code has completed so that the @@ -310,6 +499,7 @@ if self.shutdownDeferred: self.shutdownDeferred.callback(None) + def produceConnection(self, label="<unlabeled>", databaseName=None): """ Produce a DB-API 2.0 connection pointed at this database. @@ -345,6 +535,13 @@ return w + def produceLocalTransaction(self, label="<unlabeled>"): + """ + Create a L{IAsyncTransaction} based on a thread in the current process. + """ + return ThisProcessSqlTxn(lambda : self.produceConnection(label)) + + def ready(self): """ Subprocess is ready. Time to initialize the subservice. @@ -389,6 +586,7 @@ # Only continue startup if we've not begun shutdown self.subServiceFactory(self.produceConnection).setServiceParent(self) + def pauseMonitor(self): """ Pause monitoring. This is a testing hook for when (if) we are @@ -417,9 +615,11 @@ monitor = _PostgresMonitor(self) pg_ctl = which("pg_ctl")[0] # check consistency of initdb and postgres? 
- + options = [] - options.append("-c listen_addresses='%s'" % (",".join(self.listenAddresses))) + options.append( + "-c listen_addresses='%s'" % (",".join(self.listenAddresses)) + ) if self.socketDir: options.append("-k '%s'" % (self.socketDir.path,)) options.append("-c shared_buffers=%d" % (self.sharedBuffers,)) Modified: CalendarServer/branches/users/glyph/sharedpool/txdav/common/datastore/sql.py =================================================================== --- CalendarServer/branches/users/glyph/sharedpool/txdav/common/datastore/sql.py 2010-11-01 21:11:10 UTC (rev 6491) +++ CalendarServer/branches/users/glyph/sharedpool/txdav/common/datastore/sql.py 2010-11-01 21:11:58 UTC (rev 6492) @@ -25,26 +25,23 @@ "CommonHome", ] -import sys import datetime -from Queue import Queue -from zope.interface.declarations import implements, directlyProvides +from zope.interface import implements, directlyProvides +from twext.python.log import Logger, LoggingMixIn +from twext.web2.dav.element.rfc2518 import ResourceType +from twext.web2.http_headers import MimeType + from twisted.python import hashlib from twisted.python.modules import getModule from twisted.python.util import FancyEqMixin -from twisted.python.failure import Failure -from twisted.internet import reactor -from twisted.internet.defer import inlineCallbacks, returnValue, Deferred +from twisted.internet.defer import inlineCallbacks, returnValue from twisted.application.service import Service -from twext.python.log import Logger, LoggingMixIn from twext.internet.decorate import memoizedKey -from twext.web2.dav.element.rfc2518 import ResourceType -from twext.web2.http_headers import MimeType from txdav.common.datastore.sql_legacy import PostgresLegacyNotificationsEmulator from txdav.caldav.icalendarstore import ICalendarTransaction, ICalendarStore @@ -60,10 +57,7 @@ NoSuchObjectResourceError from txdav.common.inotifications import INotificationCollection, \ INotificationObject -from txdav.idav import IAsyncTransaction -from txdav.idav import AlreadyFinishedError - from txdav.base.propertystore.sql import PropertyStore from txdav.base.propertystore.base import PropertyName @@ -89,15 +83,17 @@ implements(ICalendarStore) - def __init__(self, connectionFactory, notifierFactory, attachmentsPath, - enableCalendars=True, enableAddressBooks=True): + def __init__(self, sqlTxnFactory, notifierFactory, attachmentsPath, + enableCalendars=True, enableAddressBooks=True, + label="unlabeled"): assert enableCalendars or enableAddressBooks - self.connectionFactory = connectionFactory + self.sqlTxnFactory = sqlTxnFactory self.notifierFactory = notifierFactory self.attachmentsPath = attachmentsPath self.enableCalendars = enableCalendars self.enableAddressBooks = enableAddressBooks + self.label = label def eachCalendarHome(self): @@ -117,7 +113,7 @@ def newTransaction(self, label="unlabeled", migrating=False): return CommonStoreTransaction( self, - self.connectionFactory, + self.sqlTxnFactory(), self.enableCalendars, self.enableAddressBooks, self.notifierFactory, @@ -126,188 +122,7 @@ ) -_DONE = object() -_STATE_STOPPED = "STOPPED" -_STATE_RUNNING = "RUNNING" -_STATE_STOPPING = "STOPPING" - -class ThreadHolder(object): - """ - A queue which will hold a reactor threadpool thread open until all of the - work in that queue is done. - """ - - def __init__(self, reactor): - self._reactor = reactor - self._state = _STATE_STOPPED - self._stopper = None - self._q = None - - - def _run(self): - """ - Worker function which runs in a non-reactor thread. 
- """ - while True: - work = self._q.get() - if work is _DONE: - def finishStopping(): - self._state = _STATE_STOPPED - self._q = None - s = self._stopper - self._stopper = None - s.callback(None) - self._reactor.callFromThread(finishStopping) - return - self._oneWorkUnit(*work) - - - def _oneWorkUnit(self, deferred, instruction): - try: - result = instruction() - except: - etype, evalue, etb = sys.exc_info() - def relayFailure(): - f = Failure(evalue, etype, etb) - deferred.errback(f) - self._reactor.callFromThread(relayFailure) - else: - self._reactor.callFromThread(deferred.callback, result) - - - def submit(self, work): - """ - Submit some work to be run. - - @param work: a 0-argument callable, which will be run in a thread. - - @return: L{Deferred} that fires with the result of L{work} - """ - d = Deferred() - self._q.put((d, work)) - return d - - - def start(self): - """ - Start this thing, if it's stopped. - """ - if self._state != _STATE_STOPPED: - raise RuntimeError("Not stopped.") - self._state = _STATE_RUNNING - self._q = Queue(0) - self._reactor.callInThread(self._run) - - - def stop(self): - """ - Stop this thing and release its thread, if it's running. - """ - if self._state != _STATE_RUNNING: - raise RuntimeError("Not running.") - s = self._stopper = Deferred() - self._state = _STATE_STOPPING - self._q.put(_DONE) - return s - - - -class ThisProcessSqlTxn(object): - """ - L{IAsyncTransaction} implementation based on a L{ThreadHolder} in the - current process. - """ - implements(IAsyncTransaction) - - def __init__(self, connectionFactory): - """ - @param connectionFactory: A 0-argument callable which returns a DB-API - 2.0 connection. - """ - self._completed = False - self._holder = ThreadHolder(reactor) - self._holder.start() - def initCursor(): - # support threadlevel=1; we can't necessarily cursor() in a - # different thread than we do transactions in. - - # FIXME: may need to be pooling ThreadHolders along with - # connections, if threadlevel=1 requires connect() be called in the - # same thread as cursor() et. al. 
- self._connection = connectionFactory() - self._cursor = self._connection.cursor() - self._holder.submit(initCursor) - - - def store(self): - return self._store - - - def __repr__(self): - return "PG-TXN<%s>" % (self._label,) - - - def _reallyExecSQL(self, sql, args=[], raiseOnZeroRowCount=None): - self._cursor.execute(sql, args) - if raiseOnZeroRowCount is not None and self._cursor.rowcount == 0: - raise raiseOnZeroRowCount() - if self._cursor.description: - return self._cursor.fetchall() - else: - return None - - - def execSQL(self, *args, **kw): - result = self._holder.submit( - lambda : self._reallyExecSQL(*args, **kw) - ) - if self.noisy: - def reportResult(results): - sys.stdout.write("\n".join([ - "", - "SQL (%d): %r %r" % (self._txid, args, kw), - "Results (%d): %r" % (self._txid, results,), - "", - ])) - return results - result.addBoth(reportResult) - return result - - - def commit(self): - if not self._completed: - self._completed = True - def reallyCommit(): - self._connection.commit() - self._connection.close() - result = self._holder.submit(reallyCommit) - self._holder.stop() - return result - else: - raise AlreadyFinishedError() - - - def abort(self): - if not self._completed: - def reallyAbort(): - self._connection.rollback() - self._connection.close() - self._completed = True - result = self._holder.submit(reallyAbort) - self._holder.stop() - return result - else: - raise AlreadyFinishedError() - - - def __del__(self): - if not self._completed: - print "CommonStoreTransaction.__del__: OK" - self.abort() - - - class CommonStoreTransaction(object): """ Transaction implementation for SQL database. @@ -317,7 +132,7 @@ id = 0 - def __init__(self, store, cursorFactory, + def __init__(self, store, sqlTxn, enableCalendars, enableAddressBooks, notifierFactory, label, migrating=False): self._store = store @@ -345,7 +160,7 @@ CommonStoreTransaction._homeClass[EADDRESSBOOKTYPE] = AddressBookHome CommonStoreTransaction._homeTable[ECALENDARTYPE] = CALENDAR_HOME_TABLE CommonStoreTransaction._homeTable[EADDRESSBOOKTYPE] = ADDRESSBOOK_HOME_TABLE - self._sqlTxn = ThisProcessSqlTxn(cursorFactory) + self._sqlTxn = sqlTxn def store(self): Modified: CalendarServer/branches/users/glyph/sharedpool/txdav/common/datastore/test/util.py =================================================================== --- CalendarServer/branches/users/glyph/sharedpool/txdav/common/datastore/test/util.py 2010-11-01 21:11:10 UTC (rev 6491) +++ CalendarServer/branches/users/glyph/sharedpool/txdav/common/datastore/test/util.py 2010-11-01 21:11:58 UTC (rev 6492) @@ -28,7 +28,7 @@ from twext.python.vcomponent import VComponent from twisted.internet import reactor -from twisted.internet.defer import Deferred, succeed, inlineCallbacks +from twisted.internet.defer import Deferred, inlineCallbacks from twisted.internet.task import deferLater from twisted.python import log @@ -52,6 +52,8 @@ print connection.label, connection.state print '--- CONNECTIONS END ---' + + class SQLStoreBuilder(object): """ Test-fixture-builder which can construct a PostgresStore. 
@@ -79,18 +81,18 @@ pass try: self.store = CommonDataStore( - lambda label=None: connectionFactory( - label or currentTestID - ), + self.sharedService.produceLocalTransaction, notifierFactory, attachmentRoot ) + self.store.label = currentTestID except: ready.errback() raise else: - self.cleanDatabase(testCase) - ready.callback(self.store) + def readyNow(ignored): + ready.callback(self.store) + return self.cleanDatabase(testCase).addCallback(readyNow) return self.store self.sharedService = PostgresService( dbRoot, getReady, v1_schema, resetSchema=True, @@ -107,8 +109,9 @@ result = ready else: self.store.notifierFactory = notifierFactory - self.cleanDatabase(testCase) - result = succeed(self.store) + result = self.cleanDatabase(testCase).addCallback( + lambda ignored: self.store + ) def cleanUp(): # FIXME: clean up any leaked connections and report them with an @@ -120,11 +123,13 @@ return result + @inlineCallbacks def cleanDatabase(self, testCase): - cleanupConn = self.store.connectionFactory( + cleanupTxn = self.store.sqlTxnFactory( "%s schema-cleanup" % (testCase.id(),) ) - cursor = cleanupConn.cursor() + # TODO: should be getting these tables from a declaration of the schema + # somewhere. tables = ['INVITE', 'RESOURCE_PROPERTY', 'ATTACHMENT', @@ -143,16 +148,16 @@ 'NOTIFICATION_HOME'] for table in tables: try: - cursor.execute("delete from "+table) + yield cleanupTxn.execSQL("delete from "+table, []) except: log.err() - cleanupConn.commit() - cleanupConn.close() + yield cleanupTxn.commit() theStoreBuilder = SQLStoreBuilder() buildStore = theStoreBuilder.buildStore + @inlineCallbacks def populateCalendarsFrom(requirements, store): """
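For reference, a minimal usage sketch of the asynchronous API this changeset introduces (not part of the diff above; the function name, the "count-homes" label, and the example query are illustrative only). PostgresService.produceLocalTransaction() returns a ThisProcessSqlTxn, whose execSQL() fires a Deferred from a ThreadHolder worker thread, and whose commit()/abort() likewise return Deferreds:

    from twisted.internet.defer import inlineCallbacks, returnValue

    @inlineCallbacks
    def countCalendarHomes(postgresService):
        # produceLocalTransaction() replaces produceConnection() for callers
        # that want an IAsyncTransaction rather than a blocking DB-API
        # connection; the label is only used for diagnostics.
        txn = postgresService.produceLocalTransaction("count-homes")
        # execSQL() returns a Deferred that fires with cursor.fetchall();
        # abort() is available instead of commit() on error paths.
        rows = yield txn.execSQL("select count(*) from CALENDAR_HOME", [])
        yield txn.commit()
        returnValue(rows[0][0])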