[CalendarServer-changes] [5978] CalendarServer/branches/users/glyph/sql-store
source_changes at macosforge.org
source_changes at macosforge.org
Wed Aug 4 13:17:08 PDT 2010
Revision: 5978
http://trac.macosforge.org/projects/calendarserver/changeset/5978
Author: glyph at apple.com
Date: 2010-08-04 13:17:07 -0700 (Wed, 04 Aug 2010)
Log Message:
-----------
use a common database across tests to avoid expensive initialization, and have a less race-prone test setup
Modified Paths:
--------------
CalendarServer/branches/users/glyph/sql-store/txcaldav/calendarstore/test/test_postgres.py
CalendarServer/branches/users/glyph/sql-store/txdav/datastore/subpostgres.py
CalendarServer/branches/users/glyph/sql-store/txdav/datastore/test/test_subpostgres.py
Modified: CalendarServer/branches/users/glyph/sql-store/txcaldav/calendarstore/test/test_postgres.py
===================================================================
--- CalendarServer/branches/users/glyph/sql-store/txcaldav/calendarstore/test/test_postgres.py 2010-08-04 20:12:53 UTC (rev 5977)
+++ CalendarServer/branches/users/glyph/sql-store/txcaldav/calendarstore/test/test_postgres.py 2010-08-04 20:17:07 UTC (rev 5978)
@@ -14,20 +14,45 @@
# limitations under the License.
##
+"""
+Tests for txcaldav.calendarstore.postgres, mostly based on
+L{txcaldav.calendarstore.test.common}.
+"""
+
+
from txcaldav.calendarstore.test.common import CommonTests
from twisted.trial import unittest
-from txdav.datastore.subpostgres import PostgresService
+from txdav.datastore.subpostgres import PostgresService, \
+ DiagnosticConnectionWrapper
from txcaldav.calendarstore.postgres import PostgresStore, v1_schema
from twisted.internet.defer import Deferred
from twisted.internet import reactor
from twext.python.filepath import CachingFilePath
from twext.python.vcomponent import VComponent
from twisted.internet.task import deferLater
+from twisted.python import log
+import gc
+def allInstancesOf(cls):
+ for o in gc.get_referrers(cls):
+ if isinstance(o, cls):
+ yield o
+
+
+
+def dumpConnectionStatus():
+ print '+++ ALL CONNECTIONS +++'
+ for connection in allInstancesOf(DiagnosticConnectionWrapper):
+ print connection.label, connection.state
+ print '--- CONNECTIONS END ---'
+
+
+
sharedService = None
+currentTestID = None
class SQLStorageTests(CommonTests, unittest.TestCase):
"""
@@ -36,54 +61,76 @@
def setUp(self):
global sharedService
+ global currentTestID
+ currentTestID = self.id()
if sharedService is None:
ready = Deferred()
def getReady(connectionFactory):
global calendarStore
- calendarStore = PostgresStore(connectionFactory)
- self.populate()
- ready.callback(None)
+ try:
+ calendarStore = PostgresStore(
+ lambda label=None: connectionFactory(
+ label or currentTestID
+ )
+ )
+ except:
+ ready.errback()
+ raise
+ else:
+ self.cleanAndPopulate().chainDeferred(ready)
return calendarStore
sharedService = PostgresService(
- CachingFilePath("pg"), getReady, v1_schema, "caldav"
+ CachingFilePath("../_test_postgres_db"),
+ getReady, v1_schema, "caldav"
)
sharedService.startService()
def startStopping():
- for pipe in sharedService.monitor.transport.pipes.values():
- pipe.startReading()
- pipe.startWriting()
- sharedService.stopService()
- reactor.addSystemEventTrigger(
+ log.msg("Starting stopping.")
+ sharedService.unpauseMonitor()
+ dumpConnectionStatus()
+ return sharedService.stopService()
+ reactor.addSystemEventTrigger(#@UndefinedVariable
"before", "shutdown", startStopping)
return ready
else:
- cleanupConn = calendarStore.connectionFactory()
- cursor = cleanupConn.cursor()
- cursor.execute("delete from RESOURCE_PROPERTY")
- cursor.execute("delete from ATTACHMENT")
- cursor.execute("delete from CALENDAR_OBJECT")
- cursor.execute("delete from CALENDAR_BIND")
- cursor.execute("delete from CALENDAR")
- cursor.execute("delete from CALENDAR_HOME")
- cleanupConn.commit()
- cleanupConn.close()
- self.populate()
- for pipe in sharedService.monitor.transport.pipes.values():
- pipe.startReading()
- pipe.startWriting()
- # I need to allow the log buffer to unspool.
- return deferLater(reactor, 0.1, lambda : None)
+ return self.cleanAndPopulate()
+ def cleanAndPopulate(self):
+ """
+ Delete everything from the database, then clean it up.
+ """
+ dumpConnectionStatus()
+ cleanupConn = calendarStore.connectionFactory(
+ "%s schema-cleanup" % (self.id(),)
+ )
+ cursor = cleanupConn.cursor()
+ cursor.execute("delete from RESOURCE_PROPERTY")
+ cleanupConn.commit()
+ cursor.execute("delete from ATTACHMENT")
+ cleanupConn.commit()
+ cursor.execute("delete from CALENDAR_OBJECT")
+ cleanupConn.commit()
+ cursor.execute("delete from CALENDAR_BIND")
+ cleanupConn.commit()
+ cursor.execute("delete from CALENDAR")
+ cleanupConn.commit()
+ cursor.execute("delete from CALENDAR_HOME")
+ cleanupConn.commit()
+ cleanupConn.close()
+ self.populate()
+ sharedService.unpauseMonitor()
+ # I need to allow the log buffer to unspool.
+ return deferLater(reactor, 0.1, lambda : None)
+
+
def tearDown(self):
+ super(SQLStorageTests, self).tearDown()
def stopit():
- for pipe in sharedService.monitor.transport.pipes.values():
- pipe.stopReading()
- pipe.stopWriting()
+ sharedService.pauseMonitor()
return deferLater(reactor, 0.1, stopit)
-
def populate(self):
populateTxn = calendarStore.newTransaction()
for homeUID in self.requirements:
Modified: CalendarServer/branches/users/glyph/sql-store/txdav/datastore/subpostgres.py
===================================================================
--- CalendarServer/branches/users/glyph/sql-store/txdav/datastore/subpostgres.py 2010-08-04 20:12:53 UTC (rev 5977)
+++ CalendarServer/branches/users/glyph/sql-store/txdav/datastore/subpostgres.py 2010-08-04 20:17:07 UTC (rev 5978)
@@ -19,28 +19,95 @@
Run and manage PostgreSQL as a subprocess.
"""
import os
-import pgdb
from twisted.python.procutils import which
from twisted.internet.utils import getProcessOutput
from twisted.internet.protocol import ProcessProtocol
+from twisted.python.reflect import namedAny
+from twisted.python import log
+pgdb = namedAny("pgdb")
+
from twisted.protocols.basic import LineReceiver
from twisted.internet import reactor
-from twisted.internet.defer import Deferred, succeed
+from twisted.internet.defer import Deferred
from twisted.application.service import MultiService
+# This appears in the postgres log to indicate that it is accepting
+# connections.
_MAGIC_READY_COOKIE = "database system is ready to accept connections"
+class DiagnosticCursorWrapper(object):
+ """
+ Diagnostic wrapper around a DB-API 2.0 cursor for debugging connection
+ status.
+ """
+
+ def __init__(self, realCursor, connectionWrapper):
+ self.realCursor = realCursor
+ self.connectionWrapper = connectionWrapper
+
+
+ @property
+ def rowcount(self):
+ return self.realCursor.rowcount
+
+
+ def execute(self, sql, args=()):
+ self.connectionWrapper.state = 'executing %r' % (sql,)
+ self.realCursor.execute(sql, args)
+
+
+ def close(self):
+ self.realCursor.close()
+
+
+ def fetchall(self):
+ return self.realCursor.fetchall()
+
+
+
+class DiagnosticConnectionWrapper(object):
+ """
+ Diagnostic wrapper around a DB-API 2.0 connection for debugging connection
+ status.
+ """
+
+ def __init__(self, realConnection, label):
+ self.realConnection = realConnection
+ self.label = label
+ self.state = 'idle (start)'
+
+
+ def cursor(self):
+ return DiagnosticCursorWrapper(self.realConnection.cursor(), self)
+
+
+ def close(self):
+ self.realConnection.close()
+ self.state = 'closed'
+
+
+ def commit(self):
+ self.realConnection.commit()
+ self.state = 'idle (after commit)'
+
+
+ def rollback(self):
+ self.realConnection.rollback()
+ self.state = 'idle (after rollback)'
+
+
+
class _PostgresMonitor(ProcessProtocol):
"""
A monitoring protocol which watches the postgres subprocess.
"""
- def __init__(self, svc):
+ def __init__(self, svc=None):
self.lineReceiver = LineReceiver()
self.lineReceiver.delimiter = '\n'
self.lineReceiver.lineReceived = self.lineReceived
@@ -50,10 +117,11 @@
def lineReceived(self, line):
+ if self.svc is None:
+ return
if not self.isReady:
if _MAGIC_READY_COOKIE in line:
self.svc.ready()
- print 'log output:', repr(line)
disconnecting = False
@@ -62,16 +130,17 @@
def outReceived(self, out):
- print 'received postgres output', out
+ log.msg("received postgres stdout %r" % (out,))
# self.lineReceiver.dataReceived(out)
def errReceived(self, err):
- print 'postgress err received', repr(err)
+ log.msg("received postgres stderr %r" % (err,))
self.lineReceiver.dataReceived(err)
def processEnded(self, reason):
+ log.msg("postgres process ended %r" % (reason,))
self.lineReceiver.connectionLost(reason)
self.completionDeferred.callback(None)
@@ -97,96 +166,126 @@
self.databaseName = databaseName
self.schema = schema
self.monitor = None
+ self.openConnections = []
- def produceConnection(self):
+ def produceConnection(self, label="<unlabeled>", databaseName=None):
"""
Produce a DB-API 2.0 connection pointed at this database.
"""
- return pgdb.connect(
- "%s:dbname=%s" % (
- self.socketDir.path,
- self.databaseName
- )
+ if databaseName is None:
+ databaseName = self.databaseName
+ connection = pgdb.connect(
+ "%s:dbname=%s" % (self.socketDir.path, databaseName)
)
+ w = DiagnosticConnectionWrapper(connection, label)
+ c = w.cursor()
+ c.execute("set standard_conforming_strings=on")
+ w.commit()
+ c.close()
+ return w
def ready(self):
"""
Subprocess is ready. Time to initialize the subservice.
"""
- if self.firstTime:
- createDatabaseConn = pgdb.connect(
- self.socketDir.path + ":dbname=template1"
- )
- createDatabaseCursor = createDatabaseConn.cursor()
- createDatabaseCursor.execute("commit")
+ createDatabaseConn = self.produceConnection(
+ 'schema creation', 'postgres'
+ )
+ createDatabaseCursor = createDatabaseConn.cursor()
+ createDatabaseCursor.execute("commit")
+ try:
createDatabaseCursor.execute(
"create database %s" % (self.databaseName)
)
- createDatabaseCursor.close()
- createDatabaseConn.close()
- print 'executing schema', repr(self.schema)
+ except:
+ execSchema = False
+ else:
+ execSchema = True
+ createDatabaseCursor.close()
+ createDatabaseConn.close()
+ if execSchema:
connection = self.produceConnection()
cursor = connection.cursor()
cursor.execute(self.schema)
connection.commit()
connection.close()
- print 'creating subservice'
self.subServiceFactory(self.produceConnection).setServiceParent(self)
- print 'subservice created'
+ def pauseMonitor(self):
+ """
+ Pause monitoring. This is a testing hook for when (if) we are
+ continuously monitoring output from the 'postgres' process.
+ """
+# for pipe in self.monitor.transport.pipes.values():
+# pipe.stopReading()
+# pipe.stopWriting()
+
+
+ def unpauseMonitor(self):
+ """
+ Unpause monitoring.
+
+ @see: L{pauseMonitor}
+ """
+# for pipe in self.monitor.transport.pipes.values():
+# pipe.startReading()
+# pipe.startWriting()
+
+
def startDatabase(self):
"""
Start the database and initialize the subservice.
"""
monitor = _PostgresMonitor(self)
- postgres = which("postgres")[0]
+ pg_ctl = which("pg_ctl")[0]
# check consistency of initdb and postgres?
reactor.spawnProcess(
- monitor, postgres,
+ monitor, pg_ctl,
[
- postgres,
- "-k", self.socketDir.path,
- # "-N", "5000",
+ pg_ctl,
+ "start",
+ "-l", "logfile",
+ "-w",
+ # XXX what are the quoting rules for '-o'? do I need to repr()
+ # the path here?
+ "-o", "-k '%s' -c standard_conforming_strings=on"
+ % (self.socketDir.path,),
],
self.env
)
self.monitor = monitor
+ def gotReady(result):
+ self.ready()
+ def reportit(f):
+ log.err(f)
+ self.monitor.completionDeferred.addCallback(
+ gotReady).addErrback(reportit)
def startService(self):
MultiService.startService(self)
- self.dataStoreDirectory.createDirectory()
clusterDir = self.dataStoreDirectory.child("cluster")
self.socketDir = self.dataStoreDirectory.child("socket")
- self.socketDir.createDirectory()
workingDir = self.dataStoreDirectory.child("working")
env = self.env = os.environ.copy()
- env.update(PGDATA=clusterDir.path)
+ env.update(PGDATA=clusterDir.path,
+ PGHOST=self.socketDir.path)
initdb = which("initdb")[0]
if clusterDir.isdir():
- self.firstTime = False
self.startDatabase()
else:
+ self.dataStoreDirectory.createDirectory()
+ self.socketDir.createDirectory()
workingDir.createDirectory()
- self.firstTime = True
- print 'Creating database'
dbInited = getProcessOutput(
initdb, [], env, workingDir.path, errortoo=True
)
def doCreate(result):
- print '--- initdb ---'
- print result
- print '/// initdb ///'
self.startDatabase()
- dbInited.addCallback(
- doCreate
- )
- def showme(result):
- print 'SHOW ME:', result.getTraceback()
- dbInited.addErrback(showme)
+ dbInited.addCallback(doCreate)
def stopService(self):
@@ -194,10 +293,22 @@
Stop all child services, then stop the subprocess, if it's running.
"""
d = MultiService.stopService(self)
- def maybeStopSubprocess(result):
- if self.monitor is not None:
- self.monitor.transport.signalProcess("INT")
- return self.monitor.completionDeferred
- return result
- d.addCallback(maybeStopSubprocess)
- return d
+ def superStopped(result):
+ # Probably want to stop and wait for startup if that hasn't
+ # completed yet...
+ monitor = _PostgresMonitor()
+ pg_ctl = which("pg_ctl")[0]
+ reactor.spawnProcess(monitor, pg_ctl,
+ [pg_ctl, '-l', 'logfile', 'stop'],
+ self.env
+ )
+ return monitor.completionDeferred
+ return d.addCallback(superStopped)
+
+# def maybeStopSubprocess(result):
+# if self.monitor is not None:
+# self.monitor.transport.signalProcess("INT")
+# return self.monitor.completionDeferred
+# return result
+# d.addCallback(maybeStopSubprocess)
+# return d
Modified: CalendarServer/branches/users/glyph/sql-store/txdav/datastore/test/test_subpostgres.py
===================================================================
--- CalendarServer/branches/users/glyph/sql-store/txdav/datastore/test/test_subpostgres.py 2010-08-04 20:12:53 UTC (rev 5977)
+++ CalendarServer/branches/users/glyph/sql-store/txdav/datastore/test/test_subpostgres.py 2010-08-04 20:17:07 UTC (rev 5978)
@@ -13,18 +13,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
##
-from txdav.datastore.subpostgres import PostgresService
-from twisted.internet.defer import inlineCallbacks, Deferred
-from twisted.application.service import Service
"""
Tests for txdav.datastore.subpostgres.
"""
-from twext.python.filepath import CachingFilePath
from twisted.trial.unittest import TestCase
+from twext.python.filepath import CachingFilePath
+from txdav.datastore.subpostgres import PostgresService
+from twisted.internet.defer import inlineCallbacks, Deferred
+from twisted.application.service import Service
+
class SubprocessStartup(TestCase):
"""
Tests for starting and stopping the subprocess.
@@ -41,26 +42,33 @@
test = self
class SimpleService(Service):
+
instances = []
rows = []
ready = Deferred()
+
def __init__(self, connectionFactory):
self.connection = connectionFactory()
test.addCleanup(self.connection.close)
- print 'CREATING simpleservice'
self.instances.append(self)
+
def startService(self):
- print 'STARTING simpleservice'
cursor = self.connection.cursor()
- cursor.execute(
- "insert into test_dummy_table values ('dummy')"
- )
- cursor.close()
- self.ready.callback(None)
+ try:
+ cursor.execute(
+ "insert into test_dummy_table values ('dummy')"
+ )
+ except:
+ self.ready.errback()
+ else:
+ self.ready.callback(None)
+ finally:
+ cursor.close()
+
svc = PostgresService(
- CachingFilePath("database"),
+ CachingFilePath("../_postgres_test_db"),
SimpleService,
"create table TEST_DUMMY_TABLE (stub varchar)",
"dummy_db"
@@ -72,5 +80,5 @@
connection = SimpleService.instances[0].connection
cursor = connection.cursor()
cursor.execute("select * from test_dummy_table")
- values = list(cursor)
+ values = cursor.fetchall()
self.assertEquals(values, [["dummy"]])
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20100804/071f48a0/attachment-0001.html>
More information about the calendarserver-changes
mailing list