[CalendarServer-changes] [13472] twext/trunk

source_changes at macosforge.org source_changes at macosforge.org
Wed May 14 12:50:12 PDT 2014


Revision: 13472
          http://trac.calendarserver.org//changeset/13472
Author:   cdaboo at apple.com
Date:     2014-05-14 12:50:12 -0700 (Wed, 14 May 2014)
Log Message:
-----------
New jobqueue implementation.

Modified Paths:
--------------
    twext/trunk/twext/enterprise/adbapi2.py
    twext/trunk/twext/enterprise/dal/model.py
    twext/trunk/twext/enterprise/dal/record.py
    twext/trunk/twext/enterprise/dal/syntax.py
    twext/trunk/twext/enterprise/jobqueue.py
    twext/trunk/twext/enterprise/queue.py
    twext/trunk/twext/enterprise/test/test_jobqueue.py

Property Changed:
----------------
    twext/trunk/
    twext/trunk/twext/


Property changes on: twext/trunk
___________________________________________________________________
Modified: svn:mergeinfo
   - /twext/branches/users/cdaboo/jobs:12742-12780
   + /twext/branches/users/cdaboo/jobqueue-3:13444-13471
/twext/branches/users/cdaboo/jobs:12742-12780


Property changes on: twext/trunk/twext
___________________________________________________________________
Modified: svn:mergeinfo
   - /CalDAVTester/trunk/twext:11193-11198
/CalendarServer/branches/config-separation/twext:4379-4443
/CalendarServer/branches/egg-info-351/twext:4589-4625
/CalendarServer/branches/generic-sqlstore/twext:6167-6191
/CalendarServer/branches/new-store/twext:5594-5934
/CalendarServer/branches/new-store-no-caldavfile/twext:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2/twext:5936-5981
/CalendarServer/branches/release/CalendarServer-4.3-dev/twext:10180-10190,10192
/CalendarServer/branches/release/CalendarServer-5.1-dev/twext:11846
/CalendarServer/branches/users/cdaboo/batchupload-6699/twext:6700-7198
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692/twext:5693-5702
/CalendarServer/branches/users/cdaboo/component-set-fixes/twext:8130-8346
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627/twext:3628-3644
/CalendarServer/branches/users/cdaboo/fix-no-ischedule/twext:11607-11871
/CalendarServer/branches/users/cdaboo/implicituidrace/twext:8137-8141
/CalendarServer/branches/users/cdaboo/ischedule-dkim/twext:9747-9979
/CalendarServer/branches/users/cdaboo/json/twext:11622-11912
/CalendarServer/branches/users/cdaboo/managed-attachments/twext:9985-10145
/CalendarServer/branches/users/cdaboo/more-sharing-5591/twext:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464/twext:4465-4957
/CalendarServer/branches/users/cdaboo/performance-tweaks/twext:11824-11836
/CalendarServer/branches/users/cdaboo/pods/twext:7297-7377
/CalendarServer/branches/users/cdaboo/pycalendar/twext:7085-7206
/CalendarServer/branches/users/cdaboo/pycard/twext:7227-7237
/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twext:7740-8287
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070/twext:5071-5105
/CalendarServer/branches/users/cdaboo/reverse-proxy-pods/twext:11875-11900
/CalendarServer/branches/users/cdaboo/shared-calendars-5187/twext:5188-5440
/CalendarServer/branches/users/cdaboo/sharing-in-the-store/twext:11935-12016
/CalendarServer/branches/users/cdaboo/store-scheduling/twext:10876-11129
/CalendarServer/branches/users/cdaboo/timezones/twext:7443-7699
/CalendarServer/branches/users/cdaboo/txn-debugging/twext:8730-8743
/CalendarServer/branches/users/gaya/sharedgroups-3/twext:11088-11204
/CalendarServer/branches/users/glyph/always-abort-txn-on-error/twext:9958-9969
/CalendarServer/branches/users/glyph/case-insensitive-uid/twext:8772-8805
/CalendarServer/branches/users/glyph/conn-limit/twext:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge/twext:4971-5080
/CalendarServer/branches/users/glyph/dalify/twext:6932-7023
/CalendarServer/branches/users/glyph/db-reconnect/twext:6824-6876
/CalendarServer/branches/users/glyph/deploybuild/twext:7563-7572
/CalendarServer/branches/users/glyph/digest-auth-redux/twext:10624-10635
/CalendarServer/branches/users/glyph/disable-quota/twext:7718-7727
/CalendarServer/branches/users/glyph/dont-start-postgres/twext:6592-6614
/CalendarServer/branches/users/glyph/enforce-max-requests/twext:11640-11643
/CalendarServer/branches/users/glyph/hang-fix/twext:11465-11491
/CalendarServer/branches/users/glyph/imip-and-admin-html/twext:7866-7984
/CalendarServer/branches/users/glyph/ipv6-client/twext:9054-9105
/CalendarServer/branches/users/glyph/launchd-wrapper-bis/twext:11413-11436
/CalendarServer/branches/users/glyph/linux-tests/twext:6893-6900
/CalendarServer/branches/users/glyph/log-cleanups/twext:11691-11731
/CalendarServer/branches/users/glyph/migrate-merge/twext:8690-8713
/CalendarServer/branches/users/glyph/misc-portability-fixes/twext:7365-7374
/CalendarServer/branches/users/glyph/more-deferreds-6/twext:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7/twext:6369-6445
/CalendarServer/branches/users/glyph/multiget-delete/twext:8321-8330
/CalendarServer/branches/users/glyph/new-export/twext:7444-7485
/CalendarServer/branches/users/glyph/one-home-list-api/twext:10048-10073
/CalendarServer/branches/users/glyph/oracle/twext:7106-7155
/CalendarServer/branches/users/glyph/oracle-nulls/twext:7340-7351
/CalendarServer/branches/users/glyph/other-html/twext:8062-8091
/CalendarServer/branches/users/glyph/parallel-sim/twext:8240-8251
/CalendarServer/branches/users/glyph/parallel-upgrade/twext:8376-8400
/CalendarServer/branches/users/glyph/parallel-upgrade_to_1/twext:8571-8583
/CalendarServer/branches/users/glyph/q/twext:9560-9688
/CalendarServer/branches/users/glyph/queue-locking-and-timing/twext:10204-10289
/CalendarServer/branches/users/glyph/quota/twext:7604-7637
/CalendarServer/branches/users/glyph/sendfdport/twext:5388-5424
/CalendarServer/branches/users/glyph/shared-pool-fixes/twext:8436-8443
/CalendarServer/branches/users/glyph/shared-pool-take2/twext:8155-8174
/CalendarServer/branches/users/glyph/sharedpool/twext:6490-6550
/CalendarServer/branches/users/glyph/sharing-api/twext:9192-9205
/CalendarServer/branches/users/glyph/skip-lonely-vtimezones/twext:8524-8535
/CalendarServer/branches/users/glyph/sql-store/twext:5929-6073
/CalendarServer/branches/users/glyph/start-service-start-loop/twext:11060-11065
/CalendarServer/branches/users/glyph/subtransactions/twext:7248-7258
/CalendarServer/branches/users/glyph/table-alias/twext:8651-8664
/CalendarServer/branches/users/glyph/uidexport/twext:7673-7676
/CalendarServer/branches/users/glyph/unshare-when-access-revoked/twext:10562-10595
/CalendarServer/branches/users/glyph/use-system-twisted/twext:5084-5149
/CalendarServer/branches/users/glyph/uuid-normalize/twext:9268-9296
/CalendarServer/branches/users/glyph/warning-cleanups/twext:11347-11357
/CalendarServer/branches/users/glyph/whenNotProposed/twext:11881-11897
/CalendarServer/branches/users/glyph/xattrs-from-files/twext:7757-7769
/CalendarServer/branches/users/sagen/applepush/twext:8126-8184
/CalendarServer/branches/users/sagen/inboxitems/twext:7380-7381
/CalendarServer/branches/users/sagen/locations-resources/twext:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2/twext:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events/twext:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038/twext:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066/twext:4068-4075
/CalendarServer/branches/users/sagen/resources-2/twext:5084-5093
/CalendarServer/branches/users/sagen/testing/twext:10827-10851,10853-10855
/CalendarServer/branches/users/wsanchez/transations/twext:5515-5593
/twext/branches/users/cdaboo/jobs/twext:12742-12780
   + /CalDAVTester/trunk/twext:11193-11198
