[CalendarServer-changes] [15734] twext/trunk/twext/enterprise

source_changes at macosforge.org source_changes at macosforge.org
Tue Jul 5 12:32:22 PDT 2016


Revision: 15734
          http://trac.calendarserver.org//changeset/15734
Author:   cdaboo at apple.com
Date:     2016-07-05 12:32:22 -0700 (Tue, 05 Jul 2016)
Log Message:
-----------
Wrap the database parameters (dialect and paramstyle) into a single class that also supports optional features. Allow the optional SKIP LOCKED feature with Postgres.

Modified Paths:
--------------
    twext/trunk/twext/enterprise/adbapi2.py
    twext/trunk/twext/enterprise/dal/record.py
    twext/trunk/twext/enterprise/dal/syntax.py
    twext/trunk/twext/enterprise/dal/test/test_sqlsyntax.py
    twext/trunk/twext/enterprise/fixtures.py
    twext/trunk/twext/enterprise/ienterprise.py
    twext/trunk/twext/enterprise/jobs/jobitem.py
    twext/trunk/twext/enterprise/test/test_adbapi2.py

Modified: twext/trunk/twext/enterprise/adbapi2.py
===================================================================
--- twext/trunk/twext/enterprise/adbapi2.py	2016-07-01 16:33:47 UTC (rev 15733)
+++ twext/trunk/twext/enterprise/adbapi2.py	2016-07-05 19:32:22 UTC (rev 15734)
@@ -60,7 +60,8 @@
 from twisted.internet.defer import fail
 
 from twext.enterprise.ienterprise import (
-    AlreadyFinishedError, IAsyncTransaction, POSTGRES_DIALECT, ICommandBlock
+    AlreadyFinishedError, IAsyncTransaction, ICommandBlock,
+    DatabaseType, POSTGRES_DIALECT,
 )
 
 from twext.python.log import Logger
@@ -74,8 +75,8 @@
 
 DEFAULT_PARAM_STYLE = "pyformat"
 DEFAULT_DIALECT = POSTGRES_DIALECT
+DEFAULT_DBTYPE = DatabaseType(DEFAULT_DIALECT, DEFAULT_PARAM_STYLE)
 
-
 def _forward(thunk):
     """
     Forward an attribute to the connection pool.
@@ -172,17 +173,11 @@
 
 
     @_forward
-    def paramstyle(self):
+    def dbtype(self):
         """
-        The paramstyle attribute is mirrored from the connection pool.
+        The dbtype attribute is mirrored from the connection pool.
         """
 
-    @_forward
-    def dialect(self):
-        """
-        The dialect attribute is mirrored from the connection pool.
-        """
-
     def _reallyExecSQL(self, sql, args=None, raiseOnZeroRowCount=None):
         """
         Execute the given SQL on a thread, using a DB-API 2.0 cursor.
@@ -528,8 +523,7 @@
     implements(IAsyncTransaction)
 
     def __init__(self, pool, reason, label=None):
-        self.paramstyle = pool.paramstyle
-        self.dialect = pool.dialect
+        self.dbtype = pool.dbtype
         self.reason = reason
         self._label = label
 
@@ -563,12 +557,11 @@
     def __init__(self, pool, label=None):
         """
         Initialize a L{_WaitingTxn} based on a L{ConnectionPool}.  (The C{pool}
-        is used only to reflect C{dialect} and C{paramstyle} attributes; not
+        is used only to reflect the C{dbtype} attribute; not
         remembered or modified in any way.)
         """
         self._spool = []
-        self.paramstyle = pool.paramstyle
-        self.dialect = pool.dialect
+        self.dbtype = pool.dbtype
         self._label = label
 
 
@@ -931,8 +924,7 @@
 
     def __init__(self, singleTxn):
         self._singleTxn = singleTxn
-        self.paramstyle = singleTxn.paramstyle
-        self.dialect = singleTxn.dialect
+        self.dbtype = singleTxn.dbtype
         self._spool = _WaitingTxn(singleTxn._pool, label=singleTxn._label)
         self._started = False
         self._ended = False
@@ -1129,15 +1121,14 @@
     def __init__(
         self,
         connectionFactory, maxConnections=10,
-        paramstyle=DEFAULT_PARAM_STYLE, dialect=DEFAULT_DIALECT,
+        dbtype=None,
         name=None,
     ):
 
         super(ConnectionPool, self).__init__()
         self.connectionFactory = connectionFactory
         self.maxConnections = maxConnections
-        self.paramstyle = paramstyle
-        self.dialect = dialect
+        self.dbtype = dbtype if dbtype is not None else DEFAULT_DBTYPE.copyreplace()
         if name is not None:
             self.name = name
 
@@ -1674,15 +1665,14 @@
     """
 
     def __init__(
-        self, dialect=POSTGRES_DIALECT, paramstyle=DEFAULT_PARAM_STYLE
+        self, dbtype=DEFAULT_DBTYPE,
     ):
         # See DEFAULT_PARAM_STYLE FIXME above.
         super(ConnectionPoolClient, self).__init__()
         self._nextID = count().next
         self._txns = weakref.WeakValueDictionary()
         self._queries = {}
-        self.dialect = dialect
-        self.paramstyle = paramstyle
+        self.dbtype = dbtype if dbtype is not None else DEFAULT_DBTYPE.copyreplace()
 
 
     def unhandledError(self, failure):
@@ -1813,21 +1803,13 @@
 
 
     @property
-    def paramstyle(self):
+    def dbtype(self):
         """
-        Forward C{paramstyle} attribute to the client.
+        Forward C{dbtype} attribute to the client.
         """
-        return self._client.paramstyle
+        return self._client.dbtype
 
 
-    @property
-    def dialect(self):
-        """
-        Forward C{dialect} attribute to the client.
-        """
-        return self._client.dialect
-
-
     def execSQL(self, sql, args=None, raiseOnZeroRowCount=None, blockID=""):
         if not blockID:
             if self._completed:
@@ -1912,21 +1894,13 @@
 
 
     @property
-    def paramstyle(self):
+    def dbtype(self):
         """
