[CalendarServer-changes] [6492] CalendarServer/branches/users/glyph/sharedpool
source_changes at macosforge.org
Mon Nov 1 14:12:00 PDT 2010
Revision: 6492
http://trac.macosforge.org/projects/calendarserver/changeset/6492
Author: glyph at apple.com
Date: 2010-11-01 14:11:58 -0700 (Mon, 01 Nov 2010)
Log Message:
-----------
refactor data store to take an async SQL transaction instead of a synchronous one
Modified Paths:
--------------
CalendarServer/branches/users/glyph/sharedpool/calendarserver/tap/util.py
CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/subpostgres.py
CalendarServer/branches/users/glyph/sharedpool/txdav/common/datastore/sql.py
CalendarServer/branches/users/glyph/sharedpool/txdav/common/datastore/test/util.py
Modified: CalendarServer/branches/users/glyph/sharedpool/calendarserver/tap/util.py
===================================================================
--- CalendarServer/branches/users/glyph/sharedpool/calendarserver/tap/util.py 2010-11-01 21:11:10 UTC (rev 6491)
+++ CalendarServer/branches/users/glyph/sharedpool/calendarserver/tap/util.py 2010-11-01 21:11:58 UTC (rev 6492)
@@ -94,7 +94,7 @@
maxConnections=config.Postgres.MaxConnections,
options=config.Postgres.Options,
)
- return CommonSQLDataStore(postgresService.produceConnection,
+ return CommonSQLDataStore(postgresService.produceLocalTransaction,
notifierFactory, dbRoot.child("attachments"),
config.EnableCalDAV, config.EnableCardDAV)
else:
Modified: CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/subpostgres.py
===================================================================
--- CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/subpostgres.py 2010-11-01 21:11:10 UTC (rev 6491)
+++ CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/subpostgres.py 2010-11-01 21:11:58 UTC (rev 6492)
@@ -20,10 +20,15 @@
"""
import os
import pwd
+import sys
#import thread
+from Queue import Queue
+
from hashlib import md5
+from zope.interface import implements
+
from twisted.python.procutils import which
from twisted.internet.protocol import ProcessProtocol
@@ -35,6 +40,9 @@
from twisted.protocols.basic import LineReceiver
from twisted.internet import reactor
+from twisted.python.failure import Failure
+from txdav.idav import IAsyncTransaction
+from txdav.idav import AlreadyFinishedError
from twisted.internet.defer import Deferred
from twisted.application.service import MultiService
@@ -167,6 +175,7 @@
self.completionDeferred.callback(None)
+
class ErrorOutput(Exception):
"""
The process produced some error output and exited with a non-zero exit
@@ -174,6 +183,7 @@
"""
+
class CapturingProcessProtocol(ProcessProtocol):
"""
A L{ProcessProtocol} that captures its output and error.
@@ -226,6 +236,183 @@
self.deferred.callback(''.join(self.output))
+
+_DONE = object()
+
+_STATE_STOPPED = 'STOPPED'
+_STATE_RUNNING = 'RUNNING'
+_STATE_STOPPING = 'STOPPING'
+
+class ThreadHolder(object):
+ """
+ A queue which will hold a reactor threadpool thread open until all of the
+ work in that queue is done.
+ """
+
+ def __init__(self, reactor):
+ self._reactor = reactor
+ self._state = _STATE_STOPPED
+ self._stopper = None
+ self._q = None
+
+
+ def _run(self):
+ """
+ Worker function which runs in a non-reactor thread.
+ """
+ while True:
+ work = self._q.get()
+ if work is _DONE:
+ def finishStopping():
+ self._state = _STATE_STOPPED
+ self._q = None
+ s = self._stopper
+ self._stopper = None
+ s.callback(None)
+ self._reactor.callFromThread(finishStopping)
+ return
+ self._oneWorkUnit(*work)
+
+
+ def _oneWorkUnit(self, deferred, instruction):
+ try:
+ result = instruction()
+ except:
+ etype, evalue, etb = sys.exc_info()
+ def relayFailure():
+ f = Failure(evalue, etype, etb)
+ deferred.errback(f)
+ self._reactor.callFromThread(relayFailure)
+ else:
+ self._reactor.callFromThread(deferred.callback, result)
+
+
+ def submit(self, work):
+ """
+ Submit some work to be run.
+
+ @param work: a 0-argument callable, which will be run in a thread.
+
+ @return: L{Deferred} that fires with the result of L{work}
+ """
+ d = Deferred()
+ self._q.put((d, work))
+ return d
+
+
+ def start(self):
+ """
+ Start this thing, if it's stopped.
+ """
+ if self._state != _STATE_STOPPED:
+ raise RuntimeError("Not stopped.")
+ self._state = _STATE_RUNNING
+ self._q = Queue(0)
+ self._reactor.callInThread(self._run)
+
+
+ def stop(self):
+ """
+ Stop this thing and release its thread, if it's running.
+ """
+ if self._state != _STATE_RUNNING:
+ raise RuntimeError("Not running.")
+ s = self._stopper = Deferred()
+ self._state = _STATE_STOPPING
+ self._q.put(_DONE)
+ return s
+
+
+
+class ThisProcessSqlTxn(object):
+ """
+ L{IAsyncTransaction} implementation based on a L{ThreadHolder} in the
+ current process.
+ """
+ implements(IAsyncTransaction)
+
+ def __init__(self, connectionFactory):
+ """
+ @param connectionFactory: A 0-argument callable which returns a DB-API
+ 2.0 connection.
+ """
+ self._completed = False
+ self._holder = ThreadHolder(reactor)
+ self._holder.start()
+ def initCursor():
+ # support threadlevel=1; we can't necessarily cursor() in a
+ # different thread than we do transactions in.
+
+ # FIXME: may need to be pooling ThreadHolders along with
+ # connections, if threadlevel=1 requires connect() be called in the
+ # same thread as cursor() et. al.
+ self._connection = connectionFactory()
+ self._cursor = self._connection.cursor()
+ self._holder.submit(initCursor)
+
+
+ def _reallyExecSQL(self, sql, args=[], raiseOnZeroRowCount=None):
+ self._cursor.execute(sql, args)
+ if raiseOnZeroRowCount is not None and self._cursor.rowcount == 0:
+ raise raiseOnZeroRowCount()
+ if self._cursor.description:
+ return self._cursor.fetchall()
+ else:
+ return None
+
+
+ noisy = False
+
+ def execSQL(self, *args, **kw):
+ result = self._holder.submit(
+ lambda : self._reallyExecSQL(*args, **kw)
+ )
+ if self.noisy:
+ def reportResult(results):
+ sys.stdout.write("\n".join([
+ "",
+ "SQL: %r %r" % (args, kw),
+ "Results: %r" % (results,),
+ "",
+ ]))
+ return results
+ result.addBoth(reportResult)
+ return result
+
+
+ def commit(self):
+ if not self._completed:
+ self._completed = True
+ def reallyCommit():
+ self._connection.commit()
+ self._connection.close()
+ result = self._holder.submit(reallyCommit)
+ self._holder.stop()
+ return result
+ else:
+ raise AlreadyFinishedError()
+
+
+ def abort(self):
+ if not self._completed:
+ def reallyAbort():
+ self._connection.rollback()
+ self._connection.close()
+ self._completed = True
+ result = self._holder.submit(reallyAbort)
+ self._holder.stop()
+ return result
+ else:
+ raise AlreadyFinishedError()
+
+
+ def __del__(self):
+ if not self._completed:
+ print 'CommonStoreTransaction.__del__: OK'
+ self.abort()
+
+
+
class PostgresService(MultiService):
def __init__(self, dataStoreDirectory, subServiceFactory,
@@ -290,6 +477,7 @@
self.monitor = None
self.openConnections = []
+
def activateDelayedShutdown(self):
"""
Call this when starting database initialization code to protect against
@@ -301,6 +489,7 @@
"""
self.delayedShutdown = True
+
def deactivateDelayedShutdown(self):
"""
Call this when database initialization code has completed so that the
@@ -310,6 +499,7 @@
if self.shutdownDeferred:
self.shutdownDeferred.callback(None)
+
def produceConnection(self, label="<unlabeled>", databaseName=None):
"""
Produce a DB-API 2.0 connection pointed at this database.
@@ -345,6 +535,13 @@
return w
+ def produceLocalTransaction(self, label="<unlabeled>"):
+ """
+ Create a L{IAsyncTransaction} based on a thread in the current process.
+ """
+ return ThisProcessSqlTxn(lambda : self.produceConnection(label))
+
+
def ready(self):
"""
Subprocess is ready. Time to initialize the subservice.
@@ -389,6 +586,7 @@
# Only continue startup if we've not begun shutdown
self.subServiceFactory(self.produceConnection).setServiceParent(self)
+
def pauseMonitor(self):
"""
Pause monitoring. This is a testing hook for when (if) we are
@@ -417,9 +615,11 @@
monitor = _PostgresMonitor(self)
pg_ctl = which("pg_ctl")[0]
# check consistency of initdb and postgres?
-
+
options = []
- options.append("-c listen_addresses='%s'" % (",".join(self.listenAddresses)))
+ options.append(
+ "-c listen_addresses='%s'" % (",".join(self.listenAddresses))
+ )
if self.socketDir:
options.append("-k '%s'" % (self.socketDir.path,))
options.append("-c shared_buffers=%d" % (self.sharedBuffers,))
Modified: CalendarServer/branches/users/glyph/sharedpool/txdav/common/datastore/sql.py
===================================================================
--- CalendarServer/branches/users/glyph/sharedpool/txdav/common/datastore/sql.py 2010-11-01 21:11:10 UTC (rev 6491)
+++ CalendarServer/branches/users/glyph/sharedpool/txdav/common/datastore/sql.py 2010-11-01 21:11:58 UTC (rev 6492)
@@ -25,26 +25,23 @@
"CommonHome",
]
-import sys
import datetime
-from Queue import Queue
-from zope.interface.declarations import implements, directlyProvides
+from zope.interface import implements, directlyProvides
+from twext.python.log import Logger, LoggingMixIn
+from twext.web2.dav.element.rfc2518 import ResourceType
+from twext.web2.http_headers import MimeType
+
from twisted.python import hashlib
from twisted.python.modules import getModule
from twisted.python.util import FancyEqMixin
-from twisted.python.failure import Failure
-from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
+from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.application.service import Service
-from twext.python.log import Logger, LoggingMixIn
from twext.internet.decorate import memoizedKey
-from twext.web2.dav.element.rfc2518 import ResourceType
-from twext.web2.http_headers import MimeType
from txdav.common.datastore.sql_legacy import PostgresLegacyNotificationsEmulator
from txdav.caldav.icalendarstore import ICalendarTransaction, ICalendarStore
@@ -60,10 +57,7 @@
NoSuchObjectResourceError
from txdav.common.inotifications import INotificationCollection, \
INotificationObject
-from txdav.idav import IAsyncTransaction
-from txdav.idav import AlreadyFinishedError
-
from txdav.base.propertystore.sql import PropertyStore
from txdav.base.propertystore.base import PropertyName
@@ -89,15 +83,17 @@
implements(ICalendarStore)
- def __init__(self, connectionFactory, notifierFactory, attachmentsPath,
- enableCalendars=True, enableAddressBooks=True):
+ def __init__(self, sqlTxnFactory, notifierFactory, attachmentsPath,
+ enableCalendars=True, enableAddressBooks=True,
+ label="unlabeled"):
assert enableCalendars or enableAddressBooks
- self.connectionFactory = connectionFactory
+ self.sqlTxnFactory = sqlTxnFactory
self.notifierFactory = notifierFactory
self.attachmentsPath = attachmentsPath
self.enableCalendars = enableCalendars
self.enableAddressBooks = enableAddressBooks
+ self.label = label
def eachCalendarHome(self):
@@ -117,7 +113,7 @@
def newTransaction(self, label="unlabeled", migrating=False):
return CommonStoreTransaction(
self,
- self.connectionFactory,
+ self.sqlTxnFactory(),
self.enableCalendars,
self.enableAddressBooks,
self.notifierFactory,
@@ -126,188 +122,7 @@
)
-_DONE = object()
-_STATE_STOPPED = "STOPPED"
-_STATE_RUNNING = "RUNNING"
-_STATE_STOPPING = "STOPPING"
-
-class ThreadHolder(object):
- """
- A queue which will hold a reactor threadpool thread open until all of the
- work in that queue is done.
- """
-
- def __init__(self, reactor):
- self._reactor = reactor
- self._state = _STATE_STOPPED
- self._stopper = None
- self._q = None
-
-
- def _run(self):
- """
- Worker function which runs in a non-reactor thread.
- """
- while True:
- work = self._q.get()
- if work is _DONE:
- def finishStopping():
- self._state = _STATE_STOPPED
- self._q = None
- s = self._stopper
- self._stopper = None
- s.callback(None)
- self._reactor.callFromThread(finishStopping)
- return
- self._oneWorkUnit(*work)
-
-
- def _oneWorkUnit(self, deferred, instruction):
- try:
- result = instruction()
- except:
- etype, evalue, etb = sys.exc_info()
- def relayFailure():
- f = Failure(evalue, etype, etb)
- deferred.errback(f)
- self._reactor.callFromThread(relayFailure)
- else:
- self._reactor.callFromThread(deferred.callback, result)
-
-
- def submit(self, work):
- """
- Submit some work to be run.
-
- @param work: a 0-argument callable, which will be run in a thread.
-
- @return: L{Deferred} that fires with the result of L{work}
- """
- d = Deferred()
- self._q.put((d, work))
- return d
-
-
- def start(self):
- """
- Start this thing, if it's stopped.
- """
- if self._state != _STATE_STOPPED:
- raise RuntimeError("Not stopped.")
- self._state = _STATE_RUNNING
- self._q = Queue(0)
- self._reactor.callInThread(self._run)
-
-
- def stop(self):
- """
- Stop this thing and release its thread, if it's running.
- """
- if self._state != _STATE_RUNNING:
- raise RuntimeError("Not running.")
- s = self._stopper = Deferred()
- self._state = _STATE_STOPPING
- self._q.put(_DONE)
- return s
-
-
-
-class ThisProcessSqlTxn(object):
- """
- L{IAsyncTransaction} implementation based on a L{ThreadHolder} in the
- current process.
- """
- implements(IAsyncTransaction)
-
- def __init__(self, connectionFactory):
- """
- @param connectionFactory: A 0-argument callable which returns a DB-API
- 2.0 connection.
- """
- self._completed = False
- self._holder = ThreadHolder(reactor)
- self._holder.start()
- def initCursor():
- # support threadlevel=1; we can't necessarily cursor() in a
- # different thread than we do transactions in.
-
- # FIXME: may need to be pooling ThreadHolders along with
- # connections, if threadlevel=1 requires connect() be called in the
- # same thread as cursor() et. al.
- self._connection = connectionFactory()
- self._cursor = self._connection.cursor()
- self._holder.submit(initCursor)
-
-
- def store(self):
- return self._store
-
-
- def __repr__(self):
- return "PG-TXN<%s>" % (self._label,)
-
-
- def _reallyExecSQL(self, sql, args=[], raiseOnZeroRowCount=None):
- self._cursor.execute(sql, args)
- if raiseOnZeroRowCount is not None and self._cursor.rowcount == 0:
- raise raiseOnZeroRowCount()
- if self._cursor.description:
- return self._cursor.fetchall()
- else:
- return None
-
-
- def execSQL(self, *args, **kw):
- result = self._holder.submit(
- lambda : self._reallyExecSQL(*args, **kw)
- )
- if self.noisy:
- def reportResult(results):
- sys.stdout.write("\n".join([
- "",
- "SQL (%d): %r %r" % (self._txid, args, kw),
- "Results (%d): %r" % (self._txid, results,),
- "",
- ]))
- return results
- result.addBoth(reportResult)
- return result
-
-
- def commit(self):
- if not self._completed:
- self._completed = True
- def reallyCommit():
- self._connection.commit()
- self._connection.close()
- result = self._holder.submit(reallyCommit)
- self._holder.stop()
- return result
- else:
- raise AlreadyFinishedError()
-
-
- def abort(self):
- if not self._completed:
- def reallyAbort():
- self._connection.rollback()
- self._connection.close()
- self._completed = True
- result = self._holder.submit(reallyAbort)
- self._holder.stop()
- return result
- else:
- raise AlreadyFinishedError()
-
-
- def __del__(self):
- if not self._completed:
- print "CommonStoreTransaction.__del__: OK"
- self.abort()
-
-
-
class CommonStoreTransaction(object):
"""
Transaction implementation for SQL database.
@@ -317,7 +132,7 @@
id = 0
- def __init__(self, store, cursorFactory,
+ def __init__(self, store, sqlTxn,
enableCalendars, enableAddressBooks,
notifierFactory, label, migrating=False):
self._store = store
@@ -345,7 +160,7 @@
CommonStoreTransaction._homeClass[EADDRESSBOOKTYPE] = AddressBookHome
CommonStoreTransaction._homeTable[ECALENDARTYPE] = CALENDAR_HOME_TABLE
CommonStoreTransaction._homeTable[EADDRESSBOOKTYPE] = ADDRESSBOOK_HOME_TABLE
- self._sqlTxn = ThisProcessSqlTxn(cursorFactory)
+ self._sqlTxn = sqlTxn
def store(self):
Modified: CalendarServer/branches/users/glyph/sharedpool/txdav/common/datastore/test/util.py
===================================================================
--- CalendarServer/branches/users/glyph/sharedpool/txdav/common/datastore/test/util.py 2010-11-01 21:11:10 UTC (rev 6491)
+++ CalendarServer/branches/users/glyph/sharedpool/txdav/common/datastore/test/util.py 2010-11-01 21:11:58 UTC (rev 6492)
@@ -28,7 +28,7 @@
from twext.python.vcomponent import VComponent
from twisted.internet import reactor
-from twisted.internet.defer import Deferred, succeed, inlineCallbacks
+from twisted.internet.defer import Deferred, inlineCallbacks
from twisted.internet.task import deferLater
from twisted.python import log
@@ -52,6 +52,8 @@
print connection.label, connection.state
print '--- CONNECTIONS END ---'
+
+
class SQLStoreBuilder(object):
"""
Test-fixture-builder which can construct a PostgresStore.
@@ -79,18 +81,18 @@
pass
try:
self.store = CommonDataStore(
- lambda label=None: connectionFactory(
- label or currentTestID
- ),
+ self.sharedService.produceLocalTransaction,
notifierFactory,
attachmentRoot
)
+ self.store.label = currentTestID
except:
ready.errback()
raise
else:
- self.cleanDatabase(testCase)
- ready.callback(self.store)
+ def readyNow(ignored):
+ ready.callback(self.store)
+ return self.cleanDatabase(testCase).addCallback(readyNow)
return self.store
self.sharedService = PostgresService(
dbRoot, getReady, v1_schema, resetSchema=True,
@@ -107,8 +109,9 @@
result = ready
else:
self.store.notifierFactory = notifierFactory
- self.cleanDatabase(testCase)
- result = succeed(self.store)
+ result = self.cleanDatabase(testCase).addCallback(
+ lambda ignored: self.store
+ )
def cleanUp():
# FIXME: clean up any leaked connections and report them with an
@@ -120,11 +123,13 @@
return result
+ @inlineCallbacks
def cleanDatabase(self, testCase):
- cleanupConn = self.store.connectionFactory(
+ cleanupTxn = self.store.sqlTxnFactory(
"%s schema-cleanup" % (testCase.id(),)
)
- cursor = cleanupConn.cursor()
+ # TODO: should be getting these tables from a declaration of the schema
+ # somewhere.
tables = ['INVITE',
'RESOURCE_PROPERTY',
'ATTACHMENT',
@@ -143,16 +148,16 @@
'NOTIFICATION_HOME']
for table in tables:
try:
- cursor.execute("delete from "+table)
+ yield cleanupTxn.execSQL("delete from "+table, [])
except:
log.err()
- cleanupConn.commit()
- cleanupConn.close()
+ yield cleanupTxn.commit()
theStoreBuilder = SQLStoreBuilder()
buildStore = theStoreBuilder.buildStore
+
@inlineCallbacks
def populateCalendarsFrom(requirements, store):
"""
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20101101/5f659bb9/attachment-0001.html>
More information about the calendarserver-changes mailing list