/CalendarServer/branches/config-separation/twext:4379-4443
/CalendarServer/branches/egg-info-351/twext:4589-4625
/CalendarServer/branches/generic-sqlstore/twext:6167-6191
/CalendarServer/branches/new-store-no-caldavfile-2/twext:5936-5981
/CalendarServer/branches/new-store-no-caldavfile/twext:5911-5935
/CalendarServer/branches/new-store/twext:5594-5934
/CalendarServer/branches/release/CalendarServer-4.3-dev/twext:10180-10190,10192
/CalendarServer/branches/release/CalendarServer-5.1-dev/twext:11846
/CalendarServer/branches/users/cdaboo/batchupload-6699/twext:6700-7198
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692/twext:5693-5702
/CalendarServer/branches/users/cdaboo/component-set-fixes/twext:8130-8346
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627/twext:3628-3644
/CalendarServer/branches/users/cdaboo/fix-no-ischedule/twext:11607-11871
/CalendarServer/branches/users/cdaboo/implicituidrace/twext:8137-8141
/CalendarServer/branches/users/cdaboo/ischedule-dkim/twext:9747-9979
/CalendarServer/branches/users/cdaboo/json/twext:11622-11912
/CalendarServer/branches/users/cdaboo/managed-attachments/twext:9985-10145
/CalendarServer/branches/users/cdaboo/more-sharing-5591/twext:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464/twext:4465-4957
/CalendarServer/branches/users/cdaboo/performance-tweaks/twext:11824-11836
/CalendarServer/branches/users/cdaboo/pods/twext:7297-7377
/CalendarServer/branches/users/cdaboo/pycalendar/twext:7085-7206
/CalendarServer/branches/users/cdaboo/pycard/twext:7227-7237
/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twext:7740-8287
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070/twext:5071-5105
/CalendarServer/branches/users/cdaboo/reverse-proxy-pods/twext:11875-11900
/CalendarServer/branches/users/cdaboo/shared-calendars-5187/twext:5188-5440
/CalendarServer/branches/users/cdaboo/sharing-in-the-store/twext:11935-12016
/CalendarServer/branches/users/cdaboo/store-scheduling/twext:10876-11129
/CalendarServer/branches/users/cdaboo/timezones/twext:7443-7699
/CalendarServer/branches/users/cdaboo/txn-debugging/twext:8730-8743
/CalendarServer/branches/users/gaya/sharedgroups-3/twext:11088-11204
/CalendarServer/branches/users/glyph/always-abort-txn-on-error/twext:9958-9969
/CalendarServer/branches/users/glyph/case-insensitive-uid/twext:8772-8805
/CalendarServer/branches/users/glyph/conn-limit/twext:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge/twext:4971-5080
/CalendarServer/branches/users/glyph/dalify/twext:6932-7023
/CalendarServer/branches/users/glyph/db-reconnect/twext:6824-6876
/CalendarServer/branches/users/glyph/deploybuild/twext:7563-7572
/CalendarServer/branches/users/glyph/digest-auth-redux/twext:10624-10635
/CalendarServer/branches/users/glyph/disable-quota/twext:7718-7727
/CalendarServer/branches/users/glyph/dont-start-postgres/twext:6592-6614
/CalendarServer/branches/users/glyph/enforce-max-requests/twext:11640-11643
/CalendarServer/branches/users/glyph/hang-fix/twext:11465-11491
/CalendarServer/branches/users/glyph/imip-and-admin-html/twext:7866-7984
/CalendarServer/branches/users/glyph/ipv6-client/twext:9054-9105
/CalendarServer/branches/users/glyph/launchd-wrapper-bis/twext:11413-11436
/CalendarServer/branches/users/glyph/linux-tests/twext:6893-6900
/CalendarServer/branches/users/glyph/log-cleanups/twext:11691-11731
/CalendarServer/branches/users/glyph/migrate-merge/twext:8690-8713
/CalendarServer/branches/users/glyph/misc-portability-fixes/twext:7365-7374
/CalendarServer/branches/users/glyph/more-deferreds-6/twext:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7/twext:6369-6445
/CalendarServer/branches/users/glyph/multiget-delete/twext:8321-8330
/CalendarServer/branches/users/glyph/new-export/twext:7444-7485
/CalendarServer/branches/users/glyph/one-home-list-api/twext:10048-10073
/CalendarServer/branches/users/glyph/oracle-nulls/twext:7340-7351
/CalendarServer/branches/users/glyph/oracle/twext:7106-7155
/CalendarServer/branches/users/glyph/other-html/twext:8062-8091
/CalendarServer/branches/users/glyph/parallel-sim/twext:8240-8251
/CalendarServer/branches/users/glyph/parallel-upgrade/twext:8376-8400
/CalendarServer/branches/users/glyph/parallel-upgrade_to_1/twext:8571-8583
/CalendarServer/branches/users/glyph/q/twext:9560-9688
/CalendarServer/branches/users/glyph/queue-locking-and-timing/twext:10204-10289
/CalendarServer/branches/users/glyph/quota/twext:7604-7637
/CalendarServer/branches/users/glyph/sendfdport/twext:5388-5424
/CalendarServer/branches/users/glyph/shared-pool-fixes/twext:8436-8443
/CalendarServer/branches/users/glyph/shared-pool-take2/twext:8155-8174
/CalendarServer/branches/users/glyph/sharedpool/twext:6490-6550
/CalendarServer/branches/users/glyph/sharing-api/twext:9192-9205
/CalendarServer/branches/users/glyph/skip-lonely-vtimezones/twext:8524-8535
/CalendarServer/branches/users/glyph/sql-store/twext:5929-6073
/CalendarServer/branches/users/glyph/start-service-start-loop/twext:11060-11065
/CalendarServer/branches/users/glyph/subtransactions/twext:7248-7258
/CalendarServer/branches/users/glyph/table-alias/twext:8651-8664
/CalendarServer/branches/users/glyph/uidexport/twext:7673-7676
/CalendarServer/branches/users/glyph/unshare-when-access-revoked/twext:10562-10595
/CalendarServer/branches/users/glyph/use-system-twisted/twext:5084-5149
/CalendarServer/branches/users/glyph/uuid-normalize/twext:9268-9296
/CalendarServer/branches/users/glyph/warning-cleanups/twext:11347-11357
/CalendarServer/branches/users/glyph/whenNotProposed/twext:11881-11897
/CalendarServer/branches/users/glyph/xattrs-from-files/twext:7757-7769
/CalendarServer/branches/users/sagen/applepush/twext:8126-8184
/CalendarServer/branches/users/sagen/inboxitems/twext:7380-7381
/CalendarServer/branches/users/sagen/locations-resources-2/twext:5052-5061
/CalendarServer/branches/users/sagen/locations-resources/twext:5032-5051
/CalendarServer/branches/users/sagen/purge_old_events/twext:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038/twext:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066/twext:4068-4075
/CalendarServer/branches/users/sagen/resources-2/twext:5084-5093
/CalendarServer/branches/users/sagen/testing/twext:10827-10851,10853-10855
/CalendarServer/branches/users/wsanchez/transations/twext:5515-5593
/twext/branches/users/cdaboo/jobqueue-3/twext:13444-13471
/twext/branches/users/cdaboo/jobs/twext:12742-12780

Modified: twext/trunk/twext/enterprise/adbapi2.py
===================================================================
--- twext/trunk/twext/enterprise/adbapi2.py	2014-05-14 16:19:13 UTC (rev 13471)
+++ twext/trunk/twext/enterprise/adbapi2.py	2014-05-14 19:50:12 UTC (rev 13472)
@@ -154,13 +154,14 @@
 
     noisy = False
 
-    def __init__(self, pool, threadHolder, connection, cursor):
+    def __init__(self, pool, threadHolder, connection, cursor, label=None):
         self._pool = pool
         self._completed = "idle"
         self._cursor = cursor
         self._connection = connection
         self._holder = threadHolder
         self._first = True
+        self._label = label
 
 
     @_forward
@@ -402,10 +403,11 @@
     """
     implements(IAsyncTransaction)
 
-    def __init__(self, pool, reason):
+    def __init__(self, pool, reason, label=None):
         self.paramstyle = pool.paramstyle
         self.dialect = pool.dialect
         self.reason = reason
+        self._label = label
 
 
     def _everything(self, *a, **kw):
@@ -430,7 +432,7 @@
 
     implements(IAsyncTransaction)
 
-    def __init__(self, pool):
+    def __init__(self, pool, label=None):
         """
         Initialize a L{_WaitingTxn} based on a L{ConnectionPool}.  (The C{pool}
         is used only to reflect C{dialect} and C{paramstyle} attributes; not
@@ -439,6 +441,7 @@
         self._spool = []
         self.paramstyle = pool.paramstyle
         self.dialect = pool.dialect
+        self._label = label
 
 
     def _enspool(self, cmd, a=(), kw={}):
@@ -593,6 +596,7 @@
         super(_SingleTxn, self).__init__()
         self._pool = pool
         self._baseTxn = baseTxn
+        self._label = self._baseTxn._label
         self._completed = False
         self._currentBlock = None
         self._blockedQueue = None
@@ -718,7 +722,8 @@
         self._unspoolOnto(_NoTxn(
             self._pool,
             "connection pool shut down while txn "
-            "waiting for database connection."
+            "waiting for database connection.",
+            label=self._label,
         ))
 
 
@@ -746,7 +751,7 @@
         self._checkComplete()
         block = CommandBlock(self)
         if self._currentBlock is None:
-            self._blockedQueue = _WaitingTxn(self._pool)
+            self._blockedQueue = _WaitingTxn(self._pool, label=self._label)
             # FIXME: test the case where it's ready immediately.
             self._checkNextBlock()
         return block
@@ -795,7 +800,7 @@
         self._singleTxn = singleTxn
         self.paramstyle = singleTxn.paramstyle
         self.dialect = singleTxn.dialect
-        self._spool = _WaitingTxn(singleTxn._pool)
+        self._spool = _WaitingTxn(singleTxn._pool, label=singleTxn._label)
         self._started = False
         self._ended = False
         self._waitingForEnd = []
@@ -1067,14 +1072,15 @@
         if self._stopping:
             # FIXME: should be wrapping a _SingleTxn around this to get
             # .commandBlock()
-            return _NoTxn(self, "txn created while DB pool shutting down")
+            return _NoTxn(self, "txn created while DB pool shutting down", label=label)
 
         if self._free:
             basetxn = self._free.pop(0)
+            basetxn._label = label
             self._busy.append(basetxn)
             txn = _SingleTxn(self, basetxn)
         else:
-            txn = _SingleTxn(self, _WaitingTxn(self))
+            txn = _SingleTxn(self, _WaitingTxn(self, label=label))
             self._waiting.append(txn)
             # FIXME/TESTME: should be len(self._busy) + len(self._finishing)
             # (free doesn't need to be considered, as it's tested above)

Modified: twext/trunk/twext/enterprise/dal/model.py
===================================================================
--- twext/trunk/twext/enterprise/dal/model.py	2014-05-14 16:19:13 UTC (rev 13471)
+++ twext/trunk/twext/enterprise/dal/model.py	2014-05-14 19:50:12 UTC (rev 13472)
@@ -221,12 +221,12 @@
 
     compareAttributes = 'table name'.split()
 
-    def __init__(self, table, name, type):
+    def __init__(self, table, name, type, default=NO_DEFAULT):
         _checkstr(name)
         self.table = table
         self.name = name
         self.type = type
-        self.default = NO_DEFAULT
+        self.default = default
         self.references = None
         self.deleteAction = None
 
@@ -253,14 +253,16 @@
             # Some DBs don't allow sequence as a default
             if (
                 isinstance(self.default, Sequence) and other.default == NO_DEFAULT or
-                self.default == NO_DEFAULT and isinstance(other.default, Sequence)
+                self.default == NO_DEFAULT and isinstance(other.default, Sequence) or
+                self.default is None and other.default == NO_DEFAULT or
+                self.default == NO_DEFAULT and other.default is None
             ):
                 pass
             else:
                 results.append("Table: %s, column name %s default mismatch" % (self.table.name, self.name,))
         if stringIfNone(self.references, "name") != stringIfNone(other.references, "name"):
             results.append("Table: %s, column name %s references mismatch" % (self.table.name, self.name,))
-        if self.deleteAction != other.deleteAction:
+        if stringIfNone(self.deleteAction, "") != stringIfNone(other.deleteAction, ""):
             results.append("Table: %s, column name %s delete action mismatch" % (self.table.name, self.name,))
         return results
 
@@ -403,7 +405,7 @@
         raise KeyError("no such column: %r" % (name,))
 
 
-    def addColumn(self, name, type):
+    def addColumn(self, name, type, default=NO_DEFAULT, notNull=False, primaryKey=False):
         """
         A new column was parsed for this table.
 
@@ -413,8 +415,12 @@
 
         @param type: The L{SQLType} describing the column's type.
         """
-        column = Column(self, name, type)
+        column = Column(self, name, type, default=default)
         self.columns.append(column)
+        if notNull:
+            self.tableConstraint(Constraint.NOT_NULL, [name])
+        if primaryKey:
+            self.primaryKey = [column]
         return column
 
 

Modified: twext/trunk/twext/enterprise/dal/record.py
===================================================================
--- twext/trunk/twext/enterprise/dal/record.py	2014-05-14 16:19:13 UTC (rev 13471)
+++ twext/trunk/twext/enterprise/dal/record.py	2014-05-14 19:50:12 UTC (rev 13472)
@@ -352,7 +352,7 @@
 
 
     @classmethod
-    def query(cls, transaction, expr, order=None, ascending=True, group=None, forUpdate=False, noWait=False):
+    def query(cls, transaction, expr, order=None, ascending=True, group=None, forUpdate=False, noWait=False, limit=None):
         """
         Query the table that corresponds to C{cls}, and return instances of
         C{cls} corresponding to the rows that are returned from that table.
@@ -385,6 +385,8 @@
             kw.update(ForUpdate=True)
             if noWait:
                 kw.update(NoWait=True)
+        if limit is not None:
+            kw.update(Limit=limit)
         return cls._rowsFromQuery(
             transaction,
             Select(

Modified: twext/trunk/twext/enterprise/dal/syntax.py
===================================================================
--- twext/trunk/twext/enterprise/dal/syntax.py	2014-05-14 16:19:13 UTC (rev 13471)
+++ twext/trunk/twext/enterprise/dal/syntax.py	2014-05-14 19:50:12 UTC (rev 13472)
@@ -1433,9 +1433,12 @@
                 stmt.append(SQLFragment(kw))
 
         if self.ForUpdate:
-            stmt.text += " for update"
-            if self.NoWait:
-                stmt.text += " nowait"
+            # FOR UPDATE not supported with sqlite - but that is probably not relevant
+            # given that sqlite does file level locking of the DB
+            if queryGenerator.dialect != SQLITE_DIALECT:
+                stmt.text += " for update"
+                if self.NoWait:
+                    stmt.text += " nowait"
 
         if self.Limit is not None:
             limitConst = Constant(self.Limit).subSQL(queryGenerator, allTables)

Modified: twext/trunk/twext/enterprise/jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/jobqueue.py	2014-05-14 16:19:13 UTC (rev 13471)
+++ twext/trunk/twext/enterprise/jobqueue.py	2014-05-14 19:50:12 UTC (rev 13472)
@@ -81,7 +81,8 @@
 """
 
 from functools import wraps
-from datetime import datetime
+from datetime import datetime, timedelta
+from collections import namedtuple
 
 from zope.interface import implements
 
@@ -91,12 +92,12 @@
     inlineCallbacks, returnValue, Deferred, passthru, succeed
 )
 from twisted.internet.endpoints import TCP4ClientEndpoint
