[CalendarServer-changes] [15141] twext/trunk/twext/enterprise

source_changes at macosforge.org source_changes at macosforge.org
Mon Sep 21 07:29:03 PDT 2015


Revision: 15141
          http://trac.calendarserver.org//changeset/15141
Author:   cdaboo at apple.com
Date:     2015-09-21 07:29:03 -0700 (Mon, 21 Sep 2015)
Log Message:
-----------
Changes for Oracle: add Call(), SKIP LOCKED, and Case SQL syntax support. Use stored procedures for next_job processing. Record.deletesome multi-row return.

Modified Paths:
--------------
    twext/trunk/twext/enterprise/adbapi2.py
    twext/trunk/twext/enterprise/dal/record.py
    twext/trunk/twext/enterprise/dal/syntax.py
    twext/trunk/twext/enterprise/dal/test/test_sqlsyntax.py
    twext/trunk/twext/enterprise/jobs/jobitem.py
    twext/trunk/twext/enterprise/jobs/queue.py

Modified: twext/trunk/twext/enterprise/adbapi2.py
===================================================================
--- twext/trunk/twext/enterprise/adbapi2.py	2015-09-21 14:19:48 UTC (rev 15140)
+++ twext/trunk/twext/enterprise/adbapi2.py	2015-09-21 14:29:03 UTC (rev 15141)
@@ -229,6 +229,13 @@
         @raise raiseOnZeroRowCount: if the argument was specified and no rows
             were returned by the executed statement.
         """
+        # Oracle only: we need to support the CALL statement which is available in
+        # the cx_Oracle module via special callproc() and callfunc() methods
+        # that are not part of the standard Python DBAPI.
+        stmts = sql.splitlines()
+        if stmts[-1].startswith("call "):
+            return self._reallyCallSQL(stmts[-1], args)
+
         wasFirst = self._first
 
         # If this is the first time this cursor has been used in this
@@ -312,12 +319,103 @@
             # Oracle with a return into clause returns an empty set or rows, but
             # we then have to insert the special bind variables for the return into.
             # Thus we need to know whether there was any rowcount from the actual query.
-            # What we do is always insert a set of empty rows as the result if the
-            # rowcount is non-zero. Then we can detect whether the bind variables
-            # need to be added into the result set.
-            return [[]] * self._cursor.rowcount if self._cursor.rowcount else None
+            # What we do is always insert a set of empty rows equal to the rowcount. In
+            # the case of no result, an empty list is returned. This way we can detect
+            # that the additional bind variables are needed (if len(result) != 0).
+            return [[]] * self._cursor.rowcount
 
 
+    def _reallyCallSQL(self, sql, args=None):
+        """
+        Use cx_Oracle's callproc() or callfunc() Cursor methods to execute a
+        stored procedure.
+
+        @param sql: The SQL string to execute.
+        @type sql: C{str}
+
+        @param args: The parameters of the procedure or function, if any.
+        @type args: C{list} or C{None}
+
+        @raise Exception: this function may raise any exception raised by the
+            underlying C{dbapi.connect}, C{cursor.execute},
+            L{IDerivedParameter.preQuery}, C{connection.cursor}, or
+            C{cursor.callxxx}.
+        """
+
+        # The sql will be of the form "call <name>()" with args
+        if not sql.startswith("call ") or not sql.endswith("()"):
+            raise ValueError("Invalid SQL CALL statement: {}".format(sql))
+        name = sql[5:-2]
+        returnType = args[0]
+        args = args[1:]
+
+        wasFirst = self._first
+
+        # If this is the first time this cursor has been used in this
+        # transaction, remember that, but mark it as now used.
+        self._first = False
+
+        try:
+            # Make the return value look like an array of rows/columns, as
+            # one would expect for something like a SELECT
+            if returnType is not None:
+                returnValue = self._cursor.callfunc(name, returnType, args)
+                returnValue = [returnValue, ]
+            else:
+                returnValue = self._cursor.callproc(name, args)
+            returnValue = [returnValue, ]
+        except:
+            # If execute() raised an exception, and this was the first thing to
+            # happen in the transaction, then the connection has probably gone
+            # bad in the meanwhile, and we should try again.
+            if wasFirst:
+                # Report the error before doing anything else, since doing
+                # other things may cause the traceback stack to be eliminated
+                # if they raise exceptions (even internally).
+                log.failure(
+                    "Exception from execute() on first statement in "
+                    "transaction.  Possibly caused by a database server "
+                    "restart.  Automatically reconnecting now.",
+                    failure=Failure(),
+                )
+                try:
+                    self._connection.close()
+                except:
+                    # close() may raise an exception to alert us of an error as
+                    # well.  Right now the only type of error we know about is
+                    # "the connection is already closed", which obviously
+                    # doesn't need to be handled specially. Unfortunately the
+                    # reporting of this type of error is not consistent or
+                    # predictable across different databases, or even different
+                    # bindings to the same database, so we have to do a
+                    # catch-all here.  While I can't imagine another type of
+                    # error at the moment, bare C{except:}s are notorious for
+                    # making debugging surprising error conditions very
+                    # difficult, so let's make sure that the error is logged
+                    # just in case.
+                    log.failure(
+                        "Exception from close() while automatically "
+                        "reconnecting. (Probably not serious.)",
+                        failure=Failure(),
+                    )
+
+                # Now, if either of *these* things fail, there's an error here
+                # that we cannot workaround or address automatically, so no
+                # try:except: for them.
+                self._connection = self._pool.connectionFactory()
+                self._cursor = self._connection.cursor()
+
+                # Note that although this method is being invoked recursively,
+                # the "_first" flag is re-set at the very top, so we will _not_
+                # be re-entering it more than once.
+                result = self._reallyCallSQL(sql, args)
+                return result
+            else:
+                raise
+        else:
+            return returnValue
+
+
     def execSQL(self, *args, **kw):
         if self._completed:
             raise RuntimeError("Attempt to use {} transaction.".format(self._completed))

Modified: twext/trunk/twext/enterprise/dal/record.py
===================================================================
--- twext/trunk/twext/enterprise/dal/record.py	2015-09-21 14:19:48 UTC (rev 15140)
+++ twext/trunk/twext/enterprise/dal/record.py	2015-09-21 14:29:03 UTC (rev 15141)
@@ -33,6 +33,7 @@
 from twext.enterprise.dal.syntax import (
     Select, Tuple, Constant, ColumnSyntax, Insert, Update, Delete, SavepointAction,
     Count, ALL_COLUMNS)
+from twext.enterprise.ienterprise import ORACLE_DIALECT
 from twext.enterprise.util import parseSQLTimestamp
 # from twext.enterprise.dal.syntax import ExpressionSyntax
 
@@ -472,7 +473,7 @@
 
 
     @classmethod
-    def query(cls, transaction, expr, order=None, group=None, limit=None, forUpdate=False, noWait=False, ascending=True, distinct=False):
+    def query(cls, transaction, expr, order=None, group=None, limit=None, forUpdate=False, noWait=False, skipLocked=False, ascending=True, distinct=False):
         """
         Query the table that corresponds to C{cls}, and return instances of
         C{cls} corresponding to the rows that are returned from that table.
