<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
"http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head><meta http-equiv="content-type" content="text/html; charset=utf-8" />
<title>[13472] twext/trunk</title>
</head>
<body>

<style type="text/css"><!--
#msg dl.meta { border: 1px #006 solid; background: #369; padding: 6px; color: #fff; }
#msg dl.meta dt { float: left; width: 6em; font-weight: bold; }
#msg dt:after { content:':';}
#msg dl, #msg dt, #msg ul, #msg li, #header, #footer, #logmsg { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt;  }
#msg dl a { font-weight: bold}
#msg dl a:link    { color:#fc3; }
#msg dl a:active  { color:#ff0; }
#msg dl a:visited { color:#cc6; }
h3 { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; font-weight: bold; }
#msg pre { overflow: auto; background: #ffc; border: 1px #fa0 solid; padding: 6px; }
#logmsg { background: #ffc; border: 1px #fa0 solid; padding: 1em 1em 0 1em; }
#logmsg p, #logmsg pre, #logmsg blockquote { margin: 0 0 1em 0; }
#logmsg p, #logmsg li, #logmsg dt, #logmsg dd { line-height: 14pt; }
#logmsg h1, #logmsg h2, #logmsg h3, #logmsg h4, #logmsg h5, #logmsg h6 { margin: .5em 0; }
#logmsg h1:first-child, #logmsg h2:first-child, #logmsg h3:first-child, #logmsg h4:first-child, #logmsg h5:first-child, #logmsg h6:first-child { margin-top: 0; }
#logmsg ul, #logmsg ol { padding: 0; list-style-position: inside; margin: 0 0 0 1em; }
#logmsg ul { text-indent: -1em; padding-left: 1em; }
#logmsg ol { text-indent: -1.5em; padding-left: 1.5em; }
#logmsg > ul, #logmsg > ol { margin: 0 0 1em 0; }
#logmsg pre { background: #eee; padding: 1em; }
#logmsg blockquote { border: 1px solid #fa0; border-left-width: 10px; padding: 1em 1em 0 1em; background: white;}
#logmsg dl { margin: 0; }
#logmsg dt { font-weight: bold; }
#logmsg dd { margin: 0; padding: 0 0 0.5em 0; }
#logmsg dd:before { content:'\00bb';}
#logmsg table { border-spacing: 0px; border-collapse: collapse; border-top: 4px solid #fa0; border-bottom: 1px solid #fa0; background: #fff; }
#logmsg table th { text-align: left; font-weight: normal; padding: 0.2em 0.5em; border-top: 1px dotted #fa0; }
#logmsg table td { text-align: right; border-top: 1px dotted #fa0; padding: 0.2em 0.5em; }
#logmsg table thead th { text-align: center; border-bottom: 1px solid #fa0; }
#logmsg table th.Corner { text-align: left; }
#logmsg hr { border: none 0; border-top: 2px dashed #fa0; height: 1px; }
#header, #footer { color: #fff; background: #636; border: 1px #300 solid; padding: 6px; }
#patch { width: 100%; }
#patch h4 {font-family: verdana,arial,helvetica,sans-serif;font-size:10pt;padding:8px;background:#369;color:#fff;margin:0;}
#patch .propset h4, #patch .binary h4 {margin:0;}
#patch pre {padding:0;line-height:1.2em;margin:0;}
#patch .diff {width:100%;background:#eee;padding: 0 0 10px 0;overflow:auto;}
#patch .propset .diff, #patch .binary .diff  {padding:10px 0;}
#patch span {display:block;padding:0 10px;}
#patch .modfile, #patch .addfile, #patch .delfile, #patch .propset, #patch .binary, #patch .copfile {border:1px solid #ccc;margin:10px 0;}
#patch ins {background:#dfd;text-decoration:none;display:block;padding:0 10px;}
#patch del {background:#fdd;text-decoration:none;display:block;padding:0 10px;}
#patch .lines, .info {color:#888;background:#fff;}
--></style>
<div id="msg">
<dl class="meta">
<dt>Revision</dt> <dd><a href="http://trac.calendarserver.org//changeset/13472">13472</a></dd>
<dt>Author</dt> <dd>cdaboo@apple.com</dd>
<dt>Date</dt> <dd>2014-05-14 12:50:12 -0700 (Wed, 14 May 2014)</dd>
</dl>

<h3>Log Message</h3>
<pre>New jobqueue implementation.</pre>
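<p>A minimal sketch of the new work-item API introduced by this revision: priority and weight are now declared per class via default_priority/default_weight (replacing the old priority attribute), and notBefore defaults to the current time when a job is created. The schema module, the MY_WORK table, and the doWork body below are hypothetical; run under Twisted as usual.</p>
<pre>
# Hypothetical work item showing the new per-class scheduling knobs.
from twisted.internet.defer import inlineCallbacks
from twext.enterprise.dal.record import fromTable
from twext.enterprise.jobqueue import (
    WorkItem, WORK_PRIORITY_HIGH, WORK_WEIGHT_3
)
from myapp.schema import schema  # hypothetical application schema

class MyWork(WorkItem, fromTable(schema.MY_WORK)):

    default_priority = WORK_PRIORITY_HIGH  # position in the JOB queue
    default_weight = WORK_WEIGHT_3         # share of a worker's capacity of 10

    @inlineCallbacks
    def doWork(self):
        # An exception raised here is now wrapped in JobFailedError by
        # JobItem.run(): the job is unassigned, FAILED is bumped, and
        # NOT_BEFORE is pushed 60 seconds out for a retry.
        yield doTheActualWork()  # hypothetical
</pre>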

<h3>Modified Paths</h3>
<ul>
<li><a href="#twexttrunktwextenterpriseadbapi2py">twext/trunk/twext/enterprise/adbapi2.py</a></li>
<li><a href="#twexttrunktwextenterprisedalmodelpy">twext/trunk/twext/enterprise/dal/model.py</a></li>
<li><a href="#twexttrunktwextenterprisedalrecordpy">twext/trunk/twext/enterprise/dal/record.py</a></li>
<li><a href="#twexttrunktwextenterprisedalsyntaxpy">twext/trunk/twext/enterprise/dal/syntax.py</a></li>
<li><a href="#twexttrunktwextenterprisejobqueuepy">twext/trunk/twext/enterprise/jobqueue.py</a></li>
<li><a href="#twexttrunktwextenterprisequeuepy">twext/trunk/twext/enterprise/queue.py</a></li>
<li><a href="#twexttrunktwextenterprisetesttest_jobqueuepy">twext/trunk/twext/enterprise/test/test_jobqueue.py</a></li>
</ul>

<h3>Properties Changed</h3>
<ul>
<li><a href="#twexttrunk">twext/trunk/</a></li>
<li><a href="#twexttrunktwext">twext/trunk/twext/</a></li>
</ul>

</div>
<div id="patch">
<h3>Diff</h3>
<a id="twexttrunk"></a>
<div class="propset"><h4>Property changes: twext/trunk</h4>
<pre class="diff"><span>
</span></pre></div>
<a id="svnmergeinfo"></a>
<div class="modfile"><h4>Modified: svn:mergeinfo</h4></div>
<span class="cx">   + /twext/branches/users/cdaboo/jobqueue-3:13444-13471
</span><span class="cx">/twext/branches/users/cdaboo/jobs:12742-12780
</span><a id="twexttrunktwext"></a>
<div class="propset"><h4>Property changes: twext/trunk/twext</h4>
<pre class="diff"><span>
</span></pre></div>
<a id="svnmergeinfo"></a>
<div class="modfile"><h4>Modified: svn:mergeinfo</h4></div>
<span class="cx">/CalendarServer/branches/config-separation/twext:4379-4443
</span><span class="cx">/CalendarServer/branches/egg-info-351/twext:4589-4625
</span><span class="cx">/CalendarServer/branches/generic-sqlstore/twext:6167-6191
</span><span class="cx">/CalendarServer/branches/new-store/twext:5594-5934
</span><span class="cx">/CalendarServer/branches/new-store-no-caldavfile/twext:5911-5935
</span><span class="cx">/CalendarServer/branches/new-store-no-caldavfile-2/twext:5936-5981
</span><span class="cx">/CalendarServer/branches/release/CalendarServer-4.3-dev/twext:10180-10190,10192
</span><span class="cx">/CalendarServer/branches/release/CalendarServer-5.1-dev/twext:11846
</span><span class="cx">/CalendarServer/branches/users/cdaboo/batchupload-6699/twext:6700-7198
</span><span class="cx">/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692/twext:5693-5702
</span><span class="cx">/CalendarServer/branches/users/cdaboo/component-set-fixes/twext:8130-8346
</span><span class="cx">/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627/twext:3628-3644
</span><span class="cx">/CalendarServer/branches/users/cdaboo/fix-no-ischedule/twext:11607-11871
</span><span class="cx">/CalendarServer/branches/users/cdaboo/implicituidrace/twext:8137-8141
</span><span class="cx">/CalendarServer/branches/users/cdaboo/ischedule-dkim/twext:9747-9979
</span><span class="cx">/CalendarServer/branches/users/cdaboo/json/twext:11622-11912
</span><span class="cx">/CalendarServer/branches/users/cdaboo/managed-attachments/twext:9985-10145
</span><span class="cx">/CalendarServer/branches/users/cdaboo/more-sharing-5591/twext:5592-5601
</span><span class="cx">/CalendarServer/branches/users/cdaboo/partition-4464/twext:4465-4957
</span><span class="cx">/CalendarServer/branches/users/cdaboo/performance-tweaks/twext:11824-11836
</span><span class="cx">/CalendarServer/branches/users/cdaboo/pods/twext:7297-7377
</span><span class="cx">/CalendarServer/branches/users/cdaboo/pycalendar/twext:7085-7206
</span><span class="cx">/CalendarServer/branches/users/cdaboo/pycard/twext:7227-7237
</span><span class="cx">/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twext:7740-8287
</span><span class="cx">/CalendarServer/branches/users/cdaboo/relative-config-paths-5070/twext:5071-5105
</span><span class="cx">/CalendarServer/branches/users/cdaboo/reverse-proxy-pods/twext:11875-11900
</span><span class="cx">/CalendarServer/branches/users/cdaboo/shared-calendars-5187/twext:5188-5440
</span><span class="cx">/CalendarServer/branches/users/cdaboo/sharing-in-the-store/twext:11935-12016
</span><span class="cx">/CalendarServer/branches/users/cdaboo/store-scheduling/twext:10876-11129
</span><span class="cx">/CalendarServer/branches/users/cdaboo/timezones/twext:7443-7699
</span><span class="cx">/CalendarServer/branches/users/cdaboo/txn-debugging/twext:8730-8743
</span><span class="cx">/CalendarServer/branches/users/gaya/sharedgroups-3/twext:11088-11204
</span><span class="cx">/CalendarServer/branches/users/glyph/always-abort-txn-on-error/twext:9958-9969
</span><span class="cx">/CalendarServer/branches/users/glyph/case-insensitive-uid/twext:8772-8805
</span><span class="cx">/CalendarServer/branches/users/glyph/conn-limit/twext:6574-6577
</span><span class="cx">/CalendarServer/branches/users/glyph/contacts-server-merge/twext:4971-5080
</span><span class="cx">/CalendarServer/branches/users/glyph/dalify/twext:6932-7023
</span><span class="cx">/CalendarServer/branches/users/glyph/db-reconnect/twext:6824-6876
</span><span class="cx">/CalendarServer/branches/users/glyph/deploybuild/twext:7563-7572
</span><span class="cx">/CalendarServer/branches/users/glyph/digest-auth-redux/twext:10624-10635
</span><span class="cx">/CalendarServer/branches/users/glyph/disable-quota/twext:7718-7727
</span><span class="cx">/CalendarServer/branches/users/glyph/dont-start-postgres/twext:6592-6614
</span><span class="cx">/CalendarServer/branches/users/glyph/enforce-max-requests/twext:11640-11643
</span><span class="cx">/CalendarServer/branches/users/glyph/hang-fix/twext:11465-11491
</span><span class="cx">/CalendarServer/branches/users/glyph/imip-and-admin-html/twext:7866-7984
</span><span class="cx">/CalendarServer/branches/users/glyph/ipv6-client/twext:9054-9105
</span><span class="cx">/CalendarServer/branches/users/glyph/launchd-wrapper-bis/twext:11413-11436
</span><span class="cx">/CalendarServer/branches/users/glyph/linux-tests/twext:6893-6900
</span><span class="cx">/CalendarServer/branches/users/glyph/log-cleanups/twext:11691-11731
</span><span class="cx">/CalendarServer/branches/users/glyph/migrate-merge/twext:8690-8713
</span><span class="cx">/CalendarServer/branches/users/glyph/misc-portability-fixes/twext:7365-7374
</span><span class="cx">/CalendarServer/branches/users/glyph/more-deferreds-6/twext:6322-6368
</span><span class="cx">/CalendarServer/branches/users/glyph/more-deferreds-7/twext:6369-6445
</span><span class="cx">/CalendarServer/branches/users/glyph/multiget-delete/twext:8321-8330
</span><span class="cx">/CalendarServer/branches/users/glyph/new-export/twext:7444-7485
</span><span class="cx">/CalendarServer/branches/users/glyph/one-home-list-api/twext:10048-10073
</span><span class="cx">/CalendarServer/branches/users/glyph/oracle/twext:7106-7155
</span><span class="cx">/CalendarServer/branches/users/glyph/oracle-nulls/twext:7340-7351
</span><span class="cx">/CalendarServer/branches/users/glyph/other-html/twext:8062-8091
</span><span class="cx">/CalendarServer/branches/users/glyph/parallel-sim/twext:8240-8251
</span><span class="cx">/CalendarServer/branches/users/glyph/parallel-upgrade/twext:8376-8400
</span><span class="cx">/CalendarServer/branches/users/glyph/parallel-upgrade_to_1/twext:8571-8583
</span><span class="cx">/CalendarServer/branches/users/glyph/q/twext:9560-9688
</span><span class="cx">/CalendarServer/branches/users/glyph/queue-locking-and-timing/twext:10204-10289
</span><span class="cx">/CalendarServer/branches/users/glyph/quota/twext:7604-7637
</span><span class="cx">/CalendarServer/branches/users/glyph/sendfdport/twext:5388-5424
</span><span class="cx">/CalendarServer/branches/users/glyph/shared-pool-fixes/twext:8436-8443
</span><span class="cx">/CalendarServer/branches/users/glyph/shared-pool-take2/twext:8155-8174
</span><span class="cx">/CalendarServer/branches/users/glyph/sharedpool/twext:6490-6550
</span><span class="cx">/CalendarServer/branches/users/glyph/sharing-api/twext:9192-9205
</span><span class="cx">/CalendarServer/branches/users/glyph/skip-lonely-vtimezones/twext:8524-8535
</span><span class="cx">/CalendarServer/branches/users/glyph/sql-store/twext:5929-6073
</span><span class="cx">/CalendarServer/branches/users/glyph/start-service-start-loop/twext:11060-11065
</span><span class="cx">/CalendarServer/branches/users/glyph/subtransactions/twext:7248-7258
</span><span class="cx">/CalendarServer/branches/users/glyph/table-alias/twext:8651-8664
</span><span class="cx">/CalendarServer/branches/users/glyph/uidexport/twext:7673-7676
</span><span class="cx">/CalendarServer/branches/users/glyph/unshare-when-access-revoked/twext:10562-10595
</span><span class="cx">/CalendarServer/branches/users/glyph/use-system-twisted/twext:5084-5149
</span><span class="cx">/CalendarServer/branches/users/glyph/uuid-normalize/twext:9268-9296
</span><span class="cx">/CalendarServer/branches/users/glyph/warning-cleanups/twext:11347-11357
</span><span class="cx">/CalendarServer/branches/users/glyph/whenNotProposed/twext:11881-11897
</span><span class="cx">/CalendarServer/branches/users/glyph/xattrs-from-files/twext:7757-7769
</span><span class="cx">/CalendarServer/branches/users/sagen/applepush/twext:8126-8184
</span><span class="cx">/CalendarServer/branches/users/sagen/inboxitems/twext:7380-7381
</span><span class="cx">/CalendarServer/branches/users/sagen/locations-resources/twext:5032-5051
</span><span class="cx">/CalendarServer/branches/users/sagen/locations-resources-2/twext:5052-5061
</span><span class="cx">/CalendarServer/branches/users/sagen/purge_old_events/twext:6735-6746
</span><span class="cx">/CalendarServer/branches/users/sagen/resource-delegates-4038/twext:4040-4067
</span><span class="cx">/CalendarServer/branches/users/sagen/resource-delegates-4066/twext:4068-4075
</span><span class="cx">/CalendarServer/branches/users/sagen/resources-2/twext:5084-5093
</span><span class="cx">/CalendarServer/branches/users/sagen/testing/twext:10827-10851,10853-10855
</span><span class="cx">/CalendarServer/branches/users/wsanchez/transations/twext:5515-5593
</span><span class="cx">/twext/branches/users/cdaboo/jobs/twext:12742-12780
</span><span class="cx">   + /CalDAVTester/trunk/twext:11193-11198
</span><span class="cx">/CalendarServer/branches/config-separation/twext:4379-4443
</span><span class="cx">/CalendarServer/branches/egg-info-351/twext:4589-4625
</span><span class="cx">/CalendarServer/branches/generic-sqlstore/twext:6167-6191
</span><span class="cx">/CalendarServer/branches/new-store-no-caldavfile-2/twext:5936-5981
</span><span class="cx">/CalendarServer/branches/new-store-no-caldavfile/twext:5911-5935
</span><span class="cx">/CalendarServer/branches/new-store/twext:5594-5934
</span><span class="cx">/CalendarServer/branches/release/CalendarServer-4.3-dev/twext:10180-10190,10192
</span><span class="cx">/CalendarServer/branches/release/CalendarServer-5.1-dev/twext:11846
</span><span class="cx">/CalendarServer/branches/users/cdaboo/batchupload-6699/twext:6700-7198
</span><span class="cx">/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692/twext:5693-5702
</span><span class="cx">/CalendarServer/branches/users/cdaboo/component-set-fixes/twext:8130-8346
</span><span class="cx">/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627/twext:3628-3644
</span><span class="cx">/CalendarServer/branches/users/cdaboo/fix-no-ischedule/twext:11607-11871
</span><span class="cx">/CalendarServer/branches/users/cdaboo/implicituidrace/twext:8137-8141
</span><span class="cx">/CalendarServer/branches/users/cdaboo/ischedule-dkim/twext:9747-9979
</span><span class="cx">/CalendarServer/branches/users/cdaboo/json/twext:11622-11912
</span><span class="cx">/CalendarServer/branches/users/cdaboo/managed-attachments/twext:9985-10145
</span><span class="cx">/CalendarServer/branches/users/cdaboo/more-sharing-5591/twext:5592-5601
</span><span class="cx">/CalendarServer/branches/users/cdaboo/partition-4464/twext:4465-4957
</span><span class="cx">/CalendarServer/branches/users/cdaboo/performance-tweaks/twext:11824-11836
</span><span class="cx">/CalendarServer/branches/users/cdaboo/pods/twext:7297-7377
</span><span class="cx">/CalendarServer/branches/users/cdaboo/pycalendar/twext:7085-7206
</span><span class="cx">/CalendarServer/branches/users/cdaboo/pycard/twext:7227-7237
</span><span class="cx">/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twext:7740-8287
</span><span class="cx">/CalendarServer/branches/users/cdaboo/relative-config-paths-5070/twext:5071-5105
</span><span class="cx">/CalendarServer/branches/users/cdaboo/reverse-proxy-pods/twext:11875-11900
</span><span class="cx">/CalendarServer/branches/users/cdaboo/shared-calendars-5187/twext:5188-5440
</span><span class="cx">/CalendarServer/branches/users/cdaboo/sharing-in-the-store/twext:11935-12016
</span><span class="cx">/CalendarServer/branches/users/cdaboo/store-scheduling/twext:10876-11129
</span><span class="cx">/CalendarServer/branches/users/cdaboo/timezones/twext:7443-7699
</span><span class="cx">/CalendarServer/branches/users/cdaboo/txn-debugging/twext:8730-8743
</span><span class="cx">/CalendarServer/branches/users/gaya/sharedgroups-3/twext:11088-11204
</span><span class="cx">/CalendarServer/branches/users/glyph/always-abort-txn-on-error/twext:9958-9969
</span><span class="cx">/CalendarServer/branches/users/glyph/case-insensitive-uid/twext:8772-8805
</span><span class="cx">/CalendarServer/branches/users/glyph/conn-limit/twext:6574-6577
</span><span class="cx">/CalendarServer/branches/users/glyph/contacts-server-merge/twext:4971-5080
</span><span class="cx">/CalendarServer/branches/users/glyph/dalify/twext:6932-7023
</span><span class="cx">/CalendarServer/branches/users/glyph/db-reconnect/twext:6824-6876
</span><span class="cx">/CalendarServer/branches/users/glyph/deploybuild/twext:7563-7572
</span><span class="cx">/CalendarServer/branches/users/glyph/digest-auth-redux/twext:10624-10635
</span><span class="cx">/CalendarServer/branches/users/glyph/disable-quota/twext:7718-7727
</span><span class="cx">/CalendarServer/branches/users/glyph/dont-start-postgres/twext:6592-6614
</span><span class="cx">/CalendarServer/branches/users/glyph/enforce-max-requests/twext:11640-11643
</span><span class="cx">/CalendarServer/branches/users/glyph/hang-fix/twext:11465-11491
</span><span class="cx">/CalendarServer/branches/users/glyph/imip-and-admin-html/twext:7866-7984
</span><span class="cx">/CalendarServer/branches/users/glyph/ipv6-client/twext:9054-9105
</span><span class="cx">/CalendarServer/branches/users/glyph/launchd-wrapper-bis/twext:11413-11436
</span><span class="cx">/CalendarServer/branches/users/glyph/linux-tests/twext:6893-6900
</span><span class="cx">/CalendarServer/branches/users/glyph/log-cleanups/twext:11691-11731
</span><span class="cx">/CalendarServer/branches/users/glyph/migrate-merge/twext:8690-8713
</span><span class="cx">/CalendarServer/branches/users/glyph/misc-portability-fixes/twext:7365-7374
</span><span class="cx">/CalendarServer/branches/users/glyph/more-deferreds-6/twext:6322-6368
</span><span class="cx">/CalendarServer/branches/users/glyph/more-deferreds-7/twext:6369-6445
</span><span class="cx">/CalendarServer/branches/users/glyph/multiget-delete/twext:8321-8330
</span><span class="cx">/CalendarServer/branches/users/glyph/new-export/twext:7444-7485
</span><span class="cx">/CalendarServer/branches/users/glyph/one-home-list-api/twext:10048-10073
</span><span class="cx">/CalendarServer/branches/users/glyph/oracle-nulls/twext:7340-7351
</span><span class="cx">/CalendarServer/branches/users/glyph/oracle/twext:7106-7155
</span><span class="cx">/CalendarServer/branches/users/glyph/other-html/twext:8062-8091
</span><span class="cx">/CalendarServer/branches/users/glyph/parallel-sim/twext:8240-8251
</span><span class="cx">/CalendarServer/branches/users/glyph/parallel-upgrade/twext:8376-8400
</span><span class="cx">/CalendarServer/branches/users/glyph/parallel-upgrade_to_1/twext:8571-8583
</span><span class="cx">/CalendarServer/branches/users/glyph/q/twext:9560-9688
</span><span class="cx">/CalendarServer/branches/users/glyph/queue-locking-and-timing/twext:10204-10289
</span><span class="cx">/CalendarServer/branches/users/glyph/quota/twext:7604-7637
</span><span class="cx">/CalendarServer/branches/users/glyph/sendfdport/twext:5388-5424
</span><span class="cx">/CalendarServer/branches/users/glyph/shared-pool-fixes/twext:8436-8443
</span><span class="cx">/CalendarServer/branches/users/glyph/shared-pool-take2/twext:8155-8174
</span><span class="cx">/CalendarServer/branches/users/glyph/sharedpool/twext:6490-6550
</span><span class="cx">/CalendarServer/branches/users/glyph/sharing-api/twext:9192-9205
</span><span class="cx">/CalendarServer/branches/users/glyph/skip-lonely-vtimezones/twext:8524-8535
</span><span class="cx">/CalendarServer/branches/users/glyph/sql-store/twext:5929-6073
</span><span class="cx">/CalendarServer/branches/users/glyph/start-service-start-loop/twext:11060-11065
</span><span class="cx">/CalendarServer/branches/users/glyph/subtransactions/twext:7248-7258
</span><span class="cx">/CalendarServer/branches/users/glyph/table-alias/twext:8651-8664
</span><span class="cx">/CalendarServer/branches/users/glyph/uidexport/twext:7673-7676
</span><span class="cx">/CalendarServer/branches/users/glyph/unshare-when-access-revoked/twext:10562-10595
</span><span class="cx">/CalendarServer/branches/users/glyph/use-system-twisted/twext:5084-5149
</span><span class="cx">/CalendarServer/branches/users/glyph/uuid-normalize/twext:9268-9296
</span><span class="cx">/CalendarServer/branches/users/glyph/warning-cleanups/twext:11347-11357
</span><span class="cx">/CalendarServer/branches/users/glyph/whenNotProposed/twext:11881-11897
</span><span class="cx">/CalendarServer/branches/users/glyph/xattrs-from-files/twext:7757-7769
</span><span class="cx">/CalendarServer/branches/users/sagen/applepush/twext:8126-8184
</span><span class="cx">/CalendarServer/branches/users/sagen/inboxitems/twext:7380-7381
</span><span class="cx">/CalendarServer/branches/users/sagen/locations-resources-2/twext:5052-5061
</span><span class="cx">/CalendarServer/branches/users/sagen/locations-resources/twext:5032-5051
</span><span class="cx">/CalendarServer/branches/users/sagen/purge_old_events/twext:6735-6746
</span><span class="cx">/CalendarServer/branches/users/sagen/resource-delegates-4038/twext:4040-4067
</span><span class="cx">/CalendarServer/branches/users/sagen/resource-delegates-4066/twext:4068-4075
</span><span class="cx">/CalendarServer/branches/users/sagen/resources-2/twext:5084-5093
</span><span class="cx">/CalendarServer/branches/users/sagen/testing/twext:10827-10851,10853-10855
</span><span class="cx">/CalendarServer/branches/users/wsanchez/transations/twext:5515-5593
</span><span class="cx">/twext/branches/users/cdaboo/jobqueue-3/twext:13444-13471
</span><span class="cx">/twext/branches/users/cdaboo/jobs/twext:12742-12780
</span><a id="twexttrunktwextenterpriseadbapi2py"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/adbapi2.py (13471 => 13472)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/adbapi2.py        2014-05-14 16:19:13 UTC (rev 13471)
+++ twext/trunk/twext/enterprise/adbapi2.py        2014-05-14 19:50:12 UTC (rev 13472)
</span><span class="lines">@@ -154,13 +154,14 @@
</span><span class="cx"> 
</span><span class="cx">     noisy = False
</span><span class="cx"> 
</span><del>-    def __init__(self, pool, threadHolder, connection, cursor):
</del><ins>+    def __init__(self, pool, threadHolder, connection, cursor, label=None):
</ins><span class="cx">         self._pool = pool
</span><span class="cx">         self._completed = &quot;idle&quot;
</span><span class="cx">         self._cursor = cursor
</span><span class="cx">         self._connection = connection
</span><span class="cx">         self._holder = threadHolder
</span><span class="cx">         self._first = True
</span><ins>+        self._label = label
</ins><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     @_forward
</span><span class="lines">@@ -402,10 +403,11 @@
</span><span class="cx">     &quot;&quot;&quot;
</span><span class="cx">     implements(IAsyncTransaction)
</span><span class="cx"> 
</span><del>-    def __init__(self, pool, reason):
</del><ins>+    def __init__(self, pool, reason, label=None):
</ins><span class="cx">         self.paramstyle = pool.paramstyle
</span><span class="cx">         self.dialect = pool.dialect
</span><span class="cx">         self.reason = reason
</span><ins>+        self._label = label
</ins><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     def _everything(self, *a, **kw):
</span><span class="lines">@@ -430,7 +432,7 @@
</span><span class="cx"> 
</span><span class="cx">     implements(IAsyncTransaction)
</span><span class="cx"> 
</span><del>-    def __init__(self, pool):
</del><ins>+    def __init__(self, pool, label=None):
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Initialize a L{_WaitingTxn} based on a L{ConnectionPool}.  (The C{pool}
</span><span class="cx">         is used only to reflect C{dialect} and C{paramstyle} attributes; not
</span><span class="lines">@@ -439,6 +441,7 @@
</span><span class="cx">         self._spool = []
</span><span class="cx">         self.paramstyle = pool.paramstyle
</span><span class="cx">         self.dialect = pool.dialect
</span><ins>+        self._label = label
</ins><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     def _enspool(self, cmd, a=(), kw={}):
</span><span class="lines">@@ -593,6 +596,7 @@
</span><span class="cx">         super(_SingleTxn, self).__init__()
</span><span class="cx">         self._pool = pool
</span><span class="cx">         self._baseTxn = baseTxn
</span><ins>+        self._label = self._baseTxn._label
</ins><span class="cx">         self._completed = False
</span><span class="cx">         self._currentBlock = None
</span><span class="cx">         self._blockedQueue = None
</span><span class="lines">@@ -718,7 +722,8 @@
</span><span class="cx">         self._unspoolOnto(_NoTxn(
</span><span class="cx">             self._pool,
</span><span class="cx">             &quot;connection pool shut down while txn &quot;
</span><del>-            &quot;waiting for database connection.&quot;
</del><ins>+            &quot;waiting for database connection.&quot;,
+            label=self._label,
</ins><span class="cx">         ))
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="lines">@@ -746,7 +751,7 @@
</span><span class="cx">         self._checkComplete()
</span><span class="cx">         block = CommandBlock(self)
</span><span class="cx">         if self._currentBlock is None:
</span><del>-            self._blockedQueue = _WaitingTxn(self._pool)
</del><ins>+            self._blockedQueue = _WaitingTxn(self._pool, label=self._label)
</ins><span class="cx">             # FIXME: test the case where it's ready immediately.
</span><span class="cx">             self._checkNextBlock()
</span><span class="cx">         return block
</span><span class="lines">@@ -795,7 +800,7 @@
</span><span class="cx">         self._singleTxn = singleTxn
</span><span class="cx">         self.paramstyle = singleTxn.paramstyle
</span><span class="cx">         self.dialect = singleTxn.dialect
</span><del>-        self._spool = _WaitingTxn(singleTxn._pool)
</del><ins>+        self._spool = _WaitingTxn(singleTxn._pool, label=singleTxn._label)
</ins><span class="cx">         self._started = False
</span><span class="cx">         self._ended = False
</span><span class="cx">         self._waitingForEnd = []
</span><span class="lines">@@ -1067,14 +1072,15 @@
</span><span class="cx">         if self._stopping:
</span><span class="cx">             # FIXME: should be wrapping a _SingleTxn around this to get
</span><span class="cx">             # .commandBlock()
</span><del>-            return _NoTxn(self, &quot;txn created while DB pool shutting down&quot;)
</del><ins>+            return _NoTxn(self, &quot;txn created while DB pool shutting down&quot;, label=label)
</ins><span class="cx"> 
</span><span class="cx">         if self._free:
</span><span class="cx">             basetxn = self._free.pop(0)
</span><ins>+            basetxn._label = label
</ins><span class="cx">             self._busy.append(basetxn)
</span><span class="cx">             txn = _SingleTxn(self, basetxn)
</span><span class="cx">         else:
</span><del>-            txn = _SingleTxn(self, _WaitingTxn(self))
</del><ins>+            txn = _SingleTxn(self, _WaitingTxn(self, label=label))
</ins><span class="cx">             self._waiting.append(txn)
</span><span class="cx">             # FIXME/TESTME: should be len(self._busy) + len(self._finishing)
</span><span class="cx">             # (free doesn't need to be considered, as it's tested above)
</span></span></pre></div>
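<p>A short sketch of the transaction labelling threaded through adbapi2.py above, assuming the pool method that creates transactions (ConnectionPool.connection()) is the one that gained the optional label= keyword, as the last hunk suggests; run inside an @inlineCallbacks function:</p>
<pre>
# "pool" is an already-started ConnectionPool. The label is stored as _label
# on the underlying transaction objects (_SingleTxn, _WaitingTxn, _NoTxn),
# so a stalled or leaked transaction can be identified in diagnostics.
txn = pool.connection(label="ultimatelyPerform: 42")
rows = yield txn.execSQL("select 1", [])
yield txn.commit()
</pre>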
<a id="twexttrunktwextenterprisedalmodelpy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/dal/model.py (13471 => 13472)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/dal/model.py        2014-05-14 16:19:13 UTC (rev 13471)
+++ twext/trunk/twext/enterprise/dal/model.py        2014-05-14 19:50:12 UTC (rev 13472)
</span><span class="lines">@@ -221,12 +221,12 @@
</span><span class="cx"> 
</span><span class="cx">     compareAttributes = 'table name'.split()
</span><span class="cx"> 
</span><del>-    def __init__(self, table, name, type):
</del><ins>+    def __init__(self, table, name, type, default=NO_DEFAULT):
</ins><span class="cx">         _checkstr(name)
</span><span class="cx">         self.table = table
</span><span class="cx">         self.name = name
</span><span class="cx">         self.type = type
</span><del>-        self.default = NO_DEFAULT
</del><ins>+        self.default = default
</ins><span class="cx">         self.references = None
</span><span class="cx">         self.deleteAction = None
</span><span class="cx"> 
</span><span class="lines">@@ -253,14 +253,16 @@
</span><span class="cx">             # Some DBs don't allow sequence as a default
</span><span class="cx">             if (
</span><span class="cx">                 isinstance(self.default, Sequence) and other.default == NO_DEFAULT or
</span><del>-                self.default == NO_DEFAULT and isinstance(other.default, Sequence)
</del><ins>+                self.default == NO_DEFAULT and isinstance(other.default, Sequence) or
+                self.default is None and other.default == NO_DEFAULT or
+                self.default == NO_DEFAULT and other.default is None
</ins><span class="cx">             ):
</span><span class="cx">                 pass
</span><span class="cx">             else:
</span><span class="cx">                 results.append(&quot;Table: %s, column name %s default mismatch&quot; % (self.table.name, self.name,))
</span><span class="cx">         if stringIfNone(self.references, &quot;name&quot;) != stringIfNone(other.references, &quot;name&quot;):
</span><span class="cx">             results.append(&quot;Table: %s, column name %s references mismatch&quot; % (self.table.name, self.name,))
</span><del>-        if self.deleteAction != other.deleteAction:
</del><ins>+        if stringIfNone(self.deleteAction, &quot;&quot;) != stringIfNone(other.deleteAction, &quot;&quot;):
</ins><span class="cx">             results.append(&quot;Table: %s, column name %s delete action mismatch&quot; % (self.table.name, self.name,))
</span><span class="cx">         return results
</span><span class="cx"> 
</span><span class="lines">@@ -403,7 +405,7 @@
</span><span class="cx">         raise KeyError(&quot;no such column: %r&quot; % (name,))
</span><span class="cx"> 
</span><span class="cx"> 
</span><del>-    def addColumn(self, name, type):
</del><ins>+    def addColumn(self, name, type, default=NO_DEFAULT, notNull=False, primaryKey=False):
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         A new column was parsed for this table.
</span><span class="cx"> 
</span><span class="lines">@@ -413,8 +415,12 @@
</span><span class="cx"> 
</span><span class="cx">         @param type: The L{SQLType} describing the column's type.
</span><span class="cx">         &quot;&quot;&quot;
</span><del>-        column = Column(self, name, type)
</del><ins>+        column = Column(self, name, type, default=default)
</ins><span class="cx">         self.columns.append(column)
</span><ins>+        if notNull:
+            self.tableConstraint(Constraint.NOT_NULL, [name])
+        if primaryKey:
+            self.primaryKey = [column]
</ins><span class="cx">         return column
</span><span class="cx"> 
</span><span class="cx"> 
</span></span></pre></div>
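<p>The extended Table.addColumn() signature, sketched by mirroring how the JOB table is now declared in jobqueue.py; previously the default required a separate setDefaultValue() call, and the NOT NULL constraint and primary key had to be set in separate steps:</p>
<pre>
from twext.enterprise.dal.model import Schema, Table, SQLType, ProcedureCall

s = Schema("example")            # illustrative in-memory schema
job = Table(s, "JOB")
# default=, notNull= and primaryKey= are the new keyword arguments:
job.addColumn("JOB_ID", SQLType("integer", None),
              default=ProcedureCall("nextval", ["JOB_SEQ"]),
              notNull=True, primaryKey=True)
job.addColumn("WORK_TYPE", SQLType("varchar", 255), notNull=True)
job.addColumn("PRIORITY", SQLType("integer", 0), default=0)
</pre>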
<a id="twexttrunktwextenterprisedalrecordpy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/dal/record.py (13471 => 13472)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/dal/record.py        2014-05-14 16:19:13 UTC (rev 13471)
+++ twext/trunk/twext/enterprise/dal/record.py        2014-05-14 19:50:12 UTC (rev 13472)
</span><span class="lines">@@ -352,7 +352,7 @@
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     @classmethod
</span><del>-    def query(cls, transaction, expr, order=None, ascending=True, group=None, forUpdate=False, noWait=False):
</del><ins>+    def query(cls, transaction, expr, order=None, ascending=True, group=None, forUpdate=False, noWait=False, limit=None):
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Query the table that corresponds to C{cls}, and return instances of
</span><span class="cx">         C{cls} corresponding to the rows that are returned from that table.
</span><span class="lines">@@ -385,6 +385,8 @@
</span><span class="cx">             kw.update(ForUpdate=True)
</span><span class="cx">             if noWait:
</span><span class="cx">                 kw.update(NoWait=True)
</span><ins>+        if limit is not None:
+            kw.update(Limit=limit)
</ins><span class="cx">         return cls._rowsFromQuery(
</span><span class="cx">             transaction,
</span><span class="cx">             Select(
</span></span></pre></div>
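<p>A sketch of the new limit= argument to Record.query(), matching its use by JobItem.nextjobs() in jobqueue.py below; assumes an existing transaction txn inside an @inlineCallbacks function:</p>
<pre>
from datetime import datetime
from twext.enterprise.jobqueue import JobItem

now = datetime.utcnow()
# Fetch at most one runnable job, row-locked so other nodes skip past it.
jobs = yield JobItem.query(
    txn,
    (JobItem.notBefore &lt;= now),
    order=(JobItem.assigned, JobItem.priority),
    ascending=False,
    forUpdate=True,
    limit=1,        # new in this revision: rendered as an SQL LIMIT clause
)
</pre>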
<a id="twexttrunktwextenterprisedalsyntaxpy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/dal/syntax.py (13471 => 13472)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/dal/syntax.py        2014-05-14 16:19:13 UTC (rev 13471)
+++ twext/trunk/twext/enterprise/dal/syntax.py        2014-05-14 19:50:12 UTC (rev 13472)
</span><span class="lines">@@ -1433,9 +1433,12 @@
</span><span class="cx">                 stmt.append(SQLFragment(kw))
</span><span class="cx"> 
</span><span class="cx">         if self.ForUpdate:
</span><del>-            stmt.text += &quot; for update&quot;
-            if self.NoWait:
-                stmt.text += &quot; nowait&quot;
</del><ins>+            # FOR UPDATE is not supported by SQLite, but that is probably not
+            # relevant given that SQLite does file-level locking of the DB
+            if queryGenerator.dialect != SQLITE_DIALECT:
+                stmt.text += &quot; for update&quot;
+                if self.NoWait:
+                    stmt.text += &quot; nowait&quot;
</ins><span class="cx"> 
</span><span class="cx">         if self.Limit is not None:
</span><span class="cx">             limitConst = Constant(self.Limit).subSQL(queryGenerator, allTables)
</span></span></pre></div>
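<p>The dialect-dependent rendering above, sketched (schema here is an existing model Schema, and the query runs inside an @inlineCallbacks function): the same Select now emits or omits the locking clause depending on the dialect, since SQLite locks the whole database file anyway.</p>
<pre>
from twext.enterprise.dal.syntax import Select, SchemaSyntax

jb = SchemaSyntax(schema).JOB
sel = Select([jb.JOB_ID], From=jb, Where=(jb.PRIORITY &gt;= 1),
             ForUpdate=True, NoWait=True)
# Postgres/Oracle: "... for update nowait" is appended.
# SQLite: the clause is now silently omitted.
rows = yield sel.on(txn)
</pre>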
<a id="twexttrunktwextenterprisejobqueuepy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/jobqueue.py (13471 => 13472)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/jobqueue.py        2014-05-14 16:19:13 UTC (rev 13471)
+++ twext/trunk/twext/enterprise/jobqueue.py        2014-05-14 19:50:12 UTC (rev 13472)
</span><span class="lines">@@ -81,7 +81,8 @@
</span><span class="cx"> &quot;&quot;&quot;
</span><span class="cx"> 
</span><span class="cx"> from functools import wraps
</span><del>-from datetime import datetime
</del><ins>+from datetime import datetime, timedelta
+from collections import namedtuple
</ins><span class="cx"> 
</span><span class="cx"> from zope.interface import implements
</span><span class="cx"> 
</span><span class="lines">@@ -91,12 +92,12 @@
</span><span class="cx">     inlineCallbacks, returnValue, Deferred, passthru, succeed
</span><span class="cx"> )
</span><span class="cx"> from twisted.internet.endpoints import TCP4ClientEndpoint
</span><del>-from twisted.protocols.amp import AMP, Command, Integer, String
</del><ins>+from twisted.protocols.amp import AMP, Command, Integer, String, Argument
</ins><span class="cx"> from twisted.python.reflect import qual
</span><del>-from twisted.python import log
</del><ins>+from twext.python.log import Logger
</ins><span class="cx"> 
</span><span class="cx"> from twext.enterprise.dal.syntax import (
</span><del>-    SchemaSyntax, Lock, NamedValue, Select, Count
</del><ins>+    SchemaSyntax, Lock, NamedValue
</ins><span class="cx"> )
</span><span class="cx"> 
</span><span class="cx"> from twext.enterprise.dal.model import ProcedureCall
</span><span class="lines">@@ -109,7 +110,10 @@
</span><span class="cx"> from zope.interface.interface import Interface
</span><span class="cx"> from twext.enterprise.locking import NamedLock
</span><span class="cx"> 
</span><ins>+import time
</ins><span class="cx"> 
</span><ins>+log = Logger()
+
</ins><span class="cx"> class _IJobPerformer(Interface):
</span><span class="cx">     &quot;&quot;&quot;
</span><span class="cx">     An object that can perform work.
</span><span class="lines">@@ -118,10 +122,10 @@
</span><span class="cx">     (in the worst case) pass from worker-&gt;controller-&gt;controller-&gt;worker.
</span><span class="cx">     &quot;&quot;&quot;
</span><span class="cx"> 
</span><del>-    def performJob(jobID):  # @NoSelf
</del><ins>+    def performJob(job):  # @NoSelf
</ins><span class="cx">         &quot;&quot;&quot;
</span><del>-        @param jobID: The primary key identifier of the given job.
-        @type jobID: L{int}
</del><ins>+        @param job: Details about the job to perform.
+        @type job: L{JobDescriptor}
</ins><span class="cx"> 
</span><span class="cx">         @return: a L{Deferred} firing with an empty dictionary when the work is
</span><span class="cx">             complete.
</span><span class="lines">@@ -180,17 +184,13 @@
</span><span class="cx">     # transaction is made aware of somehow.
</span><span class="cx">     JobTable = Table(inSchema, &quot;JOB&quot;)
</span><span class="cx"> 
</span><del>-    JobTable.addColumn(&quot;JOB_ID&quot;, SQLType(&quot;integer&quot;, None)).setDefaultValue(
-        ProcedureCall(&quot;nextval&quot;, [&quot;JOB_SEQ&quot;])
-    )
-    JobTable.addColumn(&quot;WORK_TYPE&quot;, SQLType(&quot;varchar&quot;, 255))
-    JobTable.addColumn(&quot;PRIORITY&quot;, SQLType(&quot;integer&quot;, 0))
-    JobTable.addColumn(&quot;WEIGHT&quot;, SQLType(&quot;integer&quot;, 0))
-    JobTable.addColumn(&quot;NOT_BEFORE&quot;, SQLType(&quot;timestamp&quot;, None))
-    JobTable.addColumn(&quot;NOT_AFTER&quot;, SQLType(&quot;timestamp&quot;, None))
-    for column in (&quot;JOB_ID&quot;, &quot;WORK_TYPE&quot;):
-        JobTable.tableConstraint(Constraint.NOT_NULL, [column])
-    JobTable.primaryKey = [JobTable.columnNamed(&quot;JOB_ID&quot;), ]
</del><ins>+    JobTable.addColumn(&quot;JOB_ID&quot;, SQLType(&quot;integer&quot;, None), default=ProcedureCall(&quot;nextval&quot;, [&quot;JOB_SEQ&quot;]), notNull=True, primaryKey=True)
+    JobTable.addColumn(&quot;WORK_TYPE&quot;, SQLType(&quot;varchar&quot;, 255), notNull=True)
+    JobTable.addColumn(&quot;PRIORITY&quot;, SQLType(&quot;integer&quot;, 0), default=0)
+    JobTable.addColumn(&quot;WEIGHT&quot;, SQLType(&quot;integer&quot;, 0), default=0)
+    JobTable.addColumn(&quot;NOT_BEFORE&quot;, SQLType(&quot;timestamp&quot;, None), notNull=True)
+    JobTable.addColumn(&quot;ASSIGNED&quot;, SQLType(&quot;timestamp&quot;, None), default=None)
+    JobTable.addColumn(&quot;FAILED&quot;, SQLType(&quot;integer&quot;, 0), default=0)
</ins><span class="cx"> 
</span><span class="cx">     return inSchema
</span><span class="cx"> 
</span><span class="lines">@@ -199,7 +199,7 @@
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx"> @inlineCallbacks
</span><del>-def inTransaction(transactionCreator, operation, label=&quot;jobqueue.inTransaction&quot;):
</del><ins>+def inTransaction(transactionCreator, operation, label=&quot;jobqueue.inTransaction&quot;, **kwargs):
</ins><span class="cx">     &quot;&quot;&quot;
</span><span class="cx">     Perform the given operation in a transaction, committing or aborting as
</span><span class="cx">     required.
</span><span class="lines">@@ -218,7 +218,7 @@
</span><span class="cx">     &quot;&quot;&quot;
</span><span class="cx">     txn = transactionCreator(label=label)
</span><span class="cx">     try:
</span><del>-        result = yield operation(txn)
</del><ins>+        result = yield operation(txn, **kwargs)
</ins><span class="cx">     except:
</span><span class="cx">         f = Failure()
</span><span class="cx">         yield txn.abort()
</span><span class="lines">@@ -270,6 +270,16 @@
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx"> 
</span><ins>+class JobFailedError(Exception):
+    &quot;&quot;&quot;
+    A job failed to run; we need to be smart about cleaning up.
+    &quot;&quot;&quot;
+
+    def __init__(self, ex):
+        self._ex = ex
+
+
+
</ins><span class="cx"> class JobItem(Record, fromTable(JobInfoSchema.JOB)):
</span><span class="cx">     &quot;&quot;&quot;
</span><span class="cx">     An item in the job table. This is typically not directly used by code
</span><span class="lines">@@ -277,6 +287,10 @@
</span><span class="cx">     associated with work items.
</span><span class="cx">     &quot;&quot;&quot;
</span><span class="cx"> 
</span><ins>+    def descriptor(self):
+        return JobDescriptor(self.jobID, self.weight)
+
+
</ins><span class="cx">     @inlineCallbacks
</span><span class="cx">     def workItem(self):
</span><span class="cx">         workItemClass = WorkItem.forTableName(self.workType)
</span><span class="lines">@@ -286,7 +300,183 @@
</span><span class="cx">         returnValue(workItems[0] if len(workItems) == 1 else None)
</span><span class="cx"> 
</span><span class="cx"> 
</span><ins>+    def assign(self, now):
+        &quot;&quot;&quot;
+        Mark this job as assigned to a worker by setting the ASSIGNED column
+        to the given timestamp.
+
+        @param now: the timestamp to record as the assignment time, normally
+            the current time; tests may pass an explicit value
+        @type now: L{datetime.datetime}
+        &quot;&quot;&quot;
+        return self.update(assigned=now)
+
+
+    def failedToRun(self):
+        &quot;&quot;&quot;
+        The attempt to run the job failed. Leave it in the queue, but mark it
+        as unassigned, bump the failure count and set to run at some point in
+        the future.
+        &quot;&quot;&quot;
+        return self.update(
+            assigned=None,
+            failed=self.failed + 1,
+            notBefore=datetime.utcnow() + timedelta(seconds=60)
+        )
+
+
+    @classmethod
</ins><span class="cx">     @inlineCallbacks
</span><ins>+    def ultimatelyPerform(cls, txnFactory, jobID):
+        &quot;&quot;&quot;
+        Eventually, after routing the job to the appropriate place, somebody
+        actually has to I{do} it.
+
+        @param txnFactory: a 0- or 1-argument callable that creates an
+            L{IAsyncTransaction}
+        @type txnFactory: L{callable}
+
+        @param jobID: the ID of the job to be performed
+        @type jobID: L{int}
+
+        @return: a L{Deferred} which fires with C{None} when the job has been
+            performed, or fails if the job can't be performed.
+        &quot;&quot;&quot;
+
+        t = time.time()
+        def _tm():
+            return &quot;{:.3f}&quot;.format(1000 * (time.time() - t))
+        def _overtm(nb):
+            return &quot;{:.0f}&quot;.format(1000 * (t - astimestamp(nb)))
+
+        log.debug(&quot;JobItem: starting to run {jobid}&quot;.format(jobid=jobID))
+        txn = txnFactory(label=&quot;ultimatelyPerform: {}&quot;.format(jobID))
+        try:
+            job = yield cls.load(txn, jobID)
+            if hasattr(txn, &quot;_label&quot;):
+                txn._label = &quot;{} &lt;{}&gt;&quot;.format(txn._label, job.workType)
+            log.debug(&quot;JobItem: loaded {jobid} {work} t={tm}&quot;.format(
+                jobid=jobID,
+                work=job.workType,
+                tm=_tm())
+            )
+            yield job.run()
+
+        except NoSuchRecord:
+            # The record has already been removed
+            yield txn.commit()
+            log.debug(&quot;JobItem: already removed {jobid} t={tm}&quot;.format(jobid=jobID, tm=_tm()))
+
+        except JobFailedError:
+            # Job failed: abort with cleanup, but pretend this method succeeded
+            def _cleanUp():
+                @inlineCallbacks
+                def _cleanUp2(txn2):
+                    job = yield cls.load(txn2, jobID)
+                    log.debug(&quot;JobItem: marking as failed {jobid}, failure count: {count} t={tm}&quot;.format(jobid=jobID, count=job.failed + 1, tm=_tm()))
+                    yield job.failedToRun()
+                return inTransaction(txnFactory, _cleanUp2, &quot;ultimatelyPerform._cleanUp&quot;)
+            txn.postAbort(_cleanUp)
+            yield txn.abort()
+            log.debug(&quot;JobItem: failed {jobid} {work} t={tm}&quot;.format(
+                jobid=jobID,
+                work=job.workType,
+                tm=_tm()
+            ))
+
+        except:
+            f = Failure()
+            log.error(&quot;JobItem: Unknown exception for {jobid} failed t={tm} {exc}&quot;.format(
+                jobid=jobID,
+                tm=_tm(),
+                exc=f,
+            ))
+            yield txn.abort()
+            returnValue(f)
+
+        else:
+            yield txn.commit()
+            log.debug(&quot;JobItem: completed {jobid} {work} t={tm} over={over}&quot;.format(
+                jobid=jobID,
+                work=job.workType,
+                tm=_tm(),
+                over=_overtm(job.notBefore)
+            ))
+
+        returnValue(None)
+
+
+    @classmethod
+    @inlineCallbacks
+    def nextjob(cls, txn, now, minPriority, overdue):
+        &quot;&quot;&quot;
+        Find the next available job based on priority; a job whose assignment
+        has become overdue is also a candidate. The underlying query row-locks
+        the matching row (FOR UPDATE) to help avoid contention when multiple
+        nodes are operating on the job queue simultaneously.
+
+        @param txn: the transaction to use
+        @type txn: L{IAsyncTransaction}
+        @param now: current timestamp
+        @type now: L{datetime.datetime}
+        @param minPriority: lowest priority level to query for
+        @type minPriority: L{int}
+        @param overdue: timestamp before which an assigned job is considered
+            overdue
+        @type overdue: L{datetime.datetime}
+
+        @return: the job record
+        @rtype: L{JobItem}
+        &quot;&quot;&quot;
+
+        jobs = yield cls.nextjobs(txn, now, minPriority, overdue, limit=1)
+
+        # Must only be one or zero
+        if len(jobs) &gt; 1:
+            raise AssertionError(&quot;nextjobs() returned more than one row&quot;)
+
+        returnValue(jobs[0] if jobs else None)
+
+
+    @classmethod
+    @inlineCallbacks
+    def nextjobs(cls, txn, now, minPriority, overdue, limit=1):
+        &quot;&quot;&quot;
+        Find the next available jobs based on priority; jobs whose assignment
+        has become overdue are also candidates. The underlying query row-locks
+        the matching rows (FOR UPDATE) to help avoid contention when multiple
+        nodes are operating on the job queue simultaneously.
+
+        @param txn: the transaction to use
+        @type txn: L{IAsyncTransaction}
+        @param now: current timestamp
+        @type now: L{datetime.datetime}
+        @param minPriority: lowest priority level to query for
+        @type minPriority: L{int}
+        @param overdue: timestamp before which an assigned job is considered
+            overdue
+        @type overdue: L{datetime.datetime}
+        @param limit: limit on number of jobs to return
+        @type limit: L{int}
+
+        @return: the job records
+        @rtype: L{list} of L{JobItem}
+        &quot;&quot;&quot;
+
+        jobs = yield cls.query(
+            txn,
+            (cls.notBefore &lt;= now).And
+            (((cls.priority &gt;= minPriority).And(cls.assigned == None)).Or(cls.assigned &lt; overdue)),
+            order=(cls.assigned, cls.priority),
+            ascending=False,
+            forUpdate=True,
+            noWait=False,
+            limit=limit,
+        )
+
+        returnValue(jobs)
+
+
+    @inlineCallbacks
</ins><span class="cx">     def run(self):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Run this job item by finding the appropriate work item class and
</span><span class="lines">@@ -304,7 +494,15 @@
</span><span class="cx">                 # The record has already been removed
</span><span class="cx">                 pass
</span><span class="cx">             else:
</span><del>-                yield workItem.doWork()
</del><ins>+                try:
+                    yield workItem.doWork()
+                except Exception as e:
+                    log.error(&quot;JobItem: {jobid}, WorkItem: {workid} failed: {exc}&quot;.format(
+                        jobid=self.jobID,
+                        workid=workItem.workID,
+                        exc=e,
+                    ))
+                    raise JobFailedError(e)
</ins><span class="cx"> 
</span><span class="cx">         try:
</span><span class="cx">             # Once the work is done we delete ourselves
</span><span class="lines">@@ -325,22 +523,48 @@
</span><span class="cx"> 
</span><span class="cx">     @classmethod
</span><span class="cx">     @inlineCallbacks
</span><ins>+    def waitEmpty(cls, txnCreator, reactor, timeout):
+        &quot;&quot;&quot;
+        Wait for the job queue to drain. Only use this in tests
+        that need to wait for results from jobs.
+        &quot;&quot;&quot;
+        t = time.time()
+        while True:
+            work = yield inTransaction(txnCreator, cls.all)
+            if not work:
+                break
+            if time.time() - t &gt; timeout:
+                returnValue(False)
+            d = Deferred()
+            reactor.callLater(0.1, lambda : d.callback(None))
+            yield d
+
+        returnValue(True)
+
+
+    @classmethod
+    @inlineCallbacks
</ins><span class="cx">     def histogram(cls, txn):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Generate a histogram of work items currently in the queue.
</span><span class="cx">         &quot;&quot;&quot;
</span><del>-        jb = JobInfoSchema.JOB
-        rows = yield Select(
-            [jb.WORK_TYPE, Count(jb.WORK_TYPE)],
-            From=jb,
-            GroupBy=jb.WORK_TYPE
-        ).on(txn)
-        results = dict(rows)
-
-        # Add in empty data for other work
</del><ins>+        results = {}
+        now = datetime.utcnow()
</ins><span class="cx">         for workType in cls.workTypes():
</span><del>-            results.setdefault(workType.table.model.name, 0)
</del><ins>+            results.setdefault(workType.table.model.name, [0, 0, 0, 0])
</ins><span class="cx"> 
</span><ins>+        jobs = yield cls.all(txn)
+
+        for job in jobs:
+            r = results[job.workType]
+            r[0] += 1
+            if job.assigned is not None:
+                r[1] += 1
+            if job.failed:
+                r[2] += 1
+            if job.assigned is None and job.notBefore &lt; now:
+                r[3] += 1
+
</ins><span class="cx">         returnValue(results)
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="lines">@@ -349,14 +573,41 @@
</span><span class="cx">         return len(cls.workTypes())
</span><span class="cx"> 
</span><span class="cx"> 
</span><ins>+JobDescriptor = namedtuple(&quot;JobDescriptor&quot;, [&quot;jobID&quot;, &quot;weight&quot;])
</ins><span class="cx"> 
</span><ins>+class JobDescriptorArg(Argument):
+    &quot;&quot;&quot;
+    An AMP argument that serializes a L{JobDescriptor} as a comma-separated
+    string of integers.
+    &quot;&quot;&quot;
+    def toString(self, inObject):
+        return &quot;,&quot;.join(map(str, inObject))
+
+
+    def fromString(self, inString):
+        return JobDescriptor(*map(int, inString.split(&quot;,&quot;)))
+
+
</ins><span class="cx"> # Priority for work - used to order work items in the job queue
</span><span class="cx"> WORK_PRIORITY_LOW = 1
</span><span class="cx"> WORK_PRIORITY_MEDIUM = 2
</span><span class="cx"> WORK_PRIORITY_HIGH = 3
</span><span class="cx"> 
</span><ins>+# Weight for work - used to schedule workers based on capacity
+WORK_WEIGHT_0 = 0
+WORK_WEIGHT_1 = 1
+WORK_WEIGHT_2 = 2
+WORK_WEIGHT_3 = 3
+WORK_WEIGHT_4 = 4
+WORK_WEIGHT_5 = 5
+WORK_WEIGHT_6 = 6
+WORK_WEIGHT_7 = 7
+WORK_WEIGHT_8 = 8
+WORK_WEIGHT_9 = 9
+WORK_WEIGHT_10 = 10
+WORK_WEIGHT_CAPACITY = 10   # Total amount of work any one worker can manage
</ins><span class="cx"> 
</span><span class="cx"> 
</span><ins>+
</ins><span class="cx"> class WorkItem(Record):
</span><span class="cx">     &quot;&quot;&quot;
</span><span class="cx">     A L{WorkItem} is an item of work which may be stored in a database, then
</span><span class="lines">@@ -450,7 +701,8 @@
</span><span class="cx">     &quot;&quot;&quot;
</span><span class="cx"> 
</span><span class="cx">     group = None
</span><del>-    priority = WORK_PRIORITY_LOW    # Default - subclasses should override
</del><ins>+    default_priority = WORK_PRIORITY_LOW    # Default - subclasses should override
+    default_weight = WORK_WEIGHT_5          # Default - subclasses should override
</ins><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     @classmethod
</span><span class="lines">@@ -469,19 +721,21 @@
</span><span class="cx">         }
</span><span class="cx"> 
</span><span class="cx">         def _transferArg(name):
</span><del>-            if name in kwargs:
-                jobargs[name] = kwargs[name]
-                del kwargs[name]
</del><ins>+            arg = kwargs.pop(name, None)
+            if arg is not None:
+                jobargs[name] = arg
+            elif hasattr(cls, &quot;default_{}&quot;.format(name)):
+                jobargs[name] = getattr(cls, &quot;default_{}&quot;.format(name))
</ins><span class="cx"> 
</span><span class="cx">         _transferArg(&quot;jobID&quot;)
</span><del>-        if &quot;priority&quot; in kwargs:
-            _transferArg(&quot;priority&quot;)
-        else:
-            jobargs[&quot;priority&quot;] = cls.priority
</del><ins>+        _transferArg(&quot;priority&quot;)
</ins><span class="cx">         _transferArg(&quot;weight&quot;)
</span><span class="cx">         _transferArg(&quot;notBefore&quot;)
</span><del>-        _transferArg(&quot;notAfter&quot;)
</del><span class="cx"> 
</span><ins>+        # Always need a notBefore
+        if &quot;notBefore&quot; not in jobargs:
+            jobargs[&quot;notBefore&quot;] = datetime.utcnow()
+
</ins><span class="cx">         job = yield JobItem.create(transaction, **jobargs)
</span><span class="cx"> 
</span><span class="cx">         kwargs[&quot;jobID&quot;] = job.jobID
</span><span class="lines">@@ -540,7 +794,7 @@
</span><span class="cx">     &quot;&quot;&quot;
</span><span class="cx"> 
</span><span class="cx">     arguments = [
</span><del>-        (&quot;jobID&quot;, Integer()),
</del><ins>+        (&quot;job&quot;, JobDescriptorArg()),
</ins><span class="cx">     ]
</span><span class="cx">     response = []
</span><span class="cx"> 
</span><span class="lines">@@ -665,7 +919,7 @@
</span><span class="cx">         return self._reportedLoad + self._bonusLoad
</span><span class="cx"> 
</span><span class="cx"> 
</span><del>-    def performJob(self, jobID):
</del><ins>+    def performJob(self, job):
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         A L{local worker connection &lt;ConnectionFromWorker&gt;} is asking this
</span><span class="cx">         specific peer node-controller process to perform a job, having
</span><span class="lines">@@ -673,12 +927,12 @@
</span><span class="cx"> 
</span><span class="cx">         @see: L{_IJobPerformer.performJob}
</span><span class="cx">         &quot;&quot;&quot;
</span><del>-        d = self.callRemote(PerformJob, jobID=jobID)
-        self._bonusLoad += 1
</del><ins>+        d = self.callRemote(PerformJob, job=job)
+        self._bonusLoad += job.weight
</ins><span class="cx"> 
</span><span class="cx">         @d.addBoth
</span><span class="cx">         def performed(result):
</span><del>-            self._bonusLoad -= 1
</del><ins>+            self._bonusLoad -= job.weight
</ins><span class="cx">             return result
</span><span class="cx"> 
</span><span class="cx">         @d.addCallback
</span><span class="lines">@@ -689,17 +943,17 @@
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     @PerformJob.responder
</span><del>-    def dispatchToWorker(self, jobID):
</del><ins>+    def dispatchToWorker(self, job):
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         A remote peer node has asked this node to do a job; dispatch it to
</span><span class="cx">         a local worker on this node.
</span><span class="cx"> 
</span><del>-        @param jobID: the identifier of the job.
-        @type jobID: L{int}
</del><ins>+        @param job: the details of the job.
+        @type job: L{JobDescriptor}
</ins><span class="cx"> 
</span><span class="cx">         @return: a L{Deferred} that fires when the work has been completed.
</span><span class="cx">         &quot;&quot;&quot;
</span><del>-        d = self.peerPool.performJobForPeer(jobID)
</del><ins>+        d = self.peerPool.performJobForPeer(job)
</ins><span class="cx">         d.addCallback(lambda ignored: {})
</span><span class="cx">         return d
</span><span class="cx"> 
</span><span class="lines">@@ -721,7 +975,7 @@
</span><span class="cx">     &quot;&quot;&quot;
</span><span class="cx">     implements(_IJobPerformer)
</span><span class="cx"> 
</span><del>-    def __init__(self, maximumLoadPerWorker=5):
</del><ins>+    def __init__(self, maximumLoadPerWorker=WORK_WEIGHT_CAPACITY):
</ins><span class="cx">         self.workers = []
</span><span class="cx">         self.maximumLoadPerWorker = maximumLoadPerWorker
</span><span class="cx"> 
</span><span class="lines">@@ -753,6 +1007,26 @@
</span><span class="cx">         return False
</span><span class="cx"> 
</span><span class="cx"> 
</span><ins>+    def loadLevel(self):
+        &quot;&quot;&quot;
+        Return the overall load of this worker connection pool as a percentage of
+        its total capacity.
+
+        @return: current load percentage.
+        @rtype: L{int}
+        &quot;&quot;&quot;
+        current = sum(worker.currentLoad for worker in self.workers)
+        total = len(self.workers) * self.maximumLoadPerWorker
+        return ((current * 100) / total) if total else 0
+
+
+    def eachWorkerLoad(self):
+        &quot;&quot;&quot;
+        The load of each currently connected worker, as a list of
+        (current load, total completed) tuples.
+        &quot;&quot;&quot;
+        return [(worker.currentLoad, worker.totalCompleted) for worker in self.workers]
+
+
</ins><span class="cx">     def allWorkerLoad(self):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         The total load of all currently connected workers.
</span><span class="lines">@@ -771,20 +1045,20 @@
</span><span class="cx">         return sorted(self.workers[:], key=lambda w: w.currentLoad)[0]
</span><span class="cx"> 
</span><span class="cx"> 
</span><del>-    def performJob(self, jobID):
</del><ins>+    def performJob(self, job):
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Select a local worker that is idle enough to perform the given job,
</span><span class="cx">         then ask them to perform it.
</span><span class="cx"> 
</span><del>-        @param jobID: The primary key identifier of the given job.
-        @type jobID: L{int}
</del><ins>+        @param job: The details of the given job.
+        @type job: L{JobDescriptor}
</ins><span class="cx"> 
</span><span class="cx">         @return: a L{Deferred} firing with an empty dictionary when the work is
</span><span class="cx">             complete.
</span><span class="cx">         @rtype: L{Deferred} firing L{dict}
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         preferredWorker = self._selectLowestLoadWorker()
</span><del>-        result = preferredWorker.performJob(jobID)
</del><ins>+        result = preferredWorker.performJob(job)
</ins><span class="cx">         return result
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="lines">@@ -799,6 +1073,7 @@
</span><span class="cx">         super(ConnectionFromWorker, self).__init__(boxReceiver, locator)
</span><span class="cx">         self.peerPool = peerPool
</span><span class="cx">         self._load = 0
</span><ins>+        self._completed = 0
</ins><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     @property
</span><span class="lines">@@ -809,6 +1084,14 @@
</span><span class="cx">         return self._load
</span><span class="cx"> 
</span><span class="cx"> 
</span><ins>+    @property
+    def totalCompleted(self):
+        &quot;&quot;&quot;
+        How many jobs has this worker completed so far?
+        &quot;&quot;&quot;
+        return self._completed
+
+
</ins><span class="cx">     def startReceivingBoxes(self, sender):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Start receiving AMP boxes from the peer.  Initialize all necessary
</span><span class="lines">@@ -829,19 +1112,20 @@
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     @PerformJob.responder
</span><del>-    def performJob(self, jobID):
</del><ins>+    def performJob(self, job):
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Dispatch a job to this worker.
</span><span class="cx"> 
</span><span class="cx">         @see: The responder for this should always be
</span><span class="cx">             L{ConnectionFromController.actuallyReallyExecuteJobHere}.
</span><span class="cx">         &quot;&quot;&quot;
</span><del>-        d = self.callRemote(PerformJob, jobID=jobID)
-        self._load += 1
</del><ins>+        d = self.callRemote(PerformJob, job=job)
+        self._load += job.weight
</ins><span class="cx"> 
</span><span class="cx">         @d.addBoth
</span><span class="cx">         def f(result):
</span><del>-            self._load -= 1
</del><ins>+            self._load -= job.weight
+            self._completed += 1
</ins><span class="cx">             return result
</span><span class="cx"> 
</span><span class="cx">         return d
</span><span class="lines">@@ -883,11 +1167,11 @@
</span><span class="cx">         return self
</span><span class="cx"> 
</span><span class="cx"> 
</span><del>-    def performJob(self, jobID):
</del><ins>+    def performJob(self, job):
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Ask the controller to perform a job on our behalf.
</span><span class="cx">         &quot;&quot;&quot;
</span><del>-        return self.callRemote(PerformJob, jobID=jobID)
</del><ins>+        return self.callRemote(PerformJob, job=job)
</ins><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     @inlineCallbacks
</span><span class="lines">@@ -914,48 +1198,18 @@
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     @PerformJob.responder
</span><del>-    def actuallyReallyExecuteJobHere(self, jobID):
</del><ins>+    def actuallyReallyExecuteJobHere(self, job):
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         This is where it's time to actually do the job.  The controller
</span><span class="cx">         process has instructed this worker to do it; so, look up the data in
</span><span class="cx">         the row, and do it.
</span><span class="cx">         &quot;&quot;&quot;
</span><del>-        d = ultimatelyPerform(self.transactionFactory, jobID)
</del><ins>+        d = JobItem.ultimatelyPerform(self.transactionFactory, job.jobID)
</ins><span class="cx">         d.addCallback(lambda ignored: {})
</span><span class="cx">         return d
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx"> 
</span><del>-def ultimatelyPerform(txnFactory, jobID):
-    &quot;&quot;&quot;
-    Eventually, after routing the job to the appropriate place, somebody
-    actually has to I{do} it.
-
-    @param txnFactory: a 0- or 1-argument callable that creates an
-        L{IAsyncTransaction}
-    @type txnFactory: L{callable}
-
-    @param jobID: the ID of the job to be performed
-    @type jobID: L{int}
-
-    @return: a L{Deferred} which fires with C{None} when the job has been
-        performed, or fails if the job can't be performed.
-    &quot;&quot;&quot;
-    @inlineCallbacks
-    def runJob(txn):
-        try:
-            job = yield JobItem.load(txn, jobID)
-            if hasattr(txn, &quot;_label&quot;):
-                txn._label = &quot;{} &lt;{}&gt;&quot;.format(txn._label, job.workType)
-            yield job.run()
-        except NoSuchRecord:
-            # The record has already been removed
-            pass
-
-    return inTransaction(txnFactory, runJob, label=&quot;ultimatelyPerform: {}&quot;.format(jobID))
-
-
-
</del><span class="cx"> class LocalPerformer(object):
</span><span class="cx">     &quot;&quot;&quot;
</span><span class="cx">     Implementor of C{performJob} that does its work in the local process,
</span><span class="lines">@@ -970,11 +1224,11 @@
</span><span class="cx">         self.txnFactory = txnFactory
</span><span class="cx"> 
</span><span class="cx"> 
</span><del>-    def performJob(self, jobID):
</del><ins>+    def performJob(self, job):
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Perform the given job right now.
</span><span class="cx">         &quot;&quot;&quot;
</span><del>-        return ultimatelyPerform(self.txnFactory, jobID)
</del><ins>+        return JobItem.ultimatelyPerform(self.txnFactory, job.jobID)
</ins><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="lines">@@ -1049,7 +1303,6 @@
</span><span class="cx">         self.txn = txn
</span><span class="cx">         self.workItemType = workItemType
</span><span class="cx">         self.kw = kw
</span><del>-        self._whenExecuted = Deferred()
</del><span class="cx">         self._whenCommitted = Deferred()
</span><span class="cx">         self.workItem = None
</span><span class="cx"> 
</span><span class="lines">@@ -1073,60 +1326,11 @@
</span><span class="cx">             def whenDone():
</span><span class="cx">                 self._whenCommitted.callback(self)
</span><span class="cx"> 
</span><del>-                def maybeLater():
-                    performer = self._chooser.choosePerformer()
-
-                    @passthru(
-                        performer.performJob(created.jobID).addCallback
-                    )
-                    def performed(result):
-                        self._whenExecuted.callback(self)
-
-                    @performed.addErrback
-                    def notPerformed(why):
-                        self._whenExecuted.errback(why)
-
-                reactor = self._chooser.reactor
-
-                if created.job.notBefore is not None:
-                    when = max(
-                        0,
-                        astimestamp(created.job.notBefore) - reactor.seconds()
-                    )
-                else:
-                    when = 0
-                # TODO: Track the returned DelayedCall so it can be stopped
-                # when the service stops.
-                self._chooser.reactor.callLater(when, maybeLater)
-
</del><span class="cx">             @self.txn.postAbort
</span><span class="cx">             def whenFailed():
</span><span class="cx">                 self._whenCommitted.errback(TransactionFailed)
</span><span class="cx"> 
</span><span class="cx"> 
</span><del>-    def whenExecuted(self):
-        &quot;&quot;&quot;
-        Let the caller know when the proposed work has been fully executed.
-
-        @note: The L{Deferred} returned by C{whenExecuted} should be used with
-            extreme caution.  If an application decides to do any
-            database-persistent work as a result of this L{Deferred} firing,
-            that work I{may be lost} as a result of a service being normally
-            shut down between the time that the work is scheduled and the time
-            that it is executed.  So, the only things that should be added as
-            callbacks to this L{Deferred} are those which are ephemeral, in
-            memory, and reflect only presentation state associated with the
-            user's perception of the completion of work, not logical chains of
-            work which need to be completed in sequence; those should all be
-            completed within the transaction of the L{WorkItem.doWork} that
-            gets executed.
-
-        @return: a L{Deferred} that fires with this L{WorkProposal} when the
-            work has been completed remotely.
-        &quot;&quot;&quot;
-        return _cloneDeferred(self._whenExecuted)
-
-
</del><span class="cx">     def whenProposed(self):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Let the caller know when the work has been proposed; i.e. when the work
</span><span class="lines">@@ -1221,14 +1425,14 @@
</span><span class="cx">         than waiting for it to be requested.  By default, 10 minutes.
</span><span class="cx">     @type queueProcessTimeout: L{float} (in seconds)
</span><span class="cx"> 
</span><del>-    @ivar queueDelayedProcessInterval: The amount of time between database
</del><ins>+    @ivar queuePollInterval: The amount of time between database
</ins><span class="cx">         pings, i.e. checks for over-due queue items that might have been
</span><span class="cx">         orphaned by a controller process that died mid-transaction.  This is
</span><span class="cx">         how often the shared database should be pinged by I{all} nodes (i.e.,
</span><span class="cx">         all controller processes, or each instance of L{PeerConnectionPool});
</span><span class="cx">         each individual node will ping commensurately less often as more nodes
</span><span class="cx">         join the database.
</span><del>-    @type queueDelayedProcessInterval: L{float} (in seconds)
</del><ins>+    @type queuePollInterval: L{float} (in seconds)
</ins><span class="cx"> 
</span><span class="cx">     @ivar reactor: The reactor used for scheduling timed events.
</span><span class="cx">     @type reactor: L{IReactorTime} provider.
</span><span class="lines">@@ -1243,9 +1447,13 @@
</span><span class="cx">     getfqdn = staticmethod(getfqdn)
</span><span class="cx">     getpid = staticmethod(getpid)
</span><span class="cx"> 
</span><del>-    queueProcessTimeout = (10.0 * 60.0)
-    queueDelayedProcessInterval = (60.0)
</del><ins>+    queuePollInterval = 0.1             # How often (in seconds) to poll for new work
+    queueOrphanTimeout = 5.0 * 60.0     # How long (in seconds) before assigned work is considered orphaned
</ins><span class="cx"> 
</span><ins>+    overloadLevel = 95          # Percentage load level above which job queue processing stops
+    highPriorityLevel = 80      # Percentage load level above which only high priority jobs are processed
+    mediumPriorityLevel = 50    # Percentage load level above which high and medium priority jobs are processed
+
</ins><span class="cx">     def __init__(self, reactor, transactionFactory, ampPort):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Initialize a L{PeerConnectionPool}.
</span><span class="lines">@@ -1324,13 +1532,13 @@
</span><span class="cx">             return LocalPerformer(self.transactionFactory)
</span><span class="cx"> 
</span><span class="cx"> 
</span><del>-    def performJobForPeer(self, jobID):
</del><ins>+    def performJobForPeer(self, job):
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         A peer has requested us to perform a job; choose a job performer
</span><span class="cx">         local to this node, and then execute it.
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         performer = self.choosePerformer(onlyLocally=True)
</span><del>-        return performer.performJob(jobID)
</del><ins>+        return performer.performJob(job)
</ins><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     def totalNumberOfNodes(self):
</span><span class="lines">@@ -1362,67 +1570,113 @@
</span><span class="cx">         return self._lastSeenNodeIndex
</span><span class="cx"> 
</span><span class="cx"> 
</span><del>-    def _periodicLostWorkCheck(self):
</del><ins>+    @inlineCallbacks
+    def _workCheck(self):
</ins><span class="cx">         &quot;&quot;&quot;
</span><del>-        Periodically, every node controller has to check to make sure that work
-        hasn't been dropped on the floor by someone.  In order to do that it
-        queries each work-item table.
</del><ins>+        Every node controller will periodically check for any new work to do, and dispatch
+        as much as possible given the current load.
</ins><span class="cx">         &quot;&quot;&quot;
</span><del>-        @inlineCallbacks
-        def workCheck(txn):
-            if self.thisProcess:
-                nodes = [(node.hostname, node.port) for node in
-                         (yield self.activeNodes(txn))]
-                nodes.sort()
-                self._lastSeenTotalNodes = len(nodes)
-                self._lastSeenNodeIndex = nodes.index(
-                    (self.thisProcess.hostname, self.thisProcess.port)
-                )
</del><ins>+        # FIXME: not sure if we should do this node check on every work poll
+#        if self.thisProcess:
+#            nodes = [(node.hostname, node.port) for node in
+#                     (yield self.activeNodes(txn))]
+#            nodes.sort()
+#            self._lastSeenTotalNodes = len(nodes)
+#            self._lastSeenNodeIndex = nodes.index(
+#                (self.thisProcess.hostname, self.thisProcess.port)
+#            )
</ins><span class="cx"> 
</span><ins>+        loopCounter = 0
+        while True:
+            if not self.running:
+                returnValue(None)
+
+            # Check the overall service load - if overloaded, skip this poll cycle.
+            # FIXME: need to include capacity of other nodes. For now we only check
+            # our own capacity and stop processing if too busy. Other nodes that
+            # are not busy will pick up work.
+            level = self.workerPool.loadLevel()
+
+            # Check overload level first
+            if level &gt; self.overloadLevel:
+                log.error(&quot;workCheck: jobqueue is overloaded&quot;)
+                break
+            elif level &gt; self.highPriorityLevel:
+                log.debug(&quot;workCheck: jobqueue high priority only&quot;)
+                minPriority = WORK_PRIORITY_HIGH
+            elif level &gt; self.mediumPriorityLevel:
+                log.debug(&quot;workCheck: jobqueue high/medium priority only&quot;)
+                minPriority = WORK_PRIORITY_MEDIUM
+            else:
+                minPriority = WORK_PRIORITY_LOW
+
+            # Determine the timestamp cutoff for selecting the next job
</ins><span class="cx">             # TODO: here is where we should iterate over the unlocked items
</span><span class="cx">             # that are due, ordered by priority, notBefore etc
</span><del>-            tooLate = datetime.utcfromtimestamp(
-                self.reactor.seconds() - self.queueProcessTimeout
-            )
-            overdueItems = (yield JobItem.query(
-                txn, (JobItem.notBefore &lt; tooLate))
-            )
-            for overdueItem in overdueItems:
-                peer = self.choosePerformer()
</del><ins>+            nowTime = datetime.utcfromtimestamp(self.reactor.seconds())
+            orphanTime = nowTime - timedelta(seconds=self.queueOrphanTimeout)
+
+            txn = self.transactionFactory(label=&quot;jobqueue.workCheck&quot;)
+            try:
+                nextJob = yield JobItem.nextjob(txn, nowTime, minPriority, orphanTime)
+                if nextJob is None:
+                    break
+
+                # If it is assigned more recently than the orphan cutoff, skip it:
+                # another txn may have assigned it just before this query returned
+                if nextJob.assigned is not None and nextJob.assigned &gt; orphanTime:
+                    continue
+
+                # Always mark the job as newly assigned, even when it is an orphan
+                yield nextJob.assign(nowTime)
+                loopCounter += 1
+
+            except Exception as e:
+                log.error(&quot;Failed to pick a new job, {exc}&quot;, exc=e)
+                yield txn.abort()
+                txn = None
+                nextJob = None
+            finally:
+                if txn:
+                    yield txn.commit()
+
+            if nextJob is not None:
+                peer = self.choosePerformer(onlyLocally=True)
</ins><span class="cx">                 try:
</span><del>-                    yield peer.performJob(overdueItem.jobID)
</del><ins>+                    # Send the job over but DO NOT block on the response, so that
+                    # multiple jobs can be processed in parallel
+                    peer.performJob(nextJob.descriptor())
</ins><span class="cx">                 except Exception as e:
</span><del>-                    log.err(&quot;Failed to perform periodic lost job for jobid={}, {}&quot;.format(overdueItem.jobID, e))
</del><ins>+                    log.error(&quot;Failed to perform job for jobid={jobid}, {exc}&quot;, jobid=nextJob.jobID, exc=e)
</ins><span class="cx"> 
</span><del>-        return inTransaction(self.transactionFactory, workCheck, label=&quot;periodicLostWorkCheck&quot;)
</del><ins>+        if loopCounter:
+            log.debug(&quot;workCheck: processed {count} jobs in one loop&quot;, count=loopCounter)
</ins><span class="cx"> 
</span><span class="cx">     _currentWorkDeferred = None
</span><del>-    _lostWorkCheckCall = None
</del><ins>+    _workCheckCall = None
</ins><span class="cx"> 
</span><del>-    def _lostWorkCheckLoop(self):
</del><ins>+    def _workCheckLoop(self):
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         While the service is running, keep checking for any overdue / lost work
</span><span class="cx">         items and re-submit them to the cluster for processing.  Space out
</span><span class="cx">         those checks in time based on the size of the cluster.
</span><span class="cx">         &quot;&quot;&quot;
</span><del>-        self._lostWorkCheckCall = None
</del><ins>+        self._workCheckCall = None
</ins><span class="cx"> 
</span><ins>+        if not self.running:
+            return
+
</ins><span class="cx">         @passthru(
</span><del>-            self._periodicLostWorkCheck().addErrback(log.err).addCallback
</del><ins>+            self._workCheck().addErrback(log.error).addCallback
</ins><span class="cx">         )
</span><span class="cx">         def scheduleNext(result):
</span><ins>+            # TODO: if multiple nodes are present, see if we can
+            # stagger the polling to avoid contention.
</ins><span class="cx">             self._currentWorkDeferred = None
</span><span class="cx">             if not self.running:
</span><span class="cx">                 return
</span><del>-            index = self.nodeIndex()
-            now = self.reactor.seconds()
-
-            interval = self.queueDelayedProcessInterval
-            count = self.totalNumberOfNodes()
-            when = (now - (now % interval)) + (interval * (count + index))
-            delay = when - now
-            self._lostWorkCheckCall = self.reactor.callLater(
-                delay, self._lostWorkCheckLoop
</del><ins>+            self._workCheckCall = self.reactor.callLater(
+                self.queuePollInterval, self._workCheckLoop
</ins><span class="cx">             )
</span><span class="cx"> 
</span><span class="cx">         self._currentWorkDeferred = scheduleNext
</span><span class="lines">@@ -1432,32 +1686,37 @@
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Register ourselves with the database and establish all outgoing
</span><span class="cx">         connections to other servers in the cluster.
</span><ins>+
+        @param waitForService: an optional L{Deferred} that will be called back when
+            the service startup is done.
+        @type waitForService: L{Deferred} or L{None}
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         @inlineCallbacks
</span><span class="cx">         def startup(txn):
</span><del>-            endpoint = TCP4ServerEndpoint(self.reactor, self.ampPort)
-            # If this fails, the failure mode is going to be ugly, just like
-            # all conflicted-port failures.  But, at least it won't proceed.
-            self._listeningPort = yield endpoint.listen(self.peerFactory())
-            self.ampPort = self._listeningPort.getHost().port
-            yield Lock.exclusive(NodeInfo.table).on(txn)
-            nodes = yield self.activeNodes(txn)
-            selves = [node for node in nodes
-                      if ((node.hostname == self.hostname) and
-                          (node.port == self.ampPort))]
-            if selves:
-                self.thisProcess = selves[0]
-                nodes.remove(self.thisProcess)
-                yield self.thisProcess.update(pid=self.pid,
-                                              time=datetime.now())
-            else:
-                self.thisProcess = yield NodeInfo.create(
-                    txn, hostname=self.hostname, port=self.ampPort,
-                    pid=self.pid, time=datetime.now()
-                )
</del><ins>+            if self.ampPort is not None:
+                endpoint = TCP4ServerEndpoint(self.reactor, self.ampPort)
+                # If this fails, the failure mode is going to be ugly, just like
+                # all conflicted-port failures.  But, at least it won't proceed.
+                self._listeningPort = yield endpoint.listen(self.peerFactory())
+                self.ampPort = self._listeningPort.getHost().port
+                yield Lock.exclusive(NodeInfo.table).on(txn)
+                nodes = yield self.activeNodes(txn)
+                selves = [node for node in nodes
+                          if ((node.hostname == self.hostname) and
+                              (node.port == self.ampPort))]
+                if selves:
+                    self.thisProcess = selves[0]
+                    nodes.remove(self.thisProcess)
+                    yield self.thisProcess.update(pid=self.pid,
+                                                  time=datetime.now())
+                else:
+                    self.thisProcess = yield NodeInfo.create(
+                        txn, hostname=self.hostname, port=self.ampPort,
+                        pid=self.pid, time=datetime.now()
+                    )
</ins><span class="cx"> 
</span><del>-            for node in nodes:
-                self._startConnectingTo(node)
</del><ins>+                for node in nodes:
+                    self._startConnectingTo(node)
</ins><span class="cx"> 
</span><span class="cx">         self._startingUp = inTransaction(self.transactionFactory, startup, label=&quot;PeerConnectionPool.startService&quot;)
</span><span class="cx"> 
</span><span class="lines">@@ -1465,7 +1724,7 @@
</span><span class="cx">         def done(result):
</span><span class="cx">             self._startingUp = None
</span><span class="cx">             super(PeerConnectionPool, self).startService()
</span><del>-            self._lostWorkCheckLoop()
</del><ins>+            self._workCheckLoop()
</ins><span class="cx">             return result
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="lines">@@ -1474,20 +1733,30 @@
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Stop this service, terminating any incoming or outgoing connections.
</span><span class="cx">         &quot;&quot;&quot;
</span><del>-        yield super(PeerConnectionPool, self).stopService()
</del><span class="cx"> 
</span><ins>+        # If in the process of starting up, always wait for startup to complete before
+        # stopping.
</ins><span class="cx">         if self._startingUp is not None:
</span><del>-            yield self._startingUp
</del><ins>+            d = Deferred()
+            self._startingUp.addBoth(lambda result: d.callback(None))
+            yield d
</ins><span class="cx"> 
</span><ins>+        yield super(PeerConnectionPool, self).stopService()
+
</ins><span class="cx">         if self._listeningPort is not None:
</span><span class="cx">             yield self._listeningPort.stopListening()
</span><span class="cx"> 
</span><del>-        if self._lostWorkCheckCall is not None:
-            self._lostWorkCheckCall.cancel()
</del><ins>+        if self._workCheckCall is not None:
+            self._workCheckCall.cancel()
</ins><span class="cx"> 
</span><span class="cx">         if self._currentWorkDeferred is not None:
</span><del>-            yield self._currentWorkDeferred
</del><ins>+            self._currentWorkDeferred.cancel()
</ins><span class="cx"> 
</span><ins>+        for connector in self._connectingToPeer:
+            d = Deferred()
+            connector.addBoth(lambda result: d.callback(None))
+            yield d
+
</ins><span class="cx">         for peer in self.peers:
</span><span class="cx">             peer.transport.abortConnection()
</span><span class="cx"> 
</span><span class="lines">@@ -1510,6 +1779,7 @@
</span><span class="cx">             # self.mappedPeers.pop((host, port)).transport.loseConnection()
</span><span class="cx">         self.mappedPeers[(host, port)] = peer
</span><span class="cx"> 
</span><ins>+    _connectingToPeer = []
</ins><span class="cx"> 
</span><span class="cx">     def _startConnectingTo(self, node):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="lines">@@ -1519,8 +1789,10 @@
</span><span class="cx">         @type node: L{NodeInfo}
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         connected = node.endpoint(self.reactor).connect(self.peerFactory())
</span><ins>+        self._connectingToPeer.append(connected)
</ins><span class="cx"> 
</span><span class="cx">         def whenConnected(proto):
</span><ins>+            self._connectingToPeer.remove(connected)
</ins><span class="cx">             self.mapPeer(node.hostname, node.port, proto)
</span><span class="cx">             proto.callRemote(
</span><span class="cx">                 IdentifyNode,
</span><span class="lines">@@ -1529,9 +1801,11 @@
</span><span class="cx">             ).addErrback(noted, &quot;identify&quot;)
</span><span class="cx"> 
</span><span class="cx">         def noted(err, x=&quot;connect&quot;):
</span><del>-            log.msg(
-                &quot;Could not {0} to cluster peer {1} because {2}&quot;
-                .format(x, node, str(err.value))
</del><ins>+            if x == &quot;connect&quot;:
+                self._connectingToPeer.remove(connected)
+            log.error(
+                &quot;Could not {action} to cluster peer {node} because {reason}&quot;,
+                action=x, node=node, reason=str(err.value),
</ins><span class="cx">             )
</span><span class="cx"> 
</span><span class="cx">         connected.addCallbacks(whenConnected, noted)
</span><span class="lines">@@ -1594,7 +1868,7 @@
</span><span class="cx">     &quot;&quot;&quot;
</span><span class="cx">     implements(_IJobPerformer)
</span><span class="cx"> 
</span><del>-    def performJob(self, jobID):
</del><ins>+    def performJob(self, job):
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Don't perform job.
</span><span class="cx">         &quot;&quot;&quot;
</span></span></pre></div>
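<p>The hunks above replace the old one-slot-per-job load accounting with weight-based
accounting: every job carries a weight from 1 to 10, each worker can carry up to
WORK_WEIGHT_CAPACITY of total weight, and the controller throttles dispatch by priority
as the pool's load level crosses the medium/high/overload thresholds. What follows is a
minimal, self-contained sketch of that gating logic; FakeWorker, GatingPool, and
minimumPriority are illustrative stand-ins for this sketch, not the committed
WorkerConnectionPool/PeerConnectionPool API.</p>
<pre>
# A sketch of the weight-based load gating described above. FakeWorker and
# GatingPool are illustrative stand-ins, not the committed classes.

WORK_PRIORITY_LOW = 1
WORK_PRIORITY_MEDIUM = 2
WORK_PRIORITY_HIGH = 3
WORK_WEIGHT_CAPACITY = 10   # Total weight any one worker can manage


class FakeWorker(object):
    def __init__(self, currentLoad=0):
        self.currentLoad = currentLoad   # Sum of weights of in-flight jobs


class GatingPool(object):
    # Thresholds mirror the class attributes added to PeerConnectionPool
    overloadLevel = 95
    highPriorityLevel = 80
    mediumPriorityLevel = 50

    def __init__(self, workers):
        self.workers = workers
        self.maximumLoadPerWorker = WORK_WEIGHT_CAPACITY

    def loadLevel(self):
        # Integer percentage of total capacity, as in
        # WorkerConnectionPool.loadLevel above
        current = sum(w.currentLoad for w in self.workers)
        total = len(self.workers) * self.maximumLoadPerWorker
        return ((current * 100) // total) if total else 0

    def minimumPriority(self):
        # None means overloaded: skip this poll cycle entirely
        level = self.loadLevel()
        if level &gt; self.overloadLevel:
            return None
        elif level &gt; self.highPriorityLevel:
            return WORK_PRIORITY_HIGH
        elif level &gt; self.mediumPriorityLevel:
            return WORK_PRIORITY_MEDIUM
        else:
            return WORK_PRIORITY_LOW


if __name__ == &quot;__main__&quot;:
    pool = GatingPool([FakeWorker(9), FakeWorker(4)])
    print(pool.loadLevel())         # 65: above the medium threshold
    print(pool.minimumPriority())   # 2: only medium/high priority jobs run
</pre>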
<a id="twexttrunktwextenterprisequeuepy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/queue.py (13471 => 13472)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/queue.py        2014-05-14 16:19:13 UTC (rev 13471)
+++ twext/trunk/twext/enterprise/queue.py        2014-05-14 19:50:12 UTC (rev 13472)
</span><span class="lines">@@ -1323,6 +1323,9 @@
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         self._lostWorkCheckCall = None
</span><span class="cx"> 
</span><ins>+        if not self.running:
+            return
+
</ins><span class="cx">         @passthru(
</span><span class="cx">             self._periodicLostWorkCheck().addErrback(log.err).addCallback
</span><span class="cx">         )
</span><span class="lines">@@ -1390,11 +1393,15 @@
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Stop this service, terminating any incoming or outgoing connections.
</span><span class="cx">         &quot;&quot;&quot;
</span><ins>+        # If in the process of starting up, always wait for startup to complete before
+        # stopping.
+        if self._startingUp is not None:
+            d = Deferred()
+            self._startingUp.addBoth(lambda result: d.callback(None))
+            yield d
+
</ins><span class="cx">         yield super(PeerConnectionPool, self).stopService()
</span><span class="cx"> 
</span><del>-        if self._startingUp is not None:
-            yield self._startingUp
-
</del><span class="cx">         if self._listeningPort is not None:
</span><span class="cx">             yield self._listeningPort.stopListening()
</span><span class="cx"> 
</span><span class="lines">@@ -1402,8 +1409,13 @@
</span><span class="cx">             self._lostWorkCheckCall.cancel()
</span><span class="cx"> 
</span><span class="cx">         if self._currentWorkDeferred is not None:
</span><del>-            yield self._currentWorkDeferred
</del><ins>+            self._currentWorkDeferred.cancel()
</ins><span class="cx"> 
</span><ins>+        for connector in self._connectingToPeer:
+            d = Deferred()
+            connector.addBoth(lambda result: d.callback(None))
+            yield d
+
</ins><span class="cx">         for peer in self.peers:
</span><span class="cx">             peer.transport.abortConnection()
</span><span class="cx"> 
</span><span class="lines">@@ -1426,6 +1438,7 @@
</span><span class="cx">             # self.mappedPeers.pop((host, port)).transport.loseConnection()
</span><span class="cx">         self.mappedPeers[(host, port)] = peer
</span><span class="cx"> 
</span><ins>+    _connectingToPeer = []
</ins><span class="cx"> 
</span><span class="cx">     def _startConnectingTo(self, node):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="lines">@@ -1435,8 +1448,10 @@
</span><span class="cx">         @type node: L{NodeInfo}
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         connected = node.endpoint(self.reactor).connect(self.peerFactory())
</span><ins>+        self._connectingToPeer.append(connected)
</ins><span class="cx"> 
</span><span class="cx">         def whenConnected(proto):
</span><ins>+            self._connectingToPeer.remove(connected)
</ins><span class="cx">             self.mapPeer(node.hostname, node.port, proto)
</span><span class="cx">             proto.callRemote(
</span><span class="cx">                 IdentifyNode,
</span><span class="lines">@@ -1445,6 +1460,8 @@
</span><span class="cx">             ).addErrback(noted, &quot;identify&quot;)
</span><span class="cx"> 
</span><span class="cx">         def noted(err, x=&quot;connect&quot;):
</span><ins>+            if x == &quot;connect&quot;:
+                self._connectingToPeer.remove(connected)
</ins><span class="cx">             log.msg(
</span><span class="cx">                 &quot;Could not {0} to cluster peer {1} because {2}&quot;
</span><span class="cx">                 .format(x, node, str(err.value))
</span></span></pre></div>
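<p>Both files gain the same shutdown-ordering fix: if the startService database
transaction is still in flight, stopService now chains a fresh Deferred off it with
addBoth, so shutdown waits for startup to settle (success or failure) without
re-raising a startup failure into the shutdown path; in-flight peer connection
attempts are awaited the same way. Below is a minimal sketch of that waiting
pattern, assuming a stripped-down service class; SketchService is illustrative
and not part of twext.</p>
<pre>
# A sketch of the &quot;wait for startup before stopping&quot; pattern above.
# SketchService is an illustrative stand-in, not PeerConnectionPool.

from twisted.internet.defer import Deferred, inlineCallbacks


class SketchService(object):
    def __init__(self):
        self._startingUp = None   # Deferred while startup is in flight

    def startService(self):
        # The real code runs a database transaction here; the caller of
        # this sketch fires the Deferred manually to simulate completion.
        self._startingUp = Deferred()
        return self._startingUp

    @inlineCallbacks
    def stopService(self):
        if self._startingUp is not None:
            # Chain a private Deferred instead of yielding _startingUp
            # directly: addBoth fires on success or failure, so shutdown
            # proceeds either way and a startup failure is not re-raised
            # into the shutdown path.
            d = Deferred()
            self._startingUp.addBoth(lambda result: d.callback(None))
            yield d
        print(&quot;stopped after startup settled&quot;)


svc = SketchService()
startup = svc.startService()
stopping = svc.stopService()   # Pauses at &quot;yield d&quot; until startup settles
startup.callback(None)         # Settle startup; stopService now completes
</pre>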
<a id="twexttrunktwextenterprisetesttest_jobqueuepy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/test/test_jobqueue.py (13471 => 13472)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/test/test_jobqueue.py        2014-05-14 16:19:13 UTC (rev 13471)
+++ twext/trunk/twext/enterprise/test/test_jobqueue.py        2014-05-14 19:50:12 UTC (rev 13472)
</span><span class="lines">@@ -22,16 +22,16 @@
</span><span class="cx"> 
</span><span class="cx"> from zope.interface.verify import verifyObject
</span><span class="cx"> 
</span><ins>+from twisted.internet import reactor
</ins><span class="cx"> from twisted.trial.unittest import TestCase, SkipTest
</span><span class="cx"> from twisted.test.proto_helpers import StringTransport, MemoryReactor
</span><del>-from twisted.internet.defer import (
-    Deferred, inlineCallbacks, gatherResults, passthru, returnValue
-)
</del><ins>+from twisted.internet.defer import \
+    Deferred, inlineCallbacks, gatherResults, passthru, returnValue, succeed
</ins><span class="cx"> from twisted.internet.task import Clock as _Clock
</span><span class="cx"> from twisted.protocols.amp import Command, AMP, Integer
</span><span class="cx"> from twisted.application.service import Service, MultiService
</span><span class="cx"> 
</span><del>-from twext.enterprise.dal.syntax import SchemaSyntax, Select
</del><ins>+from twext.enterprise.dal.syntax import SchemaSyntax
</ins><span class="cx"> from twext.enterprise.dal.record import fromTable
</span><span class="cx"> from twext.enterprise.dal.test.test_parseschema import SchemaTestHelper
</span><span class="cx"> from twext.enterprise.fixtures import buildConnectionPool
</span><span class="lines">@@ -40,7 +40,9 @@
</span><span class="cx">     inTransaction, PeerConnectionPool, astimestamp,
</span><span class="cx">     LocalPerformer, _IJobPerformer, WorkItem, WorkerConnectionPool,
</span><span class="cx">     ConnectionFromPeerNode, LocalQueuer,
</span><del>-    _BaseQueuer, NonPerformingQueuer
</del><ins>+    _BaseQueuer, NonPerformingQueuer, JobItem,
+    WORK_PRIORITY_LOW, WORK_PRIORITY_HIGH, WORK_PRIORITY_MEDIUM,
+    JobDescriptor
</ins><span class="cx"> )
</span><span class="cx"> import twext.enterprise.jobqueue
</span><span class="cx"> 
</span><span class="lines">@@ -67,7 +69,27 @@
</span><span class="cx">         return super(Clock, self).callLater(_seconds, _f, *args, **kw)
</span><span class="cx"> 
</span><span class="cx"> 
</span><ins>+    @inlineCallbacks
+    def advanceCompletely(self, amount):
+        &quot;&quot;&quot;
+        Move time on this clock forward by the given amount and run whatever
+        pending calls should be run, waiting for the L{Deferred} returned by
+        each call to complete before moving on to the next.
</ins><span class="cx"> 
</span><ins>+        @type amount: C{float}
+        @param amount: The number of seconds by which to advance this clock's
+        time.
+        &quot;&quot;&quot;
+        self.rightNow += amount
+        self._sortCalls()
+        while self.calls and self.calls[0].getTime() &lt;= self.seconds():
+            call = self.calls.pop(0)
+            call.called = 1
+            yield call.func(*call.args, **call.kw)
+            self._sortCalls()
+
+
+
</ins><span class="cx"> class MemoryReactorWithClock(MemoryReactor, Clock):
</span><span class="cx">     &quot;&quot;&quot;
</span><span class="cx">     Simulate a real reactor.
</span><span class="lines">@@ -75,6 +97,7 @@
</span><span class="cx">     def __init__(self):
</span><span class="cx">         MemoryReactor.__init__(self)
</span><span class="cx">         Clock.__init__(self)
</span><ins>+        self._sortCalls()
</ins><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="lines">@@ -180,8 +203,9 @@
</span><span class="cx">       WORK_TYPE   varchar(255) not null,
</span><span class="cx">       PRIORITY    integer default 0,
</span><span class="cx">       WEIGHT      integer default 0,
</span><del>-      NOT_BEFORE  timestamp default null,
-      NOT_AFTER   timestamp default null
</del><ins>+      NOT_BEFORE  timestamp not null,
+      ASSIGNED    timestamp default null,
+      FAILED      integer default 0
</ins><span class="cx">     );
</span><span class="cx">     &quot;&quot;&quot;
</span><span class="cx"> )
</span><span class="lines">@@ -194,11 +218,6 @@
</span><span class="cx">       A integer, B integer,
</span><span class="cx">       DELETE_ON_LOAD integer default 0
</span><span class="cx">     );
</span><del>-    create table DUMMY_WORK_DONE (
-      WORK_ID integer primary key,
-      JOB_ID integer references JOB,
-      A_PLUS_B integer
-    );
</del><span class="cx">     &quot;&quot;&quot;
</span><span class="cx"> )
</span><span class="cx"> 
</span><span class="lines">@@ -207,37 +226,30 @@
</span><span class="cx"> 
</span><span class="cx">     dropSQL = [
</span><span class="cx">         &quot;drop table {name} cascade&quot;.format(name=table)
</span><del>-        for table in (&quot;DUMMY_WORK_ITEM&quot;, &quot;DUMMY_WORK_DONE&quot;)
</del><ins>+        for table in (&quot;DUMMY_WORK_ITEM&quot;,)
</ins><span class="cx">     ] + [&quot;delete from job&quot;]
</span><span class="cx"> except SkipTest as e:
</span><del>-    DummyWorkDone = DummyWorkItem = object
</del><ins>+    DummyWorkItem = object
</ins><span class="cx">     skip = e
</span><span class="cx"> else:
</span><del>-    DummyWorkDone = fromTable(schema.DUMMY_WORK_DONE)
</del><span class="cx">     DummyWorkItem = fromTable(schema.DUMMY_WORK_ITEM)
</span><span class="cx">     skip = False
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx"> 
</span><del>-class DummyWorkDone(WorkItem, DummyWorkDone):
-    &quot;&quot;&quot;
-    Work result.
-    &quot;&quot;&quot;
-
-
-
</del><span class="cx"> class DummyWorkItem(WorkItem, DummyWorkItem):
</span><span class="cx">     &quot;&quot;&quot;
</span><span class="cx">     Sample L{WorkItem} subclass that adds two integers together and stores them
</span><span class="cx">     in another table.
</span><span class="cx">     &quot;&quot;&quot;
</span><span class="cx"> 
</span><ins>+    results = {}
+
</ins><span class="cx">     def doWork(self):
</span><span class="cx">         if self.a == -1:
</span><span class="cx">             raise ValueError(&quot;Ooops&quot;)
</span><del>-        return DummyWorkDone.makeJob(
-            self.transaction, jobID=self.jobID + 100, workID=self.workID + 100, aPlusB=self.a + self.b
-        )
</del><ins>+        self.results[self.jobID] = self.a + self.b
+        return succeed(None)
</ins><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     @classmethod
</span><span class="lines">@@ -250,7 +262,7 @@
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         workItems = yield super(DummyWorkItem, cls).loadForJob(txn, *a)
</span><span class="cx">         if workItems[0].deleteOnLoad:
</span><del>-            otherTransaction = txn.concurrently()
</del><ins>+            otherTransaction = txn.store().newTransaction()
</ins><span class="cx">             otherSelf = yield super(DummyWorkItem, cls).loadForJob(txn, *a)
</span><span class="cx">             yield otherSelf[0].delete()
</span><span class="cx">             yield otherTransaction.commit()
</span><span class="lines">@@ -306,12 +318,10 @@
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     @inlineCallbacks
</span><del>-    def test_enqueue(self):
-        &quot;&quot;&quot;
-        L{PeerConnectionPool.enqueueWork} will insert a job and a work item.
-        &quot;&quot;&quot;
-        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
</del><ins>+    def _enqueue(self, dbpool, a, b, notBefore=None, priority=None, weight=None):
</ins><span class="cx">         fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
</span><ins>+        if notBefore is None:
+            notBefore = datetime.datetime(2012, 12, 13, 12, 12, 0)
</ins><span class="cx">         sinceEpoch = astimestamp(fakeNow)
</span><span class="cx">         clock = Clock()
</span><span class="cx">         clock.advance(sinceEpoch)
</span><span class="lines">@@ -329,36 +339,134 @@
</span><span class="cx">         @transactionally(dbpool.connection)
</span><span class="cx">         def check(txn):
</span><span class="cx">             return qpool.enqueueWork(
</span><del>-                txn, DummyWorkItem, a=3, b=9,
-                notBefore=datetime.datetime(2012, 12, 13, 12, 12, 0)
</del><ins>+                txn, DummyWorkItem,
+                a=a, b=b, priority=priority, weight=weight,
+                notBefore=notBefore
</ins><span class="cx">             )
</span><span class="cx"> 
</span><span class="cx">         proposal = yield check
</span><span class="cx">         yield proposal.whenProposed()
</span><span class="cx"> 
</span><ins>+
+    @inlineCallbacks
+    def test_enqueue(self):
+        &quot;&quot;&quot;
+        L{PeerConnectionPool.enqueueWork} will insert a job and a work item.
+        &quot;&quot;&quot;
+        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+        yield self._enqueue(dbpool, 1, 2)
+
</ins><span class="cx">         # Make sure we have one JOB and one DUMMY_WORK_ITEM
</span><span class="cx">         @transactionally(dbpool.connection)
</span><span class="cx">         def checkJob(txn):
</span><del>-            return Select(
-                From=schema.JOB
-            ).on(txn)
</del><ins>+            return JobItem.all(txn)
</ins><span class="cx"> 
</span><span class="cx">         jobs = yield checkJob
</span><span class="cx">         self.assertTrue(len(jobs) == 1)
</span><del>-        self.assertTrue(jobs[0][1] == &quot;DUMMY_WORK_ITEM&quot;)
</del><ins>+        self.assertTrue(jobs[0].workType == &quot;DUMMY_WORK_ITEM&quot;)
+        self.assertTrue(jobs[0].assigned is None)
</ins><span class="cx"> 
</span><span class="cx">         @transactionally(dbpool.connection)
</span><span class="cx">         def checkWork(txn):
</span><del>-            return Select(
-                From=schema.DUMMY_WORK_ITEM
-            ).on(txn)
</del><ins>+            return DummyWorkItem.all(txn)
</ins><span class="cx"> 
</span><span class="cx">         work = yield checkWork
</span><span class="cx">         self.assertTrue(len(work) == 1)
</span><del>-        self.assertTrue(work[0][1] == jobs[0][0])
</del><ins>+        self.assertTrue(work[0].jobID == jobs[0].jobID)
</ins><span class="cx"> 
</span><span class="cx"> 
</span><ins>+    @inlineCallbacks
+    def test_assign(self):
+        &quot;&quot;&quot;
+        L{JobItem.assign} marks a job as assigned by setting its assigned timestamp.
+        &quot;&quot;&quot;
+        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+        yield self._enqueue(dbpool, 1, 2)
</ins><span class="cx"> 
</span><ins>+        # Make sure we have one JOB and one DUMMY_WORK_ITEM
+        def checkJob(txn):
+            return JobItem.all(txn)
+
+        jobs = yield inTransaction(dbpool.connection, checkJob)
+        self.assertTrue(len(jobs) == 1)
+        self.assertTrue(jobs[0].assigned is None)
+
+        @inlineCallbacks
+        def assignJob(txn):
+            job = yield JobItem.load(txn, jobs[0].jobID)
+            yield job.assign(datetime.datetime.utcnow())
+        yield inTransaction(dbpool.connection, assignJob)
+
+        jobs = yield inTransaction(dbpool.connection, checkJob)
+        self.assertTrue(len(jobs) == 1)
+        self.assertTrue(jobs[0].assigned is not None)
+
+
+    @inlineCallbacks
+    def test_nextjob(self):
+        &quot;&quot;&quot;
+        L{JobItem.nextjob} returns the next runnable job, honoring priority,
+        notBefore, and assignment state.
+        &quot;&quot;&quot;
+
+        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+        now = datetime.datetime.utcnow()
+
+        # Empty job queue
+        @inlineCallbacks
+        def _next(txn, priority=WORK_PRIORITY_LOW):
+            job = yield JobItem.nextjob(txn, now, priority, now - datetime.timedelta(seconds=PeerConnectionPool.queueOrphanTimeout))
+            if job is not None:
+                work = yield job.workItem()
+            else:
+                work = None
+            returnValue((job, work))
+        job, work = yield inTransaction(dbpool.connection, _next)
+        self.assertTrue(job is None)
+        self.assertTrue(work is None)
+
+        # Unassigned job with future notBefore not returned
+        yield self._enqueue(dbpool, 1, 1, now + datetime.timedelta(days=1))
+        job, work = yield inTransaction(dbpool.connection, _next)
+        self.assertTrue(job is None)
+        self.assertTrue(work is None)
+
+        # Unassigned job with past notBefore returned
+        yield self._enqueue(dbpool, 2, 1, now + datetime.timedelta(days=-1))
+        job, work = yield inTransaction(dbpool.connection, _next)
+        self.assertTrue(job is not None)
+        self.assertTrue(work.a == 2)
+        assignID = job.jobID
+
+        # Assigned job with past notBefore not returned
+        @inlineCallbacks
+        def assignJob(txn, when=None):
+            assignee = yield JobItem.load(txn, assignID)
+            yield assignee.assign(now if when is None else when)
+        yield inTransaction(dbpool.connection, assignJob)
+        job, work = yield inTransaction(dbpool.connection, _next)
+        self.assertTrue(job is None)
+        self.assertTrue(work is None)
+
+        # Unassigned low priority job with past notBefore not returned if high priority required
+        yield self._enqueue(dbpool, 4, 1, now + datetime.timedelta(days=-1))
+        job, work = yield inTransaction(dbpool.connection, _next, priority=WORK_PRIORITY_HIGH)
+        self.assertTrue(job is None)
+        self.assertTrue(work is None)
+
+        # Unassigned low priority job with past notBefore not returned if medium priority required
+        yield self._enqueue(dbpool, 5, 1, now + datetime.timedelta(days=-1))
+        job, work = yield inTransaction(dbpool.connection, _next, priority=WORK_PRIORITY_MEDIUM)
+        self.assertTrue(job is None)
+        self.assertTrue(work is None)
+
+        # Assigned job whose assignment is older than the orphan cutoff is returned
+        yield inTransaction(dbpool.connection, assignJob, when=now + datetime.timedelta(days=-1))
+        job, work = yield inTransaction(dbpool.connection, _next, priority=WORK_PRIORITY_HIGH)
+        self.assertTrue(job is not None)
+        self.assertTrue(work.a == 2)
+
+
+
</ins><span class="cx"> class WorkerConnectionPoolTests(TestCase):
</span><span class="cx">     &quot;&quot;&quot;
</span><span class="cx">     A L{WorkerConnectionPool} is responsible for managing, in a node's
</span><span class="lines">@@ -412,6 +520,7 @@
</span><span class="cx">         Create a L{PeerConnectionPool} that is just initialized enough.
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         self.pcp = PeerConnectionPool(None, None, 4321)
</span><ins>+        DummyWorkItem.results = {}
</ins><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     def checkPerformer(self, cls):
</span><span class="lines">@@ -424,6 +533,34 @@
</span><span class="cx">         verifyObject(_IJobPerformer, performer)
</span><span class="cx"> 
</span><span class="cx"> 
</span><ins>+    def _setupPools(self):
+        &quot;&quot;&quot;
+        Set up a connection pool and reactor clock for time-stepped tests.
+        &quot;&quot;&quot;
+        reactor = MemoryReactorWithClock()
+        cph = SteppablePoolHelper(nodeSchema + jobSchema + schemaText)
+        then = datetime.datetime(2012, 12, 12, 12, 12, 12)
+        reactor.advance(astimestamp(then))
+        cph.setUp(self)
+        qpool = PeerConnectionPool(reactor, cph.pool.connection, 4321)
+
+        realChoosePerformer = qpool.choosePerformer
+        performerChosen = []
+
+        def catchPerformerChoice(onlyLocally=False):
+            result = realChoosePerformer(onlyLocally=onlyLocally)
+            performerChosen.append(True)
+            return result
+
+        qpool.choosePerformer = catchPerformerChoice
+        reactor.callLater(0, qpool._workCheck)
+
+        qpool.startService()
+        cph.flushHolders()
+
+        return cph, qpool, reactor, performerChosen
+
+
</ins><span class="cx">     def test_choosingPerformerWhenNoPeersAndNoWorkers(self):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         If L{PeerConnectionPool.choosePerformer} is invoked when no workers
</span><span class="lines">@@ -478,8 +615,8 @@
</span><span class="cx">         d = Deferred()
</span><span class="cx"> 
</span><span class="cx">         class DummyPerformer(object):
</span><del>-            def performJob(self, jobID):
-                self.jobID = jobID
</del><ins>+            def performJob(self, job):
+                self.jobID = job.jobID
</ins><span class="cx">                 return d
</span><span class="cx"> 
</span><span class="cx">         # Doing real database I/O in this test would be tedious so fake the
</span><span class="lines">@@ -490,7 +627,7 @@
</span><span class="cx">             return dummy
</span><span class="cx"> 
</span><span class="cx">         peer.choosePerformer = chooseDummy
</span><del>-        performed = local.performJob(7384)
</del><ins>+        performed = local.performJob(JobDescriptor(7384, 1))
</ins><span class="cx">         performResult = []
</span><span class="cx">         performed.addCallback(performResult.append)
</span><span class="cx"> 
</span><span class="lines">@@ -535,25 +672,19 @@
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     @inlineCallbacks
</span><del>-    def test_notBeforeWhenCheckingForLostWork(self):
</del><ins>+    def test_notBeforeWhenCheckingForWork(self):
</ins><span class="cx">         &quot;&quot;&quot;
</span><del>-        L{PeerConnectionPool._periodicLostWorkCheck} should execute any
</del><ins>+        L{PeerConnectionPool._workCheck} should execute any
</ins><span class="cx">         outstanding work items, but only those that are expired.
</span><span class="cx">         &quot;&quot;&quot;
</span><del>-        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
-        # An arbitrary point in time.
</del><ins>+        dbpool, _ignore_qpool, clock, _ignore_performerChosen = self._setupPools()
</ins><span class="cx">         fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
</span><del>-        # *why* does datetime still not have .astimestamp()
-        sinceEpoch = astimestamp(fakeNow)
-        clock = Clock()
-        clock.advance(sinceEpoch)
-        qpool = PeerConnectionPool(clock, dbpool.connection, 0)
</del><span class="cx"> 
</span><span class="cx">         # Let's create a couple of work items directly, not via the enqueue
</span><span class="cx">         # method, so that they exist but nobody will try to immediately execute
</span><span class="cx">         # them.
</span><span class="cx"> 
</span><del>-        @transactionally(dbpool.connection)
</del><ins>+        @transactionally(dbpool.pool.connection)
</ins><span class="cx">         @inlineCallbacks
</span><span class="cx">         def setup(txn):
</span><span class="cx">             # First, one that's right now.
</span><span class="lines">@@ -564,9 +695,7 @@
</span><span class="cx">                 txn, a=3, b=4, notBefore=(
</span><span class="cx">                     # Schedule it in the past so that it should have already
</span><span class="cx">                     # run.
</span><del>-                    fakeNow - datetime.timedelta(
-                        seconds=qpool.queueProcessTimeout + 20
-                    )
</del><ins>+                    fakeNow - datetime.timedelta(seconds=20)
</ins><span class="cx">                 )
</span><span class="cx">             )
</span><span class="cx"> 
</span><span class="lines">@@ -575,14 +704,13 @@
</span><span class="cx">                 txn, a=10, b=20, notBefore=fakeNow + datetime.timedelta(1000)
</span><span class="cx">             )
</span><span class="cx">         yield setup
</span><del>-        yield qpool._periodicLostWorkCheck()
</del><span class="cx"> 
</span><del>-        @transactionally(dbpool.connection)
-        def check(txn):
-            return DummyWorkDone.all(txn)
</del><ins>+        # Wait for job
+        while len(DummyWorkItem.results) != 2:
+            clock.advance(1)
</ins><span class="cx"> 
</span><del>-        every = yield check
-        self.assertEquals([x.aPlusB for x in every], [7])
</del><ins>+        # Both work items completed
+        self.assertEquals(DummyWorkItem.results, {1: 3, 2: 7})
</ins><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     @inlineCallbacks
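# ----------------------------------------------------------------------
# [Editor's sketch -- not part of changeset 13472] The rewritten tests
# replace deferred-based completion (whenExecuted) with a polling idiom:
# advance a fake reactor clock until a shared results dict shows the work
# finished. A self-contained version of that idiom:
# ----------------------------------------------------------------------
from twisted.internet.task import Clock

results = {}
clock = Clock()
# Pretend some component scheduled the work to run three seconds from now.
clock.callLater(3, results.setdefault, 1, 3)

while len(results) != 1:
    clock.advance(1)

assert results == {1: 3}
# ----------------------------------------------------------------------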
</span><span class="lines">@@ -592,23 +720,10 @@
</span><span class="cx">         only executes it when enough time has elapsed to allow the C{notBefore}
</span><span class="cx">         attribute of the given work item to have passed.
</span><span class="cx">         &quot;&quot;&quot;
</span><del>-        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
-        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
-        sinceEpoch = astimestamp(fakeNow)
-        clock = Clock()
-        clock.advance(sinceEpoch)
-        qpool = PeerConnectionPool(clock, dbpool.connection, 0)
-        realChoosePerformer = qpool.choosePerformer
-        performerChosen = []
</del><span class="cx"> 
</span><del>-        def catchPerformerChoice():
-            result = realChoosePerformer()
-            performerChosen.append(True)
-            return result
</del><ins>+        dbpool, qpool, clock, performerChosen = self._setupPools()
</ins><span class="cx"> 
</span><del>-        qpool.choosePerformer = catchPerformerChoice
-
-        @transactionally(dbpool.connection)
</del><ins>+        @transactionally(dbpool.pool.connection)
</ins><span class="cx">         def check(txn):
</span><span class="cx">             return qpool.enqueueWork(
</span><span class="cx">                 txn, DummyWorkItem, a=3, b=9,
</span><span class="lines">@@ -630,11 +745,12 @@
</span><span class="cx">         clock.advance(20 - 12)
</span><span class="cx">         self.assertEquals(performerChosen, [True])
</span><span class="cx"> 
</span><del>-        # FIXME: if this fails, it will hang, but that's better than no
-        # notification that it is broken at all.
</del><ins>+        # Advance the fake clock until the job queue is empty
+        while (yield inTransaction(dbpool.pool.connection, lambda txn: JobItem.all(txn))):
+            clock.advance(1)
</ins><span class="cx"> 
</span><del>-        result = yield proposal.whenExecuted()
-        self.assertIdentical(result, proposal)
</del><ins>+        # Work item complete
+        self.assertEquals(DummyWorkItem.results, {1: 12})
</ins><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     @inlineCallbacks
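# ----------------------------------------------------------------------
# [Editor's sketch -- not part of changeset 13472] A toy model of the
# notBefore contract the test above exercises: work enqueued with a future
# notBefore is held back until the clock reaches that time. All names here
# are illustrative, not the queue's real API.
# ----------------------------------------------------------------------
import datetime

class ToyQueue(object):
    def __init__(self, now):
        self.now = now
        self.pending = []        # (notBefore, work) pairs
        self.performed = []

    def enqueue(self, work, notBefore):
        self.pending.append((notBefore, work))

    def advance(self, seconds):
        self.now += datetime.timedelta(seconds=seconds)
        ready = [w for (nb, w) in self.pending if nb &lt;= self.now]
        self.pending = [(nb, w) for (nb, w) in self.pending if nb &gt; self.now]
        self.performed.extend(ready)

q = ToyQueue(datetime.datetime(2012, 12, 12, 12, 12, 12))
q.enqueue("3+9", q.now + datetime.timedelta(seconds=20))
q.advance(12)
assert q.performed == []         # too early: notBefore not yet reached
q.advance(8)
assert q.performed == ["3+9"]    # 20 seconds have now elapsed
# ----------------------------------------------------------------------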
</span><span class="lines">@@ -643,23 +759,9 @@
</span><span class="cx">         L{PeerConnectionPool.enqueueWork} will execute its work immediately if
</span><span class="cx">         the C{notBefore} attribute of the work item in question is in the past.
</span><span class="cx">         &quot;&quot;&quot;
</span><del>-        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
-        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
-        sinceEpoch = astimestamp(fakeNow)
-        clock = Clock()
-        clock.advance(sinceEpoch)
-        qpool = PeerConnectionPool(clock, dbpool.connection, 0)
-        realChoosePerformer = qpool.choosePerformer
-        performerChosen = []
</del><ins>+        dbpool, qpool, clock, performerChosen = self._setupPools()
</ins><span class="cx"> 
</span><del>-        def catchPerformerChoice():
-            result = realChoosePerformer()
-            performerChosen.append(True)
-            return result
-
-        qpool.choosePerformer = catchPerformerChoice
-
-        @transactionally(dbpool.connection)
</del><ins>+        @transactionally(dbpool.pool.connection)
</ins><span class="cx">         def check(txn):
</span><span class="cx">             return qpool.enqueueWork(
</span><span class="cx">                 txn, DummyWorkItem, a=3, b=9,
</span><span class="lines">@@ -673,10 +775,14 @@
</span><span class="cx">         # Advance far beyond the given timestamp.
</span><span class="cx">         self.assertEquals(performerChosen, [True])
</span><span class="cx"> 
</span><del>-        result = yield proposal.whenExecuted()
-        self.assertIdentical(result, proposal)
</del><ins>+        # Advance the fake clock until the job queue is empty
+        while (yield inTransaction(dbpool.pool.connection, lambda txn: JobItem.all(txn))):
+            clock.advance(1)
</ins><span class="cx"> 
</span><ins>+        # Work item complete
+        self.assertEquals(DummyWorkItem.results, {1: 12})
</ins><span class="cx"> 
</span><ins>+
</ins><span class="cx">     def test_workerConnectionPoolPerformJob(self):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         L{WorkerConnectionPool.performJob} performs work by selecting a
</span><span class="lines">@@ -696,12 +802,12 @@
</span><span class="cx">         worker2, _ignore_trans2 = peer()
</span><span class="cx"> 
</span><span class="cx">         # Ask the worker to do something.
</span><del>-        worker1.performJob(1)
</del><ins>+        worker1.performJob(JobDescriptor(1, 1))
</ins><span class="cx">         self.assertEquals(worker1.currentLoad, 1)
</span><span class="cx">         self.assertEquals(worker2.currentLoad, 0)
</span><span class="cx"> 
</span><span class="cx">         # Now ask the pool to do something
</span><del>-        peerPool.workerPool.performJob(2)
</del><ins>+        peerPool.workerPool.performJob(JobDescriptor(2, 1))
</ins><span class="cx">         self.assertEquals(worker1.currentLoad, 1)
</span><span class="cx">         self.assertEquals(worker2.currentLoad, 1)
</span><span class="cx"> 
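# ----------------------------------------------------------------------
# [Editor's sketch -- not part of changeset 13472] The assertions above
# check simple load balancing: the pool hands each new job to whichever
# worker currently reports the lowest load. A minimal model (the real
# performJob returns a Deferred and decrements the load on completion):
# ----------------------------------------------------------------------
class ToyWorker(object):
    def __init__(self):
        self.currentLoad = 0

    def performJob(self, descriptor):
        self.currentLoad += 1

class ToyWorkerPool(object):
    def __init__(self, workers):
        self.workers = workers

    def performJob(self, descriptor):
        min(self.workers, key=lambda w: w.currentLoad).performJob(descriptor)

w1, w2 = ToyWorker(), ToyWorker()
pool = ToyWorkerPool([w1, w2])
w1.performJob(("job", 1))        # worker 1 asked directly
pool.performJob(("job", 2))      # pool picks worker 2, the less loaded one
assert (w1.currentLoad, w2.currentLoad) == (1, 1)
# ----------------------------------------------------------------------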
</span><span class="lines">@@ -716,49 +822,42 @@
</span><span class="cx">         reactor.advance(astimestamp(then))
</span><span class="cx">         cph.setUp(self)
</span><span class="cx">         pcp = PeerConnectionPool(reactor, cph.pool.connection, 4321)
</span><del>-        now = then + datetime.timedelta(seconds=pcp.queueProcessTimeout * 2)
</del><ins>+        now = then + datetime.timedelta(seconds=20)
</ins><span class="cx"> 
</span><span class="cx">         @transactionally(cph.pool.connection)
</span><span class="cx">         def createOldWork(txn):
</span><del>-            one = DummyWorkItem.makeJob(txn, jobID=100, workID=1, a=3, b=4, notBefore=then)
-            two = DummyWorkItem.makeJob(txn, jobID=101, workID=2, a=7, b=9, notBefore=now)
</del><ins>+            one = DummyWorkItem.makeJob(txn, jobID=1, workID=1, a=3, b=4, notBefore=then)
+            two = DummyWorkItem.makeJob(txn, jobID=2, workID=2, a=7, b=9, notBefore=now)
</ins><span class="cx">             return gatherResults([one, two])
</span><span class="cx"> 
</span><span class="cx">         pcp.startService()
</span><span class="cx">         cph.flushHolders()
</span><del>-        reactor.advance(pcp.queueProcessTimeout * 2)
</del><ins>+        reactor.advance(19)
</ins><span class="cx">         self.assertEquals(
</span><del>-            cph.rows(&quot;select * from DUMMY_WORK_DONE&quot;),
-            [(101, 200, 7)]
</del><ins>+            DummyWorkItem.results,
+            {1: 7}
</ins><span class="cx">         )
</span><del>-        cph.rows(&quot;delete from DUMMY_WORK_DONE&quot;)
-        reactor.advance(pcp.queueProcessTimeout * 2)
</del><ins>+        reactor.advance(20)
</ins><span class="cx">         self.assertEquals(
</span><del>-            cph.rows(&quot;select * from DUMMY_WORK_DONE&quot;),
-            [(102, 201, 16)]
</del><ins>+            DummyWorkItem.results,
+            {1: 7, 2: 16}
</ins><span class="cx">         )
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     @inlineCallbacks
</span><del>-    def test_exceptionWhenCheckingForLostWork(self):
</del><ins>+    def test_exceptionWhenWorking(self):
</ins><span class="cx">         &quot;&quot;&quot;
</span><del>-        L{PeerConnectionPool._periodicLostWorkCheck} should execute any
</del><ins>+        L{PeerConnectionPool._workCheck} should execute any
</ins><span class="cx">         outstanding work items, and keep going if some raise an exception.
</span><span class="cx">         &quot;&quot;&quot;
</span><del>-        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
-        # An arbitrary point in time.
</del><ins>+        dbpool, _ignore_qpool, clock, _ignore_performerChosen = self._setupPools()
</ins><span class="cx">         fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
</span><del>-        # *why* does datetime still not have .astimestamp()
-        sinceEpoch = astimestamp(fakeNow)
-        clock = Clock()
-        clock.advance(sinceEpoch)
-        qpool = PeerConnectionPool(clock, dbpool.connection, 0)
</del><span class="cx"> 
</span><span class="cx">         # Let's create a couple of work items directly, not via the enqueue
</span><span class="cx">         # method, so that they exist but nobody will try to immediately execute
</span><span class="cx">         # them.
</span><span class="cx"> 
</span><del>-        @transactionally(dbpool.connection)
</del><ins>+        @transactionally(dbpool.pool.connection)
</ins><span class="cx">         @inlineCallbacks
</span><span class="cx">         def setup(txn):
</span><span class="cx">             # First, one that's right now.
</span><span class="lines">@@ -776,14 +875,51 @@
</span><span class="cx">                 txn, a=2, b=0, notBefore=fakeNow - datetime.timedelta(20 * 60)
</span><span class="cx">             )
</span><span class="cx">         yield setup
</span><del>-        yield qpool._periodicLostWorkCheck()
</del><ins>+        clock.advance(20 - 12)
</ins><span class="cx"> 
</span><del>-        @transactionally(dbpool.connection)
</del><ins>+        # All the overdue work items should have been run by the work
+        # check triggered by the clock advance above.
+
+        # The two non-failing work items completed; the failing one left no
+        # result behind.
+        self.assertEquals(DummyWorkItem.results, {1: 1, 3: 2})
+
+
+    @inlineCallbacks
+    def test_exceptionUnassign(self):
+        &quot;&quot;&quot;
+        When a work item fails, it should appear as unassigned in the JOB
+        table, with its failure count bumped and its notBefore pushed one
+        minute into the future.
+        &quot;&quot;&quot;
+        dbpool, _ignore_qpool, clock, _ignore_performerChosen = self._setupPools()
+        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
+
+        # Create a failing work item directly, not via the enqueue method, so
+        # that it exists but nobody will try to immediately execute it.
+
+        @transactionally(dbpool.pool.connection)
+        @inlineCallbacks
+        def setup(txn):
+            # Failing work that's far enough into the past to run immediately.
+            yield DummyWorkItem.makeJob(
+                txn, a=-1, b=1, notBefore=fakeNow - datetime.timedelta(20 * 60)
+            )
+        yield setup
+        clock.advance(20 - 12)
+
+        @transactionally(dbpool.pool.connection)
</ins><span class="cx">         def check(txn):
</span><del>-            return DummyWorkDone.all(txn)
</del><ins>+            return JobItem.all(txn)
</ins><span class="cx"> 
</span><del>-        every = yield check
-        self.assertEquals([x.aPlusB for x in every], [1, 2])
</del><ins>+        jobs = yield check
+        self.assertEquals(len(jobs), 1)
+        self.assertTrue(jobs[0].assigned is None)
+        self.assertEquals(jobs[0].failed, 1)
+        self.assertTrue(jobs[0].notBefore &gt; datetime.datetime.utcnow())
</ins><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx"> 
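# ----------------------------------------------------------------------
# [Editor's sketch -- not part of changeset 13472] test_exceptionUnassign
# above checks the retry bookkeeping when a work item's doWork() raises:
# the job is unassigned, its failure counter is bumped, and its notBefore
# is pushed about a minute ahead so it is retried later rather than lost.
# A toy version of that state transition (names are illustrative):
# ----------------------------------------------------------------------
import datetime

class ToyJob(object):
    def __init__(self, notBefore):
        self.assigned = None
        self.failed = 0
        self.notBefore = notBefore

    def unassignAfterFailure(self, now, delay=datetime.timedelta(minutes=1)):
        self.assigned = None
        self.failed += 1
        self.notBefore = now + delay

now = datetime.datetime.utcnow()
job = ToyJob(notBefore=now - datetime.timedelta(minutes=20))
job.assigned = now               # a node picks up the overdue job...
job.unassignAfterFailure(now)    # ...and its doWork() raises
assert job.assigned is None
assert job.failed == 1
assert job.notBefore &gt; now
# ----------------------------------------------------------------------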
</span><span class="lines">@@ -902,7 +1038,6 @@
</span><span class="cx">             )
</span><span class="cx">         self.addCleanup(deschema)
</span><span class="cx"> 
</span><del>-        from twisted.internet import reactor
</del><span class="cx">         self.node1 = PeerConnectionPool(
</span><span class="cx">             reactor, indirectedTransactionFactory, 0)
</span><span class="cx">         self.node2 = PeerConnectionPool(
</span><span class="lines">@@ -928,7 +1063,9 @@
</span><span class="cx">         yield gatherResults([d1, d2])
</span><span class="cx">         self.store.queuer = self.node1
</span><span class="cx"> 
</span><ins>+        DummyWorkItem.results = {}
</ins><span class="cx"> 
</span><ins>+
</ins><span class="cx">     def test_currentNodeInfo(self):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         There will be two C{NODE_INFO} rows in the database, retrievable as two
</span><span class="lines">@@ -942,12 +1079,11 @@
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     @inlineCallbacks
</span><del>-    def test_enqueueHappyPath(self):
</del><ins>+    def test_enqueueWorkDone(self):
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         When a L{WorkItem} is scheduled for execution via
</span><del>-        L{PeerConnectionPool.enqueueWork} its C{doWork} method will be invoked
-        by the time the L{Deferred} returned from the resulting
-        L{WorkProposal}'s C{whenExecuted} method has fired.
</del><ins>+        L{PeerConnectionPool.enqueueWork}, its C{doWork} method will be run.
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         # TODO: this exact test should run against LocalQueuer as well.
</span><span class="cx">         def operation(txn):
</span><span class="lines">@@ -956,40 +1092,20 @@
</span><span class="cx">             # Should probably do something with components.
</span><span class="cx">             return txn.enqueue(DummyWorkItem, a=3, b=4, jobID=100, workID=1,
</span><span class="cx">                                notBefore=datetime.datetime.utcnow())
</span><del>-        result = yield inTransaction(self.store.newTransaction, operation)
</del><ins>+        yield inTransaction(self.store.newTransaction, operation)
+
</ins><span class="cx">         # Wait for it to be executed.  Hopefully this does not time out :-\.
</span><del>-        yield result.whenExecuted()
</del><ins>+        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
</ins><span class="cx"> 
</span><del>-        def op2(txn):
-            return Select(
-                [
-                    schema.DUMMY_WORK_DONE.WORK_ID,
-                    schema.DUMMY_WORK_DONE.JOB_ID,
-                    schema.DUMMY_WORK_DONE.A_PLUS_B,
-                ],
-                From=schema.DUMMY_WORK_DONE
-            ).on(txn)
</del><ins>+        self.assertEquals(DummyWorkItem.results, {100: 7})
</ins><span class="cx"> 
</span><del>-        rows = yield inTransaction(self.store.newTransaction, op2)
-        self.assertEquals(rows, [[101, 200, 7]])
</del><span class="cx"> 
</span><del>-
</del><span class="cx">     @inlineCallbacks
</span><span class="cx">     def test_noWorkDoneWhenConcurrentlyDeleted(self):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         When a L{WorkItem} is concurrently deleted by another transaction, it
</span><span class="cx">         should I{not} perform its work.
</span><span class="cx">         &quot;&quot;&quot;
</span><del>-        # Provide access to a method called &quot;concurrently&quot; everything using
-        original = self.store.newTransaction
-
-        def decorate(*a, **k):
-            result = original(*a, **k)
-            result.concurrently = self.store.newTransaction
-            return result
-
-        self.store.newTransaction = decorate
-
</del><span class="cx">         def operation(txn):
</span><span class="cx">             return txn.enqueue(
</span><span class="cx">                 DummyWorkItem, a=30, b=40, workID=5678,
</span><span class="lines">@@ -997,33 +1113,15 @@
</span><span class="cx">                 notBefore=datetime.datetime.utcnow()
</span><span class="cx">             )
</span><span class="cx"> 
</span><del>-        proposal = yield inTransaction(self.store.newTransaction, operation)
-        yield proposal.whenExecuted()
</del><ins>+        yield inTransaction(self.store.newTransaction, operation)
</ins><span class="cx"> 
</span><del>-        # Sanity check on the concurrent deletion.
-        def op2(txn):
-            return Select(
-                [schema.DUMMY_WORK_ITEM.WORK_ID],
-                From=schema.DUMMY_WORK_ITEM
-            ).on(txn)
</del><ins>+        # Wait for it to be executed.  Hopefully this does not time out :-\.
+        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
</ins><span class="cx"> 
</span><del>-        rows = yield inTransaction(self.store.newTransaction, op2)
-        self.assertEquals(rows, [])
</del><ins>+        self.assertEquals(DummyWorkItem.results, {})
</ins><span class="cx"> 
</span><del>-        def op3(txn):
-            return Select(
-                [
-                    schema.DUMMY_WORK_DONE.WORK_ID,
-                    schema.DUMMY_WORK_DONE.A_PLUS_B,
-                ],
-                From=schema.DUMMY_WORK_DONE
-            ).on(txn)
</del><span class="cx"> 
</span><del>-        rows = yield inTransaction(self.store.newTransaction, op3)
-        self.assertEquals(rows, [])
</del><span class="cx"> 
</span><del>-
-
</del><span class="cx"> class DummyProposal(object):
</span><span class="cx"> 
</span><span class="cx">     def __init__(self, *ignored):
</span></span></pre>
</div>
</div>
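<h3>Editor's note (illustrative, not part of changeset 13472)</h3>
<p>The updated tests wait for completion by polling the JOB table, e.g.
<code>JobItem.waitEmpty(self.store.newTransaction, reactor, 60)</code>,
instead of the removed <code>whenExecuted()</code> deferreds. A minimal,
self-contained sketch of such a helper follows; it assumes only that the
supplied callable returns a Deferred firing with the outstanding jobs, and
the real <code>waitEmpty</code> in this changeset may well differ.</p>
<pre>
from twisted.internet.defer import inlineCallbacks, returnValue, succeed
from twisted.internet.task import Clock, deferLater

@inlineCallbacks
def waitEmpty(pollOutstanding, reactor, timeout, interval=0.1):
    """Poll until pollOutstanding() yields an empty list, or time out."""
    elapsed = 0.0
    while elapsed &lt; timeout:
        jobs = yield pollOutstanding()
        if not jobs:
            returnValue(None)
        yield deferLater(reactor, interval, lambda: None)
        elapsed += interval
    raise RuntimeError("jobs still outstanding after %s seconds" % (timeout,))

# Usage with a fake reactor and a queue that drains after one poll cycle:
state = {"jobs": ["job1", "job2"]}

def poll():
    state["jobs"] = state["jobs"][1:]
    return succeed(state["jobs"])

clock = Clock()
d = waitEmpty(poll, clock, timeout=60)
clock.advance(0.1)
clock.advance(0.1)
assert d.called
</pre>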

</body>
</html>