-from twisted.protocols.amp import AMP, Command, Integer, String
+from twisted.protocols.amp import AMP, Command, Integer, String, Argument
 from twisted.python.reflect import qual
-from twisted.python import log
+from twext.python.log import Logger
 
 from twext.enterprise.dal.syntax import (
-    SchemaSyntax, Lock, NamedValue, Select, Count
+    SchemaSyntax, Lock, NamedValue
 )
 
 from twext.enterprise.dal.model import ProcedureCall
@@ -109,7 +110,10 @@
 from zope.interface.interface import Interface
 from twext.enterprise.locking import NamedLock
 
+import time
 
+log = Logger()
+
 class _IJobPerformer(Interface):
     """
     An object that can perform work.
@@ -118,10 +122,10 @@
     (in the worst case) pass from worker->controller->controller->worker.
     """
 
-    def performJob(jobID):  # @NoSelf
+    def performJob(job):  # @NoSelf
         """
-        @param jobID: The primary key identifier of the given job.
-        @type jobID: L{int}
+        @param job: Details about the job to perform.
+        @type job: L{JobDescriptor}
 
         @return: a L{Deferred} firing with an empty dictionary when the work is
             complete.
@@ -180,17 +184,13 @@
     # transaction is made aware of somehow.
     JobTable = Table(inSchema, "JOB")
 
-    JobTable.addColumn("JOB_ID", SQLType("integer", None)).setDefaultValue(
-        ProcedureCall("nextval", ["JOB_SEQ"])
-    )
-    JobTable.addColumn("WORK_TYPE", SQLType("varchar", 255))
-    JobTable.addColumn("PRIORITY", SQLType("integer", 0))
-    JobTable.addColumn("WEIGHT", SQLType("integer", 0))
-    JobTable.addColumn("NOT_BEFORE", SQLType("timestamp", None))
-    JobTable.addColumn("NOT_AFTER", SQLType("timestamp", None))
-    for column in ("JOB_ID", "WORK_TYPE"):
-        JobTable.tableConstraint(Constraint.NOT_NULL, [column])
-    JobTable.primaryKey = [JobTable.columnNamed("JOB_ID"), ]
+    JobTable.addColumn("JOB_ID", SQLType("integer", None), default=ProcedureCall("nextval", ["JOB_SEQ"]), notNull=True, primaryKey=True)
+    JobTable.addColumn("WORK_TYPE", SQLType("varchar", 255), notNull=True)
+    JobTable.addColumn("PRIORITY", SQLType("integer", 0), default=0)
+    JobTable.addColumn("WEIGHT", SQLType("integer", 0), default=0)
+    JobTable.addColumn("NOT_BEFORE", SQLType("timestamp", None), notNull=True)
+    JobTable.addColumn("ASSIGNED", SQLType("timestamp", None), default=None)
+    JobTable.addColumn("FAILED", SQLType("integer", 0), default=0)
 
     return inSchema
 
@@ -199,7 +199,7 @@
 
 
 @inlineCallbacks
-def inTransaction(transactionCreator, operation, label="jobqueue.inTransaction"):
+def inTransaction(transactionCreator, operation, label="jobqueue.inTransaction", **kwargs):
     """
     Perform the given operation in a transaction, committing or aborting as
     required.
@@ -218,7 +218,7 @@
     """
     txn = transactionCreator(label=label)
     try:
-        result = yield operation(txn)
+        result = yield operation(txn, **kwargs)
     except:
         f = Failure()
         yield txn.abort()
@@ -270,6 +270,16 @@
 
 
 
+class JobFailedError(Exception):
+    """
+    A job failed to run - we need to be smart about clean up.
+    """
+
+    def __init__(self, ex):
+        self._ex = ex
+
+
+
 class JobItem(Record, fromTable(JobInfoSchema.JOB)):
     """
     An item in the job table. This is typically not directly used by code
@@ -277,6 +287,10 @@
     associated with work items.
     """
 
+    def descriptor(self):
+        return JobDescriptor(self.jobID, self.weight)
+
+
     @inlineCallbacks
     def workItem(self):
         workItemClass = WorkItem.forTableName(self.workType)
@@ -286,7 +300,183 @@
         returnValue(workItems[0] if len(workItems) == 1 else None)
 
 
+    def assign(self, now):
+        """
+        Mark this job as assigned to a worker by setting the assigned column to the current,
+        or provided, timestamp.
+
+        @param now: current timestamp
+        @type now: L{datetime.datetime}
+        @param when: explicitly set the assigned time - typically only used in tests
+        @type when: L{datetime.datetime} or L{None}
+        """
+        return self.update(assigned=now)
+
+
+    def failedToRun(self):
+        """
+        The attempt to run the job failed. Leave it in the queue, but mark it
+        as unassigned, bump the failure count and set to run at some point in
+        the future.
+        """
+        return self.update(
+            assigned=None,
+            failed=self.failed + 1,
+            notBefore=datetime.utcnow() + timedelta(seconds=60)
+        )
+
+
+    @classmethod
     @inlineCallbacks
+    def ultimatelyPerform(cls, txnFactory, jobID):
+        """
+        Eventually, after routing the job to the appropriate place, somebody
+        actually has to I{do} it.
+
+        @param txnFactory: a 0- or 1-argument callable that creates an
+            L{IAsyncTransaction}
+        @type txnFactory: L{callable}
+
+        @param jobID: the ID of the job to be performed
+        @type jobID: L{int}
+
+        @return: a L{Deferred} which fires with C{None} when the job has been
+            performed, or fails if the job can't be performed.
+        """
+
+        t = time.time()
+        def _tm():
+            return "{:.3f}".format(1000 * (time.time() - t))
+        def _overtm(nb):
+            return "{:.0f}".format(1000 * (t - astimestamp(nb)))
+
+        log.debug("JobItem: starting to run {jobid}".format(jobid=jobID))
+        txn = txnFactory(label="ultimatelyPerform: {}".format(jobID))
+        try:
+            job = yield cls.load(txn, jobID)
+            if hasattr(txn, "_label"):
+                txn._label = "{} <{}>".format(txn._label, job.workType)
+            log.debug("JobItem: loaded {jobid} {work} t={tm}".format(
+                jobid=jobID,
+                work=job.workType,
+                tm=_tm())
+            )
+            yield job.run()
+
+        except NoSuchRecord:
+            # The record has already been removed
+            yield txn.commit()
+            log.debug("JobItem: already removed {jobid} t={tm}".format(jobid=jobID, tm=_tm()))
+
+        except JobFailedError:
+            # Job failed: abort with cleanup, but pretend this method succeeded
+            def _cleanUp():
+                @inlineCallbacks
+                def _cleanUp2(txn2):
+                    job = yield cls.load(txn2, jobID)
+                    log.debug("JobItem: marking as failed {jobid}, failure count: {count} t={tm}".format(jobid=jobID, count=job.failed + 1, tm=_tm()))
+                    yield job.failedToRun()
+                return inTransaction(txnFactory, _cleanUp2, "ultimatelyPerform._cleanUp")
+            txn.postAbort(_cleanUp)
+            yield txn.abort()
+            log.debug("JobItem: failed {jobid} {work} t={tm}".format(
+                jobid=jobID,
+                work=job.workType,
+                tm=_tm()
+            ))
+
+        except:
+            f = Failure()
+            log.error("JobItem: Unknown exception for {jobid} failed t={tm} {exc}".format(
+                jobid=jobID,
+                tm=_tm(),
+                exc=f,
+            ))
+            yield txn.abort()
+            returnValue(f)
+
+        else:
+            yield txn.commit()
+            log.debug("JobItem: completed {jobid} {work} t={tm} over={over}".format(
+                jobid=jobID,
+                work=job.workType,
+                tm=_tm(),
+                over=_overtm(job.notBefore)
+            ))
+
+        returnValue(None)
+
+
+    @classmethod
+    @inlineCallbacks
+    def nextjob(cls, txn, now, minPriority, overdue):
+        """
+        Find the next available job based on priority, also return any that are overdue. This
+        method relies on there being a nextjob() SQL stored procedure to enable skipping over
+        items which are row locked to help avoid contention when multiple nodes are operating
+        on the job queue simultaneously.
+
+        @param txn: the transaction to use
+        @type txn: L{IAsyncTransaction}
+        @param now: current timestamp
+        @type now: L{datetime.datetime}
+        @param minPriority: lowest priority level to query for
+        @type minPriority: L{int}
+        @param overdue: how long before an assigned item is considered overdue
+        @type overdue: L{datetime.datetime}
+
+        @return: the job record
+        @rtype: L{JobItem}
+        """
+
+        jobs = yield cls.nextjobs(txn, now, minPriority, overdue, limit=1)
+
+        # Must only be one or zero
+        if jobs and len(jobs) > 1:
+            raise AssertionError("next_job() returned more than one row")
+
+        returnValue(jobs[0] if jobs else None)
+
+
+    @classmethod
+    @inlineCallbacks
+    def nextjobs(cls, txn, now, minPriority, overdue, limit=1):
+        """
+        Find the next available job based on priority, also return any that are overdue. This
+        method relies on there being a nextjob() SQL stored procedure to enable skipping over
+        items which are row locked to help avoid contention when multiple nodes are operating
+        on the job queue simultaneously.
+
+        @param txn: the transaction to use
+        @type txn: L{IAsyncTransaction}
+        @param now: current timestamp
+        @type now: L{datetime.datetime}
+        @param minPriority: lowest priority level to query for
+        @type minPriority: L{int}
+        @param overdue: how long before an assigned item is considered overdue
+        @type overdue: L{datetime.datetime}
+        @param limit: limit on number of jobs to return
+        @type limit: L{int}
+
+        @return: the job record
+        @rtype: L{JobItem}
+        """
+
+        jobs = yield cls.query(
+            txn,
+            (cls.notBefore <= now).And
+            (((cls.priority >= minPriority).And(cls.assigned == None)).Or(cls.assigned < overdue)),
+            order=(cls.assigned, cls.priority),
+            ascending=False,
+            forUpdate=True,
+            noWait=False,
+            limit=limit,
+        )
+
+        returnValue(jobs)
+
+
+    @inlineCallbacks
     def run(self):
         """
         Run this job item by finding the appropriate work item class and
