[CalendarServer-changes] [6791] CalendarServer/trunk

source_changes at macosforge.org source_changes at macosforge.org
Wed Jan 19 23:03:24 PST 2011


Revision: 6791
          http://trac.macosforge.org/projects/calendarserver/changeset/6791
Author:   glyph at apple.com
Date:     2011-01-19 23:03:24 -0800 (Wed, 19 Jan 2011)
Log Message:
-----------
New modules to support generating SQL from objects rather than strings.

Modified Paths:
--------------
    CalendarServer/trunk/calendarserver/tap/caldav.py
    CalendarServer/trunk/calendarserver/tap/util.py
    CalendarServer/trunk/support/build.sh
    CalendarServer/trunk/twistedcaldav/directory/util.py
    CalendarServer/trunk/twistedcaldav/stdconfig.py
    CalendarServer/trunk/twistedcaldav/test/test_wrapping.py
    CalendarServer/trunk/txdav/base/datastore/file.py
    CalendarServer/trunk/txdav/caldav/datastore/sql.py
    CalendarServer/trunk/txdav/caldav/datastore/test/common.py
    CalendarServer/trunk/txdav/carddav/datastore/sql.py
    CalendarServer/trunk/txdav/common/datastore/sql.py
    CalendarServer/trunk/txdav/common/datastore/sql_tables.py
    CalendarServer/trunk/txdav/common/datastore/test/util.py
    CalendarServer/trunk/txdav/idav.py

Added Paths:
-----------
    CalendarServer/trunk/twext/enterprise/
    CalendarServer/trunk/twext/enterprise/__init__.py
    CalendarServer/trunk/twext/enterprise/adbapi2.py
    CalendarServer/trunk/twext/enterprise/dal/
    CalendarServer/trunk/twext/enterprise/dal/__init__.py
    CalendarServer/trunk/twext/enterprise/dal/model.py
    CalendarServer/trunk/twext/enterprise/dal/parseschema.py
    CalendarServer/trunk/twext/enterprise/dal/syntax.py
    CalendarServer/trunk/twext/enterprise/dal/test/
    CalendarServer/trunk/twext/enterprise/dal/test/__init__.py
    CalendarServer/trunk/twext/enterprise/dal/test/test_parseschema.py
    CalendarServer/trunk/twext/enterprise/dal/test/test_sqlsyntax.py
    CalendarServer/trunk/twext/enterprise/ienterprise.py
    CalendarServer/trunk/twext/enterprise/test/
    CalendarServer/trunk/twext/enterprise/test/__init__.py
    CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py
    CalendarServer/trunk/twext/internet/threadutils.py
    CalendarServer/trunk/twext/python/clsprop.py
    CalendarServer/trunk/txdav/common/datastore/test/test_sql_tables.py

Removed Paths:
-------------
    CalendarServer/trunk/txdav/base/datastore/asyncsqlpool.py
    CalendarServer/trunk/txdav/base/datastore/test/test_asyncsqlpool.py
    CalendarServer/trunk/txdav/base/datastore/threadutils.py

Modified: CalendarServer/trunk/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/caldav.py	2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/calendarserver/tap/caldav.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -73,8 +73,8 @@
 
 from calendarserver.tap.util import pgServiceFromConfig
 
-from txdav.base.datastore.asyncsqlpool import ConnectionPool
-from txdav.base.datastore.asyncsqlpool import ConnectionPoolConnection
+from twext.enterprise.adbapi2 import ConnectionPool
+from twext.enterprise.adbapi2 import ConnectionPoolConnection
 
 try:
     from twistedcaldav.authkerb import NegotiateCredentialFactory
@@ -96,7 +96,9 @@
     from calendarserver.version import version
     version
 except ImportError:
