[CalendarServer-changes] [14586] twext/trunk/twext/enterprise

source_changes at macosforge.org source_changes at macosforge.org
Fri Mar 13 10:33:19 PDT 2015


Revision: 14586
          http://trac.calendarserver.org//changeset/14586
Author:   cdaboo at apple.com
Date:     2015-03-13 10:33:19 -0700 (Fri, 13 Mar 2015)
Log Message:
-----------
Fixes for Oracle DB.

Modified Paths:
--------------
    twext/trunk/twext/enterprise/dal/syntax.py
    twext/trunk/twext/enterprise/dal/test/test_parseschema.py
    twext/trunk/twext/enterprise/jobqueue.py

Modified: twext/trunk/twext/enterprise/dal/syntax.py
===================================================================
--- twext/trunk/twext/enterprise/dal/syntax.py	2015-03-13 02:18:06 UTC (rev 14585)
+++ twext/trunk/twext/enterprise/dal/syntax.py	2015-03-13 17:33:19 UTC (rev 14586)
@@ -1445,9 +1445,13 @@
             # FOR UPDATE not supported with sqlite - but that is probably not relevant
             # given that sqlite does file level locking of the DB
             if queryGenerator.dialect != SQLITE_DIALECT:
-                stmt.text += " for update"
-                if self.NoWait:
-                    stmt.text += " nowait"
+                # Oracle turns this statement into a sub-select if Limit is non-zero, but we can't have
+                # the "for update" in the sub-select. So suppress it here and add it in the outer limit
+                # select later.
+                if self.Limit is None or queryGenerator.dialect != ORACLE_DIALECT:
+                    stmt.text += " for update"
+                    if self.NoWait:
+                        stmt.text += " nowait"
 
         if self.Limit is not None:
             limitConst = Constant(self.Limit).subSQL(queryGenerator, allTables)
@@ -1460,6 +1464,12 @@
                 stmt.text += " limit "
             stmt.append(limitConst)
 
+            # Add in any Oracle "for update"
+            if self.ForUpdate and queryGenerator.dialect == ORACLE_DIALECT:
+                stmt.text += " for update"
+                if self.NoWait:
+                    stmt.text += " nowait"
+
         return stmt
 
 
@@ -2042,12 +2052,20 @@
         self._name = name
 
 
+    def _safeName(self, txn):
+        if txn.dialect == ORACLE_DIALECT:
+            # Oracle limits the length of identifiers
+            return self._name[:30]
+        else:
+            return self._name
+
+
     def acquire(self, txn):
-        return Savepoint(self._name).on(txn)
+        return Savepoint(self._safeName(txn)).on(txn)
 
 
     def rollback(self, txn):
-        return RollbackToSavepoint(self._name).on(txn)
+        return RollbackToSavepoint(self._safeName(txn)).on(txn)
 
 
     def release(self, txn):
@@ -2057,7 +2075,7 @@
             # do anything.
             return NoOp()
         else:
-            return ReleaseSavepoint(self._name).on(txn)
+            return ReleaseSavepoint(self._safeName(txn)).on(txn)
 
 
 

Modified: twext/trunk/twext/enterprise/dal/test/test_parseschema.py
===================================================================
--- twext/trunk/twext/enterprise/dal/test/test_parseschema.py	2015-03-13 02:18:06 UTC (rev 14585)
+++ twext/trunk/twext/enterprise/dal/test/test_parseschema.py	2015-03-13 17:33:19 UTC (rev 14586)
@@ -110,12 +110,17 @@
             create table thetable (
                 thecolumn integer default nextval('thingy')
             );
+            create table thetable2 (
+                thecolumn2 integer primary key default nextval('thingy'),
+                ignoreme integer
+            );
             """)
         self.assertEquals(len(s.sequences), 1)
         self.assertEquals(s.sequences[0].name, "thingy")
         self.assertEquals(s.tables[0].columns[0].default, s.sequences[0])
+        self.assertEquals(s.tables[1].columns[0].default, s.sequences[0])
         self.assertEquals(s.sequences[0].referringColumns,
-                          [s.tables[0].columns[0]])
+                          [s.tables[0].columns[0], s.tables[1].columns[0]])
 
 
     def test_sequenceDefault(self):

Modified: twext/trunk/twext/enterprise/jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/jobqueue.py	2015-03-13 02:18:06 UTC (rev 14585)
+++ twext/trunk/twext/enterprise/jobqueue.py	2015-03-13 17:33:19 UTC (rev 14586)
@@ -145,13 +145,13 @@
     SchemaSyntax, Lock, NamedValue
 )
 
-from twext.enterprise.dal.model import ProcedureCall
+from twext.enterprise.dal.model import ProcedureCall, Sequence
 from twext.enterprise.dal.record import Record, fromTable, NoSuchRecord
 from twisted.python.failure import Failure
 
 from twext.enterprise.dal.model import Table, Schema, SQLType, Constraint
 from twisted.internet.endpoints import TCP4ServerEndpoint
-from twext.enterprise.ienterprise import IQueuer
+from twext.enterprise.ienterprise import IQueuer, ORACLE_DIALECT
 from zope.interface.interface import Interface
 
 import collections
@@ -229,7 +229,7 @@
     # transaction is made aware of somehow.
     JobTable = Table(inSchema, "JOB")
 
-    JobTable.addColumn("JOB_ID", SQLType("integer", None), default=ProcedureCall("nextval", ["JOB_SEQ"]), notNull=True, primaryKey=True)
+    JobTable.addColumn("JOB_ID", SQLType("integer", None), default=Sequence(inSchema, "JOB_SEQ"), notNull=True, primaryKey=True)
     JobTable.addColumn("WORK_TYPE", SQLType("varchar", 255), notNull=True)
     JobTable.addColumn("PRIORITY", SQLType("integer", 0), default=0)
     JobTable.addColumn("WEIGHT", SQLType("integer", 0), default=0)
@@ -579,17 +579,38 @@
         @rtype: L{JobItem}
         """
 
-        jobs = yield cls.query(
-            txn,
-            (cls.notBefore <= now).And(cls.priority >= minPriority).And(
-                (cls.assigned == None).Or(cls.overdue < now)
-            ),
-            order=(cls.assigned, cls.priority),
-            ascending=False,
-            forUpdate=True,
-            noWait=False,
-            limit=limit,
-        )
+        if txn.dialect == ORACLE_DIALECT:
+            # Oracle does not support a "for update" clause with "order by". So do the
+            # "for update" as a second query right after the first. Will need to check
+            # how this might impact concurrency in a multi-host setup.
+            jobs = yield cls.query(
+                txn,
+                (cls.notBefore <= now).And(cls.priority >= minPriority).And(
+                    (cls.assigned == None).Or(cls.overdue < now)
+                ),
+                order=(cls.assigned, cls.priority),
+                ascending=False,
+                limit=limit,
+            )
+            if jobs:
+                yield cls.query(
+                    txn,
+                    (cls.jobID.In([job.jobID for job in jobs])),
+                    forUpdate=True,
+                    noWait=False,
+                )
+        else:
+            jobs = yield cls.query(
+                txn,
+                (cls.notBefore <= now).And(cls.priority >= minPriority).And(
+                    (cls.assigned == None).Or(cls.overdue < now)
+                ),
+                order=(cls.assigned, cls.priority),
+                ascending=False,
+                forUpdate=True,
+                noWait=False,
+                limit=limit,
+            )
 
         returnValue(jobs)
 
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20150313/60b40fec/attachment-0001.html>


More information about the calendarserver-changes mailing list