[CalendarServer-changes] [15726] twext/branches/users/cdaboo/better-next-job/twext/enterprise/jobs

source_changes at macosforge.org source_changes at macosforge.org
Thu Jun 30 09:49:16 PDT 2016


Revision: 15726
          http://trac.calendarserver.org//changeset/15726
Author:   cdaboo at apple.com
Date:     2016-06-30 09:49:16 -0700 (Thu, 30 Jun 2016)
Log Message:
-----------
Improve next_job() performance - notably for Oracle.

Modified Paths:
--------------
    twext/branches/users/cdaboo/better-next-job/twext/enterprise/jobs/jobitem.py
    twext/branches/users/cdaboo/better-next-job/twext/enterprise/jobs/queue.py
    twext/branches/users/cdaboo/better-next-job/twext/enterprise/jobs/test/test_jobs.py

Modified: twext/branches/users/cdaboo/better-next-job/twext/enterprise/jobs/jobitem.py
===================================================================
--- twext/branches/users/cdaboo/better-next-job/twext/enterprise/jobs/jobitem.py	2016-06-30 14:43:42 UTC (rev 15725)
+++ twext/branches/users/cdaboo/better-next-job/twext/enterprise/jobs/jobitem.py	2016-06-30 16:49:16 UTC (rev 15726)
@@ -60,13 +60,14 @@
 
     JobTable.addColumn("JOB_ID", SQLType("integer", None), default=Sequence(inSchema, "JOB_SEQ"), notNull=True, primaryKey=True)
     JobTable.addColumn("WORK_TYPE", SQLType("varchar", 255), notNull=True)
-    JobTable.addColumn("PRIORITY", SQLType("integer", 0), default=0)
-    JobTable.addColumn("WEIGHT", SQLType("integer", 0), default=0)
+    JobTable.addColumn("PRIORITY", SQLType("integer", 0), default=0, notNull=True)
+    JobTable.addColumn("WEIGHT", SQLType("integer", 0), default=0, notNull=True)
     JobTable.addColumn("NOT_BEFORE", SQLType("timestamp", None), notNull=True)
+    JobTable.addColumn("IS_ASSIGNED", SQLType("integer", 0), default=0, notNull=True)
     JobTable.addColumn("ASSIGNED", SQLType("timestamp", None), default=None)
     JobTable.addColumn("OVERDUE", SQLType("timestamp", None), default=None)
-    JobTable.addColumn("FAILED", SQLType("integer", 0), default=0)
-    JobTable.addColumn("PAUSE", SQLType("integer", 0), default=0)
+    JobTable.addColumn("FAILED", SQLType("integer", 0), default=0, notNull=True)
+    JobTable.addColumn("PAUSE", SQLType("integer", 0), default=0, notNull=True)
 
     return inSchema
 
@@ -168,14 +169,14 @@
         @param overdue: number of seconds after assignment that the job will be considered overdue
         @type overdue: L{int}
         """
-        return self.update(assigned=when, overdue=when + timedelta(seconds=overdue))
+        return self.update(isAssigned=1, assigned=when, overdue=when + timedelta(seconds=overdue))
 
 
     def unassign(self):
         """
         Mark this job as unassigned by setting the assigned and overdue columns to L{None}.
         """
-        return self.update(assigned=None, overdue=None)
+        return self.update(isAssigned=0, assigned=None, overdue=None)
 
 
     def bumpOverdue(self, bump):
@@ -208,6 +209,7 @@
             delay = self.lockRescheduleInterval if locked else self.failureRescheduleInterval
             delay *= (self.failed + 1)
         return self.update(
+            isAssigned=0,
             assigned=None,
             overdue=None,
             failed=self.failed + (0 if locked else 1),
@@ -358,12 +360,16 @@
 
     @classmethod
     @inlineCallbacks
-    def nextjob(cls, txn, now, minPriority):
+    def nextjob(cls, txn, now, minPriority, rowLimit):
         """
         Find the next available job based on priority, also return any that are overdue. This
         method uses an SQL query to find the matching jobs, and sorts based on the NOT_BEFORE
         value and priority.
 
+        The C{rowLimit} value is used to help with concurrency problems when the underlying DB does
+        not support a proper "LIMIT" term with the query (Oracle). It should be set to no more than
+        1 plus the number of app-servers in use.
+
         @param txn: the transaction to use
         @type txn: L{IAsyncTransaction}
         @param now: current timestamp - needed for unit tests that might use their
@@ -371,6 +377,8 @@
         @type now: L{datetime.datetime}
         @param minPriority: lowest priority level to query for
         @type minPriority: L{int}
+        @param rowLimit: query at most this number of rows at a time
+        @type rowLimit: L{int}
 
         @return: the job record
         @rtype: L{JobItem}
@@ -387,7 +395,7 @@
             # being fetched is locked and existing locked rows are skipped.
 
             job = None