-        Forward C{paramstyle} attribute to the transaction.
+        Forward C{dbtype} attribute to the transaction.
         """
-        return self._transaction.paramstyle
+        return self._transaction.dbtype
 
 
-    @property
-    def dialect(self):
-        """
-        Forward C{dialect} attribute to the transaction.
-        """
-        return self._transaction.dialect
-
-
     def execSQL(self, sql, args=None, raiseOnZeroRowCount=None):
         """
         Execute some SQL on this command block.

Modified: twext/trunk/twext/enterprise/dal/record.py
===================================================================
--- twext/trunk/twext/enterprise/dal/record.py	2016-07-01 16:33:47 UTC (rev 15733)
+++ twext/trunk/twext/enterprise/dal/record.py	2016-07-05 19:32:22 UTC (rev 15734)
@@ -649,7 +649,7 @@
         """
         Delete all rows matching the where expression from the table that corresponds to C{cls}.
         """
-        if transaction.dialect == ORACLE_DIALECT and returnCols is not None:
+        if transaction.dbtype.dialect == ORACLE_DIALECT and returnCols is not None:
             # Oracle cannot return multiple rows in the RETURNING clause so
             # we have to split this into a SELECT followed by a DELETE
             if not isinstance(returnCols, (tuple, list)):

Modified: twext/trunk/twext/enterprise/dal/syntax.py
===================================================================
--- twext/trunk/twext/enterprise/dal/syntax.py	2016-07-01 16:33:47 UTC (rev 15733)
+++ twext/trunk/twext/enterprise/dal/syntax.py	2016-07-05 19:32:22 UTC (rev 15734)
@@ -80,7 +80,7 @@
 
 from twext.enterprise.dal.model import Schema, Table, Column, Sequence, SQLType
 from twext.enterprise.ienterprise import (
-    POSTGRES_DIALECT, ORACLE_DIALECT, SQLITE_DIALECT, IDerivedParameter
+    POSTGRES_DIALECT, ORACLE_DIALECT, SQLITE_DIALECT, DatabaseType, IDerivedParameter
 )
 from twext.enterprise.util import mapOracleOutputType
 
@@ -158,8 +158,8 @@
     and automated id generator.
     """
 
-    def __init__(self, dialect=None, placeholder=None):
-        self.dialect = dialect if dialect else POSTGRES_DIALECT
+    def __init__(self, dbtype=None, placeholder=None):
+        self.dbtype = dbtype if dbtype else DatabaseType(POSTGRES_DIALECT, "qmark")
         if placeholder is None:
             placeholder = defaultPlaceholder()
         self.placeholder = placeholder
@@ -172,7 +172,7 @@
 
 
     def shouldQuote(self, name):
-        return (self.dialect == ORACLE_DIALECT and name.lower() in _KEYWORDS)
+        return (self.dbtype.dialect == ORACLE_DIALECT and name.lower() in _KEYWORDS)
 
 
 
@@ -261,7 +261,7 @@
             C{list}s)
         """
         queryGenerator = QueryGenerator(
-            txn.dialect, self._paramstyles[txn.paramstyle]()
+            txn.dbtype, self._paramstyles[txn.dbtype.paramstyle]()
         )
         outvars = self._extraVars(txn, queryGenerator)
         kw.update(outvars)
@@ -270,7 +270,7 @@
             fragment.text, fragment.parameters, raiseOnZeroRowCount
         )
         result = self._extraResult(result, outvars, queryGenerator)
-        if queryGenerator.dialect == ORACLE_DIALECT and result:
+        if queryGenerator.dbtype.dialect == ORACLE_DIALECT and result:
             result.addCallback(self._fixOracleNulls)
         return result
 
@@ -590,7 +590,7 @@
 
     def nameFor(self, queryGenerator):
         if (
-            queryGenerator.dialect == ORACLE_DIALECT and
+            queryGenerator.dbtype.dialect == ORACLE_DIALECT and
             self.oracleName is not None
         ):
             return self.oracleName
@@ -668,7 +668,7 @@
         """
         Convert to an SQL fragment.
         """
-        if queryGenerator.dialect == ORACLE_DIALECT:
+        if queryGenerator.dbtype.dialect == ORACLE_DIALECT:
             fmt = "%s.nextval"
         else:
             fmt = "nextval('%s')"
@@ -723,7 +723,7 @@
         # XXX maybe there should be a specific method which is only invoked
         # from the FROM clause, that only tables and joins would implement?
         return SQLFragment(
-            _nameForDialect(self.model.name, queryGenerator.dialect)
+            _nameForDialect(self.model.name, queryGenerator.dbtype.dialect)
         )
 
 
@@ -1110,7 +1110,7 @@
 
     def subSQL(self, queryGenerator, allTables):
         if (
-            queryGenerator.dialect == ORACLE_DIALECT and
+            queryGenerator.dbtype.dialect == ORACLE_DIALECT and
             isinstance(self.b, Constant) and
             self.b.value == "" and self.op in ("=", "!=")
         ):