@@ -304,7 +494,15 @@
                 # The record has already been removed
                 pass
             else:
-                yield workItem.doWork()
+                try:
+                    yield workItem.doWork()
+                except Exception as e:
+                    log.error("JobItem: {jobid}, WorkItem: {workid} failed: {exc}".format(
+                        jobid=self.jobID,
+                        workid=workItem.workID,
+                        exc=e,
+                    ))
+                    raise JobFailedError(e)
 
         try:
             # Once the work is done we delete ourselves
@@ -325,22 +523,48 @@
 
     @classmethod
     @inlineCallbacks
+    def waitEmpty(cls, txnCreator, reactor, timeout):
+        """
+        Wait for the job queue to drain. Only use this in tests
+        that need to wait for results from jobs.
+        """
+        t = time.time()
+        while True:
+            work = yield inTransaction(txnCreator, cls.all)
+            if not work:
+                break
+            if time.time() - t > timeout:
+                returnValue(False)
+            d = Deferred()
+            reactor.callLater(0.1, lambda : d.callback(None))
+            yield d
+
+        returnValue(True)
+
+
+    @classmethod
+    @inlineCallbacks
     def histogram(cls, txn):
         """
         Generate a histogram of work items currently in the queue.
         """
-        jb = JobInfoSchema.JOB
-        rows = yield Select(
-            [jb.WORK_TYPE, Count(jb.WORK_TYPE)],
-            From=jb,
-            GroupBy=jb.WORK_TYPE
-        ).on(txn)
-        results = dict(rows)
-
-        # Add in empty data for other work
+        results = {}
+        now = datetime.utcnow()
         for workType in cls.workTypes():
-            results.setdefault(workType.table.model.name, 0)
+            results.setdefault(workType.table.model.name, [0, 0, 0, 0])
 
+        jobs = yield cls.all(txn)
+
+        for job in jobs:
+            r = results[job.workType]
+            r[0] += 1
+            if job.assigned is not None:
+                r[1] += 1
+            if job.failed:
+                r[2] += 1
+            if job.assigned is None and job.notBefore < now:
+                r[3] += 1
+
         returnValue(results)
 
 
@@ -349,14 +573,41 @@
         return len(cls.workTypes())
 
 
+JobDescriptor = namedtuple("JobDescriptor", ["jobID", "weight"])
 
+class JobDescriptorArg(Argument):
+    """
+    Comma-separated.
+    """
+    def toString(self, inObject):
+        return ",".join(map(str, inObject))
+
+
+    def fromString(self, inString):
+        return JobDescriptor(*map(int, inString.split(",")))
+
+
 # Priority for work - used to order work items in the job queue
 WORK_PRIORITY_LOW = 1
 WORK_PRIORITY_MEDIUM = 2
 WORK_PRIORITY_HIGH = 3
 
+# Weight for work - used to schedule workers based on capacity
+WORK_WEIGHT_0 = 0
+WORK_WEIGHT_1 = 1
+WORK_WEIGHT_2 = 2
+WORK_WEIGHT_3 = 3
+WORK_WEIGHT_4 = 4
+WORK_WEIGHT_5 = 5
+WORK_WEIGHT_6 = 6
+WORK_WEIGHT_7 = 7
+WORK_WEIGHT_8 = 8
+WORK_WEIGHT_9 = 9
+WORK_WEIGHT_10 = 10
+WORK_WEIGHT_CAPACITY = 10   # Total amount of work any one worker can manage
 
 
+
 class WorkItem(Record):
     """
     A L{WorkItem} is an item of work which may be stored in a database, then
@@ -450,7 +701,8 @@
     """
 
     group = None
-    priority = WORK_PRIORITY_LOW    # Default - subclasses should override
+    default_priority = WORK_PRIORITY_LOW    # Default - subclasses should override
+    default_weight = WORK_WEIGHT_5          # Default - subclasses should override
 
 
     @classmethod
@@ -469,19 +721,21 @@
         }
 
         def _transferArg(name):
-            if name in kwargs:
-                jobargs[name] = kwargs[name]
-                del kwargs[name]
+            arg = kwargs.pop(name, None)
+            if arg is not None:
+                jobargs[name] = arg
+            elif hasattr(cls, "default_{}".format(name)):
+                jobargs[name] = getattr(cls, "default_{}".format(name))
 
         _transferArg("jobID")
-        if "priority" in kwargs:
-            _transferArg("priority")
-        else:
-            jobargs["priority"] = cls.priority
+        _transferArg("priority")
         _transferArg("weight")
         _transferArg("notBefore")
-        _transferArg("notAfter")
 
+        # Always need a notBefore
+        if "notBefore" not in jobargs:
+            jobargs["notBefore"] = datetime.utcnow()
+
         job = yield JobItem.create(transaction, **jobargs)
 
         kwargs["jobID"] = job.jobID
@@ -540,7 +794,7 @@
     """
 
     arguments = [
-        ("jobID", Integer()),
+        ("job", JobDescriptorArg()),
     ]
     response = []
 
@@ -665,7 +919,7 @@
         return self._reportedLoad + self._bonusLoad
 
 
-    def performJob(self, jobID):
+    def performJob(self, job):
         """
         A L{local worker connection <ConnectionFromWorker>} is asking this
         specific peer node-controller process to perform a job, having
@@ -673,12 +927,12 @@
 
         @see: L{_IJobPerformer.performJob}
         """
-        d = self.callRemote(PerformJob, jobID=jobID)
-        self._bonusLoad += 1
+        d = self.callRemote(PerformJob, job=job)
+        self._bonusLoad += job.weight
 
         @d.addBoth
         def performed(result):
-            self._bonusLoad -= 1
+            self._bonusLoad -= job.weight
             return result
 
         @d.addCallback
@@ -689,17 +943,17 @@
 
 
     @PerformJob.responder
-    def dispatchToWorker(self, jobID):
+    def dispatchToWorker(self, job):
         """
         A remote peer node has asked this node to do a job; dispatch it to
         a local worker on this node.
 
-        @param jobID: the identifier of the job.
-        @type jobID: L{int}
+        @param job: the details of the job.
+        @type job: L{JobDescriptor}
 
         @return: a L{Deferred} that fires when the work has been completed.
         """
-        d = self.peerPool.performJobForPeer(jobID)
+        d = self.peerPool.performJobForPeer(job)
         d.addCallback(lambda ignored: {})
         return d
 
@@ -721,7 +975,7 @@
     """
     implements(_IJobPerformer)
 
-    def __init__(self, maximumLoadPerWorker=5):
+    def __init__(self, maximumLoadPerWorker=WORK_WEIGHT_CAPACITY):
         self.workers = []
         self.maximumLoadPerWorker = maximumLoadPerWorker
 
@@ -753,6 +1007,26 @@
         return False
 
 
+    def loadLevel(self):
+        """
+        Return the overall load of this worker connection pool have as a percentage of
+        total capacity.
+
+        @return: current load percentage.
+        @rtype: L{int}
+        """
+        current = sum(worker.currentLoad for worker in self.workers)
+        total = len(self.workers) * self.maximumLoadPerWorker
+        return ((current * 100) / total) if total else 0
+
+
+    def eachWorkerLoad(self):
+        """
+        The load of all currently connected workers.
+        """
+        return [(worker.currentLoad, worker.totalCompleted) for worker in self.workers]
+
+
     def allWorkerLoad(self):
         """
         The total load of all currently connected workers.
@@ -771,20 +1045,20 @@
         return sorted(self.workers[:], key=lambda w: w.currentLoad)[0]
 
 
-    def performJob(self, jobID):
+    def performJob(self, job):
         """
         Select a local worker that is idle enough to perform the given job,
         then ask them to perform it.
 
-        @param jobID: The primary key identifier of the given job.
-        @type jobID: L{int}
+        @param job: The details of the given job.
+        @type job: L{JobDescriptor}
 
         @return: a L{Deferred} firing with an empty dictionary when the work is
             complete.
         @rtype: L{Deferred} firing L{dict}
         """
         preferredWorker = self._selectLowestLoadWorker()
-        result = preferredWorker.performJob(jobID)
+        result = preferredWorker.performJob(job)
         return result
 
 
@@ -799,6 +1073,7 @@
         super(ConnectionFromWorker, self).__init__(boxReceiver, locator)
         self.peerPool = peerPool
         self._load = 0
+        self._completed = 0
 
 
     @property
@@ -809,6 +1084,14 @@
         return self._load
 
 
+    @property
+    def totalCompleted(self):
+        """
+        What is the current load of this worker?
+        """
+        return self._completed
+
+
     def startReceivingBoxes(self, sender):
         """
         Start receiving AMP boxes from the peer.  Initialize all necessary
@@ -829,19 +1112,20 @@
 
 
     @PerformJob.responder
-    def performJob(self, jobID):
+    def performJob(self, job):
         """
         Dispatch a job to this worker.
 
         @see: The responder for this should always be
             L{ConnectionFromController.actuallyReallyExecuteJobHere}.
         """
-        d = self.callRemote(PerformJob, jobID=jobID)
-        self._load += 1
+        d = self.callRemote(PerformJob, job=job)
+        self._load += job.weight
 
         @d.addBoth
         def f(result):
-            self._load -= 1
+            self._load -= job.weight
+            self._completed += 1
             return result
 
         return d
@@ -883,11 +1167,11 @@
         return self
 
 
-    def performJob(self, jobID):
+    def performJob(self, job):
         """
         Ask the controller to perform a job on our behalf.
         """
-        return self.callRemote(PerformJob, jobID=jobID)
+        return self.callRemote(PerformJob, job=job)
 
 
     @inlineCallbacks
@@ -914,48 +1198,18 @@
 
 
     @PerformJob.responder
-    def actuallyReallyExecuteJobHere(self, jobID):
+    def actuallyReallyExecuteJobHere(self, job):
         """
         This is where it's time to actually do the job.  The controller
         process has instructed this worker to do it; so, look up the data in
         the row, and do it.
         """
-        d = ultimatelyPerform(self.transactionFactory, jobID)
+        d = JobItem.ultimatelyPerform(self.transactionFactory, job.jobID)
         d.addCallback(lambda ignored: {})
         return d
 
 
 