-            jobID = yield Call("next_job", now, minPriority, returnType=int).on(txn)
+            jobID = yield Call("next_job", now, minPriority, rowLimit, returnType=int).on(txn)
             if jobID:
                 job = yield cls.load(txn, jobID)
         else:
@@ -416,7 +424,7 @@
                 ascending=False,
                 forUpdate=True,
                 noWait=False,
-                limit=1,
+                limit=rowLimit,
             )
             job = jobs[0] if jobs else None
 
@@ -425,14 +433,20 @@
 
     @classmethod
     @inlineCallbacks
-    def overduejob(cls, txn, now):
+    def overduejob(cls, txn, now, rowLimit):
         """
         Find the next overdue job.
 
+        The C{rowLimit} value is used to help with concurrency problems when the underlying DB does
+        not support a proper "LIMIT" term with the query (Oracle). It should be set to no more than
+        1 plus the number of app-servers in use.
+
         @param txn: the transaction to use
         @type txn: L{IAsyncTransaction}
         @param now: current timestamp
         @type now: L{datetime.datetime}
+        @param rowLimit: query at most this number of rows at a time
+        @type rowLimit: L{int}
 
         @return: the job record
         @rtype: L{JobItem}
@@ -441,7 +455,7 @@
         if txn.dialect == ORACLE_DIALECT:
             # See L{nextjob} for why Oracle is different
             job = None
-            jobID = yield Call("overdue_job", now, returnType=int).on(txn)
+            jobID = yield Call("overdue_job", now, rowLimit, returnType=int).on(txn)
             if jobID:
                 job = yield cls.load(txn, jobID)
         else:
@@ -450,7 +464,7 @@
                 (cls.assigned != None).And(cls.overdue < now),
                 forUpdate=True,
                 noWait=False,
-                limit=1,
+                limit=rowLimit,
             )
             job = jobs[0] if jobs else None
 

Modified: twext/branches/users/cdaboo/better-next-job/twext/enterprise/jobs/queue.py
===================================================================
--- twext/branches/users/cdaboo/better-next-job/twext/enterprise/jobs/queue.py	2016-06-30 14:43:42 UTC (rev 15725)
+++ twext/branches/users/cdaboo/better-next-job/twext/enterprise/jobs/queue.py	2016-06-30 16:49:16 UTC (rev 15726)
@@ -463,6 +463,12 @@
     highPriorityLevel = 80      # Percentage load level above which only high priority jobs are processed
     mediumPriorityLevel = 50    # Percentage load level above which high and medium priority jobs are processed
 
+    # Used to help with concurrency problems when the underlying DB does not
+    # support a proper "LIMIT" term with the query (Oracle). It should be set to
+    # no more than 1 plus the number of app-servers in use. For a single app
+    # server always use 1.
+    rowLimit = 1
+
     def __init__(self, reactor, transactionFactory, useWorkerPool=True, disableWorkProcessing=False):
         """
         Initialize a L{ControllerQueue}.
@@ -577,7 +583,7 @@
             txn = nextJob = None
             try:
                 txn = self.transactionFactory(label="jobqueue.workCheck")
-                nextJob = yield JobItem.nextjob(txn, nowTime, minPriority)
+                nextJob = yield JobItem.nextjob(txn, nowTime, minPriority, self.rowLimit)
                 if nextJob is None:
                     break
 
@@ -706,7 +712,7 @@
             txn = overdueJob = None
             try:
                 txn = self.transactionFactory(label="jobqueue.overdueCheck")
-                overdueJob = yield JobItem.overduejob(txn, nowTime)
+                overdueJob = yield JobItem.overduejob(txn, nowTime, self.rowLimit)
                 if overdueJob is None:
                     break
 