@@ -495,6 +496,8 @@
         @type forUpdate: L{bool}
         @param noWait: include NOWAIT with the FOR UPDATE
         @type noWait: L{bool}
+        @param skipLocked: include SKIP LOCKED with the FOR UPDATE
+        @type skipLocked: L{bool}
         """
         return cls._rowsFromQuery(
             transaction,
@@ -505,6 +508,7 @@
                 limit=limit,
                 forUpdate=forUpdate,
                 noWait=noWait,
+                skipLocked=skipLocked,
                 ascending=ascending,
                 distinct=distinct,
             ),
@@ -513,7 +517,7 @@
 
 
     @classmethod
-    def queryExpr(cls, expr, attributes=None, order=None, group=None, limit=None, forUpdate=False, noWait=False, ascending=True, distinct=False):
+    def queryExpr(cls, expr, attributes=None, order=None, group=None, limit=None, forUpdate=False, noWait=False, skipLocked=False, ascending=True, distinct=False):
         """
         Query expression that corresponds to C{cls}. Used in cases where a sub-select
         on this record's table is needed.
@@ -536,6 +540,8 @@
         @type forUpdate: L{bool}
         @param noWait: include NOWAIT with the FOR UPDATE
         @type noWait: L{bool}
+        @param skipLocked: include SKIP LOCKED with the FOR UPDATE
+        @type skipLocked: L{bool}
         """
         kw = {}
         if order is not None:
@@ -548,6 +554,8 @@
             kw.update(ForUpdate=True)
             if noWait:
                 kw.update(NoWait=True)
+            if skipLocked:
+                kw.update(SkipLocked=True)
         if distinct:
             kw.update(Distinct=True)
         if attributes is None:
@@ -631,15 +639,32 @@
 
 
     @classmethod
+    @inlineCallbacks
     def deletesome(cls, transaction, where, returnCols=None):
         """
         Delete all rows matching the where expression from the table that corresponds to C{cls}.
         """
-        return Delete(
-            From=cls.table,
-            Where=where,
-            Return=returnCols,
-        ).on(transaction)
+        if transaction.dialect == ORACLE_DIALECT and returnCols is not None:
+            # Oracle cannot return multiple rows in the RETURNING clause so
+            # we have to split this into a SELECT followed by a DELETE
+            if not isinstance(returnCols, (tuple, list)):
+                returnCols = [returnCols, ]
+            result = yield Select(
+                returnCols,
+                From=cls.table,
+                Where=where,
+            ).on(transaction)
+            yield Delete(
+                From=cls.table,
+                Where=where,
+            ).on(transaction)
+        else:
+            result = yield Delete(
+                From=cls.table,
+                Where=where,
+                Return=returnCols,
+            ).on(transaction)
+        returnValue(result)
 
 
     @classmethod

Modified: twext/trunk/twext/enterprise/dal/syntax.py
===================================================================
--- twext/trunk/twext/enterprise/dal/syntax.py	2015-09-21 14:19:48 UTC (rev 15140)
+++ twext/trunk/twext/enterprise/dal/syntax.py	2015-09-21 14:29:03 UTC (rev 15141)
@@ -1079,6 +1079,10 @@
         super(NullComparison, self).__init__(a, op, None)
 
 
+    def allColumns(self):
+        return self.a.allColumns()
+
+
     def subSQL(self, queryGenerator, allTables):
         sqls = SQLFragment()
         sqls.append(self.a.subSQL(queryGenerator, allTables))
@@ -1224,6 +1228,58 @@
 
 
 
+class Case(ExpressionSyntax):
+    """
+    Implementation of a simple CASE statement:
+
+    CASE ... WHEN ... THEN ... ELSE ... END
+
+    Note that SQL actually allows multiple WHEN ... THEN ... clauses, but
+    this implementation only supports one.
+    """
+
+    def __init__(self, when, true_result, false_result):
+        """
+        @param when: WHEN clause - typically a L{Comparison}
+        @type when: L{Expression}
+        @param true_result: result when WHEN clause is true
+        @type true_result: L{Constant} or L{None}
+        @param false_result: result when WHEN clause is false
+        @type false_result: L{Constant} or L{None}
+        """
+        self.when = when
+        self.true_result = true_result
+        self.false_result = false_result
+
+
+    def subSQL(self, queryGenerator, allTables):
+        result = SQLFragment("case when ")
+        result.append(self.when.subSQL(queryGenerator, allTables))
+        result.append(SQLFragment(" then "))
+        if self.true_result is None:
+            result.append(SQLFragment("null"))
+        else:
+            result.append(self.true_result.subSQL(queryGenerator, allTables))
+        result.append(SQLFragment(" else "))
+        if self.false_result is None:
+            result.append(SQLFragment("null"))
+        else:
+            result.append(self.false_result.subSQL(queryGenerator, allTables))
+        result.append(SQLFragment(" end"))
+
+        return result
+
+
+    def allColumns(self):
+        """
+        Return all columns referenced by any sub-clauses.
+        """
+        return self.when.allColumns() + \
+            (self.true_result.allColumns() if self.true_result is not None else []) + \
+            (self.false_result.allColumns() if self.false_result is not None else [])
+
+
+
 class Tuple(ExpressionSyntax):
 
     def __init__(self, columns):
@@ -1340,7 +1396,7 @@
         self,
         columns=None, Where=None, From=None,
         OrderBy=None, GroupBy=None,
-        Limit=None, ForUpdate=False, NoWait=False, Ascending=None,
+        Limit=None, ForUpdate=False, NoWait=False, SkipLocked=False, Ascending=None,
         Having=None, Distinct=False, As=None, SetExpression=None
     ):
         self.From = From
@@ -1365,6 +1421,7 @@
 
         self.ForUpdate = ForUpdate
         self.NoWait = NoWait
+        self.SkipLocked = SkipLocked
         self.Ascending = Ascending
         self.As = As
 
@@ -1453,6 +1510,8 @@
                     stmt.text += " for update"
                     if self.NoWait:
                         stmt.text += " nowait"
+                    if self.SkipLocked:
+                        stmt.text += " skip locked"
 
         if self.Limit is not None:
             limitConst = Constant(self.Limit).subSQL(queryGenerator, allTables)
@@ -1527,6 +1586,55 @@
 
 
 
+class Call(_Statement):
+    """
+    CALL statement. Only supported by Oracle.
+    """
+
+    def __init__(self, name, *args, **kwargs):
+        """
+        @param name: name of procedure or function to call
+        @type name: L{str}
+        @param *args: arguments for the procedure or function
+        @type *args: L{list}
+        @param returnType: kwarg: the Python type of the return value for
+            a function
+        @type returnType: L{Type}
+        """
+        self.Name = name
+        self.Args = args
+        self.ReturnType = kwargs.get("returnType")
+
+
+    def _toSQL(self, queryGenerator):
+        """
+        Generate an SQL statement of the form "call <name>()" with a list of
+        args, where the first arg is always the return type (which is C{None}
+        for a procedure, rather than a function, call).
+
+        @return: a C{call} statement with arguments
+        @rtype: L{SQLFragment}
+        """
+
+        if queryGenerator.dialect != ORACLE_DIALECT:
+            raise NotImplementedError("CALL statement only available with Oracle DB")
+        args = (self.ReturnType,) + self.Args
+        stmt = SQLFragment("call ", args)
+        stmt.text += self.Name
+        stmt.text += "()"
+
+        return stmt
+
+
+    def _fixOracleNulls(self, rows):
+        """
+        Suppress the super class behavior because we are getting result values
+        directly, not from columns.
+        """
+        return rows[0][0]
+
+
+
 def _commaJoined(stmts):
     first = True
     cstatement = SQLFragment()
@@ -1658,9 +1766,9 @@
         ):
             def processIt(emptyListResult):
                 # See comment in L{adbapi2._ConnectedTxn._reallyExecSQL}. If the
-                # result is L{None} then also return L{None}. If the result is a
+                # result is an empty list, just return that. If the result is a
                 # L{list} of empty L{list} then there are return into rows to return.
-                if emptyListResult:
+                if len(emptyListResult) > 0:
                     emptyListResult = [[v.value for _ignore_k, v in outvars]]
                 return emptyListResult
             return result.addCallback(processIt)

Modified: twext/trunk/twext/enterprise/dal/test/test_sqlsyntax.py
===================================================================
--- twext/trunk/twext/enterprise/dal/test/test_sqlsyntax.py	2015-09-21 14:19:48 UTC (rev 15140)
+++ twext/trunk/twext/enterprise/dal/test/test_sqlsyntax.py	2015-09-21 14:29:03 UTC (rev 15141)
@@ -33,7 +33,8 @@
     Savepoint, RollbackToSavepoint, ReleaseSavepoint, SavepointAction,
     Union, Intersect, Except, SetExpression, DALError,
     ResultAliasSyntax, Count, QueryGenerator, ALL_COLUMNS,
-    DatabaseLock, DatabaseUnlock, Not, Coalesce, NullIf)
+    DatabaseLock, DatabaseUnlock, Not, Coalesce, NullIf,
+    Call, Case)
 from twext.enterprise.dal.syntax import FixedPlaceholder, NumericPlaceholder
 from twext.enterprise.dal.syntax import Function
 from twext.enterprise.dal.syntax import SchemaSyntax
@@ -366,6 +367,18 @@
             Select(From=self.schema.FOO, ForUpdate=True).toSQL(),
             SQLFragment("select * from FOO for update")
         )
+        self.assertEquals(
+            Select(From=self.schema.FOO, ForUpdate=True, NoWait=True).toSQL(),
+            SQLFragment("select * from FOO for update nowait")
+        )
+        self.assertEquals(
+            Select(From=self.schema.FOO, ForUpdate=True, SkipLocked=True).toSQL(),
+            SQLFragment("select * from FOO for update skip locked")
+        )
+        self.assertEquals(
+            Select(From=self.schema.FOO, ForUpdate=True, NoWait=True, SkipLocked=True).toSQL(),
+            SQLFragment("select * from FOO for update nowait skip locked")
+        )
 
 
     def test_groupBy(self):
@@ -2192,7 +2205,83 @@
         self.assertEquals(values, {})
 
 
+    def test_case(self):
+        """
+        A L{Case} object will generate an appropriate SQL statement.
+        """
+        self.assertEquals(
+            Select(
+                [Case((self.schema.FOO.BAR < 1), Constant(2), Constant(3))],
+                From=self.schema.FOO,
+                Limit=123
+            ).toSQL(),
+            SQLFragment("select case when BAR < ? then ? else ? end from FOO limit ?", [1, 2, 3, 123])
+        )
+        self.assertEqual(Case((self.schema.FOO.BAR < 1), Constant(2), Constant(3)).allColumns(), [self.schema.FOO.BAR, ])
+        self.assertEquals(
+            Select(
+                [Case((self.schema.FOO.BAR < 1), Constant(2), None)],
+                From=self.schema.FOO,
+                Limit=123
+            ).toSQL(),
+            SQLFragment("select case when BAR < ? then ? else null end from FOO limit ?", [1, 2, 123])
+        )
+        self.assertEqual(Case((self.schema.FOO.BAR < 1), Constant(2), None).allColumns(), [self.schema.FOO.BAR, ])
+        self.assertEquals(
+            Select(
+                [Case((self.schema.FOO.BAR < 1), None, Constant(3))],
+                From=self.schema.FOO,
+                Limit=123
+            ).toSQL(),
+            SQLFragment("select case when BAR < ? then null else ? end from FOO limit ?", [1, 3, 123])
+        )
+        self.assertEqual(Case((self.schema.FOO.BAR < 1), None, Constant(3)).allColumns(), [self.schema.FOO.BAR, ])
 
+
+    def test_call(self):
+        """
+        A L{Call} object will generate an appropriate SQL statement.
+        """
+        self.assertEquals(
+            Call(
+                "procedure"
+            ).toSQL(QueryGenerator(ORACLE_DIALECT)),
+            SQLFragment("call procedure()", (None,))
+        )
+
+        self.assertEquals(
+            Call(
+                "procedure",
+                1, "2"
+            ).toSQL(QueryGenerator(ORACLE_DIALECT)),
+            SQLFragment("call procedure()", (None, 1, "2"))
+        )
+
+        self.assertEquals(
+            Call(
+                "function",
+                returnType=int
+            ).toSQL(QueryGenerator(ORACLE_DIALECT)),
+            SQLFragment("call function()", (int,))
+        )
+
+        self.assertEquals(
+            Call(
+                "function",
+                1, "2",
+                returnType=int
+            ).toSQL(QueryGenerator(ORACLE_DIALECT)),
+            SQLFragment("call function()", (int, 1, "2"))
+        )
+
+        self.assertRaises(
+            NotImplementedError,
+            Call("procedure").toSQL,
+            QueryGenerator(POSTGRES_DIALECT)
+        )
+
+
+
 class OracleConnectionMethods(object):
     def test_rewriteOracleNULLs_Insert(self):
         """
