[CalendarServer-changes] [6791] CalendarServer/trunk
source_changes at macosforge.org
source_changes at macosforge.org
Wed Jan 19 23:03:24 PST 2011
Revision: 6791
http://trac.macosforge.org/projects/calendarserver/changeset/6791
Author: glyph at apple.com
Date: 2011-01-19 23:03:24 -0800 (Wed, 19 Jan 2011)
Log Message:
-----------
New modules to support generating SQL from objects rather than strings.
Modified Paths:
--------------
CalendarServer/trunk/calendarserver/tap/caldav.py
CalendarServer/trunk/calendarserver/tap/util.py
CalendarServer/trunk/support/build.sh
CalendarServer/trunk/twistedcaldav/directory/util.py
CalendarServer/trunk/twistedcaldav/stdconfig.py
CalendarServer/trunk/twistedcaldav/test/test_wrapping.py
CalendarServer/trunk/txdav/base/datastore/file.py
CalendarServer/trunk/txdav/caldav/datastore/sql.py
CalendarServer/trunk/txdav/caldav/datastore/test/common.py
CalendarServer/trunk/txdav/carddav/datastore/sql.py
CalendarServer/trunk/txdav/common/datastore/sql.py
CalendarServer/trunk/txdav/common/datastore/sql_tables.py
CalendarServer/trunk/txdav/common/datastore/test/util.py
CalendarServer/trunk/txdav/idav.py
Added Paths:
-----------
CalendarServer/trunk/twext/enterprise/
CalendarServer/trunk/twext/enterprise/__init__.py
CalendarServer/trunk/twext/enterprise/adbapi2.py
CalendarServer/trunk/twext/enterprise/dal/
CalendarServer/trunk/twext/enterprise/dal/__init__.py
CalendarServer/trunk/twext/enterprise/dal/model.py
CalendarServer/trunk/twext/enterprise/dal/parseschema.py
CalendarServer/trunk/twext/enterprise/dal/syntax.py
CalendarServer/trunk/twext/enterprise/dal/test/
CalendarServer/trunk/twext/enterprise/dal/test/__init__.py
CalendarServer/trunk/twext/enterprise/dal/test/test_parseschema.py
CalendarServer/trunk/twext/enterprise/dal/test/test_sqlsyntax.py
CalendarServer/trunk/twext/enterprise/ienterprise.py
CalendarServer/trunk/twext/enterprise/test/
CalendarServer/trunk/twext/enterprise/test/__init__.py
CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py
CalendarServer/trunk/twext/internet/threadutils.py
CalendarServer/trunk/twext/python/clsprop.py
CalendarServer/trunk/txdav/common/datastore/test/test_sql_tables.py
Removed Paths:
-------------
CalendarServer/trunk/txdav/base/datastore/asyncsqlpool.py
CalendarServer/trunk/txdav/base/datastore/test/test_asyncsqlpool.py
CalendarServer/trunk/txdav/base/datastore/threadutils.py
Modified: CalendarServer/trunk/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/caldav.py 2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/calendarserver/tap/caldav.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -73,8 +73,8 @@
from calendarserver.tap.util import pgServiceFromConfig
-from txdav.base.datastore.asyncsqlpool import ConnectionPool
-from txdav.base.datastore.asyncsqlpool import ConnectionPoolConnection
+from twext.enterprise.adbapi2 import ConnectionPool
+from twext.enterprise.adbapi2 import ConnectionPoolConnection
try:
from twistedcaldav.authkerb import NegotiateCredentialFactory
@@ -96,7 +96,9 @@
from calendarserver.version import version
version
except ImportError:
- sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "support"))
+ from twisted.python.modules import getModule
+ sys.path.insert(
+ 0, getModule(__name__).pathEntry.filePath.child("support").path)
from version import version as getVersion
version = "%s (%s*)" % getVersion()
twext.web2.server.VERSION = "CalendarServer/%s %s" % (
Modified: CalendarServer/trunk/calendarserver/tap/util.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/util.py 2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/calendarserver/tap/util.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -68,7 +68,7 @@
except ImportError:
NegotiateCredentialFactory = None
-from txdav.base.datastore.asyncsqlpool import ConnectionPoolClient
+from twext.enterprise.adbapi2 import ConnectionPoolClient
from txdav.base.datastore.dbapiclient import DBAPIConnector
from txdav.base.datastore.dbapiclient import postgresPreflight
from txdav.base.datastore.subpostgres import PostgresService
Modified: CalendarServer/trunk/support/build.sh
===================================================================
--- CalendarServer/trunk/support/build.sh 2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/support/build.sh 2011-01-20 07:03:24 UTC (rev 6791)
@@ -570,9 +570,9 @@
# they are useful to developers.
#
- py_dependency -v 0.1.1 -m "000053e0352f5bf19c2f8d5242329ea4" \
- "SQLParse" "sqlparse" "sqlparse-0.1.1" \
- "http://python-sqlparse.googlecode.com/files/sqlparse-0.1.1.tar.gz";
+ py_dependency -v 0.1.2 -m "aa9852ad81822723adcd9f96838de14e" \
+ "SQLParse" "sqlparse" "sqlparse-0.1.2" \
+ "http://python-sqlparse.googlecode.com/files/sqlparse-0.1.2.tar.gz";
py_dependency -v 0.4.0 -m "630a72510aae8758f48cf60e4fa17176" \
"Pyflakes" "pyflakes" "pyflakes-0.4.0" \
Added: CalendarServer/trunk/twext/enterprise/__init__.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/__init__.py (rev 0)
+++ CalendarServer/trunk/twext/enterprise/__init__.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -0,0 +1,20 @@
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Extensions in the spirit of Twisted's "enterprise" package; things related to
+database connectivity and management.
+"""
Added: CalendarServer/trunk/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/adbapi2.py (rev 0)
+++ CalendarServer/trunk/twext/enterprise/adbapi2.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -0,0 +1,678 @@
+# -*- test-case-name: twext.enterprise.test. -*-
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Asynchronous multi-process connection pool.
+
+This is similar to L{twisted.enterprise.adbapi}, but can hold a transaction (and
+thereby a thread) open across multiple asynchronous operations, rather than
+forcing the transaction to be completed entirely in a thread and/or entirely in
+a single SQL statement.
+
+Also, this module includes an AMP protocol for multiplexing connections through
+a single choke-point host. This is not currently in use, however, as AMP needs
+some optimization before it can be low-impact enough for this to be an
+improvement.
+"""
+
+import sys
+
+from cStringIO import StringIO
+from cPickle import dumps, loads
+from itertools import count
+
+from zope.interface import implements
+
+from twisted.internet.defer import inlineCallbacks
+from twisted.internet.defer import returnValue
+from twisted.internet.defer import Deferred
+from twisted.protocols.amp import Boolean
+from twisted.python.failure import Failure
+from twisted.protocols.amp import Argument, String, Command, AMP, Integer
+from twisted.internet import reactor as _reactor
+from twisted.application.service import Service
+from twisted.python import log
+from twisted.internet.defer import maybeDeferred
+from twisted.python.components import proxyForInterface
+
+from twext.internet.threadutils import ThreadHolder
+from twext.enterprise.ienterprise import AlreadyFinishedError, IAsyncTransaction
+
+
+# FIXME: there should be no default, it should be discovered dynamically
+# everywhere. Right now we're only using pgdb so we only support that.
+
+DEFAULT_PARAM_STYLE = 'pyformat'
+
+class BaseSqlTxn(object):
+ """
+ L{IAsyncTransaction} implementation based on a L{ThreadHolder} in the
+ current process.
+ """
+ implements(IAsyncTransaction)
+
+ # FIXME: this should *really* be
+ paramstyle = DEFAULT_PARAM_STYLE
+
+ def __init__(self, connectionFactory, reactor=_reactor):
+ """
+ @param connectionFactory: A 0-argument callable which returns a DB-API
+ 2.0 connection.
+ """
+ self._completed = False
+ self._cursor = None
+ self._holder = ThreadHolder(reactor)
+ self._holder.start()
+
+ def initCursor():
+ # support threadlevel=1; we can't necessarily cursor() in a
+ # different thread than we do transactions in.
+
+ # TODO: Re-try connect when it fails. Specify a timeout. That
+ # should happen in this layer because we need to be able to stop
+ # the reconnect attempt if it's hanging.
+ self._connection = connectionFactory()
+ self._cursor = self._connection.cursor()
+
+ # Note: no locking necessary here; since this gets submitted first, all
+ # subsequent submitted work-units will be in line behind it and the
+ # cursor will already have been initialized.
+ self._holder.submit(initCursor).addErrback(log.err)
+
+
+ def _reallyExecSQL(self, sql, args=None, raiseOnZeroRowCount=None):
+ if args is None:
+ args = []
+ self._cursor.execute(sql, args)
+ if raiseOnZeroRowCount is not None and self._cursor.rowcount == 0:
+ raise raiseOnZeroRowCount()
+ if self._cursor.description:
+ return self._cursor.fetchall()
+ else:
+ return None
+
+
+ noisy = False
+
+ def execSQL(self, *args, **kw):
+ result = self._holder.submit(
+ lambda : self._reallyExecSQL(*args, **kw)
+ )
+ if self.noisy:
+ def reportResult(results):
+ sys.stdout.write("\n".join([
+ "",
+ "SQL: %r %r" % (args, kw),
+ "Results: %r" % (results,),
+ "",
+ ]))
+ return results
+ result.addBoth(reportResult)
+ return result
+
+
+ def commit(self):
+ if not self._completed:
+ self._completed = True
+ def reallyCommit():
+ self._connection.commit()
+ result = self._holder.submit(reallyCommit)
+ return result
+ else:
+ raise AlreadyFinishedError()
+
+
+ def abort(self):
+ if not self._completed:
+ self._completed = True
+ def reallyAbort():
+ self._connection.rollback()
+ result = self._holder.submit(reallyAbort)
+ return result
+ else:
+ raise AlreadyFinishedError()
+
+
+ def __del__(self):
+ if not self._completed:
+ print 'BaseSqlTxn.__del__: OK'
+ self.abort()
+
+
+ def reset(self):
+ """
+ Call this when placing this transaction back into the pool.
+
+ @raise RuntimeError: if the transaction has not been committed or
+ aborted.
+ """
+ if not self._completed:
+ raise RuntimeError("Attempt to re-set active transaction.")
+ self._completed = False
+
+
+ def stop(self):
+ """
+ Release the thread and database connection associated with this
+ transaction.
+ """
+ self._completed = True
+ self._stopped = True
+ holder = self._holder
+ self._holder = None
+ holder.submit(self._connection.close)
+ return holder.stop()
+
+
+
+class SpooledTxn(object):
+ """
+ A L{SpooledTxn} is an implementation of L{IAsyncTransaction} which cannot
+ yet actually execute anything, so it spools SQL reqeusts for later
+ execution. When a L{BaseSqlTxn} becomes available later, it can be
+ unspooled onto that.
+ """
+
+ implements(IAsyncTransaction)
+
+ # FIXME: this should be relayed from the connection factory of the thing
+ # creating the spooled transaction.
+
+ paramstyle = DEFAULT_PARAM_STYLE
+
+ def __init__(self):
+ self._spool = []
+
+
+ def _enspool(self, cmd, a=(), kw={}):
+ d = Deferred()
+ self._spool.append((d, cmd, a, kw))
+ return d
+
+
+ def _iterDestruct(self):
+ """
+ Iterate the spool list destructively, while popping items from the
+ beginning. This allows code which executes more SQL in the callback of
+ a Deferred to not interfere with the originally submitted order of
+ commands.
+ """
+ while self._spool:
+ yield self._spool.pop(0)
+
+
+ def _unspool(self, other):
+ """
+ Unspool this transaction onto another transaction.
+
+ @param other: another provider of L{IAsyncTransaction} which will
+ actually execute the SQL statements we have been buffering.
+ """
+ for (d, cmd, a, kw) in self._iterDestruct():
+ self._relayCommand(other, d, cmd, a, kw)
+
+
+ def _relayCommand(self, other, d, cmd, a, kw):
+ """
+ Relay a single command to another transaction.
+ """
+ maybeDeferred(getattr(other, cmd), *a, **kw).chainDeferred(d)
+
+
+ def execSQL(self, *a, **kw):
+ return self._enspool('execSQL', a, kw)
+
+
+ def commit(self):
+ return self._enspool('commit')
+
+
+ def abort(self):
+ return self._enspool('abort')
+
+
+
+class PooledSqlTxn(proxyForInterface(iface=IAsyncTransaction,
+ originalAttribute='_baseTxn')):
+ """
+ This is a temporary throw-away wrapper for the longer-lived BaseSqlTxn, so
+ that if a badly-behaved API client accidentally hangs on to one of these
+ and, for example C{.abort()}s it multiple times once another client is
+ using that connection, it will get some harmless tracebacks.
+ """
+
+ def __init__(self, pool, baseTxn):
+ self._pool = pool
+ self._baseTxn = baseTxn
+ self._complete = False
+
+
+ def execSQL(self, *a, **kw):
+ self._checkComplete()
+ return super(PooledSqlTxn, self).execSQL(*a, **kw)
+
+
+ def commit(self):
+ self._markComplete()
+ return self._repoolAfter(super(PooledSqlTxn, self).commit())
+
+
+ def abort(self):
+ self._markComplete()
+ return self._repoolAfter(super(PooledSqlTxn, self).abort())
+
+
+ def _checkComplete(self):
+ """
+ If the transaction is complete, raise L{AlreadyFinishedError}
+ """
+ if self._complete:
+ raise AlreadyFinishedError()
+
+
+ def _markComplete(self):
+ """
+ Mark the transaction as complete, raising AlreadyFinishedError.
+ """
+ self._checkComplete()
+ self._complete = True
+
+
+ def _repoolAfter(self, d):
+ def repool(result):
+ self._pool.reclaim(self)
+ return result
+ return d.addCallback(repool)
+
+
+
+class ConnectionPool(Service, object):
+ """
+ This is a central service that has a threadpool and executes SQL statements
+ asynchronously, in a pool.
+
+ @ivar connectionFactory: a 0-or-1-argument callable that returns a DB-API
+ connection. The optional argument can be used as a label for
+ diagnostic purposes.
+
+ @ivar maxConnections: The connection pool will not attempt to make more
+ than this many concurrent connections to the database.
+
+ @type maxConnections: C{int}
+ """
+
+ reactor = _reactor
+
+ def __init__(self, connectionFactory, maxConnections=10):
+ super(ConnectionPool, self).__init__()
+ self.free = []
+ self.busy = []
+ self.waiting = []
+ self.connectionFactory = connectionFactory
+ self.maxConnections = maxConnections
+
+
+ def startService(self):
+ """
+ No startup necessary.
+ """
+
+
+ @inlineCallbacks
+ def stopService(self):
+ """
+ Forcibly abort any outstanding transactions.
+ """
+ for busy in self.busy[:]:
+ try:
+ yield busy.abort()
+ except:
+ log.err()
+ # all transactions should now be in the free list, since 'abort()' will
+ # have put them there.
+ for free in self.free:
+ yield free.stop()
+
+
+ def connection(self, label="<unlabeled>"):
+ """
+ Find a transaction; either retrieve a free one from the list or
+ allocate a new one if no free ones are available.
+
+ @return: an L{IAsyncTransaction}
+ """
+
+ overload = False
+ if self.free:
+ basetxn = self.free.pop(0)
+ elif len(self.busy) < self.maxConnections:
+ basetxn = BaseSqlTxn(
+ connectionFactory=self.connectionFactory,
+ reactor=self.reactor
+ )
+ else:
+ basetxn = SpooledTxn()
+ overload = True
+ txn = PooledSqlTxn(self, basetxn)
+ if overload:
+ self.waiting.append(txn)
+ else:
+ self.busy.append(txn)
+ return txn
+
+
+ def reclaim(self, txn):
+ """
+ Shuck the L{PooledSqlTxn} wrapper off, and recycle the underlying
+ BaseSqlTxn into the free list.
+ """
+ baseTxn = txn._baseTxn
+ baseTxn.reset()
+ self.busy.remove(txn)
+ if self.waiting:
+ waiting = self.waiting.pop(0)
+ waiting._baseTxn._unspool(baseTxn)
+ # Note: although commit() may already have been called, we don't
+ # have to handle it specially here. It only unspools after the
+ # deferred returned by commit() has actually been called, and since
+ # that occurs in a callFromThread, that won't happen until the next
+ # iteration of the mainloop, when the _baseTxn is safely correct.
+ waiting._baseTxn = baseTxn
+ self.busy.append(waiting)
+ else:
+ self.free.append(baseTxn)
+
+
+
+def txnarg():
+ return [('transactionID', Integer())]
+
+
+CHUNK_MAX = 0xffff
+
+class BigArgument(Argument):
+ """
+ An argument whose payload can be larger than L{CHUNK_MAX}, by splitting
+ across multiple AMP keys.
+ """
+ def fromBox(self, name, strings, objects, proto):
+ value = StringIO()
+ for counter in count():
+ chunk = strings.get("%s.%d" % (name, counter))
+ if chunk is None:
+ break
+ value.write(chunk)
+ objects[name] = self.fromString(value.getvalue())
+
+
+ def toBox(self, name, strings, objects, proto):
+ value = StringIO(self.toString(objects[name]))
+ for counter in count():
+ nextChunk = value.read(CHUNK_MAX)
+ if not nextChunk:
+ break
+ strings["%s.%d" % (name, counter)] = nextChunk
+
+
+
+class Pickle(BigArgument):
+ """
+ A pickle sent over AMP. This is to serialize the 'args' argument to
+ C{execSQL}, which is the dynamically-typed 'args' list argument to a DB-API
+ C{execute} function, as well as its dynamically-typed result ('rows').
+
+ This should be cleaned up into a nicer structure, but this is not a network
+ protocol, so we can be a little relaxed about security.
+
+ This is a L{BigArgument} rather than a regular L{Argument} because
+ individual arguments and query results need to contain entire vCard or
+ iCalendar documents, which can easily be greater than 64k.
+ """
+
+ def toString(self, inObject):
+ return dumps(inObject)
+
+ def fromString(self, inString):
+ return loads(inString)
+
+
+
+
+class StartTxn(Command):
+ """
+ Start a transaction, identified with an ID generated by the client.
+ """
+ arguments = txnarg()
+
+
+
+class ExecSQL(Command):
+ """
+ Execute an SQL statement.
+ """
+ arguments = [('sql', String()),
+ ('queryID', String()),
+ ('args', Pickle())] + txnarg()
+
+
+
+class Row(Command):
+ """
+ A row has been returned. Sent from server to client in response to
+ L{ExecSQL}.
+ """
+
+ arguments = [('queryID', String()),
+ ('row', Pickle())]
+
+
+
+class QueryComplete(Command):
+ """
+ A query issued with ExecSQL is complete.
+ """
+
+ arguments = [('queryID', String()),
+ ('norows', Boolean())]
+
+
+
+class Commit(Command):
+ arguments = txnarg()
+
+
+
+class Abort(Command):
+ arguments = txnarg()
+
+
+
+class _NoRows(Exception):
+ """
+ Placeholder exception to report zero rows.
+ """
+
+
+class ConnectionPoolConnection(AMP):
+ """
+ A L{ConnectionPoolConnection} is a single connection to a
+ L{ConnectionPool}.
+ """
+
+ def __init__(self, pool):
+ """
+ Initialize a mapping of transaction IDs to transaction objects.
+ """
+ super(ConnectionPoolConnection, self).__init__()
+ self.pool = pool
+ self._txns = {}
+
+
+ @StartTxn.responder
+ def start(self, transactionID):
+ self._txns[transactionID] = self.pool.connection()
+ return {}
+
+
+ @ExecSQL.responder
+ @inlineCallbacks
+ def receivedSQL(self, transactionID, queryID, sql, args):
+ try:
+ rows = yield self._txns[transactionID].execSQL(sql, args, _NoRows)
+ except _NoRows:
+ norows = True
+ else:
+ norows = False
+ if rows is not None:
+ for row in rows:
+ # Either this should be yielded or it should be
+ # requiresAnswer=False
+ self.callRemote(Row, queryID=queryID, row=row)
+ self.callRemote(QueryComplete, queryID=queryID, norows=norows)
+ returnValue({})
+
+
+ def _complete(self, transactionID, thunk):
+ txn = self._txns.pop(transactionID)
+ return thunk(txn).addCallback(lambda ignored: {})
+
+
+ @Commit.responder
+ def commit(self, transactionID):
+ """
+ Successfully complete the given transaction.
+ """
+ return self._complete(transactionID, lambda x: x.commit())
+
+
+ @Abort.responder
+ def abort(self, transactionID):
+ """
+ Roll back the given transaction.
+ """
+ return self._complete(transactionID, lambda x: x.abort())
+
+
+
+class ConnectionPoolClient(AMP):
+ """
+ A client which can execute SQL.
+ """
+ def __init__(self):
+ super(ConnectionPoolClient, self).__init__()
+ self._nextID = count().next
+ self._txns = {}
+ self._queries = {}
+
+
+ def newTransaction(self):
+ txnid = str(self._nextID())
+ self.callRemote(StartTxn, transactionID=txnid)
+ txn = Transaction(client=self, transactionID=txnid)
+ self._txns[txnid] = txn
+ return txn
+
+
+ @Row.responder
+ def row(self, queryID, row):
+ self._queries[queryID].row(row)
+ return {}
+
+
+ @QueryComplete.responder
+ def complete(self, queryID, norows):
+ self._queries.pop(queryID).done(norows)
+ return {}
+
+
+
+class _Query(object):
+ def __init__(self, raiseOnZeroRowCount):
+ self.results = []
+ self.deferred = Deferred()
+ self.raiseOnZeroRowCount = raiseOnZeroRowCount
+
+
+ def row(self, row):
+ """
+ A row was received.
+ """
+ self.results.append(row)
+
+
+ def done(self, norows):
+ """
+ The query is complete.
+
+ @param norows: A boolean. True if there were not any rows.
+ """
+ if norows and (self.raiseOnZeroRowCount is not None):
+ exc = self.raiseOnZeroRowCount()
+ self.deferred.errback(Failure(exc))
+ else:
+ self.deferred.callback(self.results)
+
+
+
+class Transaction(object):
+ """
+ Async protocol-based transaction implementation.
+ """
+
+ implements(IAsyncTransaction)
+
+ # FIXME: this needs to come from the other end of the wire.
+
+ paramstyle = DEFAULT_PARAM_STYLE
+
+ def __init__(self, client, transactionID):
+ """
+ Initialize a transaction with a L{ConnectionPoolClient} and a unique
+ transaction identifier.
+ """
+ self._client = client
+ self._transactionID = transactionID
+ self._completed = False
+
+
+ def execSQL(self, sql, args=None, raiseOnZeroRowCount=None):
+ if args is None:
+ args = []
+ queryID = str(self._client._nextID())
+ query = self._client._queries[queryID] = _Query(raiseOnZeroRowCount)
+ self._client.callRemote(ExecSQL, queryID=queryID, sql=sql, args=args,
+ transactionID=self._transactionID)
+ return query.deferred
+
+
+ def _complete(self, command):
+ if self._completed:
+ raise AlreadyFinishedError()
+ self._completed = True
+ return self._client.callRemote(
+ command, transactionID=self._transactionID
+ ).addCallback(lambda x: None)
+
+
+ def commit(self):
+ return self._complete(Commit)
+
+
+ def abort(self):
+ return self._complete(Abort)
+
+
Added: CalendarServer/trunk/twext/enterprise/dal/__init__.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/__init__.py (rev 0)
+++ CalendarServer/trunk/twext/enterprise/dal/__init__.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -0,0 +1,27 @@
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Toolkit for building a Data-Access Layer (DAL).
+
+This includes an abstract representation of SQL objects like tables, columns,
+sequences and queries, a parser to convert your schema to that representation,
+and tools for working with it.
+
+In some ways this is similar to the low levels of something like SQLAlchemy, but
+it is designed to be more introspectable, to allow for features like automatic
+caching and index detection. NB: work in progress.
+"""
Added: CalendarServer/trunk/twext/enterprise/dal/model.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/model.py (rev 0)
+++ CalendarServer/trunk/twext/enterprise/dal/model.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -0,0 +1,363 @@
+# -*- test-case-name: twext.enterprise.dal.test.test_parseschema -*-
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Model classes for SQL.
+"""
+
+class SQLType(object):
+ """
+ A data-type as defined in SQL; like "integer" or "real" or "varchar(255)".
+
+ @ivar name: the name of this type.
+ @type name: C{str}
+
+ @ivar length: the length of this type, if it is a type like 'varchar' or
+ 'character' that comes with a parenthetical length.
+ @type length: C{int} or C{NoneType}
+ """
+
+ def __init__(self, name, length):
+ _checkstr(name)
+ self.name = name
+ self.length = length
+
+
+ def __eq__(self, other):
+ """
+ Compare equal to other L{SQLTypes} with matching name and length.
+ """
+ if not isinstance(other, SQLType):
+ return NotImplemented
+ return (self.name, self.length) == (other.name, other.length)
+
+
+ def __ne__(self, other):
+ """
+ (Inequality is the opposite of equality.)
+ """
+ if not isinstance(other, SQLType):
+ return NotImplemented
+ return not self.__eq__(other)
+
+
+ def __repr__(self):
+ """
+ A useful string representation which includes the name and length if
+ present.
+ """
+ if self.length:
+ lendesc = '(%s)' % (self.length)
+ else:
+ lendesc = ''
+ return '<SQL Type: %r%s>' % (self.name, lendesc)
+
+
+
+class Constraint(object):
+ """
+ A constraint on a set of columns.
+
+ @ivar type: the type of constraint. Currently, only C{'UNIQUE'} and C{'NOT
+ NULL'} are supported.
+ @type type: C{str}
+
+ @ivar affectsColumns: Columns affected by this constraint.
+
+ @type affectsColumns: C{list} of L{Column}
+ """
+
+ # Values for 'type' attribute:
+ NOT_NULL = 'NOT NULL'
+ UNIQUE = 'UNIQUE'
+
+ def __init__(self, type, affectsColumns):
+ self.affectsColumns = affectsColumns
+ # XXX: possibly different constraint types should have different
+ # classes?
+ self.type = type
+
+
+
+class ProcedureCall(object):
+ """
+ An invocation of a stored procedure or built-in function.
+ """
+
+ def __init__(self, name, args):
+ _checkstr(name)
+ self.name = name
+ self.args = args
+
+
+
+class NO_DEFAULT(object):
+ """
+ Placeholder value for not having a default. (C{None} would not be suitable,
+ as that would imply a default of C{NULL}).
+ """
+
+
+def _checkstr(x):
+ """
+ Verify that C{x} is a C{str}. Raise a L{ValueError} if not. This is to
+ prevent pollution with unicode values.
+ """
+ if not isinstance(x, str):
+ raise ValueError("%r is not a str." % (x,))
+
+
+class Column(object):
+ """
+ A column from a table.
+
+ @ivar table: The L{Table} to which this L{Column} belongs.
+ @type table: L{Table}
+
+ @ivar name: The unqualified name of this column. For example, in the case
+ of a column BAR in a table FOO, this would be the string C{'BAR'}.
+ @type name: C{str}
+
+ @ivar type: The declared type of this column.
+ @type type: L{SQLType}
+
+ @ivar references: If this column references a foreign key on another table,
+ this will be a reference to that table; otherwise (normally) C{None}.
+ @type references: L{Table} or C{NoneType}
+ """
+
+ def __init__(self, table, name, type):
+ _checkstr(name)
+ self.table = table
+ self.name = name
+ self.type = type
+ self.default = NO_DEFAULT
+ self.references = None
+
+
+ def __repr__(self):
+ return '<Column (%s %r)>' % (self.name, self.type)
+
+
+ def canBeNull(self):
+ """
+ Can this column ever be C{NULL}, i.e. C{None}? In other words, is it
+ free of any C{NOT NULL} constraints?
+
+ @return: C{True} if so, C{False} if not.
+ """
+ for constraint in self.table.constraints:
+ if self in constraint.affectsColumns:
+ if constraint.type is Constraint.NOT_NULL:
+ return False
+ return True
+
+
+ def setDefaultValue(self, value):
+ """
+ Change the default value of this column. (Should only be called during
+ schema parsing.)
+ """
+ self.default = value
+
+
+ def needsValue(self):
+ """
+ Does this column require a value in INSERT statements which create rows?
+
+ @return: C{True} for L{Column}s with no default specified which also
+ cannot be NULL, C{False} otherwise.
+
+ @rtype: C{bool}
+ """
+ return self.canBeNull() or (self.default is not None)
+
+
+ def doesReferenceName(self, name):
+ """
+ Change this column to refer to a table in the schema. (Should only be
+ called during schema parsing.)
+
+ @param name: the name of a L{Table} in this L{Column}'s L{Schema}.
+ @type name: L{str}
+ """
+ self.references = self.table.schema.tableNamed(name)
+ if self.references.primaryKey.type != self.type:
+ print 'Mismatch', self.references.primaryKey.type, self.type
+
+
+
+class Table(object):
+ """
+ A set of columns.
+
+ @ivar descriptiveComment: A docstring for the table. Parsed from a '--'
+ comment preceding this table in the SQL schema file that was parsed, if
+ any.
+ @type descriptiveComment: C{str}
+
+ @ivar schema: a reference to the L{Schema} to which this table belongs.
+ """
+
+ def __init__(self, schema, name):
+ _checkstr(name)
+ self.descriptiveComment = ''
+ self.schema = schema
+ self.name = name
+ self.columns = []
+ self.constraints = []
+ self.schemaRows = []
+ self.primaryKey = None
+ self.schema.tables.append(self)
+
+
+ def __repr__(self):
+ return '<Table %r:%r>' % (self.name, self.columns)
+
+
+ def columnNamed(self, name):
+ """
+ Retrieve a column from this table with a given name.
+
+ @raise KeyError: if no such table exists.
+
+ @return: a column
+
+ @rtype: L{Column}
+ """
+ for column in self.columns:
+ if column.name == name:
+ return column
+ raise KeyError("no such column: %r" % (name,))
+
+
+ def addColumn(self, name, type):
+ """
+ A new column was parsed for this table.
+
+ @param name: The unqualified name of the column.
+
+ @type name: C{str}
+
+ @param type: The L{SQLType} describing the column's type.
+ """
+ column = Column(self, name, type)
+ self.columns.append(column)
+ return column
+
+
+ def tableConstraint(self, constraintType, columnNames):
+ """
+ This table is affected by a constraint. (Should only be called during
+ schema parsing.)
+
+ @param constraintType: the type of constraint; either
+ L{Constraint.NOT_NULL} or L{Constraint.UNIQUE}, currently.
+ """
+ affectsColumns = []
+ for name in columnNames:
+ affectsColumns.append(self.columnNamed(name))
+ self.constraints.append(Constraint(constraintType, affectsColumns))
+
+
+ def insertSchemaRow(self, values):
+ """
+ A statically-defined row was inserted as part of the schema itself.
+ This is used for tables that want to track static enumerations, for
+ example, but want to be referred to by a foreign key in other tables for
+ proper referential integrity.
+
+ Append this data to this L{Table}'s L{Table.schemaRows}.
+
+ (Should only be called during schema parsing.)
+
+ @param values: a C{list} of data items, one for each column in this
+ table's current list of L{Column}s.
+ """
+ row = {}
+ for column, value in zip(self.columns, values):
+ row[column] = value
+ self.schemaRows.append(row)
+
+
+ def addComment(self, comment):
+ """
+ Add a comment to C{descriptiveComment}.
+
+ @param comment: some additional descriptive text
+ @type comment: C{str}
+ """
+ self.descriptiveComment = comment
+
+
+ def uniques(self):
+ """
+ @return: an iterable of C{set}s of C{Column}s which are unique within
+ this table.
+ """
+ for constraint in self.constraints:
+ if constraint.type is Constraint.UNIQUE:
+ yield set(constraint.affectsColumns)
+
+
+
+class Sequence(object):
+ """
+ A sequence object.
+ """
+
+ def __init__(self, name):
+ _checkstr(name)
+ self.name = name
+ self.referringColumns = []
+
+
+ def __repr__(self):
+ return '<Sequence %r>' % (self.name,)
+
+
+
+class Schema(object):
+ """
+ A schema containing tables, indexes, and sequences.
+ """
+
+ def __init__(self, filename='<string>'):
+ self.filename = filename
+ self.tables = []
+ self.sequences = []
+
+
+ def __repr__(self):
+ return '<Schema %r>' % (self.filename,)
+
+
+ def tableNamed(self, name):
+ for table in self.tables:
+ if table.name == name:
+ return table
+ raise KeyError(name)
+
+
+ def sequenceNamed(self, name):
+ for sequence in self.sequences:
+ if sequence.name == name:
+ return sequence
+ raise KeyError(name)
+
+
+
Added: CalendarServer/trunk/twext/enterprise/dal/parseschema.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/parseschema.py (rev 0)
+++ CalendarServer/trunk/twext/enterprise/dal/parseschema.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -0,0 +1,461 @@
+# -*- test-case-name: twext.enterprise.dal.test.test_parseschema -*-
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Parser for SQL schema.
+"""
+
+from itertools import chain
+
+from sqlparse import parse, keywords
+from sqlparse.tokens import Keyword, Punctuation, Number, String, Name
+from sqlparse.sql import (Comment, Identifier, Parenthesis, IdentifierList,
+ Function)
+
+from twext.enterprise.dal.model import Schema, Table, SQLType, ProcedureCall
+from twext.enterprise.dal.model import Constraint
+from twext.enterprise.dal.model import Sequence
+
+
+
+def _fixKeywords():
+ """
+ Work around bugs in SQLParse, adding SEQUENCE as a keyword (since it is
+ treated as one in postgres) and removing ACCESS and SIZE (since we use those
+ as column names). Technically those are keywords in SQL, but they aren't
+ treated as such by postgres's parser.
+ """
+ keywords.KEYWORDS['SEQUENCE'] = Keyword
+ for columnNameKeyword in ['ACCESS', 'SIZE']:
+ del keywords.KEYWORDS[columnNameKeyword]
+
+_fixKeywords()
+
+
+
+def tableFromCreateStatement(schema, stmt):
+ """
+ Add a table from a CREATE TABLE sqlparse statement object.
+
+ @param schema: The schema to add the table statement to.
+
+ @type schema: L{Schema}
+
+ @param stmt: The C{CREATE TABLE} statement object.
+
+ @type stmt: L{Statement}
+ """
+ i = iterSignificant(stmt)
+ expect(i, ttype=Keyword.DDL, value='CREATE')
+ expect(i, ttype=Keyword, value='TABLE')
+ function = expect(i, cls=Function)
+ i = iterSignificant(function)
+ name = expect(i, cls=Identifier).get_name().encode('utf-8')
+ self = Table(schema, name)
+ parens = expect(i, cls=Parenthesis)
+ cp = _ColumnParser(self, iterSignificant(parens), parens)
+ cp.parse()
+ return self
+
+
+
+def schemaFromPath(path):
+ """
+ Get a L{Schema}.
+
+ @param path: a L{FilePath}-like object containing SQL.
+
+ @return: a L{Schema} object with the contents of the given C{path} parsed
+ and added to it as L{Table} objects.
+ """
+ schema = Schema(path.basename())
+ schemaData = path.getContent()
+ addSQLToSchema(schema, schemaData)
+ return schema
+
+
+
+def addSQLToSchema(schema, schemaData):
+ """
+ Add new SQL to an existing schema.
+
+ @param schema: The schema to add the new SQL to.
+
+ @type schema: L{Schema}
+
+ @param schemaData: A string containing some SQL statements.
+
+ @type schemaData: C{str}
+
+ @return: the C{schema} argument
+ """
+ parsed = parse(schemaData)
+ for stmt in parsed:
+ preface = ''
+ while stmt.tokens and not significant(stmt.tokens[0]):
+ preface += str(stmt.tokens.pop(0))
+ if not stmt.tokens:
+ continue
+ if stmt.get_type() == 'CREATE':
+ createType = stmt.token_next(1, True).value.upper()
+ if createType == u'TABLE':
+ t = tableFromCreateStatement(schema, stmt)
+ t.addComment(preface)
+ elif createType == u'SEQUENCE':
+ schema.sequences.append(
+ Sequence(
+ stmt.token_next(2, True).get_name().encode('utf-8')))
+ elif stmt.get_type() == 'INSERT':
+ insertTokens = iterSignificant(stmt)
+ expect(insertTokens, ttype=Keyword.DML, value='INSERT')
+ expect(insertTokens, ttype=Keyword, value='INTO')
+ tableName = expect(insertTokens, cls=Identifier).get_name()
+ expect(insertTokens, ttype=Keyword, value='VALUES')
+ values = expect(insertTokens, cls=Parenthesis)
+ vals = iterSignificant(values)
+ expect(vals, ttype=Punctuation, value='(')
+ valuelist = expect(vals, cls=IdentifierList)
+ expect(vals, ttype=Punctuation, value=')')
+ rowData = []
+ for ident in valuelist.get_identifiers():
+ rowData.append(
+ {Number.Integer: int,
+ String.Single: _destringify}
+ [ident.ttype](ident.value)
+ )
+
+ schema.tableNamed(tableName).insertSchemaRow(rowData)
+ else:
+ print 'unknown type:', stmt.get_type()
+ return schema
+
+
+
+class _ColumnParser(object):
+ """
+ Stateful parser for the things between commas.
+ """
+
+ def __init__(self, table, parenIter, parens):
+ """
+ @param table: the L{Table} to add data to.
+
+ @param parenIter: the iterator.
+ """
+ self.parens = parens
+ self.iter = parenIter
+ self.table = table
+
+
+ def __iter__(self):
+ """
+ This object is an iterator; return itself.
+ """
+ return self
+
+
+ def next(self):
+ """
+ Get the next L{IdentifierList}.
+ """
+ result = self.iter.next()
+ if isinstance(result, IdentifierList):
+ # Expand out all identifier lists, since they seem to pop up
+ # incorrectly. We should never see one in a column list anyway.
+ # http://code.google.com/p/python-sqlparse/issues/detail?id=25
+ while result.tokens:
+ it = result.tokens.pop()
+ if significant(it):
+ self.pushback(it)
+ return self.next()
+ return result
+
+
+ def pushback(self, value):
+ """
+ Push the value back onto this iterator so it will be returned by the
+ next call to C{next}.
+ """
+ self.iter = chain(iter((value,)), self.iter)
+
+
+ def parse(self):
+ """
+ Parse everything.
+ """
+ expect(self.iter, ttype=Punctuation, value=u"(")
+ while self.nextColumn():
+ pass
+
+
+ def nextColumn(self):
+ """
+ Parse the next column or constraint, depending on the next token.
+ """
+ maybeIdent = self.next()
+ if maybeIdent.ttype == Name:
+ return self.parseColumn(maybeIdent.value)
+ elif isinstance(maybeIdent, Identifier):
+ return self.parseColumn(maybeIdent.get_name())
+ else:
+ return self.parseConstraint(maybeIdent)
+
+
+ def parseConstraint(self, constraintType):
+ """
+ Parse a 'free' constraint, described explicitly in the table as opposed
+ to being implicitly associated with a column by being placed after it.
+ """
+ # only know about PRIMARY KEY and UNIQUE for now
+ if constraintType.match(Keyword, 'PRIMARY'):
+ expect(self, ttype=Keyword, value='KEY')
+ expect(self, cls=Parenthesis)
+ self.primaryKey = 'MULTI-VALUE-KEY'
+ elif constraintType.match(Keyword, 'UNIQUE'):
+ parens = iterSignificant(expect(self, cls=Parenthesis))
+ expect(parens, ttype=Punctuation, value="(")
+ idorids = parens.next()
+ if isinstance(idorids, Identifier):
+ idnames = [idorids.get_name()]
+ elif isinstance(idorids, IdentifierList):
+ idnames = [x.get_name() for x in idorids.get_identifiers()]
+ else:
+ raise ViolatedExpectation("identifier or list", repr(idorids))
+ expect(parens, ttype=Punctuation, value=")")
+ self.table.tableConstraint(Constraint.UNIQUE, idnames)
+ else:
+ raise ViolatedExpectation('PRIMARY or UNIQUE', constraintType)
+ return self.checkEnd(self.next())
+
+
+ def checkEnd(self, val):
+ """
+ After a column or constraint, check the end.
+ """
+ if val.value == u",":
+ return True
+ elif val.value == u")":
+ return False
+ else:
+ raise ViolatedExpectation(", or )", val)
+
+
+ def parseColumn(self, name):
+ """
+ Parse a column with the given name.
+ """
+ typeName = self.next()
+ if isinstance(typeName, Function):
+ [funcIdent, args] = iterSignificant(typeName)
+ typeName = funcIdent
+ arggetter = iterSignificant(args)
+ expect(arggetter, value=u'(')
+ typeLength = int(expect(arggetter,
+ ttype=Number.Integer).value.encode('utf-8'))
+ else:
+ maybeTypeArgs = self.next()
+ if isinstance(maybeTypeArgs, Parenthesis):
+ # type arguments
+ significant = iterSignificant(maybeTypeArgs)
+ expect(significant, value=u"(")
+ typeLength = int(significant.next().value)
+ else:
+ # something else
+ typeLength = None
+ self.pushback(maybeTypeArgs)
+ theType = SQLType(typeName.value.encode("utf-8"), typeLength)
+ theColumn = self.table.addColumn(
+ name=name.encode("utf-8"), type=theType
+ )
+ for val in self:
+ if val.ttype == Punctuation:
+ return self.checkEnd(val)
+ else:
+ expected = True
+ def oneConstraint(t):
+ self.table.tableConstraint(t,
+ [theColumn.name])
+
+ if val.match(Keyword, 'PRIMARY'):
+ expect(self, ttype=Keyword, value='KEY')
+ # XXX check to make sure there's no other primary key yet
+ self.table.primaryKey = theColumn
+ elif val.match(Keyword, 'UNIQUE'):
+ # XXX add UNIQUE constraint
+ oneConstraint(Constraint.UNIQUE)
+ elif val.match(Keyword, 'NOT'):
+ # possibly not necessary, as 'NOT NULL' is a single keyword
+ # in sqlparse as of 0.1.2
+ expect(self, ttype=Keyword, value='NULL')
+ oneConstraint(Constraint.NOT_NULL)
+ elif val.match(Keyword, 'NOT NULL'):
+ oneConstraint(Constraint.NOT_NULL)
+ elif val.match(Keyword, 'DEFAULT'):
+ theDefault = self.next()
+ if isinstance(theDefault, Function):
+ thingo = theDefault.tokens[0].get_name()
+ parens = expectSingle(
+ theDefault.tokens[-1], cls=Parenthesis
+ )
+ pareniter = iterSignificant(parens)
+ if thingo.upper() == 'NEXTVAL':
+ expect(pareniter, ttype=Punctuation, value="(")
+ seqname = _destringify(
+ expect(pareniter, ttype=String.Single).value)
+ defaultValue = self.table.schema.sequenceNamed(
+ seqname
+ )
+ defaultValue.referringColumns.append(theColumn)
+ else:
+ defaultValue = ProcedureCall(thingo.encode('utf-8'),
+ parens)
+ elif theDefault.ttype == Number.Integer:
+ defaultValue = int(theDefault.value)
+ elif (theDefault.ttype == Keyword and
+ theDefault.value.lower() == 'false'):
+ defaultValue = False
+ elif (theDefault.ttype == Keyword and
+ theDefault.value.lower() == 'true'):
+ defaultValue = True
+ elif (theDefault.ttype == Keyword and
+ theDefault.value.lower() == 'null'):
+ defaultValue = None
+ elif theDefault.ttype == String.Single:
+ defaultValue = _destringify(theDefault.value)
+ else:
+ raise RuntimeError(
+ "not sure what to do: default %r" % (
+ theDefault))
+ theColumn.setDefaultValue(defaultValue)
+ elif val.match(Keyword, 'REFERENCES'):
+ target = nameOrIdentifier(self.next())
+ theColumn.doesReferenceName(target)
+ elif val.match(Keyword, 'ON'):
+ expect(self, ttype=Keyword.DML, value='DELETE')
+ expect(self, ttype=Keyword, value='CASCADE')
+ else:
+ expected = False
+ if not expected:
+ print 'UNEXPECTED TOKEN:', repr(val), theColumn
+ print self.parens
+ import pprint
+ pprint.pprint(self.parens.tokens)
+ return 0
+
+
+
+
+class ViolatedExpectation(Exception):
+ """
+ An expectation about the structure of the SQL syntax was violated.
+ """
+
+ def __init__(self, expected, got):
+ self.expected = expected
+ self.got = got
+ super(ViolatedExpectation, self).__init__(
+ "Expected %r got %s" % (expected, got)
+ )
+
+
+
+def nameOrIdentifier(token):
+ """
+ Determine if the given object is a name or an identifier, and return the
+ textual value of that name or identifier.
+
+ @rtype: L{str}
+ """
+ if isinstance(token, Identifier):
+ return token.get_name()
+ elif token.ttype == Name:
+ return token.value
+ else:
+ raise ViolatedExpectation("identifier or name", repr(token))
+
+
+
+def expectSingle(nextval, ttype=None, value=None, cls=None):
+ """
+ Expect some properties from retrieved value.
+
+ @param ttype: A token type to compare against.
+
+ @param value: A value to compare against.
+
+ @param cls: A class to check if the value is an instance of.
+
+ @raise ViolatedExpectation: if an unexpected token is found.
+
+ @return: C{nextval}, if it matches.
+ """
+ if ttype is not None:
+ if nextval.ttype != ttype:
+ raise ViolatedExpectation(ttype, '%s:%s' % (nextval.ttype, nextval))
+ if value is not None:
+ if nextval.value.upper() != value.upper():
+ raise ViolatedExpectation(value, nextval.value)
+ if cls is not None:
+ if nextval.__class__ != cls:
+ raise ViolatedExpectation(cls, repr(nextval))
+ return nextval
+
+
+
+def expect(iterator, **kw):
+ """
+ Retrieve a value from an iterator and check its properties. Same signature
+ as L{expectSingle}, except it takes an iterator instead of a value.
+
+ @see L{expectSingle}
+ """
+ nextval = iterator.next()
+ return expectSingle(nextval, **kw)
+
+
+
+def significant(token):
+ """
+ Determine if the token is 'significant', i.e. that it is not a comment and
+ not whitespace.
+ """
+ # comment has 'None' is_whitespace() result. intentional?
+ return (not isinstance(token, Comment) and not token.is_whitespace())
+
+
+
+def iterSignificant(tokenList):
+ """
+ Iterate tokens that pass the test given by L{significant}, from a given
+ L{TokenList}.
+ """
+ for token in tokenList.tokens:
+ if significant(token):
+ yield token
+
+
+
+def _destringify(strval):
+ """
+ Convert a single-quoted SQL string into its actual represented value.
+ (Assumes standards compliance, since we should be controlling all the input
+ here. The only quoting syntax respected is "''".)
+ """
+ return strval[1:-1].replace("''", "'")
+
+
+
Added: CalendarServer/trunk/twext/enterprise/dal/syntax.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/syntax.py (rev 0)
+++ CalendarServer/trunk/twext/enterprise/dal/syntax.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -0,0 +1,668 @@
+# -*- test-case-name: twext.enterprise.dal.test.test_sqlsyntax -*-
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Syntax wrappers and generators for SQL.
+"""
+
+from twext.enterprise.dal.model import Schema, Table, Column
+
+
+class TableMismatch(Exception):
+ """
+ A table in a statement did not match with a column.
+ """
+
+
+
+class NotEnoughValues(ValueError):
+ """
+ Not enough values were supplied for an L{Insert}.
+ """
+
+
+
+class _Statement(object):
+ """
+ An SQL statement that may be executed. (An abstract base class, must
+ implement several methods.)
+ """
+
+ _paramstyles = {
+ 'pyformat': ('%s', lambda s: s.replace("%", "%%"))
+ }
+
+ def on(self, txn, raiseOnZeroRowCount=None, **kw):
+ """
+ Execute this statement on a given L{IAsyncTransaction} and return the
+ resulting L{Deferred}.
+ """
+ placeholder, quote = self._paramstyles[txn.paramstyle]
+ fragment = self.toSQL(placeholder, quote).bind(**kw)
+ return txn.execSQL(fragment.text, fragment.parameters,
+ raiseOnZeroRowCount)
+
+
+
+class Syntax(object):
+ """
+ Base class for syntactic convenience.
+
+ This class will define dynamic attribute access to represent its underlying
+ model as a Python namespace.
+
+ You can access the underlying model as '.model'.
+ """
+
+ modelType = None
+
+ def __init__(self, model):
+ if not isinstance(model, self.modelType):
+ # make sure we don't get a misleading repr()
+ raise ValueError("type mismatch: %r %r", type(self), model)
+ self.model = model
+
+
+ def __repr__(self):
+ return '<Syntax for: %r>' % (self.model,)
+
+
+
+class FunctionInvocation(object):
+ def __init__(self, name, arg):
+ self.name = name
+ self.arg = arg
+
+
+ def subSQL(self, placeholder, quote, allTables):
+ result = SQLFragment(self.name)
+ result.text += "("
+ result.append(self.arg.subSQL(placeholder, quote, allTables))
+ result.text += ")"
+ return result
+
+
+
+class Function(object):
+ """
+ An L{Function} is a representation of an SQL Function function.
+ """
+
+ def __init__(self, name):
+ self.name = name
+
+
+ def __call__(self, arg):
+ """
+ Produce an L{FunctionInvocation}
+ """
+ return FunctionInvocation(self.name, arg)
+
+Max = Function("max")
+Len = Function("character_length")
+
+
+
+class SchemaSyntax(Syntax):
+ """
+ Syntactic convenience for L{Schema}.
+ """
+
+ modelType = Schema
+
+ def __getattr__(self, attr):
+ try:
+ tableModel = self.model.tableNamed(attr)
+ except KeyError:
+ raise AttributeError("schema has no table %r" % (attr,))
+ syntax = TableSyntax(tableModel)
+ # Needs to be preserved here so that aliasing will work.
+ setattr(self, attr, syntax)
+ return syntax
+
+
+ def __iter__(self):
+ for table in self.model.tables:
+ yield TableSyntax(table)
+
+
+
+class TableSyntax(Syntax):
+ """
+ Syntactic convenience for L{Table}.
+ """
+
+ modelType = Table
+
+ def join(self, otherTableSyntax, on, type=''):
+ return Join(self, type, otherTableSyntax, on)
+
+
+ def subSQL(self, placeholder, quote, allTables):
+ """
+ For use in a 'from' clause.
+ """
+ # XXX maybe there should be a specific method which is only invoked
+ # from the FROM clause, that only tables and joins would implement?
+ return SQLFragment(self.model.name)
+
+
+ def __getattr__(self, attr):
+ return ColumnSyntax(self.model.columnNamed(attr))
+
+
+ def __iter__(self):
+ for column in self.model.columns:
+ yield ColumnSyntax(column)
+
+
+ def tables(self):
+ return [self]
+
+
+ def aliases(self):
+ result = {}
+ for k, v in self.__dict__.items():
+ if isinstance(v, ColumnSyntax):
+ result[k] = v
+ return result
+
+
+ def __contains__(self, columnSyntax):
+ if isinstance(columnSyntax, FunctionInvocation):
+ columnSyntax = columnSyntax.arg
+ return (columnSyntax.model in self.model.columns)
+
+
+
+class Join(object):
+
+ def __init__(self, firstTable, type, secondTableOrJoin, on):
+ self.firstTable = firstTable
+ self.type = type
+ self.secondTableOrJoin = secondTableOrJoin
+ self.on = on
+
+
+ def subSQL(self, placeholder, quote, allTables):
+ stmt = SQLFragment()
+ stmt.append(self.firstTable.subSQL(placeholder, quote, allTables))
+ stmt.text += ' '
+ if self.type:
+ stmt.text += self.type
+ stmt.text += ' '
+ stmt.text += 'join '
+ stmt.append(self.secondTableOrJoin.subSQL(placeholder, quote, allTables))
+ stmt.text += ' on '
+ stmt.append(self.on.subSQL(placeholder, quote, allTables))
+ return stmt
+
+
+ def tables(self):
+ return self.firstTable.tables() + self.secondTableOrJoin.tables()
+
+
+
+def comparison(comparator):
+ def __(self, other):
+ if other is None:
+ return NullComparison(self, comparator)
+ if isinstance(other, ColumnSyntax):
+ return ColumnComparison(self, comparator, other)
+ else:
+ return ConstantComparison(self, comparator, other)
+ return __
+
+
+
+class ColumnSyntax(Syntax):
+ """
+ Syntactic convenience for L{Column}.
+ """
+
+ modelType = Column
+
+ __eq__ = comparison('=')
+ __ne__ = comparison('!=')
+ __gt__ = comparison('>')
+ __ge__ = comparison('>=')
+ __lt__ = comparison('<')
+ __le__ = comparison('<=')
+ __add__ = comparison("+")
+ __sub__ = comparison("-")
+
+
+ def subSQL(self, placeholder, quote, allTables):
+ # XXX This, and 'model', could in principle conflict with column names.
+ # Maybe do something about that.
+ for tableSyntax in allTables:
+ if self.model.table is not tableSyntax.model:
+ if self.model.name in (c.name for c in
+ tableSyntax.model.columns):
+ return SQLFragment((self.model.table.name + '.' +
+ self.model.name))
+ return SQLFragment(self.model.name)
+
+
+ def In(self, subselect):
+ # Can't be Select.__contains__ because __contains__ gets __nonzero__
+ # called on its result by the 'in' syntax.
+ return CompoundComparison(self, 'in', subselect)
+
+
+
+class Comparison(object):
+
+ def __init__(self, a, op, b):
+ self.a = a
+ self.op = op
+ self.b = b
+
+
+ def __nonzero__(self):
+ raise ValueError(
+ "column comparisons should not be tested for truth value")
+
+
+ def booleanOp(self, operand, other):
+ return CompoundComparison(self, operand, other)
+
+
+ def And(self, other):
+ return self.booleanOp('and', other)
+
+
+ def Or(self, other):
+ return self.booleanOp('or', other)
+
+
+
+class NullComparison(Comparison):
+ """
+ A L{NullComparison} is a comparison of a column or expression with None.
+ """
+ def __init__(self, a, op):
+ # 'b' is always None for this comparison type
+ super(NullComparison, self).__init__(a, op, None)
+
+
+ def subSQL(self, placeholder, quote, allTables):
+ sqls = SQLFragment()
+ sqls.append(self.a.subSQL(placeholder, quote, allTables))
+ sqls.text += " is "
+ if self.op != "=":
+ sqls.text += "not "
+ sqls.text += "null"
+ return sqls
+
+
+class ConstantComparison(Comparison):
+
+ def subSQL(self, placeholder, quote, allTables):
+ sqls = SQLFragment()
+ sqls.append(self.a.subSQL(placeholder, quote, allTables))
+ sqls.append(SQLFragment(' ' + ' '.join([self.op, placeholder]),
+ [self.b]))
+ return sqls
+
+
+
+class CompoundComparison(Comparison):
+ """
+ A compound comparison; two or more constraints, joined by an operation
+ (currently only AND or OR).
+ """
+
+ def subSQL(self, placeholder, quote, allTables):
+ stmt = SQLFragment()
+ stmt.append(self.a.subSQL(placeholder, quote, allTables))
+ stmt.text += ' %s ' % (self.op,)
+ stmt.append(self.b.subSQL(placeholder, quote, allTables))
+ return stmt
+
+
+
+class ColumnComparison(CompoundComparison):
+ """
+ Comparing two columns is the same as comparing any other two expressions,
+ (for now).
+ """
+
+
+
+class _AllColumns(object):
+
+ def subSQL(self, placeholder, quote, allTables):
+ return SQLFragment(quote('*'))
+
+ALL_COLUMNS = _AllColumns()
+
+
+
+class _SomeColumns(object):
+
+ def __init__(self, columns):
+ self.columns = columns
+
+
+ def subSQL(self, placeholder, quote, allTables):
+ first = True
+ cstatement = SQLFragment()
+ for column in self.columns:
+ if first:
+ first = False
+ else:
+ cstatement.append(SQLFragment(", "))
+ cstatement.append(column.subSQL(placeholder, quote, allTables))
+ return cstatement
+
+
+
+class Select(_Statement):
+ """
+ 'select' statement.
+ """
+
+ def __init__(self, columns=None, Where=None, From=None, OrderBy=None,
+ GroupBy=None):
+ self.From = From
+ self.Where = Where
+ self.OrderBy = OrderBy
+ self.GroupBy = GroupBy
+ if columns is None:
+ columns = ALL_COLUMNS
+ else:
+ for column in columns:
+ for table in From.tables():
+ if column in table:
+ break
+ else:
+ raise TableMismatch()
+ columns = _SomeColumns(columns)
+ self.columns = columns
+
+
+ def toSQL(self, placeholder="?", quote=lambda x: x):
+ """
+ @return: a 'select' statement with placeholders and arguments
+
+ @rtype: L{SQLFragment}
+ """
+ stmt = SQLFragment(quote("select "))
+ allTables = self.From.tables()
+ stmt.append(self.columns.subSQL(placeholder, quote, allTables))
+ stmt.text += quote(" from ")
+ stmt.append(self.From.subSQL(placeholder, quote, allTables))
+ if self.Where is not None:
+ wherestmt = self.Where.subSQL(placeholder, quote, allTables)
+ stmt.text += quote(" where ")
+ stmt.append(wherestmt)
+ for bywhat, expr in [('group', self.GroupBy), ('order', self.OrderBy)]:
+ if expr is not None:
+ stmt.text += quote(" " + bywhat + " by ")
+ stmt.append(expr.subSQL(placeholder, quote, allTables))
+ return stmt
+
+
+ def subSQL(self, placeholder, quote, allTables):
+ result = SQLFragment("(")
+ result.append(self.toSQL(placeholder, quote))
+ result.append(SQLFragment(")"))
+ return result
+
+
+
+def _commaJoined(stmts):
+ first = True
+ cstatement = SQLFragment()
+ for stmt in stmts:
+ if first:
+ first = False
+ else:
+ cstatement.append(SQLFragment(", "))
+ cstatement.append(stmt)
+ return cstatement
+
+
+
+def _inParens(stmt):
+ result = SQLFragment("(")
+ result.append(stmt)
+ result.append(SQLFragment(")"))
+ return result
+
+
+
+def _fromSameTable(columns):
+ """
+ Extract the common table used by a list of L{Column} objects, raising
+ L{TableMismatch}.
+ """
+ table = columns[0].table
+ for column in columns:
+ if table is not column.table:
+ raise TableMismatch("Columns must all be from the same table.")
+ return table
+
+
+
+def _modelsFromMap(columnMap):
+ """
+ Get the L{Column} objects from a mapping of L{ColumnSyntax} to values.
+ """
+ return [c.model for c in columnMap.keys()]
+
+
+
+class Insert(object):
+ """
+ 'insert' statement.
+ """
+
+ def __init__(self, columnMap, Return=None):
+ self.columnMap = columnMap
+ self.Return = Return
+ columns = _modelsFromMap(columnMap)
+ table = _fromSameTable(columns)
+ required = [column for column in table.columns if column.needsValue()]
+ unspecified = [column for column in required
+ if column not in columns]
+ if unspecified:
+ raise NotEnoughValues(
+ 'Columns [%s] required.' %
+ (', '.join([c.name for c in unspecified])))
+
+
+ def toSQL(self, placeholder="?", quote=lambda x: x):
+ """
+ @return: a 'insert' statement with placeholders and arguments
+
+ @rtype: L{SQLFragment}
+ """
+ sortedColumns = sorted(self.columnMap.items(),
+ key=lambda (c, v): c.model.name)
+ allTables = []
+ stmt = SQLFragment('insert into ')
+ stmt.append(
+ TableSyntax(sortedColumns[0][0].model.table)
+ .subSQL(placeholder, quote, allTables))
+ stmt.append(SQLFragment(" "))
+ stmt.append(_inParens(_commaJoined(
+ [c.subSQL(placeholder, quote, allTables) for (c, v) in
+ sortedColumns])))
+ stmt.append(SQLFragment(" values "))
+ stmt.append(_inParens(_commaJoined(
+ [SQLFragment(placeholder, [v]) for (c, v) in sortedColumns])))
+ if self.Return is not None:
+ stmt.text += ' returning '
+ stmt.append(self.Return.subSQL(placeholder, quote, allTables))
+ return stmt
+
+
+
+class Update(object):
+ """
+ 'update' statement
+ """
+
+ def __init__(self, columnMap, Where, Return=None):
+ super(Update, self).__init__()
+ _fromSameTable(_modelsFromMap(columnMap))
+ self.columnMap = columnMap
+ self.Where = Where
+ self.Return = Return
+
+
+ def toSQL(self, placeholder="?", quote=lambda x: x):
+ """
+ @return: a 'insert' statement with placeholders and arguments
+
+ @rtype: L{SQLFragment}
+ """
+ sortedColumns = sorted(self.columnMap.items(),
+ key=lambda (c, v): c.model.name)
+ allTables = []
+ result = SQLFragment('update ')
+ result.append(
+ TableSyntax(sortedColumns[0][0].model.table).subSQL(
+ placeholder, quote, allTables)
+ )
+ result.text += ' set '
+ result.append(
+ _commaJoined(
+ [c.subSQL(placeholder, quote, allTables).append(
+ SQLFragment(" = " + placeholder, [v]))
+ for (c, v) in sortedColumns]
+ )
+ )
+ result.append(SQLFragment( ' where '))
+ result.append(self.Where.subSQL(placeholder, quote, allTables))
+ if self.Return is not None:
+ result.append(SQLFragment(' returning '))
+ result.append(self.Return.subSQL(placeholder, quote, allTables))
+ return result
+
+
+
+class Delete(object):
+ """
+ 'delete' statement.
+ """
+
+ def __init__(self, From, Where, Return=None):
+ self.From = From
+ self.Where = Where
+ self.Return = Return
+
+
+ def toSQL(self, placeholder="?", quote=lambda x: x):
+ result = SQLFragment()
+ allTables = self.From.tables()
+ result.text += quote('delete from ')
+ result.append(self.From.subSQL(placeholder, quote, allTables))
+ result.text += quote(' where ')
+ result.append(self.Where.subSQL(placeholder, quote, allTables))
+ if self.Return is not None:
+ result.append(SQLFragment(' returning '))
+ result.append(self.Return.subSQL(placeholder, quote, allTables))
+ return result
+
+
+
+class Lock(object):
+ """
+ An SQL 'lock' statement.
+ """
+
+ def __init__(self, table, mode):
+ self.table = table
+ self.mode = mode
+
+
+ @classmethod
+ def exclusive(cls, table):
+ return cls(table, 'exclusive')
+
+
+ def toSQL(self, placeholder="?", quote=lambda x: x):
+ return SQLFragment('lock table ').append(
+ self.table.subSQL(placeholder, quote, [self.table])).append(
+ SQLFragment(' in %s mode' % (self.mode,)))
+
+
+
+class SQLFragment(object):
+ """
+ Combination of SQL text and arguments; a statement which may be executed
+ against a database.
+ """
+
+ def __init__(self, text="", parameters=None):
+ self.text = text
+ if parameters is None:
+ parameters = []
+ self.parameters = parameters
+
+
+ def bind(self, **kw):
+ params = []
+ for parameter in self.parameters:
+ if isinstance(parameter, Parameter):
+ params.append(kw[parameter.name])
+ else:
+ params.append(parameter)
+ return SQLFragment(self.text, params)
+
+
+ def append(self, anotherStatement):
+ self.text += anotherStatement.text
+ self.parameters += anotherStatement.parameters
+ return self
+
+
+ def __eq__(self, stmt):
+ if not isinstance(stmt, SQLFragment):
+ return NotImplemented
+ return (self.text, self.parameters) == (stmt.text, stmt.parameters)
+
+
+ def __ne__(self, stmt):
+ if not isinstance(stmt, SQLFragment):
+ return NotImplemented
+ return not self.__eq__(stmt)
+
+
+ def __repr__(self):
+ return self.__class__.__name__ + repr((self.text, self.parameters))
+
+
+ def subSQL(self, placeholder, quote, allTables):
+ return self
+
+
+
+class Parameter(object):
+
+ def __init__(self, name):
+ self.name = name
+
+
+ def __repr__(self):
+ return 'Parameter(%r)' % (self.name,)
+
+
+
Added: CalendarServer/trunk/twext/enterprise/dal/test/__init__.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/test/__init__.py (rev 0)
+++ CalendarServer/trunk/twext/enterprise/dal/test/__init__.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -0,0 +1,19 @@
+##
+# Copyright (c) 2005-2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Tests for twext.enterprise.dal.
+"""
Added: CalendarServer/trunk/twext/enterprise/dal/test/test_parseschema.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/test/test_parseschema.py (rev 0)
+++ CalendarServer/trunk/twext/enterprise/dal/test/test_parseschema.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -0,0 +1,174 @@
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Tests for parsing an SQL schema, which cover L{twext.enterprise.dal.model}
+and L{twext.enterprise.dal.parseschema}.
+"""
+
+from twext.enterprise.dal.model import Schema
+
+from twext.enterprise.dal.parseschema import addSQLToSchema
+from twisted.trial.unittest import TestCase
+
+
+class ParsingExampleTests(TestCase):
+ """
+ Tests for parsing some sample schemas.
+ """
+
+ def test_simplest(self):
+ """
+ Parse an extremely simple schema with one table in it.
+ """
+ s = Schema()
+ addSQLToSchema(s, "create table foo (bar integer);")
+ self.assertEquals(len(s.tables), 1)
+ foo = s.tableNamed('foo')
+ self.assertEquals(len(foo.columns), 1)
+ bar = foo.columns[0]
+ self.assertEquals(bar.name, "bar")
+ self.assertEquals(bar.type.name, "integer")
+
+
+ def test_stringTypes(self):
+ """
+ Table and column names should be byte strings.
+ """
+ s = Schema()
+ addSQLToSchema(s, "create table foo (bar integer);")
+ self.assertEquals(len(s.tables), 1)
+ foo = s.tableNamed('foo')
+ self.assertIsInstance(foo.name, str)
+ self.assertIsInstance(foo.columnNamed('bar').name, str)
+
+
+ def test_typeWithLength(self):
+ """
+ Parse a type with a length.
+ """
+ s = Schema()
+ addSQLToSchema(s, "create table foo (bar varchar(6543))")
+ bar = s.tableNamed('foo').columnNamed('bar')
+ self.assertEquals(bar.type.name, "varchar")
+ self.assertEquals(bar.type.length, 6543)
+
+
+ def test_sequence(self):
+ """
+ Parsing a 'create sequence' statement adds a L{Sequence} to the
+ L{Schema}.
+ """
+ s = Schema()
+ addSQLToSchema(s, "create sequence myseq;")
+ self.assertEquals(len(s.sequences), 1)
+ self.assertEquals(s.sequences[0].name, "myseq")
+
+
+ def test_sequenceColumn(self):
+ """
+ Parsing a 'create sequence' statement adds a L{Sequence} to the
+ L{Schema}, and then a table that contains a column which uses the SQL
+ C{nextval()} function to retrieve its default value from that sequence,
+ will cause the L{Column} object to refer to the L{Sequence} and vice
+ versa.
+ """
+ s = Schema()
+ addSQLToSchema(s,
+ """
+ create sequence thingy;
+ create table thetable (
+ thecolumn integer default nextval('thingy')
+ );
+ """)
+ self.assertEquals(len(s.sequences), 1)
+ self.assertEquals(s.sequences[0].name, "thingy")
+ self.assertEquals(s.tables[0].columns[0].default, s.sequences[0])
+ self.assertEquals(s.sequences[0].referringColumns,
+ [s.tables[0].columns[0]])
+
+
+ def test_defaultConstantColumns(self):
+ """
+ Parsing a 'default' column with an appropriate type in it will return
+ that type as the 'default' attribute of the Column object.
+ """
+ s = Schema()
+ addSQLToSchema(s,
+ """
+ create table a (
+ b integer default 4321,
+ c boolean default false,
+ d boolean default true,
+ e varchar(255) default 'sample value',
+ f varchar(255) default null
+ );
+ """)
+ table = s.tableNamed("a")
+ self.assertEquals(table.columnNamed("b").default, 4321)
+ self.assertEquals(table.columnNamed("c").default, False)
+ self.assertEquals(table.columnNamed("d").default, True)
+ self.assertEquals(table.columnNamed("e").default, 'sample value')
+ self.assertEquals(table.columnNamed("f").default, None)
+
+
+ def test_notNull(self):
+ """
+ A column with a NOT NULL constraint in SQL will be parsed as a
+ constraint which returns False from its C{canBeNull()} method.
+ """
+ s = Schema()
+ addSQLToSchema(s,
+ """
+ create table alpha (beta integer,
+ gamma integer not null);
+ """)
+ t = s.tableNamed('alpha')
+ self.assertEquals(True, t.columnNamed('beta').canBeNull())
+ self.assertEquals(False, t.columnNamed('gamma').canBeNull())
+
+
+ def test_unique(self):
+ """
+ A column with a UNIQUE constraint in SQL will result in the table
+ listing that column as a unique set.
+ """
+ for identicalSchema in [
+ "create table sample (example integer unique);",
+ "create table sample (example integer, unique(example));"]:
+ s = Schema()
+ addSQLToSchema(s, identicalSchema)
+ table = s.tableNamed('sample')
+ column = table.columnNamed('example')
+ self.assertEquals(list(table.uniques()), [set([column])])
+
+
+ def test_multiUnique(self):
+ """
+ A column with a UNIQUE constraint in SQL will result in the table
+ listing that column as a unique set.
+ """
+ s = Schema()
+ addSQLToSchema(
+ s,
+ "create table a (b integer, c integer, unique(b, c), unique(c));")
+ a = s.tableNamed('a')
+ b = a.columnNamed('b')
+ c = a.columnNamed('c')
+ self.assertEquals(list(a.uniques()),
+ [set([b, c]), set([c])])
+
+
Added: CalendarServer/trunk/twext/enterprise/dal/test/test_sqlsyntax.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/test/test_sqlsyntax.py (rev 0)
+++ CalendarServer/trunk/twext/enterprise/dal/test/test_sqlsyntax.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -0,0 +1,417 @@
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Tests for L{twext.enterprise.dal.syntax}
+"""
+
+from twext.enterprise.dal.model import Schema
+from twext.enterprise.dal.parseschema import addSQLToSchema
+from twext.enterprise.dal.syntax import (
+ SchemaSyntax, Select, Insert, Update, Delete, Lock, SQLFragment,
+ TableMismatch, Parameter, Max, Len, NotEnoughValues
+)
+
+from twisted.trial.unittest import TestCase
+
+class GenerationTests(TestCase):
+ """
+ Tests for syntactic helpers to generate SQL queries.
+ """
+
+ def setUp(self):
+ s = Schema(self.id())
+ addSQLToSchema(schema=s, schemaData="""
+ create table FOO (BAR integer, BAZ integer);
+ create table BOZ (QUX integer);
+ create table OTHER (BAR integer,
+ FOO_BAR integer not null);
+ create table TEXTUAL (MYTEXT varchar(255));
+ """)
+ self.schema = SchemaSyntax(s)
+
+
+ def test_simplestSelect(self):
+ """
+ L{Select} generates a 'select' statement, by default, asking for all
+ rows in a table.
+ """
+ self.assertEquals(Select(From=self.schema.FOO).toSQL(),
+ SQLFragment("select * from FOO", []))
+
+
+ def test_simpleWhereClause(self):
+ """
+ L{Select} generates a 'select' statement with a 'where' clause
+ containing an expression.
+ """
+ self.assertEquals(Select(From=self.schema.FOO,
+ Where=self.schema.FOO.BAR == 1).toSQL(),
+ SQLFragment("select * from FOO where BAR = ?", [1]))
+
+
+ def test_quotingAndPlaceholder(self):
+ """
+ L{Select} generates a 'select' statement with the specified placeholder
+ syntax and quoting function.
+ """
+ self.assertEquals(Select(From=self.schema.FOO,
+ Where=self.schema.FOO.BAR == 1).toSQL(
+ placeholder="*",
+ quote=lambda partial: partial.replace("*", "**")),
+ SQLFragment("select ** from FOO where BAR = *", [1]))
+
+
+ def test_columnComparison(self):
+ """
+ L{Select} generates a 'select' statement which compares columns.
+ """
+ self.assertEquals(Select(From=self.schema.FOO,
+ Where=self.schema.FOO.BAR ==
+ self.schema.FOO.BAZ).toSQL(),
+ SQLFragment("select * from FOO where BAR = BAZ", []))
+
+
+ def test_comparisonTestErrorPrevention(self):
+ """
+ The comparison object between columns raises an exception when compared
+ for a truth value, so that code will not accidentally test for '==' and
+ always get a true value.
+ """
+ def sampleComparison():
+ if self.schema.FOO.BAR == self.schema.FOO.BAZ:
+ return 'comparison should not succeed'
+ self.assertRaises(ValueError, sampleComparison)
+
+
+ def test_nullComparison(self):
+ """
+ Comparing a column with None results in the generation of an 'is null'
+ or 'is not null' SQL statement.
+ """
+ self.assertEquals(Select(From=self.schema.FOO,
+ Where=self.schema.FOO.BAR ==
+ None).toSQL(),
+ SQLFragment(
+ "select * from FOO where BAR is null", []))
+ self.assertEquals(Select(From=self.schema.FOO,
+ Where=self.schema.FOO.BAR !=
+ None).toSQL(),
+ SQLFragment(
+ "select * from FOO where BAR is not null", []))
+
+
+ def test_compoundWhere(self):
+ """
+ L{Select.And} and L{Select.Or} will return compound columns.
+ """
+ self.assertEquals(
+ Select(From=self.schema.FOO,
+ Where=(self.schema.FOO.BAR < 2).Or(
+ self.schema.FOO.BAR > 5)).toSQL(),
+ SQLFragment("select * from FOO where BAR < ? or BAR > ?", [2, 5]))
+
+
+ def test_orderBy(self):
+ """
+ L{Select}'s L{OrderBy} parameter generates an 'order by' clause for a
+ 'select' statement.
+ """
+ self.assertEquals(
+ Select(From=self.schema.FOO,
+ OrderBy=self.schema.FOO.BAR).toSQL(),
+ SQLFragment("select * from FOO order by BAR")
+ )
+
+
+ def test_groupBy(self):
+ """
+ L{Select}'s L{GroupBy} parameter generates a 'group by' clause for a
+ 'select' statement.
+ """
+ self.assertEquals(
+ Select(From=self.schema.FOO,
+ GroupBy=self.schema.FOO.BAR).toSQL(),
+ SQLFragment("select * from FOO group by BAR")
+ )
+
+
+ def test_joinClause(self):
+ """
+ A table's .join() method returns a join statement in a SELECT.
+ """
+ self.assertEquals(
+ Select(From=self.schema.FOO.join(
+ self.schema.BOZ, self.schema.FOO.BAR ==
+ self.schema.BOZ.QUX)).toSQL(),
+ SQLFragment("select * from FOO join BOZ on BAR = QUX", [])
+ )
+
+
+ def test_columnSelection(self):
+ """
+ If a column is specified by the argument to L{Select}, those will be
+ output by the SQL statement rather than the all-columns wildcard.
+ """
+ self.assertEquals(
+ Select([self.schema.FOO.BAR],
+ From=self.schema.FOO).toSQL(),
+ SQLFragment("select BAR from FOO")
+ )
+
+
+ def test_columnAliases(self):
+ """
+ When attributes are set on a L{TableSyntax}, they will be remembered as
+ column aliases, and their alias names may be retrieved via the
+ L{TableSyntax.aliases} method.
+ """
+ self.assertEquals(self.schema.FOO.aliases(), {})
+ self.schema.FOO.ALIAS = self.schema.FOO.BAR
+ # you comparing ColumnSyntax object results in a ColumnComparison, which
+ # you can't test for truth.
+ fixedForEquality = dict([(k, v.model) for k, v in
+ self.schema.FOO.aliases().items()])
+ self.assertEquals(fixedForEquality,
+ {'ALIAS': self.schema.FOO.BAR.model})
+ self.assertIdentical(self.schema.FOO.ALIAS.model,
+ self.schema.FOO.BAR.model)
+
+
+ def test_multiColumnSelection(self):
+ """
+ If multiple columns are specified by the argument to L{Select}, those
+ will be output by the SQL statement rather than the all-columns
+ wildcard.
+ """
+ self.assertEquals(
+ Select([self.schema.FOO.BAZ,
+ self.schema.FOO.BAR],
+ From=self.schema.FOO).toSQL(),
+ SQLFragment("select BAZ, BAR from FOO")
+ )
+
+
+ def test_joinColumnSelection(self):
+ """
+ If multiple columns are specified by the argument to L{Select} that uses
+ a L{TableSyntax.join}, those will be output by the SQL statement.
+ """
+ self.assertEquals(
+ Select([self.schema.FOO.BAZ,
+ self.schema.BOZ.QUX],
+ From=self.schema.FOO.join(self.schema.BOZ,
+ self.schema.FOO.BAR ==
+ self.schema.BOZ.QUX)).toSQL(),
+ SQLFragment("select BAZ, QUX from FOO join BOZ on BAR = QUX")
+ )
+
+
+ def test_tableMismatch(self):
+ """
+ When a column in the 'columns' argument does not match the table from
+ the 'From' argument, L{Select} raises a L{TableMismatch}.
+ """
+ self.assertRaises(TableMismatch, Select, [self.schema.BOZ.QUX],
+ From=self.schema.FOO)
+
+
+ def test_qualifyNames(self):
+ """
+ When two columns in the FROM clause requested from different tables have
+ the same name, the emitted SQL should explicitly disambiguate them.
+ """
+ self.assertEquals(
+ Select([self.schema.FOO.BAR,
+ self.schema.OTHER.BAR],
+ From=self.schema.FOO.join(self.schema.OTHER,
+ self.schema.OTHER.FOO_BAR ==
+ self.schema.FOO.BAR)).toSQL(),
+ SQLFragment(
+ "select FOO.BAR, OTHER.BAR from FOO "
+ "join OTHER on FOO_BAR = FOO.BAR"))
+
+
+ def test_bindParameters(self):
+ """
+ L{SQLFragment.bind} returns a copy of that L{SQLFragment} with the
+ L{Parameter} objects in its parameter list replaced with the keyword
+ arguments to C{bind}.
+ """
+
+ self.assertEquals(
+ Select(From=self.schema.FOO,
+ Where=(self.schema.FOO.BAR > Parameter("testing")).And(
+ self.schema.FOO.BAZ < 7)).toSQL().bind(testing=173),
+ SQLFragment("select * from FOO where BAR > ? and BAZ < ?",
+ [173, 7]))
+
+
+ def test_inSubSelect(self):
+ """
+ L{ColumnSyntax.In} returns a sub-expression using the SQL 'in' syntax.
+ """
+ wherein = (self.schema.FOO.BAR.In(
+ Select([self.schema.BOZ.QUX], From=self.schema.BOZ)))
+ self.assertEquals(
+ Select(From=self.schema.FOO, Where=wherein).toSQL(),
+ SQLFragment(
+ "select * from FOO where BAR in (select QUX from BOZ)"))
+
+
+ def test_max(self):
+ """
+ Test for the 'Max' function.
+ """
+ self.assertEquals(
+ Select([Max(self.schema.BOZ.QUX)], From=self.schema.BOZ).toSQL(),
+ SQLFragment(
+ "select max(QUX) from BOZ"))
+
+
+ def test_len(self):
+ """
+ Test for the 'Len' function for determining character length of a
+ column. (Note that this should be updated to use different techniques
+ as necessary in different databases.)
+ """
+ self.assertEquals(
+ Select([Len(self.schema.TEXTUAL.MYTEXT)],
+ From=self.schema.TEXTUAL).toSQL(),
+ SQLFragment(
+ "select character_length(MYTEXT) from TEXTUAL"))
+
+
+ def test_insert(self):
+ """
+ L{Insert.toSQL} generates an 'insert' statement with all the relevant
+ columns.
+ """
+ self.assertEquals(
+ Insert({self.schema.FOO.BAR: 23,
+ self.schema.FOO.BAZ: 9}).toSQL(),
+ SQLFragment("insert into FOO (BAR, BAZ) values (?, ?)", [23, 9]))
+
+
+ def test_insertNotEnough(self):
+ """
+ L{Insert}'s constructor will raise L{NotEnoughValues} if columns have
+ not been specified.
+ """
+ notEnough = self.assertRaises(
+ NotEnoughValues, Insert, {self.schema.OTHER.BAR: 9}
+ )
+ self.assertEquals(str(notEnough), "Columns [FOO_BAR] required.")
+
+
+ def test_insertReturning(self):
+ """
+ L{Insert}'s C{Return} argument will insert an SQL 'returning' clause.
+ """
+ self.assertEquals(
+ Insert({self.schema.FOO.BAR: 23,
+ self.schema.FOO.BAZ: 9},
+ Return=self.schema.FOO.BAR).toSQL(),
+ SQLFragment(
+ "insert into FOO (BAR, BAZ) values (?, ?) returning BAR",
+ [23, 9])
+ )
+
+
+ def test_insertMismatch(self):
+ """
+ L{Insert} raises L{TableMismatch} if the columns specified aren't all
+ from the same table.
+ """
+ self.assertRaises(
+ TableMismatch,
+ Insert, {self.schema.FOO.BAR: 23,
+ self.schema.FOO.BAZ: 9,
+ self.schema.TEXTUAL.MYTEXT: 'hello'}
+ )
+
+
+ def test_updateReturning(self):
+ """
+ L{update}'s C{Return} argument will update an SQL 'returning' clause.
+ """
+ self.assertEquals(
+ Update({self.schema.FOO.BAR: 23},
+ self.schema.FOO.BAZ == 43,
+ Return=self.schema.FOO.BAR).toSQL(),
+ SQLFragment(
+ "update FOO set BAR = ? where BAZ = ? returning BAR",
+ [23, 43])
+ )
+
+
+ def test_updateMismatch(self):
+ """
+ L{Update} raises L{TableMismatch} if the columns specified aren't all
+ from the same table.
+ """
+ self.assertRaises(
+ TableMismatch,
+ Update, {self.schema.FOO.BAR: 23,
+ self.schema.FOO.BAZ: 9,
+ self.schema.TEXTUAL.MYTEXT: 'hello'},
+ Where=self.schema.FOO.BAZ == 9
+ )
+
+
+ def test_deleteReturning(self):
+ """
+ L{Delete}'s C{Return} argument will delete an SQL 'returning' clause.
+ """
+ self.assertEquals(
+ Delete(self.schema.FOO,
+ Where=self.schema.FOO.BAR == 7,
+ Return=self.schema.FOO.BAZ).toSQL(),
+ SQLFragment(
+ "delete from FOO where BAR = ? returning BAZ", [7])
+ )
+
+
+ def test_update(self):
+ """
+ L{Update.toSQL} generates an 'update' statement.
+ """
+ self.assertEquals(
+ Update({self.schema.FOO.BAR: 4321},
+ self.schema.FOO.BAZ == 1234).toSQL(),
+ SQLFragment("update FOO set BAR = ? where BAZ = ?", [4321, 1234]))
+
+
+ def test_delete(self):
+ """
+ L{Delete} generates an SQL 'delete' statement.
+ """
+ self.assertEquals(
+ Delete(self.schema.FOO,
+ Where=self.schema.FOO.BAR == 12).toSQL(),
+ SQLFragment(
+ "delete from FOO where BAR = ?", [12])
+ )
+
+
+ def test_lock(self):
+ """
+ L{Lock.exclusive} generates a ('lock table') statement, locking the
+ table in the specified mode.
+ """
+ self.assertEquals(Lock.exclusive(self.schema.FOO).toSQL(),
+ SQLFragment("lock table FOO in exclusive mode"))
+
Added: CalendarServer/trunk/twext/enterprise/ienterprise.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/ienterprise.py (rev 0)
+++ CalendarServer/trunk/twext/enterprise/ienterprise.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -0,0 +1,86 @@
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Interfaces, mostly related to L{twext.enterprise.adbapi2}.
+"""
+
+__all__ = [
+ "IAsyncTransaction",
+]
+
+from zope.interface import Interface, Attribute
+
+
+class AlreadyFinishedError(Exception):
+ """
+ The transaction was already completed via an C{abort} or C{commit} and
+ cannot be aborted or committed again.
+ """
+
+
+
+class IAsyncTransaction(Interface):
+ """
+ Asynchronous execution of SQL.
+
+ Note that there is no {begin()} method; if an L{IAsyncTransaction} exists,
+ it is assumed to have been started.
+ """
+
+ paramstyle = Attribute(
+ """
+ A copy of the 'paramstyle' attribute from a DB-API 2.0 module.
+ """)
+
+
+ def execSQL(sql, args=(), raiseOnZeroRowCount=None):
+ """
+ Execute some SQL.
+
+ @param sql: an SQL string.
+
+ @type sql: C{str}
+
+ @param args: C{list} of arguments to interpolate into C{sql}.
+
+ @param raiseOnZeroRowCount: a 0-argument callable which returns an
+ exception to raise if the executed SQL does not affect any rows.
+
+ @return: L{Deferred} which fires C{list} of C{tuple}
+
+ @raise: C{raiseOnZeroRowCount} if it was specified and no rows were
+ affected.
+ """
+
+
+ def commit():
+ """
+ Commit changes caused by this transaction.
+
+ @return: L{Deferred} which fires with C{None} upon successful
+ completion of this transaction.
+ """
+
+
+ def abort():
+ """
+ Roll back changes caused by this transaction.
+
+ @return: L{Deferred} which fires with C{None} upon successful
+ rollback of this transaction.
+ """
+
Added: CalendarServer/trunk/twext/enterprise/test/__init__.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/test/__init__.py (rev 0)
+++ CalendarServer/trunk/twext/enterprise/test/__init__.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -0,0 +1,20 @@
+
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Tests for L{twext.enterprise}.
+"""
Added: CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py (rev 0)
+++ CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -0,0 +1,175 @@
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Tests for L{twext.enterprise.adbapi2}.
+"""
+
+from itertools import count
+
+from twisted.trial.unittest import TestCase
+
+from twisted.internet.defer import inlineCallbacks
+
+from twext.enterprise.adbapi2 import ConnectionPool
+
+
+class Child(object):
+ def __init__(self, parent):
+ self.parent = parent
+ self.parent.children.append(self)
+
+ def close(self):
+ self.parent.children.remove(self)
+
+
+
+class Parent(object):
+
+ def __init__(self):
+ self.children = []
+
+
+
+class FakeConnection(Parent, Child):
+ """
+ Fake Stand-in for DB-API 2.0 connection.
+ """
+
+ def __init__(self, factory):
+ """
+ Initialize list of cursors
+ """
+ Parent.__init__(self)
+ Child.__init__(self, factory)
+ self.id = factory.idcounter.next()
+
+
+ @property
+ def cursors(self):
+ "Alias to make tests more readable."
+ return self.children
+
+
+ def cursor(self):
+ return FakeCursor(self)
+
+
+ def commit(self):
+ return
+
+
+ def rollback(self):
+ return
+
+
+
+class FakeCursor(Child):
+ """
+ Fake stand-in for a DB-API 2.0 cursor.
+ """
+ def __init__(self, connection):
+ Child.__init__(self, connection)
+ self.rowcount = 0
+ # not entirely correct, but all we care about is its truth value.
+ self.description = False
+
+
+ @property
+ def connection(self):
+ "Alias to make tests more readable."
+ return self.parent
+
+
+ def execute(self, sql, args=()):
+ self.sql = sql
+ self.description = True
+ self.rowcount = 1
+ return
+
+
+ def fetchall(self):
+ """
+ Just echo the SQL that was executed in the last query.
+ """
+ return [[self.connection.id, self.sql]]
+
+
+
+class ConnectionFactory(Parent):
+ def __init__(self):
+ Parent.__init__(self)
+ self.idcounter = count(1)
+
+ @property
+ def connections(self):
+ "Alias to make tests more readable."
+ return self.children
+
+
+ def connect(self):
+ return FakeConnection(self)
+
+
+
+class ConnectionPoolTests(TestCase):
+
+ @inlineCallbacks
+ def test_tooManyConnections(self):
+ """
+ When the number of outstanding busy transactions exceeds the number of
+ slots specified by L{ConnectionPool.maxConnections},
+ L{ConnectionPool.connection} will return a L{PooledSqlTxn} that is not
+ backed by any L{BaseSqlTxn}; this object will queue its SQL statements
+ until an existing connection becomes available.
+ """
+ cf = ConnectionFactory()
+ cp = ConnectionPool(cf.connect, maxConnections=2)
+ cp.startService()
+ self.addCleanup(cp.stopService)
+ a = cp.connection()
+ [[counter, echo]] = yield a.execSQL("alpha")
+ b = cp.connection()
+ [[bcounter, becho]] = yield b.execSQL("beta")
+
+ # both 'a' and 'b' are holding open a connection now; let's try to open
+ # a third one. (The ordering will be deterministic even if this fails,
+ # because those threads are already busy.)
+ c = cp.connection()
+ enqueue = c.execSQL("gamma")
+ x = []
+ def addtox(it):
+ x.append(it)
+ return it
+ enqueue.addCallback(addtox)
+
+ # Did 'c' open a connection? Let's hope not...
+ self.assertEquals(len(cf.connections), 2)
+ # This assertion is _not_ deterministic, unfortunately; it's unlikely
+ # that the implementation could be adjusted such that this assertion
+ # would fail and the others would succeed. However, if it does fail,
+ # that's really bad, so I am leaving it regardless.
+ self.failIf(bool(x), "SQL executed too soon!")
+ yield b.commit()
+
+ # Now that 'b' has committed, 'c' should be able to complete.
+ [[ccounter, cecho]] = yield enqueue
+
+ # The connection for 'a' ought to be busy, so let's make sure we're
+ # using the one for 'c'.
+ self.assertEquals(ccounter, bcounter)
+
+
Added: CalendarServer/trunk/twext/internet/threadutils.py
===================================================================
--- CalendarServer/trunk/twext/internet/threadutils.py (rev 0)
+++ CalendarServer/trunk/twext/internet/threadutils.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -0,0 +1,111 @@
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+import sys
+from Queue import Queue
+
+
+from twisted.python.failure import Failure
+from twisted.internet.defer import Deferred
+
+
+_DONE = object()
+
+_STATE_STOPPED = 'STOPPED'
+_STATE_RUNNING = 'RUNNING'
+_STATE_STOPPING = 'STOPPING'
+
+class ThreadHolder(object):
+ """
+ A queue which will hold a reactor threadpool thread open until all of the
+ work in that queue is done.
+ """
+
+ def __init__(self, reactor):
+ self._reactor = reactor
+ self._state = _STATE_STOPPED
+ self._stopper = None
+ self._q = None
+
+
+ def _run(self):
+ """
+ Worker function which runs in a non-reactor thread.
+ """
+ while True:
+ work = self._q.get()
+ if work is _DONE:
+ def finishStopping():
+ self._state = _STATE_STOPPED
+ self._q = None
+ s = self._stopper
+ self._stopper = None
+ s.callback(None)
+ self._reactor.callFromThread(finishStopping)
+ return
+ self._oneWorkUnit(*work)
+
+
+ def _oneWorkUnit(self, deferred, instruction):
+ try:
+ result = instruction()
+ except:
+ etype, evalue, etb = sys.exc_info()
+ def relayFailure():
+ f = Failure(evalue, etype, etb)
+ deferred.errback(f)
+ self._reactor.callFromThread(relayFailure)
+ else:
+ self._reactor.callFromThread(deferred.callback, result)
+
+
+ def submit(self, work):
+ """
+ Submit some work to be run.
+
+ @param work: a 0-argument callable, which will be run in a thread.
+
+ @return: L{Deferred} that fires with the result of L{work}
+ """
+ d = Deferred()
+ self._q.put((d, work))
+ return d
+
+
+ def start(self):
+ """
+ Start this thing, if it's stopped.
+ """
+ if self._state != _STATE_STOPPED:
+ raise RuntimeError("Not stopped.")
+ self._state = _STATE_RUNNING
+ self._q = Queue(0)
+ self._reactor.callInThread(self._run)
+
+
+ def stop(self):
+ """
+ Stop this thing and release its thread, if it's running.
+ """
+ if self._state != _STATE_RUNNING:
+ raise RuntimeError("Not running.")
+ s = self._stopper = Deferred()
+ self._state = _STATE_STOPPING
+ self._q.put(_DONE)
+ return s
+
+
+
Added: CalendarServer/trunk/twext/python/clsprop.py
===================================================================
--- CalendarServer/trunk/twext/python/clsprop.py (rev 0)
+++ CalendarServer/trunk/twext/python/clsprop.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -0,0 +1,41 @@
+##
+# Copyright (c) 2011 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+A small utility for defining static class properties.
+"""
+
+class classproperty(object):
+ """
+ Decorator for a method that wants to return a static class property. The
+ decorated method will only be invoked once, for each class, and that value
+ will be returned for that class.
+ """
+
+ def __init__(self, thunk):
+ self.thunk = thunk
+ self._classcache = {}
+
+
+ def __get__(self, instance, owner):
+ cc = self._classcache
+ if owner in cc:
+ cached = cc[owner]
+ else:
+ cached = self.thunk(owner)
+ cc[owner] = cached
+ return cached
+
Modified: CalendarServer/trunk/twistedcaldav/directory/util.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/directory/util.py 2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/twistedcaldav/directory/util.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -1,3 +1,4 @@
+# -*- test-case-name: twistedcaldav.directory.test.test_util -*-
##
# Copyright (c) 2006-2007 Apple Inc. All rights reserved.
#
@@ -22,7 +23,7 @@
"uuidFromName",
]
-from txdav.idav import AlreadyFinishedError
+from twext.enterprise.ienterprise import AlreadyFinishedError
from uuid import UUID, uuid5
Modified: CalendarServer/trunk/twistedcaldav/stdconfig.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/stdconfig.py 2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/twistedcaldav/stdconfig.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -170,7 +170,7 @@
"DBAMPFD" : 0, # Internally used by database to tell slave
# processes to inherit a file descriptor and use it
# as an AMP connection over a UNIX socket; see
- # txdav.base.datastore.asyncsqlpool.
+ # twext.enterprise.adbapi2.ConnectionPoolConnection
"SharedConnectionPool" : False, # Use a shared database connection pool in
# the master process, rather than having
Modified: CalendarServer/trunk/twistedcaldav/test/test_wrapping.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/test/test_wrapping.py 2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/twistedcaldav/test/test_wrapping.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -22,7 +22,7 @@
from twext.web2.server import Request
from twext.web2.responsecode import UNAUTHORIZED
from twext.web2.http_headers import Headers
-from txdav.idav import AlreadyFinishedError, IDataStore
+from twext.enterprise.ienterprise import AlreadyFinishedError
from twext.web2.dav import davxml
from twistedcaldav.config import config
@@ -37,6 +37,7 @@
from twistedcaldav.test.util import TestCase
+from txdav.idav import IDataStore
from txdav.caldav.datastore.test.test_file import event4_text
from txdav.carddav.datastore.test.test_file import vcard4_text
Deleted: CalendarServer/trunk/txdav/base/datastore/asyncsqlpool.py
===================================================================
--- CalendarServer/trunk/txdav/base/datastore/asyncsqlpool.py 2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/txdav/base/datastore/asyncsqlpool.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -1,653 +0,0 @@
-# -*- test-case-name: txdav.caldav.datastore -*-
-##
-# Copyright (c) 2010 Apple Inc. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-##
-
-"""
-Asynchronous multi-process connection pool.
-"""
-
-import sys
-
-from cStringIO import StringIO
-from cPickle import dumps, loads
-from itertools import count
-
-from zope.interface import implements
-
-from twisted.internet.defer import inlineCallbacks
-from twisted.internet.defer import returnValue
-from txdav.idav import IAsyncTransaction
-from twisted.internet.defer import Deferred
-from twisted.protocols.amp import Boolean
-from twisted.python.failure import Failure
-from twisted.protocols.amp import Argument, String, Command, AMP, Integer
-from twisted.internet import reactor as _reactor
-from twisted.application.service import Service
-from txdav.base.datastore.threadutils import ThreadHolder
-from txdav.idav import AlreadyFinishedError
-from twisted.python import log
-from twisted.internet.defer import maybeDeferred
-from twisted.python.components import proxyForInterface
-
-
-
-class BaseSqlTxn(object):
- """
- L{IAsyncTransaction} implementation based on a L{ThreadHolder} in the
- current process.
- """
- implements(IAsyncTransaction)
-
- def __init__(self, connectionFactory, reactor=_reactor):
- """
- @param connectionFactory: A 0-argument callable which returns a DB-API
- 2.0 connection.
- """
- self._completed = False
- self._cursor = None
- self._holder = ThreadHolder(reactor)
- self._holder.start()
-
- def initCursor():
- # support threadlevel=1; we can't necessarily cursor() in a
- # different thread than we do transactions in.
-
- # TODO: Re-try connect when it fails. Specify a timeout. That
- # should happen in this layer because we need to be able to stop
- # the reconnect attempt if it's hanging.
- self._connection = connectionFactory()
- self._cursor = self._connection.cursor()
-
- # Note: no locking necessary here; since this gets submitted first, all
- # subsequent submitted work-units will be in line behind it and the
- # cursor will already have been initialized.
- self._holder.submit(initCursor).addErrback(log.err)
-
-
- def _reallyExecSQL(self, sql, args=None, raiseOnZeroRowCount=None):
- if args is None:
- args = []
- self._cursor.execute(sql, args)
- if raiseOnZeroRowCount is not None and self._cursor.rowcount == 0:
- raise raiseOnZeroRowCount()
- if self._cursor.description:
- return self._cursor.fetchall()
- else:
- return None
-
-
- noisy = False
-
- def execSQL(self, *args, **kw):
- result = self._holder.submit(
- lambda : self._reallyExecSQL(*args, **kw)
- )
- if self.noisy:
- def reportResult(results):
- sys.stdout.write("\n".join([
- "",
- "SQL: %r %r" % (args, kw),
- "Results: %r" % (results,),
- "",
- ]))
- return results
- result.addBoth(reportResult)
- return result
-
-
- def commit(self):
- if not self._completed:
- self._completed = True
- def reallyCommit():
- self._connection.commit()
- result = self._holder.submit(reallyCommit)
- return result
- else:
- raise AlreadyFinishedError()
-
-
- def abort(self):
- if not self._completed:
- self._completed = True
- def reallyAbort():
- self._connection.rollback()
- result = self._holder.submit(reallyAbort)
- return result
- else:
- raise AlreadyFinishedError()
-
-
- def __del__(self):
- if not self._completed:
- print 'BaseSqlTxn.__del__: OK'
- self.abort()
-
-
- def reset(self):
- """
- Call this when placing this transaction back into the pool.
-
- @raise RuntimeError: if the transaction has not been committed or
- aborted.
- """
- if not self._completed:
- raise RuntimeError("Attempt to re-set active transaction.")
- self._completed = False
-
-
- def stop(self):
- """
- Release the thread and database connection associated with this
- transaction.
- """
- self._completed = True
- self._stopped = True
- holder = self._holder
- self._holder = None
- holder.submit(self._connection.close)
- return holder.stop()
-
-
-
-class SpooledTxn(object):
- """
- A L{SpooledTxn} is an implementation of L{IAsyncTransaction} which cannot
- yet actually execute anything, so it spools SQL reqeusts for later
- execution. When a L{BaseSqlTxn} becomes available later, it can be
- unspooled onto that.
- """
-
- implements(IAsyncTransaction)
-
- def __init__(self):
- self._spool = []
-
-
- def _enspool(self, cmd, a=(), kw={}):
- d = Deferred()
- self._spool.append((d, cmd, a, kw))
- return d
-
-
- def _iterDestruct(self):
- """
- Iterate the spool list destructively, while popping items from the
- beginning. This allows code which executes more SQL in the callback of
- a Deferred to not interfere with the originally submitted order of
- commands.
- """
- while self._spool:
- yield self._spool.pop(0)
-
-
- def _unspool(self, other):
- """
- Unspool this transaction onto another transaction.
-
- @param other: another provider of L{IAsyncTransaction} which will
- actually execute the SQL statements we have been buffering.
- """
- for (d, cmd, a, kw) in self._iterDestruct():
- self._relayCommand(other, d, cmd, a, kw)
-
-
- def _relayCommand(self, other, d, cmd, a, kw):
- """
- Relay a single command to another transaction.
- """
- maybeDeferred(getattr(other, cmd), *a, **kw).chainDeferred(d)
-
-
- def execSQL(self, *a, **kw):
- return self._enspool('execSQL', a, kw)
-
-
- def commit(self):
- return self._enspool('commit')
-
-
- def abort(self):
- return self._enspool('abort')
-
-
-
-class PooledSqlTxn(proxyForInterface(iface=IAsyncTransaction,
- originalAttribute='_baseTxn')):
- """
- This is a temporary throw-away wrapper for the longer-lived BaseSqlTxn, so
- that if a badly-behaved API client accidentally hangs on to one of these
- and, for example C{.abort()}s it multiple times once another client is
- using that connection, it will get some harmless tracebacks.
- """
-
- def __init__(self, pool, baseTxn):
- self._pool = pool
- self._baseTxn = baseTxn
- self._complete = False
-
-
- def execSQL(self, *a, **kw):
- self._checkComplete()
- return super(PooledSqlTxn, self).execSQL(*a, **kw)
-
-
- def commit(self):
- self._markComplete()
- return self._repoolAfter(super(PooledSqlTxn, self).commit())
-
-
- def abort(self):
- self._markComplete()
- return self._repoolAfter(super(PooledSqlTxn, self).abort())
-
-
- def _checkComplete(self):
- """
- If the transaction is complete, raise L{AlreadyFinishedError}
- """
- if self._complete:
- raise AlreadyFinishedError()
-
-
- def _markComplete(self):
- """
- Mark the transaction as complete, raising AlreadyFinishedError.
- """
- self._checkComplete()
- self._complete = True
-
-
- def _repoolAfter(self, d):
- def repool(result):
- self._pool.reclaim(self)
- return result
- return d.addCallback(repool)
-
-
-
-class ConnectionPool(Service, object):
- """
- This is a central service that has a threadpool and executes SQL statements
- asynchronously, in a pool.
-
- @ivar connectionFactory: a 0-or-1-argument callable that returns a DB-API
- connection. The optional argument can be used as a label for
- diagnostic purposes.
-
- @ivar maxConnections: The connection pool will not attempt to make more
- than this many concurrent connections to the database.
-
- @type maxConnections: C{int}
- """
-
- reactor = _reactor
-
- def __init__(self, connectionFactory, maxConnections=10):
- super(ConnectionPool, self).__init__()
- self.free = []
- self.busy = []
- self.waiting = []
- self.connectionFactory = connectionFactory
- self.maxConnections = maxConnections
-
-
- def startService(self):
- """
- No startup necessary.
- """
-
-
- @inlineCallbacks
- def stopService(self):
- """
- Forcibly abort any outstanding transactions.
- """
- for busy in self.busy[:]:
- try:
- yield busy.abort()
- except:
- log.err()
- # all transactions should now be in the free list, since 'abort()' will
- # have put them there.
- for free in self.free:
- yield free.stop()
-
-
- def connection(self, label="<unlabeled>"):
- """
- Find a transaction; either retrieve a free one from the list or
- allocate a new one if no free ones are available.
-
- @return: an L{IAsyncTransaction}
- """
-
- overload = False
- if self.free:
- basetxn = self.free.pop(0)
- elif len(self.busy) < self.maxConnections:
- basetxn = BaseSqlTxn(
- connectionFactory=self.connectionFactory,
- reactor=self.reactor
- )
- else:
- basetxn = SpooledTxn()
- overload = True
- txn = PooledSqlTxn(self, basetxn)
- if overload:
- self.waiting.append(txn)
- else:
- self.busy.append(txn)
- return txn
-
-
- def reclaim(self, txn):
- """
- Shuck the L{PooledSqlTxn} wrapper off, and recycle the underlying
- BaseSqlTxn into the free list.
- """
- baseTxn = txn._baseTxn
- baseTxn.reset()
- self.busy.remove(txn)
- if self.waiting:
- waiting = self.waiting.pop(0)
- waiting._baseTxn._unspool(baseTxn)
- # Note: although commit() may already have been called, we don't
- # have to handle it specially here. It only unspools after the
- # deferred returned by commit() has actually been called, and since
- # that occurs in a callFromThread, that won't happen until the next
- # iteration of the mainloop, when the _baseTxn is safely correct.
- waiting._baseTxn = baseTxn
- self.busy.append(waiting)
- else:
- self.free.append(baseTxn)
-
-
-
-def txnarg():
- return [('transactionID', Integer())]
-
-
-CHUNK_MAX = 0xffff
-
-class BigArgument(Argument):
- """
- An argument whose payload can be larger than L{CHUNK_MAX}, by splitting
- across multiple AMP keys.
- """
- def fromBox(self, name, strings, objects, proto):
- value = StringIO()
- for counter in count():
- chunk = strings.get("%s.%d" % (name, counter))
- if chunk is None:
- break
- value.write(chunk)
- objects[name] = self.fromString(value.getvalue())
-
-
- def toBox(self, name, strings, objects, proto):
- value = StringIO(self.toString(objects[name]))
- for counter in count():
- nextChunk = value.read(CHUNK_MAX)
- if not nextChunk:
- break
- strings["%s.%d" % (name, counter)] = nextChunk
-
-
-
-class Pickle(BigArgument):
- """
- A pickle sent over AMP. This is to serialize the 'args' argument to
- C{execSQL}, which is the dynamically-typed 'args' list argument to a DB-API
- C{execute} function, as well as its dynamically-typed result ('rows').
-
- This should be cleaned up into a nicer structure, but this is not a network
- protocol, so we can be a little relaxed about security.
-
- This is a L{BigArgument} rather than a regular L{Argument} because
- individual arguments and query results need to contain entire vCard or
- iCalendar documents, which can easily be greater than 64k.
- """
-
- def toString(self, inObject):
- return dumps(inObject)
-
- def fromString(self, inString):
- return loads(inString)
-
-
-
-
-class StartTxn(Command):
- """
- Start a transaction, identified with an ID generated by the client.
- """
- arguments = txnarg()
-
-
-
-class ExecSQL(Command):
- """
- Execute an SQL statement.
- """
- arguments = [('sql', String()),
- ('queryID', String()),
- ('args', Pickle())] + txnarg()
-
-
-
-class Row(Command):
- """
- A row has been returned. Sent from server to client in response to
- L{ExecSQL}.
- """
-
- arguments = [('queryID', String()),
- ('row', Pickle())]
-
-
-
-class QueryComplete(Command):
- """
- A query issued with ExecSQL is complete.
- """
-
- arguments = [('queryID', String()),
- ('norows', Boolean())]
-
-
-
-class Commit(Command):
- arguments = txnarg()
-
-
-
-class Abort(Command):
- arguments = txnarg()
-
-
-
-class _NoRows(Exception):
- """
- Placeholder exception to report zero rows.
- """
-
-
-class ConnectionPoolConnection(AMP):
- """
- A L{ConnectionPoolConnection} is a single connection to a
- L{ConnectionPool}.
- """
-
- def __init__(self, pool):
- """
- Initialize a mapping of transaction IDs to transaction objects.
- """
- super(ConnectionPoolConnection, self).__init__()
- self.pool = pool
- self._txns = {}
-
-
- @StartTxn.responder
- def start(self, transactionID):
- self._txns[transactionID] = self.pool.connection()
- return {}
-
-
- @ExecSQL.responder
- @inlineCallbacks
- def receivedSQL(self, transactionID, queryID, sql, args):
- try:
- rows = yield self._txns[transactionID].execSQL(sql, args, _NoRows)
- except _NoRows:
- norows = True
- else:
- norows = False
- if rows is not None:
- for row in rows:
- # Either this should be yielded or it should be
- # requiresAnswer=False
- self.callRemote(Row, queryID=queryID, row=row)
- self.callRemote(QueryComplete, queryID=queryID, norows=norows)
- returnValue({})
-
-
- def _complete(self, transactionID, thunk):
- txn = self._txns.pop(transactionID)
- return thunk(txn).addCallback(lambda ignored: {})
-
-
- @Commit.responder
- def commit(self, transactionID):
- """
- Successfully complete the given transaction.
- """
- return self._complete(transactionID, lambda x: x.commit())
-
-
- @Abort.responder
- def abort(self, transactionID):
- """
- Roll back the given transaction.
- """
- return self._complete(transactionID, lambda x: x.abort())
-
-
-
-class ConnectionPoolClient(AMP):
- """
- A client which can execute SQL.
- """
- def __init__(self):
- super(ConnectionPoolClient, self).__init__()
- self._nextID = count().next
- self._txns = {}
- self._queries = {}
-
-
- def newTransaction(self):
- txnid = str(self._nextID())
- self.callRemote(StartTxn, transactionID=txnid)
- txn = Transaction(client=self, transactionID=txnid)
- self._txns[txnid] = txn
- return txn
-
-
- @Row.responder
- def row(self, queryID, row):
- self._queries[queryID].row(row)
- return {}
-
-
- @QueryComplete.responder
- def complete(self, queryID, norows):
- self._queries.pop(queryID).done(norows)
- return {}
-
-
-
-class _Query(object):
- def __init__(self, raiseOnZeroRowCount):
- self.results = []
- self.deferred = Deferred()
- self.raiseOnZeroRowCount = raiseOnZeroRowCount
-
-
- def row(self, row):
- """
- A row was received.
- """
- self.results.append(row)
-
-
- def done(self, norows):
- """
- The query is complete.
-
- @param norows: A boolean. True if there were not any rows.
- """
- if norows and (self.raiseOnZeroRowCount is not None):
- exc = self.raiseOnZeroRowCount()
- self.deferred.errback(Failure(exc))
- else:
- self.deferred.callback(self.results)
-
-
-
-
-class Transaction(object):
- """
- Async protocol-based transaction implementation.
- """
-
- implements(IAsyncTransaction)
-
- def __init__(self, client, transactionID):
- """
- Initialize a transaction with a L{ConnectionPoolClient} and a unique
- transaction identifier.
- """
- self._client = client
- self._transactionID = transactionID
- self._completed = False
-
-
- def execSQL(self, sql, args=None, raiseOnZeroRowCount=None):
- if args is None:
- args = []
- queryID = str(self._client._nextID())
- query = self._client._queries[queryID] = _Query(raiseOnZeroRowCount)
- self._client.callRemote(ExecSQL, queryID=queryID, sql=sql, args=args,
- transactionID=self._transactionID)
- return query.deferred
-
-
- def _complete(self, command):
- if self._completed:
- raise AlreadyFinishedError()
- self._completed = True
- return self._client.callRemote(
- command, transactionID=self._transactionID
- ).addCallback(lambda x: None)
-
-
- def commit(self):
- return self._complete(Commit)
-
-
- def abort(self):
- return self._complete(Abort)
-
-
Modified: CalendarServer/trunk/txdav/base/datastore/file.py
===================================================================
--- CalendarServer/trunk/txdav/base/datastore/file.py 2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/txdav/base/datastore/file.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -21,8 +21,8 @@
"""
from twext.python.log import LoggingMixIn
+from twext.enterprise.ienterprise import AlreadyFinishedError
from txdav.idav import IDataStoreResource
-from txdav.idav import AlreadyFinishedError
from txdav.base.propertystore.base import PropertyName
from twext.web2.dav.element.rfc2518 import GETContentType
Deleted: CalendarServer/trunk/txdav/base/datastore/test/test_asyncsqlpool.py
===================================================================
--- CalendarServer/trunk/txdav/base/datastore/test/test_asyncsqlpool.py 2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/txdav/base/datastore/test/test_asyncsqlpool.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -1,174 +0,0 @@
-# -*- test-case-name: txdav.caldav.datastore -*-
-##
-# Copyright (c) 2010 Apple Inc. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-##
-
-"""
-Tests for L{txdav.base.datastore.asyncsqlpool}.
-"""
-
-from itertools import count
-from twisted.trial.unittest import TestCase
-
-from txdav.base.datastore.asyncsqlpool import ConnectionPool
-from twisted.internet.defer import inlineCallbacks
-
-
-class Child(object):
- def __init__(self, parent):
- self.parent = parent
- self.parent.children.append(self)
-
- def close(self):
- self.parent.children.remove(self)
-
-
-
-class Parent(object):
-
- def __init__(self):
- self.children = []
-
-
-
-class FakeConnection(Parent, Child):
- """
- Fake Stand-in for DB-API 2.0 connection.
- """
-
- def __init__(self, factory):
- """
- Initialize list of cursors
- """
- Parent.__init__(self)
- Child.__init__(self, factory)
- self.id = factory.idcounter.next()
-
-
- @property
- def cursors(self):
- "Alias to make tests more readable."
- return self.children
-
-
- def cursor(self):
- return FakeCursor(self)
-
-
- def commit(self):
- return
-
-
- def rollback(self):
- return
-
-
-
-class FakeCursor(Child):
- """
- Fake stand-in for a DB-API 2.0 cursor.
- """
- def __init__(self, connection):
- Child.__init__(self, connection)
- self.rowcount = 0
- # not entirely correct, but all we care about is its truth value.
- self.description = False
-
-
- @property
- def connection(self):
- "Alias to make tests more readable."
- return self.parent
-
-
- def execute(self, sql, args=()):
- self.sql = sql
- self.description = True
- self.rowcount = 1
- return
-
-
- def fetchall(self):
- """
- Just echo the SQL that was executed in the last query.
- """
- return [[self.connection.id, self.sql]]
-
-
-
-class ConnectionFactory(Parent):
- def __init__(self):
- Parent.__init__(self)
- self.idcounter = count(1)
-
- @property
- def connections(self):
- "Alias to make tests more readable."
- return self.children
-
-
- def connect(self):
- return FakeConnection(self)
-
-
-
-class ConnectionPoolTests(TestCase):
-
- @inlineCallbacks
- def test_tooManyConnections(self):
- """
- When the number of outstanding busy transactions exceeds the number of
- slots specified by L{ConnectionPool.maxConnections},
- L{ConnectionPool.connection} will return a L{PooledSqlTxn} that is not
- backed by any L{BaseSqlTxn}; this object will queue its SQL statements
- until an existing connection becomes available.
- """
- cf = ConnectionFactory()
- cp = ConnectionPool(cf.connect, maxConnections=2)
- cp.startService()
- self.addCleanup(cp.stopService)
- a = cp.connection()
- [[counter, echo]] = yield a.execSQL("alpha")
- b = cp.connection()
- [[bcounter, becho]] = yield b.execSQL("beta")
-
- # both 'a' and 'b' are holding open a connection now; let's try to open
- # a third one. (The ordering will be deterministic even if this fails,
- # because those threads are already busy.)
- c = cp.connection()
- enqueue = c.execSQL("gamma")
- x = []
- def addtox(it):
- x.append(it)
- return it
- enqueue.addCallback(addtox)
-
- # Did 'c' open a connection? Let's hope not...
- self.assertEquals(len(cf.connections), 2)
- # This assertion is _not_ deterministic, unfortunately; it's unlikely
- # that the implementation could be adjusted such that this assertion
- # would fail and the others would succeed. However, if it does fail,
- # that's really bad, so I am leaving it regardless.
- self.failIf(bool(x), "SQL executed too soon!")
- yield b.commit()
-
- # Now that 'b' has committed, 'c' should be able to complete.
- [[ccounter, cecho]] = yield enqueue
-
- # The connection for 'a' ought to be busy, so let's make sure we're
- # using the one for 'c'.
- self.assertEquals(ccounter, bcounter)
-
-
Deleted: CalendarServer/trunk/txdav/base/datastore/threadutils.py
===================================================================
--- CalendarServer/trunk/txdav/base/datastore/threadutils.py 2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/txdav/base/datastore/threadutils.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -1,111 +0,0 @@
-##
-# Copyright (c) 2010 Apple Inc. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-##
-
-import sys
-from Queue import Queue
-
-
-from twisted.python.failure import Failure
-from twisted.internet.defer import Deferred
-
-
-_DONE = object()
-
-_STATE_STOPPED = 'STOPPED'
-_STATE_RUNNING = 'RUNNING'
-_STATE_STOPPING = 'STOPPING'
-
-class ThreadHolder(object):
- """
- A queue which will hold a reactor threadpool thread open until all of the
- work in that queue is done.
- """
-
- def __init__(self, reactor):
- self._reactor = reactor
- self._state = _STATE_STOPPED
- self._stopper = None
- self._q = None
-
-
- def _run(self):
- """
- Worker function which runs in a non-reactor thread.
- """
- while True:
- work = self._q.get()
- if work is _DONE:
- def finishStopping():
- self._state = _STATE_STOPPED
- self._q = None
- s = self._stopper
- self._stopper = None
- s.callback(None)
- self._reactor.callFromThread(finishStopping)
- return
- self._oneWorkUnit(*work)
-
-
- def _oneWorkUnit(self, deferred, instruction):
- try:
- result = instruction()
- except:
- etype, evalue, etb = sys.exc_info()
- def relayFailure():
- f = Failure(evalue, etype, etb)
- deferred.errback(f)
- self._reactor.callFromThread(relayFailure)
- else:
- self._reactor.callFromThread(deferred.callback, result)
-
-
- def submit(self, work):
- """
- Submit some work to be run.
-
- @param work: a 0-argument callable, which will be run in a thread.
-
- @return: L{Deferred} that fires with the result of L{work}
- """
- d = Deferred()
- self._q.put((d, work))
- return d
-
-
- def start(self):
- """
- Start this thing, if it's stopped.
- """
- if self._state != _STATE_STOPPED:
- raise RuntimeError("Not stopped.")
- self._state = _STATE_RUNNING
- self._q = Queue(0)
- self._reactor.callInThread(self._run)
-
-
- def stop(self):
- """
- Stop this thing and release its thread, if it's running.
- """
- if self._state != _STATE_RUNNING:
- raise RuntimeError("Not running.")
- s = self._stopper = Deferred()
- self._state = _STATE_STOPPING
- self._q.put(_DONE)
- return s
-
-
-
Modified: CalendarServer/trunk/txdav/caldav/datastore/sql.py
===================================================================
--- CalendarServer/trunk/txdav/caldav/datastore/sql.py 2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/txdav/caldav/datastore/sql.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -56,7 +56,7 @@
_ATTACHMENTS_MODE_NONE, _ATTACHMENTS_MODE_READ, _ATTACHMENTS_MODE_WRITE,\
CALENDAR_HOME_TABLE, CALENDAR_HOME_METADATA_TABLE,\
CALENDAR_AND_CALENDAR_BIND, CALENDAR_OBJECT_REVISIONS_AND_BIND_TABLE,\
- CALENDAR_OBJECT_AND_BIND_TABLE
+ CALENDAR_OBJECT_AND_BIND_TABLE, schema
from txdav.common.icommondatastore import IndexedSearchException
from vobject.icalendar import utc
@@ -175,6 +175,7 @@
"""
implements(ICalendar)
+ _bindSchema = schema.CALENDAR_BIND
_bindTable = CALENDAR_BIND_TABLE
_homeChildTable = CALENDAR_TABLE
_homeChildBindTable = CALENDAR_AND_CALENDAR_BIND
Modified: CalendarServer/trunk/txdav/caldav/datastore/test/common.py
===================================================================
--- CalendarServer/trunk/txdav/caldav/datastore/test/common.py 2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/txdav/caldav/datastore/test/common.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -23,9 +23,16 @@
maybeDeferred
from twisted.internet.protocol import Protocol
-from txdav.idav import IPropertyStore, IDataStore, AlreadyFinishedError
+from twext.enterprise.ienterprise import AlreadyFinishedError
+
+from twext.python.filepath import CachingFilePath as FilePath
+from twext.web2.dav import davxml
+from twext.web2.http_headers import MimeType
+from twext.web2.dav.element.base import WebDAVUnknownElement
+from twext.python.vcomponent import VComponent
+
+from txdav.idav import IPropertyStore, IDataStore
from txdav.base.propertystore.base import PropertyName
-
from txdav.common.icommondatastore import HomeChildNameAlreadyExistsError, \
ICommonTransaction
from txdav.common.icommondatastore import InvalidObjectResourceError
@@ -39,11 +46,6 @@
ICalendarObject, ICalendarHome,
ICalendar, IAttachment, ICalendarTransaction)
-from twext.python.filepath import CachingFilePath as FilePath
-from twext.web2.dav import davxml
-from twext.web2.http_headers import MimeType
-from twext.web2.dav.element.base import WebDAVUnknownElement
-from twext.python.vcomponent import VComponent
from twistedcaldav.customxml import InviteNotification, InviteSummary
from twistedcaldav.ical import Component
Modified: CalendarServer/trunk/txdav/carddav/datastore/sql.py
===================================================================
--- CalendarServer/trunk/txdav/carddav/datastore/sql.py 2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/txdav/carddav/datastore/sql.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -50,8 +50,8 @@
ADDRESSBOOK_BIND_TABLE, ADDRESSBOOK_OBJECT_REVISIONS_TABLE,\
ADDRESSBOOK_OBJECT_TABLE, ADDRESSBOOK_HOME_TABLE,\
ADDRESSBOOK_HOME_METADATA_TABLE, ADDRESSBOOK_AND_ADDRESSBOOK_BIND,\
- ADDRESSBOOK_OBJECT_REVISIONS_AND_BIND_TABLE,\
- ADDRESSBOOK_OBJECT_AND_BIND_TABLE
+ ADDRESSBOOK_OBJECT_AND_BIND_TABLE, \
+ ADDRESSBOOK_OBJECT_REVISIONS_AND_BIND_TABLE, schema
from txdav.base.propertystore.base import PropertyName
@@ -94,6 +94,7 @@
"""
implements(IAddressBook)
+ _bindSchema = schema.ADDRESSBOOK_BIND
_bindTable = ADDRESSBOOK_BIND_TABLE
_homeChildTable = ADDRESSBOOK_TABLE
_homeChildBindTable = ADDRESSBOOK_AND_ADDRESSBOOK_BIND
Modified: CalendarServer/trunk/txdav/common/datastore/sql.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/sql.py 2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/txdav/common/datastore/sql.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -57,6 +57,10 @@
from txdav.common.inotifications import INotificationCollection, \
INotificationObject
+from twext.enterprise.dal.syntax import Parameter
+from twext.python.clsprop import classproperty
+from twext.enterprise.dal.syntax import Select
+
from txdav.base.propertystore.sql import PropertyStore
from txdav.base.propertystore.base import PropertyName
@@ -211,6 +215,7 @@
CommonStoreTransaction._homeClass[ECALENDARTYPE] = CalendarHome
CommonStoreTransaction._homeClass[EADDRESSBOOKTYPE] = AddressBookHome
self._sqlTxn = sqlTxn
+ self.paramstyle = sqlTxn.paramstyle
def store(self):
@@ -889,7 +894,7 @@
]
))
revisions = dict(revisions)
-
+
# Create the actual objects merging in properties
for resource_id, resource_name, created, modified in dataRows:
child = cls(home, resource_name, resource_id, owned)
@@ -898,10 +903,48 @@
child._syncTokenRevision = revisions[resource_id]
yield child._loadPropertyStore(propertyStores.get(resource_id, None))
results.append(child)
-
returnValue(results)
+
@classmethod
+ def _objectResourceLookup(cls, ownedPart):
+ """
+ Common portions of C{_ownedResourceIDByName}
+ C{_resourceIDSharedToHomeByName}, except for the 'owned' fragment of the
+ Where clause, supplied as an argument.
+ """
+ bind = cls._bindSchema
+ return Select(
+ [bind.RESOURCE_ID],
+ From=bind,
+ Where=(bind.RESOURCE_NAME == Parameter('objectName')).And(
+ bind.HOME_RESOURCE_ID == Parameter('homeID')).And(
+ ownedPart))
+
+
+ @classproperty
+ def _resourceIDOwnedByHomeByName(cls):
+ """
+ DAL query to look up an object resource ID owned by a home, given a
+ resource name (C{objectName}), and a home resource ID
+ (C{homeID}).
+ """
+ return cls._objectResourceLookup(
+ cls._bindSchema.BIND_MODE == _BIND_MODE_OWN)
+
+
+ @classproperty
+ def _resourceIDSharedToHomeByName(cls):
+ """
+ DAL query to look up an object resource ID shared to a home, given a
+ resource name (C{objectName}), and a home resource ID
+ (C{homeID}).
+ """
+ return cls._objectResourceLookup(
+ cls._bindSchema.BIND_MODE != _BIND_MODE_OWN)
+
+
+ @classmethod
@inlineCallbacks
def objectWithName(cls, home, name, owned):
"""
@@ -911,39 +954,15 @@
@param home: a L{CommonHome}.
@param name: a string.
@param owned: a boolean - whether or not to get a shared child
- @return: an L{CommonHomChild} or C{None} if no such child
+ @return: an L{CommonHomeChild} or C{None} if no such child
exists.
"""
-
if owned:
- data = yield home._txn.execSQL("""
- select %(column_RESOURCE_ID)s from %(name)s
- where
- %(column_RESOURCE_NAME)s = %%s and
- %(column_HOME_RESOURCE_ID)s = %%s and
- %(column_BIND_MODE)s = %%s
- """ % cls._bindTable,
- [
- name,
- home._resourceID,
- _BIND_MODE_OWN
- ]
- )
+ query = cls._resourceIDOwnedByHomeByName
else:
- data = yield home._txn.execSQL("""
- select %(column_RESOURCE_ID)s from %(name)s
- where
- %(column_RESOURCE_NAME)s = %%s and
- %(column_HOME_RESOURCE_ID)s = %%s and
- %(column_BIND_MODE)s != %%s
- """ % cls._bindTable,
- [
- name,
- home._resourceID,
- _BIND_MODE_OWN
- ]
- )
-
+ query = cls._resourceIDSharedToHomeByName
+ data = yield query.on(home._txn,
+ objectName=name, homeID=home._resourceID)
if not data:
returnValue(None)
resourceID = data[0][0]
@@ -951,6 +970,7 @@
yield child.initFromStore()
returnValue(child)
+
@classmethod
@inlineCallbacks
def objectWithID(cls, home, resourceID):
Modified: CalendarServer/trunk/txdav/common/datastore/sql_tables.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/sql_tables.py 2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/txdav/common/datastore/sql_tables.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -19,174 +19,177 @@
SQL Table definitions.
"""
-CALENDAR_HOME_TABLE = {
- "name" : "CALENDAR_HOME",
- "column_RESOURCE_ID" : "RESOURCE_ID",
- "column_OWNER_UID" : "OWNER_UID",
-}
+from twisted.python.modules import getModule
+from twext.enterprise.dal.syntax import SchemaSyntax
+from twext.enterprise.dal.parseschema import schemaFromPath
-CALENDAR_HOME_METADATA_TABLE = {
- "name" : "CALENDAR_HOME_METADATA",
- "column_RESOURCE_ID" : "RESOURCE_ID",
- "column_QUOTA_USED_BYTES" : "QUOTA_USED_BYTES",
-}
-ADDRESSBOOK_HOME_TABLE = {
- "name" : "ADDRESSBOOK_HOME",
- "column_RESOURCE_ID" : "RESOURCE_ID",
- "column_OWNER_UID" : "OWNER_UID",
-}
-ADDRESSBOOK_HOME_METADATA_TABLE = {
- "name" : "ADDRESSBOOK_HOME_METADATA",
- "column_RESOURCE_ID" : "RESOURCE_ID",
- "column_QUOTA_USED_BYTES" : "QUOTA_USED_BYTES",
-}
+def _populateSchema():
+ """
+ Generate the global L{SchemaSyntax}.
+ """
+ pathObj = getModule(__name__).filePath.sibling("sql_schema_v1.sql")
+ return SchemaSyntax(schemaFromPath(pathObj))
-NOTIFICATION_HOME_TABLE = {
- "name" : "NOTIFICATION_HOME",
- "column_RESOURCE_ID" : "RESOURCE_ID",
- "column_OWNER_UID" : "OWNER_UID",
-}
-CALENDAR_TABLE = {
- "name" : "CALENDAR",
- "column_RESOURCE_ID" : "RESOURCE_ID",
- "column_CREATED" : "CREATED",
- "column_MODIFIED" : "MODIFIED",
-}
-ADDRESSBOOK_TABLE = {
- "name" : "ADDRESSBOOK",
- "column_RESOURCE_ID" : "RESOURCE_ID",
- "column_CREATED" : "CREATED",
- "column_MODIFIED" : "MODIFIED",
-}
+schema = _populateSchema()
-CALENDAR_BIND_TABLE = {
- "name" : "CALENDAR_BIND",
- "column_HOME_RESOURCE_ID" : "CALENDAR_HOME_RESOURCE_ID",
- "column_RESOURCE_ID" : "CALENDAR_RESOURCE_ID",
- "column_RESOURCE_NAME" : "CALENDAR_RESOURCE_NAME",
- "column_BIND_MODE" : "BIND_MODE",
- "column_BIND_STATUS" : "BIND_STATUS",
- "column_SEEN_BY_OWNER" : "SEEN_BY_OWNER",
- "column_SEEN_BY_SHAREE" : "SEEN_BY_SHAREE",
- "column_MESSAGE" : "MESSAGE",
-}
-ADDRESSBOOK_BIND_TABLE = {
- "name" : "ADDRESSBOOK_BIND",
- "column_HOME_RESOURCE_ID" : "ADDRESSBOOK_HOME_RESOURCE_ID",
- "column_RESOURCE_ID" : "ADDRESSBOOK_RESOURCE_ID",
- "column_RESOURCE_NAME" : "ADDRESSBOOK_RESOURCE_NAME",
- "column_BIND_MODE" : "BIND_MODE",
- "column_BIND_STATUS" : "BIND_STATUS",
- "column_SEEN_BY_OWNER" : "SEEN_BY_OWNER",
- "column_SEEN_BY_SHAREE" : "SEEN_BY_SHAREE",
- "column_MESSAGE" : "MESSAGE",
-}
+# Column aliases, defined so that similar tables (such as CALENDAR_OBJECT and
+# ADDRESSBOOK_OBJECT) can be used according to a polymorphic interface.
-CALENDAR_OBJECT_REVISIONS_TABLE = {
- "name" : "CALENDAR_OBJECT_REVISIONS",
- "sequence" : "REVISION_SEQ",
- "column_HOME_RESOURCE_ID" : "CALENDAR_HOME_RESOURCE_ID",
- "column_RESOURCE_ID" : "CALENDAR_RESOURCE_ID",
- "column_COLLECTION_NAME" : "CALENDAR_NAME",
- "column_RESOURCE_NAME" : "RESOURCE_NAME",
- "column_REVISION" : "REVISION",
- "column_DELETED" : "DELETED",
-}
+schema.CALENDAR_BIND.RESOURCE_NAME = \
+ schema.CALENDAR_BIND.CALENDAR_RESOURCE_NAME
+schema.CALENDAR_BIND.RESOURCE_ID = \
+ schema.CALENDAR_BIND.CALENDAR_RESOURCE_ID
+schema.CALENDAR_BIND.HOME_RESOURCE_ID = \
+ schema.CALENDAR_BIND.CALENDAR_HOME_RESOURCE_ID
+schema.ADDRESSBOOK_BIND.RESOURCE_NAME = \
+ schema.ADDRESSBOOK_BIND.ADDRESSBOOK_RESOURCE_NAME
+schema.ADDRESSBOOK_BIND.RESOURCE_ID = \
+ schema.ADDRESSBOOK_BIND.ADDRESSBOOK_RESOURCE_ID
+schema.ADDRESSBOOK_BIND.HOME_RESOURCE_ID = \
+ schema.ADDRESSBOOK_BIND.ADDRESSBOOK_HOME_RESOURCE_ID
+schema.CALENDAR_OBJECT_REVISIONS.RESOURCE_ID = \
+ schema.CALENDAR_OBJECT_REVISIONS.CALENDAR_RESOURCE_ID
+schema.CALENDAR_OBJECT_REVISIONS.HOME_RESOURCE_ID = \
+ schema.CALENDAR_OBJECT_REVISIONS.CALENDAR_HOME_RESOURCE_ID
+schema.CALENDAR_OBJECT_REVISIONS.COLLECTION_NAME = \
+ schema.CALENDAR_OBJECT_REVISIONS.CALENDAR_NAME
+schema.ADDRESSBOOK_OBJECT_REVISIONS.RESOURCE_ID = \
+ schema.ADDRESSBOOK_OBJECT_REVISIONS.ADDRESSBOOK_RESOURCE_ID
+schema.ADDRESSBOOK_OBJECT_REVISIONS.HOME_RESOURCE_ID = \
+ schema.ADDRESSBOOK_OBJECT_REVISIONS.ADDRESSBOOK_HOME_RESOURCE_ID
+schema.ADDRESSBOOK_OBJECT_REVISIONS.COLLECTION_NAME = \
+ schema.ADDRESSBOOK_OBJECT_REVISIONS.ADDRESSBOOK_NAME
+schema.NOTIFICATION_OBJECT_REVISIONS.HOME_RESOURCE_ID = \
+ schema.NOTIFICATION_OBJECT_REVISIONS.NOTIFICATION_HOME_RESOURCE_ID
+schema.CALENDAR_OBJECT.TEXT = \
+ schema.CALENDAR_OBJECT.ICALENDAR_TEXT
+schema.CALENDAR_OBJECT.UID = \
+ schema.CALENDAR_OBJECT.ICALENDAR_UID
+schema.CALENDAR_OBJECT.PARENT_RESOURCE_ID = \
+ schema.CALENDAR_OBJECT.CALENDAR_RESOURCE_ID
+schema.ADDRESSBOOK_OBJECT.TEXT = \
+ schema.ADDRESSBOOK_OBJECT.VCARD_TEXT
+schema.ADDRESSBOOK_OBJECT.UID = \
+ schema.ADDRESSBOOK_OBJECT.VCARD_UID
+schema.ADDRESSBOOK_OBJECT.PARENT_RESOURCE_ID = \
+ schema.ADDRESSBOOK_OBJECT.ADDRESSBOOK_RESOURCE_ID
-ADDRESSBOOK_OBJECT_REVISIONS_TABLE = {
- "name" : "ADDRESSBOOK_OBJECT_REVISIONS",
- "sequence" : "REVISION_SEQ",
- "column_HOME_RESOURCE_ID" : "ADDRESSBOOK_HOME_RESOURCE_ID",
- "column_RESOURCE_ID" : "ADDRESSBOOK_RESOURCE_ID",
- "column_COLLECTION_NAME" : "ADDRESSBOOK_NAME",
- "column_RESOURCE_NAME" : "RESOURCE_NAME",
- "column_REVISION" : "REVISION",
- "column_DELETED" : "DELETED",
-}
-NOTIFICATION_OBJECT_REVISIONS_TABLE = {
- "name" : "NOTIFICATION_OBJECT_REVISIONS",
- "sequence" : "REVISION_SEQ",
- "column_HOME_RESOURCE_ID" : "NOTIFICATION_HOME_RESOURCE_ID",
- "column_RESOURCE_NAME" : "RESOURCE_NAME",
- "column_REVISION" : "REVISION",
- "column_DELETED" : "DELETED",
-}
-CALENDAR_OBJECT_TABLE = {
- "name" : "CALENDAR_OBJECT",
- "column_RESOURCE_ID" : "RESOURCE_ID",
- "column_PARENT_RESOURCE_ID" : "CALENDAR_RESOURCE_ID",
- "column_RESOURCE_NAME" : "RESOURCE_NAME",
- "column_TEXT" : "ICALENDAR_TEXT",
- "column_UID" : "ICALENDAR_UID",
- "column_ATTACHMENTS_MODE" : "ATTACHMENTS_MODE",
- "column_DROPBOX_ID" : "DROPBOX_ID",
- "column_ACCESS" : "ACCESS",
- "column_SCHEDULE_OBJECT" : "SCHEDULE_OBJECT",
- "column_SCHEDULE_TAG" : "SCHEDULE_TAG",
- "column_SCHEDULE_ETAGS" : "SCHEDULE_ETAGS",
- "column_PRIVATE_COMMENTS" : "PRIVATE_COMMENTS",
- "column_MD5" : "MD5",
- "column_CREATED" : "CREATED",
- "column_MODIFIED" : "MODIFIED",
-}
+def _combine(**kw):
+ """
+ Combine two table dictionaries used in a join to produce a single dictionary
+ that can be used in formatting.
+ """
+ result = {}
+ for tableRole, tableDictionary in kw.items():
+ result.update([("%s:%s" % (tableRole, k), v)
+ for k,v in tableDictionary.items()])
+ return result
-ADDRESSBOOK_OBJECT_TABLE = {
- "name" : "ADDRESSBOOK_OBJECT",
- "column_RESOURCE_ID" : "RESOURCE_ID",
- "column_PARENT_RESOURCE_ID" : "ADDRESSBOOK_RESOURCE_ID",
- "column_RESOURCE_NAME" : "RESOURCE_NAME",
- "column_TEXT" : "VCARD_TEXT",
- "column_UID" : "VCARD_UID",
- "column_MD5" : "MD5",
- "column_CREATED" : "CREATED",
- "column_MODIFIED" : "MODIFIED",
-}
+def _S(tableSyntax):
+ """
+ Construct a dictionary of strings from a L{TableSyntax} for those queries
+ that are still constructed via string interpolation.
+ """
+ result = {}
+ result['name'] = tableSyntax.model.name
+ #pkey = tableSyntax.model.primaryKey
+ #if pkey is not None:
+ # default = pkey.default
+ # if isinstance(default, Sequence):
+ # result['sequence'] = default.name
+ result['sequence'] = schema.model.sequenceNamed('REVISION_SEQ').name
+ for columnSyntax in tableSyntax:
+ result['column_' + columnSyntax.model.name] = columnSyntax.model.name
+ for alias, realColumnSyntax in tableSyntax.aliases().items():
+ result['column_' + alias] = realColumnSyntax.model.name
+ return result
+
+
+
+def _schemaConstants(nameColumn, valueColumn):
+ """
+ Get a constant value from the rows defined in the schema.
+ """
+ def get(name):
+ for row in nameColumn.model.table.schemaRows:
+ if row[nameColumn.model] == name:
+ return row[valueColumn.model]
+ return get
+
+
+
# Various constants
-_BIND_STATUS_INVITED = 0
-_BIND_STATUS_ACCEPTED = 1
-_BIND_STATUS_DECLINED = 2
-_BIND_STATUS_INVALID = 3
+_bindStatus = _schemaConstants(
+ schema.CALENDAR_BIND_STATUS.DESCRIPTION,
+ schema.CALENDAR_BIND_STATUS.ID
+)
-_ATTACHMENTS_MODE_NONE = 0
-_ATTACHMENTS_MODE_READ = 1
-_ATTACHMENTS_MODE_WRITE = 2
+_BIND_STATUS_INVITED = _bindStatus('invited')
+_BIND_STATUS_ACCEPTED = _bindStatus('accepted')
+_BIND_STATUS_DECLINED = _bindStatus('declined')
+_BIND_STATUS_INVALID = _bindStatus('invalid')
-_BIND_MODE_OWN = 0
-_BIND_MODE_READ = 1
-_BIND_MODE_WRITE = 2
-_BIND_MODE_DIRECT = 3
+_attachmentsMode = _schemaConstants(
+ schema.CALENDAR_OBJECT_ATTACHMENTS_MODE.DESCRIPTION,
+ schema.CALENDAR_OBJECT_ATTACHMENTS_MODE.ID
+)
-# Some combined tables used in joins
-CALENDAR_AND_CALENDAR_BIND = {}
-CALENDAR_AND_CALENDAR_BIND.update([("CHILD:%s" % (k,), v) for k,v in CALENDAR_TABLE.items()])
-CALENDAR_AND_CALENDAR_BIND.update([("BIND:%s" % (k,), v) for k,v in CALENDAR_BIND_TABLE.items()])
+_ATTACHMENTS_MODE_NONE = _attachmentsMode('none')
+_ATTACHMENTS_MODE_READ = _attachmentsMode('read')
+_ATTACHMENTS_MODE_WRITE = _attachmentsMode('write')
-CALENDAR_OBJECT_AND_BIND_TABLE = {}
-CALENDAR_OBJECT_AND_BIND_TABLE.update([("OBJECT:%s" % (k,), v) for k,v in CALENDAR_OBJECT_TABLE.items()])
-CALENDAR_OBJECT_AND_BIND_TABLE.update([("BIND:%s" % (k,), v) for k,v in CALENDAR_BIND_TABLE.items()])
-CALENDAR_OBJECT_REVISIONS_AND_BIND_TABLE = {}
-CALENDAR_OBJECT_REVISIONS_AND_BIND_TABLE.update([("REV:%s" % (k,), v) for k,v in CALENDAR_OBJECT_REVISIONS_TABLE.items()])
-CALENDAR_OBJECT_REVISIONS_AND_BIND_TABLE.update([("BIND:%s" % (k,), v) for k,v in CALENDAR_BIND_TABLE.items()])
+_bindMode = _schemaConstants(
+ schema.CALENDAR_BIND_MODE.DESCRIPTION,
+ schema.CALENDAR_BIND_MODE.ID
+)
-ADDRESSBOOK_AND_ADDRESSBOOK_BIND = {}
-ADDRESSBOOK_AND_ADDRESSBOOK_BIND.update([("CHILD:%s" % (k,), v) for k,v in ADDRESSBOOK_TABLE.items()])
-ADDRESSBOOK_AND_ADDRESSBOOK_BIND.update([("BIND:%s" % (k,), v) for k,v in ADDRESSBOOK_BIND_TABLE.items()])
-ADDRESSBOOK_OBJECT_AND_BIND_TABLE = {}
-ADDRESSBOOK_OBJECT_AND_BIND_TABLE.update([("OBJECT:%s" % (k,), v) for k,v in ADDRESSBOOK_OBJECT_TABLE.items()])
-ADDRESSBOOK_OBJECT_AND_BIND_TABLE.update([("BIND:%s" % (k,), v) for k,v in ADDRESSBOOK_BIND_TABLE.items()])
+_BIND_MODE_OWN = _bindMode('own')
+_BIND_MODE_READ = _bindMode('read')
+_BIND_MODE_WRITE = _bindMode('write')
+_BIND_MODE_DIRECT = _bindMode('direct')
-ADDRESSBOOK_OBJECT_REVISIONS_AND_BIND_TABLE = {}
-ADDRESSBOOK_OBJECT_REVISIONS_AND_BIND_TABLE.update([("REV:%s" % (k,), v) for k,v in ADDRESSBOOK_OBJECT_REVISIONS_TABLE.items()])
-ADDRESSBOOK_OBJECT_REVISIONS_AND_BIND_TABLE.update([("BIND:%s" % (k,), v) for k,v in ADDRESSBOOK_BIND_TABLE.items()])
+
+# Compatibility tables for string formatting:
+CALENDAR_HOME_TABLE = _S(schema.CALENDAR_HOME)
+CALENDAR_HOME_METADATA_TABLE = _S(schema.CALENDAR_HOME_METADATA)
+ADDRESSBOOK_HOME_TABLE = _S(schema.ADDRESSBOOK_HOME)
+ADDRESSBOOK_HOME_METADATA_TABLE = _S(schema.ADDRESSBOOK_HOME_METADATA)
+NOTIFICATION_HOME_TABLE = _S(schema.NOTIFICATION_HOME)
+CALENDAR_TABLE = _S(schema.CALENDAR)
+ADDRESSBOOK_TABLE = _S(schema.ADDRESSBOOK)
+CALENDAR_BIND_TABLE = _S(schema.CALENDAR_BIND)
+ADDRESSBOOK_BIND_TABLE = _S(schema.ADDRESSBOOK_BIND)
+CALENDAR_OBJECT_REVISIONS_TABLE = _S(schema.CALENDAR_OBJECT_REVISIONS)
+ADDRESSBOOK_OBJECT_REVISIONS_TABLE = _S(schema.ADDRESSBOOK_OBJECT_REVISIONS)
+NOTIFICATION_OBJECT_REVISIONS_TABLE = _S(schema.NOTIFICATION_OBJECT_REVISIONS)
+CALENDAR_OBJECT_TABLE = _S(schema.CALENDAR_OBJECT)
+ADDRESSBOOK_OBJECT_TABLE = _S(schema.ADDRESSBOOK_OBJECT)
+
+# Some combined tables used in join-string-formatting.
+CALENDAR_AND_CALENDAR_BIND = _combine(CHILD=CALENDAR_TABLE,
+ BIND=CALENDAR_BIND_TABLE)
+CALENDAR_OBJECT_AND_BIND_TABLE = _combine(OBJECT=CALENDAR_OBJECT_TABLE,
+ BIND=CALENDAR_BIND_TABLE)
+CALENDAR_OBJECT_REVISIONS_AND_BIND_TABLE = _combine(
+ REV=CALENDAR_OBJECT_REVISIONS_TABLE,
+ BIND=CALENDAR_BIND_TABLE)
+ADDRESSBOOK_AND_ADDRESSBOOK_BIND = _combine(CHILD=ADDRESSBOOK_TABLE,
+ BIND=ADDRESSBOOK_BIND_TABLE)
+ADDRESSBOOK_OBJECT_AND_BIND_TABLE = _combine(OBJECT=ADDRESSBOOK_OBJECT_TABLE,
+ BIND=ADDRESSBOOK_BIND_TABLE)
+ADDRESSBOOK_OBJECT_REVISIONS_AND_BIND_TABLE = _combine(
+ REV=ADDRESSBOOK_OBJECT_REVISIONS_TABLE,
+ BIND=ADDRESSBOOK_BIND_TABLE)
+
Added: CalendarServer/trunk/txdav/common/datastore/test/test_sql_tables.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/test/test_sql_tables.py (rev 0)
+++ CalendarServer/trunk/txdav/common/datastore/test/test_sql_tables.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -0,0 +1,38 @@
+# -*- test-case-name: txdav.caldav.datastore.test.test_sql -*-
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Tests for the SQL Table definitions in txdav.common.datastore.sql_tables: sample
+a couple of tables to make sure the schema is adequately parsed.
+
+These aren't unit tests, they're integration tests to verify the behavior tested
+by L{txdav.base.datastore.test.test_parseschema}.
+"""
+
+from txdav.common.datastore.sql_tables import schema
+from twisted.trial.unittest import TestCase
+
+class SampleSomeColumns(TestCase):
+ """
+ Sample some columns from the tables defined by L{schema} and verify that
+ they look correct.
+ """
+
+ def test_addressbookObjectResourceID(self):
+ self.assertEquals(schema.ADDRESSBOOK_OBJECT.RESOURCE_ID.model.name,
+ "RESOURCE_ID")
+
Modified: CalendarServer/trunk/txdav/common/datastore/test/util.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/test/util.py 2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/txdav/common/datastore/test/util.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -37,7 +37,7 @@
from txdav.base.datastore.subpostgres import PostgresService
from txdav.base.datastore.dbapiclient import DiagnosticConnectionWrapper
from txdav.common.icommondatastore import NoSuchHomeChildError
-from txdav.base.datastore.asyncsqlpool import ConnectionPool
+from twext.enterprise.adbapi2 import ConnectionPool
from twisted.internet.defer import returnValue
from twistedcaldav.notify import Notifier
Modified: CalendarServer/trunk/txdav/idav.py
===================================================================
--- CalendarServer/trunk/txdav/idav.py 2011-01-20 00:46:25 UTC (rev 6790)
+++ CalendarServer/trunk/txdav/idav.py 2011-01-20 07:03:24 UTC (rev 6791)
@@ -21,7 +21,6 @@
__all__ = [
"PropertyStoreError",
"PropertyChangeNotAllowedError",
- "AlreadyFinishedError",
"IPropertyName",
"IPropertyStore",
"IDataStore",
@@ -52,13 +51,7 @@
-class AlreadyFinishedError(Exception):
- """
- The transaction was already completed via an C{abort} or C{commit} and
- cannot be aborted or committed again.
- """
-
#
# Interfaces
#
@@ -172,53 +165,6 @@
-class IAsyncTransaction(Interface):
- """
- Asynchronous execution of SQL.
-
- Note that there is no {begin()} method; if an L{IAsyncTransaction} exists,
- it is assumed to have been started.
- """
-
- def execSQL(sql, args=(), raiseOnZeroRowCount=None):
- """
- Execute some SQL.
-
- @param sql: an SQL string.
-
- @type sql: C{str}
-
- @param args: C{list} of arguments to interpolate into C{sql}.
-
- @param raiseOnZeroRowCount: a 0-argument callable which returns an
- exception to raise if the executed SQL does not affect any rows.
-
- @return: L{Deferred} which fires C{list} of C{tuple}
-
- @raise: C{raiseOnZeroRowCount} if it was specified and no rows were
- affected.
- """
-
-
- def commit():
- """
- Commit changes caused by this transaction.
-
- @return: L{Deferred} which fires with C{None} upon successful
- completion of this transaction.
- """
-
-
- def abort():
- """
- Roll back changes caused by this transaction.
-
- @return: L{Deferred} which fires with C{None} upon successful
- rollback of this transaction.
- """
-
-
-
class ITransaction(Interface):
"""
Transaction that can be aborted and either succeeds or fails in
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20110119/d9ee2950/attachment-0001.html>
More information about the calendarserver-changes
mailing list