<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
"http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head><meta http-equiv="content-type" content="text/html; charset=utf-8" />
<title>[15141] twext/trunk/twext/enterprise</title>
</head>
<body>

<style type="text/css"><!--
#msg dl.meta { border: 1px #006 solid; background: #369; padding: 6px; color: #fff; }
#msg dl.meta dt { float: left; width: 6em; font-weight: bold; }
#msg dt:after { content:':';}
#msg dl, #msg dt, #msg ul, #msg li, #header, #footer, #logmsg { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt;  }
#msg dl a { font-weight: bold}
#msg dl a:link    { color:#fc3; }
#msg dl a:active  { color:#ff0; }
#msg dl a:visited { color:#cc6; }
h3 { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; font-weight: bold; }
#msg pre { overflow: auto; background: #ffc; border: 1px #fa0 solid; padding: 6px; }
#logmsg { background: #ffc; border: 1px #fa0 solid; padding: 1em 1em 0 1em; }
#logmsg p, #logmsg pre, #logmsg blockquote { margin: 0 0 1em 0; }
#logmsg p, #logmsg li, #logmsg dt, #logmsg dd { line-height: 14pt; }
#logmsg h1, #logmsg h2, #logmsg h3, #logmsg h4, #logmsg h5, #logmsg h6 { margin: .5em 0; }
#logmsg h1:first-child, #logmsg h2:first-child, #logmsg h3:first-child, #logmsg h4:first-child, #logmsg h5:first-child, #logmsg h6:first-child { margin-top: 0; }
#logmsg ul, #logmsg ol { padding: 0; list-style-position: inside; margin: 0 0 0 1em; }
#logmsg ul { text-indent: -1em; padding-left: 1em; }#logmsg ol { text-indent: -1.5em; padding-left: 1.5em; }
#logmsg > ul, #logmsg > ol { margin: 0 0 1em 0; }
#logmsg pre { background: #eee; padding: 1em; }
#logmsg blockquote { border: 1px solid #fa0; border-left-width: 10px; padding: 1em 1em 0 1em; background: white;}
#logmsg dl { margin: 0; }
#logmsg dt { font-weight: bold; }
#logmsg dd { margin: 0; padding: 0 0 0.5em 0; }
#logmsg dd:before { content:'\00bb';}
#logmsg table { border-spacing: 0px; border-collapse: collapse; border-top: 4px solid #fa0; border-bottom: 1px solid #fa0; background: #fff; }
#logmsg table th { text-align: left; font-weight: normal; padding: 0.2em 0.5em; border-top: 1px dotted #fa0; }
#logmsg table td { text-align: right; border-top: 1px dotted #fa0; padding: 0.2em 0.5em; }
#logmsg table thead th { text-align: center; border-bottom: 1px solid #fa0; }
#logmsg table th.Corner { text-align: left; }
#logmsg hr { border: none 0; border-top: 2px dashed #fa0; height: 1px; }
#header, #footer { color: #fff; background: #636; border: 1px #300 solid; padding: 6px; }
#patch { width: 100%; }
#patch h4 {font-family: verdana,arial,helvetica,sans-serif;font-size:10pt;padding:8px;background:#369;color:#fff;margin:0;}
#patch .propset h4, #patch .binary h4 {margin:0;}
#patch pre {padding:0;line-height:1.2em;margin:0;}
#patch .diff {width:100%;background:#eee;padding: 0 0 10px 0;overflow:auto;}
#patch .propset .diff, #patch .binary .diff  {padding:10px 0;}
#patch span {display:block;padding:0 10px;}
#patch .modfile, #patch .addfile, #patch .delfile, #patch .propset, #patch .binary, #patch .copfile {border:1px solid #ccc;margin:10px 0;}
#patch ins {background:#dfd;text-decoration:none;display:block;padding:0 10px;}
#patch del {background:#fdd;text-decoration:none;display:block;padding:0 10px;}
#patch .lines, .info {color:#888;background:#fff;}
--></style>
<div id="msg">
<dl class="meta">
<dt>Revision</dt> <dd><a href="http://trac.calendarserver.org/changeset/15141">15141</a></dd>
<dt>Author</dt> <dd>cdaboo@apple.com</dd>
<dt>Date</dt> <dd>2015-09-21 07:29:03 -0700 (Mon, 21 Sep 2015)</dd>
</dl>

<h3>Log Message</h3>
<pre>Changes for Oracle: add Call(), SKIP LOCKED, and Case SQL syntax support. Use stored procedures for next_job processing. Record.deletesome multi-row return.</pre>

<h3>Modified Paths</h3>
<ul>
<li><a href="#twexttrunktwextenterpriseadbapi2py">twext/trunk/twext/enterprise/adbapi2.py</a></li>
<li><a href="#twexttrunktwextenterprisedalrecordpy">twext/trunk/twext/enterprise/dal/record.py</a></li>
<li><a href="#twexttrunktwextenterprisedalsyntaxpy">twext/trunk/twext/enterprise/dal/syntax.py</a></li>
<li><a href="#twexttrunktwextenterprisedaltesttest_sqlsyntaxpy">twext/trunk/twext/enterprise/dal/test/test_sqlsyntax.py</a></li>
<li><a href="#twexttrunktwextenterprisejobsjobitempy">twext/trunk/twext/enterprise/jobs/jobitem.py</a></li>
<li><a href="#twexttrunktwextenterprisejobsqueuepy">twext/trunk/twext/enterprise/jobs/queue.py</a></li>
</ul>