-def ultimatelyPerform(txnFactory, jobID):
-    """
-    Eventually, after routing the job to the appropriate place, somebody
-    actually has to I{do} it.
-
-    @param txnFactory: a 0- or 1-argument callable that creates an
-        L{IAsyncTransaction}
-    @type txnFactory: L{callable}
-
-    @param jobID: the ID of the job to be performed
-    @type jobID: L{int}
-
-    @return: a L{Deferred} which fires with C{None} when the job has been
-        performed, or fails if the job can't be performed.
-    """
-    @inlineCallbacks
-    def runJob(txn):
-        try:
-            job = yield JobItem.load(txn, jobID)
-            if hasattr(txn, "_label"):
-                txn._label = "{} <{}>".format(txn._label, job.workType)
-            yield job.run()
-        except NoSuchRecord:
-            # The record has already been removed
-            pass
-
-    return inTransaction(txnFactory, runJob, label="ultimatelyPerform: {}".format(jobID))
-
-
-
 class LocalPerformer(object):
     """
     Implementor of C{performJob} that does its work in the local process,
@@ -970,11 +1224,11 @@
         self.txnFactory = txnFactory
 
 
-    def performJob(self, jobID):
+    def performJob(self, job):
         """
         Perform the given job right now.
         """
-        return ultimatelyPerform(self.txnFactory, jobID)
+        return JobItem.ultimatelyPerform(self.txnFactory, job.jobID)
 
 
 
@@ -1049,7 +1303,6 @@
         self.txn = txn
         self.workItemType = workItemType
         self.kw = kw
-        self._whenExecuted = Deferred()
         self._whenCommitted = Deferred()
         self.workItem = None
 
@@ -1073,60 +1326,11 @@
             def whenDone():
                 self._whenCommitted.callback(self)
 
-                def maybeLater():
-                    performer = self._chooser.choosePerformer()
-
-                    @passthru(
-                        performer.performJob(created.jobID).addCallback
-                    )
-                    def performed(result):
-                        self._whenExecuted.callback(self)
-
-                    @performed.addErrback
-                    def notPerformed(why):
-                        self._whenExecuted.errback(why)
-
-                reactor = self._chooser.reactor
-
-                if created.job.notBefore is not None:
-                    when = max(
-                        0,
-                        astimestamp(created.job.notBefore) - reactor.seconds()
-                    )
-                else:
-                    when = 0
-                # TODO: Track the returned DelayedCall so it can be stopped
-                # when the service stops.
-                self._chooser.reactor.callLater(when, maybeLater)
-
             @self.txn.postAbort
             def whenFailed():
                 self._whenCommitted.errback(TransactionFailed)
 
 
-    def whenExecuted(self):
-        """
-        Let the caller know when the proposed work has been fully executed.
-
-        @note: The L{Deferred} returned by C{whenExecuted} should be used with
-            extreme caution.  If an application decides to do any
-            database-persistent work as a result of this L{Deferred} firing,
-            that work I{may be lost} as a result of a service being normally
-            shut down between the time that the work is scheduled and the time
-            that it is executed.  So, the only things that should be added as
-            callbacks to this L{Deferred} are those which are ephemeral, in
-            memory, and reflect only presentation state associated with the
-            user's perception of the completion of work, not logical chains of
-            work which need to be completed in sequence; those should all be
-            completed within the transaction of the L{WorkItem.doWork} that
-            gets executed.
-
-        @return: a L{Deferred} that fires with this L{WorkProposal} when the
-            work has been completed remotely.
-        """
-        return _cloneDeferred(self._whenExecuted)
-
-
     def whenProposed(self):
         """
         Let the caller know when the work has been proposed; i.e. when the work
@@ -1221,14 +1425,14 @@
         than waiting for it to be requested.  By default, 10 minutes.
     @type queueProcessTimeout: L{float} (in seconds)
 
-    @ivar queueDelayedProcessInterval: The amount of time between database
+    @ivar queuePollInterval: The amount of time between database
         pings, i.e. checks for over-due queue items that might have been
         orphaned by a controller process that died mid-transaction.  This is
         how often the shared database should be pinged by I{all} nodes (i.e.,
         all controller processes, or each instance of L{PeerConnectionPool});
         each individual node will ping commensurately less often as more nodes
         join the database.
-    @type queueDelayedProcessInterval: L{float} (in seconds)
+    @type queuePollInterval: L{float} (in seconds)
 
     @ivar reactor: The reactor used for scheduling timed events.
     @type reactor: L{IReactorTime} provider.
@@ -1243,9 +1447,13 @@
     getfqdn = staticmethod(getfqdn)
     getpid = staticmethod(getpid)
 
-    queueProcessTimeout = (10.0 * 60.0)
-    queueDelayedProcessInterval = (60.0)
+    queuePollInterval = 0.1             # How often to poll for new work
+    queueOrphanTimeout = 5.0 * 60.0     # How long before assigned work is possibly orphaned
 
+    overloadLevel = 95          # Percentage load level above which job queue processing stops
+    highPriorityLevel = 80      # Percentage load level above which only high priority jobs are processed
+    mediumPriorityLevel = 50    # Percentage load level above which high and medium priority jobs are processed
+
     def __init__(self, reactor, transactionFactory, ampPort):
         """
         Initialize a L{PeerConnectionPool}.
@@ -1324,13 +1532,13 @@
             return LocalPerformer(self.transactionFactory)
 
 
-    def performJobForPeer(self, jobID):
+    def performJobForPeer(self, job):
         """
         A peer has requested us to perform a job; choose a job performer
         local to this node, and then execute it.
         """
         performer = self.choosePerformer(onlyLocally=True)
-        return performer.performJob(jobID)
+        return performer.performJob(job)
 
 
     def totalNumberOfNodes(self):
@@ -1362,67 +1570,113 @@
         return self._lastSeenNodeIndex
 
 
-    def _periodicLostWorkCheck(self):
+    @inlineCallbacks
+    def _workCheck(self):
         """
-        Periodically, every node controller has to check to make sure that work
-        hasn't been dropped on the floor by someone.  In order to do that it
-        queries each work-item table.
+        Every node controller will periodically check for any new work to do, and dispatch
+        as much as possible given the current load.
         """
-        @inlineCallbacks
-        def workCheck(txn):
-            if self.thisProcess:
-                nodes = [(node.hostname, node.port) for node in
-                         (yield self.activeNodes(txn))]
-                nodes.sort()
-                self._lastSeenTotalNodes = len(nodes)
-                self._lastSeenNodeIndex = nodes.index(
-                    (self.thisProcess.hostname, self.thisProcess.port)
-                )
+        # FIXME: not sure if we should do this node check on every work poll
+#        if self.thisProcess:
+#            nodes = [(node.hostname, node.port) for node in
+#                     (yield self.activeNodes(txn))]
+#            nodes.sort()
+#            self._lastSeenTotalNodes = len(nodes)
+#            self._lastSeenNodeIndex = nodes.index(
+#                (self.thisProcess.hostname, self.thisProcess.port)
+#            )
 
+        loopCounter = 0
+        while True:
+            if not self.running:
+                returnValue(None)
+
+            # Check the overall service load - if overloaded skip this poll cycle.
+            # FIXME: need to include capacity of other nodes. For now we only check
+            # our own capacity and stop processing if too busy. Other nodes that
+            # are not busy will pick up work.
+            level = self.workerPool.loadLevel()
+
+            # Check overload level first
+            if level > self.overloadLevel:
+                log.error("workCheck: jobqueue is overloaded")
+                break
+            elif level > self.highPriorityLevel:
+                log.debug("workCheck: jobqueue high priority only")
+                minPriority = WORK_PRIORITY_HIGH
+            elif level > self.mediumPriorityLevel:
+                log.debug("workCheck: jobqueue high/medium priority only")
+                minPriority = WORK_PRIORITY_MEDIUM
+            else:
+                minPriority = WORK_PRIORITY_LOW
+
+            # Determine what the timestamp cutoff
             # TODO: here is where we should iterate over the unlocked items
             # that are due, ordered by priority, notBefore etc
-            tooLate = datetime.utcfromtimestamp(
-                self.reactor.seconds() - self.queueProcessTimeout
-            )
-            overdueItems = (yield JobItem.query(
-                txn, (JobItem.notBefore < tooLate))
-            )
-            for overdueItem in overdueItems:
-                peer = self.choosePerformer()
+            nowTime = datetime.utcfromtimestamp(self.reactor.seconds())
+            orphanTime = nowTime - timedelta(seconds=self.queueOrphanTimeout)
+
+            txn = self.transactionFactory(label="jobqueue.workCheck")
+            try:
+                nextJob = yield JobItem.nextjob(txn, nowTime, minPriority, orphanTime)
+                if nextJob is None:
+                    break
+
+                # If it is now assigned but not earlier than the orphan time, ignore as this may have
+                # been returned after another txn just assigned it
+                if nextJob.assigned is not None and nextJob.assigned > orphanTime:
+                    continue
+
+                # Always assign as a new job even when it is an orphan
+                yield nextJob.assign(nowTime)
+                loopCounter += 1
+
+            except Exception as e:
+                log.error("Failed to pick a new job, {exc}", exc=e)
+                yield txn.abort()
+                txn = None
+                nextJob = None
+            finally:
+                if txn:
+                    yield txn.commit()
+
+            if nextJob is not None:
+                peer = self.choosePerformer(onlyLocally=True)
                 try:
-                    yield peer.performJob(overdueItem.jobID)
+                    # Send the job over but DO NOT block on the response - that will ensure
+                    # we can do stuff in parallel
+                    peer.performJob(nextJob.descriptor())
                 except Exception as e:
-                    log.err("Failed to perform periodic lost job for jobid={}, {}".format(overdueItem.jobID, e))
+                    log.error("Failed to perform job for jobid={jobid}, {exc}", jobid=nextJob.jobID, exc=e)
 
-        return inTransaction(self.transactionFactory, workCheck, label="periodicLostWorkCheck")
+        if loopCounter:
+            log.debug("workCheck: processed {} jobs in one loop".format(loopCounter))
 
     _currentWorkDeferred = None
-    _lostWorkCheckCall = None
+    _workCheckCall = None
 
-    def _lostWorkCheckLoop(self):
+    def _workCheckLoop(self):
         """
         While the service is running, keep checking for any overdue / lost work
         items and re-submit them to the cluster for processing.  Space out
         those checks in time based on the size of the cluster.
         """
-        self._lostWorkCheckCall = None
+        self._workCheckCall = None
 
+        if not self.running:
+            return
+
         @passthru(
-            self._periodicLostWorkCheck().addErrback(log.err).addCallback
+            self._workCheck().addErrback(log.error).addCallback
         )
         def scheduleNext(result):
+            # TODO: if multiple nodes are present, see if we can
+            # stagger the polling to avoid contention.
             self._currentWorkDeferred = None
             if not self.running:
                 return
-            index = self.nodeIndex()
-            now = self.reactor.seconds()
-
-            interval = self.queueDelayedProcessInterval
-            count = self.totalNumberOfNodes()
-            when = (now - (now % interval)) + (interval * (count + index))
-            delay = when - now
-            self._lostWorkCheckCall = self.reactor.callLater(
-                delay, self._lostWorkCheckLoop
+            self._workCheckCall = self.reactor.callLater(
+                self.queuePollInterval, self._workCheckLoop
             )
 
         self._currentWorkDeferred = scheduleNext
@@ -1432,32 +1686,37 @@
         """
         Register ourselves with the database and establish all outgoing
         connections to other servers in the cluster.
