[CalendarServer-changes] [13472] twext/trunk
source_changes at macosforge.org
Wed May 14 12:50:12 PDT 2014
Revision: 13472
http://trac.calendarserver.org//changeset/13472
Author: cdaboo at apple.com
Date: 2014-05-14 12:50:12 -0700 (Wed, 14 May 2014)
Log Message:
-----------
New jobqueue implementation.
Modified Paths:
--------------
twext/trunk/twext/enterprise/adbapi2.py
twext/trunk/twext/enterprise/dal/model.py
twext/trunk/twext/enterprise/dal/record.py
twext/trunk/twext/enterprise/dal/syntax.py
twext/trunk/twext/enterprise/jobqueue.py
twext/trunk/twext/enterprise/queue.py
twext/trunk/twext/enterprise/test/test_jobqueue.py
Property Changed:
----------------
twext/trunk/
twext/trunk/twext/
Property changes on: twext/trunk
___________________________________________________________________
Modified: svn:mergeinfo
- /twext/branches/users/cdaboo/jobs:12742-12780
+ /twext/branches/users/cdaboo/jobqueue-3:13444-13471
/twext/branches/users/cdaboo/jobs:12742-12780
Property changes on: twext/trunk/twext
___________________________________________________________________
Modified: svn:mergeinfo
- /CalDAVTester/trunk/twext:11193-11198
/CalendarServer/branches/config-separation/twext:4379-4443
/CalendarServer/branches/egg-info-351/twext:4589-4625
/CalendarServer/branches/generic-sqlstore/twext:6167-6191
/CalendarServer/branches/new-store/twext:5594-5934
/CalendarServer/branches/new-store-no-caldavfile/twext:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2/twext:5936-5981
/CalendarServer/branches/release/CalendarServer-4.3-dev/twext:10180-10190,10192
/CalendarServer/branches/release/CalendarServer-5.1-dev/twext:11846
/CalendarServer/branches/users/cdaboo/batchupload-6699/twext:6700-7198
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692/twext:5693-5702
/CalendarServer/branches/users/cdaboo/component-set-fixes/twext:8130-8346
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627/twext:3628-3644
/CalendarServer/branches/users/cdaboo/fix-no-ischedule/twext:11607-11871
/CalendarServer/branches/users/cdaboo/implicituidrace/twext:8137-8141
/CalendarServer/branches/users/cdaboo/ischedule-dkim/twext:9747-9979
/CalendarServer/branches/users/cdaboo/json/twext:11622-11912
/CalendarServer/branches/users/cdaboo/managed-attachments/twext:9985-10145
/CalendarServer/branches/users/cdaboo/more-sharing-5591/twext:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464/twext:4465-4957
/CalendarServer/branches/users/cdaboo/performance-tweaks/twext:11824-11836
/CalendarServer/branches/users/cdaboo/pods/twext:7297-7377
/CalendarServer/branches/users/cdaboo/pycalendar/twext:7085-7206
/CalendarServer/branches/users/cdaboo/pycard/twext:7227-7237
/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twext:7740-8287
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070/twext:5071-5105
/CalendarServer/branches/users/cdaboo/reverse-proxy-pods/twext:11875-11900
/CalendarServer/branches/users/cdaboo/shared-calendars-5187/twext:5188-5440
/CalendarServer/branches/users/cdaboo/sharing-in-the-store/twext:11935-12016
/CalendarServer/branches/users/cdaboo/store-scheduling/twext:10876-11129
/CalendarServer/branches/users/cdaboo/timezones/twext:7443-7699
/CalendarServer/branches/users/cdaboo/txn-debugging/twext:8730-8743
/CalendarServer/branches/users/gaya/sharedgroups-3/twext:11088-11204
/CalendarServer/branches/users/glyph/always-abort-txn-on-error/twext:9958-9969
/CalendarServer/branches/users/glyph/case-insensitive-uid/twext:8772-8805
/CalendarServer/branches/users/glyph/conn-limit/twext:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge/twext:4971-5080
/CalendarServer/branches/users/glyph/dalify/twext:6932-7023
/CalendarServer/branches/users/glyph/db-reconnect/twext:6824-6876
/CalendarServer/branches/users/glyph/deploybuild/twext:7563-7572
/CalendarServer/branches/users/glyph/digest-auth-redux/twext:10624-10635
/CalendarServer/branches/users/glyph/disable-quota/twext:7718-7727
/CalendarServer/branches/users/glyph/dont-start-postgres/twext:6592-6614
/CalendarServer/branches/users/glyph/enforce-max-requests/twext:11640-11643
/CalendarServer/branches/users/glyph/hang-fix/twext:11465-11491
/CalendarServer/branches/users/glyph/imip-and-admin-html/twext:7866-7984
/CalendarServer/branches/users/glyph/ipv6-client/twext:9054-9105
/CalendarServer/branches/users/glyph/launchd-wrapper-bis/twext:11413-11436
/CalendarServer/branches/users/glyph/linux-tests/twext:6893-6900
/CalendarServer/branches/users/glyph/log-cleanups/twext:11691-11731
/CalendarServer/branches/users/glyph/migrate-merge/twext:8690-8713
/CalendarServer/branches/users/glyph/misc-portability-fixes/twext:7365-7374
/CalendarServer/branches/users/glyph/more-deferreds-6/twext:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7/twext:6369-6445
/CalendarServer/branches/users/glyph/multiget-delete/twext:8321-8330
/CalendarServer/branches/users/glyph/new-export/twext:7444-7485
/CalendarServer/branches/users/glyph/one-home-list-api/twext:10048-10073
/CalendarServer/branches/users/glyph/oracle/twext:7106-7155
/CalendarServer/branches/users/glyph/oracle-nulls/twext:7340-7351
/CalendarServer/branches/users/glyph/other-html/twext:8062-8091
/CalendarServer/branches/users/glyph/parallel-sim/twext:8240-8251
/CalendarServer/branches/users/glyph/parallel-upgrade/twext:8376-8400
/CalendarServer/branches/users/glyph/parallel-upgrade_to_1/twext:8571-8583
/CalendarServer/branches/users/glyph/q/twext:9560-9688
/CalendarServer/branches/users/glyph/queue-locking-and-timing/twext:10204-10289
/CalendarServer/branches/users/glyph/quota/twext:7604-7637
/CalendarServer/branches/users/glyph/sendfdport/twext:5388-5424
/CalendarServer/branches/users/glyph/shared-pool-fixes/twext:8436-8443
/CalendarServer/branches/users/glyph/shared-pool-take2/twext:8155-8174
/CalendarServer/branches/users/glyph/sharedpool/twext:6490-6550
/CalendarServer/branches/users/glyph/sharing-api/twext:9192-9205
/CalendarServer/branches/users/glyph/skip-lonely-vtimezones/twext:8524-8535
/CalendarServer/branches/users/glyph/sql-store/twext:5929-6073
/CalendarServer/branches/users/glyph/start-service-start-loop/twext:11060-11065
/CalendarServer/branches/users/glyph/subtransactions/twext:7248-7258
/CalendarServer/branches/users/glyph/table-alias/twext:8651-8664
/CalendarServer/branches/users/glyph/uidexport/twext:7673-7676
/CalendarServer/branches/users/glyph/unshare-when-access-revoked/twext:10562-10595
/CalendarServer/branches/users/glyph/use-system-twisted/twext:5084-5149
/CalendarServer/branches/users/glyph/uuid-normalize/twext:9268-9296
/CalendarServer/branches/users/glyph/warning-cleanups/twext:11347-11357
/CalendarServer/branches/users/glyph/whenNotProposed/twext:11881-11897
/CalendarServer/branches/users/glyph/xattrs-from-files/twext:7757-7769
/CalendarServer/branches/users/sagen/applepush/twext:8126-8184
/CalendarServer/branches/users/sagen/inboxitems/twext:7380-7381
/CalendarServer/branches/users/sagen/locations-resources/twext:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2/twext:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events/twext:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038/twext:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066/twext:4068-4075
/CalendarServer/branches/users/sagen/resources-2/twext:5084-5093
/CalendarServer/branches/users/sagen/testing/twext:10827-10851,10853-10855
/CalendarServer/branches/users/wsanchez/transations/twext:5515-5593
/twext/branches/users/cdaboo/jobs/twext:12742-12780
+ /CalDAVTester/trunk/twext:11193-11198
/CalendarServer/branches/config-separation/twext:4379-4443
/CalendarServer/branches/egg-info-351/twext:4589-4625
/CalendarServer/branches/generic-sqlstore/twext:6167-6191
/CalendarServer/branches/new-store-no-caldavfile-2/twext:5936-5981
/CalendarServer/branches/new-store-no-caldavfile/twext:5911-5935
/CalendarServer/branches/new-store/twext:5594-5934
/CalendarServer/branches/release/CalendarServer-4.3-dev/twext:10180-10190,10192
/CalendarServer/branches/release/CalendarServer-5.1-dev/twext:11846
/CalendarServer/branches/users/cdaboo/batchupload-6699/twext:6700-7198
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692/twext:5693-5702
/CalendarServer/branches/users/cdaboo/component-set-fixes/twext:8130-8346
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627/twext:3628-3644
/CalendarServer/branches/users/cdaboo/fix-no-ischedule/twext:11607-11871
/CalendarServer/branches/users/cdaboo/implicituidrace/twext:8137-8141
/CalendarServer/branches/users/cdaboo/ischedule-dkim/twext:9747-9979
/CalendarServer/branches/users/cdaboo/json/twext:11622-11912
/CalendarServer/branches/users/cdaboo/managed-attachments/twext:9985-10145
/CalendarServer/branches/users/cdaboo/more-sharing-5591/twext:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464/twext:4465-4957
/CalendarServer/branches/users/cdaboo/performance-tweaks/twext:11824-11836
/CalendarServer/branches/users/cdaboo/pods/twext:7297-7377
/CalendarServer/branches/users/cdaboo/pycalendar/twext:7085-7206
/CalendarServer/branches/users/cdaboo/pycard/twext:7227-7237
/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twext:7740-8287
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070/twext:5071-5105
/CalendarServer/branches/users/cdaboo/reverse-proxy-pods/twext:11875-11900
/CalendarServer/branches/users/cdaboo/shared-calendars-5187/twext:5188-5440
/CalendarServer/branches/users/cdaboo/sharing-in-the-store/twext:11935-12016
/CalendarServer/branches/users/cdaboo/store-scheduling/twext:10876-11129
/CalendarServer/branches/users/cdaboo/timezones/twext:7443-7699
/CalendarServer/branches/users/cdaboo/txn-debugging/twext:8730-8743
/CalendarServer/branches/users/gaya/sharedgroups-3/twext:11088-11204
/CalendarServer/branches/users/glyph/always-abort-txn-on-error/twext:9958-9969
/CalendarServer/branches/users/glyph/case-insensitive-uid/twext:8772-8805
/CalendarServer/branches/users/glyph/conn-limit/twext:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge/twext:4971-5080
/CalendarServer/branches/users/glyph/dalify/twext:6932-7023
/CalendarServer/branches/users/glyph/db-reconnect/twext:6824-6876
/CalendarServer/branches/users/glyph/deploybuild/twext:7563-7572
/CalendarServer/branches/users/glyph/digest-auth-redux/twext:10624-10635
/CalendarServer/branches/users/glyph/disable-quota/twext:7718-7727
/CalendarServer/branches/users/glyph/dont-start-postgres/twext:6592-6614
/CalendarServer/branches/users/glyph/enforce-max-requests/twext:11640-11643
/CalendarServer/branches/users/glyph/hang-fix/twext:11465-11491
/CalendarServer/branches/users/glyph/imip-and-admin-html/twext:7866-7984
/CalendarServer/branches/users/glyph/ipv6-client/twext:9054-9105
/CalendarServer/branches/users/glyph/launchd-wrapper-bis/twext:11413-11436
/CalendarServer/branches/users/glyph/linux-tests/twext:6893-6900
/CalendarServer/branches/users/glyph/log-cleanups/twext:11691-11731
/CalendarServer/branches/users/glyph/migrate-merge/twext:8690-8713
/CalendarServer/branches/users/glyph/misc-portability-fixes/twext:7365-7374
/CalendarServer/branches/users/glyph/more-deferreds-6/twext:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7/twext:6369-6445
/CalendarServer/branches/users/glyph/multiget-delete/twext:8321-8330
/CalendarServer/branches/users/glyph/new-export/twext:7444-7485
/CalendarServer/branches/users/glyph/one-home-list-api/twext:10048-10073
/CalendarServer/branches/users/glyph/oracle-nulls/twext:7340-7351
/CalendarServer/branches/users/glyph/oracle/twext:7106-7155
/CalendarServer/branches/users/glyph/other-html/twext:8062-8091
/CalendarServer/branches/users/glyph/parallel-sim/twext:8240-8251
/CalendarServer/branches/users/glyph/parallel-upgrade/twext:8376-8400
/CalendarServer/branches/users/glyph/parallel-upgrade_to_1/twext:8571-8583
/CalendarServer/branches/users/glyph/q/twext:9560-9688
/CalendarServer/branches/users/glyph/queue-locking-and-timing/twext:10204-10289
/CalendarServer/branches/users/glyph/quota/twext:7604-7637
/CalendarServer/branches/users/glyph/sendfdport/twext:5388-5424
/CalendarServer/branches/users/glyph/shared-pool-fixes/twext:8436-8443
/CalendarServer/branches/users/glyph/shared-pool-take2/twext:8155-8174
/CalendarServer/branches/users/glyph/sharedpool/twext:6490-6550
/CalendarServer/branches/users/glyph/sharing-api/twext:9192-9205
/CalendarServer/branches/users/glyph/skip-lonely-vtimezones/twext:8524-8535
/CalendarServer/branches/users/glyph/sql-store/twext:5929-6073
/CalendarServer/branches/users/glyph/start-service-start-loop/twext:11060-11065
/CalendarServer/branches/users/glyph/subtransactions/twext:7248-7258
/CalendarServer/branches/users/glyph/table-alias/twext:8651-8664
/CalendarServer/branches/users/glyph/uidexport/twext:7673-7676
/CalendarServer/branches/users/glyph/unshare-when-access-revoked/twext:10562-10595
/CalendarServer/branches/users/glyph/use-system-twisted/twext:5084-5149
/CalendarServer/branches/users/glyph/uuid-normalize/twext:9268-9296
/CalendarServer/branches/users/glyph/warning-cleanups/twext:11347-11357
/CalendarServer/branches/users/glyph/whenNotProposed/twext:11881-11897
/CalendarServer/branches/users/glyph/xattrs-from-files/twext:7757-7769
/CalendarServer/branches/users/sagen/applepush/twext:8126-8184
/CalendarServer/branches/users/sagen/inboxitems/twext:7380-7381
/CalendarServer/branches/users/sagen/locations-resources-2/twext:5052-5061
/CalendarServer/branches/users/sagen/locations-resources/twext:5032-5051
/CalendarServer/branches/users/sagen/purge_old_events/twext:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038/twext:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066/twext:4068-4075
/CalendarServer/branches/users/sagen/resources-2/twext:5084-5093
/CalendarServer/branches/users/sagen/testing/twext:10827-10851,10853-10855
/CalendarServer/branches/users/wsanchez/transations/twext:5515-5593
/twext/branches/users/cdaboo/jobqueue-3/twext:13444-13471
/twext/branches/users/cdaboo/jobs/twext:12742-12780
Modified: twext/trunk/twext/enterprise/adbapi2.py
===================================================================
--- twext/trunk/twext/enterprise/adbapi2.py 2014-05-14 16:19:13 UTC (rev 13471)
+++ twext/trunk/twext/enterprise/adbapi2.py 2014-05-14 19:50:12 UTC (rev 13472)
@@ -154,13 +154,14 @@
noisy = False
- def __init__(self, pool, threadHolder, connection, cursor):
+ def __init__(self, pool, threadHolder, connection, cursor, label=None):
self._pool = pool
self._completed = "idle"
self._cursor = cursor
self._connection = connection
self._holder = threadHolder
self._first = True
+ self._label = label
@_forward
@@ -402,10 +403,11 @@
"""
implements(IAsyncTransaction)
- def __init__(self, pool, reason):
+ def __init__(self, pool, reason, label=None):
self.paramstyle = pool.paramstyle
self.dialect = pool.dialect
self.reason = reason
+ self._label = label
def _everything(self, *a, **kw):
@@ -430,7 +432,7 @@
implements(IAsyncTransaction)
- def __init__(self, pool):
+ def __init__(self, pool, label=None):
"""
Initialize a L{_WaitingTxn} based on a L{ConnectionPool}. (The C{pool}
is used only to reflect C{dialect} and C{paramstyle} attributes; not
@@ -439,6 +441,7 @@
self._spool = []
self.paramstyle = pool.paramstyle
self.dialect = pool.dialect
+ self._label = label
def _enspool(self, cmd, a=(), kw={}):
@@ -593,6 +596,7 @@
super(_SingleTxn, self).__init__()
self._pool = pool
self._baseTxn = baseTxn
+ self._label = self._baseTxn._label
self._completed = False
self._currentBlock = None
self._blockedQueue = None
@@ -718,7 +722,8 @@
self._unspoolOnto(_NoTxn(
self._pool,
"connection pool shut down while txn "
- "waiting for database connection."
+ "waiting for database connection.",
+ label=self._label,
))
@@ -746,7 +751,7 @@
self._checkComplete()
block = CommandBlock(self)
if self._currentBlock is None:
- self._blockedQueue = _WaitingTxn(self._pool)
+ self._blockedQueue = _WaitingTxn(self._pool, label=self._label)
# FIXME: test the case where it's ready immediately.
self._checkNextBlock()
return block
@@ -795,7 +800,7 @@
self._singleTxn = singleTxn
self.paramstyle = singleTxn.paramstyle
self.dialect = singleTxn.dialect
- self._spool = _WaitingTxn(singleTxn._pool)
+ self._spool = _WaitingTxn(singleTxn._pool, label=singleTxn._label)
self._started = False
self._ended = False
self._waitingForEnd = []
@@ -1067,14 +1072,15 @@
if self._stopping:
# FIXME: should be wrapping a _SingleTxn around this to get
# .commandBlock()
- return _NoTxn(self, "txn created while DB pool shutting down")
+ return _NoTxn(self, "txn created while DB pool shutting down", label=label)
if self._free:
basetxn = self._free.pop(0)
+ basetxn._label = label
self._busy.append(basetxn)
txn = _SingleTxn(self, basetxn)
else:
- txn = _SingleTxn(self, _WaitingTxn(self))
+ txn = _SingleTxn(self, _WaitingTxn(self, label=label))
self._waiting.append(txn)
# FIXME/TESTME: should be len(self._busy) + len(self._finishing)
# (free doesn't need to be considered, as it's tested above)
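The adbapi2 changes above thread an optional label through every transaction variant (_ConnectedTxn, _NoTxn, _WaitingTxn, _SingleTxn) so a transaction can carry a human-readable tag for logging and diagnostics. A minimal sketch of the intended usage, assuming the pool method whose body is modified above is ConnectionPool.connection() and that it now accepts the label keyword:

def labelledTransaction(pool, label="jobqueue.workCheck"):
    # `pool` is a started twext.enterprise.adbapi2.ConnectionPool; the label
    # is stored on the underlying txn as `_label`, and jobqueue.py later
    # appends the work type, e.g. "ultimatelyPerform: 42 <SOME_WORK_TYPE>".
    return pool.connection(label=label)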
Modified: twext/trunk/twext/enterprise/dal/model.py
===================================================================
--- twext/trunk/twext/enterprise/dal/model.py 2014-05-14 16:19:13 UTC (rev 13471)
+++ twext/trunk/twext/enterprise/dal/model.py 2014-05-14 19:50:12 UTC (rev 13472)
@@ -221,12 +221,12 @@
compareAttributes = 'table name'.split()
- def __init__(self, table, name, type):
+ def __init__(self, table, name, type, default=NO_DEFAULT):
_checkstr(name)
self.table = table
self.name = name
self.type = type
- self.default = NO_DEFAULT
+ self.default = default
self.references = None
self.deleteAction = None
@@ -253,14 +253,16 @@
# Some DBs don't allow sequence as a default
if (
isinstance(self.default, Sequence) and other.default == NO_DEFAULT or
- self.default == NO_DEFAULT and isinstance(other.default, Sequence)
+ self.default == NO_DEFAULT and isinstance(other.default, Sequence) or
+ self.default is None and other.default == NO_DEFAULT or
+ self.default == NO_DEFAULT and other.default is None
):
pass
else:
results.append("Table: %s, column name %s default mismatch" % (self.table.name, self.name,))
if stringIfNone(self.references, "name") != stringIfNone(other.references, "name"):
results.append("Table: %s, column name %s references mismatch" % (self.table.name, self.name,))
- if self.deleteAction != other.deleteAction:
+ if stringIfNone(self.deleteAction, "") != stringIfNone(other.deleteAction, ""):
results.append("Table: %s, column name %s delete action mismatch" % (self.table.name, self.name,))
return results
@@ -403,7 +405,7 @@
raise KeyError("no such column: %r" % (name,))
- def addColumn(self, name, type):
+ def addColumn(self, name, type, default=NO_DEFAULT, notNull=False, primaryKey=False):
"""
A new column was parsed for this table.
@@ -413,8 +415,12 @@
@param type: The L{SQLType} describing the column's type.
"""
- column = Column(self, name, type)
+ column = Column(self, name, type, default=default)
self.columns.append(column)
+ if notNull:
+ self.tableConstraint(Constraint.NOT_NULL, [name])
+ if primaryKey:
+ self.primaryKey = [column]
return column
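With the extended Column/Table API above, a table definition that previously needed separate setDefaultValue(), tableConstraint() and primaryKey assignments can be expressed in single addColumn() calls. A sketch mirroring the JOB table construction from jobqueue.py later in this changeset:

from twext.enterprise.dal.model import Table, SQLType, ProcedureCall

def defineJobTable(inSchema):
    job = Table(inSchema, "JOB")
    job.addColumn("JOB_ID", SQLType("integer", None),
                  default=ProcedureCall("nextval", ["JOB_SEQ"]),
                  notNull=True, primaryKey=True)
    job.addColumn("WORK_TYPE", SQLType("varchar", 255), notNull=True)
    job.addColumn("PRIORITY", SQLType("integer", 0), default=0)
    job.addColumn("NOT_BEFORE", SQLType("timestamp", None), notNull=True)
    return inSchema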
Modified: twext/trunk/twext/enterprise/dal/record.py
===================================================================
--- twext/trunk/twext/enterprise/dal/record.py 2014-05-14 16:19:13 UTC (rev 13471)
+++ twext/trunk/twext/enterprise/dal/record.py 2014-05-14 19:50:12 UTC (rev 13472)
@@ -352,7 +352,7 @@
@classmethod
- def query(cls, transaction, expr, order=None, ascending=True, group=None, forUpdate=False, noWait=False):
+ def query(cls, transaction, expr, order=None, ascending=True, group=None, forUpdate=False, noWait=False, limit=None):
"""
Query the table that corresponds to C{cls}, and return instances of
C{cls} corresponding to the rows that are returned from that table.
@@ -385,6 +385,8 @@
kw.update(ForUpdate=True)
if noWait:
kw.update(NoWait=True)
+ if limit is not None:
+ kw.update(Limit=limit)
return cls._rowsFromQuery(
transaction,
Select(
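Record.query() gains a limit keyword, which the DAL turns into a Limit clause on the generated Select. A rough sketch of how the job queue uses it (JobItem and its column attributes are defined later in this changeset):

from twisted.internet.defer import inlineCallbacks, returnValue
from twext.enterprise.jobqueue import JobItem

@inlineCallbacks
def firstRunnableJob(txn, now):
    jobs = yield JobItem.query(
        txn,
        JobItem.notBefore <= now,
        order=JobItem.priority,
        ascending=False,
        forUpdate=True,   # row-lock the selected job against other pollers
        limit=1,          # new keyword: fetch at most one row
    )
    returnValue(jobs[0] if jobs else None)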
Modified: twext/trunk/twext/enterprise/dal/syntax.py
===================================================================
--- twext/trunk/twext/enterprise/dal/syntax.py 2014-05-14 16:19:13 UTC (rev 13471)
+++ twext/trunk/twext/enterprise/dal/syntax.py 2014-05-14 19:50:12 UTC (rev 13472)
@@ -1433,9 +1433,12 @@
stmt.append(SQLFragment(kw))
if self.ForUpdate:
- stmt.text += " for update"
- if self.NoWait:
- stmt.text += " nowait"
+ # FOR UPDATE is not supported by SQLite, but that is probably not relevant
+ # given that SQLite does file-level locking of the DB
+ if queryGenerator.dialect != SQLITE_DIALECT:
+ stmt.text += " for update"
+ if self.NoWait:
+ stmt.text += " nowait"
if self.Limit is not None:
limitConst = Constant(self.Limit).subSQL(queryGenerator, allTables)
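The syntax change above makes Select emit the FOR UPDATE / NOWAIT suffix only for dialects that support it; for SQLite it is dropped. A small sketch of a statement that exercises the new behaviour:

from twext.enterprise.dal.syntax import Select

def selectJobForUpdate(job, jobID):
    # `job` is the JOB table syntax object; FOR UPDATE is emitted for
    # Postgres/Oracle and silently omitted when the dialect is SQLITE_DIALECT.
    return Select(
        From=job,
        Where=(job.JOB_ID == jobID),
        ForUpdate=True,
        NoWait=False,
        Limit=1,
    )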
Modified: twext/trunk/twext/enterprise/jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/jobqueue.py 2014-05-14 16:19:13 UTC (rev 13471)
+++ twext/trunk/twext/enterprise/jobqueue.py 2014-05-14 19:50:12 UTC (rev 13472)
@@ -81,7 +81,8 @@
"""
from functools import wraps
-from datetime import datetime
+from datetime import datetime, timedelta
+from collections import namedtuple
from zope.interface import implements
@@ -91,12 +92,12 @@
inlineCallbacks, returnValue, Deferred, passthru, succeed
)
from twisted.internet.endpoints import TCP4ClientEndpoint
-from twisted.protocols.amp import AMP, Command, Integer, String
+from twisted.protocols.amp import AMP, Command, Integer, String, Argument
from twisted.python.reflect import qual
-from twisted.python import log
+from twext.python.log import Logger
from twext.enterprise.dal.syntax import (
- SchemaSyntax, Lock, NamedValue, Select, Count
+ SchemaSyntax, Lock, NamedValue
)
from twext.enterprise.dal.model import ProcedureCall
@@ -109,7 +110,10 @@
from zope.interface.interface import Interface
from twext.enterprise.locking import NamedLock
+import time
+log = Logger()
+
class _IJobPerformer(Interface):
"""
An object that can perform work.
@@ -118,10 +122,10 @@
(in the worst case) pass from worker->controller->controller->worker.
"""
- def performJob(jobID): # @NoSelf
+ def performJob(job): # @NoSelf
"""
- @param jobID: The primary key identifier of the given job.
- @type jobID: L{int}
+ @param job: Details about the job to perform.
+ @type job: L{JobDescriptor}
@return: a L{Deferred} firing with an empty dictionary when the work is
complete.
@@ -180,17 +184,13 @@
# transaction is made aware of somehow.
JobTable = Table(inSchema, "JOB")
- JobTable.addColumn("JOB_ID", SQLType("integer", None)).setDefaultValue(
- ProcedureCall("nextval", ["JOB_SEQ"])
- )
- JobTable.addColumn("WORK_TYPE", SQLType("varchar", 255))
- JobTable.addColumn("PRIORITY", SQLType("integer", 0))
- JobTable.addColumn("WEIGHT", SQLType("integer", 0))
- JobTable.addColumn("NOT_BEFORE", SQLType("timestamp", None))
- JobTable.addColumn("NOT_AFTER", SQLType("timestamp", None))
- for column in ("JOB_ID", "WORK_TYPE"):
- JobTable.tableConstraint(Constraint.NOT_NULL, [column])
- JobTable.primaryKey = [JobTable.columnNamed("JOB_ID"), ]
+ JobTable.addColumn("JOB_ID", SQLType("integer", None), default=ProcedureCall("nextval", ["JOB_SEQ"]), notNull=True, primaryKey=True)
+ JobTable.addColumn("WORK_TYPE", SQLType("varchar", 255), notNull=True)
+ JobTable.addColumn("PRIORITY", SQLType("integer", 0), default=0)
+ JobTable.addColumn("WEIGHT", SQLType("integer", 0), default=0)
+ JobTable.addColumn("NOT_BEFORE", SQLType("timestamp", None), notNull=True)
+ JobTable.addColumn("ASSIGNED", SQLType("timestamp", None), default=None)
+ JobTable.addColumn("FAILED", SQLType("integer", 0), default=0)
return inSchema
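The reworked JOB table drops NOT_AFTER and adds ASSIGNED and FAILED columns; through the record layer these surface as lowerCamelCase attributes on JobItem. A quick sketch of the mapping, assuming a loaded job:

from twisted.internet.defer import inlineCallbacks, returnValue
from twext.enterprise.jobqueue import JobItem

@inlineCallbacks
def describeJob(txn, jobID):
    job = yield JobItem.load(txn, jobID)
    returnValue({
        "jobID": job.jobID,          # JOB_ID
        "workType": job.workType,    # WORK_TYPE
        "priority": job.priority,    # PRIORITY
        "weight": job.weight,        # WEIGHT
        "notBefore": job.notBefore,  # NOT_BEFORE (now NOT NULL)
        "assigned": job.assigned,    # ASSIGNED (None until a worker takes it)
        "failed": job.failed,        # FAILED (number of failed attempts)
    })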
@@ -199,7 +199,7 @@
@inlineCallbacks
-def inTransaction(transactionCreator, operation, label="jobqueue.inTransaction"):
+def inTransaction(transactionCreator, operation, label="jobqueue.inTransaction", **kwargs):
"""
Perform the given operation in a transaction, committing or aborting as
required.
@@ -218,7 +218,7 @@
"""
txn = transactionCreator(label=label)
try:
- result = yield operation(txn)
+ result = yield operation(txn, **kwargs)
except:
f = Failure()
yield txn.abort()
@@ -270,6 +270,16 @@
+class JobFailedError(Exception):
+ """
+ A job failed to run - we need to be smart about clean up.
+ """
+
+ def __init__(self, ex):
+ self._ex = ex
+
+
+
class JobItem(Record, fromTable(JobInfoSchema.JOB)):
"""
An item in the job table. This is typically not directly used by code
@@ -277,6 +287,10 @@
associated with work items.
"""
+ def descriptor(self):
+ return JobDescriptor(self.jobID, self.weight)
+
+
@inlineCallbacks
def workItem(self):
workItemClass = WorkItem.forTableName(self.workType)
@@ -286,7 +300,183 @@
returnValue(workItems[0] if len(workItems) == 1 else None)
+ def assign(self, now):
+ """
+ Mark this job as assigned to a worker by setting the assigned column to the
+ given timestamp.
+
+ @param now: the timestamp to record as the assignment time
+ @type now: L{datetime.datetime}
+ """
+ return self.update(assigned=now)
+
+
+ def failedToRun(self):
+ """
+ The attempt to run the job failed. Leave it in the queue, but mark it
+ as unassigned, bump the failure count and set to run at some point in
+ the future.
+ """
+ return self.update(
+ assigned=None,
+ failed=self.failed + 1,
+ notBefore=datetime.utcnow() + timedelta(seconds=60)
+ )
+
+
+ @classmethod
@inlineCallbacks
+ def ultimatelyPerform(cls, txnFactory, jobID):
+ """
+ Eventually, after routing the job to the appropriate place, somebody
+ actually has to I{do} it.
+
+ @param txnFactory: a 0- or 1-argument callable that creates an
+ L{IAsyncTransaction}
+ @type txnFactory: L{callable}
+
+ @param jobID: the ID of the job to be performed
+ @type jobID: L{int}
+
+ @return: a L{Deferred} which fires with C{None} when the job has been
+ performed, or fails if the job can't be performed.
+ """
+
+ t = time.time()
+ def _tm():
+ return "{:.3f}".format(1000 * (time.time() - t))
+ def _overtm(nb):
+ return "{:.0f}".format(1000 * (t - astimestamp(nb)))
+
+ log.debug("JobItem: starting to run {jobid}".format(jobid=jobID))
+ txn = txnFactory(label="ultimatelyPerform: {}".format(jobID))
+ try:
+ job = yield cls.load(txn, jobID)
+ if hasattr(txn, "_label"):
+ txn._label = "{} <{}>".format(txn._label, job.workType)
+ log.debug("JobItem: loaded {jobid} {work} t={tm}".format(
+ jobid=jobID,
+ work=job.workType,
+ tm=_tm())
+ )
+ yield job.run()
+
+ except NoSuchRecord:
+ # The record has already been removed
+ yield txn.commit()
+ log.debug("JobItem: already removed {jobid} t={tm}".format(jobid=jobID, tm=_tm()))
+
+ except JobFailedError:
+ # Job failed: abort with cleanup, but pretend this method succeeded
+ def _cleanUp():
+ @inlineCallbacks
+ def _cleanUp2(txn2):
+ job = yield cls.load(txn2, jobID)
+ log.debug("JobItem: marking as failed {jobid}, failure count: {count} t={tm}".format(jobid=jobID, count=job.failed + 1, tm=_tm()))
+ yield job.failedToRun()
+ return inTransaction(txnFactory, _cleanUp2, "ultimatelyPerform._cleanUp")
+ txn.postAbort(_cleanUp)
+ yield txn.abort()
+ log.debug("JobItem: failed {jobid} {work} t={tm}".format(
+ jobid=jobID,
+ work=job.workType,
+ tm=_tm()
+ ))
+
+ except:
+ f = Failure()
+ log.error("JobItem: Unknown exception for {jobid} failed t={tm} {exc}".format(
+ jobid=jobID,
+ tm=_tm(),
+ exc=f,
+ ))
+ yield txn.abort()
+ returnValue(f)
+
+ else:
+ yield txn.commit()
+ log.debug("JobItem: completed {jobid} {work} t={tm} over={over}".format(
+ jobid=jobID,
+ work=job.workType,
+ tm=_tm(),
+ over=_overtm(job.notBefore)
+ ))
+
+ returnValue(None)
+
+
+ @classmethod
+ @inlineCallbacks
+ def nextjob(cls, txn, now, minPriority, overdue):
+ """
+ Find the next available job based on priority; also return any assigned jobs
+ that are overdue. This method relies on a nextjob() SQL stored procedure to
+ skip over row-locked items, which helps avoid contention when multiple nodes
+ operate on the job queue simultaneously.
+
+ @param txn: the transaction to use
+ @type txn: L{IAsyncTransaction}
+ @param now: current timestamp
+ @type now: L{datetime.datetime}
+ @param minPriority: lowest priority level to query for
+ @type minPriority: L{int}
+ @param overdue: timestamp before which an assigned job is considered overdue
+ @type overdue: L{datetime.datetime}
+
+ @return: the next job record, or C{None} if there is none to process
+ @rtype: L{JobItem} or C{NoneType}
+ """
+
+ jobs = yield cls.nextjobs(txn, now, minPriority, overdue, limit=1)
+
+ # Must only be one or zero
+ if jobs and len(jobs) > 1:
+ raise AssertionError("next_job() returned more than one row")
+
+ returnValue(jobs[0] if jobs else None)
+
+
+ @classmethod
+ @inlineCallbacks
+ def nextjobs(cls, txn, now, minPriority, overdue, limit=1):
+ """
+ Find the next available jobs based on priority; also return any assigned jobs
+ that are overdue. This method relies on a nextjob() SQL stored procedure to
+ skip over row-locked items, which helps avoid contention when multiple nodes
+ operate on the job queue simultaneously.
+
+ @param txn: the transaction to use
+ @type txn: L{IAsyncTransaction}
+ @param now: current timestamp
+ @type now: L{datetime.datetime}
+ @param minPriority: lowest priority level to query for
+ @type minPriority: L{int}
+ @param overdue: timestamp before which an assigned job is considered overdue
+ @type overdue: L{datetime.datetime}
+ @param limit: limit on number of jobs to return
+ @type limit: L{int}
+
+ @return: the job records
+ @rtype: L{list} of L{JobItem}
+ """
+
+ jobs = yield cls.query(
+ txn,
+ (cls.notBefore <= now).And
+ (((cls.priority >= minPriority).And(cls.assigned == None)).Or(cls.assigned < overdue)),
+ order=(cls.assigned, cls.priority),
+ ascending=False,
+ forUpdate=True,
+ noWait=False,
+ limit=limit,
+ )
+
+ returnValue(jobs)
+
+
+ @inlineCallbacks
def run(self):
"""
Run this job item by finding the appropriate work item class and
@@ -304,7 +494,15 @@
# The record has already been removed
pass
else:
- yield workItem.doWork()
+ try:
+ yield workItem.doWork()
+ except Exception as e:
+ log.error("JobItem: {jobid}, WorkItem: {workid} failed: {exc}".format(
+ jobid=self.jobID,
+ workid=workItem.workID,
+ exc=e,
+ ))
+ raise JobFailedError(e)
try:
# Once the work is done we delete ourselves
@@ -325,22 +523,48 @@
@classmethod
@inlineCallbacks
+ def waitEmpty(cls, txnCreator, reactor, timeout):
+ """
+ Wait for the job queue to drain. Only use this in tests
+ that need to wait for results from jobs.
+ """
+ t = time.time()
+ while True:
+ work = yield inTransaction(txnCreator, cls.all)
+ if not work:
+ break
+ if time.time() - t > timeout:
+ returnValue(False)
+ d = Deferred()
+ reactor.callLater(0.1, lambda : d.callback(None))
+ yield d
+
+ returnValue(True)
+
+
+ @classmethod
+ @inlineCallbacks
def histogram(cls, txn):
"""
Generate a histogram of work items currently in the queue.
"""
- jb = JobInfoSchema.JOB
- rows = yield Select(
- [jb.WORK_TYPE, Count(jb.WORK_TYPE)],
- From=jb,
- GroupBy=jb.WORK_TYPE
- ).on(txn)
- results = dict(rows)
-
- # Add in empty data for other work
+ results = {}
+ now = datetime.utcnow()
for workType in cls.workTypes():
- results.setdefault(workType.table.model.name, 0)
+ results.setdefault(workType.table.model.name, [0, 0, 0, 0])
+ jobs = yield cls.all(txn)
+
+ for job in jobs:
+ r = results[job.workType]
+ r[0] += 1
+ if job.assigned is not None:
+ r[1] += 1
+ if job.failed:
+ r[2] += 1
+ if job.assigned is None and job.notBefore < now:
+ r[3] += 1
+
returnValue(results)
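histogram() now reports, per work type, a list of four counters (total queued, currently assigned, failed at least once, and unassigned-but-overdue) instead of a single count. A small helper sketch for reading it (the helper itself is illustrative):

from twisted.internet.defer import inlineCallbacks, returnValue
from twext.enterprise.jobqueue import JobItem

@inlineCallbacks
def summarizeQueue(txn):
    results = yield JobItem.histogram(txn)
    summary = {}
    for workType, (queued, assigned, failed, overdue) in results.items():
        summary[workType] = dict(
            queued=queued, assigned=assigned, failed=failed, overdue=overdue,
        )
    returnValue(summary)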
@@ -349,14 +573,41 @@
return len(cls.workTypes())
+JobDescriptor = namedtuple("JobDescriptor", ["jobID", "weight"])
+class JobDescriptorArg(Argument):
+ """
+ An AMP argument that serializes a L{JobDescriptor} as comma-separated integers.
+ """
+ def toString(self, inObject):
+ return ",".join(map(str, inObject))
+
+
+ def fromString(self, inString):
+ return JobDescriptor(*map(int, inString.split(",")))
+
+
# Priority for work - used to order work items in the job queue
WORK_PRIORITY_LOW = 1
WORK_PRIORITY_MEDIUM = 2
WORK_PRIORITY_HIGH = 3
+# Weight for work - used to schedule workers based on capacity
+WORK_WEIGHT_0 = 0
+WORK_WEIGHT_1 = 1
+WORK_WEIGHT_2 = 2
+WORK_WEIGHT_3 = 3
+WORK_WEIGHT_4 = 4
+WORK_WEIGHT_5 = 5
+WORK_WEIGHT_6 = 6
+WORK_WEIGHT_7 = 7
+WORK_WEIGHT_8 = 8
+WORK_WEIGHT_9 = 9
+WORK_WEIGHT_10 = 10
+WORK_WEIGHT_CAPACITY = 10 # Total amount of work any one worker can manage
+
class WorkItem(Record):
"""
A L{WorkItem} is an item of work which may be stored in a database, then
@@ -450,7 +701,8 @@
"""
group = None
- priority = WORK_PRIORITY_LOW # Default - subclasses should override
+ default_priority = WORK_PRIORITY_LOW # Default - subclasses should override
+ default_weight = WORK_WEIGHT_5 # Default - subclasses should override
@classmethod
@@ -469,19 +721,21 @@
}
def _transferArg(name):
- if name in kwargs:
- jobargs[name] = kwargs[name]
- del kwargs[name]
+ arg = kwargs.pop(name, None)
+ if arg is not None:
+ jobargs[name] = arg
+ elif hasattr(cls, "default_{}".format(name)):
+ jobargs[name] = getattr(cls, "default_{}".format(name))
_transferArg("jobID")
- if "priority" in kwargs:
- _transferArg("priority")
- else:
- jobargs["priority"] = cls.priority
+ _transferArg("priority")
_transferArg("weight")
_transferArg("notBefore")
- _transferArg("notAfter")
+ # Always need a notBefore
+ if "notBefore" not in jobargs:
+ jobargs["notBefore"] = datetime.utcnow()
+
job = yield JobItem.create(transaction, **jobargs)
kwargs["jobID"] = job.jobID
@@ -540,7 +794,7 @@
"""
arguments = [
- ("jobID", Integer()),
+ ("job", JobDescriptorArg()),
]
response = []
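PerformJob now carries a JobDescriptor (jobID plus weight) rather than a bare integer, serialized over AMP by JobDescriptorArg as comma-separated integers. A tiny round-trip sketch:

from twext.enterprise.jobqueue import JobDescriptor, JobDescriptorArg

arg = JobDescriptorArg()
wire = arg.toString(JobDescriptor(jobID=42, weight=5))  # "42,5"
restored = arg.fromString(wire)                          # JobDescriptor(jobID=42, weight=5)
assert restored.weight == 5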
@@ -665,7 +919,7 @@
return self._reportedLoad + self._bonusLoad
- def performJob(self, jobID):
+ def performJob(self, job):
"""
A L{local worker connection <ConnectionFromWorker>} is asking this
specific peer node-controller process to perform a job, having
@@ -673,12 +927,12 @@
@see: L{_IJobPerformer.performJob}
"""
- d = self.callRemote(PerformJob, jobID=jobID)
- self._bonusLoad += 1
+ d = self.callRemote(PerformJob, job=job)
+ self._bonusLoad += job.weight
@d.addBoth
def performed(result):
- self._bonusLoad -= 1
+ self._bonusLoad -= job.weight
return result
@d.addCallback
@@ -689,17 +943,17 @@
@PerformJob.responder
- def dispatchToWorker(self, jobID):
+ def dispatchToWorker(self, job):
"""
A remote peer node has asked this node to do a job; dispatch it to
a local worker on this node.
- @param jobID: the identifier of the job.
- @type jobID: L{int}
+ @param job: the details of the job.
+ @type job: L{JobDescriptor}
@return: a L{Deferred} that fires when the work has been completed.
"""
- d = self.peerPool.performJobForPeer(jobID)
+ d = self.peerPool.performJobForPeer(job)
d.addCallback(lambda ignored: {})
return d
@@ -721,7 +975,7 @@
"""
implements(_IJobPerformer)
- def __init__(self, maximumLoadPerWorker=5):
+ def __init__(self, maximumLoadPerWorker=WORK_WEIGHT_CAPACITY):
self.workers = []
self.maximumLoadPerWorker = maximumLoadPerWorker
@@ -753,6 +1007,26 @@
return False
+ def loadLevel(self):
+ """
+ Return the overall load of this worker connection pool as a percentage of its
+ total capacity.
+
+ @return: current load percentage.
+ @rtype: L{int}
+ """
+ current = sum(worker.currentLoad for worker in self.workers)
+ total = len(self.workers) * self.maximumLoadPerWorker
+ return ((current * 100) / total) if total else 0
+
+
+ def eachWorkerLoad(self):
+ """
+ The current load and completed-job count of each connected worker.
+ """
+ return [(worker.currentLoad, worker.totalCompleted) for worker in self.workers]
+
+
def allWorkerLoad(self):
"""
The total load of all currently connected workers.
@@ -771,20 +1045,20 @@
return sorted(self.workers[:], key=lambda w: w.currentLoad)[0]
- def performJob(self, jobID):
+ def performJob(self, job):
"""
Select a local worker that is idle enough to perform the given job,
then ask them to perform it.
- @param jobID: The primary key identifier of the given job.
- @type jobID: L{int}
+ @param job: The details of the given job.
+ @type job: L{JobDescriptor}
@return: a L{Deferred} firing with an empty dictionary when the work is
complete.
@rtype: L{Deferred} firing L{dict}
"""
preferredWorker = self._selectLowestLoadWorker()
- result = preferredWorker.performJob(jobID)
+ result = preferredWorker.performJob(job)
return result
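Load accounting is now weight-based: dispatching a job adds job.weight to the chosen worker's load (and to a peer's bonus load) until the remote call completes, and loadLevel() turns the summed loads into a percentage of total capacity. A minimal sketch with no workers attached, just to show the API shape:

from twext.enterprise.jobqueue import WorkerConnectionPool, WORK_WEIGHT_CAPACITY

pool = WorkerConnectionPool(maximumLoadPerWorker=WORK_WEIGHT_CAPACITY)
# With no connected workers loadLevel() is defined to be 0; with two workers
# carrying weights 4 and 6 it would be (10 * 100) / 20 == 50.
print(pool.loadLevel())
print(pool.eachWorkerLoad())  # [] until workers connect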
@@ -799,6 +1073,7 @@
super(ConnectionFromWorker, self).__init__(boxReceiver, locator)
self.peerPool = peerPool
self._load = 0
+ self._completed = 0
@property
@@ -809,6 +1084,14 @@
return self._load
+ @property
+ def totalCompleted(self):
+ """
+ How many jobs have been completed by this worker?
+ """
+ return self._completed
+
+
def startReceivingBoxes(self, sender):
"""
Start receiving AMP boxes from the peer. Initialize all necessary
@@ -829,19 +1112,20 @@
@PerformJob.responder
- def performJob(self, jobID):
+ def performJob(self, job):
"""
Dispatch a job to this worker.
@see: The responder for this should always be
L{ConnectionFromController.actuallyReallyExecuteJobHere}.
"""
- d = self.callRemote(PerformJob, jobID=jobID)
- self._load += 1
+ d = self.callRemote(PerformJob, job=job)
+ self._load += job.weight
@d.addBoth
def f(result):
- self._load -= 1
+ self._load -= job.weight
+ self._completed += 1
return result
return d
@@ -883,11 +1167,11 @@
return self
- def performJob(self, jobID):
+ def performJob(self, job):
"""
Ask the controller to perform a job on our behalf.
"""
- return self.callRemote(PerformJob, jobID=jobID)
+ return self.callRemote(PerformJob, job=job)
@inlineCallbacks
@@ -914,48 +1198,18 @@
@PerformJob.responder
- def actuallyReallyExecuteJobHere(self, jobID):
+ def actuallyReallyExecuteJobHere(self, job):
"""
This is where it's time to actually do the job. The controller
process has instructed this worker to do it; so, look up the data in
the row, and do it.
"""
- d = ultimatelyPerform(self.transactionFactory, jobID)
+ d = JobItem.ultimatelyPerform(self.transactionFactory, job.jobID)
d.addCallback(lambda ignored: {})
return d
-def ultimatelyPerform(txnFactory, jobID):
- """
- Eventually, after routing the job to the appropriate place, somebody
- actually has to I{do} it.
-
- @param txnFactory: a 0- or 1-argument callable that creates an
- L{IAsyncTransaction}
- @type txnFactory: L{callable}
-
- @param jobID: the ID of the job to be performed
- @type jobID: L{int}
-
- @return: a L{Deferred} which fires with C{None} when the job has been
- performed, or fails if the job can't be performed.
- """
- @inlineCallbacks
- def runJob(txn):
- try:
- job = yield JobItem.load(txn, jobID)
- if hasattr(txn, "_label"):
- txn._label = "{} <{}>".format(txn._label, job.workType)
- yield job.run()
- except NoSuchRecord:
- # The record has already been removed
- pass
-
- return inTransaction(txnFactory, runJob, label="ultimatelyPerform: {}".format(jobID))
-
-
-
class LocalPerformer(object):
"""
Implementor of C{performJob} that does its work in the local process,
@@ -970,11 +1224,11 @@
self.txnFactory = txnFactory
- def performJob(self, jobID):
+ def performJob(self, job):
"""
Perform the given job right now.
"""
- return ultimatelyPerform(self.txnFactory, jobID)
+ return JobItem.ultimatelyPerform(self.txnFactory, job.jobID)
@@ -1049,7 +1303,6 @@
self.txn = txn
self.workItemType = workItemType
self.kw = kw
- self._whenExecuted = Deferred()
self._whenCommitted = Deferred()
self.workItem = None
@@ -1073,60 +1326,11 @@
def whenDone():
self._whenCommitted.callback(self)
- def maybeLater():
- performer = self._chooser.choosePerformer()
-
- @passthru(
- performer.performJob(created.jobID).addCallback
- )
- def performed(result):
- self._whenExecuted.callback(self)
-
- @performed.addErrback
- def notPerformed(why):
- self._whenExecuted.errback(why)
-
- reactor = self._chooser.reactor
-
- if created.job.notBefore is not None:
- when = max(
- 0,
- astimestamp(created.job.notBefore) - reactor.seconds()
- )
- else:
- when = 0
- # TODO: Track the returned DelayedCall so it can be stopped
- # when the service stops.
- self._chooser.reactor.callLater(when, maybeLater)
-
@self.txn.postAbort
def whenFailed():
self._whenCommitted.errback(TransactionFailed)
- def whenExecuted(self):
- """
- Let the caller know when the proposed work has been fully executed.
-
- @note: The L{Deferred} returned by C{whenExecuted} should be used with
- extreme caution. If an application decides to do any
- database-persistent work as a result of this L{Deferred} firing,
- that work I{may be lost} as a result of a service being normally
- shut down between the time that the work is scheduled and the time
- that it is executed. So, the only things that should be added as
- callbacks to this L{Deferred} are those which are ephemeral, in
- memory, and reflect only presentation state associated with the
- user's perception of the completion of work, not logical chains of
- work which need to be completed in sequence; those should all be
- completed within the transaction of the L{WorkItem.doWork} that
- gets executed.
-
- @return: a L{Deferred} that fires with this L{WorkProposal} when the
- work has been completed remotely.
- """
- return _cloneDeferred(self._whenExecuted)
-
-
def whenProposed(self):
"""
Let the caller know when the work has been proposed; i.e. when the work
@@ -1221,14 +1425,14 @@
than waiting for it to be requested. By default, 10 minutes.
@type queueProcessTimeout: L{float} (in seconds)
- @ivar queueDelayedProcessInterval: The amount of time between database
+ @ivar queuePollInterval: The amount of time between database
pings, i.e. checks for over-due queue items that might have been
orphaned by a controller process that died mid-transaction. This is
how often the shared database should be pinged by I{all} nodes (i.e.,
all controller processes, or each instance of L{PeerConnectionPool});
each individual node will ping commensurately less often as more nodes
join the database.
- @type queueDelayedProcessInterval: L{float} (in seconds)
+ @type queuePollInterval: L{float} (in seconds)
@ivar reactor: The reactor used for scheduling timed events.
@type reactor: L{IReactorTime} provider.
@@ -1243,9 +1447,13 @@
getfqdn = staticmethod(getfqdn)
getpid = staticmethod(getpid)
- queueProcessTimeout = (10.0 * 60.0)
- queueDelayedProcessInterval = (60.0)
+ queuePollInterval = 0.1 # How often to poll for new work
+ queueOrphanTimeout = 5.0 * 60.0 # How long before assigned work is possibly orphaned
+ overloadLevel = 95 # Percentage load level above which job queue processing stops
+ highPriorityLevel = 80 # Percentage load level above which only high priority jobs are processed
+ mediumPriorityLevel = 50 # Percentage load level above which high and medium priority jobs are processed
+
def __init__(self, reactor, transactionFactory, ampPort):
"""
Initialize a L{PeerConnectionPool}.
@@ -1324,13 +1532,13 @@
return LocalPerformer(self.transactionFactory)
- def performJobForPeer(self, jobID):
+ def performJobForPeer(self, job):
"""
A peer has requested us to perform a job; choose a job performer
local to this node, and then execute it.
"""
performer = self.choosePerformer(onlyLocally=True)
- return performer.performJob(jobID)
+ return performer.performJob(job)
def totalNumberOfNodes(self):
@@ -1362,67 +1570,113 @@
return self._lastSeenNodeIndex
- def _periodicLostWorkCheck(self):
+ @inlineCallbacks
+ def _workCheck(self):
"""
- Periodically, every node controller has to check to make sure that work
- hasn't been dropped on the floor by someone. In order to do that it
- queries each work-item table.
+ Every node controller will periodically check for any new work to do, and dispatch
+ as much as possible given the current load.
"""
- @inlineCallbacks
- def workCheck(txn):
- if self.thisProcess:
- nodes = [(node.hostname, node.port) for node in
- (yield self.activeNodes(txn))]
- nodes.sort()
- self._lastSeenTotalNodes = len(nodes)
- self._lastSeenNodeIndex = nodes.index(
- (self.thisProcess.hostname, self.thisProcess.port)
- )
+ # FIXME: not sure if we should do this node check on every work poll
+# if self.thisProcess:
+# nodes = [(node.hostname, node.port) for node in
+# (yield self.activeNodes(txn))]
+# nodes.sort()
+# self._lastSeenTotalNodes = len(nodes)
+# self._lastSeenNodeIndex = nodes.index(
+# (self.thisProcess.hostname, self.thisProcess.port)
+# )
+ loopCounter = 0
+ while True:
+ if not self.running:
+ returnValue(None)
+
+ # Check the overall service load - if overloaded skip this poll cycle.
+ # FIXME: need to include capacity of other nodes. For now we only check
+ # our own capacity and stop processing if too busy. Other nodes that
+ # are not busy will pick up work.
+ level = self.workerPool.loadLevel()
+
+ # Check overload level first
+ if level > self.overloadLevel:
+ log.error("workCheck: jobqueue is overloaded")
+ break
+ elif level > self.highPriorityLevel:
+ log.debug("workCheck: jobqueue high priority only")
+ minPriority = WORK_PRIORITY_HIGH
+ elif level > self.mediumPriorityLevel:
+ log.debug("workCheck: jobqueue high/medium priority only")
+ minPriority = WORK_PRIORITY_MEDIUM
+ else:
+ minPriority = WORK_PRIORITY_LOW
+
+ # Determine the timestamp cutoffs for this poll
# TODO: here is where we should iterate over the unlocked items
# that are due, ordered by priority, notBefore etc
- tooLate = datetime.utcfromtimestamp(
- self.reactor.seconds() - self.queueProcessTimeout
- )
- overdueItems = (yield JobItem.query(
- txn, (JobItem.notBefore < tooLate))
- )
- for overdueItem in overdueItems:
- peer = self.choosePerformer()
+ nowTime = datetime.utcfromtimestamp(self.reactor.seconds())
+ orphanTime = nowTime - timedelta(seconds=self.queueOrphanTimeout)
+
+ txn = self.transactionFactory(label="jobqueue.workCheck")
+ try:
+ nextJob = yield JobItem.nextjob(txn, nowTime, minPriority, orphanTime)
+ if nextJob is None:
+ break
+
+ # If it is now assigned but not earlier than the orphan time, ignore as this may have
+ # been returned after another txn just assigned it
+ if nextJob.assigned is not None and nextJob.assigned > orphanTime:
+ continue
+
+ # Always assign as a new job even when it is an orphan
+ yield nextJob.assign(nowTime)
+ loopCounter += 1
+
+ except Exception as e:
+ log.error("Failed to pick a new job, {exc}", exc=e)
+ yield txn.abort()
+ txn = None
+ nextJob = None
+ finally:
+ if txn:
+ yield txn.commit()
+
+ if nextJob is not None:
+ peer = self.choosePerformer(onlyLocally=True)
try:
- yield peer.performJob(overdueItem.jobID)
+ # Send the job over but DO NOT block on the response - that will ensure
+ # we can do stuff in parallel
+ peer.performJob(nextJob.descriptor())
except Exception as e:
- log.err("Failed to perform periodic lost job for jobid={}, {}".format(overdueItem.jobID, e))
+ log.error("Failed to perform job for jobid={jobid}, {exc}", jobid=nextJob.jobID, exc=e)
- return inTransaction(self.transactionFactory, workCheck, label="periodicLostWorkCheck")
+ if loopCounter:
+ log.debug("workCheck: processed {} jobs in one loop".format(loopCounter))
_currentWorkDeferred = None
- _lostWorkCheckCall = None
+ _workCheckCall = None
- def _lostWorkCheckLoop(self):
+ def _workCheckLoop(self):
"""
While the service is running, keep checking for any overdue / lost work
items and re-submit them to the cluster for processing. Space out
those checks in time based on the size of the cluster.
"""
- self._lostWorkCheckCall = None
+ self._workCheckCall = None
+ if not self.running:
+ return
+
@passthru(
- self._periodicLostWorkCheck().addErrback(log.err).addCallback
+ self._workCheck().addErrback(log.error).addCallback
)
def scheduleNext(result):
+ # TODO: if multiple nodes are present, see if we can
+ # stagger the polling to avoid contention.
self._currentWorkDeferred = None
if not self.running:
return
- index = self.nodeIndex()
- now = self.reactor.seconds()
-
- interval = self.queueDelayedProcessInterval
- count = self.totalNumberOfNodes()
- when = (now - (now % interval)) + (interval * (count + index))
- delay = when - now
- self._lostWorkCheckCall = self.reactor.callLater(
- delay, self._lostWorkCheckLoop
+ self._workCheckCall = self.reactor.callLater(
+ self.queuePollInterval, self._workCheckLoop
)
self._currentWorkDeferred = scheduleNext
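The polling loop above maps the pool's load level onto a minimum job priority: the busier the node, the higher a job's priority must be to get picked up, and above overloadLevel the poll cycle is skipped entirely. A standalone sketch of that mapping (the helper function is illustrative; the thresholds are the class defaults introduced above):

from twext.enterprise.jobqueue import (
    WORK_PRIORITY_LOW, WORK_PRIORITY_MEDIUM, WORK_PRIORITY_HIGH,
)

def minimumPriorityForLoad(level, overload=95, high=80, medium=50):
    # Mirrors PeerConnectionPool.overloadLevel / highPriorityLevel /
    # mediumPriorityLevel and the branch at the top of _workCheck().
    if level > overload:
        return None                   # overloaded: skip this poll cycle
    elif level > high:
        return WORK_PRIORITY_HIGH
    elif level > medium:
        return WORK_PRIORITY_MEDIUM
    else:
        return WORK_PRIORITY_LOW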
@@ -1432,32 +1686,37 @@
"""
Register ourselves with the database and establish all outgoing
connections to other servers in the cluster.
+
+ @param waitForService: an optional L{Deferred} that will be called back when
+ the service startup is done.
+ @type waitForService: L{Deferred} or L{None}
"""
@inlineCallbacks
def startup(txn):
- endpoint = TCP4ServerEndpoint(self.reactor, self.ampPort)
- # If this fails, the failure mode is going to be ugly, just like
- # all conflicted-port failures. But, at least it won't proceed.
- self._listeningPort = yield endpoint.listen(self.peerFactory())
- self.ampPort = self._listeningPort.getHost().port
- yield Lock.exclusive(NodeInfo.table).on(txn)
- nodes = yield self.activeNodes(txn)
- selves = [node for node in nodes
- if ((node.hostname == self.hostname) and
- (node.port == self.ampPort))]
- if selves:
- self.thisProcess = selves[0]
- nodes.remove(self.thisProcess)
- yield self.thisProcess.update(pid=self.pid,
- time=datetime.now())
- else:
- self.thisProcess = yield NodeInfo.create(
- txn, hostname=self.hostname, port=self.ampPort,
- pid=self.pid, time=datetime.now()
- )
+ if self.ampPort is not None:
+ endpoint = TCP4ServerEndpoint(self.reactor, self.ampPort)
+ # If this fails, the failure mode is going to be ugly, just like
+ # all conflicted-port failures. But, at least it won't proceed.
+ self._listeningPort = yield endpoint.listen(self.peerFactory())
+ self.ampPort = self._listeningPort.getHost().port
+ yield Lock.exclusive(NodeInfo.table).on(txn)
+ nodes = yield self.activeNodes(txn)
+ selves = [node for node in nodes
+ if ((node.hostname == self.hostname) and
+ (node.port == self.ampPort))]
+ if selves:
+ self.thisProcess = selves[0]
+ nodes.remove(self.thisProcess)
+ yield self.thisProcess.update(pid=self.pid,
+ time=datetime.now())
+ else:
+ self.thisProcess = yield NodeInfo.create(
+ txn, hostname=self.hostname, port=self.ampPort,
+ pid=self.pid, time=datetime.now()
+ )
- for node in nodes:
- self._startConnectingTo(node)
+ for node in nodes:
+ self._startConnectingTo(node)
self._startingUp = inTransaction(self.transactionFactory, startup, label="PeerConnectionPool.startService")
@@ -1465,7 +1724,7 @@
def done(result):
self._startingUp = None
super(PeerConnectionPool, self).startService()
- self._lostWorkCheckLoop()
+ self._workCheckLoop()
return result
@@ -1474,20 +1733,30 @@
"""
Stop this service, terminating any incoming or outgoing connections.
"""
- yield super(PeerConnectionPool, self).stopService()
+ # If in the process of starting up, always wait for startup to complete before
+ # stopping.
if self._startingUp is not None:
- yield self._startingUp
+ d = Deferred()
+ self._startingUp.addBoth(lambda result: d.callback(None))
+ yield d
+ yield super(PeerConnectionPool, self).stopService()
+
if self._listeningPort is not None:
yield self._listeningPort.stopListening()
- if self._lostWorkCheckCall is not None:
- self._lostWorkCheckCall.cancel()
+ if self._workCheckCall is not None:
+ self._workCheckCall.cancel()
if self._currentWorkDeferred is not None:
- yield self._currentWorkDeferred
+ self._currentWorkDeferred.cancel()
+ for connector in self._connectingToPeer:
+ d = Deferred()
+ connector.addBoth(lambda result: d.callback(None))
+ yield d
+
for peer in self.peers:
peer.transport.abortConnection()
@@ -1510,6 +1779,7 @@
# self.mappedPeers.pop((host, port)).transport.loseConnection()
self.mappedPeers[(host, port)] = peer
+ _connectingToPeer = []
def _startConnectingTo(self, node):
"""
@@ -1519,8 +1789,10 @@
@type node: L{NodeInfo}
"""
connected = node.endpoint(self.reactor).connect(self.peerFactory())
+ self._connectingToPeer.append(connected)
def whenConnected(proto):
+ self._connectingToPeer.remove(connected)
self.mapPeer(node.hostname, node.port, proto)
proto.callRemote(
IdentifyNode,
@@ -1529,9 +1801,11 @@
).addErrback(noted, "identify")
def noted(err, x="connect"):
- log.msg(
- "Could not {0} to cluster peer {1} because {2}"
- .format(x, node, str(err.value))
+ if x == "connect":
+ self._connectingToPeer.remove(connected)
+ log.error(
+ "Could not {action} to cluster peer {node} because {reason}",
+ action=x, node=node, reason=str(err.value),
)
connected.addCallbacks(whenConnected, noted)
@@ -1594,7 +1868,7 @@
"""
implements(_IJobPerformer)
- def performJob(self, jobID):
+ def performJob(self, job):
"""
Don't perform job.
"""
Modified: twext/trunk/twext/enterprise/queue.py
===================================================================
--- twext/trunk/twext/enterprise/queue.py 2014-05-14 16:19:13 UTC (rev 13471)
+++ twext/trunk/twext/enterprise/queue.py 2014-05-14 19:50:12 UTC (rev 13472)
@@ -1323,6 +1323,9 @@
"""
self._lostWorkCheckCall = None
+ if not self.running:
+ return
+
@passthru(
self._periodicLostWorkCheck().addErrback(log.err).addCallback
)
@@ -1390,11 +1393,15 @@
"""
Stop this service, terminating any incoming or outgoing connections.
"""
+ # If in the process of starting up, always wait for startup to complete before
+ # stopping.
+ if self._startingUp is not None:
+ d = Deferred()
+ self._startingUp.addBoth(lambda result: d.callback(None))
+ yield d
+
yield super(PeerConnectionPool, self).stopService()
- if self._startingUp is not None:
- yield self._startingUp
-
if self._listeningPort is not None:
yield self._listeningPort.stopListening()
@@ -1402,8 +1409,13 @@
self._lostWorkCheckCall.cancel()
if self._currentWorkDeferred is not None:
- yield self._currentWorkDeferred
+ self._currentWorkDeferred.cancel()
+ for connector in self._connectingToPeer:
+ d = Deferred()
+ connector.addBoth(lambda result: d.callback(None))
+ yield d
+
for peer in self.peers:
peer.transport.abortConnection()
@@ -1426,6 +1438,7 @@
# self.mappedPeers.pop((host, port)).transport.loseConnection()
self.mappedPeers[(host, port)] = peer
+ _connectingToPeer = []
def _startConnectingTo(self, node):
"""
@@ -1435,8 +1448,10 @@
@type node: L{NodeInfo}
"""
connected = node.endpoint(self.reactor).connect(self.peerFactory())
+ self._connectingToPeer.append(connected)
def whenConnected(proto):
+ self._connectingToPeer.remove(connected)
self.mapPeer(node.hostname, node.port, proto)
proto.callRemote(
IdentifyNode,
@@ -1445,6 +1460,8 @@
).addErrback(noted, "identify")
def noted(err, x="connect"):
+ if x == "connect":
+ self._connectingToPeer.remove(connected)
log.msg(
"Could not {0} to cluster peer {1} because {2}"
.format(x, node, str(err.value))
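The queue.py changes mirror the shutdown-ordering fixes made in jobqueue.py: wait for any in-progress startup to settle before stopping, cancel rather than wait on the current work check, and drain pending peer connections. The common pattern, sketched on its own:

from twisted.internet.defer import Deferred, inlineCallbacks

@inlineCallbacks
def waitForSettled(pending):
    # Wrap a possibly-failing Deferred so that stopService() proceeds once it
    # has settled, without re-raising a startup failure during shutdown.
    if pending is not None:
        d = Deferred()
        pending.addBoth(lambda result: d.callback(None))
        yield d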
Modified: twext/trunk/twext/enterprise/test/test_jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/test/test_jobqueue.py 2014-05-14 16:19:13 UTC (rev 13471)
+++ twext/trunk/twext/enterprise/test/test_jobqueue.py 2014-05-14 19:50:12 UTC (rev 13472)
@@ -22,16 +22,16 @@
from zope.interface.verify import verifyObject
+from twisted.internet import reactor
from twisted.trial.unittest import TestCase, SkipTest
from twisted.test.proto_helpers import StringTransport, MemoryReactor
-from twisted.internet.defer import (
- Deferred, inlineCallbacks, gatherResults, passthru, returnValue
-)
+from twisted.internet.defer import \
+ Deferred, inlineCallbacks, gatherResults, passthru, returnValue, succeed
from twisted.internet.task import Clock as _Clock
from twisted.protocols.amp import Command, AMP, Integer
from twisted.application.service import Service, MultiService
-from twext.enterprise.dal.syntax import SchemaSyntax, Select
+from twext.enterprise.dal.syntax import SchemaSyntax
from twext.enterprise.dal.record import fromTable
from twext.enterprise.dal.test.test_parseschema import SchemaTestHelper
from twext.enterprise.fixtures import buildConnectionPool
@@ -40,7 +40,9 @@
inTransaction, PeerConnectionPool, astimestamp,
LocalPerformer, _IJobPerformer, WorkItem, WorkerConnectionPool,
ConnectionFromPeerNode, LocalQueuer,
- _BaseQueuer, NonPerformingQueuer
+ _BaseQueuer, NonPerformingQueuer, JobItem,
+ WORK_PRIORITY_LOW, WORK_PRIORITY_HIGH, WORK_PRIORITY_MEDIUM,
+ JobDescriptor
)
import twext.enterprise.jobqueue
@@ -67,7 +69,27 @@
return super(Clock, self).callLater(_seconds, _f, *args, **kw)
+ @inlineCallbacks
+ def advanceCompletely(self, amount):
+ """
+ Move time on this clock forward by the given amount and run whatever
+ pending calls should be run. Always complete the deferred calls before
+ returning.
+ @type amount: C{float}
+ @param amount: The number of seconds which to advance this clock's
+ time.
+ """
+ self.rightNow += amount
+ self._sortCalls()
+ while self.calls and self.calls[0].getTime() <= self.seconds():
+ call = self.calls.pop(0)
+ call.called = 1
+ yield call.func(*call.args, **call.kw)
+ self._sortCalls()
+
+
+
class MemoryReactorWithClock(MemoryReactor, Clock):
"""
Simulate a real reactor.
@@ -75,6 +97,7 @@
def __init__(self):
MemoryReactor.__init__(self)
Clock.__init__(self)
+ self._sortCalls()
@@ -180,8 +203,9 @@
WORK_TYPE varchar(255) not null,
PRIORITY integer default 0,
WEIGHT integer default 0,
- NOT_BEFORE timestamp default null,
- NOT_AFTER timestamp default null
+ NOT_BEFORE timestamp not null,
+ ASSIGNED timestamp default null,
+ FAILED integer default 0
);
"""
)
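(Editor's note, not part of this changeset.) The reshaped JOB table drops NOT_AFTER and adds ASSIGNED and FAILED; on L{JobItem} records these show up as the assigned and failed attributes the tests below assert against. A small illustrative helper, with a hypothetical name, for dumping that state:

    @inlineCallbacks
    def jobStates(txn):
        # A freshly enqueued, never-dispatched job should show
        # assigned == None and failed == 0.
        jobs = yield JobItem.all(txn)
        returnValue([
            (job.jobID, job.workType, job.notBefore, job.assigned, job.failed)
            for job in jobs
        ])

It would be driven the same way as the checks below, e.g. states = yield inTransaction(dbpool.connection, jobStates).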
@@ -194,11 +218,6 @@
A integer, B integer,
DELETE_ON_LOAD integer default 0
);
- create table DUMMY_WORK_DONE (
- WORK_ID integer primary key,
- JOB_ID integer references JOB,
- A_PLUS_B integer
- );
"""
)
@@ -207,37 +226,30 @@
dropSQL = [
"drop table {name} cascade".format(name=table)
- for table in ("DUMMY_WORK_ITEM", "DUMMY_WORK_DONE")
+ for table in ("DUMMY_WORK_ITEM",)
] + ["delete from job"]
except SkipTest as e:
- DummyWorkDone = DummyWorkItem = object
+ DummyWorkItem = object
skip = e
else:
- DummyWorkDone = fromTable(schema.DUMMY_WORK_DONE)
DummyWorkItem = fromTable(schema.DUMMY_WORK_ITEM)
skip = False
-class DummyWorkDone(WorkItem, DummyWorkDone):
- """
- Work result.
- """
-
-
-
class DummyWorkItem(WorkItem, DummyWorkItem):
"""
Sample L{WorkItem} subclass that adds two integers together and stores the
result in the class-level C{results} dict.
"""
+ results = {}
+
def doWork(self):
if self.a == -1:
raise ValueError("Ooops")
- return DummyWorkDone.makeJob(
- self.transaction, jobID=self.jobID + 100, workID=self.workID + 100, aPlusB=self.a + self.b
- )
+ self.results[self.jobID] = self.a + self.b
+ return succeed(None)
@classmethod
@@ -250,7 +262,7 @@
"""
workItems = yield super(DummyWorkItem, cls).loadForJob(txn, *a)
if workItems[0].deleteOnLoad:
- otherTransaction = txn.concurrently()
+ otherTransaction = txn.store().newTransaction()
otherSelf = yield super(DummyWorkItem, cls).loadForJob(txn, *a)
yield otherSelf[0].delete()
yield otherTransaction.commit()
@@ -306,12 +318,10 @@
@inlineCallbacks
- def test_enqueue(self):
- """
- L{PeerConnectionPool.enqueueWork} will insert a job and a work item.
- """
- dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+ def _enqueue(self, dbpool, a, b, notBefore=None, priority=None, weight=None):
fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
+ if notBefore is None:
+ notBefore = datetime.datetime(2012, 12, 13, 12, 12, 0)
sinceEpoch = astimestamp(fakeNow)
clock = Clock()
clock.advance(sinceEpoch)
@@ -329,36 +339,134 @@
@transactionally(dbpool.connection)
def check(txn):
return qpool.enqueueWork(
- txn, DummyWorkItem, a=3, b=9,
- notBefore=datetime.datetime(2012, 12, 13, 12, 12, 0)
+ txn, DummyWorkItem,
+ a=a, b=b, priority=priority, weight=weight,
+ notBefore=notBefore
)
proposal = yield check
yield proposal.whenProposed()
+
+ @inlineCallbacks
+ def test_enqueue(self):
+ """
+ L{PeerConnectionPool.enqueueWork} will insert a job and a work item.
+ """
+ dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+ yield self._enqueue(dbpool, 1, 2)
+
# Make sure we have one JOB and one DUMMY_WORK_ITEM
@transactionally(dbpool.connection)
def checkJob(txn):
- return Select(
- From=schema.JOB
- ).on(txn)
+ return JobItem.all(txn)
jobs = yield checkJob
self.assertTrue(len(jobs) == 1)
- self.assertTrue(jobs[0][1] == "DUMMY_WORK_ITEM")
+ self.assertTrue(jobs[0].workType == "DUMMY_WORK_ITEM")
+ self.assertTrue(jobs[0].assigned is None)
@transactionally(dbpool.connection)
def checkWork(txn):
- return Select(
- From=schema.DUMMY_WORK_ITEM
- ).on(txn)
+ return DummyWorkItem.all(txn)
work = yield checkWork
self.assertTrue(len(work) == 1)
- self.assertTrue(work[0][1] == jobs[0][0])
+ self.assertTrue(work[0].jobID == jobs[0].jobID)
+ @inlineCallbacks
+ def test_assign(self):
+ """
+ A job inserted via L{PeerConnectionPool.enqueueWork} can be marked as
+ assigned via L{JobItem.assign}, and the assignment is visible on reload.
+ """
+ dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+ yield self._enqueue(dbpool, 1, 2)
+ # Make sure we have one JOB and one DUMMY_WORK_ITEM
+ def checkJob(txn):
+ return JobItem.all(txn)
+
+ jobs = yield inTransaction(dbpool.connection, checkJob)
+ self.assertTrue(len(jobs) == 1)
+ self.assertTrue(jobs[0].assigned is None)
+
+ @inlineCallbacks
+ def assignJob(txn):
+ job = yield JobItem.load(txn, jobs[0].jobID)
+ yield job.assign(datetime.datetime.utcnow())
+ yield inTransaction(dbpool.connection, assignJob)
+
+ jobs = yield inTransaction(dbpool.connection, checkJob)
+ self.assertTrue(len(jobs) == 1)
+ self.assertTrue(jobs[0].assigned is not None)
+
+
+ @inlineCallbacks
+ def test_nextjob(self):
+ """
+ L{JobItem.nextjob} only returns jobs that are due, unassigned (or
+ assigned but overdue), and at or above the requested priority.
+ """
+
+ dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+ now = datetime.datetime.utcnow()
+
+ # Empty job queue
+ @inlineCallbacks
+ def _next(txn, priority=WORK_PRIORITY_LOW):
+ job = yield JobItem.nextjob(txn, now, priority, now - datetime.timedelta(seconds=PeerConnectionPool.queueOrphanTimeout))
+ if job is not None:
+ work = yield job.workItem()
+ else:
+ work = None
+ returnValue((job, work))
+ job, work = yield inTransaction(dbpool.connection, _next)
+ self.assertTrue(job is None)
+ self.assertTrue(work is None)
+
+ # Unassigned job with future notBefore not returned
+ yield self._enqueue(dbpool, 1, 1, now + datetime.timedelta(days=1))
+ job, work = yield inTransaction(dbpool.connection, _next)
+ self.assertTrue(job is None)
+ self.assertTrue(work is None)
+
+ # Unassigned job with past notBefore returned
+ yield self._enqueue(dbpool, 2, 1, now + datetime.timedelta(days=-1))
+ job, work = yield inTransaction(dbpool.connection, _next)
+ self.assertTrue(job is not None)
+ self.assertTrue(work.a == 2)
+ assignID = job.jobID
+
+ # Assigned job with past notBefore not returned
+ @inlineCallbacks
+ def assignJob(txn, when=None):
+ assignee = yield JobItem.load(txn, assignID)
+ yield assignee.assign(now if when is None else when)
+ yield inTransaction(dbpool.connection, assignJob)
+ job, work = yield inTransaction(dbpool.connection, _next)
+ self.assertTrue(job is None)
+ self.assertTrue(work is None)
+
+ # Unassigned low priority job with past notBefore not returned if high priority required
+ yield self._enqueue(dbpool, 4, 1, now + datetime.timedelta(days=-1))
+ job, work = yield inTransaction(dbpool.connection, _next, priority=WORK_PRIORITY_HIGH)
+ self.assertTrue(job is None)
+ self.assertTrue(work is None)
+
+ # Unassigned low priority job with past notBefore not returned if medium priority required
+ yield self._enqueue(dbpool, 5, 1, now + datetime.timedelta(days=-1))
+ job, work = yield inTransaction(dbpool.connection, _next, priority=WORK_PRIORITY_MEDIUM)
+ self.assertTrue(job is None)
+ self.assertTrue(work is None)
+
+ # Assigned job with past notBefore that is now overdue (orphaned) is returned
+ yield inTransaction(dbpool.connection, assignJob, when=now + datetime.timedelta(days=-1))
+ job, work = yield inTransaction(dbpool.connection, _next, priority=WORK_PRIORITY_HIGH)
+ self.assertTrue(job is not None)
+ self.assertTrue(work.a == 2)
+
+
+
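(Editor's reading aid, not part of this changeset.) The assertions above pin down which rows JobItem.nextjob() may hand back. Below is a plain-Python model of that eligibility rule, inferred from these assertions rather than from the SQL in jobqueue.py; minPriority and overdueBefore mirror the priority and orphan-cutoff arguments passed to nextjob above, and job.priority is assumed to be the record attribute backing the PRIORITY column:

    def isEligible(job, now, minPriority, overdueBefore):
        # Not yet due: never returned.
        if job.notBefore > now:
            return False
        # Already assigned: only returned once the assignment is overdue
        # (orphaned), regardless of the requested priority.
        if job.assigned is not None:
            return job.assigned < overdueBefore
        # Unassigned and due: returned only at or above the requested priority.
        return job.priority >= minPriority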
class WorkerConnectionPoolTests(TestCase):
"""
A L{WorkerConnectionPool} is responsible for managing, in a node's
@@ -412,6 +520,7 @@
Create a L{PeerConnectionPool} that is just initialized enough.
"""
self.pcp = PeerConnectionPool(None, None, 4321)
+ DummyWorkItem.results = {}
def checkPerformer(self, cls):
@@ -424,6 +533,34 @@
verifyObject(_IJobPerformer, performer)
+ def _setupPools(self):
+ """
+ Setup pool and reactor clock for time stepped tests.
+ """
+ reactor = MemoryReactorWithClock()
+ cph = SteppablePoolHelper(nodeSchema + jobSchema + schemaText)
+ then = datetime.datetime(2012, 12, 12, 12, 12, 12)
+ reactor.advance(astimestamp(then))
+ cph.setUp(self)
+ qpool = PeerConnectionPool(reactor, cph.pool.connection, 4321)
+
+ realChoosePerformer = qpool.choosePerformer
+ performerChosen = []
+
+ def catchPerformerChoice(onlyLocally=False):
+ result = realChoosePerformer(onlyLocally=onlyLocally)
+ performerChosen.append(True)
+ return result
+
+ qpool.choosePerformer = catchPerformerChoice
+ reactor.callLater(0, qpool._workCheck)
+
+ qpool.startService()
+ cph.flushHolders()
+
+ return cph, qpool, reactor, performerChosen
+
+
def test_choosingPerformerWhenNoPeersAndNoWorkers(self):
"""
If L{PeerConnectionPool.choosePerformer} is invoked when no workers
@@ -478,8 +615,8 @@
d = Deferred()
class DummyPerformer(object):
- def performJob(self, jobID):
- self.jobID = jobID
+ def performJob(self, job):
+ self.jobID = job.jobID
return d
# Doing real database I/O in this test would be tedious so fake the
@@ -490,7 +627,7 @@
return dummy
peer.choosePerformer = chooseDummy
- performed = local.performJob(7384)
+ performed = local.performJob(JobDescriptor(7384, 1))
performResult = []
performed.addCallback(performResult.append)
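(Editor's illustration, not part of this changeset.) Performers now receive a L{JobDescriptor} instead of a bare job ID. A tiny stand-in performer under that assumption; the second constructor argument is taken to be the job weight, inferred from the load-accounting assertions further down:

    class RecordingPerformer(object):
        # Hypothetical performer that only records which jobs it was
        # asked to run; real performers forward the descriptor on.
        def __init__(self):
            self.performed = []

        def performJob(self, job):
            self.performed.append(job.jobID)
            return succeed(None)

    performer = RecordingPerformer()
    performer.performJob(JobDescriptor(7384, 1))   # (jobID, weight) -- weight assumed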
@@ -535,25 +672,19 @@
@inlineCallbacks
- def test_notBeforeWhenCheckingForLostWork(self):
+ def test_notBeforeWhenCheckingForWork(self):
"""
- L{PeerConnectionPool._periodicLostWorkCheck} should execute any
+ L{PeerConnectionPool._workCheck} should execute any
outstanding work items, but only those that are expired.
"""
- dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
- # An arbitrary point in time.
+ dbpool, _ignore_qpool, clock, _ignore_performerChosen = self._setupPools()
fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
- # *why* does datetime still not have .astimestamp()
- sinceEpoch = astimestamp(fakeNow)
- clock = Clock()
- clock.advance(sinceEpoch)
- qpool = PeerConnectionPool(clock, dbpool.connection, 0)
# Let's create a couple of work items directly, not via the enqueue
# method, so that they exist but nobody will try to immediately execute
# them.
- @transactionally(dbpool.connection)
+ @transactionally(dbpool.pool.connection)
@inlineCallbacks
def setup(txn):
# First, one that's right now.
@@ -564,9 +695,7 @@
txn, a=3, b=4, notBefore=(
# Schedule it in the past so that it should have already
# run.
- fakeNow - datetime.timedelta(
- seconds=qpool.queueProcessTimeout + 20
- )
+ fakeNow - datetime.timedelta(seconds=20)
)
)
@@ -575,14 +704,13 @@
txn, a=10, b=20, notBefore=fakeNow + datetime.timedelta(1000)
)
yield setup
- yield qpool._periodicLostWorkCheck()
- @transactionally(dbpool.connection)
- def check(txn):
- return DummyWorkDone.all(txn)
+ # Wait for job
+ while len(DummyWorkItem.results) != 2:
+ clock.advance(1)
- every = yield check
- self.assertEquals([x.aPlusB for x in every], [7])
+ # Work item complete
+ self.assertTrue(DummyWorkItem.results == {1: 3, 2: 7})
@inlineCallbacks
@@ -592,23 +720,10 @@
only executes it when enough time has elapsed to allow the C{notBefore}
attribute of the given work item to have passed.
"""
- dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
- fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
- sinceEpoch = astimestamp(fakeNow)
- clock = Clock()
- clock.advance(sinceEpoch)
- qpool = PeerConnectionPool(clock, dbpool.connection, 0)
- realChoosePerformer = qpool.choosePerformer
- performerChosen = []
- def catchPerformerChoice():
- result = realChoosePerformer()
- performerChosen.append(True)
- return result
+ dbpool, qpool, clock, performerChosen = self._setupPools()
- qpool.choosePerformer = catchPerformerChoice
-
- @transactionally(dbpool.connection)
+ @transactionally(dbpool.pool.connection)
def check(txn):
return qpool.enqueueWork(
txn, DummyWorkItem, a=3, b=9,
@@ -630,11 +745,12 @@
clock.advance(20 - 12)
self.assertEquals(performerChosen, [True])
- # FIXME: if this fails, it will hang, but that's better than no
- # notification that it is broken at all.
+ # Wait for job
+ while (yield inTransaction(dbpool.pool.connection, lambda txn: JobItem.all(txn))):
+ clock.advance(1)
- result = yield proposal.whenExecuted()
- self.assertIdentical(result, proposal)
+ # Work item complete
+ self.assertTrue(DummyWorkItem.results == {1: 12})
@inlineCallbacks
@@ -643,23 +759,9 @@
L{PeerConnectionPool.enqueueWork} will execute its work immediately if
the C{notBefore} attribute of the work item in question is in the past.
"""
- dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
- fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
- sinceEpoch = astimestamp(fakeNow)
- clock = Clock()
- clock.advance(sinceEpoch)
- qpool = PeerConnectionPool(clock, dbpool.connection, 0)
- realChoosePerformer = qpool.choosePerformer
- performerChosen = []
+ dbpool, qpool, clock, performerChosen = self._setupPools()
- def catchPerformerChoice():
- result = realChoosePerformer()
- performerChosen.append(True)
- return result
-
- qpool.choosePerformer = catchPerformerChoice
-
- @transactionally(dbpool.connection)
+ @transactionally(dbpool.pool.connection)
def check(txn):
return qpool.enqueueWork(
txn, DummyWorkItem, a=3, b=9,
@@ -673,10 +775,14 @@
# Advance far beyond the given timestamp.
self.assertEquals(performerChosen, [True])
- result = yield proposal.whenExecuted()
- self.assertIdentical(result, proposal)
+ # Wait for job
+ while (yield inTransaction(dbpool.pool.connection, lambda txn: JobItem.all(txn))):
+ clock.advance(1)
+ # Work item complete
+ self.assertTrue(DummyWorkItem.results == {1: 12})
+
def test_workerConnectionPoolPerformJob(self):
"""
L{WorkerConnectionPool.performJob} performs work by selecting a
@@ -696,12 +802,12 @@
worker2, _ignore_trans2 = peer()
# Ask the worker to do something.
- worker1.performJob(1)
+ worker1.performJob(JobDescriptor(1, 1))
self.assertEquals(worker1.currentLoad, 1)
self.assertEquals(worker2.currentLoad, 0)
# Now ask the pool to do something
- peerPool.workerPool.performJob(2)
+ peerPool.workerPool.performJob(JobDescriptor(2, 1))
self.assertEquals(worker1.currentLoad, 1)
self.assertEquals(worker2.currentLoad, 1)
@@ -716,49 +822,42 @@
reactor.advance(astimestamp(then))
cph.setUp(self)
pcp = PeerConnectionPool(reactor, cph.pool.connection, 4321)
- now = then + datetime.timedelta(seconds=pcp.queueProcessTimeout * 2)
+ now = then + datetime.timedelta(seconds=20)
@transactionally(cph.pool.connection)
def createOldWork(txn):
- one = DummyWorkItem.makeJob(txn, jobID=100, workID=1, a=3, b=4, notBefore=then)
- two = DummyWorkItem.makeJob(txn, jobID=101, workID=2, a=7, b=9, notBefore=now)
+ one = DummyWorkItem.makeJob(txn, jobID=1, workID=1, a=3, b=4, notBefore=then)
+ two = DummyWorkItem.makeJob(txn, jobID=2, workID=2, a=7, b=9, notBefore=now)
return gatherResults([one, two])
pcp.startService()
cph.flushHolders()
- reactor.advance(pcp.queueProcessTimeout * 2)
+ reactor.advance(19)
self.assertEquals(
- cph.rows("select * from DUMMY_WORK_DONE"),
- [(101, 200, 7)]
+ DummyWorkItem.results,
+ {1: 7}
)
- cph.rows("delete from DUMMY_WORK_DONE")
- reactor.advance(pcp.queueProcessTimeout * 2)
+ reactor.advance(20)
self.assertEquals(
- cph.rows("select * from DUMMY_WORK_DONE"),
- [(102, 201, 16)]
+ DummyWorkItem.results,
+ {1: 7, 2: 16}
)
@inlineCallbacks
- def test_exceptionWhenCheckingForLostWork(self):
+ def test_exceptionWhenWorking(self):
"""
- L{PeerConnectionPool._periodicLostWorkCheck} should execute any
+ L{PeerConnectionPool._workCheck} should execute any
outstanding work items, and keep going if some raise an exception.
"""
- dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
- # An arbitrary point in time.
+ dbpool, _ignore_qpool, clock, _ignore_performerChosen = self._setupPools()
fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
- # *why* does datetime still not have .astimestamp()
- sinceEpoch = astimestamp(fakeNow)
- clock = Clock()
- clock.advance(sinceEpoch)
- qpool = PeerConnectionPool(clock, dbpool.connection, 0)
# Let's create a couple of work items directly, not via the enqueue
# method, so that they exist but nobody will try to immediately execute
# them.
- @transactionally(dbpool.connection)
+ @transactionally(dbpool.pool.connection)
@inlineCallbacks
def setup(txn):
# First, one that's right now.
@@ -776,14 +875,51 @@
txn, a=2, b=0, notBefore=fakeNow - datetime.timedelta(20 * 60)
)
yield setup
- yield qpool._periodicLostWorkCheck()
+ clock.advance(20 - 12)
- @transactionally(dbpool.connection)
+ # Wait for job
+# while True:
+# jobs = yield inTransaction(dbpool.pool.connection, lambda txn: JobItem.all(txn))
+# if all([job.a == -1 for job in jobs]):
+# break
+# clock.advance(1)
+
+ # Work item complete
+ self.assertTrue(DummyWorkItem.results == {1: 1, 3: 2})
+
+
+ @inlineCallbacks
+ def test_exceptionUnassign(self):
+ """
+ When a work item fails, its job should appear as unassigned in the JOB
+ table, have its failure count bumped, and have its notBefore pushed one
+ minute ahead.
+ """
+ dbpool, _ignore_qpool, clock, _ignore_performerChosen = self._setupPools()
+ fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
+
+ # Create a failing work item directly, not via the enqueue method, so
+ # that it exists but nobody will try to immediately execute it.
+
+ @transactionally(dbpool.pool.connection)
+ @inlineCallbacks
+ def setup(txn):
+ # Create failing work that is far enough in the past to run immediately.
+ yield DummyWorkItem.makeJob(
+ txn, a=-1, b=1, notBefore=fakeNow - datetime.timedelta(20 * 60)
+ )
+ yield setup
+ clock.advance(20 - 12)
+
+ @transactionally(dbpool.pool.connection)
def check(txn):
- return DummyWorkDone.all(txn)
+ return JobItem.all(txn)
- every = yield check
- self.assertEquals([x.aPlusB for x in every], [1, 2])
+ jobs = yield check
+ self.assertTrue(len(jobs) == 1)
+ self.assertTrue(jobs[0].assigned is None)
+ self.assertTrue(jobs[0].failed == 1)
+ self.assertTrue(jobs[0].notBefore > datetime.datetime.utcnow())
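(Editor's reading aid, not part of this changeset.) The post-failure state asserted above can be summarised by this small model, inferred from the assertions rather than from the actual unassignment code in jobqueue.py:

    def modelFailedJob(job, now):
        # After doWork() raises, the job should be released and retried later.
        job.assigned = None                                    # unassigned again
        job.failed += 1                                        # failure count bumped
        job.notBefore = now + datetime.timedelta(minutes=1)    # retried a minute later
        return job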
@@ -902,7 +1038,6 @@
)
self.addCleanup(deschema)
- from twisted.internet import reactor
self.node1 = PeerConnectionPool(
reactor, indirectedTransactionFactory, 0)
self.node2 = PeerConnectionPool(
@@ -928,7 +1063,9 @@
yield gatherResults([d1, d2])
self.store.queuer = self.node1
+ DummyWorkItem.results = {}
+
def test_currentNodeInfo(self):
"""
There will be two C{NODE_INFO} rows in the database, retrievable as two
@@ -942,12 +1079,11 @@
@inlineCallbacks
- def test_enqueueHappyPath(self):
+ def test_enqueueWorkDone(self):
"""
When a L{WorkItem} is scheduled for execution via
- L{PeerConnectionPool.enqueueWork} its C{doWork} method will be invoked
- by the time the L{Deferred} returned from the resulting
- L{WorkProposal}'s C{whenExecuted} method has fired.
+ L{PeerConnectionPool.enqueueWork}, its C{doWork} method will be run.
"""
# TODO: this exact test should run against LocalQueuer as well.
def operation(txn):
@@ -956,40 +1092,20 @@
# Should probably do something with components.
return txn.enqueue(DummyWorkItem, a=3, b=4, jobID=100, workID=1,
notBefore=datetime.datetime.utcnow())
- result = yield inTransaction(self.store.newTransaction, operation)
+ yield inTransaction(self.store.newTransaction, operation)
+
# Wait for it to be executed. Hopefully this does not time out :-\.
- yield result.whenExecuted()
+ yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
- def op2(txn):
- return Select(
- [
- schema.DUMMY_WORK_DONE.WORK_ID,
- schema.DUMMY_WORK_DONE.JOB_ID,
- schema.DUMMY_WORK_DONE.A_PLUS_B,
- ],
- From=schema.DUMMY_WORK_DONE
- ).on(txn)
+ self.assertEquals(DummyWorkItem.results, {100: 7})
- rows = yield inTransaction(self.store.newTransaction, op2)
- self.assertEquals(rows, [[101, 200, 7]])
-
@inlineCallbacks
def test_noWorkDoneWhenConcurrentlyDeleted(self):
"""
When a L{WorkItem} is concurrently deleted by another transaction, it
should I{not} perform its work.
"""
- # Provide access to a method called "concurrently" everything using
- original = self.store.newTransaction
-
- def decorate(*a, **k):
- result = original(*a, **k)
- result.concurrently = self.store.newTransaction
- return result
-
- self.store.newTransaction = decorate
-
def operation(txn):
return txn.enqueue(
DummyWorkItem, a=30, b=40, workID=5678,
@@ -997,33 +1113,15 @@
notBefore=datetime.datetime.utcnow()
)
- proposal = yield inTransaction(self.store.newTransaction, operation)
- yield proposal.whenExecuted()
+ yield inTransaction(self.store.newTransaction, operation)
- # Sanity check on the concurrent deletion.
- def op2(txn):
- return Select(
- [schema.DUMMY_WORK_ITEM.WORK_ID],
- From=schema.DUMMY_WORK_ITEM
- ).on(txn)
+ # Wait for it to be executed. Hopefully this does not time out :-\.
+ yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
- rows = yield inTransaction(self.store.newTransaction, op2)
- self.assertEquals(rows, [])
+ self.assertEquals(DummyWorkItem.results, {})
- def op3(txn):
- return Select(
- [
- schema.DUMMY_WORK_DONE.WORK_ID,
- schema.DUMMY_WORK_DONE.A_PLUS_B,
- ],
- From=schema.DUMMY_WORK_DONE
- ).on(txn)
- rows = yield inTransaction(self.store.newTransaction, op3)
- self.assertEquals(rows, [])
-
-
class DummyProposal(object):
def __init__(self, *ignored):