<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
"http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head><meta http-equiv="content-type" content="text/html; charset=utf-8" />
<title>[15141] twext/trunk/twext/enterprise</title>
</head>
<body>
<style type="text/css"><!--
/* Commit summary section (#msg): metadata list, headings, log text */
#msg dl.meta { border: 1px #006 solid; background: #369; padding: 6px; color: #fff; }
#msg dl.meta dt { float: left; width: 6em; font-weight: bold; }
#msg dt:after { content:':';}
#msg dl, #msg dt, #msg ul, #msg li, #header, #footer, #logmsg { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; }
#msg dl a { font-weight: bold}
/* Keep link, visited, active in this order: the three rules have equal
   specificity, so :active must come last or :visited would override it. */
#msg dl a:link { color:#fc3; }
#msg dl a:visited { color:#cc6; }
#msg dl a:active { color:#ff0; }
h3 { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; font-weight: bold; }
#msg pre { overflow: auto; background: #ffc; border: 1px #fa0 solid; padding: 6px; }

/* Rules for a #logmsg container (no element with this id appears in the markup of this page) */
#logmsg { background: #ffc; border: 1px #fa0 solid; padding: 1em 1em 0 1em; }
#logmsg p, #logmsg pre, #logmsg blockquote { margin: 0 0 1em 0; }
#logmsg p, #logmsg li, #logmsg dt, #logmsg dd { line-height: 14pt; }
#logmsg h1, #logmsg h2, #logmsg h3, #logmsg h4, #logmsg h5, #logmsg h6 { margin: .5em 0; }
#logmsg h1:first-child, #logmsg h2:first-child, #logmsg h3:first-child, #logmsg h4:first-child, #logmsg h5:first-child, #logmsg h6:first-child { margin-top: 0; }
#logmsg ul, #logmsg ol { padding: 0; list-style-position: inside; margin: 0 0 0 1em; }
#logmsg ul { text-indent: -1em; padding-left: 1em; }
#logmsg ol { text-indent: -1.5em; padding-left: 1.5em; }
#logmsg > ul, #logmsg > ol { margin: 0 0 1em 0; }
#logmsg pre { background: #eee; padding: 1em; }
#logmsg blockquote { border: 1px solid #fa0; border-left-width: 10px; padding: 1em 1em 0 1em; background: white;}
#logmsg dl { margin: 0; }
#logmsg dt { font-weight: bold; }
#logmsg dd { margin: 0; padding: 0 0 0.5em 0; }
#logmsg dd:before { content:'\00bb';}
#logmsg table { border-spacing: 0px; border-collapse: collapse; border-top: 4px solid #fa0; border-bottom: 1px solid #fa0; background: #fff; }
#logmsg table th { text-align: left; font-weight: normal; padding: 0.2em 0.5em; border-top: 1px dotted #fa0; }
#logmsg table td { text-align: right; border-top: 1px dotted #fa0; padding: 0.2em 0.5em; }
#logmsg table thead th { text-align: center; border-bottom: 1px solid #fa0; }
#logmsg table th.Corner { text-align: left; }
#logmsg hr { border: none 0; border-top: 2px dashed #fa0; height: 1px; }

/* Rules for #header and #footer banners (neither id appears in the markup of this page) */
#header, #footer { color: #fff; background: #636; border: 1px #300 solid; padding: 6px; }

/* Diff section (#patch): one bordered box per changed file, with added
   lines in <ins> (green) and removed lines in <del> (red) */