-    sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "support"))
+    from twisted.python.modules import getModule
+    sys.path.insert(
+        0, getModule(__name__).pathEntry.filePath.child("support").path)
     from version import version as getVersion
     version = "%s (%s*)" % getVersion()
 twext.web2.server.VERSION = "CalendarServer/%s %s" % (

Modified: CalendarServer/trunk/calendarserver/tap/util.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/util.py	2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/calendarserver/tap/util.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -68,7 +68,7 @@
 except ImportError:
     NegotiateCredentialFactory = None
 
-from txdav.base.datastore.asyncsqlpool import ConnectionPoolClient
+from twext.enterprise.adbapi2 import ConnectionPoolClient
 from txdav.base.datastore.dbapiclient import DBAPIConnector
 from txdav.base.datastore.dbapiclient import postgresPreflight
 from txdav.base.datastore.subpostgres import PostgresService

Modified: CalendarServer/trunk/support/build.sh
===================================================================
--- CalendarServer/trunk/support/build.sh	2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/support/build.sh	2011-01-20 07:03:24 UTC (rev 6791)
@@ -570,9 +570,9 @@
   # they are useful to developers.
   #
 
-  py_dependency -v 0.1.1 -m "000053e0352f5bf19c2f8d5242329ea4" \
-    "SQLParse" "sqlparse" "sqlparse-0.1.1" \
-    "http://python-sqlparse.googlecode.com/files/sqlparse-0.1.1.tar.gz";
+  py_dependency -v 0.1.2 -m "aa9852ad81822723adcd9f96838de14e" \
+    "SQLParse" "sqlparse" "sqlparse-0.1.2" \
+    "http://python-sqlparse.googlecode.com/files/sqlparse-0.1.2.tar.gz";
 
   py_dependency -v 0.4.0 -m "630a72510aae8758f48cf60e4fa17176" \
     "Pyflakes" "pyflakes" "pyflakes-0.4.0" \

Added: CalendarServer/trunk/twext/enterprise/__init__.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/__init__.py	                        (rev 0)
+++ CalendarServer/trunk/twext/enterprise/__init__.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -0,0 +1,20 @@
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Extensions in the spirit of Twisted's "enterprise" package; things related to
+database connectivity and management.
+"""

Added: CalendarServer/trunk/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/adbapi2.py	                        (rev 0)
+++ CalendarServer/trunk/twext/enterprise/adbapi2.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -0,0 +1,678 @@
+# -*- test-case-name: twext.enterprise.test.test_adbapi2 -*-
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Asynchronous multi-process connection pool.
+
+This is similar to L{twisted.enterprise.adbapi}, but can hold a transaction (and
+thereby a thread) open across multiple asynchronous operations, rather than
+forcing the transaction to be completed entirely in a thread and/or entirely in
+a single SQL statement.
+
+Also, this module includes an AMP protocol for multiplexing connections through
+a single choke-point host.  This is not currently in use, however, as AMP needs
+some optimization before it can be low-impact enough for this to be an
+improvement.
+"""
+
+import sys
+
+from cStringIO import StringIO
+from cPickle import dumps, loads
+from itertools import count
+
+from zope.interface import implements
+
+from twisted.internet.defer import inlineCallbacks
+from twisted.internet.defer import returnValue
+from twisted.internet.defer import Deferred
+from twisted.protocols.amp import Boolean
+from twisted.python.failure import Failure
+from twisted.protocols.amp import Argument, String, Command, AMP, Integer
+from twisted.internet import reactor as _reactor
+from twisted.application.service import Service
+from twisted.python import log
+from twisted.internet.defer import maybeDeferred
+from twisted.python.components import proxyForInterface
+
+from twext.internet.threadutils import ThreadHolder
+from twext.enterprise.ienterprise import AlreadyFinishedError, IAsyncTransaction
+
+
+# FIXME: there should be no default, it should be discovered dynamically
+# everywhere.  Right now we're only using pgdb so we only support that.
+
+DEFAULT_PARAM_STYLE = 'pyformat'
+
+class BaseSqlTxn(object):
+    """
+    L{IAsyncTransaction} implementation based on a L{ThreadHolder} in the
+    current process.
+    """
+    implements(IAsyncTransaction)
+
+    # FIXME: this should *really* be discovered dynamically, not defaulted.
+    paramstyle = DEFAULT_PARAM_STYLE
+
+    def __init__(self, connectionFactory, reactor=_reactor):
+        """
+        @param connectionFactory: A 0-argument callable which returns a DB-API
+            2.0 connection.
+        """
+        self._completed = False
+        self._cursor = None
+        self._holder = ThreadHolder(reactor)
+        self._holder.start()
+
+        def initCursor():
+            # support threadlevel=1; we can't necessarily cursor() in a
+            # different thread than we do transactions in.
+
+            # TODO: Re-try connect when it fails.  Specify a timeout.  That
+            # should happen in this layer because we need to be able to stop
+            # the reconnect attempt if it's hanging.
+            self._connection = connectionFactory()
+            self._cursor = self._connection.cursor()
+
+        # Note: no locking necessary here; since this gets submitted first, all
+        # subsequent submitted work-units will be in line behind it and the
+        # cursor will already have been initialized.
+        self._holder.submit(initCursor).addErrback(log.err)
+
+
+    def _reallyExecSQL(self, sql, args=None, raiseOnZeroRowCount=None):
+        if args is None:
+            args = []
+        self._cursor.execute(sql, args)
+        if raiseOnZeroRowCount is not None and self._cursor.rowcount == 0:
+            raise raiseOnZeroRowCount()
+        if self._cursor.description:
+            return self._cursor.fetchall()
+        else:
+            return None
+
+
+    noisy = False
+
+    def execSQL(self, *args, **kw):
+        result = self._holder.submit(
+            lambda : self._reallyExecSQL(*args, **kw)
+        )
+        if self.noisy:
+            def reportResult(results):
+                sys.stdout.write("\n".join([
+                    "",
+                    "SQL: %r %r" % (args, kw),
+                    "Results: %r" % (results,),
+                    "",
+                    ]))
+                return results
+            result.addBoth(reportResult)
+        return result
+
+
+    def commit(self):
+        if not self._completed:
+            self._completed = True
+            def reallyCommit():
+                self._connection.commit()
+            result = self._holder.submit(reallyCommit)
+            return result
+        else:
+            raise AlreadyFinishedError()
+
+
+    def abort(self):
+        if not self._completed:
+            self._completed = True
+            def reallyAbort():
+                self._connection.rollback()
+            result = self._holder.submit(reallyAbort)
+            return result
+        else:
+            raise AlreadyFinishedError()
+
+
+    def __del__(self):
+        if not self._completed:
+            print 'BaseSqlTxn.__del__: OK'
+            self.abort()
+
+
+    def reset(self):
+        """
+        Call this when placing this transaction back into the pool.
+
+        @raise RuntimeError: if the transaction has not been committed or
+            aborted.
+        """
+        if not self._completed:
+            raise RuntimeError("Attempt to re-set active transaction.")
+        self._completed = False
+
+
+    def stop(self):
+        """
+        Release the thread and database connection associated with this
+        transaction.
+        """
+        self._completed = True
+        self._stopped = True
+        holder = self._holder
+        self._holder = None
+        holder.submit(self._connection.close)
+        return holder.stop()
+
+
+
+class SpooledTxn(object):
+    """
+    A L{SpooledTxn} is an implementation of L{IAsyncTransaction} which cannot
+    yet actually execute anything, so it spools SQL requests for later
+    execution.  When a L{BaseSqlTxn} becomes available later, it can be
+    unspooled onto that.
+    """
+
+    implements(IAsyncTransaction)
+
+    # FIXME: this should be relayed from the connection factory of the thing
+    # creating the spooled transaction.
+
+    paramstyle = DEFAULT_PARAM_STYLE
+
+    def __init__(self):
+        self._spool = []
+
+
+    def _enspool(self, cmd, a=(), kw={}):
+        d = Deferred()
+        self._spool.append((d, cmd, a, kw))
+        return d
+
+
+    def _iterDestruct(self):
+        """
+        Iterate the spool list destructively, while popping items from the
+        beginning.  This allows code which executes more SQL in the callback of
+        a Deferred to not interfere with the originally submitted order of
+        commands.
+        """
+        while self._spool:
+            yield self._spool.pop(0)
+
+
+    def _unspool(self, other):
+        """
+        Unspool this transaction onto another transaction.
+
+        @param other: another provider of L{IAsyncTransaction} which will
+            actually execute the SQL statements we have been buffering.
+        """
+        for (d, cmd, a, kw) in self._iterDestruct():
+            self._relayCommand(other, d, cmd, a, kw)
+
+
+    def _relayCommand(self, other, d, cmd, a, kw):
+        """
+        Relay a single command to another transaction.
+        """
+        maybeDeferred(getattr(other, cmd), *a, **kw).chainDeferred(d)
+
+
+    def execSQL(self, *a, **kw):
+        return self._enspool('execSQL', a, kw)
+
+
+    def commit(self):
+        return self._enspool('commit')
+
+
+    def abort(self):
+        return self._enspool('abort')
+
+
+
+class PooledSqlTxn(proxyForInterface(iface=IAsyncTransaction,
+                                     originalAttribute='_baseTxn')):
+    """
+    This is a temporary throw-away wrapper for the longer-lived BaseSqlTxn, so
+    that if a badly-behaved API client accidentally hangs on to one of these
+    and, for example C{.abort()}s it multiple times once another client is
+    using that connection, it will get some harmless tracebacks.
+    """
+
+    def __init__(self, pool, baseTxn):
+        self._pool     = pool
+        self._baseTxn  = baseTxn
+        self._complete = False
+
+
+    def execSQL(self, *a, **kw):
+        self._checkComplete()
+        return super(PooledSqlTxn, self).execSQL(*a, **kw)
+
+
+    def commit(self):
+        self._markComplete()
+        return self._repoolAfter(super(PooledSqlTxn, self).commit())
+
+
+    def abort(self):
+        self._markComplete()
+        return self._repoolAfter(super(PooledSqlTxn, self).abort())
+
+
+    def _checkComplete(self):
+        """
+        If the transaction is complete, raise L{AlreadyFinishedError}
+        """
+        if self._complete:
+            raise AlreadyFinishedError()
+
+
+    def _markComplete(self):
+        """
+        Mark complete; raise L{AlreadyFinishedError} if already complete.
+        """
+        self._checkComplete()
+        self._complete = True
+
+
+    def _repoolAfter(self, d):
+        def repool(result):
+            self._pool.reclaim(self)
+            return result
+        return d.addCallback(repool)
+
+
+
+class ConnectionPool(Service, object):
+    """
+    This is a central service that has a threadpool and executes SQL statements
+    asynchronously, in a pool.
+
+    @ivar connectionFactory: a 0-or-1-argument callable that returns a DB-API
+        connection.  The optional argument can be used as a label for
+        diagnostic purposes.
+
+    @ivar maxConnections: The connection pool will not attempt to make more
+        than this many concurrent connections to the database.
+
+    @type maxConnections: C{int}
+    """
+
+    reactor = _reactor
+
+    def __init__(self, connectionFactory, maxConnections=10):
+        super(ConnectionPool, self).__init__()
+        self.free = []
+        self.busy = []
+        self.waiting = []
+        self.connectionFactory = connectionFactory
+        self.maxConnections = maxConnections
+
+
+    def startService(self):
+        """
+        No startup necessary.
+        """
+
+
+    @inlineCallbacks
+    def stopService(self):
+        """
+        Forcibly abort any outstanding transactions.
+        """
+        for busy in self.busy[:]:
+            try:
+                yield busy.abort()
+            except:
+                log.err()
+        # all transactions should now be in the free list, since 'abort()' will
+        # have put them there.
+        for free in self.free:
+            yield free.stop()
+
+
+    def connection(self, label="<unlabeled>"):
+        """
+        Find a transaction; either retrieve a free one from the list or
+        allocate a new one if no free ones are available.
+
+        @return: an L{IAsyncTransaction}
+        """
+
+        overload = False
+        if self.free:
+            basetxn = self.free.pop(0)
+        elif len(self.busy) < self.maxConnections:
+            basetxn = BaseSqlTxn(
+                connectionFactory=self.connectionFactory,
+                reactor=self.reactor
+            )
+        else:
+            basetxn = SpooledTxn()
+            overload = True
+        txn = PooledSqlTxn(self, basetxn)
+        if overload:
+            self.waiting.append(txn)
+        else:
+            self.busy.append(txn)
+        return txn
+
+
+    def reclaim(self, txn):
+        """
+        Shuck the L{PooledSqlTxn} wrapper off, and recycle the underlying
+        BaseSqlTxn into the free list.
+        """
+        baseTxn = txn._baseTxn
+        baseTxn.reset()
+        self.busy.remove(txn)
+        if self.waiting:
+            waiting = self.waiting.pop(0)
+            waiting._baseTxn._unspool(baseTxn)
+            # Note: although commit() may already have been called, we don't
+            # have to handle it specially here.  It only unspools after the
+            # deferred returned by commit() has actually been called, and since
+            # that occurs in a callFromThread, that won't happen until the next
+            # iteration of the mainloop, when the _baseTxn is safely correct.
+            waiting._baseTxn = baseTxn
+            self.busy.append(waiting)
+        else:
+            self.free.append(baseTxn)
+
+
+
+def txnarg():
+    return [('transactionID', Integer())]
+
+
+CHUNK_MAX = 0xffff
+
+class BigArgument(Argument):
+    """
+    An argument whose payload can be larger than L{CHUNK_MAX}, by splitting
+    across multiple AMP keys.
+    """
+    def fromBox(self, name, strings, objects, proto):
+        value = StringIO()
+        for counter in count():
+            chunk = strings.get("%s.%d" % (name, counter))
+            if chunk is None:
+                break
+            value.write(chunk)
+        objects[name] = self.fromString(value.getvalue())
+
+
+    def toBox(self, name, strings, objects, proto):
+        value = StringIO(self.toString(objects[name]))
+        for counter in count():
+            nextChunk = value.read(CHUNK_MAX)
+            if not nextChunk:
+                break
+            strings["%s.%d" % (name, counter)] = nextChunk
+
+
+
+class Pickle(BigArgument):
+    """
+    A pickle sent over AMP.  This is to serialize the 'args' argument to
+    C{execSQL}, which is the dynamically-typed 'args' list argument to a DB-API
+    C{execute} function, as well as its dynamically-typed result ('rows').
+
+    This should be cleaned up into a nicer structure, but this is not a network
+    protocol, so we can be a little relaxed about security.
+
+    This is a L{BigArgument} rather than a regular L{Argument} because
+    individual arguments and query results need to contain entire vCard or
+    iCalendar documents, which can easily be greater than 64k.
+    """
+
+    def toString(self, inObject):
+        return dumps(inObject)
+
+    def fromString(self, inString):
+        return loads(inString)
+
+
+
+
+class StartTxn(Command):
+    """
+    Start a transaction, identified with an ID generated by the client.
+    """
+    arguments = txnarg()
+
+
+
+class ExecSQL(Command):
+    """
+    Execute an SQL statement.
+    """
+    arguments = [('sql', String()),
+                 ('queryID', String()),
+                 ('args', Pickle())] + txnarg()
+
+
+
+class Row(Command):
+    """
+    A row has been returned.  Sent from server to client in response to
+    L{ExecSQL}.
+    """
+
+    arguments = [('queryID', String()),
+                 ('row', Pickle())]
+
+
+
+class QueryComplete(Command):
+    """
+    A query issued with ExecSQL is complete.
+    """
+
+    arguments = [('queryID', String()),
+                 ('norows', Boolean())]
+
+
+
+class Commit(Command):
+    arguments = txnarg()
+
+
+
+class Abort(Command):
+    arguments = txnarg()
+
+
+
+class _NoRows(Exception):
+    """
+    Placeholder exception to report zero rows.
+    """
+
+
+class ConnectionPoolConnection(AMP):
+    """
+    A L{ConnectionPoolConnection} is a single connection to a
+    L{ConnectionPool}.
+    """
+
+    def __init__(self, pool):
+        """
+        Initialize a mapping of transaction IDs to transaction objects.
+        """
+        super(ConnectionPoolConnection, self).__init__()
+        self.pool = pool
+        self._txns = {}
+
+
+    @StartTxn.responder
+    def start(self, transactionID):
+        self._txns[transactionID] = self.pool.connection()
+        return {}
+
+
+    @ExecSQL.responder
+    @inlineCallbacks
+    def receivedSQL(self, transactionID, queryID, sql, args):
+        try:
+            rows = yield self._txns[transactionID].execSQL(sql, args, _NoRows)
+        except _NoRows:
+            norows = True
+        else:
+            norows = False
+            if rows is not None:
+                for row in rows:
+                    # Either this should be yielded or it should be
+                    # requiresAnswer=False
+                    self.callRemote(Row, queryID=queryID, row=row)
+        self.callRemote(QueryComplete, queryID=queryID, norows=norows)
+        returnValue({})
+
+
+    def _complete(self, transactionID, thunk):
+        txn = self._txns.pop(transactionID)
+        return thunk(txn).addCallback(lambda ignored: {})
+
+
+    @Commit.responder
+    def commit(self, transactionID):
+        """
+        Successfully complete the given transaction.
+        """
+        return self._complete(transactionID, lambda x: x.commit())
+
+
+    @Abort.responder
+    def abort(self, transactionID):
+        """
+        Roll back the given transaction.
+        """
+        return self._complete(transactionID, lambda x: x.abort())
+
+
+
+class ConnectionPoolClient(AMP):
+    """
+    A client which can execute SQL.
+    """
+    def __init__(self):
+        super(ConnectionPoolClient, self).__init__()
+        self._nextID = count().next
+        self._txns = {}
+        self._queries = {}
+
+
+    def newTransaction(self):
+        txnid = str(self._nextID())
+        self.callRemote(StartTxn, transactionID=txnid)
+        txn = Transaction(client=self, transactionID=txnid)
+        self._txns[txnid] = txn
+        return txn
+
+
+    @Row.responder
+    def row(self, queryID, row):
+        self._queries[queryID].row(row)
+        return {}
+
+
+    @QueryComplete.responder
+    def complete(self, queryID, norows):
+        self._queries.pop(queryID).done(norows)
+        return {}
+
+
+
+class _Query(object):
+    def __init__(self, raiseOnZeroRowCount):
+        self.results = []
+        self.deferred = Deferred()
+        self.raiseOnZeroRowCount = raiseOnZeroRowCount
+
+
+    def row(self, row):
+        """
+        A row was received.
+        """
+        self.results.append(row)
+
+
+    def done(self, norows):
+        """
+        The query is complete.
+
+        @param norows: A boolean.  True if there were not any rows.
+        """
+        if norows and (self.raiseOnZeroRowCount is not None):
+            exc = self.raiseOnZeroRowCount()
+            self.deferred.errback(Failure(exc))
+        else:
+            self.deferred.callback(self.results)
+
+
+
+class Transaction(object):
+    """
+    Async protocol-based transaction implementation.
+    """
+
+    implements(IAsyncTransaction)
+
+    # FIXME: this needs to come from the other end of the wire.
+
+    paramstyle = DEFAULT_PARAM_STYLE
+
+    def __init__(self, client, transactionID):
+        """
+        Initialize a transaction with a L{ConnectionPoolClient} and a unique
+        transaction identifier.
+        """
+        self._client = client
+        self._transactionID = transactionID
+        self._completed = False
+
+
+    def execSQL(self, sql, args=None, raiseOnZeroRowCount=None):
+        if args is None:
+            args = []
+        queryID = str(self._client._nextID())
+        query = self._client._queries[queryID] = _Query(raiseOnZeroRowCount)
+        self._client.callRemote(ExecSQL, queryID=queryID, sql=sql, args=args,
+                                transactionID=self._transactionID)
+        return query.deferred
+
+
+    def _complete(self, command):
+        if self._completed:
+            raise AlreadyFinishedError()
+        self._completed = True
+        return self._client.callRemote(
+            command, transactionID=self._transactionID
+            ).addCallback(lambda x: None)
+
+
+    def commit(self):
+        return self._complete(Commit)
+
+
+    def abort(self):
+        return self._complete(Abort)
+
+

Added: CalendarServer/trunk/twext/enterprise/dal/__init__.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/__init__.py	                        (rev 0)
+++ CalendarServer/trunk/twext/enterprise/dal/__init__.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -0,0 +1,27 @@
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Toolkit for building a Data-Access Layer (DAL).
+
+This includes an abstract representation of SQL objects like tables, columns,
+sequences and queries, a parser to convert your schema to that representation,
+and tools for working with it.
+
+In some ways this is similar to the low levels of something like SQLAlchemy, but
+it is designed to be more introspectable, to allow for features like automatic
+caching and index detection.  NB: work in progress.
+"""

Added: CalendarServer/trunk/twext/enterprise/dal/model.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/model.py	                        (rev 0)
+++ CalendarServer/trunk/twext/enterprise/dal/model.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -0,0 +1,363 @@
+# -*- test-case-name: twext.enterprise.dal.test.test_parseschema -*-
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Model classes for SQL.
+"""
+
+class SQLType(object):
+    """
+    A data-type as defined in SQL; like "integer" or "real" or "varchar(255)".
+
+    @ivar name: the name of this type.
+    @type name: C{str}
+
+    @ivar length: the length of this type, if it is a type like 'varchar' or
+        'character' that comes with a parenthetical length.
+    @type length: C{int} or C{NoneType}
+    """
+
+    def __init__(self, name, length):
+        _checkstr(name)  # raises ValueError unless name is a str
+        self.name = name
+        self.length = length
+
+
+    def __eq__(self, other):
+        """
+        Compare equal to other L{SQLType}s with matching name and length.
+        """
+        if not isinstance(other, SQLType):
+            return NotImplemented  # let Python try the other operand
+        return (self.name, self.length) == (other.name, other.length)
+
+
+    def __ne__(self, other):
+        """
+        (Inequality is the opposite of equality.)
+        """
+        if not isinstance(other, SQLType):
+            return NotImplemented
+        return not self.__eq__(other)
+
+
+    def __repr__(self):
+        """
+        A useful string representation which includes the name and, when it is
+        truthy (so neither C{None} nor C{0}), the length in parentheses.
+        """
+        if self.length:
+            lendesc = '(%s)' % (self.length)
+        else:
+            lendesc = ''
+        return '<SQL Type: %r%s>' % (self.name, lendesc)
+
+
+
+class Constraint(object):
+    """
+    A constraint on a set of columns.
+
+    @ivar type: the type of constraint.  Currently, only C{'UNIQUE'} and C{'NOT
+        NULL'} are supported.
+    @type type: C{str}
+
+    @ivar affectsColumns: Columns affected by this constraint.
+    @type affectsColumns: C{list} of L{Column}
+    """
+
+    # Values for 'type' attribute.  Column.canBeNull and Table.uniques compare
+    # against these with 'is', so pass these objects, not merely equal strings.
+    NOT_NULL = 'NOT NULL'
+    UNIQUE = 'UNIQUE'
+
+    def __init__(self, type, affectsColumns):
+        self.affectsColumns = affectsColumns
+        # XXX: possibly different constraint types should have different
+        # classes?
+        self.type = type
+
+
+
+class ProcedureCall(object):
+    """
+    An invocation of a stored procedure or built-in function.
+    """
+
+    def __init__(self, name, args):
+        _checkstr(name)   # raises ValueError unless name is a str
+        self.name = name  # the procedure / function name
+        self.args = args  # stored exactly as given, without inspection
+
+
+
+class NO_DEFAULT(object):
+    """
+    Sentinel for "no default"; L{Column.__init__} stores this class itself as
+    the default.  (C{None} is unsuitable, as it would mean a default of C{NULL}.)
+    """
+
+
+def _checkstr(x):
+    """
+    Verify that C{x} is a C{str}, raising L{ValueError} if it is not (nothing is
+    returned on success).  This is to prevent pollution with unicode values.
+    """
+    if not isinstance(x, str):
+        raise ValueError("%r is not a str." % (x,))
+
+
+class Column(object):
+    """
+    A column from a table.
+
+    @ivar table: The L{Table} to which this L{Column} belongs.
+    @type table: L{Table}
+
+    @ivar name: The unqualified name, e.g. C{'BAR'} for column BAR of table FOO.
+    @type name: C{str}
+
+    @ivar type: The declared type of this column.
+    @type type: L{SQLType}
+
+    @ivar default: The default value, or L{NO_DEFAULT} if none was declared.
+
+    @ivar references: If this column references a foreign key on another table,
+        this will be a reference to that table; otherwise (normally) C{None}.
+    @type references: L{Table} or C{NoneType}
+    """
+
+    def __init__(self, table, name, type):
+        _checkstr(name)
+        self.table = table
+        self.name = name
+        self.type = type
+        self.default = NO_DEFAULT
+        self.references = None
+
+
+    def __repr__(self):
+        return '<Column (%s %r)>' % (self.name, self.type)
+
+
+    def canBeNull(self):
+        """
+        Can this column ever be C{NULL}, i.e. C{None}?  In other words, is it
+        free of any C{NOT NULL} constraints?
+
+        @return: C{True} if so, C{False} if not.
+        """
+        for constraint in self.table.constraints:
+            if self in constraint.affectsColumns:
+                if constraint.type is Constraint.NOT_NULL:
+                    return False
+        return True
+
+
+    def setDefaultValue(self, value):
+        """
+        Change the default value of this column.  (Should only be called during
+        schema parsing.)
+        """
+        self.default = value
+
+
+    def needsValue(self):
+        """
+        Does this column require a value in INSERT statements which create rows?
+
+        @return: C{True} for L{Column}s which cannot be NULL and have no usable
+            default (none declared, or a default of C{NULL}); C{False} otherwise.
+        @rtype: C{bool}
+        """
+        return not self.canBeNull() and self.default in (None, NO_DEFAULT)
+
+
+    def doesReferenceName(self, name):
+        """
+        Point this column at the named table; print a 'Mismatch' line if the
+        types of this column and that table's primary key differ.  (Parse-time.)
+
+        @param name: the name of a L{Table} in this L{Column}'s L{Schema}.
+        @type name: L{str}
+        """
+        self.references = self.table.schema.tableNamed(name)
+        if self.references.primaryKey.type != self.type:
+            print 'Mismatch', self.references.primaryKey.type, self.type
+
+
+
+class Table(object):
+    """
+    A set of columns.
+
+    @ivar descriptiveComment: A docstring for the table.  Parsed from a '--'
+        comment preceding this table in the SQL schema file that was parsed, if
+        any.
+    @type descriptiveComment: C{str}
+
+    @ivar schema: a reference to the L{Schema} to which this table belongs.
+    """
+
+    def __init__(self, schema, name):
+        _checkstr(name)
+        self.descriptiveComment = ''
+        self.schema = schema
+        self.name = name
+        self.columns = []        # L{Column}s, in the order they were added
+        self.constraints = []    # L{Constraint}s, see tableConstraint
+        self.schemaRows = []     # dicts mapping L{Column} to value
+        self.primaryKey = None   # not set here; assigned by the schema parser
+        self.schema.tables.append(self)  # register with the owning schema
+
+
+    def __repr__(self):
+        return '<Table %r:%r>' % (self.name, self.columns)
+
+
+    def columnNamed(self, name):
+        """
+        Retrieve a column from this table with a given name.
+
+        @param name: the unqualified column name to look for.
+        @raise KeyError: if no such column exists.
+
+        @return: the first column whose name matches.
+        @rtype: L{Column}
+        """
+        for column in self.columns:
+            if column.name == name:
+                return column
+        raise KeyError("no such column: %r" % (name,))
+
+
+    def addColumn(self, name, type):
+        """
+        A new column was parsed for this table.
+
+        @param name: The unqualified name of the column.
+        @type name: C{str}
+
+        @param type: The L{SQLType} describing the column's type.
+        @return: the newly created and appended L{Column}.
+        """
+        column = Column(self, name, type)
+        self.columns.append(column)
+        return column
+
+
+    def tableConstraint(self, constraintType, columnNames):
+        """
+        This table is affected by a constraint.  (Schema parsing only.)
+
+        @param constraintType: the type of constraint; either
+            L{Constraint.NOT_NULL} or L{Constraint.UNIQUE}, currently.
+        @param columnNames: names of this table's columns that it applies to.
+        """
+        affectsColumns = []
+        for name in columnNames:
+            affectsColumns.append(self.columnNamed(name))  # KeyError if unknown
+        self.constraints.append(Constraint(constraintType, affectsColumns))
+
+
+    def insertSchemaRow(self, values):
+        """
+        A statically-defined row was inserted as part of the schema itself.
+        This is used for tables that want to track static enumerations, for
+        example, but want to be referred to by a foreign key in other tables for
+        proper referential integrity.
+
+        Append this data to this L{Table}'s L{Table.schemaRows}.
+
+        (Should only be called during schema parsing.)
+
+        @param values: a C{list} of data items, one for each column in this
+            table's current list of L{Column}s.
+        """
+        row = {}
+        for column, value in zip(self.columns, values):  # zip drops extras
+            row[column] = value
+        self.schemaRows.append(row)
+
+
+    def addComment(self, comment):
+        """
+        Set C{descriptiveComment}, replacing (not appending to) any earlier text.
+
+        @param comment: the descriptive text
+        @type comment: C{str}
+        """
+        self.descriptiveComment = comment
+
+
+    def uniques(self):
+        """
+        @return: an iterable of C{set}s of C{Column}s which are unique within
+            this table.
+        """
+        for constraint in self.constraints:
+            if constraint.type is Constraint.UNIQUE:
+                yield set(constraint.affectsColumns)
+
+
+
+class Sequence(object):
+    """
+    A named SQL sequence, with a list of the columns that refer to it.
+    """
+
+    def __init__(self, name):
+        _checkstr(name)
+        self.name = name
+        self.referringColumns = []  # starts empty; filled in from outside
+
+
+    def __repr__(self):
+        return '<Sequence %r>' % (self.name,)
+
+
+
+class Schema(object):
+    """
+    A schema containing tables, indexes, and sequences.
+    """
+
+    def __init__(self, filename='<string>'):
+        self.filename = filename  # used only for __repr__ here
+        self.tables = []          # L{Table.__init__} appends each table here
+        self.sequences = []       # L{Sequence}s, appended by the caller
+
+
+    def __repr__(self):
+        return '<Schema %r>' % (self.filename,)
+
+
+    def tableNamed(self, name):
+        for table in self.tables:  # linear scan; first match wins
+            if table.name == name:
+                return table
+        raise KeyError(name)  # no table with that name
+
+
+    def sequenceNamed(self, name):
+        for sequence in self.sequences:  # linear scan; first match wins
+            if sequence.name == name:
+                return sequence
+        raise KeyError(name)  # no sequence with that name
+
+
+

Added: CalendarServer/trunk/twext/enterprise/dal/parseschema.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/parseschema.py	                        (rev 0)
+++ CalendarServer/trunk/twext/enterprise/dal/parseschema.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -0,0 +1,461 @@
+# -*- test-case-name: twext.enterprise.dal.test.test_parseschema -*-
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Parser for SQL schema.
+"""
+
+from itertools import chain
+
+from sqlparse import parse, keywords
+from sqlparse.tokens import Keyword, Punctuation, Number, String, Name
+from sqlparse.sql import (Comment, Identifier, Parenthesis, IdentifierList,
+                          Function)
+
+from twext.enterprise.dal.model import Schema, Table, SQLType, ProcedureCall
+from twext.enterprise.dal.model import Constraint
+from twext.enterprise.dal.model import Sequence
+
+
+
+def _fixKeywords():
+    """
+    Work around bugs in SQLParse, adding SEQUENCE as a keyword (since it is
+    treated as one in postgres) and removing ACCESS and SIZE (since we use those
+    as column names).  Technically those are keywords in SQL, but they aren't
+    treated as such by postgres's parser.  Safe to call more than once.
+    """
+    keywords.KEYWORDS['SEQUENCE'] = Keyword
+    for columnNameKeyword in ['ACCESS', 'SIZE']:
+        keywords.KEYWORDS.pop(columnNameKeyword, None)  # absent is fine
+
+_fixKeywords()  # applied as a side effect of importing this module
+
+
+
+def tableFromCreateStatement(schema, stmt):
+    """
+    Add a table from a CREATE TABLE sqlparse statement object.
+
+    @param schema: The schema to add the table statement to.
+    @type schema: L{Schema}
+
+    @param stmt: The C{CREATE TABLE} statement object.
+    @type stmt: L{Statement}
+
+    @return: the newly created L{Table}, already registered with C{schema}.
+    """
+    i = iterSignificant(stmt)
+    expect(i, ttype=Keyword.DDL, value='CREATE')
+    expect(i, ttype=Keyword, value='TABLE')
+    function = expect(i, cls=Function)  # "name (...)" arrives as a Function
+    i = iterSignificant(function)
+    name = expect(i, cls=Identifier).get_name().encode('utf-8')
+    self = Table(schema, name)  # a plain local; this is not a method
+    parens = expect(i, cls=Parenthesis)
+    cp = _ColumnParser(self, iterSignificant(parens), parens)
+    cp.parse()
+    return self
+
+
+
+def schemaFromPath(path):
+    """
+    Build a L{Schema}, named after C{path}'s basename, from the SQL in C{path}.
+
+    @param path: a L{FilePath}-like object containing SQL.
+
+    @return: a L{Schema} object with the contents of the given C{path} parsed
+        and added to it as L{Table} objects.
+    """
+    schema = Schema(path.basename())
+    schemaData = path.getContent()  # the whole file is read at once
+    addSQLToSchema(schema, schemaData)
+    return schema
+
+
+
+def addSQLToSchema(schema, schemaData):
+    """
+    Add new SQL to an existing schema.  C{CREATE TABLE}, C{CREATE SEQUENCE} and
+    C{INSERT INTO ... VALUES} are handled; other C{CREATE}s are skipped
+    silently, and other statement types are reported with C{print}.
+
+    @param schema: The schema to add the new SQL to.
+    @type schema: L{Schema}
+
+    @param schemaData: A string containing some SQL statements.
+    @type schemaData: C{str}
+
+    @return: the C{schema} argument
+    """
+    parsed = parse(schemaData)
+    for stmt in parsed:
+        preface = ''  # leading insignificant tokens, kept as table comment
+        while stmt.tokens and not significant(stmt.tokens[0]):
+            preface += str(stmt.tokens.pop(0))
+        if not stmt.tokens:
+            continue  # nothing but comments / whitespace
+        if stmt.get_type() == 'CREATE':
+            createType = stmt.token_next(1, True).value.upper()
+            if createType == u'TABLE':
+                t = tableFromCreateStatement(schema, stmt)
+                t.addComment(preface)
+            elif createType == u'SEQUENCE':
+                schema.sequences.append(
+                    Sequence(
+                        stmt.token_next(2, True).get_name().encode('utf-8')))
+        elif stmt.get_type() == 'INSERT':
+            insertTokens = iterSignificant(stmt)
+            expect(insertTokens, ttype=Keyword.DML, value='INSERT')
+            expect(insertTokens, ttype=Keyword, value='INTO')
+            tableName = expect(insertTokens, cls=Identifier).get_name()
+            expect(insertTokens, ttype=Keyword, value='VALUES')
+            values = expect(insertTokens, cls=Parenthesis)
+            vals = iterSignificant(values)
+            expect(vals, ttype=Punctuation, value='(')
+            valuelist = expect(vals, cls=IdentifierList)
+            expect(vals, ttype=Punctuation, value=')')
+            rowData = []  # KeyError below for any other kind of literal
+            for ident in valuelist.get_identifiers():
+                rowData.append(
+                    {Number.Integer: int,
+                     String.Single: _destringify}
+                    [ident.ttype](ident.value)
+                )
+
+            schema.tableNamed(tableName).insertSchemaRow(rowData)
+        else:
+            print 'unknown type:', stmt.get_type()
+    return schema
+
+
+
+class _ColumnParser(object):
+    """
+    Stateful parser for the things between commas.
+    """
+
+    def __init__(self, table, parenIter, parens):
+        """
+        @param table: the L{Table} to add data to.
+        @param parenIter: iterator over the significant tokens of C{parens}.
+        @param parens: the whole L{Parenthesis}, kept for diagnostic output.
+        """
+        self.parens = parens
+        self.iter = parenIter
+        self.table = table
+
+
+    def __iter__(self):
+        """
+        This object is an iterator; return itself.
+        """
+        return self
+
+
+    def next(self):
+        """
+        Get the next token, flattening any L{IdentifierList} into its members.
+        """
+        result = self.iter.next()
+        if isinstance(result, IdentifierList):
+            # Expand out all identifier lists, since they seem to pop up
+            # incorrectly.  We should never see one in a column list anyway.
+            # http://code.google.com/p/python-sqlparse/issues/detail?id=25
+            while result.tokens:
+                it = result.tokens.pop()  # reversed; pushback un-reverses
+                if significant(it):
+                    self.pushback(it)
+            return self.next()
+        return result
+
+
+    def pushback(self, value):
+        """
+        Push the value back onto this iterator so it will be returned by the
+        next call to C{next}.
+        """
+        self.iter = chain(iter((value,)), self.iter)
+
+
+    def parse(self):
+        """
+        Parse everything.
+        """
+        expect(self.iter, ttype=Punctuation, value=u"(")
+        while self.nextColumn():
+            pass
+
+
+    def nextColumn(self):
+        """
+        Parse the next column or constraint, depending on the next token.
+        """
+        maybeIdent = self.next()
+        if maybeIdent.ttype == Name:
+            return self.parseColumn(maybeIdent.value)
+        elif isinstance(maybeIdent, Identifier):
+            return self.parseColumn(maybeIdent.get_name())
+        else:
+            return self.parseConstraint(maybeIdent)
+
+
+    def parseConstraint(self, constraintType):
+        """
+        Parse a 'free' constraint, described explicitly in the table as opposed
+        to being implicitly associated with a column by being placed after it.
+        """
+        # only know about PRIMARY KEY and UNIQUE for now
+        if constraintType.match(Keyword, 'PRIMARY'):
+            expect(self, ttype=Keyword, value='KEY')
+            expect(self, cls=Parenthesis)
+            self.table.primaryKey = 'MULTI-VALUE-KEY'  # marker, on the table
+        elif constraintType.match(Keyword, 'UNIQUE'):
+            parens = iterSignificant(expect(self, cls=Parenthesis))
+            expect(parens, ttype=Punctuation, value="(")
+            idorids = parens.next()
+            if isinstance(idorids, Identifier):
+                idnames = [idorids.get_name()]
+            elif isinstance(idorids, IdentifierList):
+                idnames = [x.get_name() for x in idorids.get_identifiers()]
+            else:
+                raise ViolatedExpectation("identifier or list", repr(idorids))
+            expect(parens, ttype=Punctuation, value=")")
+            self.table.tableConstraint(Constraint.UNIQUE, idnames)
+        else:
+            raise ViolatedExpectation('PRIMARY or UNIQUE', constraintType)
+        return self.checkEnd(self.next())
+
+
+    def checkEnd(self, val):
+        """
+        After a column or constraint, check the end.
+        """
+        if val.value == u",":
+            return True
+        elif val.value == u")":
+            return False
+        else:
+            raise ViolatedExpectation(", or )", val)
+
+
+    def parseColumn(self, name):
+        """
+        Parse a column with the given name; a truthy result means more follow.
+        """
+        typeName = self.next()
+        if isinstance(typeName, Function):
+            [funcIdent, args] = iterSignificant(typeName)
+            typeName = funcIdent
+            arggetter = iterSignificant(args)
+            expect(arggetter, value=u'(')
+            typeLength = int(expect(arggetter,
+                                    ttype=Number.Integer).value.encode('utf-8'))
+        else:
+            maybeTypeArgs = self.next()
+            if isinstance(maybeTypeArgs, Parenthesis):
+                # type arguments
+                argTokens = iterSignificant(maybeTypeArgs)
+                expect(argTokens, value=u"(")
+                typeLength = int(argTokens.next().value)
+            else:
+                # something else
+                typeLength = None
+                self.pushback(maybeTypeArgs)
+        theType = SQLType(typeName.value.encode("utf-8"), typeLength)
+        theColumn = self.table.addColumn(
+            name=name.encode("utf-8"), type=theType
+        )
+        for val in self:
+            if val.ttype == Punctuation:
+                return self.checkEnd(val)
+            else:
+                expected = True
+                def oneConstraint(t):
+                    self.table.tableConstraint(t,
+                                               [theColumn.name])
+
+                if val.match(Keyword, 'PRIMARY'):
+                    expect(self, ttype=Keyword, value='KEY')
+                    # XXX check to make sure there's no other primary key yet
+                    self.table.primaryKey = theColumn
+                elif val.match(Keyword, 'UNIQUE'):
+                    # XXX add UNIQUE constraint
+                    oneConstraint(Constraint.UNIQUE)
+                elif val.match(Keyword, 'NOT'):
+                    # possibly not necessary, as 'NOT NULL' is a single keyword
+                    # in sqlparse as of 0.1.2
+                    expect(self, ttype=Keyword, value='NULL')
+                    oneConstraint(Constraint.NOT_NULL)
+                elif val.match(Keyword, 'NOT NULL'):
+                    oneConstraint(Constraint.NOT_NULL)
+                elif val.match(Keyword, 'DEFAULT'):
+                    theDefault = self.next()
+                    if isinstance(theDefault, Function):
+                        thingo = theDefault.tokens[0].get_name()
+                        parens = expectSingle(
+                            theDefault.tokens[-1], cls=Parenthesis
+                        )
+                        pareniter = iterSignificant(parens)
+                        if thingo.upper() == 'NEXTVAL':
+                            expect(pareniter, ttype=Punctuation, value="(")
+                            seqname = _destringify(
+                                expect(pareniter, ttype=String.Single).value)
+                            defaultValue = self.table.schema.sequenceNamed(
+                                seqname
+                            )
+                            defaultValue.referringColumns.append(theColumn)
+                        else:
+                            defaultValue = ProcedureCall(thingo.encode('utf-8'),
+                                                         parens)
+                    elif theDefault.ttype == Number.Integer:
+                        defaultValue = int(theDefault.value)
+                    elif (theDefault.ttype == Keyword and
+                          theDefault.value.lower() == 'false'):
+                        defaultValue = False
+                    elif (theDefault.ttype == Keyword and
+                          theDefault.value.lower() == 'true'):
+                        defaultValue = True
+                    elif (theDefault.ttype == Keyword and
+                          theDefault.value.lower() == 'null'):
+                        defaultValue = None
+                    elif theDefault.ttype == String.Single:
+                        defaultValue = _destringify(theDefault.value)
+                    else:
+                        raise RuntimeError(
+                            "not sure what to do: default %r" % (
+                            theDefault))
+                    theColumn.setDefaultValue(defaultValue)
+                elif val.match(Keyword, 'REFERENCES'):
+                    target = nameOrIdentifier(self.next())
+                    theColumn.doesReferenceName(target)
+                elif val.match(Keyword, 'ON'):
+                    expect(self, ttype=Keyword.DML, value='DELETE')
+                    expect(self, ttype=Keyword, value='CASCADE')
+                else:
+                    expected = False
+                if not expected:
+                    print 'UNEXPECTED TOKEN:', repr(val), theColumn
+                    print self.parens
+                    import pprint
+                    pprint.pprint(self.parens.tokens)
+                    return 0
+
+
+
+
+class ViolatedExpectation(Exception):
+    """
+    An expectation about the structure of the SQL syntax was violated.
+    """
+
+    def __init__(self, expected, got):
+        self.expected = expected  # what the parser required at this point
+        self.got = got            # what it found instead
+        super(ViolatedExpectation, self).__init__(
+            "Expected %r got %s" % (expected, got)
+        )
+
+
+
+def nameOrIdentifier(token):
+    """
+    Return the textual value of a token that is either an L{Identifier} or a
+    bare C{Name}; raise L{ViolatedExpectation} for any other token.
+
+    @rtype: L{str}
+    """
+    if isinstance(token, Identifier):
+        return token.get_name()
+    elif token.ttype == Name:
+        return token.value
+    else:
+        raise ViolatedExpectation("identifier or name", repr(token))
+
+
+
+def expectSingle(nextval, ttype=None, value=None, cls=None):
+    """
+    Expect some properties from retrieved value.
+
+    @param ttype: A token type that C{nextval.ttype} must equal.
+
+    @param value: A string that C{nextval.value} must equal, ignoring case.
+
+    @param cls: The exact class of C{nextval} (subclasses do not match).
+
+    @raise ViolatedExpectation: if an unexpected token is found.
+
+    @return: C{nextval}, if it matches.
+    """
+    if ttype is not None:
+        if nextval.ttype != ttype:
+            raise ViolatedExpectation(ttype, '%s:%s' % (nextval.ttype, nextval))
+    if value is not None:
+        if nextval.value.upper() != value.upper():
+            raise ViolatedExpectation(value, nextval.value)
+    if cls is not None:
+        if nextval.__class__ != cls:
+            raise ViolatedExpectation(cls, repr(nextval))
+    return nextval
+
+
+
+def expect(iterator, **kw):
+    """
+    Retrieve a value from an iterator and check its properties.  Same signature
+    as L{expectSingle}, except it takes an iterator instead of a value.
+
+    @see: L{expectSingle}
+    """
+    nextval = iterator.next()  # StopIteration propagates when exhausted
+    return expectSingle(nextval, **kw)
+
+
+
+def significant(token):
+    """
+    Determine if the token is 'significant', i.e. that it is not a comment and
+    not whitespace.
+    """
+    # Comment tokens give None from is_whitespace() (intentional?), hence both.
+    return (not isinstance(token, Comment) and not token.is_whitespace())
+
+
+
+def iterSignificant(tokenList):
+    """
+    Generator yielding, in order, those tokens of C{tokenList.tokens} (a
+    L{TokenList}'s direct children) that pass the test given by L{significant}.
+    """
+    for token in tokenList.tokens:
+        if significant(token):
+            yield token
+
+
+
+def _destringify(strval):
+    """
+    Convert a single-quoted SQL string literal into the value it represents:
+    drop the first and last characters (assumed, not checked, to be the quotes)
+    and collapse each doubled quote "''" into a single "'".
+    """
+    return strval[1:-1].replace("''", "'")
+
+
+

Added: CalendarServer/trunk/twext/enterprise/dal/syntax.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/syntax.py	                        (rev 0)
+++ CalendarServer/trunk/twext/enterprise/dal/syntax.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -0,0 +1,668 @@
+# -*- test-case-name: twext.enterprise.dal.test.test_sqlsyntax -*-
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Syntax wrappers and generators for SQL.
+"""
+
+from twext.enterprise.dal.model import Schema, Table, Column
+
+
+class TableMismatch(Exception):
+    """
+    A table in a statement did not match with a column: raised when a
+    requested column belongs to none of a L{Select}'s C{From} tables, or when
+    columns which must all come from a single table do not.
+    """
+
+
+
+class NotEnoughValues(ValueError):
+    """
+    Not enough values were supplied for an L{Insert}: a column of the target
+    table which needs a value was left out of the column map.
+    """
+
+
+
+class _Statement(object):
+    """
+    Abstract base for SQL statements which may be executed.  Subclasses must
+    implement C{toSQL(placeholder, quote)}, returning an object which offers
+    C{bind} (such as an L{SQLFragment}).
+
+    @cvar _paramstyles: maps a paramstyle name to a 2-tuple of the
+        placeholder text used by that style and a function which escapes
+        literal SQL text for it.
+    """
+
+    _paramstyles = {
+        # With 'pyformat' the placeholder is '%s', hence a literal '%' in the
+        # statement text has to be doubled.
+        'pyformat': ('%s', lambda s: s.replace("%", "%%"))
+    }
+
+    def on(self, txn, raiseOnZeroRowCount=None, **kw):
+        """
+        Execute this statement on a given L{IAsyncTransaction} and return the
+        resulting L{Deferred}.
+
+        @param txn: the transaction; its C{paramstyle} selects the
+            placeholder syntax and its C{execSQL} runs the statement.
+
+        @param raiseOnZeroRowCount: passed through to C{txn.execSQL}.
+
+        @param kw: values for any named L{Parameter}s in the statement.
+
+        @raise KeyError: if C{txn.paramstyle} is not a known paramstyle.
+
+        @return: the result of C{txn.execSQL}.
+        """
+        marker, escape = self._paramstyles[txn.paramstyle]
+        unbound = self.toSQL(marker, escape)
+        bound = unbound.bind(**kw)
+        return txn.execSQL(bound.text, bound.parameters, raiseOnZeroRowCount)
+
+
+
+class Syntax(object):
+    """
+    Base class for syntactic convenience.
+
+    Subclasses define dynamic attribute access to represent their underlying
+    model as a Python namespace.
+
+    You can access the underlying model as '.model'.
+
+    @cvar modelType: the class which C{model} must be an instance of; set by
+        each concrete subclass.
+
+    @ivar model: the wrapped model object.
+    """
+
+    modelType = None
+
+    def __init__(self, model):
+        """
+        @param model: the model object to wrap.
+
+        @raise ValueError: if C{model} is not an instance of C{modelType}.
+        """
+        if not isinstance(model, self.modelType):
+            # Make sure we don't get a misleading repr().  The values are
+            # interpolated here because, unlike logging calls, exceptions
+            # never apply extra arguments to a format string.
+            raise ValueError("type mismatch: %r %r" % (type(self), model))
+        self.model = model
+
+
+    def __repr__(self):
+        return '<Syntax for: %r>' % (self.model,)
+
+
+
+class FunctionInvocation(object):
+    """
+    The application of a named SQL function to a single argument.
+
+    @ivar name: the SQL name of the function.
+
+    @ivar arg: the argument; an object providing C{subSQL}.
+    """
+
+    def __init__(self, name, arg):
+        self.name = name
+        self.arg = arg
+
+
+    def subSQL(self, placeholder, quote, allTables):
+        """
+        @return: an L{SQLFragment} of the form C{name(argument)}.
+        """
+        call = SQLFragment(self.name)
+        call.append(SQLFragment("("))
+        call.append(self.arg.subSQL(placeholder, quote, allTables))
+        call.append(SQLFragment(")"))
+        return call
+
+
+
+class Function(object):
+    """
+    A L{Function} represents a named SQL function; calling it produces the
+    L{FunctionInvocation} which applies that function to an argument.
+
+    @ivar name: the SQL name of the function.
+    """
+
+    def __init__(self, name):
+        self.name = name
+
+
+    def __call__(self, arg):
+        """
+        Produce a L{FunctionInvocation} of this function on C{arg}.
+        """
+        return FunctionInvocation(name=self.name, arg=arg)
+
+# Ready-made functions: SQL 'max' and 'character_length'.
+Max = Function("max")
+Len = Function("character_length")
+
+
+
+class SchemaSyntax(Syntax):
+    """
+    Syntactic convenience for L{Schema}: each table of the schema is
+    reachable as an attribute named after the table.
+    """
+
+    modelType = Schema
+
+    def __getattr__(self, attr):
+        """
+        Look up the table called C{attr} and wrap it in a L{TableSyntax}.
+
+        @raise AttributeError: if the schema's C{tableNamed} raises
+            C{KeyError} for that name.
+        """
+        try:
+            found = self.model.tableNamed(attr)
+        except KeyError:
+            raise AttributeError("schema has no table %r" % (attr,))
+        else:
+            wrapper = TableSyntax(found)
+            # Needs to be preserved here so that aliasing will work: later
+            # lookups of this name find the stored object and skip
+            # __getattr__ altogether.
+            setattr(self, attr, wrapper)
+            return wrapper
+
+
+    def __iter__(self):
+        """
+        Yield a fresh L{TableSyntax} for every table in the schema.
+        """
+        for tableModel in self.model.tables:
+            yield TableSyntax(tableModel)
+
+
+
+class TableSyntax(Syntax):
+    """
+    Syntactic convenience for L{Table}: each column of the table is
+    reachable as an attribute named after the column.
+    """
+
+    modelType = Table
+
+    def join(self, otherTableSyntax, on, type=''):
+        """
+        @param otherTableSyntax: the table (or join) to join this table to.
+
+        @param on: the join condition; an object providing C{subSQL}.
+
+        @param type: optional text placed before the 'join' keyword.
+
+        @return: a L{Join} of this table with C{otherTableSyntax}.
+        """
+        return Join(self, type, otherTableSyntax, on)
+
+
+    def subSQL(self, placeholder, quote, allTables):
+        """
+        For use in a 'from' clause.
+
+        @return: an L{SQLFragment} containing just the table's name.
+        """
+        # XXX maybe there should be a specific method which is only invoked
+        # from the FROM clause, that only tables and joins would implement?
+        return SQLFragment(self.model.name)
+
+
+    def __getattr__(self, attr):
+        """
+        Look up the column called C{attr} and wrap it in a L{ColumnSyntax}.
+
+        @raise AttributeError: if the table has no such column.
+        """
+        try:
+            column = self.model.columnNamed(attr)
+        except KeyError:
+            # NOTE(review): assumes columnNamed() signals a missing column
+            # with KeyError, as SchemaSyntax assumes of tableNamed() --
+            # confirm.  __getattr__ has to raise AttributeError so that
+            # getattr(obj, name, default) and friends keep working.
+            raise AttributeError("table %r has no column %r" %
+                                 (self.model.name, attr))
+        return ColumnSyntax(column)
+
+
+    def __iter__(self):
+        """
+        Yield a fresh L{ColumnSyntax} for every column of the table.
+        """
+        for column in self.model.columns:
+            yield ColumnSyntax(column)
+
+
+    def tables(self):
+        """
+        @return: a one-element list containing this table.
+        """
+        return [self]
+
+
+    def aliases(self):
+        """
+        @return: a C{dict} of every L{ColumnSyntax} which has been stored as
+            an attribute on this object, keyed by attribute name.
+        """
+        return dict((k, v) for k, v in self.__dict__.items()
+                    if isinstance(v, ColumnSyntax))
+
+
+    def __contains__(self, columnSyntax):
+        """
+        @param columnSyntax: a L{ColumnSyntax}, or a L{FunctionInvocation}
+            whose argument is one.
+
+        @return: whether the column's model is one of this table's columns.
+        """
+        if isinstance(columnSyntax, FunctionInvocation):
+            # A function applied to a column belongs to that column's table.
+            columnSyntax = columnSyntax.arg
+        return (columnSyntax.model in self.model.columns)
+
+
+
+class Join(object):
+    """
+    A join of one table with another table or join.
+
+    @ivar firstTable: the left-hand side of the join.
+
+    @ivar type: text emitted before the 'join' keyword; may be empty.
+
+    @ivar secondTableOrJoin: the right-hand side of the join.
+
+    @ivar on: the join condition; an object providing C{subSQL}.
+    """
+
+    def __init__(self, firstTable, type, secondTableOrJoin, on):
+        self.firstTable = firstTable
+        self.type = type
+        self.secondTableOrJoin = secondTableOrJoin
+        self.on = on
+
+
+    def subSQL(self, placeholder, quote, allTables):
+        """
+        @return: an L{SQLFragment} of the form
+            C{first [type ]join second on condition}.
+        """
+        joined = SQLFragment()
+        joined.append(self.firstTable.subSQL(placeholder, quote, allTables))
+        # An empty join type contributes nothing, not even its space.
+        if self.type:
+            joined.text += ' ' + self.type + ' join '
+        else:
+            joined.text += ' join '
+        joined.append(
+            self.secondTableOrJoin.subSQL(placeholder, quote, allTables))
+        joined.text += ' on '
+        joined.append(self.on.subSQL(placeholder, quote, allTables))
+        return joined
+
+
+    def tables(self):
+        """
+        @return: the tables of the left-hand side followed by those of the
+            right-hand side.
+        """
+        left = self.firstTable.tables()
+        right = self.secondTableOrJoin.tables()
+        return left + right
+
+
+
+def comparison(comparator):
+    """
+    Create a binary-operator method which builds a comparison object instead
+    of computing a value.
+
+    @param comparator: the SQL operator text to use.
+
+    @return: a function of C{(self, other)} which returns a
+        L{NullComparison} when C{other} is C{None}, a L{ColumnComparison}
+        when it is a L{ColumnSyntax}, and a L{ConstantComparison} for
+        anything else.
+    """
+    def buildComparison(self, other):
+        if other is None:
+            return NullComparison(self, comparator)
+        if isinstance(other, ColumnSyntax):
+            comparisonType = ColumnComparison
+        else:
+            comparisonType = ConstantComparison
+        return comparisonType(self, comparator, other)
+    return buildComparison
+
+
+
+class ColumnSyntax(Syntax):
+    """
+    Syntactic convenience for L{Column}.
+
+    The comparison and arithmetic operators are overloaded to build
+    comparison objects (see L{comparison}) rather than to compute values.
+    """
+
+    modelType = Column
+
+    __eq__ = comparison('=')
+    __ne__ = comparison('!=')
+    __gt__ = comparison('>')
+    __ge__ = comparison('>=')
+    __lt__ = comparison('<')
+    __le__ = comparison('<=')
+    __add__ = comparison("+")
+    __sub__ = comparison("-")
+
+
+    def subSQL(self, placeholder, quote, allTables):
+        """
+        @param allTables: the L{TableSyntax} objects taking part in the
+            enclosing statement.
+
+        @return: an L{SQLFragment} naming this column; the name is prefixed
+            with the table name when a different table in C{allTables} has a
+            column of the same name.
+        """
+        # XXX This, and 'model', could in principle conflict with column names.
+        # Maybe do something about that.
+        column = self.model
+        for otherTable in allTables:
+            if column.table is otherTable.model:
+                continue
+            otherNames = [each.name for each in otherTable.model.columns]
+            if column.name in otherNames:
+                # Ambiguous without qualification.
+                return SQLFragment(column.table.name + '.' + column.name)
+        return SQLFragment(column.name)
+
+
+    def In(self, subselect):
+        """
+        @return: a L{CompoundComparison} testing this column for membership
+            in C{subselect}.
+        """
+        # Can't be Select.__contains__ because __contains__ gets __nonzero__
+        # called on its result by the 'in' syntax.
+        return CompoundComparison(self, 'in', subselect)
+
+
+
+class Comparison(object):
+    """
+    Base class for an operator applied to two operands.
+
+    @ivar a: the left operand.
+
+    @ivar op: the operator text.
+
+    @ivar b: the right operand.
+    """
+
+    def __init__(self, a, op, b):
+        self.a, self.op, self.b = a, op, b
+
+
+    def __nonzero__(self):
+        """
+        Always fails: a comparison describes SQL to generate and has no
+        truth value of its own.
+        """
+        raise ValueError(
+            "column comparisons should not be tested for truth value")
+
+
+    def booleanOp(self, operand, other):
+        """
+        @return: a L{CompoundComparison} joining this comparison and
+            C{other} with the operator C{operand}.
+        """
+        return CompoundComparison(self, operand, other)
+
+
+    def And(self, other):
+        """
+        @return: this comparison 'and' C{other}.
+        """
+        return self.booleanOp('and', other)
+
+
+    def Or(self, other):
+        """
+        @return: this comparison 'or' C{other}.
+        """
+        return self.booleanOp('or', other)
+
+
+
+class NullComparison(Comparison):
+    """
+    A L{NullComparison} is a comparison of a column or expression with None.
+    """
+    def __init__(self, a, op):
+        # 'b' is always None for this comparison type
+        Comparison.__init__(self, a, op, None)
+
+
+    def subSQL(self, placeholder, quote, allTables):
+        """
+        @return: an L{SQLFragment} reading C{a is null} when the operator is
+            C{=}, and C{a is not null} for every other operator.
+        """
+        sqls = SQLFragment()
+        sqls.append(self.a.subSQL(placeholder, quote, allTables))
+        if self.op == "=":
+            sqls.text += " is null"
+        else:
+            sqls.text += " is not null"
+        return sqls
+
+
+class ConstantComparison(Comparison):
+    """
+    A comparison of a column or expression with a constant value, which is
+    passed to the database as a statement parameter.
+    """
+
+    def subSQL(self, placeholder, quote, allTables):
+        """
+        @return: an L{SQLFragment} of the form C{a op placeholder}, with the
+            constant as its sole additional parameter.
+        """
+        comparison = SQLFragment()
+        comparison.append(self.a.subSQL(placeholder, quote, allTables))
+        operatorText = ' ' + self.op + ' ' + placeholder
+        comparison.append(SQLFragment(operatorText, [self.b]))
+        return comparison
+
+
+
+class CompoundComparison(Comparison):
+    """
+    A compound comparison: two operands which both generate SQL, joined by
+    an operator (for example 'and', 'or' or 'in').
+    """
+
+    def subSQL(self, placeholder, quote, allTables):
+        """
+        @return: an L{SQLFragment} of the form C{a op b}.
+        """
+        compound = SQLFragment()
+        compound.append(self.a.subSQL(placeholder, quote, allTables))
+        compound.text += ' ' + ('%s' % (self.op,)) + ' '
+        compound.append(self.b.subSQL(placeholder, quote, allTables))
+        return compound
+
+
+
+class ColumnComparison(CompoundComparison):
+    """
+    Comparing two columns is the same as comparing any other two expressions,
+    (for now), so all behavior is inherited from L{CompoundComparison}.
+    """
+
+
+
+class _AllColumns(object):
+    """
+    The column list of a 'select' which asks for every column.
+    """
+
+    def subSQL(self, placeholder, quote, allTables):
+        """
+        @return: an L{SQLFragment} containing the quoted C{*}.
+        """
+        star = quote('*')
+        return SQLFragment(star)
+
+# The instance used by Select when no columns are given.
+ALL_COLUMNS = _AllColumns()
+
+
+
+class _SomeColumns(object):
+    """
+    An explicit column list for a 'select'.
+
+    @ivar columns: an iterable of objects providing C{subSQL}.
+    """
+
+    def __init__(self, columns):
+        self.columns = columns
+
+
+    def subSQL(self, placeholder, quote, allTables):
+        """
+        @return: an L{SQLFragment} with the SQL of every column, separated
+            by C{", "}.
+        """
+        return _commaJoined(
+            column.subSQL(placeholder, quote, allTables)
+            for column in self.columns)
+
+
+
+class Select(_Statement):
+    """
+    'select' statement.
+
+    @ivar columns: the column list; L{ALL_COLUMNS} when none was given.
+
+    @ivar From: the table or join to select from.
+
+    @ivar Where: the condition of the 'where' clause, or C{None}.
+
+    @ivar GroupBy: the expression of the 'group by' clause, or C{None}.
+
+    @ivar OrderBy: the expression of the 'order by' clause, or C{None}.
+    """
+
+    def __init__(self, columns=None, Where=None, From=None, OrderBy=None,
+                 GroupBy=None):
+        """
+        @raise TableMismatch: if one of C{columns} is not contained in any
+            of the tables of C{From}.
+        """
+        self.From = From
+        self.Where = Where
+        self.OrderBy = OrderBy
+        self.GroupBy = GroupBy
+        if columns is None:
+            self.columns = ALL_COLUMNS
+            return
+        for column in columns:
+            # Every requested column has to come from a table being
+            # selected from.
+            if not any(column in table for table in From.tables()):
+                raise TableMismatch()
+        self.columns = _SomeColumns(columns)
+
+
+    def toSQL(self, placeholder="?", quote=lambda x: x):
+        """
+        @return: a 'select' statement with placeholders and arguments
+
+        @rtype: L{SQLFragment}
+        """
+        select = SQLFragment(quote("select "))
+        allTables = self.From.tables()
+        select.append(self.columns.subSQL(placeholder, quote, allTables))
+        select.text += quote(" from ")
+        select.append(self.From.subSQL(placeholder, quote, allTables))
+        if self.Where is not None:
+            condition = self.Where.subSQL(placeholder, quote, allTables)
+            select.text += quote(" where ")
+            select.append(condition)
+        # 'group by' has to precede 'order by'.
+        if self.GroupBy is not None:
+            select.text += quote(" group by ")
+            select.append(self.GroupBy.subSQL(placeholder, quote, allTables))
+        if self.OrderBy is not None:
+            select.text += quote(" order by ")
+            select.append(self.OrderBy.subSQL(placeholder, quote, allTables))
+        return select
+
+
+    def subSQL(self, placeholder, quote, allTables):
+        """
+        @return: this statement, in parentheses, for use as a subselect.
+        """
+        nested = SQLFragment("(")
+        nested.append(self.toSQL(placeholder, quote))
+        nested.append(SQLFragment(")"))
+        return nested
+
+
+
+def _commaJoined(stmts):
+    """
+    Concatenate fragments into a new one, separating them with C{", "}.
+
+    @param stmts: an iterable of L{SQLFragment}s.
+
+    @return: a new L{SQLFragment}; empty if C{stmts} is empty.
+    """
+    joined = SQLFragment()
+    for index, piece in enumerate(stmts):
+        if index:
+            # Every fragment but the first is preceded by a separator.
+            joined.append(SQLFragment(", "))
+        joined.append(piece)
+    return joined
+
+
+
+def _inParens(stmt):
+    """
+    @param stmt: an L{SQLFragment}.
+
+    @return: a new L{SQLFragment} with C{stmt} wrapped in parentheses.
+    """
+    # SQLFragment.append returns the fragment itself, so calls chain.
+    return SQLFragment("(").append(stmt).append(SQLFragment(")"))
+
+
+
+def _fromSameTable(columns):
+    """
+    Find the one table which all of the given L{Column} objects belong to.
+
+    @param columns: a non-empty sequence of L{Column}s.
+
+    @raise TableMismatch: if the columns do not all share the table of the
+        first one.
+
+    @return: the common table.
+    """
+    table = columns[0].table
+    if any(column.table is not table for column in columns):
+        raise TableMismatch("Columns must all be from the same table.")
+    return table
+
+
+
+def _modelsFromMap(columnMap):
+    """
+    Collect the L{Column} models behind the keys of a mapping of
+    L{ColumnSyntax} to values.
+
+    @return: a C{list} with the C{model} of every key.
+    """
+    models = []
+    for columnSyntax in columnMap.keys():
+        models.append(columnSyntax.model)
+    return models
+
+
+
+class Insert(_Statement):
+    """
+    'insert' statement.
+
+    Derives from L{_Statement} so that, like L{Select}, it can be executed
+    with C{on()}.
+
+    @ivar columnMap: a mapping of L{ColumnSyntax} to the value to insert.
+
+    @ivar Return: an object providing C{subSQL} for a 'returning' clause, or
+        C{None}.
+    """
+
+    def __init__(self, columnMap, Return=None):
+        """
+        @raise TableMismatch: if the columns are not all from one table.
+
+        @raise NotEnoughValues: if a column of the table which needs a value
+            is missing from C{columnMap}.
+        """
+        self.columnMap = columnMap
+        self.Return = Return
+        columns = _modelsFromMap(columnMap)
+        table = _fromSameTable(columns)
+        required = [column for column in table.columns if column.needsValue()]
+        unspecified = [column for column in required
+                       if column not in columns]
+        if unspecified:
+            raise NotEnoughValues(
+                'Columns [%s] required.' %
+                    (', '.join([c.name for c in unspecified])))
+
+
+    def toSQL(self, placeholder="?", quote=lambda x: x):
+        """
+        @return: an 'insert' statement with placeholders and arguments
+
+        @rtype: L{SQLFragment}
+        """
+        # Sort by column name so the generated SQL is deterministic.  (No
+        # tuple-unpacking lambda: that syntax is gone in Python 3.)
+        sortedColumns = sorted(self.columnMap.items(),
+                               key=lambda pair: pair[0].model.name)
+        # Only one table is involved, so column names are never qualified.
+        allTables = []
+        stmt = SQLFragment('insert into ')
+        stmt.append(
+            TableSyntax(sortedColumns[0][0].model.table)
+            .subSQL(placeholder, quote, allTables))
+        stmt.append(SQLFragment(" "))
+        stmt.append(_inParens(_commaJoined(
+            [c.subSQL(placeholder, quote, allTables) for (c, v) in
+             sortedColumns])))
+        stmt.append(SQLFragment(" values "))
+        stmt.append(_inParens(_commaJoined(
+            [SQLFragment(placeholder, [v]) for (c, v) in sortedColumns])))
+        if self.Return is not None:
+            stmt.text += ' returning '
+            stmt.append(self.Return.subSQL(placeholder, quote, allTables))
+        return stmt
+
+
+
+class Update(_Statement):
+    """
+    'update' statement
+
+    Derives from L{_Statement} so that, like L{Select}, it can be executed
+    with C{on()}.
+
+    @ivar columnMap: a mapping of L{ColumnSyntax} to the value to set.
+
+    @ivar Where: the condition selecting the rows to update.
+
+    @ivar Return: an object providing C{subSQL} for a 'returning' clause, or
+        C{None}.
+    """
+
+    def __init__(self, columnMap, Where, Return=None):
+        """
+        @raise TableMismatch: if the columns are not all from one table.
+        """
+        super(Update, self).__init__()
+        # Called only for its validation; the table itself is not kept.
+        _fromSameTable(_modelsFromMap(columnMap))
+        self.columnMap = columnMap
+        self.Where = Where
+        self.Return = Return
+
+
+    def toSQL(self, placeholder="?", quote=lambda x: x):
+        """
+        @return: an 'update' statement with placeholders and arguments
+
+        @rtype: L{SQLFragment}
+        """
+        # Sort by column name so the generated SQL is deterministic.  (No
+        # tuple-unpacking lambda: that syntax is gone in Python 3.)
+        sortedColumns = sorted(self.columnMap.items(),
+                               key=lambda pair: pair[0].model.name)
+        # Only one table is involved, so column names are never qualified.
+        allTables = []
+        result = SQLFragment('update ')
+        result.append(
+            TableSyntax(sortedColumns[0][0].model.table).subSQL(
+                placeholder, quote, allTables)
+        )
+        result.text += ' set '
+        result.append(
+            _commaJoined(
+                # append() returns the fragment, giving "column = ?".
+                [c.subSQL(placeholder, quote, allTables).append(
+                    SQLFragment(" = " + placeholder, [v]))
+                    for (c, v) in sortedColumns]
+            )
+        )
+        result.append(SQLFragment( ' where '))
+        result.append(self.Where.subSQL(placeholder, quote, allTables))
+        if self.Return is not None:
+            result.append(SQLFragment(' returning '))
+            result.append(self.Return.subSQL(placeholder, quote, allTables))
+        return result
+
+
+
+class Delete(_Statement):
+    """
+    'delete' statement.
+
+    Derives from L{_Statement} so that, like L{Select}, it can be executed
+    with C{on()}.
+
+    @ivar From: the table to delete from.
+
+    @ivar Where: the condition selecting the rows to delete.
+
+    @ivar Return: an object providing C{subSQL} for a 'returning' clause, or
+        C{None}.
+    """
+
+    def __init__(self, From, Where, Return=None):
+        self.From = From
+        self.Where = Where
+        self.Return = Return
+
+
+    def toSQL(self, placeholder="?", quote=lambda x: x):
+        """
+        @return: a 'delete' statement with placeholders and arguments
+
+        @rtype: L{SQLFragment}
+        """
+        result = SQLFragment()
+        allTables = self.From.tables()
+        result.text += quote('delete from ')
+        result.append(self.From.subSQL(placeholder, quote, allTables))
+        result.text += quote(' where ')
+        result.append(self.Where.subSQL(placeholder, quote, allTables))
+        if self.Return is not None:
+            result.append(SQLFragment(' returning '))
+            result.append(self.Return.subSQL(placeholder, quote, allTables))
+        return result
+
+
+
+class Lock(_Statement):
+    """
+    An SQL 'lock' statement.
+
+    Derives from L{_Statement} so that, like L{Select}, it can be executed
+    with C{on()}.
+
+    @ivar table: the table to lock; an object providing C{subSQL}.
+
+    @ivar mode: the lock mode, as SQL text.
+    """
+
+    def __init__(self, table, mode):
+        self.table = table
+        self.mode = mode
+
+
+    @classmethod
+    def exclusive(cls, table):
+        """
+        @return: a lock on C{table} in 'exclusive' mode.
+        """
+        return cls(table, 'exclusive')
+
+
+    def toSQL(self, placeholder="?", quote=lambda x: x):
+        """
+        @return: a 'lock table' statement
+
+        @rtype: L{SQLFragment}
+        """
+        return SQLFragment('lock table ').append(
+            self.table.subSQL(placeholder, quote, [self.table])).append(
+            SQLFragment(' in %s mode' % (self.mode,)))
+
+
+
+class SQLFragment(object):
+    """
+    Combination of SQL text and arguments; a statement which may be executed
+    against a database.
+
+    @ivar text: the SQL text, containing placeholders.
+
+    @ivar parameters: a C{list} of the values (or L{Parameter}s) matching
+        the placeholders in C{text}, in order.
+    """
+
+    def __init__(self, text="", parameters=None):
+        """
+        @param text: the initial SQL text.
+
+        @param parameters: an iterable of initial parameters, or C{None}
+            for no parameters.
+        """
+        self.text = text
+        # Always keep a private list: append() extends it in place, which
+        # must neither modify a list the caller still owns nor fail when
+        # the caller passed a tuple.
+        if parameters is None:
+            self.parameters = []
+        else:
+            self.parameters = list(parameters)
+
+
+    def bind(self, **kw):
+        """
+        Substitute values for the named L{Parameter}s in this fragment.
+
+        @param kw: the values, keyed by parameter name.
+
+        @raise KeyError: if no value is given for some L{Parameter}.
+
+        @return: a new L{SQLFragment}; this one is left unchanged.
+        """
+        params = []
+        for parameter in self.parameters:
+            if isinstance(parameter, Parameter):
+                params.append(kw[parameter.name])
+            else:
+                params.append(parameter)
+        return SQLFragment(self.text, params)
+
+
+    def append(self, anotherStatement):
+        """
+        Add the text and parameters of C{anotherStatement} to the end of
+        this fragment, in place.
+
+        @return: this fragment, so that calls can be chained.
+        """
+        self.text += anotherStatement.text
+        self.parameters.extend(anotherStatement.parameters)
+        return self
+
+
+    def __eq__(self, stmt):
+        if not isinstance(stmt, SQLFragment):
+            return NotImplemented
+        return (self.text, self.parameters) == (stmt.text, stmt.parameters)
+
+
+    def __ne__(self, stmt):
+        if not isinstance(stmt, SQLFragment):
+            return NotImplemented
+        return not self.__eq__(stmt)
+
+
+    def __repr__(self):
+        return self.__class__.__name__ + repr((self.text, self.parameters))
+
+
+    def subSQL(self, placeholder, quote, allTables):
+        """
+        A fragment is already SQL, so it stands for itself when nested in
+        another statement.
+        """
+        return self
+
+
+
+class Parameter(object):
+    """
+    A named stand-in for a value which is supplied later, when the
+    statement is bound (see L{SQLFragment.bind}).
+
+    @ivar name: the name under which the value will be supplied.
+    """
+
+    def __init__(self, name):
+        self.name = name
+
+
+    def __repr__(self):
+        return 'Parameter(' + repr(self.name) + ')'
+
+
+

Added: CalendarServer/trunk/twext/enterprise/dal/test/__init__.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/test/__init__.py	                        (rev 0)
+++ CalendarServer/trunk/twext/enterprise/dal/test/__init__.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -0,0 +1,19 @@
+##
+# Copyright (c) 2005-2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Tests for twext.enterprise.dal.
+"""

Added: CalendarServer/trunk/twext/enterprise/dal/test/test_parseschema.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/test/test_parseschema.py	                        (rev 0)
+++ CalendarServer/trunk/twext/enterprise/dal/test/test_parseschema.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -0,0 +1,174 @@
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Tests for parsing an SQL schema, which cover L{twext.enterprise.dal.model}
+and L{twext.enterprise.dal.parseschema}.
+"""
+
+from twext.enterprise.dal.model import Schema
+
+from twext.enterprise.dal.parseschema import addSQLToSchema
+from twisted.trial.unittest import TestCase
+
+
+class ParsingExampleTests(TestCase):
+    """
+    Tests for parsing some sample schemas.
+    """
+
+    def test_simplest(self):
+        """
+        Parse an extremely simple schema with one table in it.
+        """
+        s = Schema()
+        addSQLToSchema(s, "create table foo (bar integer);")
+        self.assertEquals(len(s.tables), 1)
+        foo = s.tableNamed('foo')
+        self.assertEquals(len(foo.columns), 1)
+        bar = foo.columns[0]
+        self.assertEquals(bar.name, "bar")
+        self.assertEquals(bar.type.name, "integer")
+
+
+    def test_stringTypes(self):
+        """
+        Table and column names should be byte strings.
+        """
+        s = Schema()
+        addSQLToSchema(s, "create table foo (bar integer);")
+        self.assertEquals(len(s.tables), 1)
+        foo = s.tableNamed('foo')
+        self.assertIsInstance(foo.name, str)
+        self.assertIsInstance(foo.columnNamed('bar').name, str)
+
+
+    def test_typeWithLength(self):
+        """
+        Parse a type with a length.
+        """
+        s = Schema()
+        addSQLToSchema(s, "create table foo (bar varchar(6543))")
+        bar = s.tableNamed('foo').columnNamed('bar')
+        self.assertEquals(bar.type.name, "varchar")
+        self.assertEquals(bar.type.length, 6543)
+
+
+    def test_sequence(self):
+        """
+        Parsing a 'create sequence' statement adds a L{Sequence} to the
+        L{Schema}.
+        """
+        s = Schema()
+        addSQLToSchema(s, "create sequence myseq;")
+        self.assertEquals(len(s.sequences), 1)
+        self.assertEquals(s.sequences[0].name, "myseq")
+
+
+    def test_sequenceColumn(self):
+        """
+        Parsing a 'create sequence' statement adds a L{Sequence} to the
+        L{Schema}, and then a table that contains a column which uses the SQL
+        C{nextval()} function to retrieve its default value from that sequence,
+        will cause the L{Column} object to refer to the L{Sequence} and vice
+        versa.
+        """
+        s = Schema()
+        addSQLToSchema(s,
+                       """
+                       create sequence thingy;
+                       create table thetable (
+                           thecolumn integer default nextval('thingy')
+                       );
+                       """)
+        self.assertEquals(len(s.sequences), 1)
+        self.assertEquals(s.sequences[0].name, "thingy")
+        self.assertEquals(s.tables[0].columns[0].default, s.sequences[0])
+        self.assertEquals(s.sequences[0].referringColumns,
+                          [s.tables[0].columns[0]])
+
+
+    def test_defaultConstantColumns(self):
+        """
+        Parsing a column with a constant 'default' will make that constant,
+        converted to the matching Python value, available as the 'default'
+        attribute of the Column object.
+        """
+        s = Schema()
+        addSQLToSchema(s,
+                       """
+                       create table a (
+                        b integer default 4321,
+                        c boolean default false,
+                        d boolean default true,
+                        e varchar(255) default 'sample value',
+                        f varchar(255) default null
+                       );
+                       """)
+        table = s.tableNamed("a")
+        self.assertEquals(table.columnNamed("b").default, 4321)
+        self.assertEquals(table.columnNamed("c").default, False)
+        self.assertEquals(table.columnNamed("d").default, True)
+        self.assertEquals(table.columnNamed("e").default, 'sample value')
+        self.assertEquals(table.columnNamed("f").default, None)
+
+
+    def test_notNull(self):
+        """
+        A column with a NOT NULL constraint in SQL will be parsed as a
+        column which returns False from its C{canBeNull()} method, while a
+        column without one returns True.
+        """
+        s = Schema()
+        addSQLToSchema(s,
+                       """
+                       create table alpha (beta integer,
+                                           gamma integer not null);
+                       """)
+        t = s.tableNamed('alpha')
+        self.assertEquals(True, t.columnNamed('beta').canBeNull())
+        self.assertEquals(False, t.columnNamed('gamma').canBeNull())
+
+
+    def test_unique(self):
+        """
+        A column with a UNIQUE constraint in SQL will result in the table
+        listing that column as a unique set, whether the constraint is
+        written on the column itself or as a separate table constraint.
+        """
+        for identicalSchema in [
+                "create table sample (example integer unique);",
+                "create table sample (example integer, unique(example));"]:
+            s = Schema()
+            addSQLToSchema(s, identicalSchema)
+            table = s.tableNamed('sample')
+            column = table.columnNamed('example')
+            self.assertEquals(list(table.uniques()), [set([column])])
+
+
+    def test_multiUnique(self):
+        """
+        A table with several UNIQUE constraints, one of them spanning more
+        than one column, will list each constraint as its own set of
+        columns, in the order the constraints were declared.
+        """
+        s = Schema()
+        addSQLToSchema(
+            s,
+            "create table a (b integer, c integer, unique(b, c), unique(c));")
+        a = s.tableNamed('a')
+        b = a.columnNamed('b')
+        c = a.columnNamed('c')
+        self.assertEquals(list(a.uniques()),
+                          [set([b, c]), set([c])])
+
+

Added: CalendarServer/trunk/twext/enterprise/dal/test/test_sqlsyntax.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/test/test_sqlsyntax.py	                        (rev 0)
+++ CalendarServer/trunk/twext/enterprise/dal/test/test_sqlsyntax.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -0,0 +1,417 @@
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Tests for L{twext.enterprise.dal.syntax}
+"""
+
+from twext.enterprise.dal.model import Schema
+from twext.enterprise.dal.parseschema import addSQLToSchema
+from twext.enterprise.dal.syntax import (
+    SchemaSyntax, Select, Insert, Update, Delete, Lock, SQLFragment,
+    TableMismatch, Parameter, Max, Len, NotEnoughValues
+)
+
+from twisted.trial.unittest import TestCase
+
+class GenerationTests(TestCase):
+    """
+    Tests for syntactic helpers to generate SQL queries.
+    """
+
+    def setUp(self):
+        s = Schema(self.id())
+        addSQLToSchema(schema=s, schemaData="""
+                       create table FOO (BAR integer, BAZ integer);
+                       create table BOZ (QUX integer);
+                       create table OTHER (BAR integer,
+                                           FOO_BAR integer not null);
+                       create table TEXTUAL (MYTEXT varchar(255));
+                       """)
+        self.schema = SchemaSyntax(s)
+
+
+    def test_simplestSelect(self):
+        """
+        L{Select} generates a 'select' statement, by default, asking for all
+        rows in a table.
+        """
+        self.assertEquals(Select(From=self.schema.FOO).toSQL(),
+                          SQLFragment("select * from FOO", []))
+
+
+    def test_simpleWhereClause(self):
+        """
+        L{Select} generates a 'select' statement with a 'where' clause
+        containing an expression.
+        """
+        self.assertEquals(Select(From=self.schema.FOO,
+                                 Where=self.schema.FOO.BAR == 1).toSQL(),
+                          SQLFragment("select * from FOO where BAR = ?", [1]))
+
+
+    def test_quotingAndPlaceholder(self):
+        """
+        L{Select} generates a 'select' statement with the specified placeholder
+        syntax and quoting function.
+        """
+        self.assertEquals(Select(From=self.schema.FOO,
+                                 Where=self.schema.FOO.BAR == 1).toSQL(
+                                 placeholder="*",
+                                 quote=lambda partial: partial.replace("*", "**")),
+                          SQLFragment("select ** from FOO where BAR = *", [1]))
+
+
+    def test_columnComparison(self):
+        """
+        L{Select} generates a 'select' statement which compares columns.
+        """
+        self.assertEquals(Select(From=self.schema.FOO,
+                                 Where=self.schema.FOO.BAR ==
+                                 self.schema.FOO.BAZ).toSQL(),
+                          SQLFragment("select * from FOO where BAR = BAZ", []))
+
+
+    def test_comparisonTestErrorPrevention(self):
+        """
+        The comparison object between columns raises an exception when compared
+        for a truth value, so that code will not accidentally test for '==' and
+        always get a true value.
+        """
+        def sampleComparison():
+            if self.schema.FOO.BAR == self.schema.FOO.BAZ:
+                return 'comparison should not succeed'
+        self.assertRaises(ValueError, sampleComparison)
+
+
+    def test_nullComparison(self):
+        """
+        Comparing a column with None results in the generation of an 'is null'
+        or 'is not null' SQL statement.
+        """
+        self.assertEquals(Select(From=self.schema.FOO,
+                                 Where=self.schema.FOO.BAR ==
+                                 None).toSQL(),
+                          SQLFragment(
+                              "select * from FOO where BAR is null", []))
+        self.assertEquals(Select(From=self.schema.FOO,
+                                 Where=self.schema.FOO.BAR !=
+                                 None).toSQL(),
+                          SQLFragment(
+                              "select * from FOO where BAR is not null", []))
+
+
+    def test_compoundWhere(self):
+        """
+        C{Or} (like C{And}) on a comparison yields a compound 'where' clause.
+        """
+        self.assertEquals(
+            Select(From=self.schema.FOO,
+                   Where=(self.schema.FOO.BAR < 2).Or(
+                          self.schema.FOO.BAR > 5)).toSQL(),
+            SQLFragment("select * from FOO where BAR < ? or BAR > ?", [2, 5]))
+
+
+    def test_orderBy(self):
+        """
+        L{Select}'s L{OrderBy} parameter generates an 'order by' clause for a
+        'select' statement.
+        """
+        self.assertEquals(
+            Select(From=self.schema.FOO,
+                   OrderBy=self.schema.FOO.BAR).toSQL(),
+            SQLFragment("select * from FOO order by BAR")
+        )
+
+
+    def test_groupBy(self):
+        """
+        L{Select}'s L{GroupBy} parameter generates a 'group by' clause for a
+        'select' statement.
+        """
+        self.assertEquals(
+            Select(From=self.schema.FOO,
+                   GroupBy=self.schema.FOO.BAR).toSQL(),
+            SQLFragment("select * from FOO group by BAR")
+        )
+
+
+    def test_joinClause(self):
+        """
+        A table's .join() method returns a join statement in a SELECT.
+        """
+        self.assertEquals(
+            Select(From=self.schema.FOO.join(
+                self.schema.BOZ, self.schema.FOO.BAR ==
+                self.schema.BOZ.QUX)).toSQL(),
+            SQLFragment("select * from FOO join BOZ on BAR = QUX", [])
+        )
+
+
+    def test_columnSelection(self):
+        """
+        If a column is specified by the argument to L{Select}, it will be
+        output by the SQL statement rather than the all-columns wildcard.
+        """
+        self.assertEquals(
+            Select([self.schema.FOO.BAR],
+                   From=self.schema.FOO).toSQL(),
+            SQLFragment("select BAR from FOO")
+        )
+
+
+    def test_columnAliases(self):
+        """
+        When attributes are set on a L{TableSyntax}, they will be remembered as
+        column aliases, and their alias names may be retrieved via the
+        L{TableSyntax.aliases} method.
+        """
+        self.assertEquals(self.schema.FOO.aliases(), {})
+        self.schema.FOO.ALIAS = self.schema.FOO.BAR
+        # comparing ColumnSyntax objects results in a ColumnComparison, which
+        # you can't test for truth, so compare the underlying models instead.
+        fixedForEquality = dict([(k, v.model) for k, v in
+                                 self.schema.FOO.aliases().items()])
+        self.assertEquals(fixedForEquality,
+                          {'ALIAS': self.schema.FOO.BAR.model})
+        self.assertIdentical(self.schema.FOO.ALIAS.model,
+                             self.schema.FOO.BAR.model)
+
+
+    def test_multiColumnSelection(self):
+        """
+        If multiple columns are specified by the argument to L{Select}, those
+        will be output by the SQL statement rather than the all-columns
+        wildcard.
+        """
+        self.assertEquals(
+            Select([self.schema.FOO.BAZ,
+                    self.schema.FOO.BAR],
+                   From=self.schema.FOO).toSQL(),
+            SQLFragment("select BAZ, BAR from FOO")
+        )
+
+
+    def test_joinColumnSelection(self):
+        """
+        If multiple columns are specified by the argument to L{Select} that uses
+        a L{TableSyntax.join}, those will be output by the SQL statement.
+        """
+        self.assertEquals(
+            Select([self.schema.FOO.BAZ,
+                    self.schema.BOZ.QUX],
+                   From=self.schema.FOO.join(self.schema.BOZ,
+                                             self.schema.FOO.BAR ==
+                                             self.schema.BOZ.QUX)).toSQL(),
+            SQLFragment("select BAZ, QUX from FOO join BOZ on BAR = QUX")
+        )
+
+
+    def test_tableMismatch(self):
+        """
+        When a column in the 'columns' argument does not match the table from
+        the 'From' argument, L{Select} raises a L{TableMismatch}.
+        """
+        self.assertRaises(TableMismatch, Select, [self.schema.BOZ.QUX],
+                          From=self.schema.FOO)
+
+
+    def test_qualifyNames(self):
+        """
+        When two columns in the FROM clause requested from different tables have
+        the same name, the emitted SQL should explicitly disambiguate them.
+        """
+        self.assertEquals(
+            Select([self.schema.FOO.BAR,
+                    self.schema.OTHER.BAR],
+                   From=self.schema.FOO.join(self.schema.OTHER,
+                                             self.schema.OTHER.FOO_BAR ==
+                                             self.schema.FOO.BAR)).toSQL(),
+            SQLFragment(
+                "select FOO.BAR, OTHER.BAR from FOO "
+                "join OTHER on FOO_BAR = FOO.BAR"))
+
+
+    def test_bindParameters(self):
+        """
+        L{SQLFragment.bind} returns a copy of that L{SQLFragment} with the
+        L{Parameter} objects in its parameter list replaced with the keyword
+        arguments to C{bind}.
+        """
+
+        self.assertEquals(
+            Select(From=self.schema.FOO,
+                   Where=(self.schema.FOO.BAR > Parameter("testing")).And(
+                   self.schema.FOO.BAZ < 7)).toSQL().bind(testing=173),
+            SQLFragment("select * from FOO where BAR > ? and BAZ < ?",
+                         [173, 7]))
+
+
+    def test_inSubSelect(self):
+        """
+        L{ColumnSyntax.In} returns a sub-expression using the SQL 'in' syntax.
+        """
+        wherein = (self.schema.FOO.BAR.In(
+                    Select([self.schema.BOZ.QUX], From=self.schema.BOZ)))
+        self.assertEquals(
+            Select(From=self.schema.FOO, Where=wherein).toSQL(),
+            SQLFragment(
+                "select * from FOO where BAR in (select QUX from BOZ)"))
+
+
+    def test_max(self):
+        """
+        Test for the 'Max' function.
+        """
+        self.assertEquals(
+            Select([Max(self.schema.BOZ.QUX)], From=self.schema.BOZ).toSQL(),
+            SQLFragment(
+                "select max(QUX) from BOZ"))
+
+
+    def test_len(self):
+        """
+        Test for the 'Len' function for determining character length of a
+        column.  (Note that this should be updated to use different techniques
+        as necessary in different databases.)
+        """
+        self.assertEquals(
+            Select([Len(self.schema.TEXTUAL.MYTEXT)],
+                    From=self.schema.TEXTUAL).toSQL(),
+            SQLFragment(
+                "select character_length(MYTEXT) from TEXTUAL"))
+
+
+    def test_insert(self):
+        """
+        L{Insert.toSQL} generates an 'insert' statement with all the relevant
+        columns.
+        """
+        self.assertEquals(
+            Insert({self.schema.FOO.BAR: 23,
+                    self.schema.FOO.BAZ: 9}).toSQL(),
+            SQLFragment("insert into FOO (BAR, BAZ) values (?, ?)", [23, 9]))
+
+
+    def test_insertNotEnough(self):
+        """
+        L{Insert}'s constructor will raise L{NotEnoughValues} if columns have
+        not been specified.
+        """
+        notEnough = self.assertRaises(
+            NotEnoughValues, Insert, {self.schema.OTHER.BAR: 9}
+        )
+        self.assertEquals(str(notEnough), "Columns [FOO_BAR] required.")
+
+
+    def test_insertReturning(self):
+        """
+        L{Insert}'s C{Return} argument will insert an SQL 'returning' clause.
+        """
+        self.assertEquals(
+            Insert({self.schema.FOO.BAR: 23,
+                    self.schema.FOO.BAZ: 9},
+                   Return=self.schema.FOO.BAR).toSQL(),
+            SQLFragment(
+                "insert into FOO (BAR, BAZ) values (?, ?) returning BAR",
+                [23, 9])
+        )
+
+
+    def test_insertMismatch(self):
+        """
+        L{Insert} raises L{TableMismatch} if the columns specified aren't all
+        from the same table.
+        """
+        self.assertRaises(
+            TableMismatch,
+            Insert, {self.schema.FOO.BAR: 23,
+                     self.schema.FOO.BAZ: 9,
+                     self.schema.TEXTUAL.MYTEXT: 'hello'}
+        )
+
+
+    def test_updateReturning(self):
+        """
+        L{Update}'s C{Return} argument will insert an SQL 'returning' clause.
+        """
+        self.assertEquals(
+            Update({self.schema.FOO.BAR: 23},
+                   self.schema.FOO.BAZ == 43,
+                   Return=self.schema.FOO.BAR).toSQL(),
+            SQLFragment(
+                "update FOO set BAR = ? where BAZ = ? returning BAR",
+                [23, 43])
+        )
+
+
+    def test_updateMismatch(self):
+        """
+        L{Update} raises L{TableMismatch} if the columns specified aren't all
+        from the same table.
+        """
+        self.assertRaises(
+            TableMismatch,
+            Update, {self.schema.FOO.BAR: 23,
+                     self.schema.FOO.BAZ: 9,
+                     self.schema.TEXTUAL.MYTEXT: 'hello'},
+            Where=self.schema.FOO.BAZ == 9
+        )
+
+
+    def test_deleteReturning(self):
+        """
+        L{Delete}'s C{Return} argument will insert an SQL 'returning' clause.
+        """
+        self.assertEquals(
+            Delete(self.schema.FOO,
+                   Where=self.schema.FOO.BAR == 7,
+                   Return=self.schema.FOO.BAZ).toSQL(),
+            SQLFragment(
+                "delete from FOO where BAR = ? returning BAZ", [7])
+        )
+
+
+    def test_update(self):
+        """
+        L{Update.toSQL} generates an 'update' statement.
+        """
+        self.assertEquals(
+            Update({self.schema.FOO.BAR: 4321},
+                    self.schema.FOO.BAZ == 1234).toSQL(),
+            SQLFragment("update FOO set BAR = ? where BAZ = ?", [4321, 1234]))
+
+
+    def test_delete(self):
+        """
+        L{Delete} generates an SQL 'delete' statement.
+        """
+        self.assertEquals(
+            Delete(self.schema.FOO,
+                   Where=self.schema.FOO.BAR == 12).toSQL(),
+            SQLFragment(
+                "delete from FOO where BAR = ?", [12])
+        )
+
+
+    def test_lock(self):
+        """
+        L{Lock.exclusive} generates a ('lock table') statement, locking the
+        table in the specified mode.
+        """
+        self.assertEquals(Lock.exclusive(self.schema.FOO).toSQL(),
+                          SQLFragment("lock table FOO in exclusive mode"))
+

Added: CalendarServer/trunk/twext/enterprise/ienterprise.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/ienterprise.py	                        (rev 0)
+++ CalendarServer/trunk/twext/enterprise/ienterprise.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -0,0 +1,86 @@
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Interfaces, mostly related to L{twext.enterprise.adbapi2}.
+"""
+
+__all__ = [
+    "IAsyncTransaction",
+]
+
+from zope.interface import Interface, Attribute
+
+
+class AlreadyFinishedError(Exception):
+    """
+    The transaction was already completed via an C{abort} or C{commit} and
+    cannot be aborted or committed again.
+    """
+
+
+
+class IAsyncTransaction(Interface):
+    """
+    Asynchronous execution of SQL.
+
+    Note that there is no C{begin()} method; if an L{IAsyncTransaction} exists,
+    it is assumed to have been started.
+    """
+
+    paramstyle = Attribute(
+        """
+        A copy of the 'paramstyle' attribute from a DB-API 2.0 module.
+        """)
+
+
+    def execSQL(sql, args=(), raiseOnZeroRowCount=None):
+        """
+        Execute some SQL.
+
+        @param sql: an SQL string.
+
+        @type sql: C{str}
+
+        @param args: C{list} of arguments to interpolate into C{sql}.
+
+        @param raiseOnZeroRowCount: a 0-argument callable which returns an
+            exception to raise if the executed SQL does not affect any rows.
+
+        @return: L{Deferred} which fires C{list} of C{tuple}
+
+        @raise: C{raiseOnZeroRowCount} if it was specified and no rows were
+            affected.
+        """
+
+
+    def commit():
+        """
+        Commit changes caused by this transaction.
+
+        @return: L{Deferred} which fires with C{None} upon successful
+            completion of this transaction.
+        """
+
+
+    def abort():
+        """
+        Roll back changes caused by this transaction.
+
+        @return: L{Deferred} which fires with C{None} upon successful
+            rollback of this transaction.
+        """
+

Added: CalendarServer/trunk/twext/enterprise/test/__init__.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/test/__init__.py	                        (rev 0)
+++ CalendarServer/trunk/twext/enterprise/test/__init__.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -0,0 +1,20 @@
+
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Tests for L{twext.enterprise}.
+"""

Added: CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py	                        (rev 0)
+++ CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -0,0 +1,175 @@
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Tests for L{twext.enterprise.adbapi2}.
+"""
+
+from itertools import count
+
+from twisted.trial.unittest import TestCase
+
+from twisted.internet.defer import inlineCallbacks
+
+from twext.enterprise.adbapi2 import ConnectionPool
+
+
+class Child(object):
+    def __init__(self, parent):
+        self.parent = parent
+        self.parent.children.append(self)
+
+    def close(self):
+        self.parent.children.remove(self)
+
+
+
+class Parent(object):
+
+    def __init__(self):
+        self.children = []
+
+
+
+class FakeConnection(Parent, Child):
+    """
+    Fake stand-in for a DB-API 2.0 connection.
+    """
+
+    def __init__(self, factory):
+        """
+        Initialize list of cursors
+        """
+        Parent.__init__(self)
+        Child.__init__(self, factory)
+        self.id = factory.idcounter.next()
+
+
+    @property
+    def cursors(self):
+        "Alias to make tests more readable."
+        return self.children
+
+
+    def cursor(self):
+        return FakeCursor(self)
+
+
+    def commit(self):
+        return
+
+
+    def rollback(self):
+        return
+
+
+
+class FakeCursor(Child):
+    """
+    Fake stand-in for a DB-API 2.0 cursor.
+    """
+    def __init__(self, connection):
+        Child.__init__(self, connection)
+        self.rowcount = 0
+        # not entirely correct, but all we care about is its truth value.
+        self.description = False
+
+
+    @property
+    def connection(self):
+        "Alias to make tests more readable."
+        return self.parent
+
+
+    def execute(self, sql, args=()):
+        self.sql = sql
+        self.description = True
+        self.rowcount = 1
+        return
+
+
+    def fetchall(self):
+        """
+        Just echo the SQL that was executed in the last query.
+        """
+        return [[self.connection.id, self.sql]]
+
+
+
+class ConnectionFactory(Parent):
+    def __init__(self):
+        Parent.__init__(self)
+        self.idcounter = count(1)
+
+    @property
+    def connections(self):
+        "Alias to make tests more readable."
+        return self.children
+
+
+    def connect(self):
+        return FakeConnection(self)
+
+
+
+class ConnectionPoolTests(TestCase):
+
+    @inlineCallbacks
+    def test_tooManyConnections(self):
+        """
+        When the number of outstanding busy transactions exceeds the number of
+        slots specified by L{ConnectionPool.maxConnections},
+        L{ConnectionPool.connection} will return a L{PooledSqlTxn} that is not
+        backed by any L{BaseSqlTxn}; this object will queue its SQL statements
+        until an existing connection becomes available.
+        """
+        cf = ConnectionFactory()
+        cp = ConnectionPool(cf.connect, maxConnections=2)
+        cp.startService()
+        self.addCleanup(cp.stopService)
+        a = cp.connection()
+        [[counter, echo]] = yield a.execSQL("alpha")
+        b = cp.connection()
+        [[bcounter, becho]] = yield b.execSQL("beta")
+
+        # both 'a' and 'b' are holding open a connection now; let's try to open
+        # a third one.  (The ordering will be deterministic even if this fails,
+        # because those threads are already busy.)
+        c = cp.connection()
+        enqueue = c.execSQL("gamma")
+        x = []
+        def addtox(it):
+            x.append(it)
+            return it
+        enqueue.addCallback(addtox)
+
+        # Did 'c' open a connection?  Let's hope not...
+        self.assertEquals(len(cf.connections), 2)
+        # This assertion is _not_ deterministic, unfortunately; it's unlikely
+        # that the implementation could be adjusted such that this assertion
+        # would fail and the others would succeed.  However, if it does fail,
+        # that's really bad, so I am leaving it regardless.
+        self.failIf(bool(x), "SQL executed too soon!")
+        yield b.commit()
+
+        # Now that 'b' has committed, 'c' should be able to complete.
+        [[ccounter, cecho]] = yield enqueue
+
+        # The connection for 'a' ought to be busy, so let's make sure 'c' is
+        # using the one that 'b' was using.
+        self.assertEquals(ccounter, bcounter)
+
+

Added: CalendarServer/trunk/twext/internet/threadutils.py
===================================================================
--- CalendarServer/trunk/twext/internet/threadutils.py	                        (rev 0)
+++ CalendarServer/trunk/twext/internet/threadutils.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -0,0 +1,111 @@
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+import sys
+from Queue import Queue
+
+
+from twisted.python.failure import Failure
+from twisted.internet.defer import Deferred
+
+
+_DONE = object()
+
+_STATE_STOPPED = 'STOPPED'
+_STATE_RUNNING = 'RUNNING'
+_STATE_STOPPING = 'STOPPING'
+
+class ThreadHolder(object):
+    """
+    A queue which will hold a reactor threadpool thread open until all of the
+    work in that queue is done.
+    """
+
+    def __init__(self, reactor):
+        self._reactor = reactor
+        self._state = _STATE_STOPPED
+        self._stopper = None
+        self._q = None
+
+
+    def _run(self):
+        """
+        Worker function which runs in a non-reactor thread.
+        """
+        while True:
+            work = self._q.get()
+            if work is _DONE:
+                def finishStopping():
+                    self._state = _STATE_STOPPED
+                    self._q = None
+                    s = self._stopper
+                    self._stopper = None
+                    s.callback(None)
+                self._reactor.callFromThread(finishStopping)
+                return
+            self._oneWorkUnit(*work)
+
+
+    def _oneWorkUnit(self, deferred, instruction):
+        try: 
+            result = instruction()
+        except:
+            etype, evalue, etb = sys.exc_info()
+            def relayFailure():
+                f = Failure(evalue, etype, etb)
+                deferred.errback(f)
+            self._reactor.callFromThread(relayFailure)
+        else:
+            self._reactor.callFromThread(deferred.callback, result)
+
+
+    def submit(self, work):
+        """
+        Submit some work to be run.
+
+        @param work: a 0-argument callable, which will be run in a thread.
+
+        @return: L{Deferred} that fires with the result of C{work}
+        """
+        d = Deferred()
+        self._q.put((d, work))
+        return d
+
+
+    def start(self):
+        """
+        Start this thing, if it's stopped.
+        """
+        if self._state != _STATE_STOPPED:
+            raise RuntimeError("Not stopped.")
+        self._state = _STATE_RUNNING
+        self._q = Queue(0)
+        self._reactor.callInThread(self._run)
+
+
+    def stop(self):
+        """
+        Stop this thing and release its thread, if it's running.
+        """
+        if self._state != _STATE_RUNNING:
+            raise RuntimeError("Not running.")
+        s = self._stopper = Deferred()
+        self._state = _STATE_STOPPING
+        self._q.put(_DONE)
+        return s
+
+
+

Added: CalendarServer/trunk/twext/python/clsprop.py
===================================================================
--- CalendarServer/trunk/twext/python/clsprop.py	                        (rev 0)
+++ CalendarServer/trunk/twext/python/clsprop.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -0,0 +1,41 @@
+##
+# Copyright (c) 2011 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+A small utility for defining static class properties.
+"""
+
+class classproperty(object):
+    """
+    Decorator for a method that wants to return a static class property.  The
+    decorated method will only be invoked once, for each class, and that value
+    will be returned for that class.
+    """
+
+    def __init__(self, thunk):
+        self.thunk = thunk
+        self._classcache = {}
+
+
+    def __get__(self, instance, owner):
+        cc = self._classcache
+        if owner in cc:
+            cached = cc[owner]
+        else:
+            cached = self.thunk(owner)
+            cc[owner] = cached
+        return cached
+

Modified: CalendarServer/trunk/twistedcaldav/directory/util.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/directory/util.py	2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/twistedcaldav/directory/util.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -1,3 +1,4 @@
+# -*- test-case-name: twistedcaldav.directory.test.test_util -*-
 ##
 # Copyright (c) 2006-2007 Apple Inc. All rights reserved.
 #
@@ -22,7 +23,7 @@
     "uuidFromName",
 ]
 
-from txdav.idav import AlreadyFinishedError
+from twext.enterprise.ienterprise import AlreadyFinishedError
 
 from uuid import UUID, uuid5
 

Modified: CalendarServer/trunk/twistedcaldav/stdconfig.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/stdconfig.py	2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/twistedcaldav/stdconfig.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -170,7 +170,7 @@
     "DBAMPFD"      : 0,    # Internally used by database to tell slave
                            # processes to inherit a file descriptor and use it
                            # as an AMP connection over a UNIX socket; see
-                           # txdav.base.datastore.asyncsqlpool.
+                           # twext.enterprise.adbapi2.ConnectionPoolConnection
 
     "SharedConnectionPool" : False, # Use a shared database connection pool in
                                     # the master process, rather than having

Modified: CalendarServer/trunk/twistedcaldav/test/test_wrapping.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/test/test_wrapping.py	2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/twistedcaldav/test/test_wrapping.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -22,7 +22,7 @@
 from twext.web2.server import Request
 from twext.web2.responsecode import UNAUTHORIZED
 from twext.web2.http_headers import Headers
-from txdav.idav import AlreadyFinishedError, IDataStore
+from twext.enterprise.ienterprise import AlreadyFinishedError
 
 from twext.web2.dav import davxml
 from twistedcaldav.config import config
@@ -37,6 +37,7 @@
 
 from twistedcaldav.test.util import TestCase
 
+from txdav.idav import IDataStore
 from txdav.caldav.datastore.test.test_file import event4_text
 
 from txdav.carddav.datastore.test.test_file import vcard4_text

Deleted: CalendarServer/trunk/txdav/base/datastore/asyncsqlpool.py
===================================================================
--- CalendarServer/trunk/txdav/base/datastore/asyncsqlpool.py	2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/txdav/base/datastore/asyncsqlpool.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -1,653 +0,0 @@
-# -*- test-case-name: txdav.caldav.datastore -*-
-##
-# Copyright (c) 2010 Apple Inc. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-##
-
-"""
-Asynchronous multi-process connection pool.
-"""
-
-import sys
-
-from cStringIO import StringIO
-from cPickle import dumps, loads
-from itertools import count
-
-from zope.interface import implements
-
-from twisted.internet.defer import inlineCallbacks
-from twisted.internet.defer import returnValue
-from txdav.idav import IAsyncTransaction
-from twisted.internet.defer import Deferred
-from twisted.protocols.amp import Boolean
-from twisted.python.failure import Failure
-from twisted.protocols.amp import Argument, String, Command, AMP, Integer
-from twisted.internet import reactor as _reactor
-from twisted.application.service import Service
-from txdav.base.datastore.threadutils import ThreadHolder
-from txdav.idav import AlreadyFinishedError
-from twisted.python import log
-from twisted.internet.defer import maybeDeferred
-from twisted.python.components import proxyForInterface
-
-
-
-class BaseSqlTxn(object):
-    """
-    L{IAsyncTransaction} implementation based on a L{ThreadHolder} in the
-    current process.
-    """
-    implements(IAsyncTransaction)
-
-    def __init__(self, connectionFactory, reactor=_reactor):
-        """
-        @param connectionFactory: A 0-argument callable which returns a DB-API
-            2.0 connection.
-        """
-        self._completed = False
-        self._cursor = None
-        self._holder = ThreadHolder(reactor)
-        self._holder.start()
-
-        def initCursor():
-            # support threadlevel=1; we can't necessarily cursor() in a
-            # different thread than we do transactions in.
-
-            # TODO: Re-try connect when it fails.  Specify a timeout.  That
-            # should happen in this layer because we need to be able to stop
-            # the reconnect attempt if it's hanging.
-            self._connection = connectionFactory()
-            self._cursor = self._connection.cursor()
-
-        # Note: no locking necessary here; since this gets submitted first, all
-        # subsequent submitted work-units will be in line behind it and the
-        # cursor will already have been initialized.
-        self._holder.submit(initCursor).addErrback(log.err)
-
-
-    def _reallyExecSQL(self, sql, args=None, raiseOnZeroRowCount=None):
-        if args is None:
-            args = []
-        self._cursor.execute(sql, args)
-        if raiseOnZeroRowCount is not None and self._cursor.rowcount == 0:
-            raise raiseOnZeroRowCount()
-        if self._cursor.description:
-            return self._cursor.fetchall()
-        else:
-            return None
-
-
-    noisy = False
-
-    def execSQL(self, *args, **kw):
-        result = self._holder.submit(
-            lambda : self._reallyExecSQL(*args, **kw)
-        )
-        if self.noisy:
-            def reportResult(results):
-                sys.stdout.write("\n".join([
-                    "",
-                    "SQL: %r %r" % (args, kw),
-                    "Results: %r" % (results,),
-                    "",
-                    ]))
-                return results
-            result.addBoth(reportResult)
-        return result
-
-
-    def commit(self):
-        if not self._completed:
-            self._completed = True
-            def reallyCommit():
-                self._connection.commit()
-            result = self._holder.submit(reallyCommit)
-            return result
-        else:
-            raise AlreadyFinishedError()
-
-
-    def abort(self):
-        if not self._completed:
-            self._completed = True
-            def reallyAbort():
-                self._connection.rollback()
-            result = self._holder.submit(reallyAbort)
-            return result
-        else:
-            raise AlreadyFinishedError()
-
-
-    def __del__(self):
-        if not self._completed:
-            print 'BaseSqlTxn.__del__: OK'
-            self.abort()
-
-
-    def reset(self):
-        """
-        Call this when placing this transaction back into the pool.
-
-        @raise RuntimeError: if the transaction has not been committed or
-            aborted.
-        """
-        if not self._completed:
-            raise RuntimeError("Attempt to re-set active transaction.")
-        self._completed = False
-
-
-    def stop(self):
-        """
-        Release the thread and database connection associated with this
-        transaction.
-        """
-        self._completed = True
-        self._stopped = True
-        holder = self._holder
-        self._holder = None
-        holder.submit(self._connection.close)
-        return holder.stop()
-
-
-
-class SpooledTxn(object):
-    """
-    A L{SpooledTxn} is an implementation of L{IAsyncTransaction} which cannot
-    yet actually execute anything, so it spools SQL reqeusts for later
-    execution.  When a L{BaseSqlTxn} becomes available later, it can be
-    unspooled onto that.
-    """
-
-    implements(IAsyncTransaction)
-
-    def __init__(self):
-        self._spool = []
-
-
-    def _enspool(self, cmd, a=(), kw={}):
-        d = Deferred()
-        self._spool.append((d, cmd, a, kw))
-        return d
-
-
-    def _iterDestruct(self):
-        """
-        Iterate the spool list destructively, while popping items from the
-        beginning.  This allows code which executes more SQL in the callback of
-        a Deferred to not interfere with the originally submitted order of
-        commands.
-        """
-        while self._spool:
-            yield self._spool.pop(0)
-
-
-    def _unspool(self, other):
-        """
-        Unspool this transaction onto another transaction.
-
-        @param other: another provider of L{IAsyncTransaction} which will
-            actually execute the SQL statements we have been buffering.
-        """
-        for (d, cmd, a, kw) in self._iterDestruct():
-            self._relayCommand(other, d, cmd, a, kw)
-
-
-    def _relayCommand(self, other, d, cmd, a, kw):
-        """
-        Relay a single command to another transaction.
-        """
-        maybeDeferred(getattr(other, cmd), *a, **kw).chainDeferred(d)
-
-
-    def execSQL(self, *a, **kw):
-        return self._enspool('execSQL', a, kw)
-
-
-    def commit(self):
-        return self._enspool('commit')
-
-
-    def abort(self):
-        return self._enspool('abort')
-
-
-
-class PooledSqlTxn(proxyForInterface(iface=IAsyncTransaction,
-                                     originalAttribute='_baseTxn')):
-    """
-    This is a temporary throw-away wrapper for the longer-lived BaseSqlTxn, so
-    that if a badly-behaved API client accidentally hangs on to one of these
-    and, for example C{.abort()}s it multiple times once another client is
-    using that connection, it will get some harmless tracebacks.
-    """
-
-    def __init__(self, pool, baseTxn):
-        self._pool     = pool
-        self._baseTxn  = baseTxn
-        self._complete = False
-
-
-    def execSQL(self, *a, **kw):
-        self._checkComplete()
-        return super(PooledSqlTxn, self).execSQL(*a, **kw)
-
-
-    def commit(self):
-        self._markComplete()
-        return self._repoolAfter(super(PooledSqlTxn, self).commit())
-
-
-    def abort(self):
-        self._markComplete()
-        return self._repoolAfter(super(PooledSqlTxn, self).abort())
-
-
-    def _checkComplete(self):
-        """
-        If the transaction is complete, raise L{AlreadyFinishedError}
-        """
-        if self._complete:
-            raise AlreadyFinishedError()
-
-
-    def _markComplete(self):
-        """
-        Mark the transaction as complete, raising AlreadyFinishedError.
-        """
-        self._checkComplete()
-        self._complete = True
-
-
-    def _repoolAfter(self, d):
-        def repool(result):
-            self._pool.reclaim(self)
-            return result
-        return d.addCallback(repool)
-
-
-
-class ConnectionPool(Service, object):
-    """
-    This is a central service that has a threadpool and executes SQL statements
-    asynchronously, in a pool.
-
-    @ivar connectionFactory: a 0-or-1-argument callable that returns a DB-API
-        connection.  The optional argument can be used as a label for
-        diagnostic purposes.
-
-    @ivar maxConnections: The connection pool will not attempt to make more
-        than this many concurrent connections to the database.
-
-    @type maxConnections: C{int}
-    """
-
-    reactor = _reactor
-
-    def __init__(self, connectionFactory, maxConnections=10):
-        super(ConnectionPool, self).__init__()
-        self.free = []
-        self.busy = []
-        self.waiting = []
-        self.connectionFactory = connectionFactory
-        self.maxConnections = maxConnections
-
-
-    def startService(self):
-        """
-        No startup necessary.
-        """
-
-
-    @inlineCallbacks
-    def stopService(self):
-        """
-        Forcibly abort any outstanding transactions.
-        """
-        for busy in self.busy[:]:
-            try:
-                yield busy.abort()
-            except:
-                log.err()
-        # all transactions should now be in the free list, since 'abort()' will
-        # have put them there.
-        for free in self.free:
-            yield free.stop()
-
-
-    def connection(self, label="<unlabeled>"):
-        """
-        Find a transaction; either retrieve a free one from the list or
-        allocate a new one if no free ones are available.
-
-        @return: an L{IAsyncTransaction}
-        """
-
-        overload = False
-        if self.free:
-            basetxn = self.free.pop(0)
-        elif len(self.busy) < self.maxConnections:
-            basetxn = BaseSqlTxn(
-                connectionFactory=self.connectionFactory,
-                reactor=self.reactor
-            )
-        else:
-            basetxn = SpooledTxn()
-            overload = True
-        txn = PooledSqlTxn(self, basetxn)
-        if overload:
-            self.waiting.append(txn)
-        else:
-            self.busy.append(txn)
-        return txn
-
-
-    def reclaim(self, txn):
-        """
-        Shuck the L{PooledSqlTxn} wrapper off, and recycle the underlying
-        BaseSqlTxn into the free list.
-        """
-        baseTxn = txn._baseTxn
-        baseTxn.reset()
-        self.busy.remove(txn)
-        if self.waiting:
-            waiting = self.waiting.pop(0)
-            waiting._baseTxn._unspool(baseTxn)
-            # Note: although commit() may already have been called, we don't
-            # have to handle it specially here.  It only unspools after the
-            # deferred returned by commit() has actually been called, and since
-            # that occurs in a callFromThread, that won't happen until the next
-            # iteration of the mainloop, when the _baseTxn is safely correct.
-            waiting._baseTxn = baseTxn
-            self.busy.append(waiting)
-        else:
-            self.free.append(baseTxn)
-
-
-
-def txnarg():
-    return [('transactionID', Integer())]
-
-
-CHUNK_MAX = 0xffff
-
-class BigArgument(Argument):
-    """
-    An argument whose payload can be larger than L{CHUNK_MAX}, by splitting
-    across multiple AMP keys.
-    """
-    def fromBox(self, name, strings, objects, proto):
-        value = StringIO()
-        for counter in count():
-            chunk = strings.get("%s.%d" % (name, counter))
-            if chunk is None:
-                break
-            value.write(chunk)
-        objects[name] = self.fromString(value.getvalue())
-
-
-    def toBox(self, name, strings, objects, proto):
-        value = StringIO(self.toString(objects[name]))
-        for counter in count():
-            nextChunk = value.read(CHUNK_MAX)
-            if not nextChunk:
-                break
-            strings["%s.%d" % (name, counter)] = nextChunk
-
-
-
-class Pickle(BigArgument):
-    """
-    A pickle sent over AMP.  This is to serialize the 'args' argument to
-    C{execSQL}, which is the dynamically-typed 'args' list argument to a DB-API
-    C{execute} function, as well as its dynamically-typed result ('rows').
-
-    This should be cleaned up into a nicer structure, but this is not a network
-    protocol, so we can be a little relaxed about security.
-
-    This is a L{BigArgument} rather than a regular L{Argument} because
-    individual arguments and query results need to contain entire vCard or
-    iCalendar documents, which can easily be greater than 64k.
-    """
-
-    def toString(self, inObject):
-        return dumps(inObject)
-
-    def fromString(self, inString):
-        return loads(inString)
-
-
-
-
-class StartTxn(Command):
-    """
-    Start a transaction, identified with an ID generated by the client.
-    """
-    arguments = txnarg()
-
-
-
-class ExecSQL(Command):
-    """
-    Execute an SQL statement.
-    """
-    arguments = [('sql', String()),
-                 ('queryID', String()),
-                 ('args', Pickle())] + txnarg()
-
-
-
-class Row(Command):
-    """
-    A row has been returned.  Sent from server to client in response to
-    L{ExecSQL}.
-    """
-
-    arguments = [('queryID', String()),
-                 ('row', Pickle())]
-
-
-
-class QueryComplete(Command):
-    """
-    A query issued with ExecSQL is complete.
-    """
-
-    arguments = [('queryID', String()),
-                 ('norows', Boolean())]
-
-
-
-class Commit(Command):
-    arguments = txnarg()
-
-
-
-class Abort(Command):
-    arguments = txnarg()
-
-
-
-class _NoRows(Exception):
-    """
-    Placeholder exception to report zero rows.
-    """
-
-
-class ConnectionPoolConnection(AMP):
-    """
-    A L{ConnectionPoolConnection} is a single connection to a
-    L{ConnectionPool}.
-    """
-
-    def __init__(self, pool):
-        """
-        Initialize a mapping of transaction IDs to transaction objects.
-        """
-        super(ConnectionPoolConnection, self).__init__()
-        self.pool = pool
-        self._txns = {}
-
-
-    @StartTxn.responder
-    def start(self, transactionID):
-        self._txns[transactionID] = self.pool.connection()
-        return {}
-
-
-    @ExecSQL.responder
-    @inlineCallbacks
-    def receivedSQL(self, transactionID, queryID, sql, args):
-        try:
-            rows = yield self._txns[transactionID].execSQL(sql, args, _NoRows)
-        except _NoRows:
-            norows = True
-        else:
-            norows = False
-            if rows is not None:
-                for row in rows:
-                    # Either this should be yielded or it should be
-                    # requiresAnswer=False
-                    self.callRemote(Row, queryID=queryID, row=row)
-        self.callRemote(QueryComplete, queryID=queryID, norows=norows)
-        returnValue({})
-
-
-    def _complete(self, transactionID, thunk):
-        txn = self._txns.pop(transactionID)
-        return thunk(txn).addCallback(lambda ignored: {})
-
-
-    @Commit.responder
-    def commit(self, transactionID):
-        """
-        Successfully complete the given transaction.
-        """
-        return self._complete(transactionID, lambda x: x.commit())
-
-
-    @Abort.responder
-    def abort(self, transactionID):
-        """
-        Roll back the given transaction.
-        """
-        return self._complete(transactionID, lambda x: x.abort())
-
-
-
-class ConnectionPoolClient(AMP):
-    """
-    A client which can execute SQL.
-    """
-    def __init__(self):
-        super(ConnectionPoolClient, self).__init__()
-        self._nextID = count().next
-        self._txns = {}
-        self._queries = {}
-
-
-    def newTransaction(self):
-        txnid = str(self._nextID())
-        self.callRemote(StartTxn, transactionID=txnid)
-        txn = Transaction(client=self, transactionID=txnid)
-        self._txns[txnid] = txn
-        return txn
-
-
-    @Row.responder
-    def row(self, queryID, row):
-        self._queries[queryID].row(row)
-        return {}
-
-
-    @QueryComplete.responder
-    def complete(self, queryID, norows):
-        self._queries.pop(queryID).done(norows)
-        return {}
-
-
-
-class _Query(object):
-    def __init__(self, raiseOnZeroRowCount):
-        self.results = []
-        self.deferred = Deferred()
-        self.raiseOnZeroRowCount = raiseOnZeroRowCount
-
-
-    def row(self, row):
-        """
-        A row was received.
-        """
-        self.results.append(row)
-
-
-    def done(self, norows):
-        """
-        The query is complete.
-
-        @param norows: A boolean.  True if there were not any rows.
-        """
-        if norows and (self.raiseOnZeroRowCount is not None):
-            exc = self.raiseOnZeroRowCount()
-            self.deferred.errback(Failure(exc))
-        else:
-            self.deferred.callback(self.results)
-
-
-
-
-class Transaction(object):
-    """
-    Async protocol-based transaction implementation.
-    """
-
-    implements(IAsyncTransaction)
-
-    def __init__(self, client, transactionID):
-        """
-        Initialize a transaction with a L{ConnectionPoolClient} and a unique
-        transaction identifier.
-        """
-        self._client = client
-        self._transactionID = transactionID
-        self._completed = False
-
-
-    def execSQL(self, sql, args=None, raiseOnZeroRowCount=None):
-        if args is None:
-            args = []
-        queryID = str(self._client._nextID())
-        query = self._client._queries[queryID] = _Query(raiseOnZeroRowCount)
-        self._client.callRemote(ExecSQL, queryID=queryID, sql=sql, args=args,
-                                transactionID=self._transactionID)
-        return query.deferred
-
-
-    def _complete(self, command):
-        if self._completed:
-            raise AlreadyFinishedError()
-        self._completed = True
-        return self._client.callRemote(
-            command, transactionID=self._transactionID
-            ).addCallback(lambda x: None)
-
-
-    def commit(self):
-        return self._complete(Commit)
-
-
-    def abort(self):
-        return self._complete(Abort)
-
-

Modified: CalendarServer/trunk/txdav/base/datastore/file.py
===================================================================
--- CalendarServer/trunk/txdav/base/datastore/file.py	2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/txdav/base/datastore/file.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -21,8 +21,8 @@
 """
 
 from twext.python.log import LoggingMixIn
+from twext.enterprise.ienterprise import AlreadyFinishedError
 from txdav.idav import IDataStoreResource
-from txdav.idav import AlreadyFinishedError
 from txdav.base.propertystore.base import PropertyName
 
 from twext.web2.dav.element.rfc2518 import GETContentType

Deleted: CalendarServer/trunk/txdav/base/datastore/test/test_asyncsqlpool.py
===================================================================
--- CalendarServer/trunk/txdav/base/datastore/test/test_asyncsqlpool.py	2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/txdav/base/datastore/test/test_asyncsqlpool.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -1,174 +0,0 @@
-# -*- test-case-name: txdav.caldav.datastore -*-
-##
-# Copyright (c) 2010 Apple Inc. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-##
-
-"""
-Tests for L{txdav.base.datastore.asyncsqlpool}.
-"""
-
-from itertools import count
-from twisted.trial.unittest import TestCase
-
-from txdav.base.datastore.asyncsqlpool import ConnectionPool
-from twisted.internet.defer import inlineCallbacks
-
-
-class Child(object):
-    def __init__(self, parent):
-        self.parent = parent
-        self.parent.children.append(self)
-
-    def close(self):
-        self.parent.children.remove(self)
-
-
-
-class Parent(object):
-
-    def __init__(self):
-        self.children = []
-
-
-
-class FakeConnection(Parent, Child):
-    """
-    Fake Stand-in for DB-API 2.0 connection.
-    """
-
-    def __init__(self, factory):
-        """
-        Initialize list of cursors
-        """
-        Parent.__init__(self)
-        Child.__init__(self, factory)
-        self.id = factory.idcounter.next()
-
-
-    @property
-    def cursors(self):
-        "Alias to make tests more readable."
-        return self.children
-
-
-    def cursor(self):
-        return FakeCursor(self)
-
-
-    def commit(self):
-        return
-
-
-    def rollback(self):
-        return
-
-
-
-class FakeCursor(Child):
-    """
-    Fake stand-in for a DB-API 2.0 cursor.
-    """
-    def __init__(self, connection):
-        Child.__init__(self, connection)
-        self.rowcount = 0
-        # not entirely correct, but all we care about is its truth value.
-        self.description = False
-
-
-    @property
-    def connection(self):
-        "Alias to make tests more readable."
-        return self.parent
-
-
-    def execute(self, sql, args=()):
-        self.sql = sql
-        self.description = True
-        self.rowcount = 1
-        return
-
-
-    def fetchall(self):
-        """
-        Just echo the SQL that was executed in the last query.
-        """
-        return [[self.connection.id, self.sql]]
-
-
-
-class ConnectionFactory(Parent):
-    def __init__(self):
-        Parent.__init__(self)
-        self.idcounter = count(1)
-
-    @property
-    def connections(self):
-        "Alias to make tests more readable."
-        return self.children
-
-
-    def connect(self):
-        return FakeConnection(self)
-
-
-
-class ConnectionPoolTests(TestCase):
-
-    @inlineCallbacks
-    def test_tooManyConnections(self):
-        """
-        When the number of outstanding busy transactions exceeds the number of
-        slots specified by L{ConnectionPool.maxConnections},
-        L{ConnectionPool.connection} will return a L{PooledSqlTxn} that is not
-        backed by any L{BaseSqlTxn}; this object will queue its SQL statements
-        until an existing connection becomes available.
-        """
-        cf = ConnectionFactory()
-        cp = ConnectionPool(cf.connect, maxConnections=2)
-        cp.startService()
-        self.addCleanup(cp.stopService)
-        a = cp.connection()
-        [[counter, echo]] = yield a.execSQL("alpha")
-        b = cp.connection()
-        [[bcounter, becho]] = yield b.execSQL("beta")
-
-        # both 'a' and 'b' are holding open a connection now; let's try to open
-        # a third one.  (The ordering will be deterministic even if this fails,
-        # because those threads are already busy.)
-        c = cp.connection()
-        enqueue = c.execSQL("gamma")
-        x = []
-        def addtox(it):
-            x.append(it)
-            return it
-        enqueue.addCallback(addtox)
-
-        # Did 'c' open a connection?  Let's hope not...
-        self.assertEquals(len(cf.connections), 2)
-        # This assertion is _not_ deterministic, unfortunately; it's unlikely
-        # that the implementation could be adjusted such that this assertion
-        # would fail and the others would succeed.  However, if it does fail,
-        # that's really bad, so I am leaving it regardless.
-        self.failIf(bool(x), "SQL executed too soon!")
-        yield b.commit()
-
-        # Now that 'b' has committed, 'c' should be able to complete.
-        [[ccounter, cecho]] = yield enqueue
-
-        # The connection for 'a' ought to be busy, so let's make sure we're
-        # using the one for 'c'.
-        self.assertEquals(ccounter, bcounter)
-
-

Deleted: CalendarServer/trunk/txdav/base/datastore/threadutils.py
===================================================================
--- CalendarServer/trunk/txdav/base/datastore/threadutils.py	2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/txdav/base/datastore/threadutils.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -1,111 +0,0 @@
-##
-# Copyright (c) 2010 Apple Inc. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-##
-
-import sys
-from Queue import Queue
-
-
-from twisted.python.failure import Failure
-from twisted.internet.defer import Deferred
-
-
-_DONE = object()
-
-_STATE_STOPPED = 'STOPPED'
-_STATE_RUNNING = 'RUNNING'
-_STATE_STOPPING = 'STOPPING'
-
-class ThreadHolder(object):
-    """
-    A queue which will hold a reactor threadpool thread open until all of the
-    work in that queue is done.
-    """
-
-    def __init__(self, reactor):
-        self._reactor = reactor
-        self._state = _STATE_STOPPED
-        self._stopper = None
-        self._q = None
-
-
-    def _run(self):
-        """
-        Worker function which runs in a non-reactor thread.
-        """
-        while True:
-            work = self._q.get()
-            if work is _DONE:
-                def finishStopping():
-                    self._state = _STATE_STOPPED
-                    self._q = None
-                    s = self._stopper
-                    self._stopper = None
-                    s.callback(None)
-                self._reactor.callFromThread(finishStopping)
-                return
-            self._oneWorkUnit(*work)
-
-
-    def _oneWorkUnit(self, deferred, instruction):
-        try: 
-            result = instruction()
-        except:
-            etype, evalue, etb = sys.exc_info()
-            def relayFailure():
-                f = Failure(evalue, etype, etb)
-                deferred.errback(f)
-            self._reactor.callFromThread(relayFailure)
-        else:
-            self._reactor.callFromThread(deferred.callback, result)
-
-
-    def submit(self, work):
-        """
-        Submit some work to be run.
-
-        @param work: a 0-argument callable, which will be run in a thread.
-
-        @return: L{Deferred} that fires with the result of L{work}
-        """
-        d = Deferred()
-        self._q.put((d, work))
-        return d
-
-
-    def start(self):
-        """
-        Start this thing, if it's stopped.
-        """
-        if self._state != _STATE_STOPPED:
-            raise RuntimeError("Not stopped.")
-        self._state = _STATE_RUNNING
-        self._q = Queue(0)
-        self._reactor.callInThread(self._run)
-
-
-    def stop(self):
-        """
-        Stop this thing and release its thread, if it's running.
-        """
-        if self._state != _STATE_RUNNING:
-            raise RuntimeError("Not running.")
-        s = self._stopper = Deferred()
-        self._state = _STATE_STOPPING
-        self._q.put(_DONE)
-        return s
-
-
-

Modified: CalendarServer/trunk/txdav/caldav/datastore/sql.py
===================================================================
--- CalendarServer/trunk/txdav/caldav/datastore/sql.py	2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/txdav/caldav/datastore/sql.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -56,7 +56,7 @@
     _ATTACHMENTS_MODE_NONE, _ATTACHMENTS_MODE_READ, _ATTACHMENTS_MODE_WRITE,\
     CALENDAR_HOME_TABLE, CALENDAR_HOME_METADATA_TABLE,\
     CALENDAR_AND_CALENDAR_BIND, CALENDAR_OBJECT_REVISIONS_AND_BIND_TABLE,\
-    CALENDAR_OBJECT_AND_BIND_TABLE
+    CALENDAR_OBJECT_AND_BIND_TABLE, schema
 from txdav.common.icommondatastore import IndexedSearchException
 
 from vobject.icalendar import utc
@@ -175,6 +175,7 @@
     """
     implements(ICalendar)
 
+    _bindSchema = schema.CALENDAR_BIND
     _bindTable = CALENDAR_BIND_TABLE
     _homeChildTable = CALENDAR_TABLE
     _homeChildBindTable = CALENDAR_AND_CALENDAR_BIND

Modified: CalendarServer/trunk/txdav/caldav/datastore/test/common.py
===================================================================
--- CalendarServer/trunk/txdav/caldav/datastore/test/common.py	2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/txdav/caldav/datastore/test/common.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -23,9 +23,16 @@
     maybeDeferred
 from twisted.internet.protocol import Protocol
 
-from txdav.idav import IPropertyStore, IDataStore, AlreadyFinishedError
+from twext.enterprise.ienterprise import AlreadyFinishedError
+
+from twext.python.filepath import CachingFilePath as FilePath
+from twext.web2.dav import davxml
+from twext.web2.http_headers import MimeType
+from twext.web2.dav.element.base import WebDAVUnknownElement
+from twext.python.vcomponent import VComponent
+
+from txdav.idav import IPropertyStore, IDataStore
 from txdav.base.propertystore.base import PropertyName
-
 from txdav.common.icommondatastore import HomeChildNameAlreadyExistsError, \
     ICommonTransaction
 from txdav.common.icommondatastore import InvalidObjectResourceError
@@ -39,11 +46,6 @@
     ICalendarObject, ICalendarHome,
     ICalendar, IAttachment, ICalendarTransaction)
 
-from twext.python.filepath import CachingFilePath as FilePath
-from twext.web2.dav import davxml
-from twext.web2.http_headers import MimeType
-from twext.web2.dav.element.base import WebDAVUnknownElement
-from twext.python.vcomponent import VComponent
 
 from twistedcaldav.customxml import InviteNotification, InviteSummary
 from twistedcaldav.ical import Component

Modified: CalendarServer/trunk/txdav/carddav/datastore/sql.py
===================================================================
--- CalendarServer/trunk/txdav/carddav/datastore/sql.py	2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/txdav/carddav/datastore/sql.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -50,8 +50,8 @@
     ADDRESSBOOK_BIND_TABLE, ADDRESSBOOK_OBJECT_REVISIONS_TABLE,\
     ADDRESSBOOK_OBJECT_TABLE, ADDRESSBOOK_HOME_TABLE,\
     ADDRESSBOOK_HOME_METADATA_TABLE, ADDRESSBOOK_AND_ADDRESSBOOK_BIND,\
-    ADDRESSBOOK_OBJECT_REVISIONS_AND_BIND_TABLE,\
-    ADDRESSBOOK_OBJECT_AND_BIND_TABLE
+    ADDRESSBOOK_OBJECT_AND_BIND_TABLE, \
+    ADDRESSBOOK_OBJECT_REVISIONS_AND_BIND_TABLE, schema
 from txdav.base.propertystore.base import PropertyName
 
 
@@ -94,6 +94,7 @@
     """
     implements(IAddressBook)
 
+    _bindSchema = schema.ADDRESSBOOK_BIND
     _bindTable = ADDRESSBOOK_BIND_TABLE
     _homeChildTable = ADDRESSBOOK_TABLE
     _homeChildBindTable = ADDRESSBOOK_AND_ADDRESSBOOK_BIND

Modified: CalendarServer/trunk/txdav/common/datastore/sql.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/sql.py	2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/txdav/common/datastore/sql.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -57,6 +57,10 @@
 from txdav.common.inotifications import INotificationCollection, \
     INotificationObject
 
+from twext.enterprise.dal.syntax import Parameter
+from twext.python.clsprop import classproperty
+from twext.enterprise.dal.syntax import Select
+
 from txdav.base.propertystore.sql import PropertyStore
 from txdav.base.propertystore.base import PropertyName
 
@@ -211,6 +215,7 @@
         CommonStoreTransaction._homeClass[ECALENDARTYPE] = CalendarHome
         CommonStoreTransaction._homeClass[EADDRESSBOOKTYPE] = AddressBookHome
         self._sqlTxn = sqlTxn
+        self.paramstyle = sqlTxn.paramstyle
 
 
     def store(self):
@@ -889,7 +894,7 @@
                 ]
             ))
             revisions = dict(revisions)