Modified: twext/branches/users/cdaboo/better-next-job/twext/enterprise/jobs/test/test_jobs.py
===================================================================
--- twext/branches/users/cdaboo/better-next-job/twext/enterprise/jobs/test/test_jobs.py	2016-06-30 14:43:42 UTC (rev 15725)
+++ twext/branches/users/cdaboo/better-next-job/twext/enterprise/jobs/test/test_jobs.py	2016-06-30 16:49:16 UTC (rev 15726)
@@ -193,13 +193,14 @@
     create table JOB (
       JOB_ID      integer primary key default 1,
       WORK_TYPE   varchar(255) not null,
-      PRIORITY    integer default 0,
-      WEIGHT      integer default 0,
+      PRIORITY    integer default 0 not null,
+      WEIGHT      integer default 0 not null,
       NOT_BEFORE  timestamp not null,
+      IS_ASSIGNED integer default 0 not null,
       ASSIGNED    timestamp default null,
       OVERDUE     timestamp default null,
-      FAILED      integer default 0,
-      PAUSE       integer default 0
+      FAILED      integer default 0 not null,
+      PAUSE       integer default 0 not null
     );
     """
 )
@@ -463,6 +464,7 @@
         self.assertTrue(len(jobs) == 1)
         self.assertTrue(jobs[0].workType == "DUMMY_WORK_ITEM")
         self.assertTrue(jobs[0].assigned is None)
+        self.assertEqual(jobs[0].isAssigned, 0)
 
         @transactionally(dbpool.connection)
         def checkWork(txn):
@@ -488,6 +490,7 @@
         jobs = yield inTransaction(dbpool.connection, checkJob)
         self.assertTrue(len(jobs) == 1)
         self.assertTrue(jobs[0].assigned is None)
+        self.assertEqual(jobs[0].isAssigned, 0)
 
         @inlineCallbacks
         def assignJob(txn):
@@ -498,6 +501,7 @@
         jobs = yield inTransaction(dbpool.connection, checkJob)
         self.assertTrue(len(jobs) == 1)
         self.assertTrue(jobs[0].assigned is not None)
+        self.assertEqual(jobs[0].isAssigned, 1)
 
 
     @inlineCallbacks
@@ -512,7 +516,7 @@
         # Empty job queue
         @inlineCallbacks
         def _next(txn, priority=WORK_PRIORITY_LOW):
-            job = yield JobItem.nextjob(txn, now, priority)
+            job = yield JobItem.nextjob(txn, now, priority, 1)
             if job is not None:
                 work = yield job.workItem()
             else:
@@ -1196,6 +1200,7 @@
         jobs = yield check
         self.assertTrue(len(jobs) == 1)
         self.assertTrue(jobs[0].assigned is None)
+        self.assertEqual(jobs[0].isAssigned, 0)
         self.assertTrue(jobs[0].failed == 1)
         self.assertTrue(jobs[0].notBefore > datetime.datetime.utcnow())
 
@@ -1230,6 +1235,7 @@
         jobs = yield check
         self.assertTrue(len(jobs) == 1)
         self.assertTrue(jobs[0].assigned is None)
+        self.assertEqual(jobs[0].isAssigned, 0)
         self.assertTrue(jobs[0].failed == 1)
         self.assertTrue(jobs[0].notBefore > datetime.datetime.utcnow() + datetime.timedelta(seconds=90))
 
@@ -1244,8 +1250,8 @@
 
         oldNextJob = JobItem.nextjob
         @inlineCallbacks
-        def _nextJob(cls, txn, now, minPriority):
-            job = yield oldNextJob(txn, now, minPriority)
+        def _nextJob(cls, txn, now, minPriority, rowLimit):
+            job = yield oldNextJob(txn, now, minPriority, rowLimit)
             work = yield job.workItem()
             if work.a == -2:
                 raise ValueError("oops")
@@ -1277,9 +1283,11 @@
         jobs = yield check
         self.assertEqual(len(jobs), 2)
         self.assertEqual(jobs[0].assigned, None)
+        self.assertEqual(jobs[0].isAssigned, 0)
         self.assertEqual(jobs[0].failed, 0)
         self.assertEqual(jobs[0].notBefore, fakeNow - datetime.timedelta(20 * 60))
         self.assertEqual(jobs[1].assigned, None)
+        self.assertEqual(jobs[1].isAssigned, 0)
         self.assertEqual(jobs[1].failed, 0)
         self.assertEqual(jobs[1].notBefore, fakeNow - datetime.timedelta(20 * 60, 5))
 
@@ -1327,6 +1335,7 @@
         jobs = yield check
         self.assertEqual(len(jobs), 1)
         self.assertEqual(jobs[0].assigned, None)
+        self.assertEqual(jobs[0].isAssigned, 0)
         self.assertEqual(jobs[0].failed, 1)
         self.assertGreater(jobs[0].notBefore, datetime.datetime.utcnow() + datetime.timedelta(seconds=30))
 
@@ -1380,9 +1389,11 @@
         jobs = yield check
         self.assertEqual(len(jobs), 2)
         self.assertEqual(jobs[0].assigned, None)
+        self.assertEqual(jobs[0].isAssigned, 0)
         self.assertEqual(jobs[0].failed, 0)
         self.assertEqual(jobs[0].notBefore, fakeNow - datetime.timedelta(20 * 60))
         self.assertEqual(jobs[1].assigned, None)
+        self.assertEqual(jobs[1].isAssigned, 0)
         self.assertEqual(jobs[1].failed, 0)
         self.assertEqual(jobs[1].notBefore, fakeNow - datetime.timedelta(20 * 60, 5))
 
@@ -1872,7 +1883,7 @@
         @inlineCallbacks
         def _testNone(txn):
             nowTime = datetime.datetime.utcfromtimestamp(reactor.seconds() + 10)
-            job = yield JobItem.nextjob(txn, nowTime, WORK_PRIORITY_HIGH)
+            job = yield JobItem.nextjob(txn, nowTime, WORK_PRIORITY_HIGH, 1)
             self.assertTrue(job is None)
 
         yield _testNone
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20160630/ea735ca3/attachment-0001.html>


More information about the calendarserver-changes mailing list