@@ -2250,7 +2339,7 @@
             {self.schema.FOO.BAR: 40, self.schema.FOO.BAZ: 50}
         )
         result = self.resultOf(i.on(self.createTransaction()))
-        self.assertEquals(result, [None])
+        self.assertEquals(result, [[]])
 
 
 

Modified: twext/trunk/twext/enterprise/jobs/jobitem.py
===================================================================
--- twext/trunk/twext/enterprise/jobs/jobitem.py	2015-09-21 14:19:48 UTC (rev 15140)
+++ twext/trunk/twext/enterprise/jobs/jobitem.py	2015-09-21 14:29:03 UTC (rev 15141)
@@ -18,8 +18,7 @@
 from twext.enterprise.dal.model import Sequence
 from twext.enterprise.dal.model import Table, Schema, SQLType
 from twext.enterprise.dal.record import Record, fromTable, NoSuchRecord
-from twext.enterprise.dal.syntax import SchemaSyntax, Count, NullIf, Constant, \
-    Sum
+from twext.enterprise.dal.syntax import SchemaSyntax, Call, Count, Case, Constant, Sum
 from twext.enterprise.ienterprise import ORACLE_DIALECT
 from twext.enterprise.jobs.utils import inTransaction, astimestamp
 from twext.python.log import Logger