@@ -1382,9 +1382,9 @@
     An EXCEPT construct used inside a SELECT.
     """
     def setOpSQL(self, queryGenerator):
-        if queryGenerator.dialect == POSTGRES_DIALECT:
+        if queryGenerator.dbtype.dialect == POSTGRES_DIALECT:
             return SQLFragment(" EXCEPT ")
-        elif queryGenerator.dialect == ORACLE_DIALECT:
+        elif queryGenerator.dbtype.dialect == ORACLE_DIALECT:
             return SQLFragment(" MINUS ")
         else:
             raise NotImplementedError("Unsupported dialect")
@@ -1506,11 +1506,11 @@
         if self.ForUpdate:
             # FOR UPDATE not supported with sqlite - but that is probably not relevant
             # given that sqlite does file level locking of the DB
-            if queryGenerator.dialect != SQLITE_DIALECT:
+            if queryGenerator.dbtype.dialect != SQLITE_DIALECT:
                 # Oracle turns this statement into a sub-select if Limit is non-zero, but we can't have
                 # the "for update" in the sub-select. So suppress it here and add it in the outer limit
                 # select later.
-                if self.Limit is None or queryGenerator.dialect != ORACLE_DIALECT:
+                if self.Limit is None or queryGenerator.dbtype.dialect != ORACLE_DIALECT:
                     stmt.text += " for update"
                     if self.NoWait:
                         stmt.text += " nowait"
@@ -1519,7 +1519,7 @@
 
         if self.Limit is not None:
             limitConst = Constant(self.Limit).subSQL(queryGenerator, allTables)
-            if queryGenerator.dialect == ORACLE_DIALECT:
+            if queryGenerator.dbtype.dialect == ORACLE_DIALECT:
                 wrapper = SQLFragment("select * from (")
                 wrapper.append(stmt)
                 wrapper.append(SQLFragment(") where ROWNUM <= "))
@@ -1529,7 +1529,7 @@
             stmt.append(limitConst)
 
             # Add in any Oracle "for update"
-            if self.ForUpdate and queryGenerator.dialect == ORACLE_DIALECT:
+            if self.ForUpdate and queryGenerator.dbtype.dialect == ORACLE_DIALECT:
                 stmt.text += " for update"
                 if self.NoWait:
                     stmt.text += " nowait"
@@ -1620,7 +1620,7 @@
         @rtype: L{SQLFragment}
         """
 
-        if queryGenerator.dialect != ORACLE_DIALECT:
+        if queryGenerator.dbtype.dialect != ORACLE_DIALECT:
             raise NotImplementedError("CALL statement only available with Oracle DB")
         args = (self.ReturnType,) + self.Args
         stmt = SQLFragment("call ", args)
@@ -1724,14 +1724,14 @@
         if isinstance(retclause, (tuple, list)):
             retclause = _CommaList(retclause)
 
-        if queryGenerator.dialect == SQLITE_DIALECT:
+        if queryGenerator.dbtype.dialect == SQLITE_DIALECT:
             # sqlite does this another way.
             return stmt
 
         if retclause is not None:
             stmt.text += " returning "
             stmt.append(retclause.subSQL(queryGenerator, allTables))
-            if queryGenerator.dialect == ORACLE_DIALECT:
+            if queryGenerator.dbtype.dialect == ORACLE_DIALECT:
                 stmt.text += " into "
                 params = []
                 retvals = self._returnAsList()
@@ -1757,7 +1757,7 @@
             return []
         result = []
         rvars = self._returnAsList()
-        if queryGenerator.dialect == ORACLE_DIALECT:
+        if queryGenerator.dbtype.dialect == ORACLE_DIALECT:
             for n, v in enumerate(rvars):
                 result.append(("oracle_out_" + str(n), _OracleOutParam(v)))
         return result
@@ -1765,7 +1765,7 @@
 
     def _extraResult(self, result, outvars, queryGenerator):
         if (
-            queryGenerator.dialect == ORACLE_DIALECT and
+            queryGenerator.dbtype.dialect == ORACLE_DIALECT and
             self.Return is not None
         ):
             def processIt(emptyListResult):
@@ -1841,7 +1841,7 @@
         tableModel = columnsAndValues[0][0].model.table
         specifiedColumnModels = [x.model for x in self.columnMap.keys()]
 
-        if queryGenerator.dialect == ORACLE_DIALECT:
+        if queryGenerator.dbtype.dialect == ORACLE_DIALECT:
             # See test_nextSequenceDefaultImplicitExplicitOracle.
             for column in tableModel.columns:
                 if isinstance(column.default, Sequence):
@@ -1881,7 +1881,7 @@
         behavior.
         """
         result = yield super(_DMLStatement, self).on(txn, *a, **kw)
-        if self.Return is not None and txn.dialect == SQLITE_DIALECT:
+        if self.Return is not None and txn.dbtype.dialect == SQLITE_DIALECT:
             table = self._returnAsList()[0].model.table
             result = yield Select(
                 self._returnAsList(),
@@ -1936,7 +1936,7 @@
         databases that don't provide return values as part of their C{UPDATE}
         behavior.
         """
-        doExtra = self.Return is not None and txn.dialect == SQLITE_DIALECT
+        doExtra = self.Return is not None and txn.dbtype.dialect == SQLITE_DIALECT
         upcall = lambda: super(_DMLStatement, self).on(txn, *a, **kw)
 
         if doExtra:
@@ -2024,7 +2024,7 @@
     @inlineCallbacks
     def on(self, txn, *a, **kw):
         upcall = lambda: super(Delete, self).on(txn, *a, **kw)