#patch { width: 100%; }
#patch h4 {font-family: verdana,arial,helvetica,sans-serif;font-size:10pt;padding:8px;background:#369;color:#fff;margin:0;}
#patch .propset h4, #patch .binary h4 {margin:0;}
#patch pre {padding:0;line-height:1.2em;margin:0;}
#patch .diff {width:100%;background:#eee;padding: 0 0 10px 0;overflow:auto;}
#patch .propset .diff, #patch .binary .diff {padding:10px 0;}
#patch span {display:block;padding:0 10px;}
#patch .modfile, #patch .addfile, #patch .delfile, #patch .propset, #patch .binary, #patch .copfile {border:1px solid #ccc;margin:10px 0;}
#patch ins {background:#dfd;text-decoration:none;display:block;padding:0 10px;}
#patch del {background:#fdd;text-decoration:none;display:block;padding:0 10px;}
/* NOTE(review): .info is not scoped to #patch, unlike the selectors around it; confirm whether that is intentional */
#patch .lines, .info {color:#888;background:#fff;}
--></style>
<!-- Commit summary: metadata, log message, and links to each file's diff below -->
<div id="msg">
<dl class="meta">
<dt>Revision</dt> <dd><a href="http://trac.calendarserver.org/changeset/15141">15141</a></dd>
<dt>Author</dt> <dd>cdaboo@apple.com</dd>
<dt>Date</dt> <dd>2015-09-21 07:29:03 -0700 (Mon, 21 Sep 2015)</dd>
</dl>
<h3>Log Message</h3>
<pre>Changes for Oracle: add Call(), SKIP LOCKED, and Case SQL syntax support. Use stored procedures for next_job processing. Record.deletesome multi-row return.</pre>
<h3>Modified Paths</h3>
<!-- Each entry targets the matching <a id="..."> anchor inside #patch -->
<ul>
<li><a href="#twexttrunktwextenterpriseadbapi2py">twext/trunk/twext/enterprise/adbapi2.py</a></li>
<li><a href="#twexttrunktwextenterprisedalrecordpy">twext/trunk/twext/enterprise/dal/record.py</a></li>
<li><a href="#twexttrunktwextenterprisedalsyntaxpy">twext/trunk/twext/enterprise/dal/syntax.py</a></li>
<li><a href="#twexttrunktwextenterprisedaltesttest_sqlsyntaxpy">twext/trunk/twext/enterprise/dal/test/test_sqlsyntax.py</a></li>
<li><a href="#twexttrunktwextenterprisejobsjobitempy">twext/trunk/twext/enterprise/jobs/jobitem.py</a></li>
<li><a href="#twexttrunktwextenterprisejobsqueuepy">twext/trunk/twext/enterprise/jobs/queue.py</a></li>
</ul>
</div>
<div id="patch">
<h3>Diff</h3>
<a id="twexttrunktwextenterpriseadbapi2py"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/adbapi2.py (15140 => 15141)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/adbapi2.py        2015-09-21 14:19:48 UTC (rev 15140)
+++ twext/trunk/twext/enterprise/adbapi2.py        2015-09-21 14:29:03 UTC (rev 15141)
</span><span class="lines">@@ -229,6 +229,13 @@
</span><span class="cx"> @raise raiseOnZeroRowCount: if the argument was specified and no rows
</span><span class="cx"> were returned by the executed statement.
</span><span class="cx"> """
</span><ins>+ # Oracle only: we need to support the CALL statement which is available in
+ # in the cx_Oracle module via special callproc() and callfunc() methods
+ # that are not part of the standard Python DBAPI.
+ stmts = sql.splitlines()
+ if stmts[-1].startswith("call "):
+ return self._reallyCallSQL(stmts[-1], args)
+
</ins><span class="cx"> wasFirst = self._first
</span><span class="cx">
</span><span class="cx"> # If this is the first time this cursor has been used in this
</span><span class="lines">@@ -312,12 +319,103 @@
</span><span class="cx"> # Oracle with a return into clause returns an empty set or rows, but
</span><span class="cx"> # we then have to insert the special bind variables for the return into.
</span><span class="cx"> # Thus we need to know whether there was any rowcount from the actual query.
</span><del>- # What we do is always insert a set of empty rows as the result if the
- # rowcount is non-zero. Then we can detect whether the bind variables
- # need to be added into the result set.
- return [[]] * self._cursor.rowcount if self._cursor.rowcount else None
</del><ins>+ # What we do is always insert a set of empty rows equal to the rowcount. in
+ # the case of no result, an empty list is returned. This way we can detect
+ # that the additional bind variables are needed (if len(result) != 0).
+ return [[]] * self._cursor.rowcount
</ins><span class="cx">
</span><span class="cx">
</span><ins>+ def _reallyCallSQL(self, sql, args=None):
+ """
+ Use cx_Oracle's callproc() or callfunc() Cursor methods to execute a
+ stored procedure.
+
+ @param sql: The SQL string to execute.
+ @type sql: C{str}
+
+ @param args: The parameters of the procedure or function, if any.
+ @type args: C{list} or C{None}
+
+ @raise Exception: this function may raise any exception raised by the
+ underlying C{dbapi.connect}, C{cursor.execute},
+ L{IDerivedParameter.preQuery}, C{connection.cursor}, or
+ C{cursor.callxxx}.
+ """
+
+ # The sql will be of the form "call &lt;name&gt;()" with args
+ if not sql.startswith("call ") or not sql.endswith("()"):
+ raise ValueError("Invalid SQL CALL statement: {}".format(sql))
+ name = sql[5:-2]
+ returnType = args[0]
+ args = args[1:]
+
+ wasFirst = self._first
+
+ # If this is the first time this cursor has been used in this
+ # transaction, remember that, but mark it as now used.
+ self._first = False
+
+ try:
+ # Make the return value look like an array of rows/columns, as
+ # one would expect for something like a SELECT
+ if returnType is not None:
+ returnValue = self._cursor.callfunc(name, returnType, args)
+ returnValue = [returnValue, ]
+ else:
+ returnValue = self._cursor.callproc(name, args)
+ returnValue = [returnValue, ]
+ except:
+ # If execute() raised an exception, and this was the first thing to
+ # happen in the transaction, then the connection has probably gone
+ # bad in the meanwhile, and we should try again.
+ if wasFirst:
+ # Report the error before doing anything else, since doing
+ # other things may cause the traceback stack to be eliminated
+ # if they raise exceptions (even internally).
+ log.failure(
+ "Exception from execute() on first statement in "
+ "transaction. Possibly caused by a database server "
+ "restart. Automatically reconnecting now.",
+ failure=Failure(),
+ )
+ try:
+ self._connection.close()
+ except:
+ # close() may raise an exception to alert us of an error as
+ # well. Right now the only type of error we know about is
+ # "the connection is already closed", which obviously
+ # doesn't need to be handled specially. Unfortunately the
+ # reporting of this type of error is not consistent or
+ # predictable across different databases, or even different
+ # bindings to the same database, so we have to do a
+ # catch-all here. While I can't imagine another type of
+ # error at the moment, bare C{except:}s are notorious for
+ # making debugging surprising error conditions very
+ # difficult, so let's make sure that the error is logged
+ # just in case.
+ log.failure(
+ "Exception from close() while automatically "
+ "reconnecting. (Probably not serious.)",
+ failure=Failure(),
+ )
+
+ # Now, if either of *these* things fail, there's an error here
+ # that we cannot workaround or address automatically, so no
+ # try:except: for them.
+ self._connection = self._pool.connectionFactory()
+ self._cursor = self._connection.cursor()
+
+ # Note that although this method is being invoked recursively,
+ # the "_first" flag is re-set at the very top, so we will _not_
+ # be re-entering it more than once.
+ result = self._reallyCallSQL(sql, args)
+ return result
+ else:
+ raise
+ else:
+ return returnValue
+
+
</ins><span class="cx"> def execSQL(self, *args, **kw):
</span><span class="cx"> if self._completed:
</span><span class="cx"> raise RuntimeError("Attempt to use {} transaction.".format(self._completed))
</span></span></pre></div>
<a id="twexttrunktwextenterprisedalrecordpy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/dal/record.py (15140 => 15141)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/dal/record.py        2015-09-21 14:19:48 UTC (rev 15140)
+++ twext/trunk/twext/enterprise/dal/record.py        2015-09-21 14:29:03 UTC (rev 15141)
</span><span class="lines">@@ -33,6 +33,7 @@
</span><span class="cx"> from twext.enterprise.dal.syntax import (
</span><span class="cx"> Select, Tuple, Constant, ColumnSyntax, Insert, Update, Delete, SavepointAction,
</span><span class="cx"> Count, ALL_COLUMNS)
</span><ins>+from twext.enterprise.ienterprise import ORACLE_DIALECT
</ins><span class="cx"> from twext.enterprise.util import parseSQLTimestamp
</span><span class="cx"> # from twext.enterprise.dal.syntax import ExpressionSyntax
</span><span class="cx">
</span><span class="lines">@@ -472,7 +473,7 @@
</span><span class="cx">
</span><span class="cx">
</span><span class="cx"> @classmethod
</span><del>- def query(cls, transaction, expr, order=None, group=None, limit=None, forUpdate=False, noWait=False, ascending=True, distinct=False):
</del><ins>+ def query(cls, transaction, expr, order=None, group=None, limit=None, forUpdate=False, noWait=False, skipLocked=False, ascending=True, distinct=False):
</ins><span class="cx"> """
</span><span class="cx"> Query the table that corresponds to C{cls}, and return instances of
</span><span class="cx"> C{cls} corresponding to the rows that are returned from that table.
</span><span class="lines">@@ -495,6 +496,8 @@
</span><span class="cx"> @type forUpdate: L{bool}
</span><span class="cx"> @param noWait: include NOWAIT with the FOR UPDATE
</span><span class="cx"> @type noWait: L{bool}
</span><ins>+ @param skipLocked: include SKIP LOCKED with the FOR UPDATE
+ @type skipLocked: L{bool}
</ins><span class="cx"> """
</span><span class="cx"> return cls._rowsFromQuery(
</span><span class="cx"> transaction,
</span><span class="lines">@@ -505,6 +508,7 @@
</span><span class="cx"> limit=limit,
</span><span class="cx"> forUpdate=forUpdate,
</span><span class="cx"> noWait=noWait,
</span><ins>+ skipLocked=skipLocked,
</ins><span class="cx"> ascending=ascending,
</span><span class="cx"> distinct=distinct,
</span><span class="cx"> ),
</span><span class="lines">@@ -513,7 +517,7 @@
</span><span class="cx">
</span><span class="cx">
</span><span class="cx"> @classmethod
</span><del>- def queryExpr(cls, expr, attributes=None, order=None, group=None, limit=None, forUpdate=False, noWait=False, ascending=True, distinct=False):
</del><ins>+ def queryExpr(cls, expr, attributes=None, order=None, group=None, limit=None, forUpdate=False, noWait=False, skipLocked=False, ascending=True, distinct=False):
</ins><span class="cx"> """
</span><span class="cx"> Query expression that corresponds to C{cls}. Used in cases where a sub-select
</span><span class="cx"> on this record's table is needed.
</span><span class="lines">@@ -536,6 +540,8 @@
</span><span class="cx"> @type forUpdate: L{bool}
</span><span class="cx"> @param noWait: include NOWAIT with the FOR UPDATE
</span><span class="cx"> @type noWait: L{bool}
</span><ins>+ @param skipLocked: include SKIP LOCKED with the FOR UPDATE
+ @type skipLocked: L{bool}
</ins><span class="cx"> """
</span><span class="cx"> kw = {}
</span><span class="cx"> if order is not None:
</span><span class="lines">@@ -548,6 +554,8 @@
</span><span class="cx"> kw.update(ForUpdate=True)
</span><span class="cx"> if noWait:
</span><span class="cx"> kw.update(NoWait=True)
</span><ins>+ if skipLocked:
+ kw.update(SkipLocked=True)
</ins><span class="cx"> if distinct:
</span><span class="cx"> kw.update(Distinct=True)
</span><span class="cx"> if attributes is None:
</span><span class="lines">@@ -631,15 +639,32 @@
</span><span class="cx">
</span><span class="cx">
</span><span class="cx"> @classmethod
</span><ins>+ @inlineCallbacks
</ins><span class="cx"> def deletesome(cls, transaction, where, returnCols=None):
</span><span class="cx"> """
</span><span class="cx"> Delete all rows matching the where expression from the table that corresponds to C{cls}.
</span><span class="cx"> """
</span><del>- return Delete(
- From=cls.table,
- Where=where,
- Return=returnCols,
- ).on(transaction)
</del><ins>+ if transaction.dialect == ORACLE_DIALECT and returnCols is not None:
+ # Oracle cannot return multiple rows in the RETURNING clause so
+ # we have to split this into a SELECT followed by a DELETE
+ if not isinstance(returnCols, (tuple, list)):
+ returnCols = [returnCols, ]
+ result = yield Select(
+ returnCols,
+ From=cls.table,
+ Where=where,
+ ).on(transaction)
+ yield Delete(
+ From=cls.table,
+ Where=where,
+ ).on(transaction)
+ else:
+ result = yield Delete(
+ From=cls.table,
+ Where=where,
+ Return=returnCols,
+ ).on(transaction)
+ returnValue(result)
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx"> @classmethod
</span></span></pre></div>
<a id="twexttrunktwextenterprisedalsyntaxpy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/dal/syntax.py (15140 => 15141)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/dal/syntax.py        2015-09-21 14:19:48 UTC (rev 15140)
+++ twext/trunk/twext/enterprise/dal/syntax.py        2015-09-21 14:29:03 UTC (rev 15141)
</span><span class="lines">@@ -1079,6 +1079,10 @@
</span><span class="cx"> super(NullComparison, self).__init__(a, op, None)
</span><span class="cx">
</span><span class="cx">
</span><ins>+ def allColumns(self):
+ return self.a.allColumns()
+
+
</ins><span class="cx"> def subSQL(self, queryGenerator, allTables):
</span><span class="cx"> sqls = SQLFragment()
</span><span class="cx"> sqls.append(self.a.subSQL(queryGenerator, allTables))
</span><span class="lines">@@ -1224,6 +1228,58 @@
</span><span class="cx">
</span><span class="cx">
</span><span class="cx">
</span><ins>+class Case(ExpressionSyntax):
+ """
+ Implementation of a simple CASE statement:
+
+ CASE ... WHEN ... THEN ... ELSE ... END
+
+ Note that SQL actually allows multiple WHEN ... THEN ... clauses, but
+ this implementation only supports one.
+ """
+
+ def __init__(self, when, true_result, false_result):
+ """
+ @param when: WHEN clause - typically a L{Comparison}
+ @type when: L{Expression}
+ @param true_result: result when WHEN clause is true
+ @type true_result: L{Constant} or L{None}
+ @param false_result: result when WHEN clause is false
+ @type false_result: L{Constant} or L{None}
+ """
+ self.when = when
+ self.true_result = true_result
+ self.false_result = false_result
+
+
+ def subSQL(self, queryGenerator, allTables):
+ result = SQLFragment("case when ")
+ result.append(self.when.subSQL(queryGenerator, allTables))
+ result.append(SQLFragment(" then "))
+ if self.true_result is None:
+ result.append(SQLFragment("null"))
+ else:
+ result.append(self.true_result.subSQL(queryGenerator, allTables))
+ result.append(SQLFragment(" else "))
+ if self.false_result is None:
+ result.append(SQLFragment("null"))
+ else:
+ result.append(self.false_result.subSQL(queryGenerator, allTables))
+ result.append(SQLFragment(" end"))
+
+ return result
+
+
+ def allColumns(self):
+ """
+ Return all columns referenced by any sub-clauses.
+ """
+ return self.when.allColumns() + \
+ (self.true_result.allColumns() if self.true_result is not None else []) + \
+ (self.false_result.allColumns() if self.false_result is not None else [])
+
+
+
</ins><span class="cx"> class Tuple(ExpressionSyntax):
</span><span class="cx">
</span><span class="cx"> def __init__(self, columns):
</span><span class="lines">@@ -1340,7 +1396,7 @@
</span><span class="cx"> self,
</span><span class="cx"> columns=None, Where=None, From=None,
</span><span class="cx"> OrderBy=None, GroupBy=None,
</span><del>- Limit=None, ForUpdate=False, NoWait=False, Ascending=None,
</del><ins>+ Limit=None, ForUpdate=False, NoWait=False, SkipLocked=False, Ascending=None,
</ins><span class="cx"> Having=None, Distinct=False, As=None, SetExpression=None
</span><span class="cx"> ):
</span><span class="cx"> self.From = From
</span><span class="lines">@@ -1365,6 +1421,7 @@
</span><span class="cx">
</span><span class="cx"> self.ForUpdate = ForUpdate
</span><span class="cx"> self.NoWait = NoWait
</span><ins>+ self.SkipLocked = SkipLocked
</ins><span class="cx"> self.Ascending = Ascending
</span><span class="cx"> self.As = As
</span><span class="cx">
</span><span class="lines">@@ -1453,6 +1510,8 @@
</span><span class="cx"> stmt.text += " for update"
</span><span class="cx"> if self.NoWait:
</span><span class="cx"> stmt.text += " nowait"
</span><ins>+ if self.SkipLocked:
+ stmt.text += " skip locked"
</ins><span class="cx">
</span><span class="cx"> if self.Limit is not None:
</span><span class="cx"> limitConst = Constant(self.Limit).subSQL(queryGenerator, allTables)
</span><span class="lines">@@ -1527,6 +1586,55 @@
</span><span class="cx">
</span><span class="cx">
</span><span class="cx">
</span><ins>+class Call(_Statement):
+ """
+ CALL statement. Only supported by Oracle.
+ """
+
+ def __init__(self, name, *args, **kwargs):
+ """
+ @param name: name of procedure or function to call
+ @type name: L{str}
+ @param *args: arguments for the procedure or functions
+ @type *args: L{list}
+ @param returnType: kwarg: the Python type of the return value for
+ a function
+ @type returnType: L{Type}
+ """
+ self.Name = name
+ self.Args = args
+ self.ReturnType = kwargs.get("returnType")
+
+
+ def _toSQL(self, queryGenerator):
+ """
+ Generate an SQL statement of the form "call &lt;name&gt;()" with a list of
+ args, where the first arg is always the return type (which is C{None}
+ for a procedure, rather than a function, call).
+
+ @return: a C{call} statement with arguments
+ @rtype: L{SQLFragment}
+ """
+
+ if queryGenerator.dialect != ORACLE_DIALECT:
+ raise NotImplementedError("CALL statement only available with Oracle DB")
+ args = (self.ReturnType,) + self.Args
+ stmt = SQLFragment("call ", args)
+ stmt.text += self.Name
+ stmt.text += "()"
+
+ return stmt
+
+
+ def _fixOracleNulls(self, rows):
+ """
+ Suppress the super class behavior because we are getting result values
+ directly, not from columns.
+ """
+ return rows[0][0]
+
+
+
</ins><span class="cx"> def _commaJoined(stmts):
</span><span class="cx"> first = True
</span><span class="cx"> cstatement = SQLFragment()
</span><span class="lines">@@ -1658,9 +1766,9 @@
</span><span class="cx"> ):
</span><span class="cx"> def processIt(emptyListResult):
</span><span class="cx"> # See comment in L{adbapi2._ConnectedTxn._reallyExecSQL}. If the
</span><del>- # result is L{None} then also return L{None}. If the result is a
</del><ins>+ # result is an empty list, just return that. If the result is a
</ins><span class="cx"> # L{list} of empty L{list} then there are return into rows to return.
</span><del>- if emptyListResult:
</del><ins>+ if len(emptyListResult) > 0:
</ins><span class="cx"> emptyListResult = [[v.value for _ignore_k, v in outvars]]
</span><span class="cx"> return emptyListResult
</span><span class="cx"> return result.addCallback(processIt)
</span></span></pre></div>
<a id="twexttrunktwextenterprisedaltesttest_sqlsyntaxpy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/dal/test/test_sqlsyntax.py (15140 => 15141)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/dal/test/test_sqlsyntax.py        2015-09-21 14:19:48 UTC (rev 15140)
+++ twext/trunk/twext/enterprise/dal/test/test_sqlsyntax.py        2015-09-21 14:29:03 UTC (rev 15141)
</span><span class="lines">@@ -33,7 +33,8 @@
</span><span class="cx"> Savepoint, RollbackToSavepoint, ReleaseSavepoint, SavepointAction,
</span><span class="cx"> Union, Intersect, Except, SetExpression, DALError,
</span><span class="cx"> ResultAliasSyntax, Count, QueryGenerator, ALL_COLUMNS,
</span><del>- DatabaseLock, DatabaseUnlock, Not, Coalesce, NullIf)
</del><ins>+ DatabaseLock, DatabaseUnlock, Not, Coalesce, NullIf,
+ Call, Case)
</ins><span class="cx"> from twext.enterprise.dal.syntax import FixedPlaceholder, NumericPlaceholder
</span><span class="cx"> from twext.enterprise.dal.syntax import Function
</span><span class="cx"> from twext.enterprise.dal.syntax import SchemaSyntax
</span><span class="lines">@@ -366,6 +367,18 @@
</span><span class="cx"> Select(From=self.schema.FOO, ForUpdate=True).toSQL(),
</span><span class="cx"> SQLFragment("select * from FOO for update")
</span><span class="cx"> )
</span><ins>+ self.assertEquals(
+ Select(From=self.schema.FOO, ForUpdate=True, NoWait=True).toSQL(),
+ SQLFragment("select * from FOO for update nowait")
+ )
+ self.assertEquals(
+ Select(From=self.schema.FOO, ForUpdate=True, SkipLocked=True).toSQL(),
+ SQLFragment("select * from FOO for update skip locked")
+ )
+ self.assertEquals(
+ Select(From=self.schema.FOO, ForUpdate=True, NoWait=True, SkipLocked=True).toSQL(),
+ SQLFragment("select * from FOO for update nowait skip locked")
+ )
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx"> def test_groupBy(self):
</span><span class="lines">@@ -2192,7 +2205,83 @@
</span><span class="cx"> self.assertEquals(values, {})
</span><span class="cx">
</span><span class="cx">
</span><ins>+ def test_case(self):
+ """
+ A L{Case} object will generate an appropriate SQL statement.
+ """
+ self.assertEquals(
+ Select(
+ [Case((self.schema.FOO.BAR &lt; 1), Constant(2), Constant(3))],
+ From=self.schema.FOO,
+ Limit=123
+ ).toSQL(),
+ SQLFragment("select case when BAR &lt; ? then ? else ? end from FOO limit ?", [1, 2, 3, 123])
+ )
+ self.assertEqual(Case((self.schema.FOO.BAR &lt; 1), Constant(2), Constant(3)).allColumns(), [self.schema.FOO.BAR, ])
+ self.assertEquals(
+ Select(
+ [Case((self.schema.FOO.BAR &lt; 1), Constant(2), None)],
+ From=self.schema.FOO,
+ Limit=123
+ ).toSQL(),
+ SQLFragment("select case when BAR &lt; ? then ? else null end from FOO limit ?", [1, 2, 123])
+ )
+ self.assertEqual(Case((self.schema.FOO.BAR &lt; 1), Constant(2), None).allColumns(), [self.schema.FOO.BAR, ])
+ self.assertEquals(
+ Select(
+ [Case((self.schema.FOO.BAR &lt; 1), None, Constant(3))],
+ From=self.schema.FOO,
+ Limit=123
+ ).toSQL(),
+ SQLFragment("select case when BAR &lt; ? then null else ? end from FOO limit ?", [1, 3, 123])
+ )
+ self.assertEqual(Case((self.schema.FOO.BAR &lt; 1), None, Constant(3)).allColumns(), [self.schema.FOO.BAR, ])
</ins><span class="cx">
</span><ins>+
+ def test_call(self):
+ """
+ A L{Call} object will generate an appropriate SQL statement.
+ """
+ self.assertEquals(
+ Call(
+ "procedure"
+ ).toSQL(QueryGenerator(ORACLE_DIALECT)),
+ SQLFragment("call procedure()", (None,))
+ )
+
+ self.assertEquals(
+ Call(
+ "procedure",
+ 1, "2"
+ ).toSQL(QueryGenerator(ORACLE_DIALECT)),
+ SQLFragment("call procedure()", (None, 1, "2"))
+ )
+
+ self.assertEquals(
+ Call(
+ "function",
+ returnType=int
+ ).toSQL(QueryGenerator(ORACLE_DIALECT)),
+ SQLFragment("call function()", (int,))
+ )
+
+ self.assertEquals(
+ Call(
+ "function",
+ 1, "2",
+ returnType=int
+ ).toSQL(QueryGenerator(ORACLE_DIALECT)),
+ SQLFragment("call function()", (int, 1, "2"))
+ )
+
+ self.assertRaises(
+ NotImplementedError,
+ Call("procedure").toSQL,
+ QueryGenerator(POSTGRES_DIALECT)
+ )
+
+
+
</ins><span class="cx"> class OracleConnectionMethods(object):
</span><span class="cx"> def test_rewriteOracleNULLs_Insert(self):
</span><span class="cx"> """
</span><span class="lines">@@ -2250,7 +2339,7 @@
</span><span class="cx"> {self.schema.FOO.BAR: 40, self.schema.FOO.BAZ: 50}
</span><span class="cx"> )
</span><span class="cx"> result = self.resultOf(i.on(self.createTransaction()))
</span><del>- self.assertEquals(result, [None])
</del><ins>+ self.assertEquals(result, [[]])
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx">
</span></span></pre></div>
<a id="twexttrunktwextenterprisejobsjobitempy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/jobs/jobitem.py (15140 => 15141)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/jobs/jobitem.py        2015-09-21 14:19:48 UTC (rev 15140)
+++ twext/trunk/twext/enterprise/jobs/jobitem.py        2015-09-21 14:29:03 UTC (rev 15141)
</span><span class="lines">@@ -18,8 +18,7 @@
</span><span class="cx"> from twext.enterprise.dal.model import Sequence
</span><span class="cx"> from twext.enterprise.dal.model import Table, Schema, SQLType
</span><span class="cx"> from twext.enterprise.dal.record import Record, fromTable, NoSuchRecord
</span><del>-from twext.enterprise.dal.syntax import SchemaSyntax, Count, NullIf, Constant, \
- Sum
</del><ins>+from twext.enterprise.dal.syntax import SchemaSyntax, Call, Count, Case, Constant, Sum
</ins><span class="cx"> from twext.enterprise.ienterprise import ORACLE_DIALECT
</span><span class="cx"> from twext.enterprise.jobs.utils import inTransaction, astimestamp
</span><span class="cx"> from twext.python.log import Logger
</span><span class="lines">@@ -377,114 +376,95 @@
</span><span class="cx"> @rtype: L{JobItem}
</span><span class="cx"> """
</span><span class="cx">
</span><del>- jobs = yield cls.nextjobs(txn, now, minPriority, limit=1)
</del><ins>+ if txn.dialect == ORACLE_DIALECT:
</ins><span class="cx">
</span><del>- # Must only be one or zero
- if jobs and len(jobs) > 1:
- raise AssertionError("nextjob() returned more than one row")
</del><ins>+ # For Oracle we need a multi-app server solution that only locks the
+ # (one) row being returned by the query, and allows other app servers
+ # to run the query in parallel and get the next unlocked row.
+ #
+ # To do that we use a stored procedure that uses a CURSOR with a
+ # SELECT ... FOR UPDATE SKIP LOCKED query that ensures only the row
+ # being fetched is locked and existing locked rows are skipped.
</ins><span class="cx">
</span><del>- returnValue(jobs[0] if jobs else None)
</del><ins>+ # Three separate stored procedures for the three priority cases
+ if minPriority == JOB_PRIORITY_LOW:
+ function = "next_job_all"
+ elif minPriority == JOB_PRIORITY_MEDIUM:
+ function = "next_job_medium_high"
+ elif minPriority == JOB_PRIORITY_HIGH:
+ function = "next_job_high"
</ins><span class="cx">
</span><ins>+ job = None
+ jobID = yield Call(function, now, returnType=int).on(txn)
+ if jobID:
+ job = yield cls.load(txn, jobID)
+ else:
+ # Only add the PRIORITY term if minimum is greater than zero
+ queryExpr = (cls.assigned == None).And(cls.pause == 0).And(cls.notBefore &lt;= now)
</ins><span class="cx">
</span><del>- @classmethod
- @inlineCallbacks
- def nextjobs(cls, txn, now, minPriority, limit=1):
- """
- Find the next available job based on priority, also return any that are overdue.
</del><ins>+ # PRIORITY can only be 0, 1, or 2. So we can convert an inequality into
+ # an equality test as follows:
+ #
+ # PRIORITY >= 0 - no test needed all values match all the time
+ # PRIORITY >= 1 === PRIORITY != 0
+ # PRIORITY >= 2 === PRIORITY == 2
+ #
+ # Doing this allows use of the PRIORITY column in an index since we already
+ # have one inequality in the index (NOT_BEFORE)
</ins><span class="cx">
</span><del>- @param txn: the transaction to use
- @type txn: L{IAsyncTransaction}
- @param now: current timestamp
- @type now: L{datetime.datetime}
- @param minPriority: lowest priority level to query for
- @type minPriority: L{int}
- @param limit: limit on number of jobs to return
- @type limit: L{int}
</del><ins>+ if minPriority == JOB_PRIORITY_MEDIUM:
+ queryExpr = (cls.priority != JOB_PRIORITY_LOW).And(queryExpr)
+ elif minPriority == JOB_PRIORITY_HIGH:
+ queryExpr = (cls.priority == JOB_PRIORITY_HIGH).And(queryExpr)
</ins><span class="cx">
</span><del>- @return: the job record
- @rtype: L{JobItem}
- """
-
- # Only add the PRIORITY term if minimum is greater than zero
- queryExpr = (cls.assigned == None).And(cls.pause == 0).And(cls.notBefore &lt;= now)
-
- # PRIORITY can only be 0, 1, or 2. So we can convert an inequality into
- # an equality test as follows:
- #
- # PRIORITY >= 0 - no test needed all values match all the time
- # PRIORITY >= 1 === PRIORITY != 0
- # PRIORITY >= 2 === PRIORITY == 2
- #
- # Doing this allows use of the PRIORITY column in an index since we already
- # have one inequality in the index (NOT_BEFORE)
-
- if minPriority == JOB_PRIORITY_MEDIUM:
- queryExpr = (cls.priority != JOB_PRIORITY_LOW).And(queryExpr)
- elif minPriority == JOB_PRIORITY_HIGH:
- queryExpr = (cls.priority == JOB_PRIORITY_HIGH).And(queryExpr)
-
- if txn.dialect == ORACLE_DIALECT:
- # Oracle does not support a "for update" clause with "order by". So do the
- # "for update" as a second query right after the first. Will need to check
- # how this might impact concurrency in a multi-host setup.
</del><span class="cx"> jobs = yield cls.query(
</span><span class="cx"> txn,
</span><span class="cx"> queryExpr,
</span><span class="cx"> order=cls.priority,
</span><span class="cx"> ascending=False,
</span><del>- limit=limit,
- )
- if jobs:
- yield cls.query(
- txn,
- (cls.jobID.In([job.jobID for job in jobs])),
- forUpdate=True,
- noWait=False,
- )
- else:
- jobs = yield cls.query(
- txn,
- queryExpr,
- order=cls.priority,
- ascending=False,
</del><span class="cx"> forUpdate=True,
</span><span class="cx"> noWait=False,
</span><del>- limit=limit,
</del><ins>+ limit=1,
</ins><span class="cx"> )
</span><ins>+ job = jobs[0] if jobs else None
</ins><span class="cx">
</span><del>- returnValue(jobs)
</del><ins>+ returnValue(job)
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx"> @classmethod
</span><span class="cx"> @inlineCallbacks
</span><del>- def overduejobs(cls, txn, now, limit=None):
</del><ins>+ def overduejob(cls, txn, now):
</ins><span class="cx"> """
</span><del>- Find the next overdue job based on priority.
</del><ins>+ Find the next overdue job.
</ins><span class="cx">
</span><span class="cx"> @param txn: the transaction to use
</span><span class="cx"> @type txn: L{IAsyncTransaction}
</span><span class="cx"> @param now: current timestamp
</span><span class="cx"> @type now: L{datetime.datetime}
</span><del>- @param limit: limit on number of jobs to return
- @type limit: L{int}
</del><span class="cx">
</span><span class="cx"> @return: the job record
</span><span class="cx"> @rtype: L{JobItem}
</span><span class="cx"> """
</span><span class="cx">
</span><del>- queryExpr = (cls.assigned != None).And(cls.overdue &lt; now)
</del><ins>+ if txn.dialect == ORACLE_DIALECT:
+ # See L{nextjob} for why Oracle is different
+ job = None
+ jobID = yield Call("overdue_job", now, returnType=int).on(txn)
+ if jobID:
+ job = yield cls.load(txn, jobID)
+ else:
+ jobs = yield cls.query(
+ txn,
+ (cls.assigned != None).And(cls.overdue &lt; now),
+ forUpdate=True,
+ noWait=False,
+ limit=1,
+ )
+ job = jobs[0] if jobs else None
</ins><span class="cx">
</span><del>- jobs = yield cls.query(
- txn,
- queryExpr,
- forUpdate=True,
- noWait=False,
- limit=limit,
- )
</del><ins>+ returnValue(job)
</ins><span class="cx">
</span><del>- returnValue(jobs)
</del><span class="cx">
</span><del>-
</del><span class="cx"> @inlineCallbacks
</span><span class="cx"> def run(self):
</span><span class="cx"> """
</span><span class="lines">@@ -714,7 +694,7 @@
</span><span class="cx"> cls.workType,
</span><span class="cx"> Count(cls.workType),
</span><span class="cx"> Count(cls.assigned),
</span><del>- Count(NullIf(cls.assigned is not None and cls.notBefore &lt; now, Constant(False))),
</del><ins>+ Count(Case((cls.assigned == None).And(cls.notBefore &lt; now), Constant(1), None)),
</ins><span class="cx"> Sum(cls.failed),
</span><span class="cx"> ),
</span><span class="cx"> group=cls.workType
</span></span></pre></div>
<a id="twexttrunktwextenterprisejobsqueuepy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/jobs/queue.py (15140 => 15141)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/jobs/queue.py        2015-09-21 14:19:48 UTC (rev 15140)
+++ twext/trunk/twext/enterprise/jobs/queue.py        2015-09-21 14:29:03 UTC (rev 15141)
</span><span class="lines">@@ -705,10 +705,8 @@
</span><span class="cx"> txn = overdueJob = None
</span><span class="cx"> try:
</span><span class="cx"> txn = self.transactionFactory(label="jobqueue.overdueCheck")
</span><del>- overdueJobs = yield JobItem.overduejobs(txn, nowTime, limit=1)
- if overdueJobs:
- overdueJob = overdueJobs[0]
- else:
</del><ins>+ overdueJob = yield JobItem.overduejob(txn, nowTime)
+ if overdueJob is None:
</ins><span class="cx"> break
</span><span class="cx">
</span><span class="cx"> # It is overdue - check to see whether the work item is currently locked - if so no
</span></span></pre>
</div>
</div>
</body>
</html>