@@ -377,114 +376,95 @@
         @rtype: L{JobItem}
         """
 
-        jobs = yield cls.nextjobs(txn, now, minPriority, limit=1)
+        if txn.dialect == ORACLE_DIALECT:
 
-        # Must only be one or zero
-        if jobs and len(jobs) > 1:
-            raise AssertionError("nextjob() returned more than one row")
+            # For Oracle we need a multi-app server solution that only locks the
+            # (one) row being returned by the query, and allows other app servers
+            # to run the query in parallel and get the next unlocked row.
+            #
+            # To do that we use a stored procedure that uses a CURSOR with a
+            # SELECT ... FOR UPDATE SKIP LOCKED query that ensures only the row
+            # being fetched is locked and existing locked rows are skipped.
 
-        returnValue(jobs[0] if jobs else None)
+            # Three separate stored procedures for the three priority cases
+            if minPriority == JOB_PRIORITY_LOW:
+                function = "next_job_all"
+            elif minPriority == JOB_PRIORITY_MEDIUM:
+                function = "next_job_medium_high"
+            elif minPriority == JOB_PRIORITY_HIGH:
+                function = "next_job_high"
 
+            job = None
+            jobID = yield Call(function, now, returnType=int).on(txn)
+            if jobID:
+                job = yield cls.load(txn, jobID)
+        else:
+            # Only add the PRIORITY term if minimum is greater than zero
+            queryExpr = (cls.assigned == None).And(cls.pause == 0).And(cls.notBefore <= now)
 
-    @classmethod
-    @inlineCallbacks
-    def nextjobs(cls, txn, now, minPriority, limit=1):
-        """
-        Find the next available job based on priority, also return any that are overdue.
+            # PRIORITY can only be 0, 1, or 2. So we can convert an inequality into
+            # an equality test as follows:
+            #
+            # PRIORITY >= 0 - no test needed all values match all the time
+            # PRIORITY >= 1 === PRIORITY != 0
+            # PRIORITY >= 2 === PRIORITY == 2
+            #
+            # Doing this allows use of the PRIORITY column in an index since we already
+            # have one inequality in the index (NOT_BEFORE)
 