-        if txn.dialect == SQLITE_DIALECT and self.Return is not None:
+        if txn.dbtype.dialect == SQLITE_DIALECT and self.Return is not None:
             result = yield Select(
                 self._returnAsList(),
                 From=self.From, Where=self.Where
@@ -2064,7 +2064,7 @@
 
 
     def _toSQL(self, queryGenerator):
-        if queryGenerator.dialect == SQLITE_DIALECT:
+        if queryGenerator.dbtype.dialect == SQLITE_DIALECT:
             # FIXME - this is only stubbed out for testing right now, actual
             # concurrency would require some kind of locking statement here.
             # BEGIN IMMEDIATE maybe, if that's okay in the middle of a
@@ -2085,7 +2085,7 @@
     """
 
     def _toSQL(self, queryGenerator):
-        assert(queryGenerator.dialect == POSTGRES_DIALECT)
+        assert(queryGenerator.dbtype.dialect == POSTGRES_DIALECT)
         return SQLFragment("select pg_advisory_lock(1)")
 
 
@@ -2093,7 +2093,7 @@
         """
         Override on() to only execute on Postgres
         """
-        if txn.dialect == POSTGRES_DIALECT:
+        if txn.dbtype.dialect == POSTGRES_DIALECT:
             return super(DatabaseLock, self).on(txn, *a, **kw)
 
         return succeed(None)
@@ -2106,7 +2106,7 @@
     """
 
     def _toSQL(self, queryGenerator):
-        assert(queryGenerator.dialect == POSTGRES_DIALECT)
+        assert(queryGenerator.dbtype.dialect == POSTGRES_DIALECT)
         return SQLFragment("select pg_advisory_unlock(1)")
 
 
@@ -2114,7 +2114,7 @@
         """
         Override on() to only execute on Postgres
         """
-        if txn.dialect == POSTGRES_DIALECT:
+        if txn.dbtype.dialect == POSTGRES_DIALECT:
             return super(DatabaseUnlock, self).on(txn, *a, **kw)
 
         return succeed(None)
@@ -2170,7 +2170,7 @@
 
 
     def _safeName(self, txn):
-        if txn.dialect == ORACLE_DIALECT:
+        if txn.dbtype.dialect == ORACLE_DIALECT:
             # Oracle limits the length of identifiers
             return self._name[:30]
         else:
@@ -2186,7 +2186,7 @@
 
 
     def release(self, txn):
-        if txn.dialect == ORACLE_DIALECT:
+        if txn.dbtype.dialect == ORACLE_DIALECT:
             # There is no "release savepoint" statement in oracle, but then, we
             # don't need it because there's no resource to manage.  Just don't
             # do anything.

Modified: twext/trunk/twext/enterprise/dal/test/test_sqlsyntax.py
===================================================================
--- twext/trunk/twext/enterprise/dal/test/test_sqlsyntax.py	2016-07-01 16:33:47 UTC (rev 15733)
+++ twext/trunk/twext/enterprise/dal/test/test_sqlsyntax.py	2016-07-05 19:32:22 UTC (rev 15734)
@@ -21,6 +21,7 @@
 from twisted.internet.defer import succeed
 from twisted.trial.unittest import TestCase, SkipTest
 
+from twext.enterprise.adbapi2 import DEFAULT_PARAM_STYLE
 from twext.enterprise.dal import syntax
 try:
     from twext.enterprise.dal.parseschema import addSQLToSchema
@@ -40,7 +41,7 @@
 from twext.enterprise.dal.syntax import SchemaSyntax
 from twext.enterprise.dal.test.test_parseschema import SchemaTestHelper
 from twext.enterprise.ienterprise import (
-    POSTGRES_DIALECT, ORACLE_DIALECT, SQLITE_DIALECT
+    POSTGRES_DIALECT, ORACLE_DIALECT, SQLITE_DIALECT, DatabaseType
 )
 from twext.enterprise.test.test_adbapi2 import ConnectionPoolHelper
 from twext.enterprise.test.test_adbapi2 import NetworkedPoolHelper
@@ -56,8 +57,8 @@
     generation.
     """
 
-    def __init__(self, paramstyle):
-        self.paramstyle = "qmark"
+    def __init__(self):
+        self.dbtype = DatabaseType(POSTGRES_DIALECT, "qmark")
 
 
 
@@ -75,11 +76,10 @@
     """
     counter = 0
 
-    def __init__(self, dialect=SQLITE_DIALECT, paramstyle="numeric"):
+    def __init__(self, dbtype=DatabaseType(SQLITE_DIALECT, "numeric")):
         self.execed = []
         self.pendingResults = []
-        self.dialect = SQLITE_DIALECT
-        self.paramstyle = "numeric"
+        self.dbtype = dbtype
 
 
     def nextResult(self, result):
@@ -110,8 +110,7 @@
     Fake transaction for testing oracle NULL behavior.
     """
 
-    dialect = ORACLE_DIALECT
-    paramstyle = "numeric"
+    dbtype = DatabaseType(ORACLE_DIALECT, "numeric")
 
     def execSQL(self, text, params, exc):
         return succeed([[None, None]])
@@ -192,7 +191,9 @@
             Select(
                 From=self.schema.FOO,
                 Where=self.schema.FOO.BAR == 1
-            ).toSQL(QueryGenerator(POSTGRES_DIALECT, FixedPlaceholder("$$"))),
+            ).toSQL(QueryGenerator(
+                DatabaseType(POSTGRES_DIALECT, "pyformat"), FixedPlaceholder("$$")
+            )),
             SQLFragment("select * from FOO where BAR = $$", [1])
         )
 
@@ -270,14 +271,18 @@
             Select(
                 From=self.schema.FOO,
                 Where=self.schema.FOO.BAR == ""
-            ).toSQL(QueryGenerator(ORACLE_DIALECT, NumericPlaceholder())),
+            ).toSQL(QueryGenerator(
+                DatabaseType(ORACLE_DIALECT, "numeric"), NumericPlaceholder()
+            )),
             SQLFragment("select * from FOO where BAR is null", [])
         )
         self.assertEquals(
             Select(
                 From=self.schema.FOO,
                 Where=self.schema.FOO.BAR != ""
-            ).toSQL(QueryGenerator(ORACLE_DIALECT, NumericPlaceholder())),
+            ).toSQL(QueryGenerator(
+                DatabaseType(ORACLE_DIALECT, "numeric"), NumericPlaceholder()
+            )),
             SQLFragment("select * from FOO where BAR is not null", [])
         )
 
@@ -698,7 +703,9 @@
                         Where=(self.schema.FOO.BAR == 2),
                     ),
                 ),
-            ).toSQL(QueryGenerator(POSTGRES_DIALECT, FixedPlaceholder("?"))),
+            ).toSQL(QueryGenerator(
+                DatabaseType(POSTGRES_DIALECT, "pyformat"), FixedPlaceholder("?")
+            )),
             SQLFragment(
                 "(select * from FOO where BAR = ?) "
                 "UNION (select * from FOO where BAR = ?)", [1, 2]
@@ -717,7 +724,9 @@
                     ),
                     optype=SetExpression.OPTYPE_ALL
                 ),
-            ).toSQL(QueryGenerator(POSTGRES_DIALECT, FixedPlaceholder("?"))),
+            ).toSQL(QueryGenerator(
+                DatabaseType(POSTGRES_DIALECT, "pyformat"), FixedPlaceholder("?")
+            )),
             SQLFragment(
                 "(select * from FOO where BAR = ?) "
                 "INTERSECT ALL (select * from FOO where BAR = ?)", [1, 2]
@@ -741,7 +750,9 @@
                     ),
                     optype=SetExpression.OPTYPE_DISTINCT,
                 ),
-            ).toSQL(QueryGenerator(POSTGRES_DIALECT, FixedPlaceholder("?"))),
+            ).toSQL(QueryGenerator(
+                DatabaseType(POSTGRES_DIALECT, "pyformat"), FixedPlaceholder("?")
+            )),
             SQLFragment(
                 "(select * from FOO) "
                 "EXCEPT DISTINCT (select * from FOO where BAR = ?) "
@@ -765,7 +776,9 @@
                         ),
                     ),
                 ),
-            ).toSQL(QueryGenerator(ORACLE_DIALECT, FixedPlaceholder("?"))),
+            ).toSQL(QueryGenerator(
+                DatabaseType(ORACLE_DIALECT, "pyformat"), FixedPlaceholder("?")
+            )),
             SQLFragment(
                 "(select * from FOO) MINUS ((select * from FOO where BAR = ?) "
                 "MINUS (select * from FOO where BAR = ?))", [2, 3]
@@ -784,7 +797,9 @@
                     ),
                 ),
                 OrderBy=self.schema.FOO.BAR,
-            ).toSQL(QueryGenerator(POSTGRES_DIALECT, FixedPlaceholder("?"))),
+            ).toSQL(QueryGenerator(
+                DatabaseType(POSTGRES_DIALECT, "pyformat"), FixedPlaceholder("?")
+            )),
             SQLFragment(
                 "(select * from FOO where BAR = ?) "
                 "UNION (select * from FOO where BAR = ?) order by BAR", [1, 2]
@@ -1417,7 +1432,9 @@
             Insert(
                 {self.schema.FOO.BAR: 40, self.schema.FOO.BAZ: 50},
                 Return=(self.schema.FOO.BAR, self.schema.FOO.BAZ)
-            ).toSQL(QueryGenerator(ORACLE_DIALECT, NumericPlaceholder())),
+            ).toSQL(QueryGenerator(
+                DatabaseType(ORACLE_DIALECT, "numeric"), NumericPlaceholder()
+            )),
             SQLFragment(
                 "insert into FOO (BAR, BAZ) values (:1, :2) "
                 "returning BAR, BAZ into :3, :4",
@@ -1439,7 +1456,9 @@
             {self.schema.FOO.BAR: 39, self.schema.FOO.BAZ: 82},
             Return=(self.schema.FOO.BAR, self.schema.FOO.BAZ)
         )
-        qg = lambda: QueryGenerator(SQLITE_DIALECT, NumericPlaceholder())
+        qg = lambda: QueryGenerator(
+            DatabaseType(SQLITE_DIALECT, "numeric"), NumericPlaceholder()
+        )
         self.assertEquals(
             insertStatement.toSQL(qg()),
             SQLFragment("insert into FOO (BAR, BAZ) values (:1, :2)", [39, 82])
@@ -1609,7 +1628,9 @@
                     self.schema.LEVELS.ACCESS: 1,
                     self.schema.LEVELS.USERNAME: "hi"
                 }
-            ).toSQL(QueryGenerator(ORACLE_DIALECT, FixedPlaceholder("?"))),
+            ).toSQL(QueryGenerator(
+                DatabaseType(ORACLE_DIALECT, "pyformat"), FixedPlaceholder("?")
+            )),
             SQLFragment(
                 """insert into LEVELS ("ACCESS", USERNAME) values (?, ?)""",
                 [1, "hi"]
@@ -1621,7 +1642,9 @@
                     self.schema.LEVELS.ACCESS: 1,
                     self.schema.LEVELS.USERNAME: "hi"
                 }
-            ).toSQL(QueryGenerator(POSTGRES_DIALECT, FixedPlaceholder("?"))),
+            ).toSQL(QueryGenerator(
+                DatabaseType(POSTGRES_DIALECT, "pyformat"), FixedPlaceholder("?")
+            )),
             SQLFragment(
                 "insert into LEVELS (ACCESS, USERNAME) values (?, ?)",
                 [1, "hi"]
@@ -1832,7 +1855,9 @@
                 [self.schema.FOO.BAR],
                 From=self.schema.FOO,
                 Limit=123
-            ).toSQL(QueryGenerator(ORACLE_DIALECT, FixedPlaceholder("?"))),
+            ).toSQL(QueryGenerator(
+                DatabaseType(ORACLE_DIALECT, "pyformat"), FixedPlaceholder("?")
+            )),
             SQLFragment(
                 "select * from (select BAR from FOO) "
                 "where ROWNUM <= ?", [123]
@@ -1891,7 +1916,9 @@
         self.assertEquals(
             Insert(
                 {self.schema.BOZ.QUX: self.schema.A_SEQ}
-            ).toSQL(QueryGenerator(ORACLE_DIALECT, FixedPlaceholder("?"))),
+            ).toSQL(QueryGenerator(
+                DatabaseType(ORACLE_DIALECT, "pyformat"), FixedPlaceholder("?")
+            )),
             SQLFragment("insert into BOZ (QUX) values (A_SEQ.nextval)", [])
         )
 
@@ -1911,7 +1938,9 @@
         )
         self.assertEquals(
             Insert({self.schema.DFLTR.a: "hello"}).toSQL(
-                QueryGenerator(ORACLE_DIALECT, FixedPlaceholder("?"))
+                QueryGenerator(
+                    DatabaseType(ORACLE_DIALECT, "pyformat"), FixedPlaceholder("?")
+                )
             ),
             SQLFragment("insert into DFLTR (a, b) values "
                         "(?, A_SEQ.nextval)", ["hello"]),
@@ -1924,7 +1953,9 @@
                     self.schema.DFLTR.b: self.schema.A_SEQ
                 }
             ).toSQL(
-                QueryGenerator(ORACLE_DIALECT, FixedPlaceholder("?"))
+                QueryGenerator(
+                    DatabaseType(ORACLE_DIALECT, "pyformat"), FixedPlaceholder("?")
+                )
             ),
             SQLFragment(
                 "insert into DFLTR (a, b) values (?, A_SEQ.nextval)", ["hello"]
@@ -1943,8 +1974,7 @@
         class FakeOracleTxn(object):
             def execSQL(self, text, params, exc):
                 stmts.append((text, params))
-            dialect = ORACLE_DIALECT
-            paramstyle = "numeric"
+            dbtype = DatabaseType(ORACLE_DIALECT, "numeric")
 
         Select(
             [self.schema.FOO.BAR],
@@ -2171,7 +2201,9 @@
         vvl = self.schema.veryveryveryveryveryveryveryverylong
         self.assertEquals(
             Insert({vvl.foo: 1}).toSQL(
-                QueryGenerator(ORACLE_DIALECT, FixedPlaceholder("?"))
+                QueryGenerator(
+                    DatabaseType(ORACLE_DIALECT, "pyformat"), FixedPlaceholder("?")
+                )
             ),
             SQLFragment(
                 "insert into veryveryveryveryveryveryveryve (foo) values (?)",
@@ -2245,7 +2277,7 @@
         self.assertEquals(
             Call(
                 "procedure"
-            ).toSQL(QueryGenerator(ORACLE_DIALECT)),
+            ).toSQL(QueryGenerator(DatabaseType(ORACLE_DIALECT, "qmark"))),
             SQLFragment("call procedure()", (None,))
         )
 
@@ -2253,7 +2285,7 @@
             Call(
                 "procedure",
                 1, "2"
-            ).toSQL(QueryGenerator(ORACLE_DIALECT)),
+            ).toSQL(QueryGenerator(DatabaseType(ORACLE_DIALECT, "qmark"))),
             SQLFragment("call procedure()", (None, 1, "2"))
         )
 
@@ -2261,7 +2293,7 @@
             Call(
                 "function",
                 returnType=int
-            ).toSQL(QueryGenerator(ORACLE_DIALECT)),
+            ).toSQL(QueryGenerator(DatabaseType(ORACLE_DIALECT, "qmark"))),
             SQLFragment("call function()", (int,))
         )
 
@@ -2270,14 +2302,14 @@
                 "function",
                 1, "2",
                 returnType=int
-            ).toSQL(QueryGenerator(ORACLE_DIALECT)),
+            ).toSQL(QueryGenerator(DatabaseType(ORACLE_DIALECT, "qmark"))),
             SQLFragment("call function()", (int, 1, "2"))
         )
 
         self.assertRaises(
             NotImplementedError,
             Call("procedure").toSQL,
-            QueryGenerator(POSTGRES_DIALECT)
+            QueryGenerator(DatabaseType(POSTGRES_DIALECT, "qmark"))
         )
 
 
@@ -2292,7 +2324,9 @@
         self.assertEquals(
             Insert(
                 {schema.FOO.BAR: 1, schema.FOO.UID: "test"},
-            ).toSQL(QueryGenerator(ORACLE_DIALECT, FixedPlaceholder("?"))),
+            ).toSQL(QueryGenerator(
+                DatabaseType(ORACLE_DIALECT, "pyformat"), FixedPlaceholder("?")
+            )),
             SQLFragment(
                 "insert into FOO (BAR, \"UID\") values (?, ?)", [1, "test"]
             )
@@ -2301,7 +2335,9 @@
             Update(
                 {schema.FOO.BAR: 1, schema.FOO.UID: "test"},
                 Where=(schema.FOO.BAR == 2),
-            ).toSQL(QueryGenerator(ORACLE_DIALECT, FixedPlaceholder("?"))),
+            ).toSQL(QueryGenerator(
+                DatabaseType(ORACLE_DIALECT, "pyformat"), FixedPlaceholder("?")
+            )),
             SQLFragment(
                 "update FOO set BAR = ?, \"UID\" = ? where BAR = ?", [1, "test", 2]
             )
@@ -2311,7 +2347,9 @@
                 [schema.FOO.BAR, schema.FOO.UID],
                 From=schema.FOO,
                 Where=(schema.FOO.UID == "test"),
-            ).toSQL(QueryGenerator(ORACLE_DIALECT, FixedPlaceholder("?"))),
+            ).toSQL(QueryGenerator(
+                DatabaseType(ORACLE_DIALECT, "pyformat"), FixedPlaceholder("?")
+            )),
             SQLFragment(
                 "select BAR, \"UID\" from FOO where \"UID\" = ?", ["test"]
             )
@@ -2388,7 +2426,7 @@
     Tests which use an oracle connection.
     """
 
-    dialect = ORACLE_DIALECT
+    dbtype = DatabaseType(ORACLE_DIALECT, DEFAULT_PARAM_STYLE)
 
     def setUp(self):
         """
@@ -2406,10 +2444,10 @@
     TestCase
 ):
 
-    dialect = ORACLE_DIALECT
+    dbtype = DatabaseType(ORACLE_DIALECT, DEFAULT_PARAM_STYLE)
 
     def setUp(self):
         self.patch(syntax, "cx_Oracle", FakeCXOracleModule)
         super(OracleNetConnectionTests, self).setUp()
         ExampleSchemaHelper.setUp(self)
-        self.pump.client.dialect = ORACLE_DIALECT
+        self.pump.client.dbtypedialect = ORACLE_DIALECT

Modified: twext/trunk/twext/enterprise/fixtures.py
===================================================================
--- twext/trunk/twext/enterprise/fixtures.py	2016-07-01 16:33:47 UTC (rev 15733)
+++ twext/trunk/twext/enterprise/fixtures.py	2016-07-05 19:32:22 UTC (rev 15734)
@@ -32,6 +32,7 @@
 from twisted.python.threadpool import ThreadPool
 
 from twext.enterprise.adbapi2 import ConnectionPool
+from twext.enterprise.ienterprise import DatabaseType
 from twext.enterprise.ienterprise import SQLITE_DIALECT
 from twext.enterprise.ienterprise import POSTGRES_DIALECT
 from twext.enterprise.adbapi2 import DEFAULT_PARAM_STYLE
@@ -39,7 +40,7 @@
 
 
 
-def buildConnectionPool(testCase, schemaText="", dialect=SQLITE_DIALECT):
+def buildConnectionPool(testCase, schemaText="", dbtype=DatabaseType(SQLITE_DIALECT, "numeric")):
     """
     Build a L{ConnectionPool} for testing purposes, with the given C{testCase}.
 
@@ -71,8 +72,7 @@
     con = connectionFactory()
     con.executescript(schemaText)
     con.commit()
-    pool = ConnectionPool(connectionFactory, paramstyle="numeric",
-                          dialect=SQLITE_DIALECT)
+    pool = ConnectionPool(connectionFactory, dbtype=dbtype)
     pool.startService()
     testCase.addCleanup(pool.stopService)
     return pool
@@ -227,8 +227,7 @@
     L{ConnectionPool}.
     """
 
-    dialect = POSTGRES_DIALECT
-    paramstyle = DEFAULT_PARAM_STYLE
+    dbtype = DatabaseType(POSTGRES_DIALECT, DEFAULT_PARAM_STYLE)
 
     def setUp(self, test=None, connect=None):
         """
@@ -245,8 +244,7 @@
         self.pool = ConnectionPool(
             connect,
             maxConnections=2,
-            dialect=self.dialect,
-            paramstyle=self.paramstyle
+            dbtype=self.dbtype,
         )
         self.pool._createHolder = self.makeAHolder
         self.clock = self.pool.reactor = ClockWithThreads()
@@ -301,8 +299,7 @@
     capable of firing all its L{Deferred}s on demand, synchronously, by using
     SQLite.
     """
-    dialect = SQLITE_DIALECT
-    paramstyle = sqlite3.paramstyle
+    dbtype = DatabaseType(SQLITE_DIALECT, sqlite3.paramstyle)
 
     def __init__(self, schema):
         self.schema = schema

Modified: twext/trunk/twext/enterprise/ienterprise.py
===================================================================
--- twext/trunk/twext/enterprise/ienterprise.py	2016-07-01 16:33:47 UTC (rev 15733)
+++ twext/trunk/twext/enterprise/ienterprise.py	2016-07-05 19:32:22 UTC (rev 15734)
@@ -56,26 +56,61 @@
 ORACLE_TABLE_NAME_MAX = 30
 
 
+"""
+Holds details about the database in use. The C{dialect} attribute is
+one of the C{*_DIALECT} constants in this module. The C{paramstyle}
+attribute is a copy of the DB-API 2.0 module attribute. The C{options}
+attribute is a set of optional features for the DB in use.
+"""
+class DatabaseType(object):
+    def __init__(self, dialect, paramstyle, options=()):
+        """
+        @param dialect: database dialect to use
+        @type dialect: L{str}
+        @param paramstyle: parameter style for SQL statements
+        @type paramstyle: L{str}
+        @param options: set of optional features
+        @type options: L{iterable}
+        """
+        self.dialect = dialect
+        self.paramstyle = paramstyle
+        self.options = frozenset(options)
 
+
+    def copyreplace(self, dialect=None, paramstyle=None, options=None):
+        """
+        Create a copy of this L{DatabaseType} and modify the supplied properties in
+        the new copy.
+
+        @param dialect: new value for C{dialect} or None for no change
+        @type dialect: L{str}
+        @param paramstyle: new value for C{paramstyle} or None for no change
+        @type paramstyle: L{str}
+        @param options: new value for C{options} or None for no change
+        @type options: L{iterable}
+        """
+
+        return DatabaseType(
+            self.dialect if dialect is None else dialect,
+            self.paramstyle if paramstyle is None else paramstyle,
+            self.options if options is None else options,
+        )
+
+
+
 class ISQLExecutor(Interface):
     """
     Base SQL-execution interface, for a group of commands or a transaction.
     """
 
-    paramstyle = Attribute(
+    dbtype = Attribute(
         """
-        A copy of the C{paramstyle} attribute from a DB-API 2.0 module.
+        A copy of the C{dbtype} attribute from the connection pool. It is
+        a L{DatabaseType}.
         """
     )
 
-    dialect = Attribute(
-        """
-        A copy of the C{dialect} attribute from the connection pool.  One of
-        the C{*_DIALECT} constants in this module, such as L{POSTGRES_DIALECT}.
-        """
-    )
 
-
     def execSQL(sql, args=(), raiseOnZeroRowCount=None):
         """
         Execute some SQL.

Modified: twext/trunk/twext/enterprise/jobs/jobitem.py
===================================================================
--- twext/trunk/twext/enterprise/jobs/jobitem.py	2016-07-01 16:33:47 UTC (rev 15733)
+++ twext/trunk/twext/enterprise/jobs/jobitem.py	2016-07-05 19:32:22 UTC (rev 15734)
@@ -376,7 +376,7 @@
         @rtype: L{JobItem}
         """
 
-        if txn.dialect == ORACLE_DIALECT:
+        if txn.dbtype.dialect == ORACLE_DIALECT:
 
             # For Oracle we need a multi-app server solution that only locks the
             # (one) row being returned by the query, and allows other app servers
@@ -409,6 +409,9 @@
             elif minPriority == JOB_PRIORITY_HIGH:
                 queryExpr = (cls.priority == JOB_PRIORITY_HIGH).And(queryExpr)
 
+            extra_kwargs = {}
+            if "skip-locked" in txn.dbtype.options:
+                extra_kwargs["skipLocked"] = True
             jobs = yield cls.query(
                 txn,
                 queryExpr,
@@ -417,6 +420,7 @@
                 forUpdate=True,
                 noWait=False,
                 limit=1,
+                **extra_kwargs
             )
             job = jobs[0] if jobs else None
 
@@ -438,19 +442,23 @@
         @rtype: L{JobItem}
         """
 
-        if txn.dialect == ORACLE_DIALECT:
+        if txn.dbtype.dialect == ORACLE_DIALECT:
             # See L{nextjob} for why Oracle is different
             job = None
             jobID = yield Call("overdue_job", now, returnType=int).on(txn)
             if jobID:
                 job = yield cls.load(txn, jobID)
         else:
+            extra_kwargs = {}
+            if "skip-locked" in txn.dbtype.options:
+                extra_kwargs["skipLocked"] = True
             jobs = yield cls.query(
                 txn,
                 (cls.assigned != None).And(cls.overdue < now),
                 forUpdate=True,
                 noWait=False,
                 limit=1,
+                **extra_kwargs
             )
             job = jobs[0] if jobs else None
 

Modified: twext/trunk/twext/enterprise/test/test_adbapi2.py
===================================================================
--- twext/trunk/twext/enterprise/test/test_adbapi2.py	2016-07-01 16:33:47 UTC (rev 15733)
+++ twext/trunk/twext/enterprise/test/test_adbapi2.py	2016-07-05 19:32:22 UTC (rev 15734)
@@ -676,7 +676,7 @@
         """
         Change the paramstyle of the transaction under test.
         """
-        self.pool.paramstyle = paramstyle
+        self.pool.dbtype = self.pool.dbtype.copyreplace(paramstyle=paramstyle)
 
 
     def test_propagateParamstyle(self):
@@ -687,24 +687,24 @@
         TEST_PARAMSTYLE = "justtesting"
         self.setParamstyle(TEST_PARAMSTYLE)
         normaltxn = self.createTransaction()
-        self.assertEquals(normaltxn.paramstyle, TEST_PARAMSTYLE)
-        self.assertEquals(normaltxn.commandBlock().paramstyle, TEST_PARAMSTYLE)
+        self.assertEquals(normaltxn.dbtype.paramstyle, TEST_PARAMSTYLE)
+        self.assertEquals(normaltxn.commandBlock().dbtype.paramstyle, TEST_PARAMSTYLE)
         self.pauseHolders()
         extra = []
         extra.append(self.createTransaction())
         waitingtxn = self.createTransaction()
-        self.assertEquals(waitingtxn.paramstyle, TEST_PARAMSTYLE)
+        self.assertEquals(waitingtxn.dbtype.paramstyle, TEST_PARAMSTYLE)
         self.flushHolders()
         self.pool.stopService()
         notxn = self.createTransaction()
-        self.assertEquals(notxn.paramstyle, TEST_PARAMSTYLE)
+        self.assertEquals(notxn.dbtype.paramstyle, TEST_PARAMSTYLE)
 
 
     def setDialect(self, dialect):
         """
         Change the dialect of the transaction under test.
         """
-        self.pool.dialect = dialect
+        self.pool.dbtype = self.pool.dbtype.copyreplace(dialect=dialect)
 
 
     def test_propagateDialect(self):
@@ -715,17 +715,17 @@
         TEST_DIALECT = "otherdialect"
         self.setDialect(TEST_DIALECT)
         normaltxn = self.createTransaction()
-        self.assertEquals(normaltxn.dialect, TEST_DIALECT)
-        self.assertEquals(normaltxn.commandBlock().dialect, TEST_DIALECT)
+        self.assertEquals(normaltxn.dbtype.dialect, TEST_DIALECT)
+        self.assertEquals(normaltxn.commandBlock().dbtype.dialect, TEST_DIALECT)
         self.pauseHolders()
         extra = []
         extra.append(self.createTransaction())
         waitingtxn = self.createTransaction()
-        self.assertEquals(waitingtxn.dialect, TEST_DIALECT)
+        self.assertEquals(waitingtxn.dbtype.dialect, TEST_DIALECT)
         self.flushHolders()
         self.pool.stopService()
         notxn = self.createTransaction()
-        self.assertEquals(notxn.dialect, TEST_DIALECT)
+        self.assertEquals(notxn.dbtype.dialect, TEST_DIALECT)
 
 
     def test_reConnectWhenFirstExecFails(self):
@@ -1330,8 +1330,7 @@
         super(NetworkedPoolHelper, self).setUp()
         self.pump = IOPump(
             ConnectionPoolClient(
-                dialect=self.dialect,
-                paramstyle=self.paramstyle
+                dbtype=self.dbtype,
             ),
             ConnectionPoolConnection(self.pool)
         )
@@ -1384,7 +1383,7 @@
         Change the paramstyle on both the pool and the client.
         """
         super(NetworkedConnectionPoolTests, self).setParamstyle(paramstyle)
-        self.pump.client.paramstyle = paramstyle
+        self.pump.client.dbtype = self.pump.client.dbtype.copyreplace(paramstyle=paramstyle)
 
 
     def setDialect(self, dialect):
@@ -1392,7 +1391,7 @@
         Change the dialect on both the pool and the client.
         """
         super(NetworkedConnectionPoolTests, self).setDialect(dialect)
-        self.pump.client.dialect = dialect
+        self.pump.client.dbtype = self.pump.client.dbtype.copyreplace(dialect=dialect)
 
 
     def test_newTransaction(self):
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20160705/41c5ebf9/attachment-0001.html>


More information about the calendarserver-changes mailing list