<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
"http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en" lang="en">
<head><meta http-equiv="content-type" content="text/html; charset=utf-8" />
<title>[13472] twext/trunk</title>
</head>
<body>
<style type="text/css"><!--
/* Stylesheet for the commit notification. Rules are grouped by the container
   they style: #msg (revision metadata and changed-path lists), #logmsg
   (formatted log message), #header/#footer, and #patch (colorized diff). */

/* Revision metadata and changed-path lists */
#msg dl.meta { border: 1px #006 solid; background: #369; padding: 6px; color: #fff; }
#msg dl.meta dt { float: left; width: 6em; font-weight: bold; }
#msg dt:after { content:':';}
#msg dl, #msg dt, #msg ul, #msg li, #header, #footer, #logmsg { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; }
#msg dl a { font-weight: bold}
#msg dl a:link { color:#fc3; }
#msg dl a:active { color:#ff0; }
#msg dl a:visited { color:#cc6; }
h3 { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; font-weight: bold; }
#msg pre { overflow: auto; background: #ffc; border: 1px #fa0 solid; padding: 6px; }

/* Formatted log message */
#logmsg { background: #ffc; border: 1px #fa0 solid; padding: 1em 1em 0 1em; }
#logmsg p, #logmsg pre, #logmsg blockquote { margin: 0 0 1em 0; }
#logmsg p, #logmsg li, #logmsg dt, #logmsg dd { line-height: 14pt; }
#logmsg h1, #logmsg h2, #logmsg h3, #logmsg h4, #logmsg h5, #logmsg h6 { margin: .5em 0; }
#logmsg h1:first-child, #logmsg h2:first-child, #logmsg h3:first-child, #logmsg h4:first-child, #logmsg h5:first-child, #logmsg h6:first-child { margin-top: 0; }
#logmsg ul, #logmsg ol { padding: 0; list-style-position: inside; margin: 0 0 0 1em; }
#logmsg ul { text-indent: -1em; padding-left: 1em; }
#logmsg ol { text-indent: -1.5em; padding-left: 1.5em; }
#logmsg > ul, #logmsg > ol { margin: 0 0 1em 0; }
#logmsg pre { background: #eee; padding: 1em; }
#logmsg blockquote { border: 1px solid #fa0; border-left-width: 10px; padding: 1em 1em 0 1em; background: white;}
#logmsg dl { margin: 0; }
#logmsg dt { font-weight: bold; }
#logmsg dd { margin: 0; padding: 0 0 0.5em 0; }
#logmsg dd:before { content:'\00bb';}
#logmsg table { border-spacing: 0px; border-collapse: collapse; border-top: 4px solid #fa0; border-bottom: 1px solid #fa0; background: #fff; }
#logmsg table th { text-align: left; font-weight: normal; padding: 0.2em 0.5em; border-top: 1px dotted #fa0; }
#logmsg table td { text-align: right; border-top: 1px dotted #fa0; padding: 0.2em 0.5em; }
#logmsg table thead th { text-align: center; border-bottom: 1px solid #fa0; }
#logmsg table th.Corner { text-align: left; }
#logmsg hr { border: none 0; border-top: 2px dashed #fa0; height: 1px; }

/* Page header and footer banners */
#header, #footer { color: #fff; background: #636; border: 1px #300 solid; padding: 6px; }

/* Colorized diff: one bordered box per changed path, with block-level
   spans so every diff line fills the full width of the box */
#patch { width: 100%; }
#patch h4 {font-family: verdana,arial,helvetica,sans-serif;font-size:10pt;padding:8px;background:#369;color:#fff;margin:0;}
#patch .propset h4, #patch .binary h4 {margin:0;}
#patch pre {padding:0;line-height:1.2em;margin:0;}
#patch .diff {width:100%;background:#eee;padding: 0 0 10px 0;overflow:auto;}
#patch .propset .diff, #patch .binary .diff {padding:10px 0;}
#patch span {display:block;padding:0 10px;}
#patch .modfile, #patch .addfile, #patch .delfile, #patch .propset, #patch .binary, #patch .copfile {border:1px solid #ccc;margin:10px 0;}
#patch ins {background:#dfd;text-decoration:none;display:block;padding:0 10px;}
#patch del {background:#fdd;text-decoration:none;display:block;padding:0 10px;}
/* Both selectors are scoped to #patch; an unscoped .info would also match
   any element with class "info" elsewhere in the document. */
