[CalendarServer-changes] [15141] twext/trunk/twext/enterprise
source_changes at macosforge.org
source_changes at macosforge.org
Mon Sep 21 07:29:03 PDT 2015
Revision: 15141
http://trac.calendarserver.org//changeset/15141
Author: cdaboo at apple.com
Date: 2015-09-21 07:29:03 -0700 (Mon, 21 Sep 2015)
Log Message:
-----------
Changes for Oracle: add Call(), SKIP LOCKED, and Case SQL syntax support. Use stored procedures for next_job processing. Record.deletesome multi-row return.
Modified Paths:
--------------
twext/trunk/twext/enterprise/adbapi2.py
twext/trunk/twext/enterprise/dal/record.py
twext/trunk/twext/enterprise/dal/syntax.py
twext/trunk/twext/enterprise/dal/test/test_sqlsyntax.py
twext/trunk/twext/enterprise/jobs/jobitem.py
twext/trunk/twext/enterprise/jobs/queue.py
Modified: twext/trunk/twext/enterprise/adbapi2.py
===================================================================
--- twext/trunk/twext/enterprise/adbapi2.py 2015-09-21 14:19:48 UTC (rev 15140)
+++ twext/trunk/twext/enterprise/adbapi2.py 2015-09-21 14:29:03 UTC (rev 15141)
@@ -229,6 +229,13 @@
@raise raiseOnZeroRowCount: if the argument was specified and no rows
were returned by the executed statement.
"""
+ # Oracle only: we need to support the CALL statement which is available in
+ # in the cx_Oracle module via special callproc() and callfunc() methods
+ # that are not part of the standard Python DBAPI.
+ stmts = sql.splitlines()
+ if stmts[-1].startswith("call "):
+ return self._reallyCallSQL(stmts[-1], args)
+
wasFirst = self._first
# If this is the first time this cursor has been used in this
@@ -312,12 +319,103 @@
# Oracle with a return into clause returns an empty set or rows, but
# we then have to insert the special bind variables for the return into.
# Thus we need to know whether there was any rowcount from the actual query.
- # What we do is always insert a set of empty rows as the result if the
- # rowcount is non-zero. Then we can detect whether the bind variables
- # need to be added into the result set.
- return [[]] * self._cursor.rowcount if self._cursor.rowcount else None
+ # What we do is always insert a set of empty rows equal to the rowcount. in
+ # the case of no result, an empty list is returned. This way we can detect
+ # that the additional bind variables are needed (if len(result) != 0).
+ return [[]] * self._cursor.rowcount
+ def _reallyCallSQL(self, sql, args=None):
+ """
+ Use cx_Oracle's callproc() or callfunc() Cursor methods to execute a
+ stored procedure.
+
+ @param sql: The SQL string to execute.
+ @type sql: C{str}
+
+ @param args: The parameters of the procedure or function, if any.
+ @type args: C{list} or C{None}
+
+ @raise Exception: this function may raise any exception raised by the
+ underlying C{dbapi.connect}, C{cursor.execute},
+ L{IDerivedParameter.preQuery}, C{connection.cursor}, or
+ C{cursor.callxxx}.
+ """
+
+ # The sql will be of the form "call <name>()" with args
+ if not sql.startswith("call ") or not sql.endswith("()"):
+ raise ValueError("Invalid SQL CALL statement: {}".format(sql))
+ name = sql[5:-2]
+ returnType = args[0]
+ args = args[1:]
+
+ wasFirst = self._first
+
+ # If this is the first time this cursor has been used in this
+ # transaction, remember that, but mark it as now used.
+ self._first = False
+
+ try:
+ # Make the return value look like an array of rows/columns, as
+ # one would expect for something like a SELECT
+ if returnType is not None:
+ returnValue = self._cursor.callfunc(name, returnType, args)
+ returnValue = [returnValue, ]
+ else:
+ returnValue = self._cursor.callproc(name, args)
+ returnValue = [returnValue, ]
+ except:
+ # If execute() raised an exception, and this was the first thing to
+ # happen in the transaction, then the connection has probably gone
+ # bad in the meanwhile, and we should try again.
+ if wasFirst:
+ # Report the error before doing anything else, since doing
+ # other things may cause the traceback stack to be eliminated
+ # if they raise exceptions (even internally).
+ log.failure(
+ "Exception from execute() on first statement in "
+ "transaction. Possibly caused by a database server "
+ "restart. Automatically reconnecting now.",
+ failure=Failure(),
+ )
+ try:
+ self._connection.close()
+ except:
+ # close() may raise an exception to alert us of an error as
+ # well. Right now the only type of error we know about is
+ # "the connection is already closed", which obviously
+ # doesn't need to be handled specially. Unfortunately the
+ # reporting of this type of error is not consistent or
+ # predictable across different databases, or even different
+ # bindings to the same database, so we have to do a
+ # catch-all here. While I can't imagine another type of
+ # error at the moment, bare C{except:}s are notorious for
+ # making debugging surprising error conditions very
+ # difficult, so let's make sure that the error is logged
+ # just in case.
+ log.failure(
+ "Exception from close() while automatically "
+ "reconnecting. (Probably not serious.)",
+ failure=Failure(),
+ )
+
+ # Now, if either of *these* things fail, there's an error here
+ # that we cannot workaround or address automatically, so no
+ # try:except: for them.
+ self._connection = self._pool.connectionFactory()
+ self._cursor = self._connection.cursor()
+
+ # Note that although this method is being invoked recursively,
+ # the "_first" flag is re-set at the very top, so we will _not_
+ # be re-entering it more than once.
+ result = self._reallyCallSQL(sql, args)
+ return result
+ else:
+ raise
+ else:
+ return returnValue
+
+
def execSQL(self, *args, **kw):
if self._completed:
raise RuntimeError("Attempt to use {} transaction.".format(self._completed))
Modified: twext/trunk/twext/enterprise/dal/record.py
===================================================================
--- twext/trunk/twext/enterprise/dal/record.py 2015-09-21 14:19:48 UTC (rev 15140)
+++ twext/trunk/twext/enterprise/dal/record.py 2015-09-21 14:29:03 UTC (rev 15141)
@@ -33,6 +33,7 @@
from twext.enterprise.dal.syntax import (
Select, Tuple, Constant, ColumnSyntax, Insert, Update, Delete, SavepointAction,
Count, ALL_COLUMNS)
+from twext.enterprise.ienterprise import ORACLE_DIALECT
from twext.enterprise.util import parseSQLTimestamp
# from twext.enterprise.dal.syntax import ExpressionSyntax
@@ -472,7 +473,7 @@
@classmethod
- def query(cls, transaction, expr, order=None, group=None, limit=None, forUpdate=False, noWait=False, ascending=True, distinct=False):
+ def query(cls, transaction, expr, order=None, group=None, limit=None, forUpdate=False, noWait=False, skipLocked=False, ascending=True, distinct=False):
"""
Query the table that corresponds to C{cls}, and return instances of
C{cls} corresponding to the rows that are returned from that table.
@@ -495,6 +496,8 @@
@type forUpdate: L{bool}
@param noWait: include NOWAIT with the FOR UPDATE
@type noWait: L{bool}
+ @param skipLocked: include SKIP LOCKED with the FOR UPDATE
+ @type skipLocked: L{bool}
"""
return cls._rowsFromQuery(
transaction,
@@ -505,6 +508,7 @@
limit=limit,
forUpdate=forUpdate,
noWait=noWait,
+ skipLocked=skipLocked,
ascending=ascending,
distinct=distinct,
),
@@ -513,7 +517,7 @@
@classmethod
- def queryExpr(cls, expr, attributes=None, order=None, group=None, limit=None, forUpdate=False, noWait=False, ascending=True, distinct=False):
+ def queryExpr(cls, expr, attributes=None, order=None, group=None, limit=None, forUpdate=False, noWait=False, skipLocked=False, ascending=True, distinct=False):
"""
Query expression that corresponds to C{cls}. Used in cases where a sub-select
on this record's table is needed.
@@ -536,6 +540,8 @@
@type forUpdate: L{bool}
@param noWait: include NOWAIT with the FOR UPDATE
@type noWait: L{bool}
+ @param skipLocked: include SKIP LOCKED with the FOR UPDATE
+ @type skipLocked: L{bool}
"""
kw = {}
if order is not None:
@@ -548,6 +554,8 @@
kw.update(ForUpdate=True)
if noWait:
kw.update(NoWait=True)
+ if skipLocked:
+ kw.update(SkipLocked=True)
if distinct:
kw.update(Distinct=True)
if attributes is None:
@@ -631,15 +639,32 @@
@classmethod
+ @inlineCallbacks
def deletesome(cls, transaction, where, returnCols=None):
"""
Delete all rows matching the where expression from the table that corresponds to C{cls}.
"""
- return Delete(
- From=cls.table,
- Where=where,
- Return=returnCols,
- ).on(transaction)
+ if transaction.dialect == ORACLE_DIALECT and returnCols is not None:
+ # Oracle cannot return multiple rows in the RETURNING clause so
+ # we have to split this into a SELECT followed by a DELETE
+ if not isinstance(returnCols, (tuple, list)):
+ returnCols = [returnCols, ]
+ result = yield Select(
+ returnCols,
+ From=cls.table,
+ Where=where,
+ ).on(transaction)
+ yield Delete(
+ From=cls.table,
+ Where=where,
+ ).on(transaction)
+ else:
+ result = yield Delete(
+ From=cls.table,
+ Where=where,
+ Return=returnCols,
+ ).on(transaction)
+ returnValue(result)
@classmethod
Modified: twext/trunk/twext/enterprise/dal/syntax.py
===================================================================
--- twext/trunk/twext/enterprise/dal/syntax.py 2015-09-21 14:19:48 UTC (rev 15140)
+++ twext/trunk/twext/enterprise/dal/syntax.py 2015-09-21 14:29:03 UTC (rev 15141)
@@ -1079,6 +1079,10 @@
super(NullComparison, self).__init__(a, op, None)
+ def allColumns(self):
+ return self.a.allColumns()
+
+
def subSQL(self, queryGenerator, allTables):
sqls = SQLFragment()
sqls.append(self.a.subSQL(queryGenerator, allTables))
@@ -1224,6 +1228,58 @@
+class Case(ExpressionSyntax):
+ """
+ Implementation of a simple CASE statement:
+
+ CASE ... WHEN ... THEN ... ELSE ... END
+
+ Note that SQL actually allows multiple WHEN ... THEN ... clauses, but
+ this implementation only supports one.
+ """
+
+ def __init__(self, when, true_result, false_result):
+ """
+ @param when: WHEN clause - typically a L{Comparison}
+ @type when: L{Expression}
+ @param true_result: result when WHEN clause is true
+ @type true_result: L{Constant} or L{None}
+ @param false_result: result when WHEN clause is false
+ @type false_result: L{Constant} or L{None}
+ """
+ self.when = when
+ self.true_result = true_result
+ self.false_result = false_result
+
+
+ def subSQL(self, queryGenerator, allTables):
+ result = SQLFragment("case when ")
+ result.append(self.when.subSQL(queryGenerator, allTables))
+ result.append(SQLFragment(" then "))
+ if self.true_result is None:
+ result.append(SQLFragment("null"))
+ else:
+ result.append(self.true_result.subSQL(queryGenerator, allTables))
+ result.append(SQLFragment(" else "))
+ if self.false_result is None:
+ result.append(SQLFragment("null"))
+ else:
+ result.append(self.false_result.subSQL(queryGenerator, allTables))
+ result.append(SQLFragment(" end"))
+
+ return result
+
+
+ def allColumns(self):
+ """
+ Return all columns referenced by any sub-clauses.
+ """
+ return self.when.allColumns() + \
+ (self.true_result.allColumns() if self.true_result is not None else []) + \
+ (self.false_result.allColumns() if self.false_result is not None else [])
+
+
+
class Tuple(ExpressionSyntax):
def __init__(self, columns):
@@ -1340,7 +1396,7 @@
self,
columns=None, Where=None, From=None,
OrderBy=None, GroupBy=None,
- Limit=None, ForUpdate=False, NoWait=False, Ascending=None,
+ Limit=None, ForUpdate=False, NoWait=False, SkipLocked=False, Ascending=None,
Having=None, Distinct=False, As=None, SetExpression=None
):
self.From = From
@@ -1365,6 +1421,7 @@
self.ForUpdate = ForUpdate
self.NoWait = NoWait
+ self.SkipLocked = SkipLocked
self.Ascending = Ascending
self.As = As
@@ -1453,6 +1510,8 @@
stmt.text += " for update"
if self.NoWait:
stmt.text += " nowait"
+ if self.SkipLocked:
+ stmt.text += " skip locked"
if self.Limit is not None:
limitConst = Constant(self.Limit).subSQL(queryGenerator, allTables)
@@ -1527,6 +1586,55 @@
+class Call(_Statement):
+ """
+ CALL statement. Only supported by Oracle.
+ """
+
+ def __init__(self, name, *args, **kwargs):
+ """
+ @param name: name of procedure or function to call
+ @type name: L{str}
+ @param *args: arguments for the procedure or functions
+ @type *args: L{list}
+ @param returnType: kwarg: the Python type of the return value for
+ a function
+ @type returnType: L{Type}
+ """
+ self.Name = name
+ self.Args = args
+ self.ReturnType = kwargs.get("returnType")
+
+
+ def _toSQL(self, queryGenerator):
+ """
+ Generate an SQL statement of the form "call <name>()" with a list of
+ args, where the first arg is always the return type (which is C{None}
+ for a procedure, rather than a function, call).
+
+ @return: a C{call} statement with arguments
+ @rtype: L{SQLFragment}
+ """
+
+ if queryGenerator.dialect != ORACLE_DIALECT:
+ raise NotImplementedError("CALL statement only available with Oracle DB")
+ args = (self.ReturnType,) + self.Args
+ stmt = SQLFragment("call ", args)
+ stmt.text += self.Name
+ stmt.text += "()"
+
+ return stmt
+
+
+ def _fixOracleNulls(self, rows):
+ """
+ Suppress the super class behavior because we are getting result values
+ directly, not from columns.
+ """
+ return rows[0][0]
+
+
+
def _commaJoined(stmts):
first = True
cstatement = SQLFragment()
@@ -1658,9 +1766,9 @@
):
def processIt(emptyListResult):
# See comment in L{adbapi2._ConnectedTxn._reallyExecSQL}. If the
- # result is L{None} then also return L{None}. If the result is a
+ # result is an empty list, just return that. If the result is a
# L{list} of empty L{list} then there are return into rows to return.
- if emptyListResult:
+ if len(emptyListResult) > 0:
emptyListResult = [[v.value for _ignore_k, v in outvars]]
return emptyListResult
return result.addCallback(processIt)
Modified: twext/trunk/twext/enterprise/dal/test/test_sqlsyntax.py
===================================================================
--- twext/trunk/twext/enterprise/dal/test/test_sqlsyntax.py 2015-09-21 14:19:48 UTC (rev 15140)
+++ twext/trunk/twext/enterprise/dal/test/test_sqlsyntax.py 2015-09-21 14:29:03 UTC (rev 15141)
@@ -33,7 +33,8 @@
Savepoint, RollbackToSavepoint, ReleaseSavepoint, SavepointAction,
Union, Intersect, Except, SetExpression, DALError,
ResultAliasSyntax, Count, QueryGenerator, ALL_COLUMNS,
- DatabaseLock, DatabaseUnlock, Not, Coalesce, NullIf)
+ DatabaseLock, DatabaseUnlock, Not, Coalesce, NullIf,
+ Call, Case)
from twext.enterprise.dal.syntax import FixedPlaceholder, NumericPlaceholder
from twext.enterprise.dal.syntax import Function
from twext.enterprise.dal.syntax import SchemaSyntax
@@ -366,6 +367,18 @@
Select(From=self.schema.FOO, ForUpdate=True).toSQL(),
SQLFragment("select * from FOO for update")
)
+ self.assertEquals(
+ Select(From=self.schema.FOO, ForUpdate=True, NoWait=True).toSQL(),
+ SQLFragment("select * from FOO for update nowait")
+ )
+ self.assertEquals(
+ Select(From=self.schema.FOO, ForUpdate=True, SkipLocked=True).toSQL(),
+ SQLFragment("select * from FOO for update skip locked")
+ )
+ self.assertEquals(
+ Select(From=self.schema.FOO, ForUpdate=True, NoWait=True, SkipLocked=True).toSQL(),
+ SQLFragment("select * from FOO for update nowait skip locked")
+ )
def test_groupBy(self):
@@ -2192,7 +2205,83 @@
self.assertEquals(values, {})
+ def test_case(self):
+ """
+ A L{Case} object will generate an appropriate SQL statement.
+ """
+ self.assertEquals(
+ Select(
+ [Case((self.schema.FOO.BAR < 1), Constant(2), Constant(3))],
+ From=self.schema.FOO,
+ Limit=123
+ ).toSQL(),
+ SQLFragment("select case when BAR < ? then ? else ? end from FOO limit ?", [1, 2, 3, 123])
+ )
+ self.assertEqual(Case((self.schema.FOO.BAR < 1), Constant(2), Constant(3)).allColumns(), [self.schema.FOO.BAR, ])
+ self.assertEquals(
+ Select(
+ [Case((self.schema.FOO.BAR < 1), Constant(2), None)],
+ From=self.schema.FOO,
+ Limit=123
+ ).toSQL(),
+ SQLFragment("select case when BAR < ? then ? else null end from FOO limit ?", [1, 2, 123])
+ )
+ self.assertEqual(Case((self.schema.FOO.BAR < 1), Constant(2), None).allColumns(), [self.schema.FOO.BAR, ])
+ self.assertEquals(
+ Select(
+ [Case((self.schema.FOO.BAR < 1), None, Constant(3))],
+ From=self.schema.FOO,
+ Limit=123
+ ).toSQL(),
+ SQLFragment("select case when BAR < ? then null else ? end from FOO limit ?", [1, 3, 123])
+ )
+ self.assertEqual(Case((self.schema.FOO.BAR < 1), None, Constant(3)).allColumns(), [self.schema.FOO.BAR, ])
+
+ def test_call(self):
+ """
+ A L{Call} object will generate an appropriate SQL statement.
+ """
+ self.assertEquals(
+ Call(
+ "procedure"
+ ).toSQL(QueryGenerator(ORACLE_DIALECT)),
+ SQLFragment("call procedure()", (None,))
+ )
+
+ self.assertEquals(
+ Call(
+ "procedure",
+ 1, "2"
+ ).toSQL(QueryGenerator(ORACLE_DIALECT)),
+ SQLFragment("call procedure()", (None, 1, "2"))
+ )
+
+ self.assertEquals(
+ Call(
+ "function",
+ returnType=int
+ ).toSQL(QueryGenerator(ORACLE_DIALECT)),
+ SQLFragment("call function()", (int,))
+ )
+
+ self.assertEquals(
+ Call(
+ "function",
+ 1, "2",
+ returnType=int
+ ).toSQL(QueryGenerator(ORACLE_DIALECT)),
+ SQLFragment("call function()", (int, 1, "2"))
+ )
+
+ self.assertRaises(
+ NotImplementedError,
+ Call("procedure").toSQL,
+ QueryGenerator(POSTGRES_DIALECT)
+ )
+
+
+
class OracleConnectionMethods(object):
def test_rewriteOracleNULLs_Insert(self):
"""
@@ -2250,7 +2339,7 @@
{self.schema.FOO.BAR: 40, self.schema.FOO.BAZ: 50}
)
result = self.resultOf(i.on(self.createTransaction()))
- self.assertEquals(result, [None])
+ self.assertEquals(result, [[]])
Modified: twext/trunk/twext/enterprise/jobs/jobitem.py
===================================================================
--- twext/trunk/twext/enterprise/jobs/jobitem.py 2015-09-21 14:19:48 UTC (rev 15140)
+++ twext/trunk/twext/enterprise/jobs/jobitem.py 2015-09-21 14:29:03 UTC (rev 15141)
@@ -18,8 +18,7 @@
from twext.enterprise.dal.model import Sequence
from twext.enterprise.dal.model import Table, Schema, SQLType
from twext.enterprise.dal.record import Record, fromTable, NoSuchRecord
-from twext.enterprise.dal.syntax import SchemaSyntax, Count, NullIf, Constant, \
- Sum
+from twext.enterprise.dal.syntax import SchemaSyntax, Call, Count, Case, Constant, Sum
from twext.enterprise.ienterprise import ORACLE_DIALECT
from twext.enterprise.jobs.utils import inTransaction, astimestamp
from twext.python.log import Logger
@@ -377,114 +376,95 @@
@rtype: L{JobItem}
"""
- jobs = yield cls.nextjobs(txn, now, minPriority, limit=1)
+ if txn.dialect == ORACLE_DIALECT:
- # Must only be one or zero
- if jobs and len(jobs) > 1:
- raise AssertionError("nextjob() returned more than one row")
+ # For Oracle we need a multi-app server solution that only locks the
+ # (one) row being returned by the query, and allows other app servers
+ # to run the query in parallel and get the next unlocked row.
+ #
+ # To do that we use a stored procedure that uses a CURSOR with a
+ # SELECT ... FOR UPDATE SKIP LOCKED query that ensures only the row
+ # being fetched is locked and existing locked rows are skipped.
- returnValue(jobs[0] if jobs else None)
+ # Three separate stored procedures for the three priority cases
+ if minPriority == JOB_PRIORITY_LOW:
+ function = "next_job_all"
+ elif minPriority == JOB_PRIORITY_MEDIUM:
+ function = "next_job_medium_high"
+ elif minPriority == JOB_PRIORITY_HIGH:
+ function = "next_job_high"
+ job = None
+ jobID = yield Call(function, now, returnType=int).on(txn)
+ if jobID:
+ job = yield cls.load(txn, jobID)
+ else:
+ # Only add the PRIORITY term if minimum is greater than zero
+ queryExpr = (cls.assigned == None).And(cls.pause == 0).And(cls.notBefore <= now)
- @classmethod
- @inlineCallbacks
- def nextjobs(cls, txn, now, minPriority, limit=1):
- """
- Find the next available job based on priority, also return any that are overdue.
+ # PRIORITY can only be 0, 1, or 2. So we can convert an inequality into
+ # an equality test as follows:
+ #
+ # PRIORITY >= 0 - no test needed all values match all the time
+ # PRIORITY >= 1 === PRIORITY != 0
+ # PRIORITY >= 2 === PRIORITY == 2
+ #
+ # Doing this allows use of the PRIORITY column in an index since we already
+ # have one inequality in the index (NOT_BEFORE)
- @param txn: the transaction to use
- @type txn: L{IAsyncTransaction}
- @param now: current timestamp
- @type now: L{datetime.datetime}
- @param minPriority: lowest priority level to query for
- @type minPriority: L{int}
- @param limit: limit on number of jobs to return
- @type limit: L{int}
+ if minPriority == JOB_PRIORITY_MEDIUM:
+ queryExpr = (cls.priority != JOB_PRIORITY_LOW).And(queryExpr)
+ elif minPriority == JOB_PRIORITY_HIGH:
+ queryExpr = (cls.priority == JOB_PRIORITY_HIGH).And(queryExpr)
- @return: the job record
- @rtype: L{JobItem}
- """
-
- # Only add the PRIORITY term if minimum is greater than zero
- queryExpr = (cls.assigned == None).And(cls.pause == 0).And(cls.notBefore <= now)
-
- # PRIORITY can only be 0, 1, or 2. So we can convert an inequality into
- # an equality test as follows:
- #
- # PRIORITY >= 0 - no test needed all values match all the time
- # PRIORITY >= 1 === PRIORITY != 0
- # PRIORITY >= 2 === PRIORITY == 2
- #
- # Doing this allows use of the PRIORITY column in an index since we already
- # have one inequality in the index (NOT_BEFORE)
-
- if minPriority == JOB_PRIORITY_MEDIUM:
- queryExpr = (cls.priority != JOB_PRIORITY_LOW).And(queryExpr)
- elif minPriority == JOB_PRIORITY_HIGH:
- queryExpr = (cls.priority == JOB_PRIORITY_HIGH).And(queryExpr)
-
- if txn.dialect == ORACLE_DIALECT:
- # Oracle does not support a "for update" clause with "order by". So do the
- # "for update" as a second query right after the first. Will need to check
- # how this might impact concurrency in a multi-host setup.
jobs = yield cls.query(
txn,
queryExpr,
order=cls.priority,
ascending=False,
- limit=limit,
- )
- if jobs:
- yield cls.query(
- txn,
- (cls.jobID.In([job.jobID for job in jobs])),
- forUpdate=True,
- noWait=False,
- )
- else:
- jobs = yield cls.query(
- txn,
- queryExpr,
- order=cls.priority,
- ascending=False,
forUpdate=True,
noWait=False,
- limit=limit,
+ limit=1,
)
+ job = jobs[0] if jobs else None
- returnValue(jobs)
+ returnValue(job)
@classmethod
@inlineCallbacks
- def overduejobs(cls, txn, now, limit=None):
+ def overduejob(cls, txn, now):
"""
- Find the next overdue job based on priority.
+ Find the next overdue job.
@param txn: the transaction to use
@type txn: L{IAsyncTransaction}
@param now: current timestamp
@type now: L{datetime.datetime}
- @param limit: limit on number of jobs to return
- @type limit: L{int}
@return: the job record
@rtype: L{JobItem}
"""
- queryExpr = (cls.assigned != None).And(cls.overdue < now)
+ if txn.dialect == ORACLE_DIALECT:
+ # See L{nextjob} for why Oracle is different
+ job = None
+ jobID = yield Call("overdue_job", now, returnType=int).on(txn)
+ if jobID:
+ job = yield cls.load(txn, jobID)
+ else:
+ jobs = yield cls.query(
+ txn,
+ (cls.assigned != None).And(cls.overdue < now),
+ forUpdate=True,
+ noWait=False,
+ limit=1,
+ )
+ job = jobs[0] if jobs else None
- jobs = yield cls.query(
- txn,
- queryExpr,
- forUpdate=True,
- noWait=False,
- limit=limit,
- )
+ returnValue(job)
- returnValue(jobs)
-
@inlineCallbacks
def run(self):
"""
@@ -714,7 +694,7 @@
cls.workType,
Count(cls.workType),
Count(cls.assigned),
- Count(NullIf(cls.assigned is not None and cls.notBefore < now, Constant(False))),
+ Count(Case((cls.assigned == None).And(cls.notBefore < now), Constant(1), None)),
Sum(cls.failed),
),
group=cls.workType
Modified: twext/trunk/twext/enterprise/jobs/queue.py
===================================================================
--- twext/trunk/twext/enterprise/jobs/queue.py 2015-09-21 14:19:48 UTC (rev 15140)
+++ twext/trunk/twext/enterprise/jobs/queue.py 2015-09-21 14:29:03 UTC (rev 15141)
@@ -705,10 +705,8 @@
txn = overdueJob = None
try:
txn = self.transactionFactory(label="jobqueue.overdueCheck")
- overdueJobs = yield JobItem.overduejobs(txn, nowTime, limit=1)
- if overdueJobs:
- overdueJob = overdueJobs[0]
- else:
+ overdueJob = yield JobItem.overduejob(txn, nowTime)
+ if overdueJob is None:
break
# It is overdue - check to see whether the work item is currently locked - if so no
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20150921/fafc8c84/attachment-0001.html>
More information about the calendarserver-changes
mailing list