</div>
<div id="patch">
<h3>Diff</h3>
<a id="twexttrunktwextenterpriseadbapi2py"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/adbapi2.py (15140 => 15141)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/adbapi2.py        2015-09-21 14:19:48 UTC (rev 15140)
+++ twext/trunk/twext/enterprise/adbapi2.py        2015-09-21 14:29:03 UTC (rev 15141)
</span><span class="lines">@@ -229,6 +229,13 @@
</span><span class="cx">         @raise raiseOnZeroRowCount: if the argument was specified and no rows
</span><span class="cx">             were returned by the executed statement.
</span><span class="cx">         &quot;&quot;&quot;
</span><ins>+        # Oracle only: we need to support the CALL statement which is available in
+        # the cx_Oracle module via special callproc() and callfunc() methods
+        # that are not part of the standard Python DBAPI.
+        stmts = sql.splitlines()
+        if stmts[-1].startswith(&quot;call &quot;):
+            return self._reallyCallSQL(stmts[-1], args)
+
</ins><span class="cx">         wasFirst = self._first
</span><span class="cx"> 
</span><span class="cx">         # If this is the first time this cursor has been used in this
</span><span class="lines">@@ -312,12 +319,103 @@
</span><span class="cx">             # Oracle with a return into clause returns an empty set or rows, but
</span><span class="cx">             # we then have to insert the special bind variables for the return into.
</span><span class="cx">             # Thus we need to know whether there was any rowcount from the actual query.
</span><del>-            # What we do is always insert a set of empty rows as the result if the
-            # rowcount is non-zero. Then we can detect whether the bind variables
-            # need to be added into the result set.
-            return [[]] * self._cursor.rowcount if self._cursor.rowcount else None
</del><ins>+            # What we do is always insert a set of empty rows equal to the rowcount. In
+            # the case of no result, an empty list is returned. This way we can detect
+            # that the additional bind variables are needed (if len(result) != 0).
+            return [[]] * self._cursor.rowcount
</ins><span class="cx"> 
</span><span class="cx"> 
</span><ins>+    def _reallyCallSQL(self, sql, args=None):
+        &quot;&quot;&quot;
+        Use cx_Oracle's callproc() or callfunc() Cursor methods to execute a
+        stored procedure.
+
+        @param sql: The SQL string to execute.
+        @type sql: C{str}
+
+        @param args: The parameters of the procedure or function, if any.
+        @type args: C{list} or C{None}
+
+        @raise Exception: this function may raise any exception raised by the
+            underlying C{dbapi.connect}, C{cursor.execute},
+            L{IDerivedParameter.preQuery}, C{connection.cursor}, or
+            C{cursor.callxxx}.
+        &quot;&quot;&quot;
+
+        # The sql will be of the form &quot;call &lt;name&gt;()&quot; with args
+        if not sql.startswith(&quot;call &quot;) or not sql.endswith(&quot;()&quot;):
+            raise ValueError(&quot;Invalid SQL CALL statement: {}&quot;.format(sql))
+        name = sql[5:-2]
+        returnType = args[0]
+        args = args[1:]
+
+        wasFirst = self._first
+
+        # If this is the first time this cursor has been used in this
+        # transaction, remember that, but mark it as now used.
+        self._first = False
+
+        try:
+            # Make the return value look like an array of rows/columns, as
+            # one would expect for something like a SELECT
+            if returnType is not None:
+                returnValue = self._cursor.callfunc(name, returnType, args)
+                returnValue = [returnValue, ]
+            else:
+                returnValue = self._cursor.callproc(name, args)
+            returnValue = [returnValue, ]
+        except:
+            # If execute() raised an exception, and this was the first thing to
+            # happen in the transaction, then the connection has probably gone
+            # bad in the meanwhile, and we should try again.
+            if wasFirst:
+                # Report the error before doing anything else, since doing
+                # other things may cause the traceback stack to be eliminated
+                # if they raise exceptions (even internally).
+                log.failure(
+                    &quot;Exception from execute() on first statement in &quot;
+                    &quot;transaction.  Possibly caused by a database server &quot;
+                    &quot;restart.  Automatically reconnecting now.&quot;,
+                    failure=Failure(),
+                )
+                try:
+                    self._connection.close()
+                except:
+                    # close() may raise an exception to alert us of an error as
+                    # well.  Right now the only type of error we know about is
+                    # &quot;the connection is already closed&quot;, which obviously
+                    # doesn't need to be handled specially. Unfortunately the
+                    # reporting of this type of error is not consistent or
+                    # predictable across different databases, or even different
+                    # bindings to the same database, so we have to do a
+                    # catch-all here.  While I can't imagine another type of
+                    # error at the moment, bare C{except:}s are notorious for
+                    # making debugging surprising error conditions very
+                    # difficult, so let's make sure that the error is logged
+                    # just in case.
+                    log.failure(
+                        &quot;Exception from close() while automatically &quot;
+                        &quot;reconnecting. (Probably not serious.)&quot;,
+                        failure=Failure(),
+                    )
+
+                # Now, if either of *these* things fail, there's an error here
+                # that we cannot workaround or address automatically, so no
+                # try:except: for them.
+                self._connection = self._pool.connectionFactory()
+                self._cursor = self._connection.cursor()
+
+                # Note that although this method is being invoked recursively,
+                # the &quot;_first&quot; flag is re-set at the very top, so we will _not_
+                # be re-entering it more than once.
+                result = self._reallyCallSQL(sql, args)
+                return result
+            else:
+                raise
+        else:
+            return returnValue
+
+
</ins><span class="cx">     def execSQL(self, *args, **kw):
</span><span class="cx">         if self._completed:
</span><span class="cx">             raise RuntimeError(&quot;Attempt to use {} transaction.&quot;.format(self._completed))
</span></span></pre></div>
<a id="twexttrunktwextenterprisedalrecordpy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/dal/record.py (15140 => 15141)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/dal/record.py        2015-09-21 14:19:48 UTC (rev 15140)
+++ twext/trunk/twext/enterprise/dal/record.py        2015-09-21 14:29:03 UTC (rev 15141)
</span><span class="lines">@@ -33,6 +33,7 @@
</span><span class="cx"> from twext.enterprise.dal.syntax import (
</span><span class="cx">     Select, Tuple, Constant, ColumnSyntax, Insert, Update, Delete, SavepointAction,
</span><span class="cx">     Count, ALL_COLUMNS)
</span><ins>+from twext.enterprise.ienterprise import ORACLE_DIALECT
</ins><span class="cx"> from twext.enterprise.util import parseSQLTimestamp
</span><span class="cx"> # from twext.enterprise.dal.syntax import ExpressionSyntax
</span><span class="cx"> 
</span><span class="lines">@@ -472,7 +473,7 @@
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     @classmethod
</span><del>-    def query(cls, transaction, expr, order=None, group=None, limit=None, forUpdate=False, noWait=False, ascending=True, distinct=False):
</del><ins>+    def query(cls, transaction, expr, order=None, group=None, limit=None, forUpdate=False, noWait=False, skipLocked=False, ascending=True, distinct=False):
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Query the table that corresponds to C{cls}, and return instances of
</span><span class="cx">         C{cls} corresponding to the rows that are returned from that table.
</span><span class="lines">@@ -495,6 +496,8 @@
</span><span class="cx">         @type forUpdate: L{bool}
</span><span class="cx">         @param noWait: include NOWAIT with the FOR UPDATE
</span><span class="cx">         @type noWait: L{bool}
</span><ins>+        @param skipLocked: include SKIP LOCKED with the FOR UPDATE
+        @type skipLocked: L{bool}
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         return cls._rowsFromQuery(
</span><span class="cx">             transaction,
</span><span class="lines">@@ -505,6 +508,7 @@
</span><span class="cx">                 limit=limit,
</span><span class="cx">                 forUpdate=forUpdate,
</span><span class="cx">                 noWait=noWait,
</span><ins>+                skipLocked=skipLocked,
</ins><span class="cx">                 ascending=ascending,
</span><span class="cx">                 distinct=distinct,
</span><span class="cx">             ),
</span><span class="lines">@@ -513,7 +517,7 @@
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     @classmethod
</span><del>-    def queryExpr(cls, expr, attributes=None, order=None, group=None, limit=None, forUpdate=False, noWait=False, ascending=True, distinct=False):
</del><ins>+    def queryExpr(cls, expr, attributes=None, order=None, group=None, limit=None, forUpdate=False, noWait=False, skipLocked=False, ascending=True, distinct=False):
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Query expression that corresponds to C{cls}. Used in cases where a sub-select
</span><span class="cx">         on this record's table is needed.
</span><span class="lines">@@ -536,6 +540,8 @@
</span><span class="cx">         @type forUpdate: L{bool}
</span><span class="cx">         @param noWait: include NOWAIT with the FOR UPDATE
</span><span class="cx">         @type noWait: L{bool}
</span><ins>+        @param skipLocked: include SKIP LOCKED with the FOR UPDATE
+        @type skipLocked: L{bool}
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         kw = {}
</span><span class="cx">         if order is not None:
</span><span class="lines">@@ -548,6 +554,8 @@
</span><span class="cx">             kw.update(ForUpdate=True)
</span><span class="cx">             if noWait:
</span><span class="cx">                 kw.update(NoWait=True)
</span><ins>+            if skipLocked:
+                kw.update(SkipLocked=True)
</ins><span class="cx">         if distinct:
</span><span class="cx">             kw.update(Distinct=True)
</span><span class="cx">         if attributes is None:
</span><span class="lines">@@ -631,15 +639,32 @@
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     @classmethod
</span><ins>+    @inlineCallbacks
</ins><span class="cx">     def deletesome(cls, transaction, where, returnCols=None):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Delete all rows matching the where expression from the table that corresponds to C{cls}.
</span><span class="cx">         &quot;&quot;&quot;
</span><del>-        return Delete(
-            From=cls.table,
-            Where=where,
-            Return=returnCols,
-        ).on(transaction)
</del><ins>+        if transaction.dialect == ORACLE_DIALECT and returnCols is not None:
+            # Oracle cannot return multiple rows in the RETURNING clause so
+            # we have to split this into a SELECT followed by a DELETE
+            if not isinstance(returnCols, (tuple, list)):
+                returnCols = [returnCols, ]
+            result = yield Select(
+                returnCols,
+                From=cls.table,
+                Where=where,
+            ).on(transaction)
+            yield Delete(
+                From=cls.table,
+                Where=where,
+            ).on(transaction)
+        else:
+            result = yield Delete(
+                From=cls.table,
+                Where=where,
+                Return=returnCols,
+            ).on(transaction)
+        returnValue(result)
</ins><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     @classmethod
</span></span></pre></div>
<a id="twexttrunktwextenterprisedalsyntaxpy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/dal/syntax.py (15140 => 15141)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/dal/syntax.py        2015-09-21 14:19:48 UTC (rev 15140)
+++ twext/trunk/twext/enterprise/dal/syntax.py        2015-09-21 14:29:03 UTC (rev 15141)
</span><span class="lines">@@ -1079,6 +1079,10 @@
</span><span class="cx">         super(NullComparison, self).__init__(a, op, None)
</span><span class="cx"> 
</span><span class="cx"> 
</span><ins>+    def allColumns(self):
+        return self.a.allColumns()
+
+
</ins><span class="cx">     def subSQL(self, queryGenerator, allTables):
</span><span class="cx">         sqls = SQLFragment()
</span><span class="cx">         sqls.append(self.a.subSQL(queryGenerator, allTables))
</span><span class="lines">@@ -1224,6 +1228,58 @@
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx"> 
</span><ins>+class Case(ExpressionSyntax):
+    &quot;&quot;&quot;
+    Implementation of a simple CASE statement:
+
+    CASE WHEN ... THEN ... ELSE ... END
+
+    Note that SQL actually allows multiple WHEN ... THEN ... clauses, but
+    this implementation only supports one.
+    &quot;&quot;&quot;
+
+    def __init__(self, when, true_result, false_result):
+        &quot;&quot;&quot;
+        @param when: WHEN clause - typically a L{Comparison}
+        @type when: L{Expression}
+        @param true_result: result when WHEN clause is true
+        @type true_result: L{Constant} or L{None}
+        @param false_result: result when WHEN clause is false
+        @type false_result: L{Constant} or L{None}
+        &quot;&quot;&quot;
+        self.when = when
+        self.true_result = true_result
+        self.false_result = false_result
+
+
+    def subSQL(self, queryGenerator, allTables):
+        result = SQLFragment(&quot;case when &quot;)
+        result.append(self.when.subSQL(queryGenerator, allTables))
+        result.append(SQLFragment(&quot; then &quot;))
+        if self.true_result is None:
+            result.append(SQLFragment(&quot;null&quot;))
+        else:
+            result.append(self.true_result.subSQL(queryGenerator, allTables))
+        result.append(SQLFragment(&quot; else &quot;))
+        if self.false_result is None:
+            result.append(SQLFragment(&quot;null&quot;))
+        else:
+            result.append(self.false_result.subSQL(queryGenerator, allTables))
+        result.append(SQLFragment(&quot; end&quot;))
+
+        return result
+
+
+    def allColumns(self):
+        &quot;&quot;&quot;
+        Return all columns referenced by any sub-clauses.
+        &quot;&quot;&quot;
+        return self.when.allColumns() + \
+            (self.true_result.allColumns() if self.true_result is not None else []) + \
+            (self.false_result.allColumns() if self.false_result is not None else [])
+
+
+
</ins><span class="cx"> class Tuple(ExpressionSyntax):
</span><span class="cx"> 
</span><span class="cx">     def __init__(self, columns):
</span><span class="lines">@@ -1340,7 +1396,7 @@
</span><span class="cx">         self,
</span><span class="cx">         columns=None, Where=None, From=None,
</span><span class="cx">         OrderBy=None, GroupBy=None,
</span><del>-        Limit=None, ForUpdate=False, NoWait=False, Ascending=None,
</del><ins>+        Limit=None, ForUpdate=False, NoWait=False, SkipLocked=False, Ascending=None,
</ins><span class="cx">         Having=None, Distinct=False, As=None, SetExpression=None
</span><span class="cx">     ):
</span><span class="cx">         self.From = From
</span><span class="lines">@@ -1365,6 +1421,7 @@
</span><span class="cx"> 
</span><span class="cx">         self.ForUpdate = ForUpdate
</span><span class="cx">         self.NoWait = NoWait
</span><ins>+        self.SkipLocked = SkipLocked
</ins><span class="cx">         self.Ascending = Ascending
</span><span class="cx">         self.As = As
</span><span class="cx"> 
</span><span class="lines">@@ -1453,6 +1510,8 @@
</span><span class="cx">                     stmt.text += &quot; for update&quot;
</span><span class="cx">                     if self.NoWait:
</span><span class="cx">                         stmt.text += &quot; nowait&quot;
</span><ins>+                    if self.SkipLocked:
+                        stmt.text += &quot; skip locked&quot;
</ins><span class="cx"> 
</span><span class="cx">         if self.Limit is not None:
</span><span class="cx">             limitConst = Constant(self.Limit).subSQL(queryGenerator, allTables)
</span><span class="lines">@@ -1527,6 +1586,55 @@
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx"> 
</span><ins>+class Call(_Statement):
+    &quot;&quot;&quot;
+    CALL statement. Only supported by Oracle.
+    &quot;&quot;&quot;
+
+    def __init__(self, name, *args, **kwargs):
+        &quot;&quot;&quot;
+        @param name: name of procedure or function to call
+        @type name: L{str}
+        @param *args: arguments for the procedure or function
+        @type *args: L{list}
+        @param returnType: kwarg: the Python type of the return value for
+            a function
+        @type returnType: L{Type}
+        &quot;&quot;&quot;
+        self.Name = name
+        self.Args = args
+        self.ReturnType = kwargs.get(&quot;returnType&quot;)
+
+
+    def _toSQL(self, queryGenerator):
+        &quot;&quot;&quot;
+        Generate an SQL statement of the form &quot;call &lt;name&gt;()&quot; with a list of
+        args, where the first arg is always the return type (which is C{None}
+        for a procedure, rather than a function, call).
+
+        @return: a C{call} statement with arguments
+        @rtype: L{SQLFragment}
+        &quot;&quot;&quot;
+
+        if queryGenerator.dialect != ORACLE_DIALECT:
+            raise NotImplementedError(&quot;CALL statement only available with Oracle DB&quot;)
+        args = (self.ReturnType,) + self.Args
+        stmt = SQLFragment(&quot;call &quot;, args)
+        stmt.text += self.Name
+        stmt.text += &quot;()&quot;
+
+        return stmt
+
+
+    def _fixOracleNulls(self, rows):
+        &quot;&quot;&quot;
+        Suppress the super class behavior because we are getting result values
+        directly, not from columns.
+        &quot;&quot;&quot;
+        return rows[0][0]
+
+
+
</ins><span class="cx"> def _commaJoined(stmts):
</span><span class="cx">     first = True
</span><span class="cx">     cstatement = SQLFragment()
</span><span class="lines">@@ -1658,9 +1766,9 @@
</span><span class="cx">         ):
</span><span class="cx">             def processIt(emptyListResult):
</span><span class="cx">                 # See comment in L{adbapi2._ConnectedTxn._reallyExecSQL}. If the
</span><del>-                # result is L{None} then also return L{None}. If the result is a
</del><ins>+                # result is an empty list, just return that. If the result is a
</ins><span class="cx">                 # L{list} of empty L{list} then there are return into rows to return.
</span><del>-                if emptyListResult:
</del><ins>+                if len(emptyListResult) &gt; 0:
</ins><span class="cx">                     emptyListResult = [[v.value for _ignore_k, v in outvars]]
</span><span class="cx">                 return emptyListResult
</span><span class="cx">             return result.addCallback(processIt)
</span></span></pre></div>
<a id="twexttrunktwextenterprisedaltesttest_sqlsyntaxpy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/dal/test/test_sqlsyntax.py (15140 => 15141)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/dal/test/test_sqlsyntax.py        2015-09-21 14:19:48 UTC (rev 15140)
+++ twext/trunk/twext/enterprise/dal/test/test_sqlsyntax.py        2015-09-21 14:29:03 UTC (rev 15141)
</span><span class="lines">@@ -33,7 +33,8 @@
</span><span class="cx">     Savepoint, RollbackToSavepoint, ReleaseSavepoint, SavepointAction,
</span><span class="cx">     Union, Intersect, Except, SetExpression, DALError,
</span><span class="cx">     ResultAliasSyntax, Count, QueryGenerator, ALL_COLUMNS,
</span><del>-    DatabaseLock, DatabaseUnlock, Not, Coalesce, NullIf)
</del><ins>+    DatabaseLock, DatabaseUnlock, Not, Coalesce, NullIf,
+    Call, Case)
</ins><span class="cx"> from twext.enterprise.dal.syntax import FixedPlaceholder, NumericPlaceholder
</span><span class="cx"> from twext.enterprise.dal.syntax import Function
</span><span class="cx"> from twext.enterprise.dal.syntax import SchemaSyntax
</span><span class="lines">@@ -366,6 +367,18 @@
</span><span class="cx">             Select(From=self.schema.FOO, ForUpdate=True).toSQL(),
</span><span class="cx">             SQLFragment(&quot;select * from FOO for update&quot;)
</span><span class="cx">         )
</span><ins>+        self.assertEquals(
+            Select(From=self.schema.FOO, ForUpdate=True, NoWait=True).toSQL(),
+            SQLFragment(&quot;select * from FOO for update nowait&quot;)
+        )
+        self.assertEquals(
+            Select(From=self.schema.FOO, ForUpdate=True, SkipLocked=True).toSQL(),
+            SQLFragment(&quot;select * from FOO for update skip locked&quot;)
+        )
+        self.assertEquals(
+            Select(From=self.schema.FOO, ForUpdate=True, NoWait=True, SkipLocked=True).toSQL(),
+            SQLFragment(&quot;select * from FOO for update nowait skip locked&quot;)
+        )
</ins><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     def test_groupBy(self):
</span><span class="lines">@@ -2192,7 +2205,83 @@
</span><span class="cx">         self.assertEquals(values, {})
</span><span class="cx"> 
</span><span class="cx"> 
</span><ins>+    def test_case(self):
+        &quot;&quot;&quot;
+        A L{Case} object will generate an appropriate SQL statement.
+        &quot;&quot;&quot;
+        self.assertEquals(
+            Select(
+                [Case((self.schema.FOO.BAR &lt; 1), Constant(2), Constant(3))],
+                From=self.schema.FOO,
+                Limit=123
+            ).toSQL(),
+            SQLFragment(&quot;select case when BAR &lt; ? then ? else ? end from FOO limit ?&quot;, [1, 2, 3, 123])
+        )
+        self.assertEqual(Case((self.schema.FOO.BAR &lt; 1), Constant(2), Constant(3)).allColumns(), [self.schema.FOO.BAR, ])
+        self.assertEquals(
+            Select(
+                [Case((self.schema.FOO.BAR &lt; 1), Constant(2), None)],
+                From=self.schema.FOO,
+                Limit=123
+            ).toSQL(),
+            SQLFragment(&quot;select case when BAR &lt; ? then ? else null end from FOO limit ?&quot;, [1, 2, 123])
+        )
+        self.assertEqual(Case((self.schema.FOO.BAR &lt; 1), Constant(2), None).allColumns(), [self.schema.FOO.BAR, ])
+        self.assertEquals(
+            Select(
+                [Case((self.schema.FOO.BAR &lt; 1), None, Constant(3))],
+                From=self.schema.FOO,
+                Limit=123
+            ).toSQL(),
+            SQLFragment(&quot;select case when BAR &lt; ? then null else ? end from FOO limit ?&quot;, [1, 3, 123])
+        )
+        self.assertEqual(Case((self.schema.FOO.BAR &lt; 1), None, Constant(3)).allColumns(), [self.schema.FOO.BAR, ])
</ins><span class="cx"> 
</span><ins>+
+    def test_call(self):
+        &quot;&quot;&quot;
+        A L{Call} object will generate an appropriate SQL statement.
+        &quot;&quot;&quot;
+        self.assertEquals(
+            Call(
+                &quot;procedure&quot;
+            ).toSQL(QueryGenerator(ORACLE_DIALECT)),
+            SQLFragment(&quot;call procedure()&quot;, (None,))
+        )
+
+        self.assertEquals(
+            Call(
+                &quot;procedure&quot;,
+                1, &quot;2&quot;
+            ).toSQL(QueryGenerator(ORACLE_DIALECT)),
+            SQLFragment(&quot;call procedure()&quot;, (None, 1, &quot;2&quot;))
+        )
+
+        self.assertEquals(
+            Call(
+                &quot;function&quot;,
+                returnType=int
+            ).toSQL(QueryGenerator(ORACLE_DIALECT)),
+            SQLFragment(&quot;call function()&quot;, (int,))
+        )
+
+        self.assertEquals(
+            Call(
+                &quot;function&quot;,
+                1, &quot;2&quot;,
+                returnType=int
+            ).toSQL(QueryGenerator(ORACLE_DIALECT)),
+            SQLFragment(&quot;call function()&quot;, (int, 1, &quot;2&quot;))
+        )
+
+        self.assertRaises(
+            NotImplementedError,
+            Call(&quot;procedure&quot;).toSQL,
+            QueryGenerator(POSTGRES_DIALECT)
+        )
+
+
+
</ins><span class="cx"> class OracleConnectionMethods(object):
</span><span class="cx">     def test_rewriteOracleNULLs_Insert(self):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="lines">@@ -2250,7 +2339,7 @@
</span><span class="cx">             {self.schema.FOO.BAR: 40, self.schema.FOO.BAZ: 50}
</span><span class="cx">         )
</span><span class="cx">         result = self.resultOf(i.on(self.createTransaction()))
</span><del>-        self.assertEquals(result, [None])
</del><ins>+        self.assertEquals(result, [[]])
</ins><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx"> 
</span></span></pre></div>
<a id="twexttrunktwextenterprisejobsjobitempy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/jobs/jobitem.py (15140 => 15141)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/jobs/jobitem.py        2015-09-21 14:19:48 UTC (rev 15140)
+++ twext/trunk/twext/enterprise/jobs/jobitem.py        2015-09-21 14:29:03 UTC (rev 15141)
</span><span class="lines">@@ -18,8 +18,7 @@
</span><span class="cx"> from twext.enterprise.dal.model import Sequence
</span><span class="cx"> from twext.enterprise.dal.model import Table, Schema, SQLType
</span><span class="cx"> from twext.enterprise.dal.record import Record, fromTable, NoSuchRecord
</span><del>-from twext.enterprise.dal.syntax import SchemaSyntax, Count, NullIf, Constant, \
-    Sum
</del><ins>+from twext.enterprise.dal.syntax import SchemaSyntax, Call, Count, Case, Constant, Sum
</ins><span class="cx"> from twext.enterprise.ienterprise import ORACLE_DIALECT
</span><span class="cx"> from twext.enterprise.jobs.utils import inTransaction, astimestamp
</span><span class="cx"> from twext.python.log import Logger
</span><span class="lines">@@ -377,114 +376,95 @@
</span><span class="cx">         @rtype: L{JobItem}
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx"> 
</span><del>-        jobs = yield cls.nextjobs(txn, now, minPriority, limit=1)
</del><ins>+        if txn.dialect == ORACLE_DIALECT:
</ins><span class="cx"> 
</span><del>-        # Must only be one or zero
-        if jobs and len(jobs) &gt; 1:
-            raise AssertionError(&quot;nextjob() returned more than one row&quot;)
</del><ins>+            # For Oracle we need a multi-app server solution that only locks the
+            # (one) row being returned by the query, and allows other app servers
+            # to run the query in parallel and get the next unlocked row.
+            #
+            # To do that we use a stored procedure that uses a CURSOR with a
+            # SELECT ... FOR UPDATE SKIP LOCKED query that ensures only the row
+            # being fetched is locked and existing locked rows are skipped.
</ins><span class="cx"> 
</span><del>-        returnValue(jobs[0] if jobs else None)
</del><ins>+            # Three separate stored procedures for the three priority cases
+            if minPriority == JOB_PRIORITY_LOW:
+                function = &quot;next_job_all&quot;
+            elif minPriority == JOB_PRIORITY_MEDIUM:
+                function = &quot;next_job_medium_high&quot;
+            elif minPriority == JOB_PRIORITY_HIGH:
+                function = &quot;next_job_high&quot;
</ins><span class="cx"> 
</span><ins>+            job = None
+            jobID = yield Call(function, now, returnType=int).on(txn)
+            if jobID:
+                job = yield cls.load(txn, jobID)
+        else:
+            # Only add the PRIORITY term if minimum is greater than zero
+            queryExpr = (cls.assigned == None).And(cls.pause == 0).And(cls.notBefore &lt;= now)
</ins><span class="cx"> 
</span><del>-    @classmethod
-    @inlineCallbacks
-    def nextjobs(cls, txn, now, minPriority, limit=1):
-        &quot;&quot;&quot;
-        Find the next available job based on priority, also return any that are overdue.
</del><ins>+            # PRIORITY can only be 0, 1, or 2. So we can convert an inequality into
+            # an equality test as follows:
+            #
+            # PRIORITY &gt;= 0 - no test needed all values match all the time
+            # PRIORITY &gt;= 1 === PRIORITY != 0
+            # PRIORITY &gt;= 2 === PRIORITY == 2
+            #
+            # Doing this allows use of the PRIORITY column in an index since we already
+            # have one inequality in the index (NOT_BEFORE)
</ins><span class="cx"> 
</span><del>-        @param txn: the transaction to use
-        @type txn: L{IAsyncTransaction}
-        @param now: current timestamp
-        @type now: L{datetime.datetime}
-        @param minPriority: lowest priority level to query for
-        @type minPriority: L{int}
-        @param limit: limit on number of jobs to return
-        @type limit: L{int}
</del><ins>+            if minPriority == JOB_PRIORITY_MEDIUM:
+                queryExpr = (cls.priority != JOB_PRIORITY_LOW).And(queryExpr)
+            elif minPriority == JOB_PRIORITY_HIGH:
+                queryExpr = (cls.priority == JOB_PRIORITY_HIGH).And(queryExpr)
</ins><span class="cx"> 
</span><del>-        @return: the job record
-        @rtype: L{JobItem}
-        &quot;&quot;&quot;
-
-        # Only add the PRIORITY term if minimum is greater than zero
-        queryExpr = (cls.assigned == None).And(cls.pause == 0).And(cls.notBefore &lt;= now)
-
-        # PRIORITY can only be 0, 1, or 2. So we can convert an inequality into
-        # an equality test as follows:
-        #
-        # PRIORITY &gt;= 0 - no test needed all values match all the time
-        # PRIORITY &gt;= 1 === PRIORITY != 0
-        # PRIORITY &gt;= 2 === PRIORITY == 2
-        #
-        # Doing this allows use of the PRIORITY column in an index since we already
-        # have one inequality in the index (NOT_BEFORE)
-
-        if minPriority == JOB_PRIORITY_MEDIUM:
-            queryExpr = (cls.priority != JOB_PRIORITY_LOW).And(queryExpr)
-        elif minPriority == JOB_PRIORITY_HIGH:
-            queryExpr = (cls.priority == JOB_PRIORITY_HIGH).And(queryExpr)
-
-        if txn.dialect == ORACLE_DIALECT:
-            # Oracle does not support a &quot;for update&quot; clause with &quot;order by&quot;. So do the
-            # &quot;for update&quot; as a second query right after the first. Will need to check
-            # how this might impact concurrency in a multi-host setup.
</del><span class="cx">             jobs = yield cls.query(
</span><span class="cx">                 txn,
</span><span class="cx">                 queryExpr,
</span><span class="cx">                 order=cls.priority,
</span><span class="cx">                 ascending=False,
</span><del>-                limit=limit,
-            )
-            if jobs:
-                yield cls.query(
-                    txn,
-                    (cls.jobID.In([job.jobID for job in jobs])),
-                    forUpdate=True,
-                    noWait=False,
-                )
-        else:
-            jobs = yield cls.query(
-                txn,
-                queryExpr,
-                order=cls.priority,
-                ascending=False,
</del><span class="cx">                 forUpdate=True,
</span><span class="cx">                 noWait=False,
</span><del>-                limit=limit,
</del><ins>+                limit=1,
</ins><span class="cx">             )
</span><ins>+            job = jobs[0] if jobs else None
</ins><span class="cx"> 
</span><del>-        returnValue(jobs)
</del><ins>+        returnValue(job)
</ins><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     @classmethod
</span><span class="cx">     @inlineCallbacks
</span><del>-    def overduejobs(cls, txn, now, limit=None):
</del><ins>+    def overduejob(cls, txn, now):
</ins><span class="cx">         &quot;&quot;&quot;
</span><del>-        Find the next overdue job based on priority.
</del><ins>+        Find the next overdue job.
</ins><span class="cx"> 
</span><span class="cx">         @param txn: the transaction to use
</span><span class="cx">         @type txn: L{IAsyncTransaction}
</span><span class="cx">         @param now: current timestamp
</span><span class="cx">         @type now: L{datetime.datetime}
</span><del>-        @param limit: limit on number of jobs to return
-        @type limit: L{int}
</del><span class="cx"> 
</span><span class="cx">         @return: the job record
</span><span class="cx">         @rtype: L{JobItem}
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx"> 
</span><del>-        queryExpr = (cls.assigned != None).And(cls.overdue &lt; now)
</del><ins>+        if txn.dialect == ORACLE_DIALECT:
+            # See L{nextjob} for why Oracle is different
+            job = None
+            jobID = yield Call(&quot;overdue_job&quot;, now, returnType=int).on(txn)
+            if jobID:
+                job = yield cls.load(txn, jobID)
+        else:
+            jobs = yield cls.query(
+                txn,
+                (cls.assigned != None).And(cls.overdue &lt; now),
+                forUpdate=True,
+                noWait=False,
+                limit=1,
+            )
+            job = jobs[0] if jobs else None
</ins><span class="cx"> 
</span><del>-        jobs = yield cls.query(
-            txn,
-            queryExpr,
-            forUpdate=True,
-            noWait=False,
-            limit=limit,
-        )
</del><ins>+        returnValue(job)
</ins><span class="cx"> 
</span><del>-        returnValue(jobs)
</del><span class="cx"> 
</span><del>-
</del><span class="cx">     @inlineCallbacks
</span><span class="cx">     def run(self):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="lines">@@ -714,7 +694,7 @@
</span><span class="cx">                 cls.workType,
</span><span class="cx">                 Count(cls.workType),
</span><span class="cx">                 Count(cls.assigned),
</span><del>-                Count(NullIf(cls.assigned is not None and cls.notBefore &lt; now, Constant(False))),
</del><ins>+                Count(Case((cls.assigned == None).And(cls.notBefore &lt; now), Constant(1), None)),
</ins><span class="cx">                 Sum(cls.failed),
</span><span class="cx">             ),
</span><span class="cx">             group=cls.workType
</span></span></pre></div>
<a id="twexttrunktwextenterprisejobsqueuepy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/jobs/queue.py (15140 => 15141)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/jobs/queue.py        2015-09-21 14:19:48 UTC (rev 15140)
+++ twext/trunk/twext/enterprise/jobs/queue.py        2015-09-21 14:29:03 UTC (rev 15141)
</span><span class="lines">@@ -705,10 +705,8 @@
</span><span class="cx">             txn = overdueJob = None
</span><span class="cx">             try:
</span><span class="cx">                 txn = self.transactionFactory(label=&quot;jobqueue.overdueCheck&quot;)
</span><del>-                overdueJobs = yield JobItem.overduejobs(txn, nowTime, limit=1)
-                if overdueJobs:
-                    overdueJob = overdueJobs[0]
-                else:
</del><ins>+                overdueJob = yield JobItem.overduejob(txn, nowTime)
+                if overdueJob is None:
</ins><span class="cx">                     break
</span><span class="cx"> 
</span><span class="cx">                 # It is overdue - check to see whether the work item is currently locked - if so no
</span></span></pre>
</div>
</div>

</body>
</html>