#patch .lines, #patch .info {color:#888;background:#fff;}
--></style>
<div id="msg">
<dl class="meta">
<dt>Revision</dt> <dd><a href="http://trac.calendarserver.org/changeset/13472">13472</a></dd>
<dt>Author</dt> <dd>cdaboo@apple.com</dd>
<dt>Date</dt> <dd>2014-05-14 12:50:12 -0700 (Wed, 14 May 2014)</dd>
</dl>
<h3>Log Message</h3>
<pre>New jobqueue implementation.</pre>
<h3>Modified Paths</h3>
<ul>
<li><a href="#twexttrunktwextenterpriseadbapi2py">twext/trunk/twext/enterprise/adbapi2.py</a></li>
<li><a href="#twexttrunktwextenterprisedalmodelpy">twext/trunk/twext/enterprise/dal/model.py</a></li>
<li><a href="#twexttrunktwextenterprisedalrecordpy">twext/trunk/twext/enterprise/dal/record.py</a></li>
<li><a href="#twexttrunktwextenterprisedalsyntaxpy">twext/trunk/twext/enterprise/dal/syntax.py</a></li>
<li><a href="#twexttrunktwextenterprisejobqueuepy">twext/trunk/twext/enterprise/jobqueue.py</a></li>
<li><a href="#twexttrunktwextenterprisequeuepy">twext/trunk/twext/enterprise/queue.py</a></li>
<li><a href="#twexttrunktwextenterprisetesttest_jobqueuepy">twext/trunk/twext/enterprise/test/test_jobqueue.py</a></li>
</ul>
<h3>Property Changed</h3>
<ul>
<li><a href="#twexttrunk">twext/trunk/</a></li>
<li><a href="#twexttrunktwext">twext/trunk/twext/</a></li>
</ul>
</div>
<div id="patch">
<h3>Diff</h3>
<a id="twexttrunk"></a>
<div class="propset"><h4>Property changes: twext/trunk</h4>
<pre class="diff"><span>
</span></pre></div>
<a id="twexttrunksvnmergeinfo"></a>
<div class="modfile"><h4>Modified: svn:mergeinfo</h4></div>
<span class="cx"> + /twext/branches/users/cdaboo/jobqueue-3:13444-13471
</span><span class="cx">/twext/branches/users/cdaboo/jobs:12742-12780
</span><a id="twexttrunktwext"></a>
<div class="propset"><h4>Property changes: twext/trunk/twext</h4>
<pre class="diff"><span>
</span></pre></div>
<a id="twexttrunktwextsvnmergeinfo"></a>
<div class="modfile"><h4>Modified: svn:mergeinfo</h4></div>
<span class="cx">/CalendarServer/branches/config-separation/twext:4379-4443
</span><span class="cx">/CalendarServer/branches/egg-info-351/twext:4589-4625
</span><span class="cx">/CalendarServer/branches/generic-sqlstore/twext:6167-6191
</span><span class="cx">/CalendarServer/branches/new-store/twext:5594-5934
</span><span class="cx">/CalendarServer/branches/new-store-no-caldavfile/twext:5911-5935
</span><span class="cx">/CalendarServer/branches/new-store-no-caldavfile-2/twext:5936-5981
</span><span class="cx">/CalendarServer/branches/release/CalendarServer-4.3-dev/twext:10180-10190,10192
</span><span class="cx">/CalendarServer/branches/release/CalendarServer-5.1-dev/twext:11846
</span><span class="cx">/CalendarServer/branches/users/cdaboo/batchupload-6699/twext:6700-7198
</span><span class="cx">/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692/twext:5693-5702
</span><span class="cx">/CalendarServer/branches/users/cdaboo/component-set-fixes/twext:8130-8346
</span><span class="cx">/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627/twext:3628-3644
</span><span class="cx">/CalendarServer/branches/users/cdaboo/fix-no-ischedule/twext:11607-11871
</span><span class="cx">/CalendarServer/branches/users/cdaboo/implicituidrace/twext:8137-8141
</span><span class="cx">/CalendarServer/branches/users/cdaboo/ischedule-dkim/twext:9747-9979
</span><span class="cx">/CalendarServer/branches/users/cdaboo/json/twext:11622-11912
</span><span class="cx">/CalendarServer/branches/users/cdaboo/managed-attachments/twext:9985-10145
</span><span class="cx">/CalendarServer/branches/users/cdaboo/more-sharing-5591/twext:5592-5601
</span><span class="cx">/CalendarServer/branches/users/cdaboo/partition-4464/twext:4465-4957
</span><span class="cx">/CalendarServer/branches/users/cdaboo/performance-tweaks/twext:11824-11836
</span><span class="cx">/CalendarServer/branches/users/cdaboo/pods/twext:7297-7377
</span><span class="cx">/CalendarServer/branches/users/cdaboo/pycalendar/twext:7085-7206
</span><span class="cx">/CalendarServer/branches/users/cdaboo/pycard/twext:7227-7237
</span><span class="cx">/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twext:7740-8287
</span><span class="cx">/CalendarServer/branches/users/cdaboo/relative-config-paths-5070/twext:5071-5105
</span><span class="cx">/CalendarServer/branches/users/cdaboo/reverse-proxy-pods/twext:11875-11900
</span><span class="cx">/CalendarServer/branches/users/cdaboo/shared-calendars-5187/twext:5188-5440
</span><span class="cx">/CalendarServer/branches/users/cdaboo/sharing-in-the-store/twext:11935-12016
</span><span class="cx">/CalendarServer/branches/users/cdaboo/store-scheduling/twext:10876-11129
</span><span class="cx">/CalendarServer/branches/users/cdaboo/timezones/twext:7443-7699
</span><span class="cx">/CalendarServer/branches/users/cdaboo/txn-debugging/twext:8730-8743
</span><span class="cx">/CalendarServer/branches/users/gaya/sharedgroups-3/twext:11088-11204
</span><span class="cx">/CalendarServer/branches/users/glyph/always-abort-txn-on-error/twext:9958-9969
</span><span class="cx">/CalendarServer/branches/users/glyph/case-insensitive-uid/twext:8772-8805
</span><span class="cx">/CalendarServer/branches/users/glyph/conn-limit/twext:6574-6577
</span><span class="cx">/CalendarServer/branches/users/glyph/contacts-server-merge/twext:4971-5080
</span><span class="cx">/CalendarServer/branches/users/glyph/dalify/twext:6932-7023
</span><span class="cx">/CalendarServer/branches/users/glyph/db-reconnect/twext:6824-6876
</span><span class="cx">/CalendarServer/branches/users/glyph/deploybuild/twext:7563-7572
</span><span class="cx">/CalendarServer/branches/users/glyph/digest-auth-redux/twext:10624-10635
</span><span class="cx">/CalendarServer/branches/users/glyph/disable-quota/twext:7718-7727
</span><span class="cx">/CalendarServer/branches/users/glyph/dont-start-postgres/twext:6592-6614
</span><span class="cx">/CalendarServer/branches/users/glyph/enforce-max-requests/twext:11640-11643
</span><span class="cx">/CalendarServer/branches/users/glyph/hang-fix/twext:11465-11491
</span><span class="cx">/CalendarServer/branches/users/glyph/imip-and-admin-html/twext:7866-7984
</span><span class="cx">/CalendarServer/branches/users/glyph/ipv6-client/twext:9054-9105
</span><span class="cx">/CalendarServer/branches/users/glyph/launchd-wrapper-bis/twext:11413-11436
</span><span class="cx">/CalendarServer/branches/users/glyph/linux-tests/twext:6893-6900
</span><span class="cx">/CalendarServer/branches/users/glyph/log-cleanups/twext:11691-11731
</span><span class="cx">/CalendarServer/branches/users/glyph/migrate-merge/twext:8690-8713
</span><span class="cx">/CalendarServer/branches/users/glyph/misc-portability-fixes/twext:7365-7374
</span><span class="cx">/CalendarServer/branches/users/glyph/more-deferreds-6/twext:6322-6368
</span><span class="cx">/CalendarServer/branches/users/glyph/more-deferreds-7/twext:6369-6445
</span><span class="cx">/CalendarServer/branches/users/glyph/multiget-delete/twext:8321-8330
</span><span class="cx">/CalendarServer/branches/users/glyph/new-export/twext:7444-7485
</span><span class="cx">/CalendarServer/branches/users/glyph/one-home-list-api/twext:10048-10073
</span><span class="cx">/CalendarServer/branches/users/glyph/oracle/twext:7106-7155
</span><span class="cx">/CalendarServer/branches/users/glyph/oracle-nulls/twext:7340-7351
</span><span class="cx">/CalendarServer/branches/users/glyph/other-html/twext:8062-8091
</span><span class="cx">/CalendarServer/branches/users/glyph/parallel-sim/twext:8240-8251
</span><span class="cx">/CalendarServer/branches/users/glyph/parallel-upgrade/twext:8376-8400
</span><span class="cx">/CalendarServer/branches/users/glyph/parallel-upgrade_to_1/twext:8571-8583
</span><span class="cx">/CalendarServer/branches/users/glyph/q/twext:9560-9688
</span><span class="cx">/CalendarServer/branches/users/glyph/queue-locking-and-timing/twext:10204-10289
</span><span class="cx">/CalendarServer/branches/users/glyph/quota/twext:7604-7637
</span><span class="cx">/CalendarServer/branches/users/glyph/sendfdport/twext:5388-5424
</span><span class="cx">/CalendarServer/branches/users/glyph/shared-pool-fixes/twext:8436-8443
</span><span class="cx">/CalendarServer/branches/users/glyph/shared-pool-take2/twext:8155-8174
</span><span class="cx">/CalendarServer/branches/users/glyph/sharedpool/twext:6490-6550
</span><span class="cx">/CalendarServer/branches/users/glyph/sharing-api/twext:9192-9205
</span><span class="cx">/CalendarServer/branches/users/glyph/skip-lonely-vtimezones/twext:8524-8535
</span><span class="cx">/CalendarServer/branches/users/glyph/sql-store/twext:5929-6073
</span><span class="cx">/CalendarServer/branches/users/glyph/start-service-start-loop/twext:11060-11065
</span><span class="cx">/CalendarServer/branches/users/glyph/subtransactions/twext:7248-7258
</span><span class="cx">/CalendarServer/branches/users/glyph/table-alias/twext:8651-8664
</span><span class="cx">/CalendarServer/branches/users/glyph/uidexport/twext:7673-7676
</span><span class="cx">/CalendarServer/branches/users/glyph/unshare-when-access-revoked/twext:10562-10595
</span><span class="cx">/CalendarServer/branches/users/glyph/use-system-twisted/twext:5084-5149
</span><span class="cx">/CalendarServer/branches/users/glyph/uuid-normalize/twext:9268-9296
</span><span class="cx">/CalendarServer/branches/users/glyph/warning-cleanups/twext:11347-11357
</span><span class="cx">/CalendarServer/branches/users/glyph/whenNotProposed/twext:11881-11897
</span><span class="cx">/CalendarServer/branches/users/glyph/xattrs-from-files/twext:7757-7769
</span><span class="cx">/CalendarServer/branches/users/sagen/applepush/twext:8126-8184
</span><span class="cx">/CalendarServer/branches/users/sagen/inboxitems/twext:7380-7381
</span><span class="cx">/CalendarServer/branches/users/sagen/locations-resources/twext:5032-5051
</span><span class="cx">/CalendarServer/branches/users/sagen/locations-resources-2/twext:5052-5061
</span><span class="cx">/CalendarServer/branches/users/sagen/purge_old_events/twext:6735-6746
</span><span class="cx">/CalendarServer/branches/users/sagen/resource-delegates-4038/twext:4040-4067
</span><span class="cx">/CalendarServer/branches/users/sagen/resource-delegates-4066/twext:4068-4075
</span><span class="cx">/CalendarServer/branches/users/sagen/resources-2/twext:5084-5093
</span><span class="cx">/CalendarServer/branches/users/sagen/testing/twext:10827-10851,10853-10855
</span><span class="cx">/CalendarServer/branches/users/wsanchez/transations/twext:5515-5593
</span><span class="cx">/twext/branches/users/cdaboo/jobs/twext:12742-12780
</span><span class="cx"> + /CalDAVTester/trunk/twext:11193-11198
</span><span class="cx">/CalendarServer/branches/config-separation/twext:4379-4443
</span><span class="cx">/CalendarServer/branches/egg-info-351/twext:4589-4625
</span><span class="cx">/CalendarServer/branches/generic-sqlstore/twext:6167-6191
</span><span class="cx">/CalendarServer/branches/new-store-no-caldavfile-2/twext:5936-5981
</span><span class="cx">/CalendarServer/branches/new-store-no-caldavfile/twext:5911-5935
</span><span class="cx">/CalendarServer/branches/new-store/twext:5594-5934
</span><span class="cx">/CalendarServer/branches/release/CalendarServer-4.3-dev/twext:10180-10190,10192
</span><span class="cx">/CalendarServer/branches/release/CalendarServer-5.1-dev/twext:11846
</span><span class="cx">/CalendarServer/branches/users/cdaboo/batchupload-6699/twext:6700-7198
</span><span class="cx">/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692/twext:5693-5702
</span><span class="cx">/CalendarServer/branches/users/cdaboo/component-set-fixes/twext:8130-8346
</span><span class="cx">/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627/twext:3628-3644
</span><span class="cx">/CalendarServer/branches/users/cdaboo/fix-no-ischedule/twext:11607-11871
</span><span class="cx">/CalendarServer/branches/users/cdaboo/implicituidrace/twext:8137-8141
</span><span class="cx">/CalendarServer/branches/users/cdaboo/ischedule-dkim/twext:9747-9979
</span><span class="cx">/CalendarServer/branches/users/cdaboo/json/twext:11622-11912
</span><span class="cx">/CalendarServer/branches/users/cdaboo/managed-attachments/twext:9985-10145
</span><span class="cx">/CalendarServer/branches/users/cdaboo/more-sharing-5591/twext:5592-5601
</span><span class="cx">/CalendarServer/branches/users/cdaboo/partition-4464/twext:4465-4957
</span><span class="cx">/CalendarServer/branches/users/cdaboo/performance-tweaks/twext:11824-11836
</span><span class="cx">/CalendarServer/branches/users/cdaboo/pods/twext:7297-7377
</span><span class="cx">/CalendarServer/branches/users/cdaboo/pycalendar/twext:7085-7206
</span><span class="cx">/CalendarServer/branches/users/cdaboo/pycard/twext:7227-7237
</span><span class="cx">/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twext:7740-8287
</span><span class="cx">/CalendarServer/branches/users/cdaboo/relative-config-paths-5070/twext:5071-5105
</span><span class="cx">/CalendarServer/branches/users/cdaboo/reverse-proxy-pods/twext:11875-11900
</span><span class="cx">/CalendarServer/branches/users/cdaboo/shared-calendars-5187/twext:5188-5440
</span><span class="cx">/CalendarServer/branches/users/cdaboo/sharing-in-the-store/twext:11935-12016
</span><span class="cx">/CalendarServer/branches/users/cdaboo/store-scheduling/twext:10876-11129
</span><span class="cx">/CalendarServer/branches/users/cdaboo/timezones/twext:7443-7699
</span><span class="cx">/CalendarServer/branches/users/cdaboo/txn-debugging/twext:8730-8743
</span><span class="cx">/CalendarServer/branches/users/gaya/sharedgroups-3/twext:11088-11204
</span><span class="cx">/CalendarServer/branches/users/glyph/always-abort-txn-on-error/twext:9958-9969
</span><span class="cx">/CalendarServer/branches/users/glyph/case-insensitive-uid/twext:8772-8805
</span><span class="cx">/CalendarServer/branches/users/glyph/conn-limit/twext:6574-6577
</span><span class="cx">/CalendarServer/branches/users/glyph/contacts-server-merge/twext:4971-5080
</span><span class="cx">/CalendarServer/branches/users/glyph/dalify/twext:6932-7023
</span><span class="cx">/CalendarServer/branches/users/glyph/db-reconnect/twext:6824-6876
</span><span class="cx">/CalendarServer/branches/users/glyph/deploybuild/twext:7563-7572
</span><span class="cx">/CalendarServer/branches/users/glyph/digest-auth-redux/twext:10624-10635
</span><span class="cx">/CalendarServer/branches/users/glyph/disable-quota/twext:7718-7727
</span><span class="cx">/CalendarServer/branches/users/glyph/dont-start-postgres/twext:6592-6614
</span><span class="cx">/CalendarServer/branches/users/glyph/enforce-max-requests/twext:11640-11643
</span><span class="cx">/CalendarServer/branches/users/glyph/hang-fix/twext:11465-11491
</span><span class="cx">/CalendarServer/branches/users/glyph/imip-and-admin-html/twext:7866-7984
</span><span class="cx">/CalendarServer/branches/users/glyph/ipv6-client/twext:9054-9105
</span><span class="cx">/CalendarServer/branches/users/glyph/launchd-wrapper-bis/twext:11413-11436
</span><span class="cx">/CalendarServer/branches/users/glyph/linux-tests/twext:6893-6900
</span><span class="cx">/CalendarServer/branches/users/glyph/log-cleanups/twext:11691-11731
</span><span class="cx">/CalendarServer/branches/users/glyph/migrate-merge/twext:8690-8713
</span><span class="cx">/CalendarServer/branches/users/glyph/misc-portability-fixes/twext:7365-7374
</span><span class="cx">/CalendarServer/branches/users/glyph/more-deferreds-6/twext:6322-6368
</span><span class="cx">/CalendarServer/branches/users/glyph/more-deferreds-7/twext:6369-6445
</span><span class="cx">/CalendarServer/branches/users/glyph/multiget-delete/twext:8321-8330
</span><span class="cx">/CalendarServer/branches/users/glyph/new-export/twext:7444-7485
</span><span class="cx">/CalendarServer/branches/users/glyph/one-home-list-api/twext:10048-10073
</span><span class="cx">/CalendarServer/branches/users/glyph/oracle-nulls/twext:7340-7351
</span><span class="cx">/CalendarServer/branches/users/glyph/oracle/twext:7106-7155
</span><span class="cx">/CalendarServer/branches/users/glyph/other-html/twext:8062-8091
</span><span class="cx">/CalendarServer/branches/users/glyph/parallel-sim/twext:8240-8251
</span><span class="cx">/CalendarServer/branches/users/glyph/parallel-upgrade/twext:8376-8400
</span><span class="cx">/CalendarServer/branches/users/glyph/parallel-upgrade_to_1/twext:8571-8583
</span><span class="cx">/CalendarServer/branches/users/glyph/q/twext:9560-9688
</span><span class="cx">/CalendarServer/branches/users/glyph/queue-locking-and-timing/twext:10204-10289
</span><span class="cx">/CalendarServer/branches/users/glyph/quota/twext:7604-7637
</span><span class="cx">/CalendarServer/branches/users/glyph/sendfdport/twext:5388-5424
</span><span class="cx">/CalendarServer/branches/users/glyph/shared-pool-fixes/twext:8436-8443
</span><span class="cx">/CalendarServer/branches/users/glyph/shared-pool-take2/twext:8155-8174
</span><span class="cx">/CalendarServer/branches/users/glyph/sharedpool/twext:6490-6550
</span><span class="cx">/CalendarServer/branches/users/glyph/sharing-api/twext:9192-9205
</span><span class="cx">/CalendarServer/branches/users/glyph/skip-lonely-vtimezones/twext:8524-8535
</span><span class="cx">/CalendarServer/branches/users/glyph/sql-store/twext:5929-6073
</span><span class="cx">/CalendarServer/branches/users/glyph/start-service-start-loop/twext:11060-11065
</span><span class="cx">/CalendarServer/branches/users/glyph/subtransactions/twext:7248-7258
</span><span class="cx">/CalendarServer/branches/users/glyph/table-alias/twext:8651-8664
</span><span class="cx">/CalendarServer/branches/users/glyph/uidexport/twext:7673-7676
</span><span class="cx">/CalendarServer/branches/users/glyph/unshare-when-access-revoked/twext:10562-10595
</span><span class="cx">/CalendarServer/branches/users/glyph/use-system-twisted/twext:5084-5149
</span><span class="cx">/CalendarServer/branches/users/glyph/uuid-normalize/twext:9268-9296
</span><span class="cx">/CalendarServer/branches/users/glyph/warning-cleanups/twext:11347-11357
</span><span class="cx">/CalendarServer/branches/users/glyph/whenNotProposed/twext:11881-11897
</span><span class="cx">/CalendarServer/branches/users/glyph/xattrs-from-files/twext:7757-7769
</span><span class="cx">/CalendarServer/branches/users/sagen/applepush/twext:8126-8184
</span><span class="cx">/CalendarServer/branches/users/sagen/inboxitems/twext:7380-7381
</span><span class="cx">/CalendarServer/branches/users/sagen/locations-resources-2/twext:5052-5061
</span><span class="cx">/CalendarServer/branches/users/sagen/locations-resources/twext:5032-5051
</span><span class="cx">/CalendarServer/branches/users/sagen/purge_old_events/twext:6735-6746
</span><span class="cx">/CalendarServer/branches/users/sagen/resource-delegates-4038/twext:4040-4067
</span><span class="cx">/CalendarServer/branches/users/sagen/resource-delegates-4066/twext:4068-4075
</span><span class="cx">/CalendarServer/branches/users/sagen/resources-2/twext:5084-5093
</span><span class="cx">/CalendarServer/branches/users/sagen/testing/twext:10827-10851,10853-10855
</span><span class="cx">/CalendarServer/branches/users/wsanchez/transations/twext:5515-5593
</span><span class="cx">/twext/branches/users/cdaboo/jobqueue-3/twext:13444-13471
</span><span class="cx">/twext/branches/users/cdaboo/jobs/twext:12742-12780
</span><a id="twexttrunktwextenterpriseadbapi2py"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/adbapi2.py (13471 => 13472)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/adbapi2.py        2014-05-14 16:19:13 UTC (rev 13471)
+++ twext/trunk/twext/enterprise/adbapi2.py        2014-05-14 19:50:12 UTC (rev 13472)
</span><span class="lines">@@ -154,13 +154,14 @@
</span><span class="cx">
</span><span class="cx"> noisy = False
</span><span class="cx">
</span><del>- def __init__(self, pool, threadHolder, connection, cursor):
</del><ins>+ def __init__(self, pool, threadHolder, connection, cursor, label=None):
</ins><span class="cx"> self._pool = pool
</span><span class="cx"> self._completed = "idle"
</span><span class="cx"> self._cursor = cursor
</span><span class="cx"> self._connection = connection
</span><span class="cx"> self._holder = threadHolder
</span><span class="cx"> self._first = True
</span><ins>+ self._label = label
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx"> @_forward
</span><span class="lines">@@ -402,10 +403,11 @@
</span><span class="cx"> """
</span><span class="cx"> implements(IAsyncTransaction)
</span><span class="cx">
</span><del>- def __init__(self, pool, reason):
</del><ins>+ def __init__(self, pool, reason, label=None):
</ins><span class="cx"> self.paramstyle = pool.paramstyle
</span><span class="cx"> self.dialect = pool.dialect
</span><span class="cx"> self.reason = reason
</span><ins>+ self._label = label
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx"> def _everything(self, *a, **kw):
</span><span class="lines">@@ -430,7 +432,7 @@
</span><span class="cx">
</span><span class="cx"> implements(IAsyncTransaction)
</span><span class="cx">
</span><del>- def __init__(self, pool):
</del><ins>+ def __init__(self, pool, label=None):
</ins><span class="cx"> """
</span><span class="cx"> Initialize a L{_WaitingTxn} based on a L{ConnectionPool}. (The C{pool}
</span><span class="cx"> is used only to reflect C{dialect} and C{paramstyle} attributes; not
</span><span class="lines">@@ -439,6 +441,7 @@
</span><span class="cx"> self._spool = []
</span><span class="cx"> self.paramstyle = pool.paramstyle
</span><span class="cx"> self.dialect = pool.dialect
</span><ins>+ self._label = label
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx"> def _enspool(self, cmd, a=(), kw={}):
</span><span class="lines">@@ -593,6 +596,7 @@
</span><span class="cx"> super(_SingleTxn, self).__init__()
</span><span class="cx"> self._pool = pool
</span><span class="cx"> self._baseTxn = baseTxn
</span><ins>+ self._label = self._baseTxn._label
</ins><span class="cx"> self._completed = False
</span><span class="cx"> self._currentBlock = None
</span><span class="cx"> self._blockedQueue = None
</span><span class="lines">@@ -718,7 +722,8 @@
</span><span class="cx"> self._unspoolOnto(_NoTxn(
</span><span class="cx"> self._pool,
</span><span class="cx"> "connection pool shut down while txn "
</span><del>- "waiting for database connection."
</del><ins>+ "waiting for database connection.",
+ label=self._label,
</ins><span class="cx"> ))
</span><span class="cx">
</span><span class="cx">
</span><span class="lines">@@ -746,7 +751,7 @@
</span><span class="cx"> self._checkComplete()
</span><span class="cx"> block = CommandBlock(self)
</span><span class="cx"> if self._currentBlock is None:
</span><del>- self._blockedQueue = _WaitingTxn(self._pool)
</del><ins>+ self._blockedQueue = _WaitingTxn(self._pool, label=self._label)
</ins><span class="cx"> # FIXME: test the case where it's ready immediately.
</span><span class="cx"> self._checkNextBlock()
</span><span class="cx"> return block
</span><span class="lines">@@ -795,7 +800,7 @@
</span><span class="cx"> self._singleTxn = singleTxn
</span><span class="cx"> self.paramstyle = singleTxn.paramstyle
</span><span class="cx"> self.dialect = singleTxn.dialect
</span><del>- self._spool = _WaitingTxn(singleTxn._pool)
</del><ins>+ self._spool = _WaitingTxn(singleTxn._pool, label=singleTxn._label)
</ins><span class="cx"> self._started = False
</span><span class="cx"> self._ended = False
</span><span class="cx"> self._waitingForEnd = []
</span><span class="lines">@@ -1067,14 +1072,15 @@
</span><span class="cx"> if self._stopping:
</span><span class="cx"> # FIXME: should be wrapping a _SingleTxn around this to get
</span><span class="cx"> # .commandBlock()
</span><del>- return _NoTxn(self, "txn created while DB pool shutting down")
</del><ins>+ return _NoTxn(self, "txn created while DB pool shutting down", label=label)
</ins><span class="cx">
</span><span class="cx"> if self._free:
</span><span class="cx"> basetxn = self._free.pop(0)
</span><ins>+ basetxn._label = label
</ins><span class="cx"> self._busy.append(basetxn)
</span><span class="cx"> txn = _SingleTxn(self, basetxn)
</span><span class="cx"> else:
</span><del>- txn = _SingleTxn(self, _WaitingTxn(self))
</del><ins>+ txn = _SingleTxn(self, _WaitingTxn(self, label=label))
</ins><span class="cx"> self._waiting.append(txn)
</span><span class="cx"> # FIXME/TESTME: should be len(self._busy) + len(self._finishing)
</span><span class="cx"> # (free doesn't need to be considered, as it's tested above)
</span></span></pre></div>
<a id="twexttrunktwextenterprisedalmodelpy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/dal/model.py (13471 => 13472)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/dal/model.py        2014-05-14 16:19:13 UTC (rev 13471)
+++ twext/trunk/twext/enterprise/dal/model.py        2014-05-14 19:50:12 UTC (rev 13472)
</span><span class="lines">@@ -221,12 +221,12 @@
</span><span class="cx">
</span><span class="cx"> compareAttributes = 'table name'.split()
</span><span class="cx">
</span><del>- def __init__(self, table, name, type):
</del><ins>+ def __init__(self, table, name, type, default=NO_DEFAULT):
</ins><span class="cx"> _checkstr(name)
</span><span class="cx"> self.table = table
</span><span class="cx"> self.name = name
</span><span class="cx"> self.type = type
</span><del>- self.default = NO_DEFAULT
</del><ins>+ self.default = default
</ins><span class="cx"> self.references = None
</span><span class="cx"> self.deleteAction = None
</span><span class="cx">
</span><span class="lines">@@ -253,14 +253,16 @@
</span><span class="cx"> # Some DBs don't allow sequence as a default
</span><span class="cx"> if (
</span><span class="cx"> isinstance(self.default, Sequence) and other.default == NO_DEFAULT or
</span><del>- self.default == NO_DEFAULT and isinstance(other.default, Sequence)
</del><ins>+ self.default == NO_DEFAULT and isinstance(other.default, Sequence) or
+ self.default is None and other.default == NO_DEFAULT or
+ self.default == NO_DEFAULT and other.default is None
</ins><span class="cx"> ):
</span><span class="cx"> pass
</span><span class="cx"> else:
</span><span class="cx"> results.append("Table: %s, column name %s default mismatch" % (self.table.name, self.name,))
</span><span class="cx"> if stringIfNone(self.references, "name") != stringIfNone(other.references, "name"):
</span><span class="cx"> results.append("Table: %s, column name %s references mismatch" % (self.table.name, self.name,))
</span><del>- if self.deleteAction != other.deleteAction:
</del><ins>+ if stringIfNone(self.deleteAction, "") != stringIfNone(other.deleteAction, ""):
</ins><span class="cx"> results.append("Table: %s, column name %s delete action mismatch" % (self.table.name, self.name,))
</span><span class="cx"> return results
</span><span class="cx">
</span><span class="lines">@@ -403,7 +405,7 @@
</span><span class="cx"> raise KeyError("no such column: %r" % (name,))
</span><span class="cx">
</span><span class="cx">
</span><del>- def addColumn(self, name, type):
</del><ins>+ def addColumn(self, name, type, default=NO_DEFAULT, notNull=False, primaryKey=False):
</ins><span class="cx"> """
</span><span class="cx"> A new column was parsed for this table.
</span><span class="cx">
</span><span class="lines">@@ -413,8 +415,12 @@
</span><span class="cx">
</span><span class="cx"> @param type: The L{SQLType} describing the column's type.
</span><span class="cx"> """
</span><del>- column = Column(self, name, type)
</del><ins>+ column = Column(self, name, type, default=default)
</ins><span class="cx"> self.columns.append(column)
</span><ins>+ if notNull:
+ self.tableConstraint(Constraint.NOT_NULL, [name])
+ if primaryKey:
+ self.primaryKey = [column]
</ins><span class="cx"> return column
</span><span class="cx">
</span><span class="cx">
</span></span></pre></div>
<a id="twexttrunktwextenterprisedalrecordpy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/dal/record.py (13471 => 13472)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/dal/record.py        2014-05-14 16:19:13 UTC (rev 13471)
+++ twext/trunk/twext/enterprise/dal/record.py        2014-05-14 19:50:12 UTC (rev 13472)
</span><span class="lines">@@ -352,7 +352,7 @@
</span><span class="cx">
</span><span class="cx">
</span><span class="cx"> @classmethod
</span><del>- def query(cls, transaction, expr, order=None, ascending=True, group=None, forUpdate=False, noWait=False):
</del><ins>+ def query(cls, transaction, expr, order=None, ascending=True, group=None, forUpdate=False, noWait=False, limit=None):
</ins><span class="cx"> """
</span><span class="cx"> Query the table that corresponds to C{cls}, and return instances of
</span><span class="cx"> C{cls} corresponding to the rows that are returned from that table.
</span><span class="lines">@@ -385,6 +385,8 @@
</span><span class="cx"> kw.update(ForUpdate=True)
</span><span class="cx"> if noWait:
</span><span class="cx"> kw.update(NoWait=True)
</span><ins>+ if limit is not None:
+ kw.update(Limit=limit)
</ins><span class="cx"> return cls._rowsFromQuery(
</span><span class="cx"> transaction,
</span><span class="cx"> Select(
</span></span></pre></div>
<a id="twexttrunktwextenterprisedalsyntaxpy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/dal/syntax.py (13471 => 13472)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/dal/syntax.py        2014-05-14 16:19:13 UTC (rev 13471)
+++ twext/trunk/twext/enterprise/dal/syntax.py        2014-05-14 19:50:12 UTC (rev 13472)
</span><span class="lines">@@ -1433,9 +1433,12 @@
</span><span class="cx"> stmt.append(SQLFragment(kw))
</span><span class="cx">
</span><span class="cx"> if self.ForUpdate:
</span><del>- stmt.text += " for update"
- if self.NoWait:
- stmt.text += " nowait"
</del><ins>+ # FOR UPDATE not supported with sqlite - but that is probably not relevant
+ # given that sqlite does file level locking of the DB
+ if queryGenerator.dialect != SQLITE_DIALECT:
+ stmt.text += " for update"
+ if self.NoWait:
+ stmt.text += " nowait"
</ins><span class="cx">
</span><span class="cx"> if self.Limit is not None:
</span><span class="cx"> limitConst = Constant(self.Limit).subSQL(queryGenerator, allTables)
</span></span></pre></div>
<a id="twexttrunktwextenterprisejobqueuepy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/jobqueue.py (13471 => 13472)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/jobqueue.py        2014-05-14 16:19:13 UTC (rev 13471)
+++ twext/trunk/twext/enterprise/jobqueue.py        2014-05-14 19:50:12 UTC (rev 13472)
</span><span class="lines">@@ -81,7 +81,8 @@
</span><span class="cx"> """
</span><span class="cx">
</span><span class="cx"> from functools import wraps
</span><del>-from datetime import datetime
</del><ins>+from datetime import datetime, timedelta
+from collections import namedtuple
</ins><span class="cx">
</span><span class="cx"> from zope.interface import implements
</span><span class="cx">
</span><span class="lines">@@ -91,12 +92,12 @@
</span><span class="cx"> inlineCallbacks, returnValue, Deferred, passthru, succeed
</span><span class="cx"> )
</span><span class="cx"> from twisted.internet.endpoints import TCP4ClientEndpoint
</span><del>-from twisted.protocols.amp import AMP, Command, Integer, String
</del><ins>+from twisted.protocols.amp import AMP, Command, Integer, String, Argument
</ins><span class="cx"> from twisted.python.reflect import qual
</span><del>-from twisted.python import log
</del><ins>+from twext.python.log import Logger
</ins><span class="cx">
</span><span class="cx"> from twext.enterprise.dal.syntax import (
</span><del>- SchemaSyntax, Lock, NamedValue, Select, Count
</del><ins>+ SchemaSyntax, Lock, NamedValue
</ins><span class="cx"> )
</span><span class="cx">
</span><span class="cx"> from twext.enterprise.dal.model import ProcedureCall
</span><span class="lines">@@ -109,7 +110,10 @@
</span><span class="cx"> from zope.interface.interface import Interface
</span><span class="cx"> from twext.enterprise.locking import NamedLock
</span><span class="cx">
</span><ins>+import time
</ins><span class="cx">
</span><ins>+log = Logger()
+
</ins><span class="cx"> class _IJobPerformer(Interface):
</span><span class="cx"> """
</span><span class="cx"> An object that can perform work.
</span><span class="lines">@@ -118,10 +122,10 @@
</span><span class="cx"> (in the worst case) pass from worker->controller->controller->worker.
</span><span class="cx"> """
</span><span class="cx">
</span><del>- def performJob(jobID): # @NoSelf
</del><ins>+ def performJob(job): # @NoSelf
</ins><span class="cx"> """
</span><del>- @param jobID: The primary key identifier of the given job.
- @type jobID: L{int}
</del><ins>+ @param job: Details about the job to perform.
+ @type job: L{JobDescriptor}
</ins><span class="cx">
</span><span class="cx"> @return: a L{Deferred} firing with an empty dictionary when the work is
</span><span class="cx"> complete.
</span><span class="lines">@@ -180,17 +184,13 @@
</span><span class="cx"> # transaction is made aware of somehow.
</span><span class="cx"> JobTable = Table(inSchema, "JOB")
</span><span class="cx">
</span><del>- JobTable.addColumn("JOB_ID", SQLType("integer", None)).setDefaultValue(
- ProcedureCall("nextval", ["JOB_SEQ"])
- )
- JobTable.addColumn("WORK_TYPE", SQLType("varchar", 255))
- JobTable.addColumn("PRIORITY", SQLType("integer", 0))
- JobTable.addColumn("WEIGHT", SQLType("integer", 0))
- JobTable.addColumn("NOT_BEFORE", SQLType("timestamp", None))
- JobTable.addColumn("NOT_AFTER", SQLType("timestamp", None))
- for column in ("JOB_ID", "WORK_TYPE"):
- JobTable.tableConstraint(Constraint.NOT_NULL, [column])
- JobTable.primaryKey = [JobTable.columnNamed("JOB_ID"), ]
</del><ins>+ JobTable.addColumn("JOB_ID", SQLType("integer", None), default=ProcedureCall("nextval", ["JOB_SEQ"]), notNull=True, primaryKey=True)
+ JobTable.addColumn("WORK_TYPE", SQLType("varchar", 255), notNull=True)
+ JobTable.addColumn("PRIORITY", SQLType("integer", 0), default=0)
+ JobTable.addColumn("WEIGHT", SQLType("integer", 0), default=0)
+ JobTable.addColumn("NOT_BEFORE", SQLType("timestamp", None), notNull=True)
+ JobTable.addColumn("ASSIGNED", SQLType("timestamp", None), default=None)
+ JobTable.addColumn("FAILED", SQLType("integer", 0), default=0)
</ins><span class="cx">
</span><span class="cx"> return inSchema
</span><span class="cx">
</span><span class="lines">@@ -199,7 +199,7 @@
</span><span class="cx">
</span><span class="cx">
</span><span class="cx"> @inlineCallbacks
</span><del>-def inTransaction(transactionCreator, operation, label="jobqueue.inTransaction"):
</del><ins>+def inTransaction(transactionCreator, operation, label="jobqueue.inTransaction", **kwargs):
</ins><span class="cx"> """
</span><span class="cx"> Perform the given operation in a transaction, committing or aborting as
</span><span class="cx"> required.
</span><span class="lines">@@ -218,7 +218,7 @@
</span><span class="cx"> """
</span><span class="cx"> txn = transactionCreator(label=label)
</span><span class="cx"> try:
</span><del>- result = yield operation(txn)
</del><ins>+ result = yield operation(txn, **kwargs)
</ins><span class="cx"> except:
</span><span class="cx"> f = Failure()
</span><span class="cx"> yield txn.abort()
</span><span class="lines">@@ -270,6 +270,16 @@
</span><span class="cx">
</span><span class="cx">
</span><span class="cx">
</span><ins>+class JobFailedError(Exception):
+ """
+ A job failed to run - we need to be smart about clean up.
+ """
+
+ def __init__(self, ex):
+ self._ex = ex
+
+
+
</ins><span class="cx"> class JobItem(Record, fromTable(JobInfoSchema.JOB)):
</span><span class="cx"> """
</span><span class="cx"> An item in the job table. This is typically not directly used by code
</span><span class="lines">@@ -277,6 +287,10 @@
</span><span class="cx"> associated with work items.
</span><span class="cx"> """
</span><span class="cx">
</span><ins>+ def descriptor(self):
+ return JobDescriptor(self.jobID, self.weight)
+
+
</ins><span class="cx"> @inlineCallbacks
</span><span class="cx"> def workItem(self):
</span><span class="cx"> workItemClass = WorkItem.forTableName(self.workType)
</span><span class="lines">@@ -286,7 +300,183 @@
</span><span class="cx"> returnValue(workItems[0] if len(workItems) == 1 else None)
</span><span class="cx">
</span><span class="cx">
</span><ins>+ def assign(self, now):
+ """
+ Mark this job as assigned to a worker by setting the assigned column to the current,
+ or provided, timestamp.
+
+ @param now: current timestamp
+ @type now: L{datetime.datetime}
+ @param when: explicitly set the assigned time - typically only used in tests
+ @type when: L{datetime.datetime} or L{None}
+ """
+ return self.update(assigned=now)
+
+
+ def failedToRun(self):
+ """
+ The attempt to run the job failed. Leave it in the queue, but mark it
+ as unassigned, bump the failure count and set to run at some point in
+ the future.
+ """
+ return self.update(
+ assigned=None,
+ failed=self.failed + 1,
+ notBefore=datetime.utcnow() + timedelta(seconds=60)
+ )
+
+
+ @classmethod
</ins><span class="cx"> @inlineCallbacks
</span><ins>+ def ultimatelyPerform(cls, txnFactory, jobID):
+ """
+ Eventually, after routing the job to the appropriate place, somebody
+ actually has to I{do} it.
+
+ @param txnFactory: a 0- or 1-argument callable that creates an
+ L{IAsyncTransaction}
+ @type txnFactory: L{callable}
+
+ @param jobID: the ID of the job to be performed
+ @type jobID: L{int}
+
+ @return: a L{Deferred} which fires with C{None} when the job has been
+ performed, or fails if the job can't be performed.
+ """
+
+ t = time.time()
+ def _tm():
+ return "{:.3f}".format(1000 * (time.time() - t))
+ def _overtm(nb):
+ return "{:.0f}".format(1000 * (t - astimestamp(nb)))
+
+ log.debug("JobItem: starting to run {jobid}".format(jobid=jobID))
+ txn = txnFactory(label="ultimatelyPerform: {}".format(jobID))
+ try:
+ job = yield cls.load(txn, jobID)
+ if hasattr(txn, "_label"):
+ txn._label = "{} <{}>".format(txn._label, job.workType)
+ log.debug("JobItem: loaded {jobid} {work} t={tm}".format(
+ jobid=jobID,
+ work=job.workType,
+ tm=_tm())
+ )
+ yield job.run()
+
+ except NoSuchRecord:
+ # The record has already been removed
+ yield txn.commit()
+ log.debug("JobItem: already removed {jobid} t={tm}".format(jobid=jobID, tm=_tm()))
+
+ except JobFailedError:
+ # Job failed: abort with cleanup, but pretend this method succeeded
+ def _cleanUp():
+ @inlineCallbacks
+ def _cleanUp2(txn2):
+ job = yield cls.load(txn2, jobID)
+ log.debug("JobItem: marking as failed {jobid}, failure count: {count} t={tm}".format(jobid=jobID, count=job.failed + 1, tm=_tm()))
+ yield job.failedToRun()
+ return inTransaction(txnFactory, _cleanUp2, "ultimatelyPerform._cleanUp")
+ txn.postAbort(_cleanUp)
+ yield txn.abort()
+ log.debug("JobItem: failed {jobid} {work} t={tm}".format(
+ jobid=jobID,
+ work=job.workType,
+ tm=_tm()
+ ))
+
+ except:
+ f = Failure()
+ log.error("JobItem: Unknown exception for {jobid} failed t={tm} {exc}".format(
+ jobid=jobID,
+ tm=_tm(),
+ exc=f,
+ ))
+ yield txn.abort()
+ returnValue(f)
+
+ else:
+ yield txn.commit()
+ log.debug("JobItem: completed {jobid} {work} t={tm} over={over}".format(
+ jobid=jobID,
+ work=job.workType,
+ tm=_tm(),
+ over=_overtm(job.notBefore)
+ ))
+
+ returnValue(None)
+
+
+ @classmethod
+ @inlineCallbacks
+ def nextjob(cls, txn, now, minPriority, overdue):
+ """
+ Find the next available job based on priority, also return any that are overdue. This
+ method relies on there being a nextjob() SQL stored procedure to enable skipping over
+ items which are row locked to help avoid contention when multiple nodes are operating
+ on the job queue simultaneously.
+
+ @param txn: the transaction to use
+ @type txn: L{IAsyncTransaction}
+ @param now: current timestamp
+ @type now: L{datetime.datetime}
+ @param minPriority: lowest priority level to query for
+ @type minPriority: L{int}
+ @param overdue: how long before an assigned item is considered overdue
+ @type overdue: L{datetime.datetime}
+
+ @return: the job record
+ @rtype: L{JobItem}
+ """
+
+ jobs = yield cls.nextjobs(txn, now, minPriority, overdue, limit=1)
+
+ # Must only be one or zero
+ if jobs and len(jobs) > 1:
+ raise AssertionError("next_job() returned more than one row")
+
+ returnValue(jobs[0] if jobs else None)
+
+
+ @classmethod
+ @inlineCallbacks
+ def nextjobs(cls, txn, now, minPriority, overdue, limit=1):
+ """
+ Find the next available job based on priority, also return any that are overdue. This
+ method relies on there being a nextjob() SQL stored procedure to enable skipping over
+ items which are row locked to help avoid contention when multiple nodes are operating
+ on the job queue simultaneously.
+
+ @param txn: the transaction to use
+ @type txn: L{IAsyncTransaction}
+ @param now: current timestamp
+ @type now: L{datetime.datetime}
+ @param minPriority: lowest priority level to query for
+ @type minPriority: L{int}
+ @param overdue: how long before an assigned item is considered overdue
+ @type overdue: L{datetime.datetime}
+ @param limit: limit on number of jobs to return
+ @type limit: L{int}
+
+ @return: the job record
+ @rtype: L{JobItem}
+ """
+
+ jobs = yield cls.query(
+ txn,
+ (cls.notBefore <= now).And
+ (((cls.priority >= minPriority).And(cls.assigned == None)).Or(cls.assigned < overdue)),
+ order=(cls.assigned, cls.priority),
+ ascending=False,
+ forUpdate=True,
+ noWait=False,
+ limit=limit,
+ )
+
+ returnValue(jobs)
+
+
+ @inlineCallbacks
</ins><span class="cx"> def run(self):
</span><span class="cx"> """
</span><span class="cx"> Run this job item by finding the appropriate work item class and
</span><span class="lines">@@ -304,7 +494,15 @@
</span><span class="cx"> # The record has already been removed
</span><span class="cx"> pass
</span><span class="cx"> else:
</span><del>- yield workItem.doWork()
</del><ins>+ try:
+ yield workItem.doWork()
+ except Exception as e:
+ log.error("JobItem: {jobid}, WorkItem: {workid} failed: {exc}".format(
+ jobid=self.jobID,
+ workid=workItem.workID,
+ exc=e,
+ ))
+ raise JobFailedError(e)
</ins><span class="cx">
</span><span class="cx"> try:
</span><span class="cx"> # Once the work is done we delete ourselves
</span><span class="lines">@@ -325,22 +523,48 @@
</span><span class="cx">
</span><span class="cx"> @classmethod
</span><span class="cx"> @inlineCallbacks
</span><ins>+ def waitEmpty(cls, txnCreator, reactor, timeout):
+ """
+ Wait for the job queue to drain. Only use this in tests
+ that need to wait for results from jobs.
+ """
+ t = time.time()
+ while True:
+ work = yield inTransaction(txnCreator, cls.all)
+ if not work:
+ break
+ if time.time() - t > timeout:
+ returnValue(False)
+ d = Deferred()
+ reactor.callLater(0.1, lambda : d.callback(None))
+ yield d
+
+ returnValue(True)
+
+
+ @classmethod
+ @inlineCallbacks
</ins><span class="cx"> def histogram(cls, txn):
</span><span class="cx"> """
</span><span class="cx"> Generate a histogram of work items currently in the queue.
</span><span class="cx"> """
</span><del>- jb = JobInfoSchema.JOB
- rows = yield Select(
- [jb.WORK_TYPE, Count(jb.WORK_TYPE)],
- From=jb,
- GroupBy=jb.WORK_TYPE
- ).on(txn)
- results = dict(rows)
-
- # Add in empty data for other work
</del><ins>+ results = {}
+ now = datetime.utcnow()
</ins><span class="cx"> for workType in cls.workTypes():
</span><del>- results.setdefault(workType.table.model.name, 0)
</del><ins>+ results.setdefault(workType.table.model.name, [0, 0, 0, 0])
</ins><span class="cx">
</span><ins>+ jobs = yield cls.all(txn)
+
+ for job in jobs:
+ r = results[job.workType]
+ r[0] += 1
+ if job.assigned is not None:
+ r[1] += 1
+ if job.failed:
+ r[2] += 1
+ if job.assigned is None and job.notBefore < now:
+ r[3] += 1
+
</ins><span class="cx"> returnValue(results)
</span><span class="cx">
</span><span class="cx">
</span><span class="lines">@@ -349,14 +573,41 @@
</span><span class="cx"> return len(cls.workTypes())
</span><span class="cx">
</span><span class="cx">
</span><ins>+JobDescriptor = namedtuple("JobDescriptor", ["jobID", "weight"])
</ins><span class="cx">
</span><ins>+class JobDescriptorArg(Argument):
+ """
+ Comma-separated.
+ """
+ def toString(self, inObject):
+ return ",".join(map(str, inObject))
+
+
+ def fromString(self, inString):
+ return JobDescriptor(*map(int, inString.split(",")))
+
+
</ins><span class="cx"> # Priority for work - used to order work items in the job queue
</span><span class="cx"> WORK_PRIORITY_LOW = 1
</span><span class="cx"> WORK_PRIORITY_MEDIUM = 2
</span><span class="cx"> WORK_PRIORITY_HIGH = 3
</span><span class="cx">
</span><ins>+# Weight for work - used to schedule workers based on capacity
+WORK_WEIGHT_0 = 0
+WORK_WEIGHT_1 = 1
+WORK_WEIGHT_2 = 2
+WORK_WEIGHT_3 = 3
+WORK_WEIGHT_4 = 4
+WORK_WEIGHT_5 = 5
+WORK_WEIGHT_6 = 6
+WORK_WEIGHT_7 = 7
+WORK_WEIGHT_8 = 8
+WORK_WEIGHT_9 = 9
+WORK_WEIGHT_10 = 10
+WORK_WEIGHT_CAPACITY = 10 # Total amount of work any one worker can manage
</ins><span class="cx">
</span><span class="cx">
</span><ins>+
</ins><span class="cx"> class WorkItem(Record):
</span><span class="cx"> """
</span><span class="cx"> A L{WorkItem} is an item of work which may be stored in a database, then
</span><span class="lines">@@ -450,7 +701,8 @@
</span><span class="cx"> """
</span><span class="cx">
</span><span class="cx"> group = None
</span><del>- priority = WORK_PRIORITY_LOW # Default - subclasses should override
</del><ins>+ default_priority = WORK_PRIORITY_LOW # Default - subclasses should override
+ default_weight = WORK_WEIGHT_5 # Default - subclasses should override
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx"> @classmethod
</span><span class="lines">@@ -469,19 +721,21 @@
</span><span class="cx"> }
</span><span class="cx">
</span><span class="cx"> def _transferArg(name):
</span><del>- if name in kwargs:
- jobargs[name] = kwargs[name]
- del kwargs[name]
</del><ins>+ arg = kwargs.pop(name, None)
+ if arg is not None:
+ jobargs[name] = arg
+ elif hasattr(cls, "default_{}".format(name)):
+ jobargs[name] = getattr(cls, "default_{}".format(name))
</ins><span class="cx">
</span><span class="cx"> _transferArg("jobID")
</span><del>- if "priority" in kwargs:
- _transferArg("priority")
- else:
- jobargs["priority"] = cls.priority
</del><ins>+ _transferArg("priority")
</ins><span class="cx"> _transferArg("weight")
</span><span class="cx"> _transferArg("notBefore")
</span><del>- _transferArg("notAfter")
</del><span class="cx">
</span><ins>+ # Always need a notBefore
+ if "notBefore" not in jobargs:
+ jobargs["notBefore"] = datetime.utcnow()
+
</ins><span class="cx"> job = yield JobItem.create(transaction, **jobargs)
</span><span class="cx">
</span><span class="cx"> kwargs["jobID"] = job.jobID
</span><span class="lines">@@ -540,7 +794,7 @@
</span><span class="cx"> """
</span><span class="cx">
</span><span class="cx"> arguments = [
</span><del>- ("jobID", Integer()),
</del><ins>+ ("job", JobDescriptorArg()),
</ins><span class="cx"> ]
</span><span class="cx"> response = []
</span><span class="cx">
</span><span class="lines">@@ -665,7 +919,7 @@
</span><span class="cx"> return self._reportedLoad + self._bonusLoad
</span><span class="cx">
</span><span class="cx">
</span><del>- def performJob(self, jobID):
</del><ins>+ def performJob(self, job):
</ins><span class="cx"> """
</span><span class="cx"> A L{local worker connection <ConnectionFromWorker>} is asking this
</span><span class="cx"> specific peer node-controller process to perform a job, having
</span><span class="lines">@@ -673,12 +927,12 @@
</span><span class="cx">
</span><span class="cx"> @see: L{_IJobPerformer.performJob}
</span><span class="cx"> """
</span><del>- d = self.callRemote(PerformJob, jobID=jobID)
- self._bonusLoad += 1
</del><ins>+ d = self.callRemote(PerformJob, job=job)
+ self._bonusLoad += job.weight
</ins><span class="cx">
</span><span class="cx"> @d.addBoth
</span><span class="cx"> def performed(result):
</span><del>- self._bonusLoad -= 1
</del><ins>+ self._bonusLoad -= job.weight
</ins><span class="cx"> return result
</span><span class="cx">
</span><span class="cx"> @d.addCallback
</span><span class="lines">@@ -689,17 +943,17 @@
</span><span class="cx">
</span><span class="cx">
</span><span class="cx"> @PerformJob.responder
</span><del>- def dispatchToWorker(self, jobID):
</del><ins>+ def dispatchToWorker(self, job):
</ins><span class="cx"> """
</span><span class="cx"> A remote peer node has asked this node to do a job; dispatch it to
</span><span class="cx"> a local worker on this node.
</span><span class="cx">
</span><del>- @param jobID: the identifier of the job.
- @type jobID: L{int}
</del><ins>+ @param job: the details of the job.
+ @type job: L{JobDescriptor}
</ins><span class="cx">
</span><span class="cx"> @return: a L{Deferred} that fires when the work has been completed.
</span><span class="cx"> """
</span><del>- d = self.peerPool.performJobForPeer(jobID)
</del><ins>+ d = self.peerPool.performJobForPeer(job)
</ins><span class="cx"> d.addCallback(lambda ignored: {})
</span><span class="cx"> return d
</span><span class="cx">
</span><span class="lines">@@ -721,7 +975,7 @@
</span><span class="cx"> """
</span><span class="cx"> implements(_IJobPerformer)
</span><span class="cx">
</span><del>- def __init__(self, maximumLoadPerWorker=5):
</del><ins>+ def __init__(self, maximumLoadPerWorker=WORK_WEIGHT_CAPACITY):
</ins><span class="cx"> self.workers = []
</span><span class="cx"> self.maximumLoadPerWorker = maximumLoadPerWorker
</span><span class="cx">
</span><span class="lines">@@ -753,6 +1007,26 @@
</span><span class="cx"> return False
</span><span class="cx">
</span><span class="cx">
</span><ins>+ def loadLevel(self):
+ """
+ Return the overall load of this worker connection pool have as a percentage of
+ total capacity.
+
+ @return: current load percentage.
+ @rtype: L{int}
+ """
+ current = sum(worker.currentLoad for worker in self.workers)
+ total = len(self.workers) * self.maximumLoadPerWorker
+ return ((current * 100) / total) if total else 0
+
+
+ def eachWorkerLoad(self):
+ """
+ The load of all currently connected workers.
+ """
+ return [(worker.currentLoad, worker.totalCompleted) for worker in self.workers]
+
+
</ins><span class="cx"> def allWorkerLoad(self):
</span><span class="cx"> """
</span><span class="cx"> The total load of all currently connected workers.
</span><span class="lines">@@ -771,20 +1045,20 @@
</span><span class="cx"> return sorted(self.workers[:], key=lambda w: w.currentLoad)[0]
</span><span class="cx">
</span><span class="cx">
</span><del>- def performJob(self, jobID):
</del><ins>+ def performJob(self, job):
</ins><span class="cx"> """
</span><span class="cx"> Select a local worker that is idle enough to perform the given job,
</span><span class="cx"> then ask them to perform it.
</span><span class="cx">
</span><del>- @param jobID: The primary key identifier of the given job.
- @type jobID: L{int}
</del><ins>+ @param job: The details of the given job.
+ @type job: L{JobDescriptor}
</ins><span class="cx">
</span><span class="cx"> @return: a L{Deferred} firing with an empty dictionary when the work is
</span><span class="cx"> complete.
</span><span class="cx"> @rtype: L{Deferred} firing L{dict}
</span><span class="cx"> """
</span><span class="cx"> preferredWorker = self._selectLowestLoadWorker()
</span><del>- result = preferredWorker.performJob(jobID)
</del><ins>+ result = preferredWorker.performJob(job)
</ins><span class="cx"> return result
</span><span class="cx">
</span><span class="cx">
</span><span class="lines">@@ -799,6 +1073,7 @@
</span><span class="cx"> super(ConnectionFromWorker, self).__init__(boxReceiver, locator)
</span><span class="cx"> self.peerPool = peerPool
</span><span class="cx"> self._load = 0
</span><ins>+ self._completed = 0
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx"> @property
</span><span class="lines">@@ -809,6 +1084,14 @@
</span><span class="cx"> return self._load
</span><span class="cx">
</span><span class="cx">
</span><ins>+ @property
+ def totalCompleted(self):
+ """
+ What is the current load of this worker?
+ """
+ return self._completed
+
+
</ins><span class="cx"> def startReceivingBoxes(self, sender):
</span><span class="cx"> """
</span><span class="cx"> Start receiving AMP boxes from the peer. Initialize all necessary
</span><span class="lines">@@ -829,19 +1112,20 @@
</span><span class="cx">
</span><span class="cx">
</span><span class="cx"> @PerformJob.responder
</span><del>- def performJob(self, jobID):
</del><ins>+ def performJob(self, job):
</ins><span class="cx"> """
</span><span class="cx"> Dispatch a job to this worker.
</span><span class="cx">
</span><span class="cx"> @see: The responder for this should always be
</span><span class="cx"> L{ConnectionFromController.actuallyReallyExecuteJobHere}.
</span><span class="cx"> """
</span><del>- d = self.callRemote(PerformJob, jobID=jobID)
- self._load += 1
</del><ins>+ d = self.callRemote(PerformJob, job=job)
+ self._load += job.weight
</ins><span class="cx">
</span><span class="cx"> @d.addBoth
</span><span class="cx"> def f(result):
</span><del>- self._load -= 1
</del><ins>+ self._load -= job.weight
+ self._completed += 1
</ins><span class="cx"> return result
</span><span class="cx">
</span><span class="cx"> return d
</span><span class="lines">@@ -883,11 +1167,11 @@
</span><span class="cx"> return self
</span><span class="cx">
</span><span class="cx">
</span><del>- def performJob(self, jobID):
</del><ins>+ def performJob(self, job):
</ins><span class="cx"> """
</span><span class="cx"> Ask the controller to perform a job on our behalf.
</span><span class="cx"> """
</span><del>- return self.callRemote(PerformJob, jobID=jobID)
</del><ins>+ return self.callRemote(PerformJob, job=job)
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx"> @inlineCallbacks
</span><span class="lines">@@ -914,48 +1198,18 @@
</span><span class="cx">
</span><span class="cx">
</span><span class="cx"> @PerformJob.responder
</span><del>- def actuallyReallyExecuteJobHere(self, jobID):
</del><ins>+ def actuallyReallyExecuteJobHere(self, job):
</ins><span class="cx"> """
</span><span class="cx"> This is where it's time to actually do the job. The controller
</span><span class="cx"> process has instructed this worker to do it; so, look up the data in
</span><span class="cx"> the row, and do it.
</span><span class="cx"> """
</span><del>- d = ultimatelyPerform(self.transactionFactory, jobID)
</del><ins>+ d = JobItem.ultimatelyPerform(self.transactionFactory, job.jobID)
</ins><span class="cx"> d.addCallback(lambda ignored: {})
</span><span class="cx"> return d
</span><span class="cx">
</span><span class="cx">
</span><span class="cx">
</span><del>-def ultimatelyPerform(txnFactory, jobID):
- """
- Eventually, after routing the job to the appropriate place, somebody
- actually has to I{do} it.
-
- @param txnFactory: a 0- or 1-argument callable that creates an
- L{IAsyncTransaction}
- @type txnFactory: L{callable}
-
- @param jobID: the ID of the job to be performed
- @type jobID: L{int}
-
- @return: a L{Deferred} which fires with C{None} when the job has been
- performed, or fails if the job can't be performed.
- """
- @inlineCallbacks
- def runJob(txn):
- try:
- job = yield JobItem.load(txn, jobID)
- if hasattr(txn, "_label"):
- txn._label = "{} <{}>".format(txn._label, job.workType)
- yield job.run()
- except NoSuchRecord:
- # The record has already been removed
- pass
-
- return inTransaction(txnFactory, runJob, label="ultimatelyPerform: {}".format(jobID))
-
-
-
</del><span class="cx"> class LocalPerformer(object):
</span><span class="cx"> """
</span><span class="cx"> Implementor of C{performJob} that does its work in the local process,
</span><span class="lines">@@ -970,11 +1224,11 @@
</span><span class="cx"> self.txnFactory = txnFactory
</span><span class="cx">
</span><span class="cx">
</span><del>- def performJob(self, jobID):
</del><ins>+ def performJob(self, job):
</ins><span class="cx"> """
</span><span class="cx"> Perform the given job right now.
</span><span class="cx"> """
</span><del>- return ultimatelyPerform(self.txnFactory, jobID)
</del><ins>+ return JobItem.ultimatelyPerform(self.txnFactory, job.jobID)
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx">
</span><span class="lines">@@ -1049,7 +1303,6 @@
</span><span class="cx"> self.txn = txn
</span><span class="cx"> self.workItemType = workItemType
</span><span class="cx"> self.kw = kw
</span><del>- self._whenExecuted = Deferred()
</del><span class="cx"> self._whenCommitted = Deferred()
</span><span class="cx"> self.workItem = None
</span><span class="cx">
</span><span class="lines">@@ -1073,60 +1326,11 @@
</span><span class="cx"> def whenDone():
</span><span class="cx"> self._whenCommitted.callback(self)
</span><span class="cx">
</span><del>- def maybeLater():
- performer = self._chooser.choosePerformer()
-
- @passthru(
- performer.performJob(created.jobID).addCallback
- )
- def performed(result):
- self._whenExecuted.callback(self)
-
- @performed.addErrback
- def notPerformed(why):
- self._whenExecuted.errback(why)
-
- reactor = self._chooser.reactor
-
- if created.job.notBefore is not None:
- when = max(
- 0,
- astimestamp(created.job.notBefore) - reactor.seconds()
- )
- else:
- when = 0
- # TODO: Track the returned DelayedCall so it can be stopped
- # when the service stops.
- self._chooser.reactor.callLater(when, maybeLater)
-
</del><span class="cx"> @self.txn.postAbort
</span><span class="cx"> def whenFailed():
</span><span class="cx"> self._whenCommitted.errback(TransactionFailed)
</span><span class="cx">
</span><span class="cx">
</span><del>- def whenExecuted(self):
- """
- Let the caller know when the proposed work has been fully executed.
-
- @note: The L{Deferred} returned by C{whenExecuted} should be used with
- extreme caution. If an application decides to do any
- database-persistent work as a result of this L{Deferred} firing,
- that work I{may be lost} as a result of a service being normally
- shut down between the time that the work is scheduled and the time
- that it is executed. So, the only things that should be added as
- callbacks to this L{Deferred} are those which are ephemeral, in
- memory, and reflect only presentation state associated with the
- user's perception of the completion of work, not logical chains of
- work which need to be completed in sequence; those should all be
- completed within the transaction of the L{WorkItem.doWork} that
- gets executed.
-
- @return: a L{Deferred} that fires with this L{WorkProposal} when the
- work has been completed remotely.
- """
- return _cloneDeferred(self._whenExecuted)
-
-
</del><span class="cx"> def whenProposed(self):
</span><span class="cx"> """
</span><span class="cx"> Let the caller know when the work has been proposed; i.e. when the work
</span><span class="lines">@@ -1221,14 +1425,14 @@
</span><span class="cx"> than waiting for it to be requested. By default, 10 minutes.
</span><span class="cx"> @type queueProcessTimeout: L{float} (in seconds)
</span><span class="cx">
</span><del>- @ivar queueDelayedProcessInterval: The amount of time between database
</del><ins>+ @ivar queuePollInterval: The amount of time between database
</ins><span class="cx"> pings, i.e. checks for over-due queue items that might have been
</span><span class="cx"> orphaned by a controller process that died mid-transaction. This is
</span><span class="cx"> how often the shared database should be pinged by I{all} nodes (i.e.,
</span><span class="cx"> all controller processes, or each instance of L{PeerConnectionPool});
</span><span class="cx"> each individual node will ping commensurately less often as more nodes
</span><span class="cx"> join the database.
</span><del>- @type queueDelayedProcessInterval: L{float} (in seconds)
</del><ins>+ @type queuePollInterval: L{float} (in seconds)
</ins><span class="cx">
</span><span class="cx"> @ivar reactor: The reactor used for scheduling timed events.
</span><span class="cx"> @type reactor: L{IReactorTime} provider.
</span><span class="lines">@@ -1243,9 +1447,13 @@
</span><span class="cx"> getfqdn = staticmethod(getfqdn)
</span><span class="cx"> getpid = staticmethod(getpid)
</span><span class="cx">
</span><del>- queueProcessTimeout = (10.0 * 60.0)
- queueDelayedProcessInterval = (60.0)
</del><ins>+ queuePollInterval = 0.1 # How often to poll for new work
+ queueOrphanTimeout = 5.0 * 60.0 # How long before assigned work is possibly orphaned
</ins><span class="cx">
</span><ins>+ overloadLevel = 95 # Percentage load level above which job queue processing stops
+ highPriorityLevel = 80 # Percentage load level above which only high priority jobs are processed
+ mediumPriorityLevel = 50 # Percentage load level above which high and medium priority jobs are processed
+
</ins><span class="cx"> def __init__(self, reactor, transactionFactory, ampPort):
</span><span class="cx"> """
</span><span class="cx"> Initialize a L{PeerConnectionPool}.
</span><span class="lines">@@ -1324,13 +1532,13 @@
</span><span class="cx"> return LocalPerformer(self.transactionFactory)
</span><span class="cx">
</span><span class="cx">
</span><del>- def performJobForPeer(self, jobID):
</del><ins>+ def performJobForPeer(self, job):
</ins><span class="cx"> """
</span><span class="cx"> A peer has requested us to perform a job; choose a job performer
</span><span class="cx"> local to this node, and then execute it.
</span><span class="cx"> """
</span><span class="cx"> performer = self.choosePerformer(onlyLocally=True)
</span><del>- return performer.performJob(jobID)
</del><ins>+ return performer.performJob(job)
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx"> def totalNumberOfNodes(self):
</span><span class="lines">@@ -1362,67 +1570,113 @@
</span><span class="cx"> return self._lastSeenNodeIndex
</span><span class="cx">
</span><span class="cx">
</span><del>- def _periodicLostWorkCheck(self):
</del><ins>+ @inlineCallbacks
+ def _workCheck(self):
</ins><span class="cx"> """
</span><del>- Periodically, every node controller has to check to make sure that work
- hasn't been dropped on the floor by someone. In order to do that it
- queries each work-item table.
</del><ins>+ Every node controller will periodically check for any new work to do, and dispatch
+ as much as possible given the current load.
</ins><span class="cx"> """
</span><del>- @inlineCallbacks
- def workCheck(txn):
- if self.thisProcess:
- nodes = [(node.hostname, node.port) for node in
- (yield self.activeNodes(txn))]
- nodes.sort()
- self._lastSeenTotalNodes = len(nodes)
- self._lastSeenNodeIndex = nodes.index(
- (self.thisProcess.hostname, self.thisProcess.port)
- )
</del><ins>+ # FIXME: not sure if we should do this node check on every work poll
+# if self.thisProcess:
+# nodes = [(node.hostname, node.port) for node in
+# (yield self.activeNodes(txn))]
+# nodes.sort()
+# self._lastSeenTotalNodes = len(nodes)
+# self._lastSeenNodeIndex = nodes.index(
+# (self.thisProcess.hostname, self.thisProcess.port)
+# )
</ins><span class="cx">
</span><ins>+ loopCounter = 0
+ while True:
+ if not self.running:
+ returnValue(None)
+
+ # Check the overall service load - if overloaded skip this poll cycle.
+ # FIXME: need to include capacity of other nodes. For now we only check
+ # our own capacity and stop processing if too busy. Other nodes that
+ # are not busy will pick up work.
+ level = self.workerPool.loadLevel()
+
+ # Check overload level first
+ if level > self.overloadLevel:
+ log.error("workCheck: jobqueue is overloaded")
+ break
+ elif level > self.highPriorityLevel:
+ log.debug("workCheck: jobqueue high priority only")
+ minPriority = WORK_PRIORITY_HIGH
+ elif level > self.mediumPriorityLevel:
+ log.debug("workCheck: jobqueue high/medium priority only")
+ minPriority = WORK_PRIORITY_MEDIUM
+ else:
+ minPriority = WORK_PRIORITY_LOW
+
+ # Determine the timestamp cutoffs: the current time and the orphan threshold
</ins><span class="cx"> # TODO: here is where we should iterate over the unlocked items
</span><span class="cx"> # that are due, ordered by priority, notBefore etc
</span><del>- tooLate = datetime.utcfromtimestamp(
- self.reactor.seconds() - self.queueProcessTimeout
- )
- overdueItems = (yield JobItem.query(
- txn, (JobItem.notBefore < tooLate))
- )
- for overdueItem in overdueItems:
- peer = self.choosePerformer()
</del><ins>+ nowTime = datetime.utcfromtimestamp(self.reactor.seconds())
+ orphanTime = nowTime - timedelta(seconds=self.queueOrphanTimeout)
+
+ txn = self.transactionFactory(label="jobqueue.workCheck")
+ try:
+ nextJob = yield JobItem.nextjob(txn, nowTime, minPriority, orphanTime)
+ if nextJob is None:
+ break
+
+ # If it is now assigned but not earlier than the orphan time, ignore as this may have
+ # been returned after another txn just assigned it
+ if nextJob.assigned is not None and nextJob.assigned > orphanTime:
+ continue
+
+ # Always assign as a new job even when it is an orphan
+ yield nextJob.assign(nowTime)
+ loopCounter += 1
+
+ except Exception as e:
+ log.error("Failed to pick a new job, {exc}", exc=e)
+ yield txn.abort()
+ txn = None
+ nextJob = None
+ finally:
+ if txn:
+ yield txn.commit()
+
+ if nextJob is not None:
+ peer = self.choosePerformer(onlyLocally=True)
</ins><span class="cx"> try:
</span><del>- yield peer.performJob(overdueItem.jobID)
</del><ins>+ # Send the job over but DO NOT block on the response - that will ensure
+ # we can do stuff in parallel
+ peer.performJob(nextJob.descriptor())
</ins><span class="cx"> except Exception as e:
</span><del>- log.err("Failed to perform periodic lost job for jobid={}, {}".format(overdueItem.jobID, e))
</del><ins>+ log.error("Failed to perform job for jobid={jobid}, {exc}", jobid=nextJob.jobID, exc=e)
</ins><span class="cx">
</span><del>- return inTransaction(self.transactionFactory, workCheck, label="periodicLostWorkCheck")
</del><ins>+ if loopCounter:
+ log.debug("workCheck: processed {} jobs in one loop".format(loopCounter))
</ins><span class="cx">
</span><span class="cx"> _currentWorkDeferred = None
</span><del>- _lostWorkCheckCall = None
</del><ins>+ _workCheckCall = None
</ins><span class="cx">
</span><del>- def _lostWorkCheckLoop(self):
</del><ins>+ def _workCheckLoop(self):
</ins><span class="cx"> """
</span><span class="cx"> While the service is running, keep checking for any overdue / lost work
</span><span class="cx"> items and re-submit them to the cluster for processing. Space out
</span><span class="cx"> those checks in time based on the size of the cluster.
</span><span class="cx"> """
</span><del>- self._lostWorkCheckCall = None
</del><ins>+ self._workCheckCall = None
</ins><span class="cx">
</span><ins>+ if not self.running:
+ return
+
</ins><span class="cx"> @passthru(
</span><del>- self._periodicLostWorkCheck().addErrback(log.err).addCallback
</del><ins>+ self._workCheck().addErrback(log.error).addCallback
</ins><span class="cx"> )
</span><span class="cx"> def scheduleNext(result):
</span><ins>+ # TODO: if multiple nodes are present, see if we can
+ # stagger the polling to avoid contention.
</ins><span class="cx"> self._currentWorkDeferred = None
</span><span class="cx"> if not self.running:
</span><span class="cx"> return
</span><del>- index = self.nodeIndex()
- now = self.reactor.seconds()
-
- interval = self.queueDelayedProcessInterval
- count = self.totalNumberOfNodes()
- when = (now - (now % interval)) + (interval * (count + index))
- delay = when - now
- self._lostWorkCheckCall = self.reactor.callLater(
- delay, self._lostWorkCheckLoop
</del><ins>+ self._workCheckCall = self.reactor.callLater(
+ self.queuePollInterval, self._workCheckLoop
</ins><span class="cx"> )
</span><span class="cx">
</span><span class="cx"> self._currentWorkDeferred = scheduleNext
</span><span class="lines">@@ -1432,32 +1686,37 @@
</span><span class="cx"> """
</span><span class="cx"> Register ourselves with the database and establish all outgoing
</span><span class="cx"> connections to other servers in the cluster.
</span><ins>+
+ @param waitForService: an optional L{Deferred} that will be called back when
+ the service startup is done.
+ @type waitForService: L{Deferred} or L{None}
</ins><span class="cx"> """
</span><span class="cx"> @inlineCallbacks
</span><span class="cx"> def startup(txn):
</span><del>- endpoint = TCP4ServerEndpoint(self.reactor, self.ampPort)
- # If this fails, the failure mode is going to be ugly, just like
- # all conflicted-port failures. But, at least it won't proceed.
- self._listeningPort = yield endpoint.listen(self.peerFactory())
- self.ampPort = self._listeningPort.getHost().port
- yield Lock.exclusive(NodeInfo.table).on(txn)
- nodes = yield self.activeNodes(txn)
- selves = [node for node in nodes
- if ((node.hostname == self.hostname) and
- (node.port == self.ampPort))]
- if selves:
- self.thisProcess = selves[0]
- nodes.remove(self.thisProcess)
- yield self.thisProcess.update(pid=self.pid,
- time=datetime.now())
- else:
- self.thisProcess = yield NodeInfo.create(
- txn, hostname=self.hostname, port=self.ampPort,
- pid=self.pid, time=datetime.now()
- )
</del><ins>+ if self.ampPort is not None:
+ endpoint = TCP4ServerEndpoint(self.reactor, self.ampPort)
+ # If this fails, the failure mode is going to be ugly, just like
+ # all conflicted-port failures. But, at least it won't proceed.
+ self._listeningPort = yield endpoint.listen(self.peerFactory())
+ self.ampPort = self._listeningPort.getHost().port
+ yield Lock.exclusive(NodeInfo.table).on(txn)
+ nodes = yield self.activeNodes(txn)
+ selves = [node for node in nodes
+ if ((node.hostname == self.hostname) and
+ (node.port == self.ampPort))]
+ if selves:
+ self.thisProcess = selves[0]
+ nodes.remove(self.thisProcess)
+ yield self.thisProcess.update(pid=self.pid,
+ time=datetime.now())
+ else:
+ self.thisProcess = yield NodeInfo.create(
+ txn, hostname=self.hostname, port=self.ampPort,
+ pid=self.pid, time=datetime.now()
+ )
</ins><span class="cx">
</span><del>- for node in nodes:
- self._startConnectingTo(node)
</del><ins>+ for node in nodes:
+ self._startConnectingTo(node)
</ins><span class="cx">
</span><span class="cx"> self._startingUp = inTransaction(self.transactionFactory, startup, label="PeerConnectionPool.startService")
</span><span class="cx">
</span><span class="lines">@@ -1465,7 +1724,7 @@
</span><span class="cx"> def done(result):
</span><span class="cx"> self._startingUp = None
</span><span class="cx"> super(PeerConnectionPool, self).startService()
</span><del>- self._lostWorkCheckLoop()
</del><ins>+ self._workCheckLoop()
</ins><span class="cx"> return result
</span><span class="cx">
</span><span class="cx">
</span><span class="lines">@@ -1474,20 +1733,30 @@
</span><span class="cx"> """
</span><span class="cx"> Stop this service, terminating any incoming or outgoing connections.
</span><span class="cx"> """
</span><del>- yield super(PeerConnectionPool, self).stopService()
</del><span class="cx">
</span><ins>+ # If in the process of starting up, always wait for startup to complete before
+ # stopping.
</ins><span class="cx"> if self._startingUp is not None:
</span><del>- yield self._startingUp
</del><ins>+ d = Deferred()
+ self._startingUp.addBoth(lambda result: d.callback(None))
+ yield d
</ins><span class="cx">
</span><ins>+ yield super(PeerConnectionPool, self).stopService()
+
</ins><span class="cx"> if self._listeningPort is not None:
</span><span class="cx"> yield self._listeningPort.stopListening()
</span><span class="cx">
</span><del>- if self._lostWorkCheckCall is not None:
- self._lostWorkCheckCall.cancel()
</del><ins>+ if self._workCheckCall is not None:
+ self._workCheckCall.cancel()
</ins><span class="cx">
</span><span class="cx"> if self._currentWorkDeferred is not None:
</span><del>- yield self._currentWorkDeferred
</del><ins>+ self._currentWorkDeferred.cancel()
</ins><span class="cx">
</span><ins>+ for connector in self._connectingToPeer:
+ d = Deferred()
+ connector.addBoth(lambda result: d.callback(None))
+ yield d
+
</ins><span class="cx"> for peer in self.peers:
</span><span class="cx"> peer.transport.abortConnection()
</span><span class="cx">
</span><span class="lines">@@ -1510,6 +1779,7 @@
</span><span class="cx"> # self.mappedPeers.pop((host, port)).transport.loseConnection()
</span><span class="cx"> self.mappedPeers[(host, port)] = peer
</span><span class="cx">
</span><ins>+ _connectingToPeer = []
</ins><span class="cx">
</span><span class="cx"> def _startConnectingTo(self, node):
</span><span class="cx"> """
</span><span class="lines">@@ -1519,8 +1789,10 @@
</span><span class="cx"> @type node: L{NodeInfo}
</span><span class="cx"> """
</span><span class="cx"> connected = node.endpoint(self.reactor).connect(self.peerFactory())
</span><ins>+ self._connectingToPeer.append(connected)
</ins><span class="cx">
</span><span class="cx"> def whenConnected(proto):
</span><ins>+ self._connectingToPeer.remove(connected)
</ins><span class="cx"> self.mapPeer(node.hostname, node.port, proto)
</span><span class="cx"> proto.callRemote(
</span><span class="cx"> IdentifyNode,
</span><span class="lines">@@ -1529,9 +1801,11 @@
</span><span class="cx"> ).addErrback(noted, "identify")
</span><span class="cx">
</span><span class="cx"> def noted(err, x="connect"):
</span><del>- log.msg(
- "Could not {0} to cluster peer {1} because {2}"
- .format(x, node, str(err.value))
</del><ins>+ if x == "connect":
+ self._connectingToPeer.remove(connected)
+ log.error(
+ "Could not {action} to cluster peer {node} because {reason}",
+ action=x, node=node, reason=str(err.value),
</ins><span class="cx"> )
</span><span class="cx">
</span><span class="cx"> connected.addCallbacks(whenConnected, noted)
</span><span class="lines">@@ -1594,7 +1868,7 @@
</span><span class="cx"> """
</span><span class="cx"> implements(_IJobPerformer)
</span><span class="cx">
</span><del>- def performJob(self, jobID):
</del><ins>+ def performJob(self, job):
</ins><span class="cx"> """
</span><span class="cx"> Don't perform job.
</span><span class="cx"> """
</span></span></pre></div>
<a id="twexttrunktwextenterprisequeuepy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/queue.py (13471 => 13472)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/queue.py        2014-05-14 16:19:13 UTC (rev 13471)
+++ twext/trunk/twext/enterprise/queue.py        2014-05-14 19:50:12 UTC (rev 13472)
</span><span class="lines">@@ -1323,6 +1323,9 @@
</span><span class="cx"> """
</span><span class="cx"> self._lostWorkCheckCall = None
</span><span class="cx">
</span><ins>+ if not self.running:
+ return
+
</ins><span class="cx"> @passthru(
</span><span class="cx"> self._periodicLostWorkCheck().addErrback(log.err).addCallback
</span><span class="cx"> )
</span><span class="lines">@@ -1390,11 +1393,15 @@
</span><span class="cx"> """
</span><span class="cx"> Stop this service, terminating any incoming or outgoing connections.
</span><span class="cx"> """
</span><ins>+ # If in the process of starting up, always wait for startup to complete before
+ # stopping.
+ if self._startingUp is not None:
+ d = Deferred()
+ self._startingUp.addBoth(lambda result: d.callback(None))
+ yield d
+
</ins><span class="cx"> yield super(PeerConnectionPool, self).stopService()
</span><span class="cx">
</span><del>- if self._startingUp is not None:
- yield self._startingUp
-
</del><span class="cx"> if self._listeningPort is not None:
</span><span class="cx"> yield self._listeningPort.stopListening()
</span><span class="cx">
</span><span class="lines">@@ -1402,8 +1409,13 @@
</span><span class="cx"> self._lostWorkCheckCall.cancel()
</span><span class="cx">
</span><span class="cx"> if self._currentWorkDeferred is not None:
</span><del>- yield self._currentWorkDeferred
</del><ins>+ self._currentWorkDeferred.cancel()
</ins><span class="cx">
</span><ins>+ for connector in self._connectingToPeer:
+ d = Deferred()
+ connector.addBoth(lambda result: d.callback(None))
+ yield d
+
</ins><span class="cx"> for peer in self.peers:
</span><span class="cx"> peer.transport.abortConnection()
</span><span class="cx">
</span><span class="lines">@@ -1426,6 +1438,7 @@
</span><span class="cx"> # self.mappedPeers.pop((host, port)).transport.loseConnection()
</span><span class="cx"> self.mappedPeers[(host, port)] = peer
</span><span class="cx">
</span><ins>+ _connectingToPeer = []
</ins><span class="cx">
</span><span class="cx"> def _startConnectingTo(self, node):
</span><span class="cx"> """
</span><span class="lines">@@ -1435,8 +1448,10 @@
</span><span class="cx"> @type node: L{NodeInfo}
</span><span class="cx"> """
</span><span class="cx"> connected = node.endpoint(self.reactor).connect(self.peerFactory())
</span><ins>+ self._connectingToPeer.append(connected)
</ins><span class="cx">
</span><span class="cx"> def whenConnected(proto):
</span><ins>+ self._connectingToPeer.remove(connected)
</ins><span class="cx"> self.mapPeer(node.hostname, node.port, proto)
</span><span class="cx"> proto.callRemote(
</span><span class="cx"> IdentifyNode,
</span><span class="lines">@@ -1445,6 +1460,8 @@
</span><span class="cx"> ).addErrback(noted, "identify")
</span><span class="cx">
</span><span class="cx"> def noted(err, x="connect"):
</span><ins>+ if x == "connect":
+ self._connectingToPeer.remove(connected)
</ins><span class="cx"> log.msg(
</span><span class="cx"> "Could not {0} to cluster peer {1} because {2}"
</span><span class="cx"> .format(x, node, str(err.value))
</span></span></pre></div>
<a id="twexttrunktwextenterprisetesttest_jobqueuepy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/test/test_jobqueue.py (13471 => 13472)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/test/test_jobqueue.py        2014-05-14 16:19:13 UTC (rev 13471)
+++ twext/trunk/twext/enterprise/test/test_jobqueue.py        2014-05-14 19:50:12 UTC (rev 13472)
</span><span class="lines">@@ -22,16 +22,16 @@
</span><span class="cx">
</span><span class="cx"> from zope.interface.verify import verifyObject
</span><span class="cx">
</span><ins>+from twisted.internet import reactor
</ins><span class="cx"> from twisted.trial.unittest import TestCase, SkipTest
</span><span class="cx"> from twisted.test.proto_helpers import StringTransport, MemoryReactor
</span><del>-from twisted.internet.defer import (
- Deferred, inlineCallbacks, gatherResults, passthru, returnValue
-)
</del><ins>+from twisted.internet.defer import \
+ Deferred, inlineCallbacks, gatherResults, passthru, returnValue, succeed
</ins><span class="cx"> from twisted.internet.task import Clock as _Clock
</span><span class="cx"> from twisted.protocols.amp import Command, AMP, Integer
</span><span class="cx"> from twisted.application.service import Service, MultiService
</span><span class="cx">
</span><del>-from twext.enterprise.dal.syntax import SchemaSyntax, Select
</del><ins>+from twext.enterprise.dal.syntax import SchemaSyntax
</ins><span class="cx"> from twext.enterprise.dal.record import fromTable
</span><span class="cx"> from twext.enterprise.dal.test.test_parseschema import SchemaTestHelper
</span><span class="cx"> from twext.enterprise.fixtures import buildConnectionPool
</span><span class="lines">@@ -40,7 +40,9 @@
</span><span class="cx"> inTransaction, PeerConnectionPool, astimestamp,
</span><span class="cx"> LocalPerformer, _IJobPerformer, WorkItem, WorkerConnectionPool,
</span><span class="cx"> ConnectionFromPeerNode, LocalQueuer,
</span><del>- _BaseQueuer, NonPerformingQueuer
</del><ins>+ _BaseQueuer, NonPerformingQueuer, JobItem,
+ WORK_PRIORITY_LOW, WORK_PRIORITY_HIGH, WORK_PRIORITY_MEDIUM,
+ JobDescriptor
</ins><span class="cx"> )
</span><span class="cx"> import twext.enterprise.jobqueue
</span><span class="cx">
</span><span class="lines">@@ -67,7 +69,27 @@
</span><span class="cx"> return super(Clock, self).callLater(_seconds, _f, *args, **kw)
</span><span class="cx">
</span><span class="cx">
</span><ins>+ @inlineCallbacks
+ def advanceCompletely(self, amount):
+ """
+ Move time on this clock forward by the given amount and run whatever
+ pending calls should be run. Always complete the deferred calls before
+ returning.
</ins><span class="cx">
</span><ins>+ @type amount: C{float}
+ @param amount: The number of seconds which to advance this clock's
+ time.
+ """
+ self.rightNow += amount
+ self._sortCalls()
+ while self.calls and self.calls[0].getTime() <= self.seconds():
+ call = self.calls.pop(0)
+ call.called = 1
+ yield call.func(*call.args, **call.kw)
+ self._sortCalls()
+
+
+
</ins><span class="cx"> class MemoryReactorWithClock(MemoryReactor, Clock):
</span><span class="cx"> """
</span><span class="cx"> Simulate a real reactor.
</span><span class="lines">@@ -75,6 +97,7 @@
</span><span class="cx"> def __init__(self):
</span><span class="cx"> MemoryReactor.__init__(self)
</span><span class="cx"> Clock.__init__(self)
</span><ins>+ self._sortCalls()
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx">
</span><span class="lines">@@ -180,8 +203,9 @@
</span><span class="cx"> WORK_TYPE varchar(255) not null,
</span><span class="cx"> PRIORITY integer default 0,
</span><span class="cx"> WEIGHT integer default 0,
</span><del>- NOT_BEFORE timestamp default null,
- NOT_AFTER timestamp default null
</del><ins>+ NOT_BEFORE timestamp not null,
+ ASSIGNED timestamp default null,
+ FAILED integer default 0
</ins><span class="cx"> );
</span><span class="cx"> """
</span><span class="cx"> )
</span><span class="lines">@@ -194,11 +218,6 @@
</span><span class="cx"> A integer, B integer,
</span><span class="cx"> DELETE_ON_LOAD integer default 0
</span><span class="cx"> );
</span><del>- create table DUMMY_WORK_DONE (
- WORK_ID integer primary key,
- JOB_ID integer references JOB,
- A_PLUS_B integer
- );
</del><span class="cx"> """
</span><span class="cx"> )
</span><span class="cx">
</span><span class="lines">@@ -207,37 +226,30 @@
</span><span class="cx">
</span><span class="cx"> dropSQL = [
</span><span class="cx"> "drop table {name} cascade".format(name=table)
</span><del>- for table in ("DUMMY_WORK_ITEM", "DUMMY_WORK_DONE")
</del><ins>+ for table in ("DUMMY_WORK_ITEM",)
</ins><span class="cx"> ] + ["delete from job"]
</span><span class="cx"> except SkipTest as e:
</span><del>- DummyWorkDone = DummyWorkItem = object
</del><ins>+ DummyWorkItem = object
</ins><span class="cx"> skip = e
</span><span class="cx"> else:
</span><del>- DummyWorkDone = fromTable(schema.DUMMY_WORK_DONE)
</del><span class="cx"> DummyWorkItem = fromTable(schema.DUMMY_WORK_ITEM)
</span><span class="cx"> skip = False
</span><span class="cx">
</span><span class="cx">
</span><span class="cx">
</span><del>-class DummyWorkDone(WorkItem, DummyWorkDone):
- """
- Work result.
- """
-
-
-
</del><span class="cx"> class DummyWorkItem(WorkItem, DummyWorkItem):
</span><span class="cx"> """
</span><span class="cx"> Sample L{WorkItem} subclass that adds two integers together and stores them
</span><span class="cx"> in another table.
</span><span class="cx"> """
</span><span class="cx">
</span><ins>+ results = {}
+
</ins><span class="cx"> def doWork(self):
</span><span class="cx"> if self.a == -1:
</span><span class="cx"> raise ValueError("Ooops")
</span><del>- return DummyWorkDone.makeJob(
- self.transaction, jobID=self.jobID + 100, workID=self.workID + 100, aPlusB=self.a + self.b
- )
</del><ins>+ self.results[self.jobID] = self.a + self.b
+ return succeed(None)
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx"> @classmethod
</span><span class="lines">@@ -250,7 +262,7 @@
</span><span class="cx"> """
</span><span class="cx"> workItems = yield super(DummyWorkItem, cls).loadForJob(txn, *a)
</span><span class="cx"> if workItems[0].deleteOnLoad:
</span><del>- otherTransaction = txn.concurrently()
</del><ins>+ otherTransaction = txn.store().newTransaction()
</ins><span class="cx"> otherSelf = yield super(DummyWorkItem, cls).loadForJob(txn, *a)
</span><span class="cx"> yield otherSelf[0].delete()
</span><span class="cx"> yield otherTransaction.commit()
</span><span class="lines">@@ -306,12 +318,10 @@
</span><span class="cx">
</span><span class="cx">
</span><span class="cx"> @inlineCallbacks
</span><del>- def test_enqueue(self):
- """
- L{PeerConnectionPool.enqueueWork} will insert a job and a work item.
- """
- dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
</del><ins>+ def _enqueue(self, dbpool, a, b, notBefore=None, priority=None, weight=None):
</ins><span class="cx"> fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
</span><ins>+ if notBefore is None:
+ notBefore = datetime.datetime(2012, 12, 13, 12, 12, 0)
</ins><span class="cx"> sinceEpoch = astimestamp(fakeNow)
</span><span class="cx"> clock = Clock()
</span><span class="cx"> clock.advance(sinceEpoch)
</span><span class="lines">@@ -329,36 +339,134 @@
</span><span class="cx"> @transactionally(dbpool.connection)
</span><span class="cx"> def check(txn):
</span><span class="cx"> return qpool.enqueueWork(
</span><del>- txn, DummyWorkItem, a=3, b=9,
- notBefore=datetime.datetime(2012, 12, 13, 12, 12, 0)
</del><ins>+ txn, DummyWorkItem,
+ a=a, b=b, priority=priority, weight=weight,
+ notBefore=notBefore
</ins><span class="cx"> )
</span><span class="cx">
</span><span class="cx"> proposal = yield check
</span><span class="cx"> yield proposal.whenProposed()
</span><span class="cx">
</span><ins>+
+ @inlineCallbacks
+ def test_enqueue(self):
+ """
+ L{PeerConnectionPool.enqueueWork} will insert a job and a work item.
+ """
+ dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+ yield self._enqueue(dbpool, 1, 2)
+
</ins><span class="cx"> # Make sure we have one JOB and one DUMMY_WORK_ITEM
</span><span class="cx"> @transactionally(dbpool.connection)
</span><span class="cx"> def checkJob(txn):
</span><del>- return Select(
- From=schema.JOB
- ).on(txn)
</del><ins>+ return JobItem.all(txn)
</ins><span class="cx">
</span><span class="cx"> jobs = yield checkJob
</span><span class="cx"> self.assertTrue(len(jobs) == 1)
</span><del>- self.assertTrue(jobs[0][1] == "DUMMY_WORK_ITEM")
</del><ins>+ self.assertTrue(jobs[0].workType == "DUMMY_WORK_ITEM")
+ self.assertTrue(jobs[0].assigned is None)
</ins><span class="cx">
</span><span class="cx"> @transactionally(dbpool.connection)
</span><span class="cx"> def checkWork(txn):
</span><del>- return Select(
- From=schema.DUMMY_WORK_ITEM
- ).on(txn)
</del><ins>+ return DummyWorkItem.all(txn)
</ins><span class="cx">
</span><span class="cx"> work = yield checkWork
</span><span class="cx"> self.assertTrue(len(work) == 1)
</span><del>- self.assertTrue(work[0][1] == jobs[0][0])
</del><ins>+ self.assertTrue(work[0].jobID == jobs[0].jobID)
</ins><span class="cx">
</span><span class="cx">
</span><ins>+ @inlineCallbacks
+ def test_assign(self):
+ """
+ L{PeerConnectionPool.enqueueWork} will insert a job and a work item.
+ """
+ dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+ yield self._enqueue(dbpool, 1, 2)
</ins><span class="cx">
</span><ins>+ # Make sure we have one JOB and one DUMMY_WORK_ITEM
+ def checkJob(txn):
+ return JobItem.all(txn)
+
+ jobs = yield inTransaction(dbpool.connection, checkJob)
+ self.assertTrue(len(jobs) == 1)
+ self.assertTrue(jobs[0].assigned is None)
+
+ @inlineCallbacks
+ def assignJob(txn):
+ job = yield JobItem.load(txn, jobs[0].jobID)
+ yield job.assign(datetime.datetime.utcnow())
+ yield inTransaction(dbpool.connection, assignJob)
+
+ jobs = yield inTransaction(dbpool.connection, checkJob)
+ self.assertTrue(len(jobs) == 1)
+ self.assertTrue(jobs[0].assigned is not None)
+
+
+ @inlineCallbacks
+ def test_nextjob(self):
+ """
+ L{PeerConnectionPool.enqueueWork} will insert a job and a work item.
+ """
+
+ dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+ now = datetime.datetime.utcnow()
+
+ # Empty job queue
+ @inlineCallbacks
+ def _next(txn, priority=WORK_PRIORITY_LOW):
+ job = yield JobItem.nextjob(txn, now, priority, now - datetime.timedelta(seconds=PeerConnectionPool.queueOrphanTimeout))
+ if job is not None:
+ work = yield job.workItem()
+ else:
+ work = None
+ returnValue((job, work))
+ job, work = yield inTransaction(dbpool.connection, _next)
+ self.assertTrue(job is None)
+ self.assertTrue(work is None)
+
+ # Unassigned job with future notBefore not returned
+ yield self._enqueue(dbpool, 1, 1, now + datetime.timedelta(days=1))
+ job, work = yield inTransaction(dbpool.connection, _next)
+ self.assertTrue(job is None)
+ self.assertTrue(work is None)
+
+ # Unassigned job with past notBefore returned
+ yield self._enqueue(dbpool, 2, 1, now + datetime.timedelta(days=-1))
+ job, work = yield inTransaction(dbpool.connection, _next)
+ self.assertTrue(job is not None)
+ self.assertTrue(work.a == 2)
+ assignID = job.jobID
+
+ # Assigned job with past notBefore not returned
+ @inlineCallbacks
+ def assignJob(txn, when=None):
+ assignee = yield JobItem.load(txn, assignID)
+ yield assignee.assign(now if when is None else when)
+ yield inTransaction(dbpool.connection, assignJob)
+ job, work = yield inTransaction(dbpool.connection, _next)
+ self.assertTrue(job is None)
+ self.assertTrue(work is None)
+
+ # Unassigned low priority job with past notBefore not returned if high priority required
+ yield self._enqueue(dbpool, 4, 1, now + datetime.timedelta(days=-1))
+ job, work = yield inTransaction(dbpool.connection, _next, priority=WORK_PRIORITY_HIGH)
+ self.assertTrue(job is None)
+ self.assertTrue(work is None)
+
+ # Unassigned low priority job with past notBefore not returned if medium priority required
+ yield self._enqueue(dbpool, 5, 1, now + datetime.timedelta(days=-1))
+ job, work = yield inTransaction(dbpool.connection, _next, priority=WORK_PRIORITY_MEDIUM)
+ self.assertTrue(job is None)
+ self.assertTrue(work is None)
+
+ # Assigned job with past notBefore, but overdue is returned
+ yield inTransaction(dbpool.connection, assignJob, when=now + datetime.timedelta(days=-1))
+ job, work = yield inTransaction(dbpool.connection, _next, priority=WORK_PRIORITY_HIGH)
+ self.assertTrue(job is not None)
+ self.assertTrue(work.a == 2)
+
+
+
</ins><span class="cx"> class WorkerConnectionPoolTests(TestCase):
</span><span class="cx"> """
</span><span class="cx"> A L{WorkerConnectionPool} is responsible for managing, in a node's
</span><span class="lines">@@ -412,6 +520,7 @@
</span><span class="cx"> Create a L{PeerConnectionPool} that is just initialized enough.
</span><span class="cx"> """
</span><span class="cx"> self.pcp = PeerConnectionPool(None, None, 4321)
</span><ins>+ DummyWorkItem.results = {}
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx"> def checkPerformer(self, cls):
</span><span class="lines">@@ -424,6 +533,34 @@
</span><span class="cx"> verifyObject(_IJobPerformer, performer)
</span><span class="cx">
</span><span class="cx">
</span><ins>+ def _setupPools(self):
+ """
+ Setup pool and reactor clock for time stepped tests.
+ """
+ reactor = MemoryReactorWithClock()
+ cph = SteppablePoolHelper(nodeSchema + jobSchema + schemaText)
+ then = datetime.datetime(2012, 12, 12, 12, 12, 12)
+ reactor.advance(astimestamp(then))
+ cph.setUp(self)
+ qpool = PeerConnectionPool(reactor, cph.pool.connection, 4321)
+
+ realChoosePerformer = qpool.choosePerformer
+ performerChosen = []
+
+ def catchPerformerChoice(onlyLocally=False):
+ result = realChoosePerformer(onlyLocally=onlyLocally)
+ performerChosen.append(True)
+ return result
+
+ qpool.choosePerformer = catchPerformerChoice
+ reactor.callLater(0, qpool._workCheck)
+
+ qpool.startService()
+ cph.flushHolders()
+
+ return cph, qpool, reactor, performerChosen
+
+
</ins><span class="cx"> def test_choosingPerformerWhenNoPeersAndNoWorkers(self):
</span><span class="cx"> """
</span><span class="cx"> If L{PeerConnectionPool.choosePerformer} is invoked when no workers
</span><span class="lines">@@ -478,8 +615,8 @@
</span><span class="cx"> d = Deferred()
</span><span class="cx">
</span><span class="cx"> class DummyPerformer(object):
</span><del>- def performJob(self, jobID):
- self.jobID = jobID
</del><ins>+ def performJob(self, job):
+ self.jobID = job.jobID
</ins><span class="cx"> return d
</span><span class="cx">
</span><span class="cx"> # Doing real database I/O in this test would be tedious so fake the
</span><span class="lines">@@ -490,7 +627,7 @@
</span><span class="cx"> return dummy
</span><span class="cx">
</span><span class="cx"> peer.choosePerformer = chooseDummy
</span><del>- performed = local.performJob(7384)
</del><ins>+ performed = local.performJob(JobDescriptor(7384, 1))
</ins><span class="cx"> performResult = []
</span><span class="cx"> performed.addCallback(performResult.append)
</span><span class="cx">
</span><span class="lines">@@ -535,25 +672,19 @@
</span><span class="cx">
</span><span class="cx">
</span><span class="cx"> @inlineCallbacks
</span><del>- def test_notBeforeWhenCheckingForLostWork(self):
</del><ins>+ def test_notBeforeWhenCheckingForWork(self):
</ins><span class="cx"> """
</span><del>- L{PeerConnectionPool._periodicLostWorkCheck} should execute any
</del><ins>+ L{PeerConnectionPool._workCheck} should execute any
</ins><span class="cx"> outstanding work items, but only those that are expired.
</span><span class="cx"> """
</span><del>- dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
- # An arbitrary point in time.
</del><ins>+ dbpool, _ignore_qpool, clock, _ignore_performerChosen = self._setupPools()
</ins><span class="cx"> fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
</span><del>- # *why* does datetime still not have .astimestamp()
- sinceEpoch = astimestamp(fakeNow)
- clock = Clock()
- clock.advance(sinceEpoch)
- qpool = PeerConnectionPool(clock, dbpool.connection, 0)
</del><span class="cx">
</span><span class="cx"> # Let's create a couple of work items directly, not via the enqueue
</span><span class="cx"> # method, so that they exist but nobody will try to immediately execute
</span><span class="cx"> # them.
</span><span class="cx">
</span><del>- @transactionally(dbpool.connection)
</del><ins>+ @transactionally(dbpool.pool.connection)
</ins><span class="cx"> @inlineCallbacks
</span><span class="cx"> def setup(txn):
</span><span class="cx"> # First, one that's right now.
</span><span class="lines">@@ -564,9 +695,7 @@
</span><span class="cx"> txn, a=3, b=4, notBefore=(
</span><span class="cx"> # Schedule it in the past so that it should have already
</span><span class="cx"> # run.
</span><del>- fakeNow - datetime.timedelta(
- seconds=qpool.queueProcessTimeout + 20
- )
</del><ins>+ fakeNow - datetime.timedelta(seconds=20)
</ins><span class="cx"> )
</span><span class="cx"> )
</span><span class="cx">
</span><span class="lines">@@ -575,14 +704,13 @@
</span><span class="cx"> txn, a=10, b=20, notBefore=fakeNow + datetime.timedelta(1000)
</span><span class="cx"> )
</span><span class="cx"> yield setup
</span><del>- yield qpool._periodicLostWorkCheck()
</del><span class="cx">
</span><del>- @transactionally(dbpool.connection)
- def check(txn):
- return DummyWorkDone.all(txn)
</del><ins>+ # Wait for job
+ while len(DummyWorkItem.results) != 2:
+ clock.advance(1)
</ins><span class="cx">
</span><del>- every = yield check
- self.assertEquals([x.aPlusB for x in every], [7])
</del><ins>+ # Work item complete
+ self.assertTrue(DummyWorkItem.results == {1: 3, 2: 7})
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx"> @inlineCallbacks
</span><span class="lines">@@ -592,23 +720,10 @@
</span><span class="cx"> only executes it when enough time has elapsed to allow the C{notBefore}
</span><span class="cx"> attribute of the given work item to have passed.
</span><span class="cx"> """
</span><del>- dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
- fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
- sinceEpoch = astimestamp(fakeNow)
- clock = Clock()
- clock.advance(sinceEpoch)
- qpool = PeerConnectionPool(clock, dbpool.connection, 0)
- realChoosePerformer = qpool.choosePerformer
- performerChosen = []
</del><span class="cx">
</span><del>- def catchPerformerChoice():
- result = realChoosePerformer()
- performerChosen.append(True)
- return result
</del><ins>+ dbpool, qpool, clock, performerChosen = self._setupPools()
</ins><span class="cx">
</span><del>- qpool.choosePerformer = catchPerformerChoice
-
- @transactionally(dbpool.connection)
</del><ins>+ @transactionally(dbpool.pool.connection)
</ins><span class="cx"> def check(txn):
</span><span class="cx"> return qpool.enqueueWork(
</span><span class="cx"> txn, DummyWorkItem, a=3, b=9,
</span><span class="lines">@@ -630,11 +745,12 @@
</span><span class="cx"> clock.advance(20 - 12)
</span><span class="cx"> self.assertEquals(performerChosen, [True])
</span><span class="cx">
</span><del>- # FIXME: if this fails, it will hang, but that's better than no
- # notification that it is broken at all.
</del><ins>+ # Wait for job
+ while (yield inTransaction(dbpool.pool.connection, lambda txn: JobItem.all(txn))):
+ clock.advance(1)
</ins><span class="cx">
</span><del>- result = yield proposal.whenExecuted()
- self.assertIdentical(result, proposal)
</del><ins>+ # Work item complete
+ self.assertTrue(DummyWorkItem.results == {1: 12})
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx"> @inlineCallbacks
</span><span class="lines">@@ -643,23 +759,9 @@
</span><span class="cx"> L{PeerConnectionPool.enqueueWork} will execute its work immediately if
</span><span class="cx"> the C{notBefore} attribute of the work item in question is in the past.
</span><span class="cx"> """
</span><del>- dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
- fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
- sinceEpoch = astimestamp(fakeNow)
- clock = Clock()
- clock.advance(sinceEpoch)
- qpool = PeerConnectionPool(clock, dbpool.connection, 0)
- realChoosePerformer = qpool.choosePerformer
- performerChosen = []
</del><ins>+ dbpool, qpool, clock, performerChosen = self._setupPools()
</ins><span class="cx">
</span><del>- def catchPerformerChoice():
- result = realChoosePerformer()
- performerChosen.append(True)
- return result
-
- qpool.choosePerformer = catchPerformerChoice
-
- @transactionally(dbpool.connection)
</del><ins>+ @transactionally(dbpool.pool.connection)
</ins><span class="cx"> def check(txn):
</span><span class="cx"> return qpool.enqueueWork(
</span><span class="cx"> txn, DummyWorkItem, a=3, b=9,
</span><span class="lines">@@ -673,10 +775,14 @@
</span><span class="cx"> # Advance far beyond the given timestamp.
</span><span class="cx"> self.assertEquals(performerChosen, [True])
</span><span class="cx">
</span><del>- result = yield proposal.whenExecuted()
- self.assertIdentical(result, proposal)
</del><ins>+ # Wait for job
+ while (yield inTransaction(dbpool.pool.connection, lambda txn: JobItem.all(txn))):
+ clock.advance(1)
</ins><span class="cx">
</span><ins>+ # Work item complete
+ self.assertTrue(DummyWorkItem.results == {1: 12})
</ins><span class="cx">
</span><ins>+
</ins><span class="cx"> def test_workerConnectionPoolPerformJob(self):
</span><span class="cx"> """
</span><span class="cx"> L{WorkerConnectionPool.performJob} performs work by selecting a
</span><span class="lines">@@ -696,12 +802,12 @@
</span><span class="cx"> worker2, _ignore_trans2 = peer()
</span><span class="cx">
</span><span class="cx"> # Ask the worker to do something.
</span><del>- worker1.performJob(1)
</del><ins>+ worker1.performJob(JobDescriptor(1, 1))
</ins><span class="cx"> self.assertEquals(worker1.currentLoad, 1)
</span><span class="cx"> self.assertEquals(worker2.currentLoad, 0)
</span><span class="cx">
</span><span class="cx"> # Now ask the pool to do something
</span><del>- peerPool.workerPool.performJob(2)
</del><ins>+ peerPool.workerPool.performJob(JobDescriptor(2, 1))
</ins><span class="cx"> self.assertEquals(worker1.currentLoad, 1)
</span><span class="cx"> self.assertEquals(worker2.currentLoad, 1)
</span><span class="cx">
</span><span class="lines">@@ -716,49 +822,42 @@
</span><span class="cx"> reactor.advance(astimestamp(then))
</span><span class="cx"> cph.setUp(self)
</span><span class="cx"> pcp = PeerConnectionPool(reactor, cph.pool.connection, 4321)
</span><del>- now = then + datetime.timedelta(seconds=pcp.queueProcessTimeout * 2)
</del><ins>+ now = then + datetime.timedelta(seconds=20)
</ins><span class="cx">
</span><span class="cx"> @transactionally(cph.pool.connection)
</span><span class="cx"> def createOldWork(txn):
</span><del>- one = DummyWorkItem.makeJob(txn, jobID=100, workID=1, a=3, b=4, notBefore=then)
- two = DummyWorkItem.makeJob(txn, jobID=101, workID=2, a=7, b=9, notBefore=now)
</del><ins>+ one = DummyWorkItem.makeJob(txn, jobID=1, workID=1, a=3, b=4, notBefore=then)
+ two = DummyWorkItem.makeJob(txn, jobID=2, workID=2, a=7, b=9, notBefore=now)
</ins><span class="cx"> return gatherResults([one, two])
</span><span class="cx">
</span><span class="cx"> pcp.startService()
</span><span class="cx"> cph.flushHolders()
</span><del>- reactor.advance(pcp.queueProcessTimeout * 2)
</del><ins>+ reactor.advance(19)
</ins><span class="cx"> self.assertEquals(
</span><del>- cph.rows("select * from DUMMY_WORK_DONE"),
- [(101, 200, 7)]
</del><ins>+ DummyWorkItem.results,
+ {1: 7}
</ins><span class="cx"> )
</span><del>- cph.rows("delete from DUMMY_WORK_DONE")
- reactor.advance(pcp.queueProcessTimeout * 2)
</del><ins>+ reactor.advance(20)
</ins><span class="cx"> self.assertEquals(
</span><del>- cph.rows("select * from DUMMY_WORK_DONE"),
- [(102, 201, 16)]
</del><ins>+ DummyWorkItem.results,
+ {1: 7, 2: 16}
</ins><span class="cx"> )
</span><span class="cx">
</span><span class="cx">
</span><span class="cx"> @inlineCallbacks
</span><del>- def test_exceptionWhenCheckingForLostWork(self):
</del><ins>+ def test_exceptionWhenWorking(self):
</ins><span class="cx"> """
</span><del>- L{PeerConnectionPool._periodicLostWorkCheck} should execute any
</del><ins>+ L{PeerConnectionPool._workCheck} should execute any
</ins><span class="cx"> outstanding work items, and keep going if some raise an exception.
</span><span class="cx"> """
</span><del>- dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
- # An arbitrary point in time.
</del><ins>+ dbpool, _ignore_qpool, clock, _ignore_performerChosen = self._setupPools()
</ins><span class="cx"> fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
</span><del>- # *why* does datetime still not have .astimestamp()
- sinceEpoch = astimestamp(fakeNow)
- clock = Clock()
- clock.advance(sinceEpoch)
- qpool = PeerConnectionPool(clock, dbpool.connection, 0)
</del><span class="cx">
</span><span class="cx"> # Let's create a couple of work items directly, not via the enqueue
</span><span class="cx"> # method, so that they exist but nobody will try to immediately execute
</span><span class="cx"> # them.
</span><span class="cx">
</span><del>- @transactionally(dbpool.connection)
</del><ins>+ @transactionally(dbpool.pool.connection)
</ins><span class="cx"> @inlineCallbacks
</span><span class="cx"> def setup(txn):
</span><span class="cx"> # First, one that's right now.
</span><span class="lines">@@ -776,14 +875,51 @@
</span><span class="cx"> txn, a=2, b=0, notBefore=fakeNow - datetime.timedelta(20 * 60)
</span><span class="cx"> )
</span><span class="cx"> yield setup
</span><del>- yield qpool._periodicLostWorkCheck()
</del><ins>+ clock.advance(20 - 12)
</ins><span class="cx">
</span><del>- @transactionally(dbpool.connection)
</del><ins>+ # Wait for job
+# while True:
+# jobs = yield inTransaction(dbpool.pool.connection, lambda txn: JobItem.all(txn))
+# if all([job.a == -1 for job in jobs]):
+# break
+# clock.advance(1)
+
+ # Work item complete
+ self.assertTrue(DummyWorkItem.results == {1: 1, 3: 2})
+
+
+ @inlineCallbacks
+ def test_exceptionUnassign(self):
+ """
+ When a work item fails it should appear as unassigned in the JOB
+ table and have the failure count bumped, and a notBefore one minute ahead.
+ """
+ dbpool, _ignore_qpool, clock, _ignore_performerChosen = self._setupPools()
+ fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
+
+ # Let's create a couple of work items directly, not via the enqueue
+ # method, so that they exist but nobody will try to immediately execute
+ # them.
+
+ @transactionally(dbpool.pool.connection)
+ @inlineCallbacks
+ def setup(txn):
+ # Next, create failing work that's actually far enough into the past to run.
+ yield DummyWorkItem.makeJob(
+ txn, a=-1, b=1, notBefore=fakeNow - datetime.timedelta(20 * 60)
+ )
+ yield setup
+ clock.advance(20 - 12)
+
+ @transactionally(dbpool.pool.connection)
</ins><span class="cx"> def check(txn):
</span><del>- return DummyWorkDone.all(txn)
</del><ins>+ return JobItem.all(txn)
</ins><span class="cx">
</span><del>- every = yield check
- self.assertEquals([x.aPlusB for x in every], [1, 2])
</del><ins>+ jobs = yield check
+ self.assertTrue(len(jobs) == 1)
+ self.assertTrue(jobs[0].assigned is None)
+ self.assertTrue(jobs[0].failed == 1)
+ self.assertTrue(jobs[0].notBefore > datetime.datetime.utcnow())
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx">
</span><span class="lines">@@ -902,7 +1038,6 @@
</span><span class="cx"> )
</span><span class="cx"> self.addCleanup(deschema)
</span><span class="cx">
</span><del>- from twisted.internet import reactor
</del><span class="cx"> self.node1 = PeerConnectionPool(
</span><span class="cx"> reactor, indirectedTransactionFactory, 0)
</span><span class="cx"> self.node2 = PeerConnectionPool(
</span><span class="lines">@@ -928,7 +1063,9 @@
</span><span class="cx"> yield gatherResults([d1, d2])
</span><span class="cx"> self.store.queuer = self.node1
</span><span class="cx">
</span><ins>+ DummyWorkItem.results = {}
</ins><span class="cx">
</span><ins>+
</ins><span class="cx"> def test_currentNodeInfo(self):
</span><span class="cx"> """
</span><span class="cx"> There will be two C{NODE_INFO} rows in the database, retrievable as two
</span><span class="lines">@@ -942,12 +1079,11 @@
</span><span class="cx">
</span><span class="cx">
</span><span class="cx"> @inlineCallbacks
</span><del>- def test_enqueueHappyPath(self):
</del><ins>+ def test_enqueueWorkDone(self):
</ins><span class="cx"> """
</span><span class="cx"> When a L{WorkItem} is scheduled for execution via
</span><del>- L{PeerConnectionPool.enqueueWork} its C{doWork} method will be invoked
- by the time the L{Deferred} returned from the resulting
- L{WorkProposal}'s C{whenExecuted} method has fired.
</del><ins>+ L{PeerConnectionPool.enqueueWork} its C{doWork} method will be
+ run.
</ins><span class="cx"> """
</span><span class="cx"> # TODO: this exact test should run against LocalQueuer as well.
</span><span class="cx"> def operation(txn):
</span><span class="lines">@@ -956,40 +1092,20 @@
</span><span class="cx"> # Should probably do something with components.
</span><span class="cx"> return txn.enqueue(DummyWorkItem, a=3, b=4, jobID=100, workID=1,
</span><span class="cx"> notBefore=datetime.datetime.utcnow())
</span><del>- result = yield inTransaction(self.store.newTransaction, operation)
</del><ins>+ yield inTransaction(self.store.newTransaction, operation)
+
</ins><span class="cx"> # Wait for it to be executed. Hopefully this does not time out :-\.
</span><del>- yield result.whenExecuted()
</del><ins>+ yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
</ins><span class="cx">
</span><del>- def op2(txn):
- return Select(
- [
- schema.DUMMY_WORK_DONE.WORK_ID,
- schema.DUMMY_WORK_DONE.JOB_ID,
- schema.DUMMY_WORK_DONE.A_PLUS_B,
- ],
- From=schema.DUMMY_WORK_DONE
- ).on(txn)
</del><ins>+ self.assertEquals(DummyWorkItem.results, {100: 7})
</ins><span class="cx">
</span><del>- rows = yield inTransaction(self.store.newTransaction, op2)
- self.assertEquals(rows, [[101, 200, 7]])
</del><span class="cx">
</span><del>-
</del><span class="cx"> @inlineCallbacks
</span><span class="cx"> def test_noWorkDoneWhenConcurrentlyDeleted(self):
</span><span class="cx"> """
</span><span class="cx"> When a L{WorkItem} is concurrently deleted by another transaction, it
</span><span class="cx"> should I{not} perform its work.
</span><span class="cx"> """
</span><del>- # Provide access to a method called "concurrently" everything using
- original = self.store.newTransaction
-
- def decorate(*a, **k):
- result = original(*a, **k)
- result.concurrently = self.store.newTransaction
- return result
-
- self.store.newTransaction = decorate
-
</del><span class="cx"> def operation(txn):
</span><span class="cx"> return txn.enqueue(
</span><span class="cx"> DummyWorkItem, a=30, b=40, workID=5678,
</span><span class="lines">@@ -997,33 +1113,15 @@
</span><span class="cx"> notBefore=datetime.datetime.utcnow()
</span><span class="cx"> )
</span><span class="cx">
</span><del>- proposal = yield inTransaction(self.store.newTransaction, operation)
- yield proposal.whenExecuted()
</del><ins>+ yield inTransaction(self.store.newTransaction, operation)
</ins><span class="cx">
</span><del>- # Sanity check on the concurrent deletion.
- def op2(txn):
- return Select(
- [schema.DUMMY_WORK_ITEM.WORK_ID],
- From=schema.DUMMY_WORK_ITEM
- ).on(txn)
</del><ins>+ # Wait for it to be executed. Hopefully this does not time out :-\.
+ yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
</ins><span class="cx">
</span><del>- rows = yield inTransaction(self.store.newTransaction, op2)
- self.assertEquals(rows, [])
</del><ins>+ self.assertEquals(DummyWorkItem.results, {})
</ins><span class="cx">
</span><del>- def op3(txn):
- return Select(
- [
- schema.DUMMY_WORK_DONE.WORK_ID,
- schema.DUMMY_WORK_DONE.A_PLUS_B,
- ],
- From=schema.DUMMY_WORK_DONE
- ).on(txn)
</del><span class="cx">
</span><del>- rows = yield inTransaction(self.store.newTransaction, op3)
- self.assertEquals(rows, [])
</del><span class="cx">
</span><del>-
-
</del><span class="cx"> class DummyProposal(object):
</span><span class="cx">
</span><span class="cx"> def __init__(self, *ignored):
</span></span></pre>
</div>
</div>
</body>
</html>