+
+        @param waitForService: an optional L{Deferred} that will be called back when
+            the service startup is done.
+        @type waitForService: L{Deferred} or L{None}
         """
         @inlineCallbacks
         def startup(txn):
-            endpoint = TCP4ServerEndpoint(self.reactor, self.ampPort)
-            # If this fails, the failure mode is going to be ugly, just like
-            # all conflicted-port failures.  But, at least it won't proceed.
-            self._listeningPort = yield endpoint.listen(self.peerFactory())
-            self.ampPort = self._listeningPort.getHost().port
-            yield Lock.exclusive(NodeInfo.table).on(txn)
-            nodes = yield self.activeNodes(txn)
-            selves = [node for node in nodes
-                      if ((node.hostname == self.hostname) and
-                          (node.port == self.ampPort))]
-            if selves:
-                self.thisProcess = selves[0]
-                nodes.remove(self.thisProcess)
-                yield self.thisProcess.update(pid=self.pid,
-                                              time=datetime.now())
-            else:
-                self.thisProcess = yield NodeInfo.create(
-                    txn, hostname=self.hostname, port=self.ampPort,
-                    pid=self.pid, time=datetime.now()
-                )
+            if self.ampPort is not None:
+                endpoint = TCP4ServerEndpoint(self.reactor, self.ampPort)
+                # If this fails, the failure mode is going to be ugly, just like
+                # all conflicted-port failures.  But, at least it won't proceed.
+                self._listeningPort = yield endpoint.listen(self.peerFactory())
+                self.ampPort = self._listeningPort.getHost().port
+                yield Lock.exclusive(NodeInfo.table).on(txn)
+                nodes = yield self.activeNodes(txn)
+                selves = [node for node in nodes
+                          if ((node.hostname == self.hostname) and
+                              (node.port == self.ampPort))]
+                if selves:
+                    self.thisProcess = selves[0]
+                    nodes.remove(self.thisProcess)
+                    yield self.thisProcess.update(pid=self.pid,
+                                                  time=datetime.now())
+                else:
+                    self.thisProcess = yield NodeInfo.create(
+                        txn, hostname=self.hostname, port=self.ampPort,
+                        pid=self.pid, time=datetime.now()
+                    )
 
-            for node in nodes:
-                self._startConnectingTo(node)
+                for node in nodes:
+                    self._startConnectingTo(node)
 
         self._startingUp = inTransaction(self.transactionFactory, startup, label="PeerConnectionPool.startService")
 
@@ -1465,7 +1724,7 @@
         def done(result):
             self._startingUp = None
             super(PeerConnectionPool, self).startService()
-            self._lostWorkCheckLoop()
+            self._workCheckLoop()
             return result
 
 
@@ -1474,20 +1733,30 @@
         """
         Stop this service, terminating any incoming or outgoing connections.
         """
-        yield super(PeerConnectionPool, self).stopService()
 
+        # If in the process of starting up, always wait for startup to complete before
+        # stopping,.
         if self._startingUp is not None:
-            yield self._startingUp
+            d = Deferred()
+            self._startingUp.addBoth(lambda result: d.callback(None))
+            yield d
 
+        yield super(PeerConnectionPool, self).stopService()
+
         if self._listeningPort is not None:
             yield self._listeningPort.stopListening()
 
-        if self._lostWorkCheckCall is not None:
-            self._lostWorkCheckCall.cancel()
+        if self._workCheckCall is not None:
+            self._workCheckCall.cancel()
 
         if self._currentWorkDeferred is not None:
-            yield self._currentWorkDeferred
+            self._currentWorkDeferred.cancel()
 
+        for connector in self._connectingToPeer:
+            d = Deferred()
+            connector.addBoth(lambda result: d.callback(None))
+            yield d
+
         for peer in self.peers:
             peer.transport.abortConnection()
 
@@ -1510,6 +1779,7 @@
             # self.mappedPeers.pop((host, port)).transport.loseConnection()
         self.mappedPeers[(host, port)] = peer
 
+    _connectingToPeer = []
 
     def _startConnectingTo(self, node):
         """
@@ -1519,8 +1789,10 @@
         @type node: L{NodeInfo}
         """
         connected = node.endpoint(self.reactor).connect(self.peerFactory())
+        self._connectingToPeer.append(connected)
 
         def whenConnected(proto):
+            self._connectingToPeer.remove(connected)
             self.mapPeer(node.hostname, node.port, proto)
             proto.callRemote(
                 IdentifyNode,
@@ -1529,9 +1801,11 @@
             ).addErrback(noted, "identify")
 
         def noted(err, x="connect"):
-            log.msg(
-                "Could not {0} to cluster peer {1} because {2}"
-                .format(x, node, str(err.value))
+            if x == "connect":
+                self._connectingToPeer.remove(connected)
+            log.error(
+                "Could not {action} to cluster peer {node} because {reason}",
+                action=x, node=node, reason=str(err.value),
             )
 
         connected.addCallbacks(whenConnected, noted)
@@ -1594,7 +1868,7 @@
     """
     implements(_IJobPerformer)
 
-    def performJob(self, jobID):
+    def performJob(self, job):
         """
         Don't perform job.
         """

Modified: twext/trunk/twext/enterprise/queue.py
===================================================================
--- twext/trunk/twext/enterprise/queue.py	2014-05-14 16:19:13 UTC (rev 13471)
+++ twext/trunk/twext/enterprise/queue.py	2014-05-14 19:50:12 UTC (rev 13472)
@@ -1323,6 +1323,9 @@
         """
         self._lostWorkCheckCall = None
 
+        if not self.running:
+            return
+
         @passthru(
             self._periodicLostWorkCheck().addErrback(log.err).addCallback
         )
@@ -1390,11 +1393,15 @@
         """
         Stop this service, terminating any incoming or outgoing connections.
         """
+        # If in the process of starting up, always wait for startup to complete before
+        # stopping,.
+        if self._startingUp is not None:
+            d = Deferred()
+            self._startingUp.addBoth(lambda result: d.callback(None))
+            yield d
+
         yield super(PeerConnectionPool, self).stopService()
 
-        if self._startingUp is not None:
-            yield self._startingUp
-
         if self._listeningPort is not None:
             yield self._listeningPort.stopListening()
 
@@ -1402,8 +1409,13 @@
             self._lostWorkCheckCall.cancel()
 
         if self._currentWorkDeferred is not None:
-            yield self._currentWorkDeferred
+            self._currentWorkDeferred.cancel()
 
+        for connector in self._connectingToPeer:
+            d = Deferred()
+            connector.addBoth(lambda result: d.callback(None))
+            yield d
+
         for peer in self.peers:
             peer.transport.abortConnection()
 
@@ -1426,6 +1438,7 @@
             # self.mappedPeers.pop((host, port)).transport.loseConnection()
         self.mappedPeers[(host, port)] = peer
 
+    _connectingToPeer = []
 
     def _startConnectingTo(self, node):
         """
@@ -1435,8 +1448,10 @@
         @type node: L{NodeInfo}
         """
         connected = node.endpoint(self.reactor).connect(self.peerFactory())
+        self._connectingToPeer.append(connected)
 
         def whenConnected(proto):
+            self._connectingToPeer.remove(connected)
             self.mapPeer(node.hostname, node.port, proto)
             proto.callRemote(
                 IdentifyNode,
@@ -1445,6 +1460,8 @@
             ).addErrback(noted, "identify")
 
         def noted(err, x="connect"):
+            if x == "connect":
+                self._connectingToPeer.remove(connected)
             log.msg(
                 "Could not {0} to cluster peer {1} because {2}"
                 .format(x, node, str(err.value))

Modified: twext/trunk/twext/enterprise/test/test_jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/test/test_jobqueue.py	2014-05-14 16:19:13 UTC (rev 13471)
+++ twext/trunk/twext/enterprise/test/test_jobqueue.py	2014-05-14 19:50:12 UTC (rev 13472)
@@ -22,16 +22,16 @@
 
 from zope.interface.verify import verifyObject
 
+from twisted.internet import reactor
 from twisted.trial.unittest import TestCase, SkipTest
 from twisted.test.proto_helpers import StringTransport, MemoryReactor
-from twisted.internet.defer import (
-    Deferred, inlineCallbacks, gatherResults, passthru, returnValue
-)
+from twisted.internet.defer import \
+    Deferred, inlineCallbacks, gatherResults, passthru, returnValue, succeed
 from twisted.internet.task import Clock as _Clock
 from twisted.protocols.amp import Command, AMP, Integer
 from twisted.application.service import Service, MultiService
 
-from twext.enterprise.dal.syntax import SchemaSyntax, Select
+from twext.enterprise.dal.syntax import SchemaSyntax
 from twext.enterprise.dal.record import fromTable
 from twext.enterprise.dal.test.test_parseschema import SchemaTestHelper
 from twext.enterprise.fixtures import buildConnectionPool
@@ -40,7 +40,9 @@
     inTransaction, PeerConnectionPool, astimestamp,
     LocalPerformer, _IJobPerformer, WorkItem, WorkerConnectionPool,
     ConnectionFromPeerNode, LocalQueuer,
-    _BaseQueuer, NonPerformingQueuer
+    _BaseQueuer, NonPerformingQueuer, JobItem,
+    WORK_PRIORITY_LOW, WORK_PRIORITY_HIGH, WORK_PRIORITY_MEDIUM,
+    JobDescriptor
 )
 import twext.enterprise.jobqueue
 
@@ -67,7 +69,27 @@
         return super(Clock, self).callLater(_seconds, _f, *args, **kw)
 
 
+    @inlineCallbacks
+    def advanceCompletely(self, amount):
+        """
+        Move time on this clock forward by the given amount and run whatever
+        pending calls should be run. Always complete the deferred calls before
+        returning.
 
+        @type amount: C{float}
+        @param amount: The number of seconds which to advance this clock's
+        time.
+        """
+        self.rightNow += amount
+        self._sortCalls()
+        while self.calls and self.calls[0].getTime() <= self.seconds():
+            call = self.calls.pop(0)
+            call.called = 1
+            yield call.func(*call.args, **call.kw)
+            self._sortCalls()
+
+
+
 class MemoryReactorWithClock(MemoryReactor, Clock):
     """
     Simulate a real reactor.
@@ -75,6 +97,7 @@
     def __init__(self):
         MemoryReactor.__init__(self)
         Clock.__init__(self)
+        self._sortCalls()
 
 
 
@@ -180,8 +203,9 @@
       WORK_TYPE   varchar(255) not null,
       PRIORITY    integer default 0,
       WEIGHT      integer default 0,
-      NOT_BEFORE  timestamp default null,
-      NOT_AFTER   timestamp default null
+      NOT_BEFORE  timestamp not null,
+      ASSIGNED    timestamp default null,
+      FAILED      integer default 0
     );
     """
 )
@@ -194,11 +218,6 @@
       A integer, B integer,
       DELETE_ON_LOAD integer default 0
     );
-    create table DUMMY_WORK_DONE (
-      WORK_ID integer primary key,
-      JOB_ID integer references JOB,
-      A_PLUS_B integer
-    );
     """
 )
 
@@ -207,37 +226,30 @@
 
     dropSQL = [
         "drop table {name} cascade".format(name=table)
-        for table in ("DUMMY_WORK_ITEM", "DUMMY_WORK_DONE")
+        for table in ("DUMMY_WORK_ITEM",)
     ] + ["delete from job"]
 except SkipTest as e:
-    DummyWorkDone = DummyWorkItem = object
+    DummyWorkItem = object
     skip = e
 else:
-    DummyWorkDone = fromTable(schema.DUMMY_WORK_DONE)
     DummyWorkItem = fromTable(schema.DUMMY_WORK_ITEM)
     skip = False
 
 
 
-class DummyWorkDone(WorkItem, DummyWorkDone):
-    """
-    Work result.
-    """
-
-
-
 class DummyWorkItem(WorkItem, DummyWorkItem):
     """
     Sample L{WorkItem} subclass that adds two integers together and stores them
     in another table.
     """
 
+    results = {}
+
     def doWork(self):
         if self.a == -1:
             raise ValueError("Ooops")
-        return DummyWorkDone.makeJob(
-            self.transaction, jobID=self.jobID + 100, workID=self.workID + 100, aPlusB=self.a + self.b
-        )
+        self.results[self.jobID] = self.a + self.b
+        return succeed(None)
 
 
     @classmethod
