Revision: 6491 http://trac.macosforge.org/projects/calendarserver/changeset/6491 Author: glyph@apple.com Date: 2010-11-01 14:11:10 -0700 (Mon, 01 Nov 2010) Log Message: ----------- separate sql transaction from logical transaction Modified Paths: -------------- CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/subpostgres.py CalendarServer/branches/users/glyph/sharedpool/txdav/common/datastore/sql.py CalendarServer/branches/users/glyph/sharedpool/txdav/idav.py Modified: CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/subpostgres.py =================================================================== --- CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/subpostgres.py 2010-11-01 21:06:58 UTC (rev 6490) +++ CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/subpostgres.py 2010-11-01 21:11:10 UTC (rev 6491) @@ -402,7 +402,7 @@ def unpauseMonitor(self): """ Unpause monitoring. - + @see: L{pauseMonitor} """ # for pipe in self.monitor.transport.pipes.values(): Modified: CalendarServer/branches/users/glyph/sharedpool/txdav/common/datastore/sql.py =================================================================== --- CalendarServer/branches/users/glyph/sharedpool/txdav/common/datastore/sql.py 2010-11-01 21:06:58 UTC (rev 6490) +++ CalendarServer/branches/users/glyph/sharedpool/txdav/common/datastore/sql.py 2010-11-01 21:11:10 UTC (rev 6491) @@ -60,10 +60,12 @@ NoSuchObjectResourceError from txdav.common.inotifications import INotificationCollection, \ INotificationObject +from txdav.idav import IAsyncTransaction +from txdav.idav import AlreadyFinishedError -from txdav.idav import AlreadyFinishedError + +from txdav.base.propertystore.sql import PropertyStore from txdav.base.propertystore.base import PropertyName -from txdav.base.propertystore.sql import PropertyStore from twistedcaldav.customxml import NotificationType from twistedcaldav.dateops import datetimeMktime @@ -211,44 +213,19 @@ -class CommonStoreTransaction(object): +class ThisProcessSqlTxn(object): """ - Transaction implementation for SQL database. + L{IAsyncTransaction} implementation based on a L{ThreadHolder} in the + current process. """ - _homeClass = {} - _homeTable = {} + implements(IAsyncTransaction) - noisy = False - id = 0 - - def __init__(self, store, connectionFactory, - enableCalendars, enableAddressBooks, - notifierFactory, label, migrating=False): - self._store = store + def __init__(self, connectionFactory): + """ + @param connectionFactory: A 0-argument callable which returns a DB-API + 2.0 connection. + """ self._completed = False - self._calendarHomes = {} - self._addressbookHomes = {} - self._notificationHomes = {} - self._postCommitOperations = [] - self._notifierFactory = notifierFactory - self._label = label - self._migrating = migrating - CommonStoreTransaction.id += 1 - self._txid = CommonStoreTransaction.id - - extraInterfaces = [] - if enableCalendars: - extraInterfaces.append(ICalendarTransaction) - if enableAddressBooks: - extraInterfaces.append(IAddressBookTransaction) - directlyProvides(self, *extraInterfaces) - - from txdav.caldav.datastore.sql import CalendarHome - from txdav.carddav.datastore.sql import AddressBookHome - CommonStoreTransaction._homeClass[ECALENDARTYPE] = CalendarHome - CommonStoreTransaction._homeClass[EADDRESSBOOKTYPE] = AddressBookHome - CommonStoreTransaction._homeTable[ECALENDARTYPE] = CALENDAR_HOME_TABLE - CommonStoreTransaction._homeTable[EADDRESSBOOKTYPE] = ADDRESSBOOK_HOME_TABLE self._holder = ThreadHolder(reactor) self._holder.start() def initCursor(): @@ -298,13 +275,88 @@ return result + def commit(self): + if not self._completed: + self._completed = True + def reallyCommit(): + self._connection.commit() + self._connection.close() + result = self._holder.submit(reallyCommit) + self._holder.stop() + return result + else: + raise AlreadyFinishedError() + + + def abort(self): + if not self._completed: + def reallyAbort(): + self._connection.rollback() + self._connection.close() + self._completed = True + result = self._holder.submit(reallyAbort) + self._holder.stop() + return result + else: + raise AlreadyFinishedError() + + def __del__(self): if not self._completed: print "CommonStoreTransaction.__del__: OK" self.abort() - @memoizedKey("uid", "_calendarHomes") + +class CommonStoreTransaction(object): + """ + Transaction implementation for SQL database. + """ + _homeClass = {} + _homeTable = {} + + id = 0 + + def __init__(self, store, cursorFactory, + enableCalendars, enableAddressBooks, + notifierFactory, label, migrating=False): + self._store = store + self._calendarHomes = {} + self._addressbookHomes = {} + self._notificationHomes = {} + self._postCommitOperations = [] + self._notifierFactory = notifierFactory + self._label = label + self._migrating = migrating + + CommonStoreTransaction.id += 1 + self._txid = CommonStoreTransaction.id + + extraInterfaces = [] + if enableCalendars: + extraInterfaces.append(ICalendarTransaction) + if enableAddressBooks: + extraInterfaces.append(IAddressBookTransaction) + directlyProvides(self, *extraInterfaces) + + from txdav.caldav.datastore.sql import CalendarHome + from txdav.carddav.datastore.sql import AddressBookHome + CommonStoreTransaction._homeClass[ECALENDARTYPE] = CalendarHome + CommonStoreTransaction._homeClass[EADDRESSBOOKTYPE] = AddressBookHome + CommonStoreTransaction._homeTable[ECALENDARTYPE] = CALENDAR_HOME_TABLE + CommonStoreTransaction._homeTable[EADDRESSBOOKTYPE] = ADDRESSBOOK_HOME_TABLE + self._sqlTxn = ThisProcessSqlTxn(cursorFactory) + + + def store(self): + return self._store + + + def __repr__(self): + return 'PG-TXN<%s>' % (self._label,) + + + @memoizedKey('uid', '_calendarHomes') def calendarHomeWithUID(self, uid, create=False): return self.homeWithUID(ECALENDARTYPE, uid, create=create) @@ -409,40 +461,35 @@ returnValue(collection) - def abort(self): - if not self._completed: - def reallyAbort(): - self._connection.rollback() - self._connection.close() - self._completed = True - result = self._holder.submit(reallyAbort) - self._holder.stop() - return result - else: - raise AlreadyFinishedError() + def postCommit(self, operation): + """ + Run things after C{commit}. + """ + self._postCommitOperations.append(operation) + def execSQL(self, *a, **kw): + """ + """ + return self._sqlTxn.execSQL(*a, **kw) + + def commit(self): - if not self._completed: - self._completed = True - def postCommit(ignored): - for operation in self._postCommitOperations: - operation() - def reallyCommit(): - self._connection.commit() - self._connection.close() - result = self._holder.submit(reallyCommit).addCallback(postCommit) - self._holder.stop() - return result - else: - raise AlreadyFinishedError() + """ + Commit the transaction and return + """ + def postCommit(ignored): + for operation in self._postCommitOperations: + operation() + return ignored + return self._sqlTxn.commit().addCallback(postCommit) - def postCommit(self, operation): + def abort(self): """ - Run things after C{commit}. + docstring for abort """ - self._postCommitOperations.append(operation) + return self._sqlTxn.abort() @@ -717,13 +764,14 @@ raise NoSuchHomeChildError() yield child._deletedSyncToken() - yield self._txn.execSQL( - "delete from %(name)s where %(column_RESOURCE_ID)s = %%s" % self._childTable, - [child._resourceID] - ) - self._children.pop(name, None) - if self._txn._cursor.rowcount == 0: - raise NoSuchHomeChildError() + try: + yield self._txn.execSQL( + "delete from %(name)s where %(column_RESOURCE_ID)s = %%s" % self._childTable, + [child._resourceID], + raiseOnZeroRowCount=NoSuchHomeChildError + ) + finally: + self._children.pop(name, None) child.notifyChanged() Modified: CalendarServer/branches/users/glyph/sharedpool/txdav/idav.py =================================================================== --- CalendarServer/branches/users/glyph/sharedpool/txdav/idav.py 2010-11-01 21:06:58 UTC (rev 6490) +++ CalendarServer/branches/users/glyph/sharedpool/txdav/idav.py 2010-11-01 21:11:10 UTC (rev 6491) @@ -172,6 +172,33 @@ +class IAsyncTransaction(Interface): + """ + Asynchronous execution of SQL. + + Note that there is no {begin()} method; if an L{IAsyncTransaction} exists, + it is assumed to have been started. + """ + + def execSQL(sql, args): + """ + Execute some SQL. + + @return: L{Deferred} which fires C{list} of C{tuple} + """ + + + def commit(): + """ + """ + + + def abort(): + """ + """ + + + class ITransaction(Interface): """ Transaction that can be aborted and either succeeds or fails in
participants (1)
-
source_changes@macosforge.org