-        @param txn: the transaction to use
-        @type txn: L{IAsyncTransaction}
-        @param now: current timestamp
-        @type now: L{datetime.datetime}
-        @param minPriority: lowest priority level to query for
-        @type minPriority: L{int}
-        @param limit: limit on number of jobs to return
-        @type limit: L{int}
+            if minPriority == JOB_PRIORITY_MEDIUM:
+                queryExpr = (cls.priority != JOB_PRIORITY_LOW).And(queryExpr)
+            elif minPriority == JOB_PRIORITY_HIGH:
+                queryExpr = (cls.priority == JOB_PRIORITY_HIGH).And(queryExpr)
 
-        @return: the job record
-        @rtype: L{JobItem}
-        """
-
-        # Only add the PRIORITY term if minimum is greater than zero
-        queryExpr = (cls.assigned == None).And(cls.pause == 0).And(cls.notBefore <= now)
-
-        # PRIORITY can only be 0, 1, or 2. So we can convert an inequality into
-        # an equality test as follows:
-        #
-        # PRIORITY >= 0 - no test needed all values match all the time
-        # PRIORITY >= 1 === PRIORITY != 0
-        # PRIORITY >= 2 === PRIORITY == 2
-        #
-        # Doing this allows use of the PRIORITY column in an index since we already
-        # have one inequality in the index (NOT_BEFORE)
-
-        if minPriority == JOB_PRIORITY_MEDIUM:
-            queryExpr = (cls.priority != JOB_PRIORITY_LOW).And(queryExpr)
-        elif minPriority == JOB_PRIORITY_HIGH:
-            queryExpr = (cls.priority == JOB_PRIORITY_HIGH).And(queryExpr)
-
-        if txn.dialect == ORACLE_DIALECT:
-            # Oracle does not support a "for update" clause with "order by". So do the
-            # "for update" as a second query right after the first. Will need to check
-            # how this might impact concurrency in a multi-host setup.
             jobs = yield cls.query(
                 txn,
                 queryExpr,
                 order=cls.priority,
                 ascending=False,
-                limit=limit,
-            )
-            if jobs:
-                yield cls.query(
-                    txn,
-                    (cls.jobID.In([job.jobID for job in jobs])),
-                    forUpdate=True,
-                    noWait=False,
-                )
-        else:
-            jobs = yield cls.query(
-                txn,
-                queryExpr,
-                order=cls.priority,
-                ascending=False,
                 forUpdate=True,
                 noWait=False,
-                limit=limit,
+                limit=1,
             )
+            job = jobs[0] if jobs else None
 
-        returnValue(jobs)
+        returnValue(job)
 
 
     @classmethod
     @inlineCallbacks
-    def overduejobs(cls, txn, now, limit=None):
+    def overduejob(cls, txn, now):
         """