@@ -250,7 +262,7 @@
         """
         workItems = yield super(DummyWorkItem, cls).loadForJob(txn, *a)
         if workItems[0].deleteOnLoad:
-            otherTransaction = txn.concurrently()
+            otherTransaction = txn.store().newTransaction()
             otherSelf = yield super(DummyWorkItem, cls).loadForJob(txn, *a)
             yield otherSelf[0].delete()
             yield otherTransaction.commit()
@@ -306,12 +318,10 @@
 
 
     @inlineCallbacks
-    def test_enqueue(self):
-        """
-        L{PeerConnectionPool.enqueueWork} will insert a job and a work item.
-        """
-        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+    def _enqueue(self, dbpool, a, b, notBefore=None, priority=None, weight=None):
         fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
+        if notBefore is None:
+            notBefore = datetime.datetime(2012, 12, 13, 12, 12, 0)
         sinceEpoch = astimestamp(fakeNow)
         clock = Clock()
         clock.advance(sinceEpoch)
@@ -329,36 +339,134 @@
         @transactionally(dbpool.connection)
         def check(txn):
             return qpool.enqueueWork(
-                txn, DummyWorkItem, a=3, b=9,
-                notBefore=datetime.datetime(2012, 12, 13, 12, 12, 0)
+                txn, DummyWorkItem,
+                a=a, b=b, priority=priority, weight=weight,
+                notBefore=notBefore
             )
 
         proposal = yield check
         yield proposal.whenProposed()
 
+
+    @inlineCallbacks
+    def test_enqueue(self):
+        """
+        L{PeerConnectionPool.enqueueWork} will insert a job and a work item.
+        """
+        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+        yield self._enqueue(dbpool, 1, 2)
+
         # Make sure we have one JOB and one DUMMY_WORK_ITEM
         @transactionally(dbpool.connection)
         def checkJob(txn):
-            return Select(
-                From=schema.JOB
-            ).on(txn)
+            return JobItem.all(txn)
 
         jobs = yield checkJob
         self.assertTrue(len(jobs) == 1)
-        self.assertTrue(jobs[0][1] == "DUMMY_WORK_ITEM")
+        self.assertTrue(jobs[0].workType == "DUMMY_WORK_ITEM")
+        self.assertTrue(jobs[0].assigned is None)
 
         @transactionally(dbpool.connection)
         def checkWork(txn):
-            return Select(
-                From=schema.DUMMY_WORK_ITEM
-            ).on(txn)
+            return DummyWorkItem.all(txn)
 
         work = yield checkWork
         self.assertTrue(len(work) == 1)
-        self.assertTrue(work[0][1] == jobs[0][0])
+        self.assertTrue(work[0].jobID == jobs[0].jobID)
 
 
+    @inlineCallbacks
+    def test_assign(self):
+        """
+        L{PeerConnectionPool.enqueueWork} will insert a job and a work item.
+        """
+        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+        yield self._enqueue(dbpool, 1, 2)
 
+        # Make sure we have one JOB and one DUMMY_WORK_ITEM
+        def checkJob(txn):
+            return JobItem.all(txn)
+
+        jobs = yield inTransaction(dbpool.connection, checkJob)
+        self.assertTrue(len(jobs) == 1)
+        self.assertTrue(jobs[0].assigned is None)
+
+        @inlineCallbacks
+        def assignJob(txn):
+            job = yield JobItem.load(txn, jobs[0].jobID)
+            yield job.assign(datetime.datetime.utcnow())
+        yield inTransaction(dbpool.connection, assignJob)
+
+        jobs = yield inTransaction(dbpool.connection, checkJob)
+        self.assertTrue(len(jobs) == 1)
+        self.assertTrue(jobs[0].assigned is not None)
+
+
+    @inlineCallbacks
+    def test_nextjob(self):
+        """
+        L{PeerConnectionPool.enqueueWork} will insert a job and a work item.
+        """
+
+        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+        now = datetime.datetime.utcnow()
+
+        # Empty job queue
+        @inlineCallbacks
+        def _next(txn, priority=WORK_PRIORITY_LOW):
+            job = yield JobItem.nextjob(txn, now, priority, now - datetime.timedelta(seconds=PeerConnectionPool.queueOrphanTimeout))
+            if job is not None:
+                work = yield job.workItem()
+            else:
+                work = None
+            returnValue((job, work))
+        job, work = yield inTransaction(dbpool.connection, _next)
+        self.assertTrue(job is None)
+        self.assertTrue(work is None)
+
+        # Unassigned job with future notBefore not returned
+        yield self._enqueue(dbpool, 1, 1, now + datetime.timedelta(days=1))
+        job, work = yield inTransaction(dbpool.connection, _next)
+        self.assertTrue(job is None)
+        self.assertTrue(work is None)
+
+        # Unassigned job with past notBefore returned
+        yield self._enqueue(dbpool, 2, 1, now + datetime.timedelta(days=-1))
+        job, work = yield inTransaction(dbpool.connection, _next)
+        self.assertTrue(job is not None)
+        self.assertTrue(work.a == 2)
+        assignID = job.jobID
+
+        # Assigned job with past notBefore not returned
+        @inlineCallbacks
+        def assignJob(txn, when=None):
+            assignee = yield JobItem.load(txn, assignID)
+            yield assignee.assign(now if when is None else when)
+        yield inTransaction(dbpool.connection, assignJob)
+        job, work = yield inTransaction(dbpool.connection, _next)
+        self.assertTrue(job is None)
+        self.assertTrue(work is None)
+
+        # Unassigned low priority job with past notBefore not returned if high priority required
+        yield self._enqueue(dbpool, 4, 1, now + datetime.timedelta(days=-1))
+        job, work = yield inTransaction(dbpool.connection, _next, priority=WORK_PRIORITY_HIGH)
+        self.assertTrue(job is None)
+        self.assertTrue(work is None)
+
+        # Unassigned low priority job with past notBefore not returned if medium priority required
+        yield self._enqueue(dbpool, 5, 1, now + datetime.timedelta(days=-1))
+        job, work = yield inTransaction(dbpool.connection, _next, priority=WORK_PRIORITY_MEDIUM)
+        self.assertTrue(job is None)
+        self.assertTrue(work is None)
+
+        # Assigned job with past notBefore, but overdue is returned
+        yield inTransaction(dbpool.connection, assignJob, when=now + datetime.timedelta(days=-1))
+        job, work = yield inTransaction(dbpool.connection, _next, priority=WORK_PRIORITY_HIGH)
+        self.assertTrue(job is not None)
+        self.assertTrue(work.a == 2)
+
+
+
 class WorkerConnectionPoolTests(TestCase):
     """
     A L{WorkerConnectionPool} is responsible for managing, in a node's
@@ -412,6 +520,7 @@
         Create a L{PeerConnectionPool} that is just initialized enough.
         """
         self.pcp = PeerConnectionPool(None, None, 4321)
+        DummyWorkItem.results = {}
 
 
     def checkPerformer(self, cls):
@@ -424,6 +533,34 @@
         verifyObject(_IJobPerformer, performer)
 
 
+    def _setupPools(self):
+        """
+        Setup pool and reactor clock for time stepped tests.
+        """
+        reactor = MemoryReactorWithClock()
+        cph = SteppablePoolHelper(nodeSchema + jobSchema + schemaText)
+        then = datetime.datetime(2012, 12, 12, 12, 12, 12)
+        reactor.advance(astimestamp(then))
+        cph.setUp(self)
+        qpool = PeerConnectionPool(reactor, cph.pool.connection, 4321)
+
+        realChoosePerformer = qpool.choosePerformer
+        performerChosen = []
+
+        def catchPerformerChoice(onlyLocally=False):
+            result = realChoosePerformer(onlyLocally=onlyLocally)
+            performerChosen.append(True)
+            return result
+
+        qpool.choosePerformer = catchPerformerChoice
+        reactor.callLater(0, qpool._workCheck)
+
+        qpool.startService()
+        cph.flushHolders()
+
+        return cph, qpool, reactor, performerChosen
+
+
     def test_choosingPerformerWhenNoPeersAndNoWorkers(self):
         """
         If L{PeerConnectionPool.choosePerformer} is invoked when no workers
@@ -478,8 +615,8 @@
         d = Deferred()
 
         class DummyPerformer(object):
-            def performJob(self, jobID):
-                self.jobID = jobID
+            def performJob(self, job):
+                self.jobID = job.jobID
                 return d
 
         # Doing real database I/O in this test would be tedious so fake the
@@ -490,7 +627,7 @@
             return dummy
 
         peer.choosePerformer = chooseDummy
-        performed = local.performJob(7384)
+        performed = local.performJob(JobDescriptor(7384, 1))
         performResult = []
         performed.addCallback(performResult.append)
 
@@ -535,25 +672,19 @@
 
 
     @inlineCallbacks
-    def test_notBeforeWhenCheckingForLostWork(self):
+    def test_notBeforeWhenCheckingForWork(self):
         """
-        L{PeerConnectionPool._periodicLostWorkCheck} should execute any
+        L{PeerConnectionPool._workCheck} should execute any
         outstanding work items, but only those that are expired.
         """
-        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
-        # An arbitrary point in time.
+        dbpool, _ignore_qpool, clock, _ignore_performerChosen = self._setupPools()
         fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
-        # *why* does datetime still not have .astimestamp()
-        sinceEpoch = astimestamp(fakeNow)
-        clock = Clock()
-        clock.advance(sinceEpoch)
-        qpool = PeerConnectionPool(clock, dbpool.connection, 0)
 
         # Let's create a couple of work items directly, not via the enqueue
         # method, so that they exist but nobody will try to immediately execute
         # them.
 
-        @transactionally(dbpool.connection)
+        @transactionally(dbpool.pool.connection)
         @inlineCallbacks
         def setup(txn):
             # First, one that's right now.
@@ -564,9 +695,7 @@
                 txn, a=3, b=4, notBefore=(
                     # Schedule it in the past so that it should have already
                     # run.
-                    fakeNow - datetime.timedelta(
-                        seconds=qpool.queueProcessTimeout + 20
-                    )
+                    fakeNow - datetime.timedelta(seconds=20)
                 )
             )
 
@@ -575,14 +704,13 @@
                 txn, a=10, b=20, notBefore=fakeNow + datetime.timedelta(1000)
             )
         yield setup
-        yield qpool._periodicLostWorkCheck()
 
-        @transactionally(dbpool.connection)
-        def check(txn):
-            return DummyWorkDone.all(txn)
+        # Wait for job
+        while len(DummyWorkItem.results) != 2:
+            clock.advance(1)
 
-        every = yield check
-        self.assertEquals([x.aPlusB for x in every], [7])
+        # Work item complete
+        self.assertTrue(DummyWorkItem.results == {1: 3, 2: 7})
 
 
     @inlineCallbacks
@@ -592,23 +720,10 @@
         only executes it when enough time has elapsed to allow the C{notBefore}
         attribute of the given work item to have passed.
         """
-        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
-        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
-        sinceEpoch = astimestamp(fakeNow)
-        clock = Clock()
-        clock.advance(sinceEpoch)
-        qpool = PeerConnectionPool(clock, dbpool.connection, 0)
-        realChoosePerformer = qpool.choosePerformer
-        performerChosen = []
 
-        def catchPerformerChoice():
-            result = realChoosePerformer()
-            performerChosen.append(True)
-            return result
+        dbpool, qpool, clock, performerChosen = self._setupPools()
 
-        qpool.choosePerformer = catchPerformerChoice
-
-        @transactionally(dbpool.connection)
+        @transactionally(dbpool.pool.connection)
         def check(txn):
             return qpool.enqueueWork(
                 txn, DummyWorkItem, a=3, b=9,
@@ -630,11 +745,12 @@
         clock.advance(20 - 12)
         self.assertEquals(performerChosen, [True])
 
-        # FIXME: if this fails, it will hang, but that's better than no
-        # notification that it is broken at all.
+        # Wait for job
+        while (yield inTransaction(dbpool.pool.connection, lambda txn: JobItem.all(txn))):
+            clock.advance(1)
 
-        result = yield proposal.whenExecuted()
-        self.assertIdentical(result, proposal)
+        # Work item complete
+        self.assertTrue(DummyWorkItem.results == {1: 12})
 
 
     @inlineCallbacks
@@ -643,23 +759,9 @@
         L{PeerConnectionPool.enqueueWork} will execute its work immediately if
         the C{notBefore} attribute of the work item in question is in the past.
         """