-        
+
         # Create the actual objects merging in properties
         for resource_id, resource_name, created, modified in dataRows:
             child = cls(home, resource_name, resource_id, owned)
@@ -898,10 +903,48 @@
             child._syncTokenRevision = revisions[resource_id]
             yield child._loadPropertyStore(propertyStores.get(resource_id, None))
             results.append(child)
-        
         returnValue(results)
 
+
     @classmethod
+    def _objectResourceLookup(cls, ownedPart):
+        """
+        Common portions of C{_resourceIDOwnedByHomeByName} and
+        C{_resourceIDSharedToHomeByName}, except for the 'owned' fragment of the
+        Where clause, supplied as an argument.
+        """
+        bind = cls._bindSchema
+        return Select(
+            [bind.RESOURCE_ID],
+            From=bind,
+            Where=(bind.RESOURCE_NAME == Parameter('objectName')).And(
+                   bind.HOME_RESOURCE_ID == Parameter('homeID')).And(
+                    ownedPart))
+
+
+    @classproperty
+    def _resourceIDOwnedByHomeByName(cls):
+        """
+        DAL query to look up an object resource ID owned by a home, given a
+        resource name (C{objectName}), and a home resource ID
+        (C{homeID}).
+        """
+        return cls._objectResourceLookup(
+            cls._bindSchema.BIND_MODE == _BIND_MODE_OWN)
+
+
+    @classproperty
+    def _resourceIDSharedToHomeByName(cls):
+        """
+        DAL query to look up an object resource ID shared to a home, given a
+        resource name (C{objectName}), and a home resource ID
+        (C{homeID}).
+        """
+        return cls._objectResourceLookup(
+            cls._bindSchema.BIND_MODE != _BIND_MODE_OWN)
+
+
+    @classmethod
     @inlineCallbacks
     def objectWithName(cls, home, name, owned):
         """
@@ -911,39 +954,15 @@
         @param home: a L{CommonHome}.
         @param name: a string.
         @param owned: a boolean - whether or not to get a shared child
-        @return: an L{CommonHomChild} or C{None} if no such child
+        @return: an L{CommonHomeChild} or C{None} if no such child
             exists.
         """
