[CalendarServer-changes] [15734] twext/trunk/twext/enterprise
source_changes at macosforge.org
source_changes at macosforge.org
Tue Jul 5 12:32:22 PDT 2016
Revision: 15734
http://trac.calendarserver.org//changeset/15734
Author: cdaboo at apple.com
Date: 2016-07-05 12:32:22 -0700 (Tue, 05 Jul 2016)
Log Message:
-----------
Wrap database parameters into a single class that also supports optional features. Allow SKIP LOCKED optional feature with postgres.
Modified Paths:
--------------
twext/trunk/twext/enterprise/adbapi2.py
twext/trunk/twext/enterprise/dal/record.py
twext/trunk/twext/enterprise/dal/syntax.py
twext/trunk/twext/enterprise/dal/test/test_sqlsyntax.py
twext/trunk/twext/enterprise/fixtures.py
twext/trunk/twext/enterprise/ienterprise.py
twext/trunk/twext/enterprise/jobs/jobitem.py
twext/trunk/twext/enterprise/test/test_adbapi2.py
Modified: twext/trunk/twext/enterprise/adbapi2.py
===================================================================
--- twext/trunk/twext/enterprise/adbapi2.py 2016-07-01 16:33:47 UTC (rev 15733)
+++ twext/trunk/twext/enterprise/adbapi2.py 2016-07-05 19:32:22 UTC (rev 15734)
@@ -60,7 +60,8 @@
from twisted.internet.defer import fail
from twext.enterprise.ienterprise import (
- AlreadyFinishedError, IAsyncTransaction, POSTGRES_DIALECT, ICommandBlock
+ AlreadyFinishedError, IAsyncTransaction, ICommandBlock,
+ DatabaseType, POSTGRES_DIALECT,
)
from twext.python.log import Logger
@@ -74,8 +75,8 @@
DEFAULT_PARAM_STYLE = "pyformat"
DEFAULT_DIALECT = POSTGRES_DIALECT
+DEFAULT_DBTYPE = DatabaseType(DEFAULT_DIALECT, DEFAULT_PARAM_STYLE)
-
def _forward(thunk):
"""
Forward an attribute to the connection pool.
@@ -172,17 +173,11 @@
@_forward
- def paramstyle(self):
+ def dbtype(self):
"""
- The paramstyle attribute is mirrored from the connection pool.
+ The dbtype attribute is mirrored from the connection pool.
"""
- @_forward
- def dialect(self):
- """
- The dialect attribute is mirrored from the connection pool.
- """
-
def _reallyExecSQL(self, sql, args=None, raiseOnZeroRowCount=None):
"""
Execute the given SQL on a thread, using a DB-API 2.0 cursor.
@@ -528,8 +523,7 @@
implements(IAsyncTransaction)
def __init__(self, pool, reason, label=None):
- self.paramstyle = pool.paramstyle
- self.dialect = pool.dialect
+ self.dbtype = pool.dbtype
self.reason = reason
self._label = label
@@ -563,12 +557,11 @@
def __init__(self, pool, label=None):
"""
Initialize a L{_WaitingTxn} based on a L{ConnectionPool}. (The C{pool}
- is used only to reflect C{dialect} and C{paramstyle} attributes; not
+ is used only to reflect C{dbtype} attribute; not
remembered or modified in any way.)
"""
self._spool = []
- self.paramstyle = pool.paramstyle
- self.dialect = pool.dialect
+ self.dbtype = pool.dbtype
self._label = label
@@ -931,8 +924,7 @@
def __init__(self, singleTxn):
self._singleTxn = singleTxn
- self.paramstyle = singleTxn.paramstyle
- self.dialect = singleTxn.dialect
+ self.dbtype = singleTxn.dbtype
self._spool = _WaitingTxn(singleTxn._pool, label=singleTxn._label)
self._started = False
self._ended = False
@@ -1129,15 +1121,14 @@
def __init__(
self,
connectionFactory, maxConnections=10,
- paramstyle=DEFAULT_PARAM_STYLE, dialect=DEFAULT_DIALECT,
+ dbtype=None,
name=None,
):
super(ConnectionPool, self).__init__()
self.connectionFactory = connectionFactory
self.maxConnections = maxConnections
- self.paramstyle = paramstyle
- self.dialect = dialect
+ self.dbtype = dbtype if dbtype is not None else DEFAULT_DBTYPE.copyreplace()
if name is not None:
self.name = name
@@ -1674,15 +1665,14 @@
"""
def __init__(
- self, dialect=POSTGRES_DIALECT, paramstyle=DEFAULT_PARAM_STYLE
+ self, dbtype=DEFAULT_DBTYPE,
):
# See DEFAULT_PARAM_STYLE FIXME above.
super(ConnectionPoolClient, self).__init__()
self._nextID = count().next
self._txns = weakref.WeakValueDictionary()
self._queries = {}
- self.dialect = dialect
- self.paramstyle = paramstyle
+ self.dbtype = dbtype if dbtype is not None else DEFAULT_DBTYPE.copyreplace()
def unhandledError(self, failure):
@@ -1813,21 +1803,13 @@
@property
- def paramstyle(self):
+ def dbtype(self):
"""
- Forward C{paramstyle} attribute to the client.
+ Forward C{dbtype} attribute to the client.
"""
- return self._client.paramstyle
+ return self._client.dbtype
- @property
- def dialect(self):
- """
- Forward C{dialect} attribute to the client.
- """
- return self._client.dialect
-
-
def execSQL(self, sql, args=None, raiseOnZeroRowCount=None, blockID=""):
if not blockID:
if self._completed:
@@ -1912,21 +1894,13 @@
@property
- def paramstyle(self):
+ def dbtype(self):
"""
- Forward C{paramstyle} attribute to the transaction.
+ Forward C{dbtype} attribute to the transaction.
"""
- return self._transaction.paramstyle
+ return self._transaction.dbtype
- @property
- def dialect(self):
- """
- Forward C{dialect} attribute to the transaction.
- """
- return self._transaction.dialect
-
-
def execSQL(self, sql, args=None, raiseOnZeroRowCount=None):
"""
Execute some SQL on this command block.
Modified: twext/trunk/twext/enterprise/dal/record.py
===================================================================
--- twext/trunk/twext/enterprise/dal/record.py 2016-07-01 16:33:47 UTC (rev 15733)
+++ twext/trunk/twext/enterprise/dal/record.py 2016-07-05 19:32:22 UTC (rev 15734)
@@ -649,7 +649,7 @@
"""
Delete all rows matching the where expression from the table that corresponds to C{cls}.
"""
- if transaction.dialect == ORACLE_DIALECT and returnCols is not None:
+ if transaction.dbtype.dialect == ORACLE_DIALECT and returnCols is not None:
# Oracle cannot return multiple rows in the RETURNING clause so
# we have to split this into a SELECT followed by a DELETE
if not isinstance(returnCols, (tuple, list)):
Modified: twext/trunk/twext/enterprise/dal/syntax.py
===================================================================
--- twext/trunk/twext/enterprise/dal/syntax.py 2016-07-01 16:33:47 UTC (rev 15733)
+++ twext/trunk/twext/enterprise/dal/syntax.py 2016-07-05 19:32:22 UTC (rev 15734)
@@ -80,7 +80,7 @@
from twext.enterprise.dal.model import Schema, Table, Column, Sequence, SQLType
from twext.enterprise.ienterprise import (
- POSTGRES_DIALECT, ORACLE_DIALECT, SQLITE_DIALECT, IDerivedParameter
+ POSTGRES_DIALECT, ORACLE_DIALECT, SQLITE_DIALECT, DatabaseType, IDerivedParameter
)
from twext.enterprise.util import mapOracleOutputType
@@ -158,8 +158,8 @@
and automated id generator.
"""
- def __init__(self, dialect=None, placeholder=None):
- self.dialect = dialect if dialect else POSTGRES_DIALECT
+ def __init__(self, dbtype=None, placeholder=None):
+ self.dbtype = dbtype if dbtype else DatabaseType(POSTGRES_DIALECT, "qmark")
if placeholder is None:
placeholder = defaultPlaceholder()
self.placeholder = placeholder
@@ -172,7 +172,7 @@
def shouldQuote(self, name):
- return (self.dialect == ORACLE_DIALECT and name.lower() in _KEYWORDS)
+ return (self.dbtype.dialect == ORACLE_DIALECT and name.lower() in _KEYWORDS)
@@ -261,7 +261,7 @@
C{list}s)
"""
queryGenerator = QueryGenerator(
- txn.dialect, self._paramstyles[txn.paramstyle]()
+ txn.dbtype, self._paramstyles[txn.dbtype.paramstyle]()
)
outvars = self._extraVars(txn, queryGenerator)
kw.update(outvars)
@@ -270,7 +270,7 @@
fragment.text, fragment.parameters, raiseOnZeroRowCount
)
result = self._extraResult(result, outvars, queryGenerator)
- if queryGenerator.dialect == ORACLE_DIALECT and result:
+ if queryGenerator.dbtype.dialect == ORACLE_DIALECT and result:
result.addCallback(self._fixOracleNulls)
return result
@@ -590,7 +590,7 @@
def nameFor(self, queryGenerator):
if (
- queryGenerator.dialect == ORACLE_DIALECT and
+ queryGenerator.dbtype.dialect == ORACLE_DIALECT and
self.oracleName is not None
):
return self.oracleName
@@ -668,7 +668,7 @@
"""
Convert to an SQL fragment.
"""
- if queryGenerator.dialect == ORACLE_DIALECT:
+ if queryGenerator.dbtype.dialect == ORACLE_DIALECT:
fmt = "%s.nextval"
else:
fmt = "nextval('%s')"
@@ -723,7 +723,7 @@
# XXX maybe there should be a specific method which is only invoked
# from the FROM clause, that only tables and joins would implement?
return SQLFragment(
- _nameForDialect(self.model.name, queryGenerator.dialect)
+ _nameForDialect(self.model.name, queryGenerator.dbtype.dialect)
)
@@ -1110,7 +1110,7 @@
def subSQL(self, queryGenerator, allTables):
if (
- queryGenerator.dialect == ORACLE_DIALECT and
+ queryGenerator.dbtype.dialect == ORACLE_DIALECT and
isinstance(self.b, Constant) and
self.b.value == "" and self.op in ("=", "!=")
):
@@ -1382,9 +1382,9 @@
An EXCEPT construct used inside a SELECT.
"""
def setOpSQL(self, queryGenerator):
- if queryGenerator.dialect == POSTGRES_DIALECT:
+ if queryGenerator.dbtype.dialect == POSTGRES_DIALECT:
return SQLFragment(" EXCEPT ")
- elif queryGenerator.dialect == ORACLE_DIALECT:
+ elif queryGenerator.dbtype.dialect == ORACLE_DIALECT:
return SQLFragment(" MINUS ")
else:
raise NotImplementedError("Unsupported dialect")
@@ -1506,11 +1506,11 @@
if self.ForUpdate:
# FOR UPDATE not supported with sqlite - but that is probably not relevant
# given that sqlite does file level locking of the DB
- if queryGenerator.dialect != SQLITE_DIALECT:
+ if queryGenerator.dbtype.dialect != SQLITE_DIALECT:
# Oracle turns this statement into a sub-select if Limit is non-zero, but we can't have
# the "for update" in the sub-select. So suppress it here and add it in the outer limit
# select later.
- if self.Limit is None or queryGenerator.dialect != ORACLE_DIALECT:
+ if self.Limit is None or queryGenerator.dbtype.dialect != ORACLE_DIALECT:
stmt.text += " for update"
if self.NoWait:
stmt.text += " nowait"
@@ -1519,7 +1519,7 @@
if self.Limit is not None:
limitConst = Constant(self.Limit).subSQL(queryGenerator, allTables)
- if queryGenerator.dialect == ORACLE_DIALECT:
+ if queryGenerator.dbtype.dialect == ORACLE_DIALECT:
wrapper = SQLFragment("select * from (")
wrapper.append(stmt)
wrapper.append(SQLFragment(") where ROWNUM <= "))
@@ -1529,7 +1529,7 @@
stmt.append(limitConst)
# Add in any Oracle "for update"
- if self.ForUpdate and queryGenerator.dialect == ORACLE_DIALECT:
+ if self.ForUpdate and queryGenerator.dbtype.dialect == ORACLE_DIALECT:
stmt.text += " for update"
if self.NoWait:
stmt.text += " nowait"
@@ -1620,7 +1620,7 @@
@rtype: L{SQLFragment}
"""
- if queryGenerator.dialect != ORACLE_DIALECT:
+ if queryGenerator.dbtype.dialect != ORACLE_DIALECT:
raise NotImplementedError("CALL statement only available with Oracle DB")
args = (self.ReturnType,) + self.Args
stmt = SQLFragment("call ", args)
@@ -1724,14 +1724,14 @@
if isinstance(retclause, (tuple, list)):
retclause = _CommaList(retclause)
- if queryGenerator.dialect == SQLITE_DIALECT:
+ if queryGenerator.dbtype.dialect == SQLITE_DIALECT:
# sqlite does this another way.
return stmt
if retclause is not None:
stmt.text += " returning "
stmt.append(retclause.subSQL(queryGenerator, allTables))
- if queryGenerator.dialect == ORACLE_DIALECT:
+ if queryGenerator.dbtype.dialect == ORACLE_DIALECT:
stmt.text += " into "
params = []
retvals = self._returnAsList()
@@ -1757,7 +1757,7 @@
return []
result = []
rvars = self._returnAsList()
- if queryGenerator.dialect == ORACLE_DIALECT:
+ if queryGenerator.dbtype.dialect == ORACLE_DIALECT:
for n, v in enumerate(rvars):
result.append(("oracle_out_" + str(n), _OracleOutParam(v)))
return result
@@ -1765,7 +1765,7 @@
def _extraResult(self, result, outvars, queryGenerator):
if (
- queryGenerator.dialect == ORACLE_DIALECT and
+ queryGenerator.dbtype.dialect == ORACLE_DIALECT and
self.Return is not None
):
def processIt(emptyListResult):
@@ -1841,7 +1841,7 @@
tableModel = columnsAndValues[0][0].model.table
specifiedColumnModels = [x.model for x in self.columnMap.keys()]
- if queryGenerator.dialect == ORACLE_DIALECT:
+ if queryGenerator.dbtype.dialect == ORACLE_DIALECT:
# See test_nextSequenceDefaultImplicitExplicitOracle.
for column in tableModel.columns:
if isinstance(column.default, Sequence):
@@ -1881,7 +1881,7 @@
behavior.
"""
result = yield super(_DMLStatement, self).on(txn, *a, **kw)
- if self.Return is not None and txn.dialect == SQLITE_DIALECT:
+ if self.Return is not None and txn.dbtype.dialect == SQLITE_DIALECT:
table = self._returnAsList()[0].model.table
result = yield Select(
self._returnAsList(),
@@ -1936,7 +1936,7 @@
databases that don't provide return values as part of their C{UPDATE}
behavior.
"""
- doExtra = self.Return is not None and txn.dialect == SQLITE_DIALECT
+ doExtra = self.Return is not None and txn.dbtype.dialect == SQLITE_DIALECT
upcall = lambda: super(_DMLStatement, self).on(txn, *a, **kw)
if doExtra:
@@ -2024,7 +2024,7 @@
@inlineCallbacks
def on(self, txn, *a, **kw):
upcall = lambda: super(Delete, self).on(txn, *a, **kw)
- if txn.dialect == SQLITE_DIALECT and self.Return is not None:
+ if txn.dbtype.dialect == SQLITE_DIALECT and self.Return is not None:
result = yield Select(
self._returnAsList(),
From=self.From, Where=self.Where
@@ -2064,7 +2064,7 @@
def _toSQL(self, queryGenerator):
- if queryGenerator.dialect == SQLITE_DIALECT:
+ if queryGenerator.dbtype.dialect == SQLITE_DIALECT:
# FIXME - this is only stubbed out for testing right now, actual
# concurrency would require some kind of locking statement here.
# BEGIN IMMEDIATE maybe, if that's okay in the middle of a
@@ -2085,7 +2085,7 @@
"""
def _toSQL(self, queryGenerator):
- assert(queryGenerator.dialect == POSTGRES_DIALECT)
+ assert(queryGenerator.dbtype.dialect == POSTGRES_DIALECT)
return SQLFragment("select pg_advisory_lock(1)")
@@ -2093,7 +2093,7 @@
"""
Override on() to only execute on Postgres
"""
- if txn.dialect == POSTGRES_DIALECT:
+ if txn.dbtype.dialect == POSTGRES_DIALECT:
return super(DatabaseLock, self).on(txn, *a, **kw)
return succeed(None)
@@ -2106,7 +2106,7 @@
"""
def _toSQL(self, queryGenerator):
- assert(queryGenerator.dialect == POSTGRES_DIALECT)
+ assert(queryGenerator.dbtype.dialect == POSTGRES_DIALECT)
return SQLFragment("select pg_advisory_unlock(1)")
@@ -2114,7 +2114,7 @@
"""
Override on() to only execute on Postgres
"""
- if txn.dialect == POSTGRES_DIALECT:
+ if txn.dbtype.dialect == POSTGRES_DIALECT:
return super(DatabaseUnlock, self).on(txn, *a, **kw)
return succeed(None)
@@ -2170,7 +2170,7 @@
def _safeName(self, txn):
- if txn.dialect == ORACLE_DIALECT:
+ if txn.dbtype.dialect == ORACLE_DIALECT:
# Oracle limits the length of identifiers
return self._name[:30]
else:
@@ -2186,7 +2186,7 @@
def release(self, txn):
- if txn.dialect == ORACLE_DIALECT:
+ if txn.dbtype.dialect == ORACLE_DIALECT:
# There is no "release savepoint" statement in oracle, but then, we
# don't need it because there's no resource to manage. Just don't
# do anything.
Modified: twext/trunk/twext/enterprise/dal/test/test_sqlsyntax.py
===================================================================
--- twext/trunk/twext/enterprise/dal/test/test_sqlsyntax.py 2016-07-01 16:33:47 UTC (rev 15733)
+++ twext/trunk/twext/enterprise/dal/test/test_sqlsyntax.py 2016-07-05 19:32:22 UTC (rev 15734)
@@ -21,6 +21,7 @@
from twisted.internet.defer import succeed
from twisted.trial.unittest import TestCase, SkipTest
+from twext.enterprise.adbapi2 import DEFAULT_PARAM_STYLE
from twext.enterprise.dal import syntax
try:
from twext.enterprise.dal.parseschema import addSQLToSchema
@@ -40,7 +41,7 @@
from twext.enterprise.dal.syntax import SchemaSyntax
from twext.enterprise.dal.test.test_parseschema import SchemaTestHelper
from twext.enterprise.ienterprise import (
- POSTGRES_DIALECT, ORACLE_DIALECT, SQLITE_DIALECT
+ POSTGRES_DIALECT, ORACLE_DIALECT, SQLITE_DIALECT, DatabaseType
)
from twext.enterprise.test.test_adbapi2 import ConnectionPoolHelper
from twext.enterprise.test.test_adbapi2 import NetworkedPoolHelper
@@ -56,8 +57,8 @@
generation.
"""
- def __init__(self, paramstyle):
- self.paramstyle = "qmark"
+ def __init__(self):
+ self.dbtype = DatabaseType(POSTGRES_DIALECT, "qmark")
@@ -75,11 +76,10 @@
"""
counter = 0
- def __init__(self, dialect=SQLITE_DIALECT, paramstyle="numeric"):
+ def __init__(self, dbtype=DatabaseType(SQLITE_DIALECT, "numeric")):
self.execed = []
self.pendingResults = []
- self.dialect = SQLITE_DIALECT
- self.paramstyle = "numeric"
+ self.dbtype = dbtype
def nextResult(self, result):
@@ -110,8 +110,7 @@
Fake transaction for testing oracle NULL behavior.
"""
- dialect = ORACLE_DIALECT
- paramstyle = "numeric"
+ dbtype = DatabaseType(ORACLE_DIALECT, "numeric")
def execSQL(self, text, params, exc):
return succeed([[None, None]])
@@ -192,7 +191,9 @@
Select(
From=self.schema.FOO,
Where=self.schema.FOO.BAR == 1
- ).toSQL(QueryGenerator(POSTGRES_DIALECT, FixedPlaceholder("$$"))),
+ ).toSQL(QueryGenerator(
+ DatabaseType(POSTGRES_DIALECT, "pyformat"), FixedPlaceholder("$$")
+ )),
SQLFragment("select * from FOO where BAR = $$", [1])
)
@@ -270,14 +271,18 @@
Select(
From=self.schema.FOO,
Where=self.schema.FOO.BAR == ""
- ).toSQL(QueryGenerator(ORACLE_DIALECT, NumericPlaceholder())),
+ ).toSQL(QueryGenerator(
+ DatabaseType(ORACLE_DIALECT, "numeric"), NumericPlaceholder()
+ )),
SQLFragment("select * from FOO where BAR is null", [])
)
self.assertEquals(
Select(
From=self.schema.FOO,
Where=self.schema.FOO.BAR != ""
- ).toSQL(QueryGenerator(ORACLE_DIALECT, NumericPlaceholder())),
+ ).toSQL(QueryGenerator(
+ DatabaseType(ORACLE_DIALECT, "numeric"), NumericPlaceholder()
+ )),
SQLFragment("select * from FOO where BAR is not null", [])
)
@@ -698,7 +703,9 @@
Where=(self.schema.FOO.BAR == 2),
),
),
- ).toSQL(QueryGenerator(POSTGRES_DIALECT, FixedPlaceholder("?"))),
+ ).toSQL(QueryGenerator(
+ DatabaseType(POSTGRES_DIALECT, "pyformat"), FixedPlaceholder("?")
+ )),
SQLFragment(
"(select * from FOO where BAR = ?) "
"UNION (select * from FOO where BAR = ?)", [1, 2]
@@ -717,7 +724,9 @@
),
optype=SetExpression.OPTYPE_ALL
),
- ).toSQL(QueryGenerator(POSTGRES_DIALECT, FixedPlaceholder("?"))),
+ ).toSQL(QueryGenerator(
+ DatabaseType(POSTGRES_DIALECT, "pyformat"), FixedPlaceholder("?")
+ )),
SQLFragment(
"(select * from FOO where BAR = ?) "
"INTERSECT ALL (select * from FOO where BAR = ?)", [1, 2]
@@ -741,7 +750,9 @@
),
optype=SetExpression.OPTYPE_DISTINCT,
),
- ).toSQL(QueryGenerator(POSTGRES_DIALECT, FixedPlaceholder("?"))),
+ ).toSQL(QueryGenerator(
+ DatabaseType(POSTGRES_DIALECT, "pyformat"), FixedPlaceholder("?")
+ )),
SQLFragment(
"(select * from FOO) "
"EXCEPT DISTINCT (select * from FOO where BAR = ?) "
@@ -765,7 +776,9 @@
),
),
),
- ).toSQL(QueryGenerator(ORACLE_DIALECT, FixedPlaceholder("?"))),
+ ).toSQL(QueryGenerator(
+ DatabaseType(ORACLE_DIALECT, "pyformat"), FixedPlaceholder("?")
+ )),
SQLFragment(
"(select * from FOO) MINUS ((select * from FOO where BAR = ?) "
"MINUS (select * from FOO where BAR = ?))", [2, 3]
@@ -784,7 +797,9 @@
),
),
OrderBy=self.schema.FOO.BAR,
- ).toSQL(QueryGenerator(POSTGRES_DIALECT, FixedPlaceholder("?"))),
+ ).toSQL(QueryGenerator(
+ DatabaseType(POSTGRES_DIALECT, "pyformat"), FixedPlaceholder("?")
+ )),
SQLFragment(
"(select * from FOO where BAR = ?) "
"UNION (select * from FOO where BAR = ?) order by BAR", [1, 2]
@@ -1417,7 +1432,9 @@
Insert(
{self.schema.FOO.BAR: 40, self.schema.FOO.BAZ: 50},
Return=(self.schema.FOO.BAR, self.schema.FOO.BAZ)
- ).toSQL(QueryGenerator(ORACLE_DIALECT, NumericPlaceholder())),
+ ).toSQL(QueryGenerator(
+ DatabaseType(ORACLE_DIALECT, "numeric"), NumericPlaceholder()
+ )),
SQLFragment(
"insert into FOO (BAR, BAZ) values (:1, :2) "
"returning BAR, BAZ into :3, :4",
@@ -1439,7 +1456,9 @@
{self.schema.FOO.BAR: 39, self.schema.FOO.BAZ: 82},
Return=(self.schema.FOO.BAR, self.schema.FOO.BAZ)
)
- qg = lambda: QueryGenerator(SQLITE_DIALECT, NumericPlaceholder())
+ qg = lambda: QueryGenerator(
+ DatabaseType(SQLITE_DIALECT, "numeric"), NumericPlaceholder()
+ )
self.assertEquals(
insertStatement.toSQL(qg()),
SQLFragment("insert into FOO (BAR, BAZ) values (:1, :2)", [39, 82])
@@ -1609,7 +1628,9 @@
self.schema.LEVELS.ACCESS: 1,
self.schema.LEVELS.USERNAME: "hi"
}
- ).toSQL(QueryGenerator(ORACLE_DIALECT, FixedPlaceholder("?"))),
+ ).toSQL(QueryGenerator(
+ DatabaseType(ORACLE_DIALECT, "pyformat"), FixedPlaceholder("?")
+ )),
SQLFragment(
"""insert into LEVELS ("ACCESS", USERNAME) values (?, ?)""",
[1, "hi"]
@@ -1621,7 +1642,9 @@
self.schema.LEVELS.ACCESS: 1,
self.schema.LEVELS.USERNAME: "hi"
}
- ).toSQL(QueryGenerator(POSTGRES_DIALECT, FixedPlaceholder("?"))),
+ ).toSQL(QueryGenerator(
+ DatabaseType(POSTGRES_DIALECT, "pyformat"), FixedPlaceholder("?")
+ )),
SQLFragment(
"insert into LEVELS (ACCESS, USERNAME) values (?, ?)",
[1, "hi"]
@@ -1832,7 +1855,9 @@
[self.schema.FOO.BAR],
From=self.schema.FOO,
Limit=123
- ).toSQL(QueryGenerator(ORACLE_DIALECT, FixedPlaceholder("?"))),
+ ).toSQL(QueryGenerator(
+ DatabaseType(ORACLE_DIALECT, "pyformat"), FixedPlaceholder("?")
+ )),
SQLFragment(
"select * from (select BAR from FOO) "
"where ROWNUM <= ?", [123]
@@ -1891,7 +1916,9 @@
self.assertEquals(
Insert(
{self.schema.BOZ.QUX: self.schema.A_SEQ}
- ).toSQL(QueryGenerator(ORACLE_DIALECT, FixedPlaceholder("?"))),
+ ).toSQL(QueryGenerator(
+ DatabaseType(ORACLE_DIALECT, "pyformat"), FixedPlaceholder("?")
+ )),
SQLFragment("insert into BOZ (QUX) values (A_SEQ.nextval)", [])
)
@@ -1911,7 +1938,9 @@
)
self.assertEquals(
Insert({self.schema.DFLTR.a: "hello"}).toSQL(
- QueryGenerator(ORACLE_DIALECT, FixedPlaceholder("?"))
+ QueryGenerator(
+ DatabaseType(ORACLE_DIALECT, "pyformat"), FixedPlaceholder("?")
+ )
),
SQLFragment("insert into DFLTR (a, b) values "
"(?, A_SEQ.nextval)", ["hello"]),
@@ -1924,7 +1953,9 @@
self.schema.DFLTR.b: self.schema.A_SEQ
}
).toSQL(
- QueryGenerator(ORACLE_DIALECT, FixedPlaceholder("?"))
+ QueryGenerator(
+ DatabaseType(ORACLE_DIALECT, "pyformat"), FixedPlaceholder("?")
+ )
),
SQLFragment(
"insert into DFLTR (a, b) values (?, A_SEQ.nextval)", ["hello"]
@@ -1943,8 +1974,7 @@
class FakeOracleTxn(object):
def execSQL(self, text, params, exc):
stmts.append((text, params))
- dialect = ORACLE_DIALECT
- paramstyle = "numeric"
+ dbtype = DatabaseType(ORACLE_DIALECT, "numeric")
Select(
[self.schema.FOO.BAR],
@@ -2171,7 +2201,9 @@
vvl = self.schema.veryveryveryveryveryveryveryverylong
self.assertEquals(
Insert({vvl.foo: 1}).toSQL(
- QueryGenerator(ORACLE_DIALECT, FixedPlaceholder("?"))
+ QueryGenerator(
+ DatabaseType(ORACLE_DIALECT, "pyformat"), FixedPlaceholder("?")
+ )
),
SQLFragment(
"insert into veryveryveryveryveryveryveryve (foo) values (?)",
@@ -2245,7 +2277,7 @@
self.assertEquals(
Call(
"procedure"
- ).toSQL(QueryGenerator(ORACLE_DIALECT)),
+ ).toSQL(QueryGenerator(DatabaseType(ORACLE_DIALECT, "qmark"))),
SQLFragment("call procedure()", (None,))
)
@@ -2253,7 +2285,7 @@
Call(
"procedure",
1, "2"
- ).toSQL(QueryGenerator(ORACLE_DIALECT)),
+ ).toSQL(QueryGenerator(DatabaseType(ORACLE_DIALECT, "qmark"))),
SQLFragment("call procedure()", (None, 1, "2"))
)
@@ -2261,7 +2293,7 @@
Call(
"function",
returnType=int
- ).toSQL(QueryGenerator(ORACLE_DIALECT)),
+ ).toSQL(QueryGenerator(DatabaseType(ORACLE_DIALECT, "qmark"))),
SQLFragment("call function()", (int,))
)
@@ -2270,14 +2302,14 @@
"function",
1, "2",
returnType=int
- ).toSQL(QueryGenerator(ORACLE_DIALECT)),
+ ).toSQL(QueryGenerator(DatabaseType(ORACLE_DIALECT, "qmark"))),
SQLFragment("call function()", (int, 1, "2"))
)
self.assertRaises(
NotImplementedError,
Call("procedure").toSQL,
- QueryGenerator(POSTGRES_DIALECT)
+ QueryGenerator(DatabaseType(POSTGRES_DIALECT, "qmark"))
)
@@ -2292,7 +2324,9 @@
self.assertEquals(
Insert(
{schema.FOO.BAR: 1, schema.FOO.UID: "test"},
- ).toSQL(QueryGenerator(ORACLE_DIALECT, FixedPlaceholder("?"))),
+ ).toSQL(QueryGenerator(
+ DatabaseType(ORACLE_DIALECT, "pyformat"), FixedPlaceholder("?")
+ )),
SQLFragment(
"insert into FOO (BAR, \"UID\") values (?, ?)", [1, "test"]
)
@@ -2301,7 +2335,9 @@
Update(
{schema.FOO.BAR: 1, schema.FOO.UID: "test"},
Where=(schema.FOO.BAR == 2),
- ).toSQL(QueryGenerator(ORACLE_DIALECT, FixedPlaceholder("?"))),
+ ).toSQL(QueryGenerator(
+ DatabaseType(ORACLE_DIALECT, "pyformat"), FixedPlaceholder("?")
+ )),
SQLFragment(
"update FOO set BAR = ?, \"UID\" = ? where BAR = ?", [1, "test", 2]
)
@@ -2311,7 +2347,9 @@
[schema.FOO.BAR, schema.FOO.UID],
From=schema.FOO,
Where=(schema.FOO.UID == "test"),
- ).toSQL(QueryGenerator(ORACLE_DIALECT, FixedPlaceholder("?"))),
+ ).toSQL(QueryGenerator(
+ DatabaseType(ORACLE_DIALECT, "pyformat"), FixedPlaceholder("?")
+ )),
SQLFragment(
"select BAR, \"UID\" from FOO where \"UID\" = ?", ["test"]
)
@@ -2388,7 +2426,7 @@
Tests which use an oracle connection.
"""
- dialect = ORACLE_DIALECT
+ dbtype = DatabaseType(ORACLE_DIALECT, DEFAULT_PARAM_STYLE)
def setUp(self):
"""
@@ -2406,10 +2444,10 @@
TestCase
):
- dialect = ORACLE_DIALECT
+ dbtype = DatabaseType(ORACLE_DIALECT, DEFAULT_PARAM_STYLE)
def setUp(self):
self.patch(syntax, "cx_Oracle", FakeCXOracleModule)
super(OracleNetConnectionTests, self).setUp()
ExampleSchemaHelper.setUp(self)
- self.pump.client.dialect = ORACLE_DIALECT
+ self.pump.client.dbtypedialect = ORACLE_DIALECT
Modified: twext/trunk/twext/enterprise/fixtures.py
===================================================================
--- twext/trunk/twext/enterprise/fixtures.py 2016-07-01 16:33:47 UTC (rev 15733)
+++ twext/trunk/twext/enterprise/fixtures.py 2016-07-05 19:32:22 UTC (rev 15734)
@@ -32,6 +32,7 @@
from twisted.python.threadpool import ThreadPool
from twext.enterprise.adbapi2 import ConnectionPool
+from twext.enterprise.ienterprise import DatabaseType
from twext.enterprise.ienterprise import SQLITE_DIALECT
from twext.enterprise.ienterprise import POSTGRES_DIALECT
from twext.enterprise.adbapi2 import DEFAULT_PARAM_STYLE
@@ -39,7 +40,7 @@
-def buildConnectionPool(testCase, schemaText="", dialect=SQLITE_DIALECT):
+def buildConnectionPool(testCase, schemaText="", dbtype=DatabaseType(SQLITE_DIALECT, "numeric")):
"""
Build a L{ConnectionPool} for testing purposes, with the given C{testCase}.
@@ -71,8 +72,7 @@
con = connectionFactory()
con.executescript(schemaText)
con.commit()
- pool = ConnectionPool(connectionFactory, paramstyle="numeric",
- dialect=SQLITE_DIALECT)
+ pool = ConnectionPool(connectionFactory, dbtype=dbtype)
pool.startService()
testCase.addCleanup(pool.stopService)
return pool
@@ -227,8 +227,7 @@
L{ConnectionPool}.
"""
- dialect = POSTGRES_DIALECT
- paramstyle = DEFAULT_PARAM_STYLE
+ dbtype = DatabaseType(POSTGRES_DIALECT, DEFAULT_PARAM_STYLE)
def setUp(self, test=None, connect=None):
"""
@@ -245,8 +244,7 @@
self.pool = ConnectionPool(
connect,
maxConnections=2,
- dialect=self.dialect,
- paramstyle=self.paramstyle
+ dbtype=self.dbtype,
)
self.pool._createHolder = self.makeAHolder
self.clock = self.pool.reactor = ClockWithThreads()
@@ -301,8 +299,7 @@
capable of firing all its L{Deferred}s on demand, synchronously, by using
SQLite.
"""
- dialect = SQLITE_DIALECT
- paramstyle = sqlite3.paramstyle
+ dbtype = DatabaseType(SQLITE_DIALECT, sqlite3.paramstyle)
def __init__(self, schema):
self.schema = schema
Modified: twext/trunk/twext/enterprise/ienterprise.py
===================================================================
--- twext/trunk/twext/enterprise/ienterprise.py 2016-07-01 16:33:47 UTC (rev 15733)
+++ twext/trunk/twext/enterprise/ienterprise.py 2016-07-05 19:32:22 UTC (rev 15734)
@@ -56,26 +56,61 @@
ORACLE_TABLE_NAME_MAX = 30
+"""
+Holds details about the database in use. The C{dialect} attribute is
+one of the C{*_DIALECT} constants in this module. The C{paramstyle}
+attribute is a copy of the DB-API 2.0 module attribute. The C{options}
+attribute is a set of optional features for the DB in use.
+"""
+class DatabaseType(object):
+ def __init__(self, dialect, paramstyle, options=()):
+ """
+ @param dialect: database dialect to use
+ @type dialect: L{str}
+ @param paramstyle: parameter style for SQL statements
+ @type paramstyle: L[str}
+ @param options: set of optional features
+ @type options: L{iterable}
+ """
+ self.dialect = dialect
+ self.paramstyle = paramstyle
+ self.options = frozenset(options)
+
+ def copyreplace(self, dialect=None, paramstyle=None, options=None):
+ """
+ Create a copy of this L{DatabaseType} and modify the supplied properties in
+ the new copy.
+
+ @param dialect: new value for C{dialect} or None for no change
+ @type dialect: L{str}
+ @param paramstyle: new value for C{paramstyle} or None for no change
+ @type paramstyle: L{str}
+ @param options: new value for C{options} or None for no change
+ @type options: L{iterable}
+ """
+
+ return DatabaseType(
+ self.dialect if dialect is None else dialect,
+ self.paramstyle if paramstyle is None else paramstyle,
+ self.options if options is None else options,
+ )
+
+
+
class ISQLExecutor(Interface):
"""
Base SQL-execution interface, for a group of commands or a transaction.
"""
- paramstyle = Attribute(
+ dbtype = Attribute(
"""
- A copy of the C{paramstyle} attribute from a DB-API 2.0 module.
+ A copy of the C{dbtype} attribute from the connection pool. It is
+ a L{DatabaseType}.
"""
)
- dialect = Attribute(
- """
- A copy of the C{dialect} attribute from the connection pool. One of
- the C{*_DIALECT} constants in this module, such as L{POSTGRES_DIALECT}.
- """
- )
-
def execSQL(sql, args=(), raiseOnZeroRowCount=None):
"""
Execute some SQL.
Modified: twext/trunk/twext/enterprise/jobs/jobitem.py
===================================================================
--- twext/trunk/twext/enterprise/jobs/jobitem.py 2016-07-01 16:33:47 UTC (rev 15733)
+++ twext/trunk/twext/enterprise/jobs/jobitem.py 2016-07-05 19:32:22 UTC (rev 15734)
@@ -376,7 +376,7 @@
@rtype: L{JobItem}
"""
- if txn.dialect == ORACLE_DIALECT:
+ if txn.dbtype.dialect == ORACLE_DIALECT:
# For Oracle we need a multi-app server solution that only locks the
# (one) row being returned by the query, and allows other app servers
@@ -409,6 +409,9 @@
elif minPriority == JOB_PRIORITY_HIGH:
queryExpr = (cls.priority == JOB_PRIORITY_HIGH).And(queryExpr)
+ extra_kwargs = {}
+ if "skip-locked" in txn.dbtype.options:
+ extra_kwargs["skipLocked"] = True
jobs = yield cls.query(
txn,
queryExpr,
@@ -417,6 +420,7 @@
forUpdate=True,
noWait=False,
limit=1,
+ **extra_kwargs
)
job = jobs[0] if jobs else None
@@ -438,19 +442,23 @@
@rtype: L{JobItem}
"""
- if txn.dialect == ORACLE_DIALECT:
+ if txn.dbtype.dialect == ORACLE_DIALECT:
# See L{nextjob} for why Oracle is different
job = None
jobID = yield Call("overdue_job", now, returnType=int).on(txn)
if jobID:
job = yield cls.load(txn, jobID)
else:
+ extra_kwargs = {}
+ if "skip-locked" in txn.dbtype.options:
+ extra_kwargs["skipLocked"] = True
jobs = yield cls.query(
txn,
(cls.assigned != None).And(cls.overdue < now),
forUpdate=True,
noWait=False,
limit=1,
+ **extra_kwargs
)
job = jobs[0] if jobs else None
Modified: twext/trunk/twext/enterprise/test/test_adbapi2.py
===================================================================
--- twext/trunk/twext/enterprise/test/test_adbapi2.py 2016-07-01 16:33:47 UTC (rev 15733)
+++ twext/trunk/twext/enterprise/test/test_adbapi2.py 2016-07-05 19:32:22 UTC (rev 15734)
@@ -676,7 +676,7 @@
"""
Change the paramstyle of the transaction under test.
"""
- self.pool.paramstyle = paramstyle
+ self.pool.dbtype = self.pool.dbtype.copyreplace(paramstyle=paramstyle)
def test_propagateParamstyle(self):
@@ -687,24 +687,24 @@
TEST_PARAMSTYLE = "justtesting"
self.setParamstyle(TEST_PARAMSTYLE)
normaltxn = self.createTransaction()
- self.assertEquals(normaltxn.paramstyle, TEST_PARAMSTYLE)
- self.assertEquals(normaltxn.commandBlock().paramstyle, TEST_PARAMSTYLE)
+ self.assertEquals(normaltxn.dbtype.paramstyle, TEST_PARAMSTYLE)
+ self.assertEquals(normaltxn.commandBlock().dbtype.paramstyle, TEST_PARAMSTYLE)
self.pauseHolders()
extra = []
extra.append(self.createTransaction())
waitingtxn = self.createTransaction()
- self.assertEquals(waitingtxn.paramstyle, TEST_PARAMSTYLE)
+ self.assertEquals(waitingtxn.dbtype.paramstyle, TEST_PARAMSTYLE)
self.flushHolders()
self.pool.stopService()
notxn = self.createTransaction()
- self.assertEquals(notxn.paramstyle, TEST_PARAMSTYLE)
+ self.assertEquals(notxn.dbtype.paramstyle, TEST_PARAMSTYLE)
def setDialect(self, dialect):
"""
Change the dialect of the transaction under test.
"""
- self.pool.dialect = dialect
+ self.pool.dbtype = self.pool.dbtype.copyreplace(dialect=dialect)
def test_propagateDialect(self):
@@ -715,17 +715,17 @@
TEST_DIALECT = "otherdialect"
self.setDialect(TEST_DIALECT)
normaltxn = self.createTransaction()
- self.assertEquals(normaltxn.dialect, TEST_DIALECT)
- self.assertEquals(normaltxn.commandBlock().dialect, TEST_DIALECT)
+ self.assertEquals(normaltxn.dbtype.dialect, TEST_DIALECT)
+ self.assertEquals(normaltxn.commandBlock().dbtype.dialect, TEST_DIALECT)
self.pauseHolders()
extra = []
extra.append(self.createTransaction())
waitingtxn = self.createTransaction()
- self.assertEquals(waitingtxn.dialect, TEST_DIALECT)
+ self.assertEquals(waitingtxn.dbtype.dialect, TEST_DIALECT)
self.flushHolders()
self.pool.stopService()
notxn = self.createTransaction()
- self.assertEquals(notxn.dialect, TEST_DIALECT)
+ self.assertEquals(notxn.dbtype.dialect, TEST_DIALECT)
def test_reConnectWhenFirstExecFails(self):
@@ -1330,8 +1330,7 @@
super(NetworkedPoolHelper, self).setUp()
self.pump = IOPump(
ConnectionPoolClient(
- dialect=self.dialect,
- paramstyle=self.paramstyle
+ dbtype=self.dbtype,
),
ConnectionPoolConnection(self.pool)
)
@@ -1384,7 +1383,7 @@
Change the paramstyle on both the pool and the client.
"""
super(NetworkedConnectionPoolTests, self).setParamstyle(paramstyle)
- self.pump.client.paramstyle = paramstyle
+ self.pump.client.dbtype = self.pump.client.dbtype.copyreplace(paramstyle=paramstyle)
def setDialect(self, dialect):
@@ -1392,7 +1391,7 @@
Change the dialect on both the pool and the client.
"""
super(NetworkedConnectionPoolTests, self).setDialect(dialect)
- self.pump.client.dialect = dialect
+ self.pump.client.dbtype = self.pump.client.dbtype.copyreplace(dialect=dialect)
def test_newTransaction(self):
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20160705/41c5ebf9/attachment-0001.html>
More information about the calendarserver-changes
mailing list