-        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
-        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
-        sinceEpoch = astimestamp(fakeNow)
-        clock = Clock()
-        clock.advance(sinceEpoch)
-        qpool = PeerConnectionPool(clock, dbpool.connection, 0)
-        realChoosePerformer = qpool.choosePerformer
-        performerChosen = []
+        dbpool, qpool, clock, performerChosen = self._setupPools()
 
-        def catchPerformerChoice():
-            result = realChoosePerformer()
-            performerChosen.append(True)
-            return result
-
-        qpool.choosePerformer = catchPerformerChoice
-
-        @transactionally(dbpool.connection)
+        @transactionally(dbpool.pool.connection)
         def check(txn):
             return qpool.enqueueWork(
                 txn, DummyWorkItem, a=3, b=9,
@@ -673,10 +775,14 @@
         # Advance far beyond the given timestamp.
         self.assertEquals(performerChosen, [True])
 
-        result = yield proposal.whenExecuted()
-        self.assertIdentical(result, proposal)
+        # Wait for job
+        while (yield inTransaction(dbpool.pool.connection, lambda txn: JobItem.all(txn))):
+            clock.advance(1)
 
+        # Work item complete
+        self.assertTrue(DummyWorkItem.results == {1: 12})
 
+
     def test_workerConnectionPoolPerformJob(self):
         """
         L{WorkerConnectionPool.performJob} performs work by selecting a
@@ -696,12 +802,12 @@
         worker2, _ignore_trans2 = peer()
 
         # Ask the worker to do something.
-        worker1.performJob(1)
+        worker1.performJob(JobDescriptor(1, 1))
         self.assertEquals(worker1.currentLoad, 1)
         self.assertEquals(worker2.currentLoad, 0)
 
         # Now ask the pool to do something
-        peerPool.workerPool.performJob(2)
+        peerPool.workerPool.performJob(JobDescriptor(2, 1))
         self.assertEquals(worker1.currentLoad, 1)
         self.assertEquals(worker2.currentLoad, 1)
 
@@ -716,49 +822,42 @@
         reactor.advance(astimestamp(then))
         cph.setUp(self)
         pcp = PeerConnectionPool(reactor, cph.pool.connection, 4321)
-        now = then + datetime.timedelta(seconds=pcp.queueProcessTimeout * 2)
+        now = then + datetime.timedelta(seconds=20)
 
         @transactionally(cph.pool.connection)
         def createOldWork(txn):
-            one = DummyWorkItem.makeJob(txn, jobID=100, workID=1, a=3, b=4, notBefore=then)
-            two = DummyWorkItem.makeJob(txn, jobID=101, workID=2, a=7, b=9, notBefore=now)
+            one = DummyWorkItem.makeJob(txn, jobID=1, workID=1, a=3, b=4, notBefore=then)
+            two = DummyWorkItem.makeJob(txn, jobID=2, workID=2, a=7, b=9, notBefore=now)
             return gatherResults([one, two])
 
         pcp.startService()
         cph.flushHolders()
-        reactor.advance(pcp.queueProcessTimeout * 2)
+        reactor.advance(19)
         self.assertEquals(
-            cph.rows("select * from DUMMY_WORK_DONE"),
-            [(101, 200, 7)]
+            DummyWorkItem.results,
+            {1: 7}
         )
-        cph.rows("delete from DUMMY_WORK_DONE")
-        reactor.advance(pcp.queueProcessTimeout * 2)
+        reactor.advance(20)
         self.assertEquals(
-            cph.rows("select * from DUMMY_WORK_DONE"),
-            [(102, 201, 16)]
+            DummyWorkItem.results,
+            {1: 7, 2: 16}
         )
 
 
     @inlineCallbacks
-    def test_exceptionWhenCheckingForLostWork(self):
+    def test_exceptionWhenWorking(self):
         """
-        L{PeerConnectionPool._periodicLostWorkCheck} should execute any
+        L{PeerConnectionPool._workCheck} should execute any
         outstanding work items, and keep going if some raise an exception.
         """
-        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
-        # An arbitrary point in time.
+        dbpool, _ignore_qpool, clock, _ignore_performerChosen = self._setupPools()
         fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
-        # *why* does datetime still not have .astimestamp()
-        sinceEpoch = astimestamp(fakeNow)
-        clock = Clock()
-        clock.advance(sinceEpoch)
-        qpool = PeerConnectionPool(clock, dbpool.connection, 0)
 
         # Let's create a couple of work items directly, not via the enqueue
         # method, so that they exist but nobody will try to immediately execute
         # them.
 
-        @transactionally(dbpool.connection)
+        @transactionally(dbpool.pool.connection)
         @inlineCallbacks
         def setup(txn):
             # First, one that's right now.
@@ -776,14 +875,51 @@
                 txn, a=2, b=0, notBefore=fakeNow - datetime.timedelta(20 * 60)
             )
         yield setup
-        yield qpool._periodicLostWorkCheck()
+        clock.advance(20 - 12)
 
-        @transactionally(dbpool.connection)
+        # Wait for job
+#        while True:
+#            jobs = yield inTransaction(dbpool.pool.connection, lambda txn: JobItem.all(txn))
+#            if all([job.a == -1 for job in jobs]):
+#                break
+#            clock.advance(1)
+
+        # Work item complete
+        self.assertTrue(DummyWorkItem.results == {1: 1, 3: 2})
+
+
+    @inlineCallbacks
+    def test_exceptionUnassign(self):
+        """
+        When a work item fails it should appear as unassigned in the JOB
+        table and have the failure count bumped, and a notBefore one minute ahead.
+        """
+        dbpool, _ignore_qpool, clock, _ignore_performerChosen = self._setupPools()
+        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
+
+        # Let's create a couple of work items directly, not via the enqueue
+        # method, so that they exist but nobody will try to immediately execute
+        # them.
+
+        @transactionally(dbpool.pool.connection)
+        @inlineCallbacks
+        def setup(txn):
+            # Next, create failing work that's actually far enough into the past to run.
+            yield DummyWorkItem.makeJob(
+                txn, a=-1, b=1, notBefore=fakeNow - datetime.timedelta(20 * 60)
+            )
+        yield setup
+        clock.advance(20 - 12)
+
+        @transactionally(dbpool.pool.connection)
         def check(txn):
-            return DummyWorkDone.all(txn)
+            return JobItem.all(txn)
 
-        every = yield check
-        self.assertEquals([x.aPlusB for x in every], [1, 2])
+        jobs = yield check
+        self.assertTrue(len(jobs) == 1)
+        self.assertTrue(jobs[0].assigned is None)
+        self.assertTrue(jobs[0].failed == 1)
+        self.assertTrue(jobs[0].notBefore > datetime.datetime.utcnow())
 
 
 
@@ -902,7 +1038,6 @@
             )
         self.addCleanup(deschema)
 
-        from twisted.internet import reactor
         self.node1 = PeerConnectionPool(
             reactor, indirectedTransactionFactory, 0)
         self.node2 = PeerConnectionPool(
@@ -928,7 +1063,9 @@
         yield gatherResults([d1, d2])
         self.store.queuer = self.node1
 
+        DummyWorkItem.results = {}
 
+
     def test_currentNodeInfo(self):
         """
         There will be two C{NODE_INFO} rows in the database, retrievable as two
@@ -942,12 +1079,11 @@
 
 
     @inlineCallbacks
-    def test_enqueueHappyPath(self):
+    def test_enqueueWorkDone(self):
         """
         When a L{WorkItem} is scheduled for execution via
-        L{PeerConnectionPool.enqueueWork} its C{doWork} method will be invoked
-        by the time the L{Deferred} returned from the resulting
-        L{WorkProposal}'s C{whenExecuted} method has fired.
+        L{PeerConnectionPool.enqueueWork} its C{doWork} method will be
+        run.
         """
         # TODO: this exact test should run against LocalQueuer as well.
         def operation(txn):
@@ -956,40 +1092,20 @@
             # Should probably do something with components.
             return txn.enqueue(DummyWorkItem, a=3, b=4, jobID=100, workID=1,
                                notBefore=datetime.datetime.utcnow())
-        result = yield inTransaction(self.store.newTransaction, operation)
+        yield inTransaction(self.store.newTransaction, operation)
+
         # Wait for it to be executed.  Hopefully this does not time out :-\.
-        yield result.whenExecuted()
+        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
 
-        def op2(txn):
-            return Select(
-                [
-                    schema.DUMMY_WORK_DONE.WORK_ID,
-                    schema.DUMMY_WORK_DONE.JOB_ID,
-                    schema.DUMMY_WORK_DONE.A_PLUS_B,
-                ],
-                From=schema.DUMMY_WORK_DONE
-            ).on(txn)
+        self.assertEquals(DummyWorkItem.results, {100: 7})
 
-        rows = yield inTransaction(self.store.newTransaction, op2)
-        self.assertEquals(rows, [[101, 200, 7]])
 
-
     @inlineCallbacks
     def test_noWorkDoneWhenConcurrentlyDeleted(self):
         """
         When a L{WorkItem} is concurrently deleted by another transaction, it
         should I{not} perform its work.
         """
-        # Provide access to a method called "concurrently" everything using
-        original = self.store.newTransaction
-
-        def decorate(*a, **k):
-            result = original(*a, **k)
-            result.concurrently = self.store.newTransaction
-            return result
-
-        self.store.newTransaction = decorate
-
         def operation(txn):
             return txn.enqueue(
                 DummyWorkItem, a=30, b=40, workID=5678,
@@ -997,33 +1113,15 @@
                 notBefore=datetime.datetime.utcnow()
             )
 
-        proposal = yield inTransaction(self.store.newTransaction, operation)
-        yield proposal.whenExecuted()
+        yield inTransaction(self.store.newTransaction, operation)
 
-        # Sanity check on the concurrent deletion.
-        def op2(txn):
-            return Select(
-                [schema.DUMMY_WORK_ITEM.WORK_ID],
-                From=schema.DUMMY_WORK_ITEM
-            ).on(txn)
+        # Wait for it to be executed.  Hopefully this does not time out :-\.
+        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
 
-        rows = yield inTransaction(self.store.newTransaction, op2)
-        self.assertEquals(rows, [])
+        self.assertEquals(DummyWorkItem.results, {})
 
-        def op3(txn):
-            return Select(
-                [
-                    schema.DUMMY_WORK_DONE.WORK_ID,
-                    schema.DUMMY_WORK_DONE.A_PLUS_B,
-                ],
-                From=schema.DUMMY_WORK_DONE
-            ).on(txn)
 
-        rows = yield inTransaction(self.store.newTransaction, op3)
-        self.assertEquals(rows, [])
 
-
-
 class DummyProposal(object):
 
     def __init__(self, *ignored):
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20140514/07d88b1d/attachment-0001.html>


More information about the calendarserver-changes mailing list