-        Find the next overdue job based on priority.
+        Find the next overdue job.
 
         @param txn: the transaction to use
         @type txn: L{IAsyncTransaction}
         @param now: current timestamp
         @type now: L{datetime.datetime}
-        @param limit: limit on number of jobs to return
-        @type limit: L{int}
 
         @return: the job record
         @rtype: L{JobItem}
         """
 
-        queryExpr = (cls.assigned != None).And(cls.overdue < now)
+        if txn.dialect == ORACLE_DIALECT:
+            # See L{nextjob} for why Oracle is different
+            job = None
+            jobID = yield Call("overdue_job", now, returnType=int).on(txn)
+            if jobID:
+                job = yield cls.load(txn, jobID)
+        else:
+            jobs = yield cls.query(
+                txn,
+                (cls.assigned != None).And(cls.overdue < now),
+                forUpdate=True,
+                noWait=False,
+                limit=1,
+            )
+            job = jobs[0] if jobs else None
 
-        jobs = yield cls.query(
-            txn,
-            queryExpr,
-            forUpdate=True,
-            noWait=False,
-            limit=limit,
-        )
+        returnValue(job)
 
-        returnValue(jobs)
 
-
     @inlineCallbacks
     def run(self):
         """
@@ -714,7 +694,7 @@
                 cls.workType,
                 Count(cls.workType),
                 Count(cls.assigned),
-                Count(NullIf(cls.assigned is not None and cls.notBefore < now, Constant(False))),
+                Count(Case((cls.assigned == None).And(cls.notBefore < now), Constant(1), None)),
                 Sum(cls.failed),
             ),
             group=cls.workType

Modified: twext/trunk/twext/enterprise/jobs/queue.py
===================================================================
--- twext/trunk/twext/enterprise/jobs/queue.py	2015-09-21 14:19:48 UTC (rev 15140)
+++ twext/trunk/twext/enterprise/jobs/queue.py	2015-09-21 14:29:03 UTC (rev 15141)
@@ -705,10 +705,8 @@
             txn = overdueJob = None
             try:
                 txn = self.transactionFactory(label="jobqueue.overdueCheck")
-                overdueJobs = yield JobItem.overduejobs(txn, nowTime, limit=1)
-                if overdueJobs:
-                    overdueJob = overdueJobs[0]
-                else:
+                overdueJob = yield JobItem.overduejob(txn, nowTime)
+                if overdueJob is None:
                     break
 
                 # It is overdue - check to see whether the work item is currently locked - if so no
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20150921/fafc8c84/attachment-0001.html>


More information about the calendarserver-changes mailing list