-
         if owned:
-            data = yield home._txn.execSQL("""
-                select %(column_RESOURCE_ID)s from %(name)s
-                where
-                  %(column_RESOURCE_NAME)s = %%s and
-                  %(column_HOME_RESOURCE_ID)s = %%s and
-                  %(column_BIND_MODE)s = %%s
-                """ % cls._bindTable,
-                [
-                    name,
-                    home._resourceID,
-                    _BIND_MODE_OWN
-                ]
-            )
+            query = cls._resourceIDOwnedByHomeByName
         else:
-            data = yield home._txn.execSQL("""
-                select %(column_RESOURCE_ID)s from %(name)s
-                where
-                  %(column_RESOURCE_NAME)s = %%s and
-                  %(column_HOME_RESOURCE_ID)s = %%s and
-                  %(column_BIND_MODE)s != %%s
-                """ % cls._bindTable,
-                [
-                    name,
-                    home._resourceID,
-                    _BIND_MODE_OWN
-                ]
-            )
-
+            query = cls._resourceIDSharedToHomeByName
+        data = yield query.on(home._txn,
+                              objectName=name, homeID=home._resourceID)
         if not data:
             returnValue(None)
         resourceID = data[0][0]
@@ -951,6 +970,7 @@
         yield child.initFromStore()
         returnValue(child)
 
+
     @classmethod
     @inlineCallbacks
     def objectWithID(cls, home, resourceID):

Modified: CalendarServer/trunk/txdav/common/datastore/sql_tables.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/sql_tables.py	2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/txdav/common/datastore/sql_tables.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -19,174 +19,177 @@
 SQL Table definitions.
 """
 
-CALENDAR_HOME_TABLE = {
-    "name"                    : "CALENDAR_HOME",
-    "column_RESOURCE_ID"      : "RESOURCE_ID",
-    "column_OWNER_UID"        : "OWNER_UID",
-}
+from twisted.python.modules import getModule
+from twext.enterprise.dal.syntax import SchemaSyntax
+from twext.enterprise.dal.parseschema import schemaFromPath
 
-CALENDAR_HOME_METADATA_TABLE = {
-    "name"                    : "CALENDAR_HOME_METADATA",
-    "column_RESOURCE_ID"      : "RESOURCE_ID",
-    "column_QUOTA_USED_BYTES" : "QUOTA_USED_BYTES",
-}
 
-ADDRESSBOOK_HOME_TABLE = {
-    "name"                    : "ADDRESSBOOK_HOME",
-    "column_RESOURCE_ID"      : "RESOURCE_ID",
-    "column_OWNER_UID"        : "OWNER_UID",
-}
 
-ADDRESSBOOK_HOME_METADATA_TABLE = {
-    "name"                    : "ADDRESSBOOK_HOME_METADATA",
-    "column_RESOURCE_ID"      : "RESOURCE_ID",
-    "column_QUOTA_USED_BYTES" : "QUOTA_USED_BYTES",
-}
+def _populateSchema():
+    """
+    Generate the global L{SchemaSyntax}.
+    """
+    pathObj = getModule(__name__).filePath.sibling("sql_schema_v1.sql")
+    return SchemaSyntax(schemaFromPath(pathObj))
 
-NOTIFICATION_HOME_TABLE = {
-    "name"               : "NOTIFICATION_HOME",
-    "column_RESOURCE_ID" : "RESOURCE_ID",
-    "column_OWNER_UID"   : "OWNER_UID",
-}
 
-CALENDAR_TABLE = {
-    "name"               : "CALENDAR",
-    "column_RESOURCE_ID" : "RESOURCE_ID",
-    "column_CREATED"     : "CREATED",
-    "column_MODIFIED"    : "MODIFIED",
-}
 
-ADDRESSBOOK_TABLE = {
-    "name"               : "ADDRESSBOOK",
-    "column_RESOURCE_ID" : "RESOURCE_ID",
-    "column_CREATED"     : "CREATED",
-    "column_MODIFIED"    : "MODIFIED",
-}
+schema = _populateSchema()
 
-CALENDAR_BIND_TABLE = {
-    "name"                    : "CALENDAR_BIND",
-    "column_HOME_RESOURCE_ID" : "CALENDAR_HOME_RESOURCE_ID",
-    "column_RESOURCE_ID"      : "CALENDAR_RESOURCE_ID",
-    "column_RESOURCE_NAME"    : "CALENDAR_RESOURCE_NAME",
-    "column_BIND_MODE"        : "BIND_MODE",
-    "column_BIND_STATUS"      : "BIND_STATUS",
-    "column_SEEN_BY_OWNER"    : "SEEN_BY_OWNER",
-    "column_SEEN_BY_SHAREE"   : "SEEN_BY_SHAREE",
-    "column_MESSAGE"          : "MESSAGE",
-}
 
-ADDRESSBOOK_BIND_TABLE = {
-    "name"                    : "ADDRESSBOOK_BIND",
-    "column_HOME_RESOURCE_ID" : "ADDRESSBOOK_HOME_RESOURCE_ID",
-    "column_RESOURCE_ID"      : "ADDRESSBOOK_RESOURCE_ID",
-    "column_RESOURCE_NAME"    : "ADDRESSBOOK_RESOURCE_NAME",
-    "column_BIND_MODE"        : "BIND_MODE",
-    "column_BIND_STATUS"      : "BIND_STATUS",
-    "column_SEEN_BY_OWNER"    : "SEEN_BY_OWNER",
-    "column_SEEN_BY_SHAREE"   : "SEEN_BY_SHAREE",
-    "column_MESSAGE"          : "MESSAGE",
-}
+# Column aliases, defined so that similar tables (such as CALENDAR_OBJECT and
+# ADDRESSBOOK_OBJECT) can be used according to a polymorphic interface.
 
-CALENDAR_OBJECT_REVISIONS_TABLE = {
-    "name"                    : "CALENDAR_OBJECT_REVISIONS",
-    "sequence"                : "REVISION_SEQ",
-    "column_HOME_RESOURCE_ID" : "CALENDAR_HOME_RESOURCE_ID",
-    "column_RESOURCE_ID"      : "CALENDAR_RESOURCE_ID",
-    "column_COLLECTION_NAME"  : "CALENDAR_NAME",
-    "column_RESOURCE_NAME"    : "RESOURCE_NAME",
-    "column_REVISION"         : "REVISION",
-    "column_DELETED"          : "DELETED",
-}
+schema.CALENDAR_BIND.RESOURCE_NAME = \
+    schema.CALENDAR_BIND.CALENDAR_RESOURCE_NAME
+schema.CALENDAR_BIND.RESOURCE_ID = \
+    schema.CALENDAR_BIND.CALENDAR_RESOURCE_ID
+schema.CALENDAR_BIND.HOME_RESOURCE_ID = \
+    schema.CALENDAR_BIND.CALENDAR_HOME_RESOURCE_ID
+schema.ADDRESSBOOK_BIND.RESOURCE_NAME = \
+    schema.ADDRESSBOOK_BIND.ADDRESSBOOK_RESOURCE_NAME
+schema.ADDRESSBOOK_BIND.RESOURCE_ID = \
+    schema.ADDRESSBOOK_BIND.ADDRESSBOOK_RESOURCE_ID
+schema.ADDRESSBOOK_BIND.HOME_RESOURCE_ID = \
+    schema.ADDRESSBOOK_BIND.ADDRESSBOOK_HOME_RESOURCE_ID
+schema.CALENDAR_OBJECT_REVISIONS.RESOURCE_ID = \
+    schema.CALENDAR_OBJECT_REVISIONS.CALENDAR_RESOURCE_ID
+schema.CALENDAR_OBJECT_REVISIONS.HOME_RESOURCE_ID = \
+    schema.CALENDAR_OBJECT_REVISIONS.CALENDAR_HOME_RESOURCE_ID
+schema.CALENDAR_OBJECT_REVISIONS.COLLECTION_NAME = \
+    schema.CALENDAR_OBJECT_REVISIONS.CALENDAR_NAME
+schema.ADDRESSBOOK_OBJECT_REVISIONS.RESOURCE_ID = \
+    schema.ADDRESSBOOK_OBJECT_REVISIONS.ADDRESSBOOK_RESOURCE_ID
+schema.ADDRESSBOOK_OBJECT_REVISIONS.HOME_RESOURCE_ID = \
+    schema.ADDRESSBOOK_OBJECT_REVISIONS.ADDRESSBOOK_HOME_RESOURCE_ID
+schema.ADDRESSBOOK_OBJECT_REVISIONS.COLLECTION_NAME = \
+    schema.ADDRESSBOOK_OBJECT_REVISIONS.ADDRESSBOOK_NAME
+schema.NOTIFICATION_OBJECT_REVISIONS.HOME_RESOURCE_ID = \
+    schema.NOTIFICATION_OBJECT_REVISIONS.NOTIFICATION_HOME_RESOURCE_ID
+schema.CALENDAR_OBJECT.TEXT = \
+    schema.CALENDAR_OBJECT.ICALENDAR_TEXT
+schema.CALENDAR_OBJECT.UID = \
+    schema.CALENDAR_OBJECT.ICALENDAR_UID
+schema.CALENDAR_OBJECT.PARENT_RESOURCE_ID = \
+    schema.CALENDAR_OBJECT.CALENDAR_RESOURCE_ID
+schema.ADDRESSBOOK_OBJECT.TEXT = \
+    schema.ADDRESSBOOK_OBJECT.VCARD_TEXT
+schema.ADDRESSBOOK_OBJECT.UID = \
+    schema.ADDRESSBOOK_OBJECT.VCARD_UID
+schema.ADDRESSBOOK_OBJECT.PARENT_RESOURCE_ID = \
+    schema.ADDRESSBOOK_OBJECT.ADDRESSBOOK_RESOURCE_ID
 
-ADDRESSBOOK_OBJECT_REVISIONS_TABLE = {
-    "name"                    : "ADDRESSBOOK_OBJECT_REVISIONS",
-    "sequence"                : "REVISION_SEQ",
-    "column_HOME_RESOURCE_ID" : "ADDRESSBOOK_HOME_RESOURCE_ID",
-    "column_RESOURCE_ID"      : "ADDRESSBOOK_RESOURCE_ID",
-    "column_COLLECTION_NAME"  : "ADDRESSBOOK_NAME",
-    "column_RESOURCE_NAME"    : "RESOURCE_NAME",
-    "column_REVISION"         : "REVISION",
-    "column_DELETED"          : "DELETED",
-}
 
-NOTIFICATION_OBJECT_REVISIONS_TABLE = {
-    "name"                    : "NOTIFICATION_OBJECT_REVISIONS",
-    "sequence"                : "REVISION_SEQ",
-    "column_HOME_RESOURCE_ID" : "NOTIFICATION_HOME_RESOURCE_ID",
-    "column_RESOURCE_NAME"    : "RESOURCE_NAME",
-    "column_REVISION"         : "REVISION",
-    "column_DELETED"          : "DELETED",
-}
 
-CALENDAR_OBJECT_TABLE = {
-    "name"                      : "CALENDAR_OBJECT",
-    "column_RESOURCE_ID"        : "RESOURCE_ID",
-    "column_PARENT_RESOURCE_ID" : "CALENDAR_RESOURCE_ID",
-    "column_RESOURCE_NAME"      : "RESOURCE_NAME",
-    "column_TEXT"               : "ICALENDAR_TEXT",
-    "column_UID"                : "ICALENDAR_UID",
-    "column_ATTACHMENTS_MODE"   : "ATTACHMENTS_MODE",
-    "column_DROPBOX_ID"         : "DROPBOX_ID",
-    "column_ACCESS"             : "ACCESS",
-    "column_SCHEDULE_OBJECT"    : "SCHEDULE_OBJECT",
-    "column_SCHEDULE_TAG"       : "SCHEDULE_TAG",
-    "column_SCHEDULE_ETAGS"     : "SCHEDULE_ETAGS",
-    "column_PRIVATE_COMMENTS"   : "PRIVATE_COMMENTS",
-    "column_MD5"                : "MD5",
-    "column_CREATED"            : "CREATED",
-    "column_MODIFIED"           : "MODIFIED",
-}
+def _combine(**kw):
+    """
+    Combine table dictionaries used in a join into a single dictionary, with
+    each key prefixed by its table's role ("ROLE:key"), for use in formatting.
+    """
+    result = {}
+    for tableRole, tableDictionary in kw.items():
+        result.update([("%s:%s" % (tableRole, k), v)
+                       for k,v in tableDictionary.items()])
+    return result
 
-ADDRESSBOOK_OBJECT_TABLE = {
-    "name"                      : "ADDRESSBOOK_OBJECT",
-    "column_RESOURCE_ID"        : "RESOURCE_ID",
-    "column_PARENT_RESOURCE_ID" : "ADDRESSBOOK_RESOURCE_ID",
-    "column_RESOURCE_NAME"      : "RESOURCE_NAME",
-    "column_TEXT"               : "VCARD_TEXT",
-    "column_UID"                : "VCARD_UID",
-    "column_MD5"                : "MD5",
-    "column_CREATED"            : "CREATED",
-    "column_MODIFIED"           : "MODIFIED",
-}
 
 
+def _S(tableSyntax):
+    """
+    Construct a dictionary of strings from a L{TableSyntax} for those queries
+    that are still constructed via string interpolation.
+    """
+    result = {}
+    result['name'] = tableSyntax.model.name
+    #pkey = tableSyntax.model.primaryKey
+    #if pkey is not None:
+    #    default = pkey.default
+    #    if isinstance(default, Sequence):
+    #        result['sequence'] = default.name
+    result['sequence'] = schema.model.sequenceNamed('REVISION_SEQ').name
+    for columnSyntax in tableSyntax:
+        result['column_' + columnSyntax.model.name] = columnSyntax.model.name
+    for alias, realColumnSyntax in tableSyntax.aliases().items():
+        result['column_' + alias] = realColumnSyntax.model.name
+    return result
+
+
+
+def _schemaConstants(nameColumn, valueColumn):
+    """
+    Return a function that looks up a named constant in the schema's rows.
+    """
+    def get(name):
+        for row in nameColumn.model.table.schemaRows:
+            if row[nameColumn.model] == name:
+                return row[valueColumn.model]
+    return get
+
+
+
 # Various constants
 
-_BIND_STATUS_INVITED  = 0
-_BIND_STATUS_ACCEPTED = 1
-_BIND_STATUS_DECLINED = 2
-_BIND_STATUS_INVALID  = 3
+_bindStatus = _schemaConstants(
+    schema.CALENDAR_BIND_STATUS.DESCRIPTION,
+    schema.CALENDAR_BIND_STATUS.ID
+)
 
-_ATTACHMENTS_MODE_NONE  = 0
-_ATTACHMENTS_MODE_READ  = 1
-_ATTACHMENTS_MODE_WRITE = 2
+_BIND_STATUS_INVITED  = _bindStatus('invited')
+_BIND_STATUS_ACCEPTED = _bindStatus('accepted')
+_BIND_STATUS_DECLINED = _bindStatus('declined')
+_BIND_STATUS_INVALID  = _bindStatus('invalid')
 
-_BIND_MODE_OWN = 0
-_BIND_MODE_READ = 1
-_BIND_MODE_WRITE = 2
-_BIND_MODE_DIRECT = 3
+_attachmentsMode = _schemaConstants(
+    schema.CALENDAR_OBJECT_ATTACHMENTS_MODE.DESCRIPTION,
+    schema.CALENDAR_OBJECT_ATTACHMENTS_MODE.ID
+)
 
-# Some combined tables used in joins
-CALENDAR_AND_CALENDAR_BIND = {}
-CALENDAR_AND_CALENDAR_BIND.update([("CHILD:%s" % (k,), v) for k,v in CALENDAR_TABLE.items()])
-CALENDAR_AND_CALENDAR_BIND.update([("BIND:%s" % (k,), v) for k,v in CALENDAR_BIND_TABLE.items()])
+_ATTACHMENTS_MODE_NONE  = _attachmentsMode('none')
+_ATTACHMENTS_MODE_READ  = _attachmentsMode('read')
+_ATTACHMENTS_MODE_WRITE = _attachmentsMode('write')
 
-CALENDAR_OBJECT_AND_BIND_TABLE = {}
-CALENDAR_OBJECT_AND_BIND_TABLE.update([("OBJECT:%s" % (k,), v) for k,v in CALENDAR_OBJECT_TABLE.items()])
-CALENDAR_OBJECT_AND_BIND_TABLE.update([("BIND:%s" % (k,), v) for k,v in CALENDAR_BIND_TABLE.items()])
 
-CALENDAR_OBJECT_REVISIONS_AND_BIND_TABLE = {}
-CALENDAR_OBJECT_REVISIONS_AND_BIND_TABLE.update([("REV:%s" % (k,), v) for k,v in CALENDAR_OBJECT_REVISIONS_TABLE.items()])
-CALENDAR_OBJECT_REVISIONS_AND_BIND_TABLE.update([("BIND:%s" % (k,), v) for k,v in CALENDAR_BIND_TABLE.items()])
+_bindMode = _schemaConstants(
+    schema.CALENDAR_BIND_MODE.DESCRIPTION,
+    schema.CALENDAR_BIND_MODE.ID
+)
 
-ADDRESSBOOK_AND_ADDRESSBOOK_BIND = {}
-ADDRESSBOOK_AND_ADDRESSBOOK_BIND.update([("CHILD:%s" % (k,), v) for k,v in ADDRESSBOOK_TABLE.items()])
-ADDRESSBOOK_AND_ADDRESSBOOK_BIND.update([("BIND:%s" % (k,), v) for k,v in ADDRESSBOOK_BIND_TABLE.items()])
 
-ADDRESSBOOK_OBJECT_AND_BIND_TABLE = {}
-ADDRESSBOOK_OBJECT_AND_BIND_TABLE.update([("OBJECT:%s" % (k,), v) for k,v in ADDRESSBOOK_OBJECT_TABLE.items()])
-ADDRESSBOOK_OBJECT_AND_BIND_TABLE.update([("BIND:%s" % (k,), v) for k,v in ADDRESSBOOK_BIND_TABLE.items()])
+_BIND_MODE_OWN = _bindMode('own')
+_BIND_MODE_READ = _bindMode('read')
+_BIND_MODE_WRITE = _bindMode('write')
+_BIND_MODE_DIRECT = _bindMode('direct')
 
-ADDRESSBOOK_OBJECT_REVISIONS_AND_BIND_TABLE = {}
-ADDRESSBOOK_OBJECT_REVISIONS_AND_BIND_TABLE.update([("REV:%s" % (k,), v) for k,v in ADDRESSBOOK_OBJECT_REVISIONS_TABLE.items()])
-ADDRESSBOOK_OBJECT_REVISIONS_AND_BIND_TABLE.update([("BIND:%s" % (k,), v) for k,v in ADDRESSBOOK_BIND_TABLE.items()])
+
+# Compatibility tables for string formatting:
+CALENDAR_HOME_TABLE                 = _S(schema.CALENDAR_HOME)
+CALENDAR_HOME_METADATA_TABLE        = _S(schema.CALENDAR_HOME_METADATA)
+ADDRESSBOOK_HOME_TABLE              = _S(schema.ADDRESSBOOK_HOME)
+ADDRESSBOOK_HOME_METADATA_TABLE     = _S(schema.ADDRESSBOOK_HOME_METADATA)
+NOTIFICATION_HOME_TABLE             = _S(schema.NOTIFICATION_HOME)
+CALENDAR_TABLE                      = _S(schema.CALENDAR)
+ADDRESSBOOK_TABLE                   = _S(schema.ADDRESSBOOK)
+CALENDAR_BIND_TABLE                 = _S(schema.CALENDAR_BIND)
+ADDRESSBOOK_BIND_TABLE              = _S(schema.ADDRESSBOOK_BIND)
+CALENDAR_OBJECT_REVISIONS_TABLE     = _S(schema.CALENDAR_OBJECT_REVISIONS)
+ADDRESSBOOK_OBJECT_REVISIONS_TABLE  = _S(schema.ADDRESSBOOK_OBJECT_REVISIONS)
+NOTIFICATION_OBJECT_REVISIONS_TABLE = _S(schema.NOTIFICATION_OBJECT_REVISIONS)
+CALENDAR_OBJECT_TABLE               = _S(schema.CALENDAR_OBJECT)
+ADDRESSBOOK_OBJECT_TABLE            = _S(schema.ADDRESSBOOK_OBJECT)
+
+# Some combined tables used in join-string-formatting.
+CALENDAR_AND_CALENDAR_BIND = _combine(CHILD=CALENDAR_TABLE,
+                                     BIND=CALENDAR_BIND_TABLE)
+CALENDAR_OBJECT_AND_BIND_TABLE = _combine(OBJECT=CALENDAR_OBJECT_TABLE,
+                                         BIND=CALENDAR_BIND_TABLE)
+CALENDAR_OBJECT_REVISIONS_AND_BIND_TABLE = _combine(
+    REV=CALENDAR_OBJECT_REVISIONS_TABLE,
+    BIND=CALENDAR_BIND_TABLE)
+ADDRESSBOOK_AND_ADDRESSBOOK_BIND = _combine(CHILD=ADDRESSBOOK_TABLE,
+                                           BIND=ADDRESSBOOK_BIND_TABLE)
+ADDRESSBOOK_OBJECT_AND_BIND_TABLE = _combine(OBJECT=ADDRESSBOOK_OBJECT_TABLE,
+                                             BIND=ADDRESSBOOK_BIND_TABLE)
+ADDRESSBOOK_OBJECT_REVISIONS_AND_BIND_TABLE = _combine(
+    REV=ADDRESSBOOK_OBJECT_REVISIONS_TABLE,
+    BIND=ADDRESSBOOK_BIND_TABLE)
+

Added: CalendarServer/trunk/txdav/common/datastore/test/test_sql_tables.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/test/test_sql_tables.py	                        (rev 0)
+++ CalendarServer/trunk/txdav/common/datastore/test/test_sql_tables.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -0,0 +1,38 @@
+# -*- test-case-name: txdav.caldav.datastore.test.test_sql -*-
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Tests for the SQL Table definitions in txdav.common.datastore.sql_tables: sample
+a couple of tables to make sure the schema is adequately parsed.
+
+These aren't unit tests; they're integration tests to verify the behavior tested
+by L{twext.enterprise.dal.test.test_parseschema}.
+"""
+
+from txdav.common.datastore.sql_tables import schema
+from twisted.trial.unittest import TestCase
+
+class SampleSomeColumns(TestCase):
+    """
+    Sample some columns from the tables defined by L{schema} and verify that
+    they look correct.
+    """
+
+    def test_addressbookObjectResourceID(self):
+        self.assertEquals(schema.ADDRESSBOOK_OBJECT.RESOURCE_ID.model.name,
+                          "RESOURCE_ID")
+

Modified: CalendarServer/trunk/txdav/common/datastore/test/util.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/test/util.py	2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/txdav/common/datastore/test/util.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -37,7 +37,7 @@
 from txdav.base.datastore.subpostgres import PostgresService
 from txdav.base.datastore.dbapiclient import DiagnosticConnectionWrapper
 from txdav.common.icommondatastore import NoSuchHomeChildError
-from txdav.base.datastore.asyncsqlpool import ConnectionPool
+from twext.enterprise.adbapi2 import ConnectionPool
 from twisted.internet.defer import returnValue
 from twistedcaldav.notify import Notifier
 

Modified: CalendarServer/trunk/txdav/idav.py
===================================================================
--- CalendarServer/trunk/txdav/idav.py	2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/txdav/idav.py	2011-01-20 07:03:24 UTC (rev 6791)
@@ -21,7 +21,6 @@
 __all__ = [
     "PropertyStoreError",
     "PropertyChangeNotAllowedError",
-    "AlreadyFinishedError",
     "IPropertyName",
     "IPropertyStore",
     "IDataStore",
@@ -52,13 +51,7 @@
 
 
 
-class AlreadyFinishedError(Exception):
-    """
-    The transaction was already completed via an C{abort} or C{commit} and
-    cannot be aborted or committed again.
-    """
 
-
 #
 # Interfaces
 #
@@ -172,53 +165,6 @@
 
 
 
-class IAsyncTransaction(Interface):
-    """
-    Asynchronous execution of SQL.
-
-    Note that there is no {begin()} method; if an L{IAsyncTransaction} exists,
-    it is assumed to have been started.
-    """
-
-    def execSQL(sql, args=(), raiseOnZeroRowCount=None):
-        """
-        Execute some SQL.
-
-        @param sql: an SQL string.
-
-        @type sql: C{str}
-
-        @param args: C{list} of arguments to interpolate into C{sql}.
-
-        @param raiseOnZeroRowCount: a 0-argument callable which returns an
-            exception to raise if the executed SQL does not affect any rows.
-
-        @return: L{Deferred} which fires C{list} of C{tuple}
-
-        @raise: C{raiseOnZeroRowCount} if it was specified and no rows were
-            affected.
-        """
-
-
-    def commit():
-        """
-        Commit changes caused by this transaction.
-
-        @return: L{Deferred} which fires with C{None} upon successful
-            completion of this transaction.
-        """
-
-
-    def abort():
-        """
-        Roll back changes caused by this transaction.
-
-        @return: L{Deferred} which fires with C{None} upon successful
-            rollback of this transaction.
-        """
-
-
-
 class ITransaction(Interface):
     """
     Transaction that can be aborted and either succeeds or fails in
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20110119/d9ee2950/attachment-0001.html>


More information about the calendarserver-changes mailing list